from __future__ import annotations
from collections import OrderedDict
from onegov.core.security import Private
from onegov.core.utils import append_query_param
from onegov.form import merge_forms
from onegov.org import OrgApp, _
from onegov.org.forms import (
DateRangeForm, ExportForm, PaymentSearchForm, TicketInvoiceSearchForm)
from onegov.org.layout import (
PaymentCollectionLayout, TicketInvoiceCollectionLayout)
from onegov.org.mail import send_ticket_mail
from onegov.org.models import PaymentMessage
from onegov.core.elements import Link
from sedate import align_range_to_day, standardize_date, as_datetime
from onegov.org.pdf.ticket import TicketsPdf
from onegov.pay import Payment
from onegov.pay import PaymentCollection
from onegov.pay import PaymentProviderCollection
from onegov.pay.errors import DatatransApiError, SaferpayApiError
from onegov.pay.models.payment import ManualPayment
from onegov.pay.models.payment_providers.datatrans import DatatransPayment
from onegov.pay.models.payment_providers.stripe import StripePayment
from onegov.pay.models.payment_providers.worldline_saferpay import (
SaferpayPayment)
from onegov.ticket import TicketInvoice, TicketInvoiceItem
from onegov.ticket import TicketInvoiceCollection
from sqlalchemy.orm import joinedload
from webob.response import Response
from webob import exc, Response as WebobResponse
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Callable, Sequence
from datetime import datetime
from onegov.core.types import JSON_ro, RenderData
from onegov.org.request import OrgRequest
from onegov.pay.types import AnyPayableBase
from sqlalchemy.orm import Session
from typing import type_check_only
from webob import Response
@type_check_only
[docs]
EMAIL_SUBJECTS = {
'marked-as-paid': _('Your payment has been received'),
'marked-as-unpaid': _('Your payment has been withdrawn'),
'refunded': _('Your payment has been refunded')
}
[docs]
def send_ticket_notifications(
payment: Payment,
request: OrgRequest,
change: str
) -> None:
ticket = payment.ticket
if not ticket:
return
# create a notification in the chat
PaymentMessage.create(payment, ticket, request, change)
if change == 'captured':
return
# send an e-mail
email = ticket.snapshot.get('email') or ticket.handler.email
assert email is not None
send_ticket_mail(
request=request,
template='mail_payment_change.pt',
subject=EMAIL_SUBJECTS[change],
receivers=(email, ),
ticket=ticket,
content={
'model': ticket,
'payment': payment,
'change': change
}
)
[docs]
def handle_pdf_response(
self: TicketInvoiceCollection,
request: OrgRequest
) -> WebobResponse:
# Export a pdf of all invoiced, without pagination limit
tickets = [
ticket
for invoice in self.by_invoiced(True).subset()
if (ticket := invoice.ticket) is not None
]
if not tickets:
request.warning(_('No tickets found for PDF generation'))
return request.redirect(request.class_link(PaymentCollection))
filename = 'Payments.pdf'
multi_pdf = TicketsPdf.from_tickets(request, tickets)
return Response(
multi_pdf.read(),
content_type='application/pdf',
content_disposition=f'inline; filename={filename}'
)
@OrgApp.form(
model=PaymentCollection,
template='payments.pt',
form=PaymentSearchForm,
permission=Private
)
[docs]
def view_payments(
self: PaymentCollection,
request: OrgRequest,
form: PaymentSearchForm,
layout: PaymentCollectionLayout | None = None
) -> RenderData | WebobResponse:
layout = layout or PaymentCollectionLayout(self, request)
if form.submitted(request):
form.update_model(self)
return request.redirect(request.link(self))
request.include('payments')
if not form.errors:
form.apply_model(self)
providers = {
provider.id: provider
for provider in PaymentProviderCollection(request.session).query()
}
# Process reservation dates into a display-ready format
reservation_dates = self.reservation_dates_by_batch()
reservation_dates_formatted = {
payment_id: (
f"{layout.format_date(start, 'date')} - "
f"{layout.format_date(end, 'date')}"
) if start != end else layout.format_date(start, 'date')
for payment_id, (start, end) in reservation_dates.items()
}
return {
'title': _('Receivables'),
'form': form,
'layout': layout,
'payments': self.batch,
'tickets': self.tickets_by_batch(),
'reservation_dates_formatted': reservation_dates_formatted,
'providers': providers,
'pdf_export_link': append_query_param(request.class_link(
TicketInvoiceCollection,
{
'ticket_group': self.ticket_group,
'ticket_start': self.ticket_start,
'ticket_end': self.ticket_end,
'reservation_start': self.reservation_start,
'reservation_end': self.reservation_end,
},
), 'format', 'pdf')
}
@OrgApp.json(
model=PaymentCollection,
name='batch-set-payment-state',
request_method='POST',
permission=Private
)
[docs]
def handle_batch_set_payment_state(
self: PaymentCollection,
request: OrgRequest
) -> JSON_ro:
request.assert_valid_csrf_token()
payment_ids = request.json_body.get('payment_ids', [])
state = request.json_body.get('state')
if state not in ('invoiced', 'paid', 'open'):
raise exc.HTTPBadRequest()
invoiced: bool | None
match state:
case 'invoiced':
invoiced = True
case 'open':
invoiced = False
case 'paid':
invoiced = None
payments_query = self.session.query(Payment).distinct().filter(
Payment.id.in_(payment_ids)
).options(
joinedload(Payment.linked_invoice_items)
.joinedload(TicketInvoiceItem.invoice)
)
# State sequence is assumed to be: 'open' -> 'invoiced' - 'paid'
updated_count = 0
for payment in payments_query:
if not isinstance(payment, ManualPayment):
continue
if payment.state != state:
if payment.state == 'open' and state == 'invoiced':
payment.state = state
if payment.state == 'invoiced' and state == 'paid':
payment.state = state
# backwards
if payment.state == 'invoiced' and state == 'open':
payment.state = state
if payment.state == 'paid' and state == 'invoiced':
payment.state = state
# update the paid/invoiced state of any linked invoices
for item in payment.linked_invoice_items:
paid = item.payments[-1].state == 'paid'
if item.paid is not paid:
item.paid = paid
if invoiced is not None and item.invoice.invoiced != invoiced:
item.invoice.invoiced = invoiced
updated_count += 1
if updated_count > 0:
messages = {
'invoiced': _('${count} payments marked as invoiced.',
mapping={'count': updated_count}),
'paid': _('${count} payments marked as paid.',
mapping={'count': updated_count}),
'open': _('${count} payments marked as unpaid.',
mapping={'count': updated_count}),
}
request.success(messages[state])
return {'status': 'success', 'message': 'OK'}
@OrgApp.form(
model=TicketInvoiceCollection,
template='invoices.pt',
form=TicketInvoiceSearchForm,
permission=Private
)
[docs]
def view_invoices(
self: TicketInvoiceCollection,
request: OrgRequest,
form: TicketInvoiceSearchForm,
layout: TicketInvoiceCollectionLayout | None = None
) -> RenderData | WebobResponse:
layout = layout or TicketInvoiceCollectionLayout(self, request)
if request.params.get('format') == 'pdf':
return handle_pdf_response(self, request)
if form.submitted(request):
form.update_model(self)
return request.redirect(request.link(self))
request.include('invoicing')
if not form.errors:
form.apply_model(self)
# Process reservation dates into a display-ready format
reservation_dates = self.reservation_dates_by_batch()
reservation_dates_formatted = {
payment_id: (
f"{layout.format_date(start, 'date')} - "
f"{layout.format_date(end, 'date')}"
) if start != end else layout.format_date(start, 'date')
for payment_id, (start, end) in reservation_dates.items()
}
return {
'title': _('Invoices'),
'form': form,
'layout': layout,
'invoices': self.batch,
'reservation_dates_formatted': reservation_dates_formatted,
'pdf_export_link': append_query_param(request.url, 'format', 'pdf')
}
@OrgApp.json(
model=TicketInvoiceCollection,
name='batch-set',
request_method='POST',
permission=Private
)
[docs]
def handle_batch_set(
self: TicketInvoiceCollection,
request: OrgRequest
) -> JSON_ro:
request.assert_valid_csrf_token()
invoice_ids = request.json_body.get('invoice_ids', [])
state = request.json_body.get('state')
if state not in ('invoiced', 'uninvoiced', 'paid', 'open'):
raise exc.HTTPBadRequest()
invoiced: bool | None
payment_state: str | None
match state:
case 'invoiced':
invoiced = True
payment_state = 'invoiced'
message = _('${count} invoices marked as invoiced.')
case 'uninvoiced':
invoiced = False
payment_state = 'open'
message = _('${count} invoices marked as uninvoiced.')
case 'open':
invoiced = None
payment_state = None
message = _('${count} invoices marked as unpaid.')
case 'paid':
invoiced = None
payment_state = 'paid'
message = _('${count} invoices marked as paid.')
invoices_query = self.session.query(TicketInvoice).filter(
TicketInvoice.id.in_(invoice_ids)
).options(
joinedload(TicketInvoice.items)
.selectinload(TicketInvoiceItem.payments)
)
updated_count = 0
for invoice in invoices_query:
updated = False
if invoiced is not None:
if invoice.invoiced is not invoiced:
invoice.invoiced = invoiced
updated = True
elif invoice.total_amount <= 0.0:
# there should be no payments to update
continue
# update the state of any linked payments, as long as it is
# a valid state transition for that kind of payment
for item in invoice.items:
for payment in item.payments:
if payment.source != 'manual':
continue
if payment_state == 'invoiced':
if payment.state == 'open':
payment.state = 'invoiced'
updated = True
elif payment_state == 'open':
if payment.state == 'invoiced':
payment.state = 'open'
updated = True
elif payment_state == 'paid':
if payment.state == 'invoiced':
item.paid = True
payment.state = 'paid'
updated = True
elif payment.state == 'paid' and not item.paid:
item.paid = True
else:
assert payment_state is None
assert invoiced is None
# NOTE: We're more generous in this direction and allow
# two state transitions in one action, since it's
# possible to mark the payment as paid without
# marking the invoice as invoiced.
target_state = 'invoiced' if invoice.invoiced else 'open'
if payment.state == 'paid':
payment.state = target_state
item.paid = False
updated = True
elif payment.state == target_state and item.paid:
item.paid = False
if updated:
updated_count += 1
if updated_count > 0:
request.success(message % {'count': updated_count})
return {'status': 'success', 'message': 'OK'}
@OrgApp.form(
model=PaymentCollection,
name='export',
template='form.pt',
permission=Private,
form=merge_forms(DateRangeForm, ExportForm)
)
[docs]
def export_payments(
self: PaymentCollection,
request: OrgRequest,
form: PaymentExportForm,
layout: PaymentCollectionLayout | None = None
) -> RenderData | Response:
layout = layout or PaymentCollectionLayout(self, request)
layout.breadcrumbs.append(Link(_('Export'), '#'))
layout.editbar_links = None # type:ignore[assignment]
if form.submitted(request):
start, end = align_range_to_day(
standardize_date(as_datetime(form.data['start']), layout.timezone),
standardize_date(as_datetime(form.data['end']), layout.timezone),
layout.timezone)
return form.as_export_response(
run_export(
session=request.session,
start=start,
end=end,
nested=form.format == 'json',
formatter=layout.export_formatter(form.format)
)
)
return {
'title': _('Export'),
'layout': layout,
'form': form
}
[docs]
def run_export(
session: Session,
start: datetime,
end: datetime,
nested: bool,
formatter: Callable[[Any], object]
) -> Sequence[dict[str, Any]]:
collection = PaymentCollection(session, start=start, end=end)
payments = tuple(collection.subset())
if not payments:
return ()
links = collection.payment_links_by_batch(payments)
assert links is not None
def transform(
payment: Payment,
links: list[AnyPayableBase]
) -> dict[str, Any]:
r: dict[str, Any] = OrderedDict()
r['source'] = formatter(payment.source)
r['source_id'] = formatter(payment.remote_id)
r['source_references'] = (
tuple(formatter(r) for r in payment.remote_references)
if nested
else formatter('\n'.join(payment.remote_references))
)
r['state'] = formatter(payment.state)
r['currency'] = formatter(payment.currency)
r['gross'] = formatter(payment.amount)
r['net'] = formatter(round(payment.net_amount, 2))
r['fee'] = formatter(round(payment.fee, 2))
r['payout_id'] = formatter(payment.meta.get('payout_id'))
r['payout_date'] = formatter(payment.meta.get('payout_date'))
r['references'] = (
tuple(formatter(l.payable_reference) for l in links)
if nested
else formatter('\n'.join(l.payable_reference for l in links))
)
return r
return tuple(transform(p, links[p.id]) for p in payments)
@OrgApp.view(
model=Payment,
name='mark-as-paid',
request_method='POST',
permission=Private
)
[docs]
def mark_as_paid(self: Payment, request: OrgRequest) -> None:
request.assert_valid_csrf_token()
send_ticket_notifications(self, request, 'marked-as-paid')
request.success(_('The ticket was marked as paid'))
assert self.source == 'manual'
self.state = 'paid'
self.sync_invoice_items()
@OrgApp.view(
model=Payment,
name='mark-as-unpaid',
request_method='POST',
permission=Private
)
[docs]
def mark_as_unpaid(self: Payment, request: OrgRequest) -> None:
request.assert_valid_csrf_token()
send_ticket_notifications(self, request, 'marked-as-unpaid')
request.success(_('The ticket was marked as unpaid'))
assert self.source == 'manual'
self.state = 'open'
self.sync_invoice_items()
@OrgApp.view(
model=StripePayment,
name='capture',
request_method='POST',
permission=Private
)
[docs]
def capture_stripe(self: StripePayment, request: OrgRequest) -> None:
request.assert_valid_csrf_token()
send_ticket_notifications(self, request, 'captured')
request.success(_('The payment was captured'))
assert self.source == 'stripe_connect'
self.charge.capture()
self.sync_invoice_items()
@OrgApp.view(
model=StripePayment,
name='refund',
request_method='POST',
permission=Private
)
[docs]
def refund_stripe(self: StripePayment, request: OrgRequest) -> None:
request.assert_valid_csrf_token()
send_ticket_notifications(self, request, 'refunded')
request.success(_('The payment was refunded'))
assert self.source == 'stripe_connect'
self.refund()
self.sync_invoice_items()
@OrgApp.view(
model=DatatransPayment,
name='capture',
request_method='POST',
permission=Private
)
[docs]
def capture_datatrans(self: DatatransPayment, request: OrgRequest) -> None:
request.assert_valid_csrf_token()
tx = self.transaction
if tx.status == 'settled':
request.info(
_('The payment is already captured but is still processing')
)
return
send_ticket_notifications(self, request, 'captured')
request.success(_('The payment was captured'))
assert self.source == 'datatrans'
if tx.status == 'transmitted':
self.sync(tx)
else:
self.provider.client.settle(tx)
self.sync_invoice_items()
@OrgApp.view(
model=DatatransPayment,
name='refund',
request_method='POST',
permission=Private
)
[docs]
def refund_datatrans(self: DatatransPayment, request: OrgRequest) -> None:
request.assert_valid_csrf_token()
assert self.source == 'datatrans'
try:
self.refund()
except DatatransApiError:
request.alert(_('Could not refund the payment'))
else:
send_ticket_notifications(self, request, 'refunded')
request.success(_('The payment was refunded'))
@OrgApp.view(
model=SaferpayPayment,
name='capture',
request_method='POST',
permission=Private
)
[docs]
def capture_saferpay(self: SaferpayPayment, request: OrgRequest) -> None:
request.assert_valid_csrf_token()
tx = self.transaction
if tx.status == 'PENDING':
request.info(
_('The payment is already captured but is still processing')
)
return
if tx.status == 'CANCELED':
request.alert(_('Could not capture the payment'))
else:
request.success(_('The payment was captured'))
send_ticket_notifications(self, request, 'captured')
assert self.source == 'worldline_saferpay'
if tx.status != 'AUTHORIZED':
self.sync(tx)
else:
self.provider.client.capture(tx)
self.sync_invoice_items()
@OrgApp.view(
model=SaferpayPayment,
name='refund',
request_method='POST',
permission=Private
)
[docs]
def refund_saferpay(self: SaferpayPayment, request: OrgRequest) -> None:
request.assert_valid_csrf_token()
assert self.source == 'worldline_saferpay'
try:
new_refund = self.refund()
except SaferpayApiError:
request.alert(_('Could not refund the payment'))
else:
self.sync_invoice_items()
if new_refund:
send_ticket_notifications(self, request, 'refunded')
request.success(_('The payment was refunded'))