Source code for feriennet.views.booking

from __future__ import annotations

import morepath
import urllib.parse

from collections import defaultdict, OrderedDict
from datetime import date
from decimal import Decimal
from markupsafe import Markup
from onegov.activity import Activity, AttendeeCollection
from onegov.activity import Attendee
from onegov.activity import Booking
from onegov.activity import BookingCollection
from onegov.activity import Occasion
from onegov.core.custom import json
from onegov.core.elements import Link, Confirm, Intercooler
from onegov.core.orm import as_selectable_from_path
from onegov.core.security import Public, Personal, Secret
from onegov.core.templates import render_macro, render_template
from onegov.core.utils import normalize_for_url, module_path
from onegov.feriennet import FeriennetApp, _
from onegov.feriennet.layout import (
    BookingCollectionLayout, DefaultLayout, GroupInviteLayout)
from onegov.feriennet.models import AttendeeCalendar, GroupInvite
from onegov.feriennet.utils import decode_name
from onegov.feriennet.views.shared import users_for_select_element
from onegov.org.layout import DefaultMailLayout
from onegov.user import User
from purl import URL
from sortedcontainers import SortedList
from sqlalchemy import select, and_, not_
from sqlalchemy.orm import contains_eager
from uuid import UUID


from typing import Literal, TYPE_CHECKING
if TYPE_CHECKING:
    from collections.abc import Collection, Iterable
    from onegov.activity.models import Period, PeriodMeta
    from onegov.activity.models.booking import BookingState
    from onegov.core.elements import Trait
    from onegov.core.types import RenderData
    from onegov.feriennet.request import FeriennetRequest
    from sqlalchemy.orm import Query, Session
    from typing import NamedTuple
    from webob import Response

