Source code for org.views.allocation

import morepath

from datetime import timedelta
from libres.db.models import ReservedSlot
from libres.modules.errors import LibresError

from onegov.core.security import Public, Private, Secret
from onegov.core.utils import is_uuid
from onegov.form import merge_forms
from onegov.org import OrgApp, utils, _
from onegov.org.forms import AllocationRuleForm
from onegov.org.forms import DaypassAllocationEditForm
from onegov.org.forms import DaypassAllocationForm
from onegov.org.forms import RoomAllocationEditForm
from onegov.org.forms import RoomAllocationForm
from onegov.org.forms.allocation import (
    DailyItemAllocationForm, DailyItemAllocationEditForm)
from onegov.org.layout import AllocationEditFormLayout
from onegov.org.layout import AllocationRulesLayout
from onegov.org.layout import ResourceLayout
from onegov.core.elements import Link, Confirm, Intercooler
from onegov.reservation import Allocation
from onegov.reservation import Reservation
from onegov.reservation import Resource
from onegov.reservation import ResourceCollection
from purl import URL
from sedate import utcnow
from sqlalchemy import not_, func
from sqlalchemy.dialects.postgresql import JSON
from sqlalchemy.orm import defer, defaultload
from webob import exc


from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
    from collections.abc import Iterator
    from onegov.core.types import JSON_ro, RenderData
    from onegov.org.request import OrgRequest
    from sqlalchemy.orm import Query
    from typing import TypeAlias
    from webob import Response

