Source code for pay.models.payment_providers.datatrans

from __future__ import annotations

import requests
import transaction

from decimal import Decimal
from functools import cached_property
from markupsafe import Markup
from onegov.core.orm.mixins import dict_property, meta_property
from onegov.core.utils import append_query_param
from onegov.pay import log
from onegov.pay.errors import DatatransPaymentError, DatatransApiError
from onegov.pay.models.payment import Payment
from onegov.pay.models.payment_provider import PaymentProvider
from onegov.pay.utils import Price
from pydantic import AliasChoices, AliasPath, BaseModel, ConfigDict, Field
from pydantic.alias_generators import to_camel
from pydantic_extra_types.currency_code import Currency  # noqa: TC002
from sqlalchemy.orm import object_session
from sqlalchemy.orm.attributes import flag_modified
from uuid import UUID, uuid4, uuid5
from wtforms.widgets import html_params


from typing import Any, Literal, TYPE_CHECKING
if TYPE_CHECKING:
    from onegov.core.request import CoreRequest
    from onegov.pay.types import FeePolicy
    from sqlalchemy.orm import relationship
    from transaction.interfaces import ITransaction


# our payment ids are generated by using the token and a UUID namespace
[docs] DATATRANS_NAMESPACE = UUID('e4d0beb6-c1e7-4a90-859f-421491470e46')
# NOTE: This is just the subset of properties we care about
[docs] class DatatransTransaction(BaseModel):
[docs] model_config = ConfigDict( frozen=True, alias_generator=to_camel )
[docs] transaction_id: str
[docs] merchant_id: str
[docs] type: Literal['payment', 'credit', 'card_check']
[docs] status: Literal[ 'initialized', 'challenge_required', 'challenge_ongoing', 'authenticated', 'authorized', 'settled', 'canceled', 'transmitted', 'failed' ]
[docs] refno: str
[docs] currency: Currency
[docs] amount: int | None = Field( default=None, validation_alias=AliasChoices( AliasPath('detail', 'authorize', 'amount'), AliasPath('detail', 'settle', 'amount'), ) )
[docs] def raise_if_cannot_be_settled(self) -> None: if self.type != 'payment': raise DatatransPaymentError('incorrect transaction type') if self.status != 'authorized': raise DatatransPaymentError('payment was not authorized') if not self.amount: raise DatatransPaymentError('could not retrieve payment amount')
[docs] class DatatransClient: def __init__( self, merchant_id: str | None, password: str | None, sandbox: bool = False ) -> None:
[docs] self.merchant_id = merchant_id
[docs] self.session = requests.Session()
if merchant_id is not None: self.session.auth = (merchant_id, password or '')
[docs] self.base_url = ( f'https://api.{"sandbox." if sandbox else ""}datatrans.com/v1' )
[docs] def raise_for_status(self, res: requests.Response) -> None: if res.status_code == 400: error = res.json()['error'] raise DatatransApiError( error['code'], error['message'], error.get('terminal', False) ) res.raise_for_status()
[docs] def status(self, transaction_id: str) -> DatatransTransaction: res = self.session.get( f'{self.base_url}/transactions/{transaction_id}' ) self.raise_for_status(res) return DatatransTransaction.model_validate_json(res.content)
[docs] def init( self, amount: Decimal | None = None, currency: str = 'CHF', **extra: Any, ) -> str: """ Initializes a transaction and returns the transaction_id. """ if amount is not None: payload = { 'currency': currency, 'amount': round(amount * 100), 'refno': str(uuid4()), **extra } else: payload = { 'currency': currency, 'refno': str(uuid4()), **extra } res = self.session.post( f'{self.base_url}/transactions', json=payload ) self.raise_for_status(res) return res.json()['transactionId']
[docs] def settle(self, tx: DatatransTransaction) -> None: tx.raise_if_cannot_be_settled() if tx.merchant_id != self.merchant_id: raise DatatransPaymentError('invalid merchant_id') res = self.session.post( f'{self.base_url}/transactions/{tx.transaction_id}/settle', json={ 'amount': tx.amount, 'currency': tx.currency, 'refno': tx.refno, } ) self.raise_for_status(res)
[docs] def refund(self, tx: DatatransTransaction) -> str | None: if tx.merchant_id != self.merchant_id: raise DatatransPaymentError('invalid merchant_id') if tx.type != 'payment': raise DatatransPaymentError('incorrect transaction type') if tx.status in ('settled', 'authorized'): # transaction can be cancelled res = self.session.post( f'{self.base_url}/transactions/{tx.transaction_id}/cancel', json={} ) self.raise_for_status(res) return None elif tx.status == 'transmitted': # actual refund required res = self.session.post( f'{self.base_url}/transactions/{tx.transaction_id}/credit', json={ 'currency': tx.currency, 'amount': tx.amount, 'refno': str(uuid4()), } ) self.raise_for_status(res) return res.json()['transactionId'] elif tx.status == 'canceled': # transaction is already canceled return None else: raise AssertionError('invalid transaction state')
[docs] class DatatransSettleManager: """ Settles an open datatrans charge when the transaction finishes. If there is an error during this step, it is logged, but the transaction still continues successfully. The user is then supposed to manually settle the charge. """
[docs] transaction_manager = transaction.manager
def __init__( self, client: DatatransClient, tx: DatatransTransaction, ) -> None:
[docs] self.client = client
[docs] self.tx = tx
@classmethod
[docs] def settle_charge( cls, client: DatatransClient, tx: DatatransTransaction, ) -> None: transaction.get().join(cls(client, tx))
[docs] def sortKey(self) -> str: return 'datatrans_settle'
[docs] def tpc_vote(self, transaction: ITransaction) -> None: # make sure the transaction is still alive tx = self.client.status(self.tx.transaction_id) # make sure nothing weird happened tx.raise_if_cannot_be_settled() assert tx.merchant_id == self.client.merchant_id assert tx.refno == self.tx.refno assert tx.currency == self.tx.currency
[docs] def tpc_finish(self, transaction: ITransaction) -> None: try: self.client.settle(self.tx) except Exception: # we can never fail or we might end up with an inconsistent # database -> so must swallow any errors and report them log.exception( 'Datatrans settle with transaction_id %s failed', self.tx.transaction_id )
[docs] def commit(self, transaction: ITransaction) -> None: pass
[docs] def abort(self, transaction: ITransaction) -> None: pass
[docs] def tpc_begin(self, transaction: ITransaction) -> None: pass
[docs] def tpc_abort(self, transaction: ITransaction) -> None: pass
[docs] class DatatransFeePolicy: """ All Datarans fee calculations in one place. """ # TODO: There may be an additional fee based on the selected # payment method based on the contract signed with the # responsible financial partner, like e.g. Twint, but # we can't really predict that (although maybe there is # a good lower/upper bound we can use) # NOTE: This fixed fee currently assumes CHF
[docs] fixed = 0.29
@classmethod
[docs] def from_amount(cls, amount: Decimal | float) -> float: return cls.fixed
@classmethod
[docs] def compensate(cls, amount: Decimal | float) -> float: return round(float(amount) + cls.fixed, 2)
[docs] class DatatransPayment(Payment):
[docs] __mapper_args__ = {'polymorphic_identity': 'datatrans'}
[docs] fee_policy: FeePolicy = DatatransFeePolicy
#: the refno of the transaction
[docs] refno: dict_property[str | None] = meta_property()
#: the transaction ids of any refunds
[docs] refunds: dict_property[list[str]] = meta_property(default=list)
if TYPE_CHECKING: # our provider should always be DatatransProvdider, we could # assert if we really wanted to make sure, but it would # add a lot of assertions...
[docs] provider: relationship[DatatransProvider]
# NOTE: We don't seem to get information about fees from datatrans # so the only thing we know for sure is that a customer will # have to pay 0.29 CHF for every transaction to datatrans. @property
[docs] def fee(self) -> Decimal: """ The calculated fee or the effective fee if available. """ assert self.amount is not None return Decimal(self.fee_policy.from_amount(self.amount))
@property
[docs] def remote_url(self) -> str: if self.provider.livemode: base = 'https://admin.datatrans.com/TrDetail.jsp?tid={}' else: base = 'https://admin.sandbox.datatrans.com/TrDetail.jsp?tid={}' return base.format(self.remote_id)
@property
[docs] def transaction(self) -> DatatransTransaction: assert self.remote_id return self.provider.client.status(self.remote_id)
[docs] def refund(self) -> str | None: refund = self.provider.client.refund(self.transaction) self.state = 'cancelled' if refund is not None: self.refunds = [*self.refunds, refund] flag_modified(self, 'meta') return refund
[docs] def sync(self, remote_obj: DatatransTransaction | None = None) -> None: if self.refunds: refund_tx = self.provider.client.status(self.refunds[-1]) if refund_tx.status in ('settled', 'transmitted'): # the refund already went through self.state = 'cancelled' return elif refund_tx.status not in ('failed', 'canceled'): # the refund is still pending, let's not update yet return # the refund failed or got canceled, so we need to use # the status of the original transaction if remote_obj is None: remote_obj = self.transaction match remote_obj.status: case 'transmitted': self.state = 'paid' case 'settled': # TODO: Do we want a separate state for this? pass case 'canceled': self.state = 'cancelled' case 'failed': self.state = 'failed' case _: self.state = 'open'
[docs] class DatatransProvider(PaymentProvider[DatatransPayment]):
[docs] __mapper_args__ = {'polymorphic_identity': 'datatrans'}
[docs] fee_policy: FeePolicy = DatatransFeePolicy
#: Whether or not this is a Sandbox account
[docs] sandbox: dict_property[bool] = meta_property(default=False)
#: The public Datatrans merchant name
[docs] merchant_name: dict_property[str | None] = meta_property()
#: The Datatrans merchant id
[docs] merchant_id: dict_property[str | None] = meta_property()
#: The password used for API calls
[docs] password: dict_property[str | None] = meta_property()
#: The HMAC key used for signing webhook calls
[docs] webhook_key: dict_property[str | None] = meta_property()
#: Should the fee be charged to the customer or not?
[docs] charge_fee_to_customer: dict_property[bool | None] = meta_property()
[docs] def adjust_price(self, price: Price | None) -> Price | None: if price and self.charge_fee_to_customer: new_price = self.fee_policy.compensate(price.amount) new_fee = self.fee_policy.from_amount(new_price) return Price( new_price, price.currency, new_fee, price.credit_card_payment ) return price
@property
[docs] def livemode(self) -> bool: return not self.sandbox
@property
[docs] def payment_class(self) -> type[DatatransPayment]: return DatatransPayment
@property
[docs] def title(self) -> str: return 'Datatrans'
@property
[docs] def url(self) -> str: if self.sandbox: return 'https://admin.sandbox.datatrans.com/' else: return 'https://admin.datatrans.com/'
@property
[docs] def public_identity(self) -> str: return self.merchant_name or ''
@property
[docs] def identity(self) -> str | None: return self.merchant_id
@cached_property
[docs] def client(self) -> DatatransClient: return DatatransClient(self.merchant_id, self.password, self.sandbox)
@property
[docs] def connected(self) -> bool: # NOTE: It seems like the only way to check this is to initialize # a transaction. I'm not sure whether we can always omit the # price or if it depends on which payment methods are enabled. # It's not great, that we have to provide dummy urls here... try: self.client.init( redirect={ 'successUrl': self.url, 'cancelUrl': self.url, 'errorUrl': self.url, } ) except Exception: log.exception('Datrans connection failed') return False else: return True
[docs] def charge( self, amount: Decimal, currency: str, token: str # transaction_id ) -> DatatransPayment: # ensure the transaction can be settled tx = self.client.status(token) tx.raise_if_cannot_be_settled() if tx.currency != currency or tx.amount != round(amount * 100): raise DatatransPaymentError('Invalid payment amount') if not tx.refno: raise DatatransPaymentError('refno is missing') session = object_session(self) payment = self.payment( id=uuid5(DATATRANS_NAMESPACE, tx.refno), amount=amount, currency=currency, remote_id=token, refno=tx.refno, state='open' ) assert self.merchant_id is not None DatatransSettleManager.settle_charge(self.client, tx) # we do *not* want to lose this information, so even though the # caller should make sure the payment is stored, we make sure session.add(payment) return payment
[docs] def get_token(self, request: CoreRequest) -> str | None: """ Extracts this provider's specific token from the request. """ token = request.params.get('datatransTrxId') if not isinstance(token, str): return None return token
[docs] def checkout_button( self, label: str, amount: Decimal | None, currency: str | None, complete_url: str, request: CoreRequest, **extra: Any ) -> Markup: """ Generates the html for the checkout button. """ if amount is None: amount = Decimal(0) currency = 'CHF' if currency is None else currency else: assert currency is not None extra_params = {} if locale := request.locale: extra_params['language'] = locale.split('_')[0] complete_url = append_query_param( complete_url, 'session_nonce', request.get_session_nonce() ) transaction_id = self.client.init( amount=amount, currency=currency, autoSettle=False, redirect={ 'successUrl': complete_url, 'cancelUrl': request.url, 'errorUrl': complete_url, 'startTarget': '_top', 'returnTarget': '_top', 'method': 'POST', }, webhook={ 'url': request.link(self, 'webhook'), }, **extra_params ) params: dict[str, Any] = {'data-transaction-id': transaction_id} if self.sandbox: params['data-sandbox'] = True return Markup( '<button class="checkout-button datatrans button"' '{html_params}>{label}</button>' ).format( label=label, html_params=Markup(html_params(**params)) # noqa: RUF035 )
[docs] def sync(self) -> None: session = object_session(self) query = session.query(self.payment_class) # TODO: We currenly only sync open payments, although it may # be possible for paid payments to transition to failed # or cancelled if there's a chargeback of some kind... # Maybe it would be better to just sync any payment # where a change occurred in the past six months. # This would keep the volume of API requests bounded # while yielding more accurate results, but maybe the # webhook is better suited for that... query = query.filter(self.payment_class.state == 'open') for payment in query: payment.sync()