Source code for org.views.form_submission

""" Renders and handles defined forms, turning them into submissions. """
from __future__ import annotations

import morepath

from onegov.core.security import Public, Private
from onegov.form.collection import SurveyCollection
from onegov.form.models.submission import SurveySubmission
from onegov.org.cli import close_ticket
from onegov.ticket import TicketCollection
from onegov.form import (
    FormCollection,
    PendingFormSubmission,
    CompleteFormSubmission
)
from onegov.org import _, OrgApp
from onegov.org.layout import FormSubmissionLayout, SurveySubmissionLayout
from onegov.org.mail import send_ticket_mail
from onegov.org.utils import user_group_emails_for_new_ticket
from onegov.org.models import TicketMessage, SubmissionMessage
from onegov.pay import PaymentError, Price
from purl import URL
from webob.exc import HTTPNotFound


from typing import Literal, TYPE_CHECKING
if TYPE_CHECKING:
    from collections.abc import Iterable
    from onegov.core.types import RenderData
    from onegov.form import Form, FormSubmission
    from onegov.org.request import OrgRequest
    from webob import Response


[docs] def copy_query( request: OrgRequest, url: str, fields: Iterable[str] ) -> str: url_obj = URL(url) for field in fields: # FIXME: Technically this is incorrect when a field can have # multiple values, should we switch to getall? Do we # have to use a different method on URL to set them # in that case? value = request.GET.get(field, None) if value is None: continue url_obj = url_obj.query_param(field, value) return url_obj.as_string()
[docs] def get_price( request: OrgRequest, form: Form, submission: FormSubmission ) -> Price | None: total = form.total() if 'price' in submission.meta: if total is not None: total += Price(**submission.meta['price']) else: total = Price(**submission.meta['price']) return request.app.adjust_price(total)
@OrgApp.html(model=PendingFormSubmission, template='submission.pt', permission=Public, request_method='GET') @OrgApp.html(model=PendingFormSubmission, template='submission.pt', permission=Public, request_method='POST') @OrgApp.html(model=CompleteFormSubmission, template='submission.pt', permission=Private, request_method='GET') @OrgApp.html(model=CompleteFormSubmission, template='submission.pt', permission=Private, request_method='POST')
[docs] def handle_pending_submission( self: PendingFormSubmission | CompleteFormSubmission, request: OrgRequest, layout: FormSubmissionLayout | None = None ) -> RenderData | Response: """ Renders a pending submission, takes it's input and allows the user to turn the submission into a complete submission, once all data is valid. This view has two states, a completable state where the form values are displayed without a form and an edit state, where a form is rendered to change the values. Takes the following query parameters for customization:: * ``edit`` render the view in the edit state * ``return-to`` the view redirects to this url once complete * ``title`` a custom title (required if external submission) * ``quiet`` no success messages are rendered if present """ collection = FormCollection(request.session) form = request.get_form(self.form_class, data=self.data) form.action = request.link(self) form.model = self if 'edit' not in request.GET: form.validate() if not request.POST: form.ignore_csrf_error() elif not form.errors: collection.submissions.update(self, form) completable = not form.errors and 'edit' not in request.GET price = get_price(request, form, self) # check minimum price total if set current_total_amount = price and price.amount or 0.0 minimum_total_amount = self.minimum_price_total or 0.0 if current_total_amount < minimum_total_amount: if price is not None: currency = price.currency else: # We just pick the first currency from any pricing rule we can find # if we can't find any, then we fall back to 'CHF'. Although that # should be an invalid form definition. currency = 'CHF' for field in form._fields.values(): if not hasattr(field, 'pricing'): continue rules = field.pricing.rules if not rules: continue currency = next(iter(rules.values())).currency break completable = False request.alert( _( 'The total amount for the currently entered data ' 'is ${total} but has to be at least ${minimum}. ' 'Please adjust your inputs.', mapping={ 'total': Price(current_total_amount, currency), 'minimum': Price(minimum_total_amount, currency) } ) ) if completable and 'return-to' in request.GET: if 'quiet' not in request.GET: request.success(_('Your changes were saved')) # the default url should actually never be called return request.redirect(request.url) if 'title' in request.GET: title = request.GET['title'] else: assert self.form is not None title = self.form.title # retain some parameters in links (the rest throw away) form.action = copy_query( request, form.action, ('return-to', 'title', 'quiet')) edit_url_obj = URL(copy_query( request, request.link(self), ('title', ))) # the edit url always points to the editable state edit_url_obj = edit_url_obj.query_param('edit', '') edit_url = edit_url_obj.as_string() email = self.email or self.get_email_field_data(form) if price: assert email is not None assert request.locale is not None checkout_button = request.app.checkout_button( button_label=request.translate(_('Pay Online and Complete')), title=title, price=price, email=email, locale=request.locale ) else: checkout_button = None return { 'layout': layout or FormSubmissionLayout(self, request, title), 'title': title, 'form': form, 'completable': completable, 'edit_link': edit_url, 'complete_link': request.link(self, 'complete'), 'model': self, 'price': price, 'checkout_button': checkout_button }
@OrgApp.view(model=PendingFormSubmission, name='complete', permission=Public, request_method='POST') @OrgApp.view(model=CompleteFormSubmission, name='complete', permission=Private, request_method='POST')
[docs] def handle_complete_submission( self: PendingFormSubmission | CompleteFormSubmission, request: OrgRequest ) -> Response: form = request.get_form(self.form_class) form.process(data=self.data) form.model = self # we're not really using a csrf protected form here (the complete form # button is basically just there so we can use a POST instead of a GET) form.validate() form.ignore_csrf_error() if form.errors: return morepath.redirect(request.link(self)) else: if self.state == 'complete': self.data.changed() # type:ignore[attr-defined] # trigger updates request.success(_('Your changes were saved')) assert self.name is not None return morepath.redirect(request.link( FormCollection(request.session).scoped_submissions( self.name, ensure_existance=False) )) else: provider = request.app.default_payment_provider token = request.params.get('payment_token') if not isinstance(token, str): token = None price = get_price(request, form, self) payment = self.process_payment(price, provider, token) # FIXME: Custom error message for PaymentError? if not payment or isinstance(payment, PaymentError): request.alert(_('Your payment could not be processed')) return morepath.redirect(request.link(self)) elif payment is not True: self.payment = payment window = self.registration_window if window and not window.accepts_submissions(self.spots): request.alert(_('Registrations are no longer possible')) return morepath.redirect(request.link(self)) show_submission = request.params.get('send_by_email') == 'yes' self.meta['show_submission'] = show_submission self.meta.changed() # type:ignore[attr-defined] collection = FormCollection(request.session) submission_id = self.id # Expunges the submission from the session collection.submissions.complete_submission(self) # make sure accessing the submission doesn't flush it, because # it uses sqlalchemy utils observe, which doesn't like premature # flushing at all with collection.session.no_autoflush: ticket = TicketCollection(request.session).open_ticket( handler_code=self.meta.get('handler_code', 'FRM'), handler_id=self.id.hex ) TicketMessage.create(ticket, request, 'opened') assert self.email is not None send_ticket_mail( request=request, template='mail_ticket_opened.pt', subject=_('Your request has been registered'), ticket=ticket, receivers=(self.email, ), content={ 'model': ticket, 'form': form, 'show_submission': self.meta['show_submission'] } ) directory_user_group_recipients = user_group_emails_for_new_ticket( request, ticket ) if request.email_for_new_tickets: send_ticket_mail( request=request, template='mail_ticket_opened_info.pt', subject=_('New ticket'), ticket=ticket, receivers=( request.email_for_new_tickets, *directory_user_group_recipients, ), content={'model': ticket}, ) request.app.send_websocket( channel=request.app.websockets_private_channel, message={ 'event': 'browser-notification', 'title': request.translate(_('New ticket')), 'created': ticket.created.isoformat() } ) if request.auto_accept(ticket): try: # FIXME: Was the auto_accept_user being None the only # way this could raise ValueError previously? # If so refactor this to a simple if/else if request.auto_accept_user is None: raise ValueError() ticket.accept_ticket(request.auto_accept_user) # We need to reload the object with the correct polymorphic # type submission = collection.submissions.by_id( submission_id, state='complete', current_only=True ) assert isinstance(submission, CompleteFormSubmission) handle_submission_action( submission, request, 'confirmed', True, raises=True ) except ValueError: if request.is_manager: request.warning(_('Your request could not be ' 'accepted automatically!')) else: close_ticket( ticket, request.auto_accept_user, request ) request.success(_('Thank you for your submission!')) return morepath.redirect(request.link(ticket, 'status'))
@OrgApp.view(model=CompleteFormSubmission, name='ticket', permission=Private)
[docs] def view_submission_ticket( self: CompleteFormSubmission, request: OrgRequest ) -> Response: ticket = TicketCollection(request.session).by_handler_id(self.id.hex) if not ticket: raise HTTPNotFound() return request.redirect(request.link(ticket))
@OrgApp.view(model=CompleteFormSubmission, name='confirm-registration', permission=Private, request_method='POST')
[docs] def handle_accept_registration( self: CompleteFormSubmission, request: OrgRequest ) -> Response | None: return handle_submission_action(self, request, 'confirmed')
@OrgApp.view(model=CompleteFormSubmission, name='deny-registration', permission=Private, request_method='POST')
[docs] def handle_deny_registration( self: CompleteFormSubmission, request: OrgRequest ) -> Response | None: return handle_submission_action(self, request, 'denied')
@OrgApp.view(model=CompleteFormSubmission, name='cancel-registration', permission=Private, request_method='POST')
[docs] def handle_cancel_registration( self: CompleteFormSubmission, request: OrgRequest ) -> Response | None: return handle_submission_action(self, request, 'cancelled')
[docs] def handle_submission_action( self: CompleteFormSubmission, request: OrgRequest, action: Literal['confirmed', 'denied', 'cancelled'], ignore_csrf: bool = False, raises: bool = False, no_messages: bool = False, force_email: bool = False ) -> Response | None: if not ignore_csrf: request.assert_valid_csrf_token() if action == 'confirmed': subject = _('Your registration has been confirmed') success = _('The registration has been confirmed') failure = _('The registration could not be confirmed because the ' 'maximum number of participants has been reached') def execute() -> bool: if self.registration_window and self.claimed is None: return self.claim() return False elif action == 'denied': subject = _('Your registration has been denied') success = _('The registration has been denied') failure = _('The registration could not be denied') def execute() -> bool: if self.registration_window and self.claimed is None: self.disclaim() return True return False elif action == 'cancelled': subject = _('Your registration has been cancelled') success = _('The registration has been cancelled') failure = _('The registration could not be cancelled') def execute() -> bool: if self.registration_window and self.claimed: self.disclaim() return True return False else: raise AssertionError('unreachable') if execute(): assert self.email is not None ticket = TicketCollection(request.session).by_handler_id(self.id.hex) assert ticket is not None send_ticket_mail( request=request, template='mail_registration_action.pt', receivers=(self.email, ), ticket=ticket, content={ 'model': self, 'action': action, 'ticket': ticket, 'form': self.form_obj, 'show_submission': self.meta.get('show_submission') }, subject=subject, force=force_email ) SubmissionMessage.create(ticket, request, action) if not no_messages: request.success(success) else: if raises: raise ValueError(request.translate(failure)) if not no_messages: request.alert(failure) return None return request.redirect(request.link(self))
@OrgApp.html(model=SurveySubmission, template='survey_submission.pt', permission=Public, request_method='GET') @OrgApp.html(model=SurveySubmission, template='survey_submission.pt', permission=Public, request_method='POST')
[docs] def handle_survey_submission( self: SurveySubmission, request: OrgRequest, layout: SurveySubmissionLayout | None = None ) -> RenderData | Response: """ Renders a pending submission, takes it's input and allows the user to turn the submission into a complete submission, once all data is valid. """ collection = SurveyCollection(request.session) form = request.get_form(self.form_class, data=self.data) form.action = request.link(self) form.model = self if 'edit' not in request.GET: form.validate() if not request.POST: form.ignore_csrf_error() elif not form.errors: collection.submissions.update(self, form) completable = not form.errors and 'edit' not in request.GET if completable and 'return-to' in request.GET: if 'quiet' not in request.GET: request.success(_('Your changes were saved')) # the default url should actually never be called return request.redirect(request.url) if 'title' in request.GET: title = request.GET['title'] else: assert self.survey is not None title = self.survey.title # retain some parameters in links (the rest throw away) form.action = copy_query( request, form.action, ('return-to', 'title', 'quiet')) edit_url_obj = URL(copy_query( request, request.link(self), ('title', ))) # the edit url always points to the editable state edit_url_obj = edit_url_obj.query_param('edit', '') edit_url = edit_url_obj.as_string() layout = layout or SurveySubmissionLayout(self, request, title) layout.editbar_links = [] return { 'layout': layout or SurveySubmissionLayout(self, request, title), 'title': title, 'form': form, 'completable': completable, 'edit_link': edit_url, 'complete_link': request.link(self, 'complete'), 'model': self, 'price': None, }