Source code for pay.models.payment_providers.worldline_saferpay

from __future__ import annotations

import requests
import transaction

from datetime import datetime  # noqa: TC003
from decimal import Decimal
from functools import cached_property
from markupsafe import Markup
from onegov.core.crypto import random_token
from onegov.core.orm.mixins import dict_property, meta_property
from onegov.core.utils import append_query_param
from onegov.pay import log
from onegov.pay.errors import SaferpayPaymentError, SaferpayApiError
from onegov.pay.models.payment import Payment
from onegov.pay.models.payment_provider import PaymentProvider
from onegov.pay.utils import Price
from pydantic import (
    field_serializer,
    AliasChoices,
    AliasGenerator,
    AliasPath,
    BaseModel,
    ConfigDict,
)
from pydantic.alias_generators import to_pascal
from pydantic_extra_types.currency_code import Currency  # noqa: TC002
from sqlalchemy.orm import object_session
from sqlalchemy.orm.attributes import flag_modified
from uuid import UUID, uuid4, uuid5
from wtforms.widgets import html_params


from typing import Any, Literal, TYPE_CHECKING
if TYPE_CHECKING:
    from onegov.core.request import CoreRequest
    from onegov.pay.types import FeePolicy
    from sqlalchemy.orm import relationship
    from transaction.interfaces import ITransaction