[docs] AllocationForm: TypeAlias = ( DaypassAllocationForm | RoomAllocationForm | DailyItemAllocationForm )
AllocationEditForm: TypeAlias = ( DaypassAllocationEditForm | RoomAllocationEditForm | DailyItemAllocationEditForm ) @OrgApp.json(model=Resource, name='slots', permission=Public)
[docs] def view_allocations_json(self: Resource, request: 'OrgRequest') -> 'JSON_ro': """ Returns the allocations in a fullcalendar compatible events feed. See `<https://fullcalendar.io/docs/event_data/events_json_feed/>`_ for more information. """ start, end = utils.parse_fullcalendar_request(request, self.timezone) if not (start and end): return () # get all allocations (including mirrors), for the availability calculation query: Query[Allocation] query = self.scheduler.allocations_in_range( # type:ignore[assignment] start, end, masters_only=False) query = query.order_by(Allocation._start) query = query.options(defer(Allocation.data)) query = query.options(defer(Allocation.group)) query = query.options( defaultload('reserved_slots') .defer('reservation_token') .defer('allocation_id') .defer('end')) # but only return the master allocations return tuple( e.as_dict() for e in utils.AllocationEventInfo.from_allocations( request, self, tuple(query) ) )
@OrgApp.view(model=Resource, name='process-rules', permission=Secret)
[docs] def process_rules(self: Resource, request: 'OrgRequest') -> None: """ Manually runs the rules processing cronjobs for testing. Not really dangerous, though it should be replaced with something proper instead, cronjobs currently do not run in most tests and that should be remedied. """ if request.current_username == 'admin@example.org': handle_rules_cronjob(self, request)
@OrgApp.html(model=Resource, name='rules', permission=Private, template='allocation_rules.pt')
[docs] def view_allocation_rules( self: Resource, request: 'OrgRequest', layout: AllocationRulesLayout | None = None ) -> 'RenderData': layout = layout or AllocationRulesLayout(self, request) def link_for_rule(rule: dict[str, Any], name: str) -> str: url = URL(request.link(self, name)) url = url.query_param('csrf-token', layout.csrf_token) url = url.query_param('rule', rule['id']) return url.as_string() def actions_for_rule(rule: dict[str, Any]) -> 'Iterator[Link]': yield Link( text=_('Stop'), url=link_for_rule(rule, 'stop-rule'), traits=( Confirm( _( 'Do you really want to stop "${title}"?', mapping={'title': rule['title']} ), _( 'The rule will be removed without affecting ' 'existing allocations.' ), _('Stop rule'), _('Cancel') ), Intercooler( request_method='POST', redirect_after=request.link(self, 'rules') ) ) ) yield Link( text=_('Delete'), url=link_for_rule(rule, 'delete-rule'), traits=( Confirm( _( 'Do you really want to delete "${title}"?', mapping={'title': rule['title']} ), _( "All allocations created by the rule will be removed, " "if they haven't been reserved yet." ), _('Delete rule'), _('Cancel') ), Intercooler( request_method='POST', redirect_after=request.link(self, 'rules') ) ) ) yield Link( text=_('Edit'), url=link_for_rule(rule, 'edit-rule'), ) def rules_with_actions() -> 'Iterator[RenderData]': form_class = get_allocation_rule_form_class(self, request) for rule in self.content.get('rules', ()): form = request.get_form(form_class, csrf_support=False, model=self) form.rule = rule yield { 'title': rule['title'], 'actions': tuple(actions_for_rule(rule)), 'form': form } return { 'layout': layout, 'title': _('Rules'), 'rules': tuple(rules_with_actions()) }
[docs] def get_new_allocation_form_class( resource: Resource, request: 'OrgRequest' ) -> type['AllocationForm']: """ Returns the form class for new allocations (different resources have different allocation forms). """ if resource.type == 'daypass': return DaypassAllocationForm if resource.type == 'room': return RoomAllocationForm if resource.type == 'daily-item': return DailyItemAllocationForm raise NotImplementedError
[docs] def get_edit_allocation_form_class( allocation: Allocation, request: 'OrgRequest' ) -> type['AllocationEditForm']: """ Returns the form class for existing allocations (different resources have different allocation forms). """ resource = ResourceCollection( request.app.libres_context).by_id(allocation.resource) assert resource is not None if resource.type == 'daypass': return DaypassAllocationEditForm if resource.type == 'room': return RoomAllocationEditForm if resource.type == 'daily-item': return DailyItemAllocationEditForm raise NotImplementedError
# NOTE: We would like the return type to be an intersection
[docs] def get_allocation_rule_form_class( resource: Resource, request: 'OrgRequest' ) -> type[AllocationRuleForm]: """ Returns the form class for allocation rules. """ form = get_new_allocation_form_class(resource, request) return merge_forms(AllocationRuleForm, form)
@OrgApp.form(model=Resource, template='form.pt', name='new-allocation', permission=Private, form=get_new_allocation_form_class)
[docs] def handle_new_allocation( self: Resource, request: 'OrgRequest', form: 'AllocationForm', layout: ResourceLayout | None = None ) -> 'RenderData | Response': """ Handles new allocations for differing form classes. """ if form.submitted(request): try: allocations = self.scheduler.allocate( dates=form.dates, whole_day=form.whole_day, quota=form.quota, quota_limit=form.quota_limit, data=form.data, partly_available=form.partly_available ) except LibresError as e: utils.show_libres_error(e, request) else: if not allocations: request.alert(_('No allocations to add')) else: request.success( _('Successfully added ${n} allocations', mapping={ 'n': len(allocations) })) self.highlight_allocations(allocations) return morepath.redirect(request.link(self)) elif not request.POST: start, end = utils.parse_fullcalendar_request(request, self.timezone) whole_day = request.params.get('whole_day') == 'yes' if start and end: if whole_day: form.start.data = start form.end.data = end if hasattr(form, 'as_whole_day'): form.as_whole_day.data = 'yes' else: form.start.data = start form.end.data = end if hasattr(form, 'as_whole_day'): form.as_whole_day.data = 'no' if hasattr(form, 'start_time'): assert hasattr(form, 'end_time') form.start_time.data = start form.end_time.data = end layout = layout or ResourceLayout(self, request) layout.breadcrumbs.append(Link(_('New allocation'), '#')) layout.editbar_links = None return { 'layout': layout, 'title': _('New allocation'), 'form': form }
@OrgApp.form(model=Allocation, template='form.pt', name='edit', permission=Private, form=get_edit_allocation_form_class)
[docs] def handle_edit_allocation( self: Allocation, request: 'OrgRequest', form: 'AllocationEditForm', layout: AllocationEditFormLayout | None = None ) -> 'RenderData | Response': """ Handles edit allocation for differing form classes. """ resources = ResourceCollection(request.app.libres_context) resource = resources.by_id(self.resource) assert resource is not None # this is a bit of a hack to keep the current view when a user drags an # allocation around, which opens this form and later leads back to the # calendar - if the user does this on the day view we want to return to # the same day view after the process # therefore we set the view on the resource (where this is okay) and on # the form action (where it's a bit of a hack), to ensure that the view # parameter is around for the whole time if isinstance(view := request.params.get('view'), str): resource.view = view assert hasattr(form, 'action') form.action = URL(form.action).query_param('view', view).as_string() if form.submitted(request): new_start, new_end = form.dates try: resource.scheduler.move_allocation( master_id=self.id, new_start=new_start, new_end=new_end, new_quota=form.quota, quota_limit=form.quota_limit, whole_day=form.whole_day ) except LibresError as e: utils.show_libres_error(e, request) else: # when we edit an allocation, we disassociate it from any rules if self.data and 'rule' in self.data: self.data = {k: v for k, v in self.data.items() if k != 'rule'} request.success(_('Your changes were saved')) resource.highlight_allocations([self]) return morepath.redirect(request.link(resource)) elif not request.POST: form.apply_model(self) assert self.timezone is not None start, end = utils.parse_fullcalendar_request(request, self.timezone) if start and end: form.apply_dates(start, end) layout = layout or AllocationEditFormLayout(self, request) layout.edit_mode = True return { 'layout': layout, 'title': _('Edit allocation'), 'form': form }
@OrgApp.view(model=Allocation, request_method='DELETE', permission=Private)
[docs] def handle_delete_allocation(self: Allocation, request: 'OrgRequest') -> None: """ Deletes the given resource (throwing an error if there are existing reservations associated with it). """ request.assert_valid_csrf_token() resource = request.app.libres_resources.by_allocation(self) assert resource is not None resource.scheduler.remove_allocation(id=self.id) @request.after def trigger_calendar_update(response: 'Response') -> None: response.headers.add('X-IC-Trigger', 'rc-allocations-changed')
@OrgApp.form(model=Resource, template='form.pt', name='new-rule', permission=Private, form=get_allocation_rule_form_class)
[docs] def handle_allocation_rule( self: Resource, request: 'OrgRequest', form: AllocationRuleForm, layout: AllocationRulesLayout | None = None ) -> 'RenderData | Response': layout = layout or AllocationRulesLayout(self, request) if form.submitted(request): changes = form.apply(self) rules = self.content.get('rules', []) rules.append(form.rule) self.content['rules'] = rules request.success(_( 'New rule active, ${n} allocations created', mapping={'n': changes} )) return request.redirect(request.link(self, name='rules')) return { 'layout': layout, 'title': _('New Rule'), 'form': form, 'helptext': _( 'Rules ensure that the allocations between start/end exist and ' 'that they are extended beyond those dates at the given ' 'intervals. ' ) }
@OrgApp.form(model=Resource, template='form.pt', name='edit-rule', permission=Private, form=get_allocation_rule_form_class)
[docs] def handle_edit_rule( self: Resource, request: 'OrgRequest', form: AllocationRuleForm, layout: AllocationRulesLayout | None = None ) -> 'RenderData | Response': request.assert_valid_csrf_token() layout = layout or AllocationRulesLayout(self, request) rule_id = rule_id_from_request(request) if form.submitted(request): # all the slots slots = self.scheduler.managed_reserved_slots() slots = slots.with_entities(ReservedSlot.allocation_id) # all the reservations reservations = self.scheduler.managed_reservations() reservations = reservations.with_entities(Reservation.target) candidates = self.scheduler.managed_allocations() candidates = candidates.filter( func.json_extract_path_text( func.cast(Allocation.data, JSON), 'rule' ) == rule_id ) # .. without the ones with slots candidates = candidates.filter( not_(Allocation.id.in_(slots.subquery()))) # .. without the ones with reservations candidates = candidates.filter( not_(Allocation.group.in_(reservations.subquery()))) # delete the allocations deleted_count = candidates.delete('fetch') # Update the rule itself rules = self.content.get('rules', []) for i, rule in enumerate(rules): if rule['id'] == rule_id: updated_rule = form.rule updated_rule['last_run'] = utcnow() # Reset last_run updated_rule['iteration'] = 0 # Reset iteration count rules[i] = updated_rule break self.content['rules'] = rules # Apply the updated rule new_allocations_count = form.apply(self) request.success( _( 'Rule updated. ${deleted} allocations removed, ' '${created} new allocations created.', mapping={ 'deleted': deleted_count, 'created': new_allocations_count, }, ) ) return request.redirect(request.link(self, name='rules')) # Pre-populate the form with existing rule data existing_rule = next( ( rule for rule in self.content.get('rules', []) if rule['id'] == rule_id ), None, ) if existing_rule is None: request.message(_('Rule not found'), 'warning') return request.redirect(request.link(self, name='rules')) form.rule = existing_rule return { 'layout': layout, 'title': _('Edit Rule'), 'form': form, 'helptext': _( 'Rules ensure that the allocations between start/end exist and ' 'that they are extended beyond those dates at the given ' 'intervals. ' ) }
[docs] def rule_id_from_request(request: 'OrgRequest') -> str: """ Returns the rule_id from the request params, ensuring that an actual uuid is returned. """ rule_id = request.params.get('rule') if not isinstance(rule_id, str) or not is_uuid(rule_id): raise exc.HTTPBadRequest() return rule_id
[docs] def handle_rules_cronjob(resource: Resource, request: 'OrgRequest') -> None: """ Handles all cronjob duties of the rules stored on the given resource. """ if not resource.content.get('rules'): return targets = [] now = utcnow() tomorrow = (now + timedelta(days=1)).date() def should_process(rule: dict[str, Any]) -> bool: # do not reprocess rules if they were processed less than 12 hours # prior - this prevents flaky cronjobs from accidentally processing # rules too often if ( rule['last_run'] and rule['last_run'] > utcnow() - timedelta(hours=12) ): return False # we assume to be called once a day, so if we are called, a daily # rule has to be processed if rule['extend'] == 'daily': return True if rule['extend'] == 'monthly' and tomorrow.day == 1: return True if ( rule['extend'] == 'yearly' and tomorrow.month == 1 and tomorrow.day == 1 ): return True return False def prepare_rule(rule: dict[str, Any]) -> dict[str, Any]: if should_process(rule): rule['iteration'] += 1 rule['last_run'] = now targets.append(rule) return rule resource.content['rules'] = [ prepare_rule(r) for r in resource.content.get('rules', ())] form_class = get_allocation_rule_form_class(resource, request) for rule in targets: form = request.get_form(form_class, csrf_support=False, model=resource) form.rule = rule form.apply(resource)
[docs] def delete_rule(resource: Resource, rule_id: str) -> None: """ Removes the given rule from the resource. """ resource.content['rules'] = [ rule for rule in resource.content.get('rules', ()) if rule['id'] != rule_id ]
@OrgApp.view(model=Resource, request_method='POST', permission=Private, name='stop-rule')
[docs] def handle_stop_rule(self: Resource, request: 'OrgRequest') -> None: request.assert_valid_csrf_token() rule_id = rule_id_from_request(request) delete_rule(self, rule_id) request.success(_('The rule was stopped'))
@OrgApp.view(model=Resource, request_method='POST', permission=Private, name='delete-rule')
[docs] def handle_delete_rule(self: Resource, request: 'OrgRequest') -> None: request.assert_valid_csrf_token() rule_id = rule_id_from_request(request) # all the slots slots = self.scheduler.managed_reserved_slots() slots = slots.with_entities(ReservedSlot.allocation_id) # all the reservations reservations = self.scheduler.managed_reservations() reservations = reservations.with_entities(Reservation.target) # include the allocations created by the given rule... candidates = self.scheduler.managed_allocations() candidates = candidates.filter( func.json_extract_path_text( func.cast(Allocation.data, JSON), 'rule' ) == rule_id ) # .. without the ones with slots candidates = candidates.filter( not_(Allocation.id.in_(slots.subquery()))) # .. without the ones with reservations candidates = candidates.filter( not_(Allocation.group.in_(reservations.subquery()))) # delete the allocations count = candidates.delete('fetch') delete_rule(self, rule_id) request.success( _('The rule was deleted, along with ${n} allocations', mapping={ 'n': count }) )