Source code for org.forms.allocation
import sedate
from functools import cached_property
from datetime import date, datetime, time, timedelta
from dateutil.relativedelta import relativedelta
from dateutil.rrule import rrule, DAILY
from uuid import uuid4
from wtforms.fields import DateField
from wtforms.fields import IntegerField
from wtforms.fields import RadioField
from wtforms.fields import StringField
from wtforms.validators import DataRequired, NumberRange, InputRequired
from onegov.form import Form
from onegov.form.fields import MultiCheckboxField
from onegov.form.fields import TimeField
from onegov.org import _
from onegov.org.forms.util import WEEKDAYS
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Iterable, Sequence
from onegov.org.request import OrgRequest
from onegov.reservation import Allocation, Resource
from onegov.core.types import SequenceOrScalar
from typing import Protocol
[docs]
def choices_as_integer(choices: 'Iterable[str] | None') -> list[int] | None:
if choices is None:
return None
return [int(c) for c in choices]
[docs]
class AllocationFormHelpers:
[docs]
def generate_dates(
self,
start: date | None,
end: date | None,
start_time: time | None = None,
end_time: time | None = None,
weekdays: 'Iterable[int] | None' = None
) -> list[tuple[datetime, datetime]]:
""" Takes the given dates and generates the date tuples.
The ``except_for`` field will be considered if present, as will the
``on_holidays`` setting.
"""
if not (start and end):
return []
start_dt = sedate.as_datetime(start)
end_dt = sedate.as_datetime(end)
dates: Iterable[datetime]
if start_dt == end_dt:
dates = (start_dt, )
else:
dates = rrule(
DAILY,
dtstart=start_dt,
until=end_dt,
byweekday=weekdays
)
if start_time is None or end_time is None:
return [(d, d) for d in dates if not self.is_excluded(d)]
else:
if end_time < start_time:
end_offset = timedelta(days=1)
else:
end_offset = timedelta()
return [
(
datetime.combine(d, start_time),
datetime.combine(d + end_offset, end_time)
) for d in dates if not self.is_excluded(d)
]
[docs]
def combine_datetime(self, field: str, time_field: str) -> datetime | None:
""" Takes the given date field and combines it with the given time
field. """
d, t = getattr(self, field).data, getattr(self, time_field).data
if d and not t:
return sedate.as_datetime(d)
if not t:
return None
return datetime.combine(d, t)
[docs]
class AllocationRuleForm(Form):
""" Base form form allocation rules. """
if TYPE_CHECKING:
# forward declare required properties/methods
@property
@property
def weekdays(self) -> Iterable[int]: ...
@property
def whole_day(self) -> bool: ...
@property
def quota(self) -> int: ...
@property
def quota_limit(self) -> int: ...
@property
def partly_available(self) -> bool: ...
def generate_dates(
self,
start: date | None,
end: date | None,
start_time: time | None = None,
end_time: time | None = None,
weekdays: Iterable[int] | None = None
) -> Sequence[tuple[datetime, datetime]]: ...
[docs]
title = StringField(
label=_('Title'),
description=_('General availability'),
validators=[InputRequired()],
fieldset=_('Rule'))
[docs]
extend = RadioField(
label=_('Extend'),
validators=[InputRequired()],
fieldset=_('Rule'),
default='daily',
choices=(
('daily', _('Extend by one day at midnight')),
('monthly', _('Extend by one month at the end of the month')),
('yearly', _('Extend by one year at the end of the year'))
))
@cached_property
@cached_property
@cached_property
@property
[docs]
def rule(self) -> dict[str, Any]:
    """ The rule as a dictionary, built from the rule id, the title and
    extend fields, the options, the iteration and the last run.

    """
    return dict(
        id=self.rule_id,
        title=self.title.data,
        extend=self.extend.data,
        options=self.options,
        iteration=self.iteration,
        last_run=self.last_run,
    )
@rule.setter
def rule(self, value: dict[str, Any]) -> None:
    """ Fills the form from the given rule dictionary.

    The id, iteration and last run are written straight into the instance
    dictionary, the title and extend values go to their fields. Each
    option is applied to the attribute of the same name, if the form has
    one and it is not ``None``.

    """
    # this is a little scary, maybe these shouldn't be
    # cached properties and instead just attributes that
    # get generated and pre-filled inside __init__
    state = self.__dict__
    state['rule_id'] = value['id']
    state['iteration'] = value['iteration']
    state['last_run'] = value['last_run']

    self.title.data = value['title']
    self.extend.data = value['extend']

    for name, option in value['options'].items():
        target = getattr(self, name, None)
        if target is not None:
            target.data = option