# our payment ids are generated by using the token and a UUID namespace
[docs] WORLDLINE_NAMESPACE = UUID('56334bdd-3a9a-423f-9469-7572e7f8ae1c')
[docs] SAFERPAY_SPEC_VERSION = '1.44'
[docs] class SaferpayAmount(BaseModel):
[docs] model_config = ConfigDict( frozen=True, alias_generator=AliasGenerator( validation_alias=lambda path: AliasChoices( to_pascal(path), path ), serialization_alias=to_pascal ) )
[docs] currency_code: Currency
[docs] value: int
@field_serializer('value', when_used='json')
[docs] def serialize_value(self, value: int) -> str: return str(value)
# NOTE: This is just the subset of properties we care about
[docs] class SaferpayTransaction(BaseModel):
[docs] model_config = ConfigDict( frozen=True, alias_generator=AliasGenerator( validation_alias=lambda path: AliasChoices( AliasPath('Transaction', to_pascal(path)), path, ), serialization_alias=to_pascal ) )
[docs] id: str
[docs] date: datetime
[docs] six_transaction_reference: str
[docs] type: Literal['PAYMENT', 'REFUND']
[docs] status: Literal[ 'AUTHORIZED', 'CANCELED', 'CAPTURED', 'PENDING', ]
[docs] amount: SaferpayAmount
[docs] order_id: str | None = None
[docs] capture_id: str | None = None
[docs] def raise_if_cannot_be_captured(self, type: str = 'PAYMENT') -> None: if self.type != type: raise SaferpayPaymentError('incorrect transaction type') if self.status != 'AUTHORIZED': raise SaferpayPaymentError('payment was not authorized')
[docs] class SaferpayClient: def __init__( self, customer_id: str | None, terminal_id: str | None, api_username: str | None, api_password: str | None, sandbox: bool = False ) -> None:
[docs] self.customer_id = customer_id
[docs] self.terminal_id = terminal_id
[docs] self.session = requests.Session()
if api_username: self.session.auth = (api_username, api_password or '')
[docs] self.base_url = ( f'https://{"test" if sandbox else "www"}.saferpay.com/api' )
[docs] def request_header(self) -> dict[str, Any]: return { 'SpecVersion': SAFERPAY_SPEC_VERSION, 'CustomerId': self.customer_id, 'RequestId': str(uuid4()), # we don't currently retry requests 'RetryIndicator': 0, }
[docs] def raise_for_status(self, res: requests.Response) -> None: if res.status_code in (400, 402): error = res.json() raise SaferpayApiError( error['ErrorName'], error['ErrorMessage'], error['Behavior'], error.get('ErrorDetail', None) ) res.raise_for_status()
[docs] def transaction_reference( self, *, transaction_id: str | None = None, order_id: str | None = None, ) -> dict[str, Any]: if transaction_id is not None: assert order_id is None return {'TransactionId': transaction_id} else: assert order_id is not None return {'OrderId': order_id}
[docs] def capture_reference( self, *, capture_id: str | None = None, transaction_id: str | None = None, order_id: str | None = None, ) -> dict[str, Any]: if capture_id is not None: return {'CaptureId': capture_id} elif transaction_id is not None: return {'TransactionId': transaction_id} else: assert order_id is not None return {'OrderId': order_id}
[docs] def inquire( self, *, transaction_id: str | None = None, order_id: str | None = None, ) -> SaferpayTransaction: res = self.session.post( f'{self.base_url}/Payment/v1/Transaction/Inquire', json={ 'RequestHeader': self.request_header(), 'TransactionReference': self.transaction_reference( transaction_id=transaction_id, order_id=order_id, ) }, timeout=(5, 10) ) self.raise_for_status(res) return SaferpayTransaction.model_validate_json(res.content)
[docs] def init( self, amount: Decimal, currency: str, return_url: str, description: str, **extra: Any, ) -> tuple[str, str]: """ Initializes a transaction and returns a token and redirect url. """ res = self.session.post( f'{self.base_url}/Payment/v1/PaymentPage/Initialize', json={ 'RequestHeader': self.request_header(), 'TerminalId': self.terminal_id, 'Payment': { 'Description': description, 'OrderId': str(uuid4()), 'Amount': { 'Value': str(round(amount * 100)), 'CurrencyCode': currency, }, }, 'ReturnUrl': { 'Url': return_url, }, **extra, }, timeout=(5, 10) ) self.raise_for_status(res) data = res.json() return data['Token'], data['RedirectUrl']
[docs] def assert_transaction(self, token: str) -> SaferpayTransaction: """ Check the status of a transaction using its token. """ res = self.session.post( f'{self.base_url}/Payment/v1/PaymentPage/Assert', json={ 'RequestHeader': self.request_header(), 'Token': token }, timeout=(5, 10) ) self.raise_for_status(res) return SaferpayTransaction.model_validate_json(res.content)
[docs] def capture( self, tx: SaferpayTransaction, expected_type: str = 'PAYMENT', ) -> tuple[bool, str | None]: """ Capture a transaction given its transaction or order id. Returns a `tuple` containing a `bool` of whether or not the transaction was captured or is still pending, and its corresponding `capture_id`, when given. """ tx.raise_if_cannot_be_captured(expected_type) res = self.session.post( f'{self.base_url}/Payment/v1/Transaction/Capture', json={ 'RequestHeader': self.request_header(), 'TransactionReference': self.transaction_reference( transaction_id=tx.id ), }, timeout=(5, 10) ) self.raise_for_status(res) data = res.json() return data['Status'] == 'CAPTURED', data.get('CaptureId')
[docs] def assert_capture( self, *, capture_id: str | None = None, transaction_id: str | None = None, order_id: str | None = None, ) -> tuple[bool, str]: """ Check the status of a once pending capture. Returns a `tuple` containing a `bool` of whether or not the transaction was captured or is still pending, and its corresponding `transaction_id`. """ res = self.session.post( f'{self.base_url}/Payment/v1/Transaction/Capture', json={ 'RequestHeader': self.request_header(), 'CaptureReference': self.capture_reference( capture_id=capture_id, transaction_id=transaction_id, order_id=order_id, ), }, timeout=(5, 10) ) self.raise_for_status(res) data = res.json() if transaction_id is None: transaction_id = data['transactionId'] elif transaction_id != data['transactionId']: raise SaferpayPaymentError( 'Mismatching transaction_id in assert capture' ) return data['Status'] == 'CAPTURED', transaction_id
[docs] def refund(self, tx: SaferpayTransaction) -> SaferpayTransaction | None: """ Refund the given transaction. May return a refund transaction. """ if tx.type != 'PAYMENT': raise SaferpayPaymentError('incorrect transaction type') if tx.status == 'AUTHORIZED': # transaction can be cancelled res = self.session.post( f'{self.base_url}/Payment/v1/Transaction/Cancel', json={ 'RequestHeader': self.request_header(), 'TransactionReference': self.transaction_reference( transaction_id=tx.id, ) }, timeout=(5, 10) ) self.raise_for_status(res) return None elif tx.status in ('PENDING', 'CAPTURED'): # actual refund required res = self.session.post( f'{self.base_url}/Payment/v1/Transaction/Refund', json={ 'RequestHeader': self.request_header(), 'Refund': { 'Amount': tx.amount.model_dump(by_alias=True), 'OrderId': str(uuid4()), 'RestrictRefundAmountToCapturedAmount': True, }, 'CaptureReference': self.capture_reference( capture_id=tx.capture_id, transaction_id=tx.id, order_id=tx.order_id, ), }, timeout=(5, 10) ) self.raise_for_status(res) refund_tx = SaferpayTransaction.model_validate_json(res.content) if refund_tx.status == 'AUTHORIZED': try: # try to capture but if we can't don't stress about it # they can try to manually capture it self.capture(refund_tx, 'REFUND') except Exception: log.exception('Failed to capture Saferpay refund') return refund_tx elif tx.status == 'CANCELED': # transaction is already canceled return None else: raise AssertionError('invalid transaction state')
[docs] class SaferpayCaptureManager: """ Captures an authorized Saferpay charge when the transaction finishes. If there is an error during this step, it is logged, but the transaction still continues successfully. The user is then supposed to manually capture the charge. """
[docs] transaction_manager = transaction.manager
def __init__( self, client: SaferpayClient, tx: SaferpayTransaction, ) -> None:
[docs] self.client = client
[docs] self.tx = tx
@classmethod
[docs] def capture_charge( cls, client: SaferpayClient, tx: SaferpayTransaction, ) -> None: transaction.get().join(cls(client, tx))
[docs] def sortKey(self) -> str: return 'saferpay_capture'
[docs] def tpc_vote(self, transaction: ITransaction) -> None: # make sure the transaction is still alive tx = self.client.inquire(transaction_id=self.tx.id) # make sure nothing weird happened tx.raise_if_cannot_be_captured() assert tx.order_id == self.tx.order_id assert tx.amount == self.tx.amount
[docs] def tpc_finish(self, transaction: ITransaction) -> None: try: self.client.capture(self.tx) except Exception: # we can never fail or we might end up with an inconsistent # database -> so must swallow any errors and report them log.exception( 'Saferpay capture with transaction_id %s failed', self.tx.id )
[docs] def commit(self, transaction: ITransaction) -> None: pass
[docs] def abort(self, transaction: ITransaction) -> None: pass
[docs] def tpc_begin(self, transaction: ITransaction) -> None: pass
[docs] def tpc_abort(self, transaction: ITransaction) -> None: pass
[docs] class WorldlineFeePolicy: """ All Worldline fee calculations in one place. """ # NOTE: This fixed fee currently assumes CHF
[docs] percentage = 0.017
[docs] fixed = 0.19
@classmethod
[docs] def from_amount(cls, amount: Decimal | float) -> float: return round(float(amount) * cls.percentage + cls.fixed, 2)
@classmethod
[docs] def compensate(cls, amount: Decimal | float) -> float: return round((float(amount) + cls.fixed) / (1 - cls.percentage), 2)
[docs] class SaferpayPayment(Payment):
[docs] __mapper_args__ = {'polymorphic_identity': 'worldline_saferpay'}
[docs] fee_policy: FeePolicy = WorldlineFeePolicy
#: the order_id of the transaction
[docs] order_id: dict_property[str | None] = meta_property()
#: The SIX transaction reference
[docs] six_transaction_reference: dict_property[str | None] = meta_property()
#: the capture_id of the transaction
[docs] capture_id: dict_property[str | None] = meta_property()
#: the transaction ids of any refunds
[docs] refunds: dict_property[list[str]] = meta_property(default=list)
if TYPE_CHECKING: # our provider should always be WordlineSaferpayProvdider, we could # assert if we really wanted to make sure, but it would # add a lot of assertions...
[docs] provider: relationship[WorldlineSaferpay]
# NOTE: We don't seem to get information about fees from datatrans # so the only thing we know for sure is that a customer will # have to pay 0.29 CHF for every transaction to datatrans. @property
[docs] def fee(self) -> Decimal: """ The calculated fee or the effective fee if available. """ assert self.amount is not None return Decimal(self.fee_policy.from_amount(self.amount))
@property
[docs] def remote_url(self) -> str: if self.provider.livemode: base = 'https://www.saferpay.com/BO/Commerce/JournalDetail?gxid={}' else: base = 'https://test.saferpay.com/BO/Commerce/JournalDetail?gxid={}' return base.format(self.remote_id)
@property
[docs] def transaction(self) -> SaferpayTransaction: assert self.remote_id return self.provider.client.inquire(transaction_id=self.remote_id)
[docs] def refund(self) -> bool: if self.refunds: existing_refund = self.provider.client.inquire( transaction_id=self.refunds[-1] ) if existing_refund.status in ('PENDING', 'CAPTURED'): return False elif existing_refund.status == 'AUTHORIZED': self.provider.client.capture(existing_refund, 'REFUND') return False # the refund must have been canceled, so let's make a new one refund = self.provider.client.refund(self.transaction) self.state = 'cancelled' if refund is not None: self.refunds = [*self.refunds, refund.id] flag_modified(self, 'meta') return True
[docs] def sync(self, remote_obj: SaferpayTransaction | None = None) -> None: if self.refunds: try: refund_tx = self.provider.client.inquire( transaction_id=self.refunds[-1] ) except Exception: log.exception( 'Failed to synchronize Saferpay refund transaction %s', self.refunds[-1] ) return if refund_tx.status == 'CAPTURED': # the refund already went through self.state = 'cancelled' return elif refund_tx.status not in ('AUTHORIZED', 'PENDING'): # the refund is still pending, let's not update yet return # the refund failed or got canceled, so we need to use # the status of the original transaction if remote_obj is None: try: remote_obj = self.transaction except Exception as exc: if isinstance(exc, SaferpayApiError) and exc.name in ( 'TRANSACTION_ABORTED', 'TRANSACTION_DECLINED', ): self.state = 'failed' return log.exception( 'Failed to synchronize Saferpay transaction %s', self.remote_id ) return if self.capture_id != remote_obj.capture_id: self.capture_id = remote_obj.capture_id match remote_obj.status: case 'CAPTURED': self.state = 'paid' case 'PENDING': # TODO: Do we want a separate state for this? pass case 'CANCELED': self.state = 'cancelled' case _: self.state = 'open'
[docs] class WorldlineSaferpay(PaymentProvider[SaferpayPayment]):
[docs] __mapper_args__ = {'polymorphic_identity': 'worldline_saferpay'}
[docs] fee_policy: FeePolicy = WorldlineFeePolicy
#: Whether or not this is a Sandbox account
[docs] sandbox: dict_property[bool] = meta_property(default=False)
#: The public Worldline customer name
[docs] customer_name: dict_property[str | None] = meta_property()
#: The Worldline customer id
[docs] customer_id: dict_property[str | None] = meta_property()
#: The Worldline terminal id
[docs] terminal_id: dict_property[str | None] = meta_property()
#: The username used for API calls
[docs] api_username: dict_property[str | None] = meta_property()
#: The password used for API calls
[docs] api_password: dict_property[str | None] = meta_property()
#: Should the fee be charged to the customer or not?
[docs] charge_fee_to_customer: dict_property[bool | None] = meta_property()
[docs] def adjust_price(self, price: Price | None) -> Price | None: if price and self.charge_fee_to_customer: new_price = self.fee_policy.compensate(price.amount) new_fee = self.fee_policy.from_amount(new_price) return Price( new_price, price.currency, new_fee, price.credit_card_payment ) return price
@property
[docs] def livemode(self) -> bool: return not self.sandbox
@property
[docs] def payment_class(self) -> type[SaferpayPayment]: return SaferpayPayment
@property
[docs] def title(self) -> str: return 'Worldline Saferpay'
@property
[docs] def url(self) -> str: return f'https://{"test" if self.sandbox else "www"}.saferpay.com/BO/'
@property
[docs] def public_identity(self) -> str: return self.customer_name or ''
@property
[docs] def identity(self) -> str | None: return self.customer_id
@cached_property
[docs] def client(self) -> SaferpayClient: return SaferpayClient( customer_id=self.customer_id, terminal_id=self.terminal_id, api_username=self.api_username, api_password=self.api_password, sandbox=self.sandbox, )
@property
[docs] def connected(self) -> bool: # we're using an idempotent API endpoint here try: res = self.client.session.get( f'{self.client.base_url}/rest/customers/' f'{self.customer_id}/license', headers={ 'Saferpay-ApiVersion': SAFERPAY_SPEC_VERSION, 'Saferpay-RequestId': str(uuid4()), }, json={}, timeout=(5, 10), ) self.client.raise_for_status(res) except Exception: log.exception('Saferpay connection failed') return False else: return True
[docs] def charge( self, amount: Decimal, currency: str, token: str # transaction_id ) -> SaferpayPayment: # ensure the transaction can be settled tx = self.client.assert_transaction(token) if tx.type != 'PAYMENT' or tx.status not in ('AUTHORIZED', 'CAPTURED'): raise SaferpayPaymentError('Invalid transaction type') if ( tx.amount.currency_code != currency or tx.amount.value != round(amount * 100) ): raise SaferpayPaymentError('Invalid payment amount') if not tx.order_id: raise SaferpayPaymentError('order_id is missing') session = object_session(self) payment = self.payment( id=uuid5(WORLDLINE_NAMESPACE, tx.order_id), amount=amount, currency=currency, remote_id=tx.id, order_id=tx.order_id, six_transaction_reference=tx.six_transaction_reference, state='paid' if tx.status == 'CAPTURED' else 'open' ) if tx.status == 'AUTHORIZED': # we need to capture the charge assert self.customer_id is not None SaferpayCaptureManager.capture_charge(self.client, tx) # we do *not* want to lose this information, so even though the # caller should make sure the payment is stored, we make sure session.add(payment) return payment
[docs] def get_token(self, request: CoreRequest) -> str | None: """ Extracts this provider's specific token from the request. """ if not (signed_nonce := request.GET.get('saferpay_nonce')): return None app = request.app salt = app.sign(request.browser_session._token, 'saferpay_salt') nonce = app.unsign(signed_nonce, salt) if not nonce or not (signed_token := app.cache.get(nonce)): return None # make sure the nonce can't be reused app.cache.delete(nonce) return app.unsign(signed_token, salt)
[docs] def checkout_button( self, label: str, amount: Decimal | None, currency: str | None, complete_url: str, request: CoreRequest, **extra: Any ) -> Markup: """ Generates the html for the checkout button. """ if amount is None: amount = Decimal(0) currency = 'CHF' if currency is None else currency else: assert currency is not None # append a saferpay nonce to the complete url app = request.app nonce = random_token() salt = app.sign(request.browser_session._token, 'saferpay_salt') signed_nonce = app.sign(nonce, salt) complete_url = append_query_param( complete_url, 'saferpay_nonce', signed_nonce ) token, redirect_url = self.client.init( amount=amount, currency=currency, return_url=complete_url, description=extra['description'], ) # store the token using the nonce app.cache.set(nonce, app.sign(token, salt)) params: dict[str, Any] = {'data-redirect-url': redirect_url} return Markup( '<button class="checkout-button saferpay button"' '{html_params}>{label}</button>' ).format( label=label, html_params=Markup(html_params(**params)) # nosec: B704 )
[docs] def sync(self) -> None: # FIXME: This may lead to complaints, since it might look like # we're polling payments session = object_session(self) query = session.query(self.payment_class) query = query.filter(self.payment_class.state == 'open') for payment in query: payment.sync()
@property
[docs] def payment_via_get(self) -> bool: return True