from __future__ import annotations
import requests
import transaction
from datetime import datetime # noqa: TC003
from decimal import Decimal
from functools import cached_property
from markupsafe import Markup
from onegov.core.crypto import random_token
from onegov.core.orm.mixins import dict_property, meta_property
from onegov.core.utils import append_query_param
from onegov.pay import log
from onegov.pay.errors import SaferpayPaymentError, SaferpayApiError
from onegov.pay.models.payment import Payment
from onegov.pay.models.payment_provider import PaymentProvider
from onegov.pay.utils import Price
from pydantic import (
field_serializer,
AliasChoices,
AliasGenerator,
AliasPath,
BaseModel,
ConfigDict,
)
from pydantic.alias_generators import to_pascal
from pydantic_extra_types.currency_code import Currency # noqa: TC002
from sqlalchemy.orm import object_session
from sqlalchemy.orm.attributes import flag_modified
from uuid import UUID, uuid4, uuid5
from wtforms.widgets import html_params
from typing import Any, Literal, TYPE_CHECKING
if TYPE_CHECKING:
from onegov.core.request import CoreRequest
from onegov.pay.types import FeePolicy
from sqlalchemy.orm import relationship
from transaction.interfaces import ITransaction
# our payment ids are generated by using the token and a UUID namespace
[docs]
WORLDLINE_NAMESPACE = UUID('56334bdd-3a9a-423f-9469-7572e7f8ae1c')
[docs]
SAFERPAY_SPEC_VERSION = '1.44'
[docs]
class SaferpayAmount(BaseModel):
[docs]
model_config = ConfigDict(
frozen=True,
alias_generator=AliasGenerator(
validation_alias=lambda path: AliasChoices(
to_pascal(path),
path
),
serialization_alias=to_pascal
)
)
[docs]
currency_code: Currency
@field_serializer('value', when_used='json')
[docs]
def serialize_value(self, value: int) -> str:
return str(value)
# NOTE: This is just the subset of properties we care about
[docs]
class SaferpayTransaction(BaseModel):
[docs]
model_config = ConfigDict(
frozen=True,
alias_generator=AliasGenerator(
validation_alias=lambda path: AliasChoices(
AliasPath('Transaction', to_pascal(path)),
path,
),
serialization_alias=to_pascal
)
)
[docs]
six_transaction_reference: str
[docs]
type: Literal['PAYMENT', 'REFUND']
[docs]
status: Literal[
'AUTHORIZED',
'CANCELED',
'CAPTURED',
'PENDING',
]
[docs]
order_id: str | None = None
[docs]
capture_id: str | None = None
[docs]
def raise_if_cannot_be_captured(self, type: str = 'PAYMENT') -> None:
if self.type != type:
raise SaferpayPaymentError('incorrect transaction type')
if self.status != 'AUTHORIZED':
raise SaferpayPaymentError('payment was not authorized')
[docs]
class SaferpayClient:
def __init__(
self,
customer_id: str | None,
terminal_id: str | None,
api_username: str | None,
api_password: str | None,
sandbox: bool = False
) -> None:
[docs]
self.customer_id = customer_id
[docs]
self.terminal_id = terminal_id
[docs]
self.session = requests.Session()
if api_username:
self.session.auth = (api_username, api_password or '')
[docs]
self.base_url = (
f'https://{"test" if sandbox else "www"}.saferpay.com/api'
)
[docs]
def raise_for_status(self, res: requests.Response) -> None:
if res.status_code in (400, 402):
error = res.json()
raise SaferpayApiError(
error['ErrorName'],
error['ErrorMessage'],
error['Behavior'],
error.get('ErrorDetail', None)
)
res.raise_for_status()
[docs]
def transaction_reference(
self,
*,
transaction_id: str | None = None,
order_id: str | None = None,
) -> dict[str, Any]:
if transaction_id is not None:
assert order_id is None
return {'TransactionId': transaction_id}
else:
assert order_id is not None
return {'OrderId': order_id}
[docs]
def capture_reference(
self,
*,
capture_id: str | None = None,
transaction_id: str | None = None,
order_id: str | None = None,
) -> dict[str, Any]:
if capture_id is not None:
return {'CaptureId': capture_id}
elif transaction_id is not None:
return {'TransactionId': transaction_id}
else:
assert order_id is not None
return {'OrderId': order_id}
[docs]
def inquire(
self,
*,
transaction_id: str | None = None,
order_id: str | None = None,
) -> SaferpayTransaction:
res = self.session.post(
f'{self.base_url}/Payment/v1/Transaction/Inquire',
json={
'RequestHeader': self.request_header(),
'TransactionReference': self.transaction_reference(
transaction_id=transaction_id,
order_id=order_id,
)
},
timeout=(5, 10)
)
self.raise_for_status(res)
return SaferpayTransaction.model_validate_json(res.content)
[docs]
def init(
self,
amount: Decimal,
currency: str,
return_url: str,
description: str,
**extra: Any,
) -> tuple[str, str]:
""" Initializes a transaction and returns a token and redirect url. """
res = self.session.post(
f'{self.base_url}/Payment/v1/PaymentPage/Initialize',
json={
'RequestHeader': self.request_header(),
'TerminalId': self.terminal_id,
'Payment': {
'Description': description,
'OrderId': str(uuid4()),
'Amount': {
'Value': str(round(amount * 100)),
'CurrencyCode': currency,
},
},
'ReturnUrl': {
'Url': return_url,
},
**extra,
},
timeout=(5, 10)
)
self.raise_for_status(res)
data = res.json()
return data['Token'], data['RedirectUrl']
[docs]
def assert_transaction(self, token: str) -> SaferpayTransaction:
""" Check the status of a transaction using its token. """
res = self.session.post(
f'{self.base_url}/Payment/v1/PaymentPage/Assert',
json={
'RequestHeader': self.request_header(),
'Token': token
},
timeout=(5, 10)
)
self.raise_for_status(res)
return SaferpayTransaction.model_validate_json(res.content)
[docs]
def capture(
self,
tx: SaferpayTransaction,
expected_type: str = 'PAYMENT',
) -> tuple[bool, str | None]:
"""
Capture a transaction given its transaction or order id.
Returns a `tuple` containing a `bool` of whether or not the
transaction was captured or is still pending, and its
corresponding `capture_id`, when given.
"""
tx.raise_if_cannot_be_captured(expected_type)
res = self.session.post(
f'{self.base_url}/Payment/v1/Transaction/Capture',
json={
'RequestHeader': self.request_header(),
'TransactionReference': self.transaction_reference(
transaction_id=tx.id
),
},
timeout=(5, 10)
)
self.raise_for_status(res)
data = res.json()
return data['Status'] == 'CAPTURED', data.get('CaptureId')
[docs]
def assert_capture(
self,
*,
capture_id: str | None = None,
transaction_id: str | None = None,
order_id: str | None = None,
) -> tuple[bool, str]:
"""
Check the status of a once pending capture.
Returns a `tuple` containing a `bool` of whether or not the
transaction was captured or is still pending, and its
corresponding `transaction_id`.
"""
res = self.session.post(
f'{self.base_url}/Payment/v1/Transaction/Capture',
json={
'RequestHeader': self.request_header(),
'CaptureReference': self.capture_reference(
capture_id=capture_id,
transaction_id=transaction_id,
order_id=order_id,
),
},
timeout=(5, 10)
)
self.raise_for_status(res)
data = res.json()
if transaction_id is None:
transaction_id = data['transactionId']
elif transaction_id != data['transactionId']:
raise SaferpayPaymentError(
'Mismatching transaction_id in assert capture'
)
return data['Status'] == 'CAPTURED', transaction_id
[docs]
def refund(self, tx: SaferpayTransaction) -> SaferpayTransaction | None:
""" Refund the given transaction. May return a refund transaction. """
if tx.type != 'PAYMENT':
raise SaferpayPaymentError('incorrect transaction type')
if tx.status == 'AUTHORIZED':
# transaction can be cancelled
res = self.session.post(
f'{self.base_url}/Payment/v1/Transaction/Cancel',
json={
'RequestHeader': self.request_header(),
'TransactionReference': self.transaction_reference(
transaction_id=tx.id,
)
},
timeout=(5, 10)
)
self.raise_for_status(res)
return None
elif tx.status in ('PENDING', 'CAPTURED'):
# actual refund required
res = self.session.post(
f'{self.base_url}/Payment/v1/Transaction/Refund',
json={
'RequestHeader': self.request_header(),
'Refund': {
'Amount': tx.amount.model_dump(by_alias=True),
'OrderId': str(uuid4()),
'RestrictRefundAmountToCapturedAmount': True,
},
'CaptureReference': self.capture_reference(
capture_id=tx.capture_id,
transaction_id=tx.id,
order_id=tx.order_id,
),
},
timeout=(5, 10)
)
self.raise_for_status(res)
refund_tx = SaferpayTransaction.model_validate_json(res.content)
if refund_tx.status == 'AUTHORIZED':
try:
# try to capture but if we can't don't stress about it
# they can try to manually capture it
self.capture(refund_tx, 'REFUND')
except Exception:
log.exception('Failed to capture Saferpay refund')
return refund_tx
elif tx.status == 'CANCELED':
# transaction is already canceled
return None
else:
raise AssertionError('invalid transaction state')
[docs]
class SaferpayCaptureManager:
""" Captures an authorized Saferpay charge when the transaction finishes.
If there is an error during this step, it is logged, but the transaction
still continues successfully.
The user is then supposed to manually capture the charge.
"""
[docs]
transaction_manager = transaction.manager
def __init__(
self,
client: SaferpayClient,
tx: SaferpayTransaction,
) -> None:
@classmethod
[docs]
def capture_charge(
cls,
client: SaferpayClient,
tx: SaferpayTransaction,
) -> None:
transaction.get().join(cls(client, tx))
[docs]
def sortKey(self) -> str:
return 'saferpay_capture'
[docs]
def tpc_vote(self, transaction: ITransaction) -> None:
# make sure the transaction is still alive
tx = self.client.inquire(transaction_id=self.tx.id)
# make sure nothing weird happened
tx.raise_if_cannot_be_captured()
assert tx.order_id == self.tx.order_id
assert tx.amount == self.tx.amount
[docs]
def tpc_finish(self, transaction: ITransaction) -> None:
try:
self.client.capture(self.tx)
except Exception:
# we can never fail or we might end up with an inconsistent
# database -> so must swallow any errors and report them
log.exception(
'Saferpay capture with transaction_id %s failed',
self.tx.id
)
[docs]
def commit(self, transaction: ITransaction) -> None:
pass
[docs]
def abort(self, transaction: ITransaction) -> None:
pass
[docs]
def tpc_begin(self, transaction: ITransaction) -> None:
pass
[docs]
def tpc_abort(self, transaction: ITransaction) -> None:
pass
[docs]
class WorldlineFeePolicy:
""" All Worldline fee calculations in one place. """
# NOTE: This fixed fee currently assumes CHF
@classmethod
[docs]
def from_amount(cls, amount: Decimal | float) -> float:
return round(float(amount) * cls.percentage + cls.fixed, 2)
@classmethod
[docs]
def compensate(cls, amount: Decimal | float) -> float:
return round((float(amount) + cls.fixed) / (1 - cls.percentage), 2)
[docs]
class SaferpayPayment(Payment):
[docs]
__mapper_args__ = {'polymorphic_identity': 'worldline_saferpay'}
[docs]
fee_policy: FeePolicy = WorldlineFeePolicy
#: the order_id of the transaction
[docs]
order_id: dict_property[str | None] = meta_property()
#: The SIX transaction reference
[docs]
six_transaction_reference: dict_property[str | None] = meta_property()
#: the capture_id of the transaction
[docs]
capture_id: dict_property[str | None] = meta_property()
#: the transaction ids of any refunds
[docs]
refunds: dict_property[list[str]] = meta_property(default=list)
if TYPE_CHECKING:
# our provider should always be WordlineSaferpayProvdider, we could
# assert if we really wanted to make sure, but it would
# add a lot of assertions...
[docs]
provider: relationship[WorldlineSaferpay]
# NOTE: We don't seem to get information about fees from datatrans
# so the only thing we know for sure is that a customer will
# have to pay 0.29 CHF for every transaction to datatrans.
@property
[docs]
def fee(self) -> Decimal:
""" The calculated fee or the effective fee if available. """
assert self.amount is not None
return Decimal(self.fee_policy.from_amount(self.amount))
@property
[docs]
def remote_url(self) -> str:
if self.provider.livemode:
base = 'https://www.saferpay.com/BO/Commerce/JournalDetail?gxid={}'
else:
base = 'https://test.saferpay.com/BO/Commerce/JournalDetail?gxid={}'
return base.format(self.remote_id)
@property
[docs]
def transaction(self) -> SaferpayTransaction:
assert self.remote_id
return self.provider.client.inquire(transaction_id=self.remote_id)
[docs]
def refund(self) -> bool:
if self.refunds:
existing_refund = self.provider.client.inquire(
transaction_id=self.refunds[-1]
)
if existing_refund.status in ('PENDING', 'CAPTURED'):
return False
elif existing_refund.status == 'AUTHORIZED':
self.provider.client.capture(existing_refund, 'REFUND')
return False
# the refund must have been canceled, so let's make a new one
refund = self.provider.client.refund(self.transaction)
self.state = 'cancelled'
if refund is not None:
self.refunds = [*self.refunds, refund.id]
flag_modified(self, 'meta')
return True
[docs]
def sync(self, remote_obj: SaferpayTransaction | None = None) -> None:
if self.refunds:
try:
refund_tx = self.provider.client.inquire(
transaction_id=self.refunds[-1]
)
except Exception:
log.exception(
'Failed to synchronize Saferpay refund transaction %s',
self.refunds[-1]
)
return
if refund_tx.status == 'CAPTURED':
# the refund already went through
self.state = 'cancelled'
return
elif refund_tx.status not in ('AUTHORIZED', 'PENDING'):
# the refund is still pending, let's not update yet
return
# the refund failed or got canceled, so we need to use
# the status of the original transaction
if remote_obj is None:
try:
remote_obj = self.transaction
except Exception as exc:
if isinstance(exc, SaferpayApiError) and exc.name in (
'TRANSACTION_ABORTED',
'TRANSACTION_DECLINED',
):
self.state = 'failed'
return
log.exception(
'Failed to synchronize Saferpay transaction %s',
self.remote_id
)
return
if self.capture_id != remote_obj.capture_id:
self.capture_id = remote_obj.capture_id
match remote_obj.status:
case 'CAPTURED':
self.state = 'paid'
case 'PENDING':
# TODO: Do we want a separate state for this?
pass
case 'CANCELED':
self.state = 'cancelled'
case _:
self.state = 'open'
[docs]
class WorldlineSaferpay(PaymentProvider[SaferpayPayment]):
[docs]
__mapper_args__ = {'polymorphic_identity': 'worldline_saferpay'}
[docs]
fee_policy: FeePolicy = WorldlineFeePolicy
#: Whether or not this is a Sandbox account
[docs]
sandbox: dict_property[bool] = meta_property(default=False)
#: The public Worldline customer name
[docs]
customer_name: dict_property[str | None] = meta_property()
#: The Worldline customer id
[docs]
customer_id: dict_property[str | None] = meta_property()
#: The Worldline terminal id
[docs]
terminal_id: dict_property[str | None] = meta_property()
#: The username used for API calls
[docs]
api_username: dict_property[str | None] = meta_property()
#: The password used for API calls
[docs]
api_password: dict_property[str | None] = meta_property()
#: Should the fee be charged to the customer or not?
[docs]
charge_fee_to_customer: dict_property[bool | None] = meta_property()
[docs]
def adjust_price(self, price: Price | None) -> Price | None:
if price and self.charge_fee_to_customer:
new_price = self.fee_policy.compensate(price.amount)
new_fee = self.fee_policy.from_amount(new_price)
return Price(
new_price,
price.currency,
new_fee,
price.credit_card_payment
)
return price
@property
[docs]
def livemode(self) -> bool:
return not self.sandbox
@property
[docs]
def payment_class(self) -> type[SaferpayPayment]:
return SaferpayPayment
@property
[docs]
def title(self) -> str:
return 'Worldline Saferpay'
@property
[docs]
def url(self) -> str:
return f'https://{"test" if self.sandbox else "www"}.saferpay.com/BO/'
@property
[docs]
def public_identity(self) -> str:
return self.customer_name or ''
@property
[docs]
def identity(self) -> str | None:
return self.customer_id
@cached_property
[docs]
def client(self) -> SaferpayClient:
return SaferpayClient(
customer_id=self.customer_id,
terminal_id=self.terminal_id,
api_username=self.api_username,
api_password=self.api_password,
sandbox=self.sandbox,
)
@property
[docs]
def connected(self) -> bool:
# we're using an idempotent API endpoint here
try:
res = self.client.session.get(
f'{self.client.base_url}/rest/customers/'
f'{self.customer_id}/license',
headers={
'Saferpay-ApiVersion': SAFERPAY_SPEC_VERSION,
'Saferpay-RequestId': str(uuid4()),
},
json={},
timeout=(5, 10),
)
self.client.raise_for_status(res)
except Exception:
log.exception('Saferpay connection failed')
return False
else:
return True
[docs]
def charge(
self,
amount: Decimal,
currency: str,
token: str # transaction_id
) -> SaferpayPayment:
# ensure the transaction can be settled
tx = self.client.assert_transaction(token)
if tx.type != 'PAYMENT' or tx.status not in ('AUTHORIZED', 'CAPTURED'):
raise SaferpayPaymentError('Invalid transaction type')
if (
tx.amount.currency_code != currency
or tx.amount.value != round(amount * 100)
):
raise SaferpayPaymentError('Invalid payment amount')
if not tx.order_id:
raise SaferpayPaymentError('order_id is missing')
session = object_session(self)
payment = self.payment(
id=uuid5(WORLDLINE_NAMESPACE, tx.order_id),
amount=amount,
currency=currency,
remote_id=tx.id,
order_id=tx.order_id,
six_transaction_reference=tx.six_transaction_reference,
state='paid' if tx.status == 'CAPTURED' else 'open'
)
if tx.status == 'AUTHORIZED':
# we need to capture the charge
assert self.customer_id is not None
SaferpayCaptureManager.capture_charge(self.client, tx)
# we do *not* want to lose this information, so even though the
# caller should make sure the payment is stored, we make sure
session.add(payment)
return payment
[docs]
def get_token(self, request: CoreRequest) -> str | None:
""" Extracts this provider's specific token from the request. """
if not (signed_nonce := request.GET.get('saferpay_nonce')):
return None
app = request.app
salt = app.sign(request.browser_session._token, 'saferpay_salt')
nonce = app.unsign(signed_nonce, salt)
if not nonce or not (signed_token := app.cache.get(nonce)):
return None
# make sure the nonce can't be reused
app.cache.delete(nonce)
return app.unsign(signed_token, salt)
[docs]
def sync(self) -> None:
# FIXME: This may lead to complaints, since it might look like
# we're polling payments
session = object_session(self)
query = session.query(self.payment_class)
query = query.filter(self.payment_class.state == 'open')
for payment in query:
payment.sync()
@property
[docs]
def payment_via_get(self) -> bool:
return True