@property
[docs]
def options(self) -> dict[str, Any]:
    """ The data of all fields by name, leaving out the title, extend
    and csrf_token fields.

    """
    skipped = ('title', 'extend', 'csrf_token')

    collected: dict[str, Any] = {}
    for name in self._fields:
        if name in skipped:
            continue
        collected[name] = getattr(self, name).data

    return collected
[docs]
def apply(self, resource: 'Resource') -> int:
    """ Allocates the dates of this rule on the given resource.

    On the first run (iteration ``0``) the dates of the form are used as
    they are. On later runs, new dates are generated from the day after
    the ``end`` field up to ``end`` plus ``iteration`` days, months or
    years, depending on the ``extend`` field.

    Overlapping allocations are skipped by the scheduler.

    :return: The number of allocations returned by the scheduler.

    """
    if self.iteration == 0:
        dates = self.dates
    else:
        mode = self.extend.data

        if mode == 'daily':
            end_offset = relativedelta(days=self.iteration)
        elif mode == 'monthly':
            end_offset = relativedelta(months=self.iteration)
        elif mode == 'yearly':
            end_offset = relativedelta(years=self.iteration)
        else:
            raise AssertionError('unreachable')

        start = self['end'].data + timedelta(days=1)
        end = self['end'].data + end_offset

        # not every form has time fields
        has_times = 'start_time' in self
        start_time = self['start_time'].data if has_times else None
        end_time = self['end_time'].data if has_times else None

        dates = self.generate_dates(
            start,
            end,
            start_time=start_time,
            end_time=end_time,
            weekdays=self.weekdays
        )

    # the rule id is stored with the allocation data
    payload = {**(self.data or {}), 'rule': self.rule_id}

    allocations = resource.scheduler.allocate(
        dates=dates,
        whole_day=self.whole_day,
        quota=self.quota,
        quota_limit=self.quota_limit,
        data=payload,
        partly_available=self.partly_available,
        skip_overlapping=True
    )
    return len(allocations)
[docs]
class AllocationForm(Form, AllocationFormHelpers):
""" Baseform for all allocation forms. Allocation forms are expected
to implement the methods above (which contain a NotImplementedException).
Have a look at :meth:`libres.db.scheduler.Scheduler.allocate` to find out
more about those values.
"""
if TYPE_CHECKING:
[docs]
# The weekdays selected here are left out of the ``weekdays`` property
except_for = MultiCheckboxField(
    label=_('Except for'),
    choices=WEEKDAYS,
    coerce=int,
    render_kw={
        'prefix_label': False,
        'class_': 'oneline-checkboxes'
    },
    # wrapped in _() like the fieldset of the on_holidays and
    # during_school_holidays fields, ('Date') was just a plain string
    fieldset=_('Date')
)
[docs]
on_holidays = RadioField(
label=_('On holidays'),
choices=(
('yes', _('Yes')),
('no', _('No'))
),
default='yes',
fieldset=_('Date'))
[docs]
during_school_holidays = RadioField(
label=_('During school holidays'),
choices=(
('yes', _('Yes')),
('no', _('No'))
),
default='yes',
fieldset=_('Date'))
[docs]
access = RadioField(
label=_('Access'),
choices=(
('public', _('Public')),
('private', _('Only by privileged users')),
('member', _('Only by privileged users and members')),
),
default='public',
fieldset=_('Security')
)
[docs]
def on_request(self) -> None:
    """ Removes the holiday fields the organisation has no use for.

    ``on_holidays`` is deleted if the organisation has no holidays,
    ``during_school_holidays`` if it has no school holidays.

    """
    org = self.request.app.org

    if not org.holidays:
        self.delete_field('on_holidays')

    if not org.has_school_holidays:
        self.delete_field('during_school_holidays')