[docs] class RelatedAttendeeRow(NamedTuple):
[docs] occasion_id: UUID
[docs] attendee_count: int
[docs] parent: str
[docs] parent_username: str
[docs] children: int
[docs] phone: str | None
[docs] place: str | None
[docs] email: str
[docs] booking_state: BookingState
[docs] DELETABLE_STATES = ('open', 'cancelled', 'denied', 'blocked')
[docs] def all_bookings(collection: BookingCollection) -> list[Booking]: """ Loads all bookings together with the linked occasions, attendees and activities. This is somewhat of a heavy query, but it beats having to load all these things separately. """ query = collection.query() query = query.join(Booking.attendee) query = query.join(Booking.occasion) query = query.join(Occasion.activity) query = query.options(contains_eager(Booking.attendee)) query = query.options( contains_eager(Booking.occasion). contains_eager(Occasion.activity)) query = query.order_by(Booking.attendee_id) query = query.order_by(Activity.name) query = query.order_by(Occasion.order) return query.all()
[docs] def group_bookings( period: Period | PeriodMeta, bookings: Iterable[Booking] ) -> dict[Attendee, dict[BookingState, SortedList[Booking]]]: """ Takes a (small) list of bookings and groups them by attendee and state and sorting them by date. """ state_order = ( 'open', 'accepted', 'denied', 'blocked', 'cancelled' ) if period.wishlist_phase: def state(booking: Booking) -> BookingState: return 'open' else: def state(booking: Booking) -> BookingState: return booking.state grouped: dict[Attendee, dict[BookingState, SortedList[Booking]]] = {} for b in sorted(bookings, key=lambda b: state_order.index(state(b))): if b.attendee not in grouped: # I tried using a SortedDict here, but chameleon has problems # dealing with it, so an ordered dict is used instead grouped[b.attendee] = OrderedDict() if state(b) not in grouped[b.attendee]: grouped[b.attendee][state(b)] = SortedList( # type:ignore key=lambda b: b.order) grouped[b.attendee][state(b)].add(b) return grouped
[docs] def total_by_bookings( period: Period | PeriodMeta | None, bookings: Collection[Booking] ) -> Decimal: if bookings: total = sum( b.cost for b in bookings if b.state == 'accepted' and b.cost ) total = total or Decimal('0.00') else: return Decimal('0.00') if period and period.all_inclusive and period.booking_cost: total += period.booking_cost return total
[docs] def attendees_by_username( request: FeriennetRequest, username: str ) -> list[Attendee]: """ Loads the given attendees linked to the given username, sorted by their name. """ a = AttendeeCollection(request.session).by_username(username).all() a.sort(key=lambda a: normalize_for_url(a.name)) return a
[docs] def get_booking_title(layout: DefaultLayout, booking: Booking) -> str: return '{} - {}'.format( booking.occasion.activity.title, layout.format_datetime_range( booking.occasion.dates[0].localized_start, booking.occasion.dates[0].localized_end))
[docs] def actions_by_booking( layout: DefaultLayout, period: Period | PeriodMeta | None, booking: Booking ) -> list[Link]: actions: list[Link] = [] if not period: return actions if period.wishlist_phase or booking.state in ('accepted', 'open'): if period.wishlist_phase or period.booking_phase: if not booking.group_code: actions.append( Link( text=_('Invite a companion'), url=layout.request.link(booking, 'invite'), attrs={ 'class': 'invite-link', }, ) ) else: # XXX this is not too efficient as there might be many queries states: Literal['*'] | tuple[BookingState, ...] if period.wishlist_phase: states = '*' else: states = ('open', 'accepted') count = booking.group_code_count(states) - 1 invite = GroupInvite( layout.request.session, booking.group_code, booking.username) if count < 1: # the group code is not shown if the attendee is alone actions.append( Link( text=_('Invite a companion'), url=layout.request.link(invite), attrs={ 'class': 'invite-link', }, ) ) elif count == 1: actions.append( Link( text=_('With 1 companion in a group'), url=layout.request.link(invite), attrs={ 'class': 'invite-link', 'data-group': booking.group_code, }, ) ) elif count > 1: actions.append( Link( text=_('With ${n} companions in a group', mapping={ 'n': count }), url=layout.request.link(invite), attrs={ 'class': 'invite-link', 'data-group': booking.group_code, }, ) ) if period.wishlist_phase or ( booking.state in DELETABLE_STATES and not period.finalized): label = ( period.wishlist_phase and _('Remove wish') or _('Remove booking') ) actions.append(Link( text=label, url=layout.csrf_protected_url(layout.request.link(booking)), attrs={ 'class': 'delete-link', }, traits=( Confirm( _('Do you really want to remove "${title}"?', mapping={ 'title': get_booking_title(layout, booking) }), None, label, _('Cancel'), ), Intercooler( request_method='DELETE', redirect_after=layout.request.link(layout.model), target='#booking-{}'.format(booking.id), ), ) )) if period.active and period.confirmed and booking.state == 'accepted': if ( # admins can always cancel bookings layout.request.is_admin # other users can if it isn't past its cancellation deadline or not booking.occasion.is_past_cancellation(layout.today()) ): actions.append(Link( text=_('Cancel Booking'), url=layout.csrf_protected_url( layout.request.link(booking, 'cancel') ), attrs={ 'class': 'cancel-link', }, traits=( Confirm( _('Do you really want to cancel "${title}"?', mapping={ 'title': get_booking_title(layout, booking) }), _('This cannot be undone.'), _('Cancel Booking'), _('Cancel'), ), Intercooler( request_method='POST', redirect_after=layout.request.link(layout.model) ) ))) return actions
[docs] def show_error_on_attendee( request: FeriennetRequest, attendee: Attendee, message: str ) -> None: @request.after def show_error(response: Response) -> None: response.headers.add('X-IC-Trigger', 'show-alert') response.headers.add('X-IC-Trigger-Data', json.dumps({ 'type': 'alert', 'target': f'#alert-boxes-for-{attendee.id}', 'message': request.translate(message) }))
@FeriennetApp.html( model=BookingCollection, template='bookings.pt', permission=Personal)
[docs] def view_my_bookings( self: BookingCollection, request: FeriennetRequest ) -> RenderData: assert self.username is not None attendees = attendees_by_username(request, self.username) periods = request.app.periods period = next((p for p in periods if p.id == self.period_id), None) bookings = all_bookings(self) grouped_bookings = period and group_bookings(period, bookings) or {} related = request.app.org.meta.get('show_related_contacts') or None if period and period.confirmed and related: related = related_attendees(self.session, occasion_ids={ b.occasion_id for b in bookings }) else: related = None if request.is_admin: users = users_for_select_element(request) user = (request.session.query(User) .filter_by(username=self.username).one()) else: assert request.current_user is not None users, user = None, request.current_user def subscribe_link(attendee: Attendee) -> str: url = request.link(AttendeeCalendar(self.session, attendee)) url = url.replace('https://', 'webcal://') url = url.replace('http://', 'webcal://') return url def get_total(attendee: Attendee) -> Decimal: bookings = tuple( b for state in grouped_bookings[attendee] for b in grouped_bookings[attendee][state] ) return total_by_bookings(period, bookings) def booking_cost(booking: Booking) -> Decimal | None: if period and period.confirmed: return booking.cost else: return booking.occasion.total_cost has_emergency_contact = user.data and user.data.get('emergency') show_emergency_info = user.username == request.current_username layout = BookingCollectionLayout(self, request, user) def user_select_link(user: User) -> str: return request.class_link(BookingCollection, { 'username': user.username, 'period_id': period and period.id or None }) def occasion_attendees( request: FeriennetRequest, username: str, occasion_id: UUID ) -> list[Attendee]: children = attendees_by_username(request, username) attendees = [] for c in children: accepted_bookings = [ b for b in c.bookings if b.state == 'accepted'] occasions = [b.occasion_id for b in accepted_bookings] if occasion_id in occasions: attendees.append(c) return attendees return { 'actions_by_booking': lambda b: actions_by_booking(layout, period, b), 'attendees': attendees, 'occasion_attendees': occasion_attendees, 'subscribe_link': subscribe_link, 'grouped_bookings': grouped_bookings, 'total_by_attendee': get_total, 'has_bookings': bookings and True or False, 'booking_cost': booking_cost, 'layout': layout, 'model': self, 'period': period, 'periods': periods, 'related': related, 'user': user, 'users': users, 'user_select_link': user_select_link, 'title': layout.title, 'has_emergency_contact': has_emergency_contact, 'show_emergency_info': show_emergency_info }
@FeriennetApp.view( model=Booking, permission=Personal, request_method='DELETE')
[docs] def delete_booking(self: Booking, request: FeriennetRequest) -> None: request.assert_valid_csrf_token() if self.period.confirmed and self.state not in DELETABLE_STATES: show_error_on_attendee(request, self.attendee, _( 'Only open, cancelled, denied or blocked bookings may be deleted')) return BookingCollection(request.session).delete(self) @request.after def remove_target(response: Response) -> None: response.headers.add('X-IC-Remove', 'true')
@FeriennetApp.view( model=Booking, name='cancel', permission=Personal, request_method='POST')
[docs] def cancel_booking(self: Booking, request: FeriennetRequest) -> None: request.assert_valid_csrf_token() if not self.period.wishlist_phase: if not request.is_admin: if self.occasion.is_past_cancellation(date.today()): request.alert(_( 'Only admins may cancel bookings at this point.' )) return BookingCollection(request.session).cancel_booking( booking=self, score_function=self.period.scoring, cascade=False) request.success(_('The booking was cancelled successfully')) bookings_link = Markup('<a href="{}">{}</a>').format( request.class_link(BookingCollection, { 'period_id': self.period.id }), request.translate(_('Bookings')) ) subject = request.translate(_( 'Degregistration of ${attendee} for "${title}"', mapping={ 'title': self.occasion.activity.title, 'attendee': self.attendee.name })) if self.period.booking_start <= date.today(): request.app.send_transactional_email( subject=subject, receivers=(self.user.username, ), content=render_template('mail_booking_canceled.pt', request, { 'layout': DefaultMailLayout(self, request), 'title': subject, 'model': self, 'bookings_link': bookings_link, 'name': self.attendee.name, 'dates': self.dates }) ) @request.after def update_matching(response: Response) -> None: response.headers.add('X-IC-Trigger', 'reload-from') response.headers.add('X-IC-Trigger-Data', json.dumps({ 'selector': f'#{self.occasion.id}' }))
@FeriennetApp.view( model=Booking, name='toggle-star', permission=Personal, request_method='POST')
[docs] def toggle_star(self: Booking, request: FeriennetRequest) -> str: if self.period.wishlist_phase: if not self.starred: if not self.star(max_stars=3): show_error_on_attendee(request, self.attendee, _( 'Cannot select more than three favorites per child')) else: self.unstar() else: show_error_on_attendee(request, self.attendee, _( 'The period is not in the wishlist-phase')) layout = DefaultLayout(self, request) return render_macro(layout.macros['star'], request, {'booking': self})
@FeriennetApp.view( model=Booking, name='toggle-nobble', permission=Secret, request_method='POST')
[docs] def toggle_nobble(self: Booking, request: FeriennetRequest) -> str: if self.nobbled: self.unnobble() else: self.nobble() layout = DefaultLayout(self, request) return render_macro(layout.macros['nobble'], request, {'booking': self})
[docs] def render_css(content: str, request: FeriennetRequest) -> morepath.Response: response = morepath.Response(content) response.content_type = 'text/css' return response
@FeriennetApp.view( model=BookingCollection, name='mask', permission=Personal, render=render_css)
[docs] def view_mask(self: BookingCollection, request: FeriennetRequest) -> str: # hackish way to get the single attendee print to work -> mask all the # attendees, except for the one given by param try: attendee = UUID(request.params.get('id', None)).hex # type:ignore except (ValueError, TypeError): return '' return f""" .attendee-bookings-row {{ display: none; }} #attendee-{attendee} {{ display: block; }} """
@FeriennetApp.view( model=Booking, name='invite', permission=Personal)
[docs] def create_invite(self: Booking, request: FeriennetRequest) -> Response: """ Creates a group_code on the booking, if one doesn't exist already and redirects to the GroupInvite view. """ if not self.group_code: self.group_code = GroupInvite.create( request.session, self.username).group_code link = request.link(GroupInvite( request.session, self.group_code, self.username)) return request.redirect(link)
@FeriennetApp.html( model=GroupInvite, permission=Public, template='invite.pt')
[docs] def view_group_invite( self: GroupInvite, request: FeriennetRequest ) -> RenderData: layout = GroupInviteLayout(self, request) occasion = self.occasion attendees_count = len(self.attendees) existing = [a for a, b in self.attendees if a.username == self.username] external = [a for a, b in self.attendees if a.username != self.username] possible = ( request.session.query(Attendee) .filter_by(username=self.username) .filter(not_(Attendee.id.in_(tuple(a.id for a in existing)))) .all() ) actionable_bookings = { b.attendee_id: b for b in request.session.query(Booking).filter_by( username=self.username, occasion_id=occasion.id, ) } def signup_url(attendee: Attendee | None = None) -> str: # we need a logged in user if not request.is_logged_in: return layout.login_to_url(request.link(self)) # build the URL needed to book the occasion url = request.link(occasion, name='book') # preselect the attendee when booking the occasion, and join this group if attendee: url_obj = URL(url).query_param('attendee_id', attendee.id.hex) else: url_obj = URL(url).query_param('attendee_id', 'other') # preselect the group code and the username url_obj = url_obj.query_param('group_code', self.group_code) if self.username is not None: url_obj = url_obj.query_param('username', self.username) url = url_obj.as_string() # return to the current URL url = request.return_here(url) return url def group_action( attendee: Attendee, action: Literal['join', 'leave'] ) -> Link: assert action in ('join', 'leave') traits: tuple[Trait, ...] if attendees_count == 1 and action == 'leave': traits = ( Intercooler( request_method='POST', redirect_after=request.class_link(BookingCollection) ), ) else: traits = ( Intercooler( request_method='POST', redirect_after=request.link(self) ), ) if attendee.id in actionable_bookings: booking = actionable_bookings[attendee.id] url = (URL(request.link(self, action)) .query_param('booking_id', booking.id.hex) .as_string()) else: url = signup_url(attendee) traits = () if action == 'join': text = ( ('👦 ' if attendee.gender == 'male' else '👧 ') + attendee.name ) else: text = _('Leave Group') return Link( text=text, url=layout.csrf_protected_url(url), traits=traits ) # https://stackoverflow.com/a/23847977/138103 first_child = existing and existing[0] or external[0] first_name = decode_name(first_child.name)[0] subject = occasion.activity.title message = _( ( 'Hi!\n\n' '${first_name} wants to take part in the "${title}" activity by ' '${organisation} and would be thrilled to go with a mate.\n\n' 'You can add the activity to the wishlist of your child through ' 'the following link, if you are interested. This way the children ' 'have a better chance of getting a spot together:\n\n' '${link}' ), mapping={ 'first_name': first_name, 'link': request.link(self.for_username(None)), 'title': occasion.activity.title, 'organisation': request.app.org.name, } ) mailto = 'mailto:%20?subject={subject}&body={message}'.format( subject=urllib.parse.quote(subject), message=urllib.parse.quote(request.translate(message)) ) users = users_for_select_element(request) user = (request.session.query(User) .filter_by(username=self.username).first()) def user_select_link(user: User) -> str: return request.link(self.for_username(user.username)) return { 'layout': layout, 'title': _('Group for "${title}"', mapping={ 'title': occasion.activity.title }), 'occasion': occasion, 'model': self, 'group_action': group_action, 'signup_url': signup_url, 'existing': existing, 'external': external, 'possible': possible, 'mailto': mailto, 'users': users, 'user': user, 'user_select_link': user_select_link, }
@FeriennetApp.view( model=GroupInvite, permission=Personal, name='join', request_method='POST')
[docs] def join_group(self: GroupInvite, request: FeriennetRequest) -> None: request.assert_valid_csrf_token() booking_id = request.params.get('booking_id', None) booking = request.session.query(Booking).filter_by(id=booking_id).first() if not booking: request.warning(_('The booking does not exist')) return own_children = { a_id for a_id, in request.session.query(Attendee.id) .filter_by(username=self.username) } if booking.attendee_id not in own_children: request.alert( _('Not permitted to join this attendee to the group')) return booking.group_code = self.group_code request.success(_('Successfully joined the group'))
@FeriennetApp.view( model=GroupInvite, permission=Personal, name='leave', request_method='POST')
[docs] def leave_group(self: GroupInvite, request: FeriennetRequest) -> None: request.assert_valid_csrf_token() booking_id = request.params.get('booking_id', None) booking = request.session.query(Booking).filter_by(id=booking_id).first() if not booking: request.warning(_('The booking does not exist')) return own_children = { a_id for a_id, in request.session.query(Attendee.id) .filter_by(username=self.username) } if booking.attendee_id not in own_children: request.alert( _('Not permitted to evict this attendee from the group')) return booking.group_code = None request.success(_('Successfully left the group'))