[docs]
def ensure_start_before_end(self) -> bool | None:
if self.start.data and self.end.data:
if self.start.data > self.end.data:
assert isinstance(self.start.errors, list)
self.start.errors.append(_('Start date before end date'))
return False
return None
@property
[docs]
def weekdays(self) -> list[int]:
    """ The rrule weekdays, which are all entries of ``WEEKDAYS`` that
    have not been selected in the except_for field.

    """
    skipped = set(self.except_for.data or ())

    selected = []
    for entry in WEEKDAYS:
        # the first item of each entry is the weekday value
        if entry[0] not in skipped:
            selected.append(entry[0])

    return selected
@cached_property
[docs]
def exceptions(self) -> 'DateContainer':
    """ The holidays of the organisation, if allocations should not be
    created on them.

    Empty if the form is not bound to a request, if there is no
    on_holidays field or if that field is set to 'yes'.

    """
    unbound = not hasattr(self, 'request')

    if unbound or not self.on_holidays or self.on_holidays.data == 'yes':
        return ()

    return self.request.app.org.holidays
@cached_property
[docs]
def ranged_exceptions(self) -> 'Sequence[tuple[date, date]]':
    """ The school holidays of the organisation as a tuple, if allocations
    should not be created during them.

    Empty if the form is not bound to a request, if there is no
    during_school_holidays field or if that field is set to 'yes'.

    """
    unbound = not hasattr(self, 'request')
    choice = None if unbound else self.during_school_holidays

    if unbound or not choice or choice.data == 'yes':
        return ()

    return tuple(self.request.app.org.school_holidays)
[docs]
def is_excluded(self, dt: datetime) -> bool:
    """ Returns true if the day of the given datetime is one of the
    ``exceptions`` or lies within one of the ``ranged_exceptions``
    (both ends of a range are included).

    """
    day = dt.date()

    if day in self.exceptions:
        return True

    return any(
        first <= day <= last
        for first, last in self.ranged_exceptions
    )
@property
[docs]
def dates(self) -> 'SequenceOrScalar[tuple[datetime, datetime]]':
    """ The dates passed to
    :meth:`libres.db.scheduler.Scheduler.allocate`.

    Has to be implemented by the concrete form.

    """
    raise NotImplementedError()
@property
[docs]
def whole_day(self) -> bool:
    """ The whole_day value passed to
    :meth:`libres.db.scheduler.Scheduler.allocate`.

    Has to be implemented by the concrete form.

    """
    raise NotImplementedError()
@property
[docs]
def partly_available(self) -> bool:
    """ The partly_available value passed to
    :meth:`libres.db.scheduler.Scheduler.allocate`.

    Has to be implemented by the concrete form.

    """
    raise NotImplementedError()
@property
[docs]
def quota(self) -> int:
    """ The quota passed to
    :meth:`libres.db.scheduler.Scheduler.allocate`.

    Has to be implemented by the concrete form.

    """
    raise NotImplementedError()
@property
[docs]
def quota_limit(self) -> int:
    """ The quota limit passed to
    :meth:`libres.db.scheduler.Scheduler.allocate`.

    Has to be implemented by the concrete form.

    """
    raise NotImplementedError()
# FIXME: This collides with Form.data which is not ideal, we should
# probably choose a different name for this
@property
[docs]
def data(self) -> dict[str, Any]:
    """ The data passed to
    :meth:`libres.db.scheduler.Scheduler.allocate`, consisting of the
    value of the access field.

    """
    return dict(access=self.access.data)
[docs]
class AllocationEditForm(Form, AllocationFormHelpers):
""" Baseform for edit forms. Edit forms differ from the base allocation
form somewhat, since they don't offer a way to generate more than
one allocation at a time.
The dates property is therefore expected to return a single start, end
dates tuple.
"""
[docs]
access = RadioField(
label=_('Access'),
choices=(
('public', _('Public')),
('private', _('Only by privileged users')),
('member', _('Only by privileged users and members')),
),
default='public',
fieldset=_('Security')
)
# FIXME: same here
@property
[docs]
def data(self) -> dict[str, Any]:
    """ The data passed to
    :meth:`libres.db.scheduler.Scheduler.allocate`, consisting of the
    value of the access field.

    """
    return dict(access=self.access.data)
[docs]
def apply_data(self, data: dict[str, Any] | None) -> None:
if data and 'access' in data:
self.access.data = data['access']
[docs]
class Daypasses:
    """ The fields shared by the daypass allocation forms. """

    # required, between 1 and 999
    daypasses = IntegerField(
        label=_('Daypasses'),
        fieldset=_('Daypasses'),
        validators=[
            InputRequired(),
            NumberRange(min=1, max=999)
        ]
    )

    # required, between 0 and 999
    daypasses_limit = IntegerField(
        label=_('Daypasses Limit'),
        fieldset=_('Daypasses'),
        validators=[
            InputRequired(),
            NumberRange(min=0, max=999)
        ]
    )
[docs]
class DaypassAllocationForm(AllocationForm, Daypasses):
@property
@property
@property
[docs]
def dates(self) -> 'Sequence[tuple[datetime, datetime]]':
    """ The dates generated from the start and end fields, limited to
    the selected weekdays. No times are given.

    """
    first, last = self.start.data, self.end.data
    return self.generate_dates(first, last, weekdays=self.weekdays)
[docs]
class DaypassAllocationEditForm(AllocationEditForm, Daypasses):
@property
@property
@property
[docs]
def dates(self) -> tuple[datetime, datetime]:
    """ A single start and end tuple for the date field.

    The start is the date converted through ``sedate.as_datetime``, the
    end lies one microsecond short of a full day after the start.

    """
    assert self.date.data is not None

    begin = sedate.as_datetime(self.date.data)
    until = sedate.as_datetime(self.date.data) + timedelta(
        days=1, microseconds=-1
    )
    return (begin, until)
[docs]
def apply_model(self, model: 'Allocation') -> None:
    """ Fills the form from the given allocation.

    The allocation data is handed to ``apply_data``, the date is the day
    of the displayed start and the daypass fields are taken from the
    quota and the quota limit.

    """
    self.apply_data(model.data)

    shown_start = model.display_start()
    self.date.data = shown_start.date()

    self.daypasses.data = model.quota
    self.daypasses_limit.data = model.quota_limit
[docs]
class RoomAllocationForm(AllocationForm):
[docs]
as_whole_day = RadioField(
label=_('Whole day'),
choices=[
('yes', _('Yes')),
('no', _('No'))
],
default='yes',
fieldset=_('Time')
)
[docs]
start_time = TimeField(
label=_('Each starting at'),
description=_('HH:MM'),
validators=[InputRequired()],
fieldset=_('Time'),
depends_on=('as_whole_day', 'no')
)
[docs]
end_time = TimeField(
label=_('Each ending at'),
description=_('HH:MM'),
validators=[InputRequired()],
fieldset=_('Time'),
depends_on=('as_whole_day', 'no')
)
[docs]
is_partly_available = RadioField(
label=_('May be partially reserved'),
choices=[
('yes', _('Yes')),
('no', _('No'))
],
default='no',
fieldset=_('Options'),
depends_on=('as_whole_day', 'no')
)
[docs]
per_time_slot = IntegerField(
label=_('Reservations per time slot'),
validators=[
InputRequired(),
NumberRange(1, 999)
],
fieldset=_('Options'),
default=1,
depends_on=('as_whole_day', 'no', 'is_partly_available', 'no')
)
@property
@property
@property
[docs]
def partly_available(self) -> bool:
    """ True if the allocation may be partially reserved, which is never
    the case for whole day allocations.

    """
    # Hiding a field will still pass the default, we catch it here
    whole_day = self.whole_day
    return False if whole_day else self.is_partly_available.data == 'yes'
@property
[docs]
def dates(self) -> 'Sequence[tuple[datetime, datetime]]':
    """ The dates generated from the start and end fields, using the
    start and end time fields and limited to the selected weekdays.

    """
    return self.generate_dates(
        self.start.data,
        self.end.data,
        start_time=self.start_time.data,
        end_time=self.end_time.data,
        weekdays=self.weekdays
    )
[docs]
class DailyItemFields:
    """ The fields shared by the daily item allocation forms. """

    # required, between 1 and 999, defaults to 1
    items = IntegerField(
        label=_('Available items'),
        fieldset=_('Options'),
        default=1,
        validators=[
            InputRequired(),
            NumberRange(min=1, max=999)
        ],
    )

    # required, between 1 and 999, defaults to 1
    item_limit = IntegerField(
        label=_('Reservations per time slot and person'),
        fieldset=_('Options'),
        default=1,
        validators=[
            InputRequired(),
            NumberRange(min=1, max=999)
        ],
    )
[docs]
class DailyItemAllocationForm(AllocationForm, DailyItemFields):
@property
@property
@property
[docs]
def dates(self) -> 'Sequence[tuple[datetime, datetime]]':
    """ The dates generated from the start and end fields, limited to
    the selected weekdays. No times are given.

    """
    first, last = self.start.data, self.end.data
    return self.generate_dates(first, last, weekdays=self.weekdays)
[docs]
class DailyItemAllocationEditForm(AllocationEditForm, DailyItemFields):
@property
@property
@property
[docs]
def dates(self) -> tuple[datetime, datetime]:
    """ A single start and end tuple for the date field.

    The start is the date converted through ``sedate.as_datetime``, the
    end lies one microsecond short of a full day after the start.

    """
    assert self.date.data is not None

    begin = sedate.as_datetime(self.date.data)
    until = sedate.as_datetime(self.date.data) + timedelta(
        days=1, microseconds=-1
    )
    return (begin, until)
[docs]
def apply_model(self, model: 'Allocation') -> None:
    """ Fills the form from the given allocation.

    The allocation data is handed to ``apply_data``, the date is the day
    of the displayed start and the item fields are taken from the quota
    and the quota limit.

    """
    self.apply_data(model.data)

    shown_start = model.display_start()
    self.date.data = shown_start.date()

    self.items.data = model.quota
    self.item_limit.data = model.quota_limit
[docs]
class RoomAllocationEditForm(AllocationEditForm):
[docs]
as_whole_day = RadioField(
label=_('Whole day'),
choices=[
('yes', _('Yes')),
('no', _('No'))
],
default='yes',
fieldset=_('Time')
)
[docs]
start_time = TimeField(
label=_('From'),
description=_('HH:MM'),
validators=[DataRequired()],
fieldset=_('Time'),
depends_on=('as_whole_day', 'no')
)
[docs]
end_time = TimeField(
label=_('Until'),
description=_('HH:MM'),
validators=[DataRequired()],
fieldset=_('Time'),
depends_on=('as_whole_day', 'no')
)
[docs]
per_time_slot = IntegerField(
label=_('Slots per Reservation'),
validators=[
InputRequired(),
NumberRange(1, 999)
],
fieldset=_('Options'),
default=1,
depends_on=('as_whole_day', 'no')
)
[docs]
def ensure_start_before_end(self) -> bool | None:
if self.whole_day:
return None
assert self.start_time.data is not None
assert self.end_time.data is not None
if self.start_time.data >= self.end_time.data:
assert isinstance(self.start_time.errors, list)
self.start_time.errors.append(_('Start time before end time'))
return False
return None
@property
@property
@property
@property
[docs]
def dates(self) -> tuple[datetime, datetime]:
    """ A single start and end tuple for the allocation.

    Without the whole day option, the date field is combined with the
    start and end time fields. With it, the start is the date converted
    through ``sedate.as_datetime`` and the end lies one microsecond short
    of a full day after the start.

    """
    if not self.whole_day:
        return (  # type:ignore[return-value]
            self.combine_datetime('date', 'start_time'),
            self.combine_datetime('date', 'end_time'),
        )

    assert self.date.data is not None

    begin = sedate.as_datetime(self.date.data)
    until = sedate.as_datetime(self.date.data) + timedelta(
        days=1, microseconds=-1
    )
    return (begin, until)
[docs]
def apply_dates(self, start: datetime, end: datetime) -> None:
    """ Sets the date and start time fields from the given start and the
    end time field from the given end. The date of the end is not used.

    """
    self.date.data, self.start_time.data = start.date(), start.time()
    self.end_time.data = end.time()
[docs]
def apply_model(self, model: 'Allocation') -> None:
    """ Fills the form from the given allocation.

    The allocation data is handed to ``apply_data``, the displayed start
    and end to ``apply_dates``. The whole day choice is 'yes' or 'no'
    depending on the allocation and the slots are taken from the quota.

    """
    self.apply_data(model.data)
    self.apply_dates(model.display_start(), model.display_end())

    # a conditional expression instead of the and/or idiom, which only
    # works as long as the first value is truthy
    self.as_whole_day.data = 'yes' if model.whole_day else 'no'
    self.per_time_slot.data = model.quota
[docs]
def on_request(self) -> None:
    """ Hides the whole day and the per time slot fields if the
    allocation is partly available.

    """
    if not self.partly_available:
        return

    self.hide(self.as_whole_day)
    self.hide(self.per_time_slot)