from __future__ import annotations
import warnings
from datetime import datetime
from dateutil import rrule
from dateutil.rrule import rrulestr
from icalendar import Calendar as vCalendar
from icalendar import Event as vEvent
from icalendar import vRecur
from onegov.core.orm import Base
from onegov.core.orm.abstract import associated
from onegov.core.orm.mixins import content_property
from onegov.core.orm.mixins import meta_property
from onegov.core.orm.mixins import TimestampMixin
from onegov.core.orm.types import UUID
from onegov.event.models.mixins import OccurrenceMixin
from onegov.event.models.occurrence import Occurrence
from onegov.file import File
from onegov.file.utils import as_fileintent
from onegov.gis import Coordinates
from onegov.gis import CoordinatesMixin
from onegov.search import SearchableContent
from PIL.Image import DecompressionBombError
from pytz import UTC
from sedate import standardize_date
from sedate import to_timezone, utcnow
from sqlalchemy import and_
from sqlalchemy import Column
from sqlalchemy import desc
from sqlalchemy import Enum
from sqlalchemy import Text
from sqlalchemy.orm import object_session
from sqlalchemy.orm import relationship
from sqlalchemy.orm import validates
from uuid import uuid4
from typing import IO
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import uuid
from collections.abc import Iterator
from onegov.core.orm.mixins import dict_property
from onegov.core.request import CoreRequest
from sqlalchemy.orm import Query
from typing import Literal
from typing import TypeAlias
[docs]
#: The states an event can be in. The same four values are listed in the
#: ``event_state`` enum of the ``Event.state`` column.
EventState: TypeAlias = Literal[
'initiated',
'submitted',
'published',
'withdrawn'
]
[docs]
# File subclass used by ``Event.image`` and ``Event.pdf``; it is registered
# under its own polymorphic identity ('eventfile').
class EventFile(File):
[docs]
__mapper_args__ = {'polymorphic_identity': 'eventfile'}
[docs]
class Event(Base, OccurrenceMixin, TimestampMixin, SearchableContent,
CoordinatesMixin):
""" Defines an event.
Occurrences are stored in a separate table containing only a minimal set
of attributes from the event. This could also be achieved using postgres
directly with dateutil/plpythonu/pg_rrule and materialized views.
Occurrences are only created/updated, if the event is published.
Occurrences are created only up to ``occurrence_dates_year_limit`` years
after the current year.
"""
[docs]
__tablename__ = 'events'
#: Number of years added to the current year to get the last year for which
#: ``occurrence_dates(limit=True)`` still returns dates.
[docs]
occurrence_dates_year_limit = 2
#: Internal id of the event (primary key, defaults to a fresh ``uuid4``)
[docs]
id: Column[uuid.UUID] = Column(
UUID, # type:ignore[arg-type]
primary_key=True,
default=uuid4
)
#: State of the event; one of 'initiated' (the default), 'submitted',
#: 'published' or 'withdrawn'. Never NULL.
[docs]
state: Column[EventState] = Column(
Enum( # type: ignore[arg-type]
'initiated', 'submitted', 'published', 'withdrawn',
name='event_state'
),
nullable=False,
default='initiated'
)
# NOTE(review): the attributes below are declared with content_property() /
# meta_property() from onegov.core.orm.mixins; presumably they are stored in
# the model's content/meta dictionaries instead of columns of their own —
# confirm against the mixin.
#: description of the event
[docs]
description: dict_property[str | None] = content_property()
#: the event organizer
[docs]
organizer: dict_property[str | None] = content_property()
#: the event organizer's public e-mail address
[docs]
organizer_email: dict_property[str | None] = content_property()
#: the event organizer's phone number
[docs]
organizer_phone: dict_property[str | None] = content_property()
#: an external url for the event
[docs]
external_event_url: dict_property[str | None] = content_property()
#: an external url for registering for the event (going by the attribute
#: name; the previous comment was a copy of the one above)
[docs]
event_registration_url: dict_property[str | None] = content_property()
#: the price of the event (a text field, not an amount)
[docs]
price: dict_property[str | None] = content_property()
#: the source of the event, if imported
[docs]
source: dict_property[str | None] = meta_property()
#: when the source of the event was last updated (if imported)
[docs]
source_updated: dict_property[str | None] = meta_property()
#: Recurrence of the event (RRULE, see RFC2445). Optional; values are
#: checked by ``validate_recurrence``.
[docs]
recurrence: Column[str | None] = Column(Text, nullable=True)
#: The access property of the event, taken from onegov.org. Not ideal to
#: have this defined here, instead of using an AccessExtension, but that
#: would only be possible with deeper changes to the Event model.
#: Defaults to 'public'.
[docs]
access: dict_property[str] = meta_property(default='public')
#: The associated image (a single ``EventFile``, linked one-to-one);
#: use ``set_image`` to add, replace or remove it
[docs]
image = associated(
EventFile, 'image', 'one-to-one', uselist=False, backref_suffix='image'
)
#: The associated PDF (a single ``EventFile``, linked one-to-one);
#: use ``set_pdf`` to add, replace or remove it
[docs]
pdf = associated(
EventFile, 'pdf', 'one-to-one', uselist=False, backref_suffix='pdf'
)
[docs]
def set_image(
    self,
    content: bytes | IO[bytes] | None,
    filename: str | None = None
) -> None:
    """ Adds, replaces or removes the image of the event.

    Thin wrapper around ``set_blob`` using the blob name ``'image'``.

    :param content: the image data; falsy content removes the image.
    :param filename: optional name of the file.

    """
    blob_name = 'image'
    self.set_blob(blob_name, content, filename)
[docs]
def set_pdf(
    self,
    content: bytes | IO[bytes] | None,
    filename: str | None = None
) -> None:
    """ Adds, replaces or removes the PDF of the event.

    Thin wrapper around ``set_blob`` using the blob name ``'pdf'``.

    :param content: the PDF data; falsy content removes the PDF.
    :param filename: optional name of the file.

    """
    blob_name = 'pdf'
    self.set_blob(blob_name, content, filename)
[docs]
def set_blob(
    self,
    blob: str,
    content: bytes | IO[bytes] | None,
    filename: str | None = None
) -> None:
    """ Adds, replaces or removes the blob stored in the attribute ``blob``.

    :param blob: name of the attribute holding the file ('image' and 'pdf'
        are the names used by ``set_image`` and ``set_pdf``).
    :param content: the file data; anything falsy removes the blob.
    :param filename: name of the file, ``'file'`` if not given.

    """
    if not filename:
        filename = 'file'

    # nothing to store: drop whatever is attached
    if not content:
        setattr(self, blob, None)
        return

    # a file is already attached: only swap its payload
    # NOTE(review): a DecompressionBombError raised on this path is not
    # caught, unlike on the creation path below - confirm this is intended
    if getattr(self, blob):
        getattr(self, blob).reference = as_fileintent(content, filename)
        return

    # no file yet: attach a new one, giving up on decompression bombs
    try:
        setattr(self, blob, EventFile(  # type: ignore[misc]
            name=filename,
            reference=as_fileintent(content, filename)
        ))
    except DecompressionBombError:
        setattr(self, blob, None)
#: Occurrences of the event. Configured to be loaded together with the
#: event (``lazy='joined'``) and with ``cascade='all, delete-orphan'``;
#: ``_update_occurrences`` replaces the whole list whenever a shared
#: attribute changes.
[docs]
occurrences: relationship[list[Occurrence]] = relationship(
'Occurrence',
cascade='all, delete-orphan',
back_populates='event',
lazy='joined',
)
[docs]
#: Maps attribute names to a type declaration per field.
#: NOTE(review): presumably consumed by onegov.search (SearchableContent)
#: to build the search index mapping - confirm there.
es_properties = {
'title': {'type': 'localized'},
'description': {'type': 'localized'},
'location': {'type': 'localized'},
'organizer': {'type': 'localized'},
'filter_keywords': {'type': 'keyword'}
}
@property
[docs]
def es_public(self) -> bool:
    """ Returns True if, and only if, the event's state is 'published'. """
    is_published = self.state == 'published'
    return is_published
@property
[docs]
def es_skip(self) -> bool:
    """ Returns True for every event that is not published.

    For published events the value of the optional ``_es_skip`` attribute
    is returned instead (False if the attribute is not set).

    """
    if self.state != 'published':
        return True
    return getattr(self, '_es_skip', False)
[docs]
def source_url(self, request: CoreRequest) -> str | None:
    """ Returns an url pointing to the external event if imported.

    Only sources starting with ``'guidle'`` yield an url, everything else
    results in ``None``. The id put into the url is the part of the source
    after its last ``-``, cut off at the first ``.``.

    :param request: not used by this implementation.

    """
    source = self.source
    if source and source.startswith('guidle'):
        tail = source.rpartition('-')[2]
        guidle_id = tail.partition('.')[0]
        return f'https://www.guidle.com/angebote/{guidle_id}'
    return None
[docs]
def __setattr__(self, name: str, value: object) -> None:
    """ Sets the attribute and regenerates the occurrences whenever one of
    the attributes that influence them was assigned.

    """
    super().__setattr__(name, value)

    # assignments to these names trigger a rebuild of the occurrences
    triggers = (
        'state', 'title', 'name', 'location', 'tags',
        'filter_keywords', 'start', 'end', 'timezone',
        'recurrence',
    )
    if name not in triggers:
        return

    self._update_occurrences()
@property
[docs]
def base_query(self) -> Query[Occurrence]:
    """ Returns a query for the stored occurrences of this event, issued
    on the session the event is attached to.

    """
    occurrences = object_session(self).query(Occurrence)
    return occurrences.filter_by(event_id=self.id)
@property
[docs]
def latest_occurrence(self) -> Occurrence | None:
    """ Returns the occurrence which is presently occurring, the next
    one to occur or the last occurrence.

    Three queries built on ``base_query`` are chained with ``union_all``
    in exactly that order and the first row of the result is returned.

    """
    moment = utcnow()
    occurrences = self.base_query

    # occurrences running right now, earliest start first
    ongoing = occurrences.filter(and_(
        Occurrence.start <= moment,
        Occurrence.end >= moment
    ))
    ongoing = ongoing.order_by(Occurrence.start).limit(1)

    # occurrences still to come, earliest start first
    upcoming = occurrences.filter(Occurrence.start >= moment)
    upcoming = upcoming.order_by(Occurrence.start).limit(1)

    # occurrences that are over, most recent start first (unlike the two
    # queries above this one carries no limit)
    finished = occurrences.filter(Occurrence.end <= moment)
    finished = finished.order_by(desc(Occurrence.start))

    return ongoing.union_all(upcoming, finished).first()
[docs]
def future_occurrences(
    self,
    offset: int = 0,
    limit: int = 10
) -> Query[Occurrence]:
    """ Returns a query for the stored occurrences starting now or later,
    ordered by their start.

    :param offset: number of occurrences to skip.
    :param limit: maximum number of occurrences to return.

    """
    upcoming = self.base_query.filter(Occurrence.start >= utcnow())
    ordered = upcoming.order_by(Occurrence.start)
    return ordered.offset(offset).limit(limit)
@validates('recurrence')
[docs]
def validate_recurrence(self, key: str, r: str | None) -> str | None:
    """ Our rrules are quite limited in their complexity. This validator
    makes sure that is actually the case.

    This is a somewhat harsh limit, but it mirrors the actual use of
    onegov.event at this point. More complex rrules are not handled by the
    UI, nor is there currently a plan to do so.

    Currently supported are weekly recurrences and lists of rdates.

    The rationale is that people commonly add recurring events on a weekly
    basis (which is a lot of work for a whole year). Or on a monthly
    or yearly basis, in which case selection of single dates is
    acceptable, or even preferable to complex rrules.

    This UI talk doesn't belong into a module of course, but it is again
    a reality that only a strict subset of rules is handled and so we want
    to catch events which we cannot edit in our UI early if they are
    imported from outside.

    :param key: name of the validated attribute (not used).
    :param r: the recurrence to check; falsy values are passed through.
    :return: ``r``, unchanged.
    :raises RuntimeError: if the recurrence is neither a list of RDATEs
        nor a weekly rule with a timezone-aware UNTIL.

    """
    # nothing to validate
    if not r:
        return r

    parsed = rrulestr(r)

    # a rule must either have a frequency or be a list of rdates
    if not hasattr(parsed, '_freq'):
        only_rdates = all(
            line.startswith('RDATE') for line in r.splitlines()
        )
        if only_rdates:
            return r
        raise RuntimeError(f"'{r}' is too complex")

    # we also only do weekly recurrences (they can also be used
    # to do daily recurrences if they are set to include all days)
    if parsed._freq != rrule.WEEKLY:
        raise RuntimeError(f"The frequency of '{r}' is not WEEKLY")

    # we require a definite end
    until: datetime | None = getattr(parsed, '_until', None)
    if until is None:
        raise RuntimeError(f"'{r}' has no UNTIL")

    # we also want the end date to be timezone-aware
    if until.tzinfo is None:
        raise RuntimeError(f"'{r}''s UNTIL is not timezone-aware")

    return r
[docs]
def occurrence_dates(
    self,
    limit: bool = True,
    localize: bool = False
) -> list[datetime]:
    """ Returns the start dates of all occurrences.

    Without a recurrence this is just the start of the event. With a
    recurrence the dates are generated from the rule and the start of the
    event is added if the rule does not produce it.

    :param limit: if set (the default), dates whose year lies beyond the
        current year plus ``occurrence_dates_year_limit`` are dropped.
    :param localize: if set, the dates are converted to the timezone of
        the event; by default they are returned non-localized.

    """

    def to_local(dt: datetime, timezone: str) -> datetime:
        # naive dates are passed through untouched
        if not dt.tzinfo:
            return dt
        return to_timezone(dt, timezone).replace(tzinfo=None)

    dates = [self.start]

    if self.recurrence:
        # Make sure the RRULE uses local dates (or else the DST is wrong)
        start_local = to_local(self.start, self.timezone)

        try:
            rule = rrulestr(self.recurrence, dtstart=self.start)

            if dtstart := getattr(rule, '_dtstart', None):
                rule._dtstart = to_local(  # type: ignore[union-attr]
                    dtstart,
                    self.timezone
                )

            if until := getattr(rule, '_until', None):
                rule._until = to_local(until, self.timezone)  # type:ignore

            rule = rrulestr(str(rule))
        except ValueError:
            # This might happen if only RDATEs and EXDATEs are present
            rule = rrulestr(self.recurrence, dtstart=start_local)

        # Make sure the RDATEs and EXDATEs contain the start times
        for list_name in ('_exdate', '_rdate'):
            if not hasattr(rule, list_name):
                continue

            adjusted = [
                to_local(moment, self.timezone).replace(
                    hour=start_local.hour, minute=start_local.minute
                )
                for moment in getattr(rule, list_name)
            ]
            setattr(rule, list_name, adjusted)

        # Generate the occurrences and convert to UTC
        dates = [standardize_date(moment, self.timezone) for moment in rule]

        # Make sure the start date is part of the recurrence
        if self.start not in dates:
            dates.append(self.start)
            dates.sort()

    if localize:
        dates = [to_timezone(moment, self.timezone) for moment in dates]

    if limit:
        max_year = datetime.today().year + self.occurrence_dates_year_limit
        dates = [moment for moment in dates if moment.year <= max_year]

    return dates
[docs]
def spawn_occurrence(self, start: datetime) -> Occurrence:
    """ Create an occurrence at the given date, without storing it.

    The occurrence lasts as long as the event itself and is named after
    the event, suffixed with the ISO date of ``start``. Title, location,
    tags, filter keywords and timezone are copied from the event.

    """
    duration = self.end - self.start
    day = start.date().isoformat()

    return Occurrence(  # type:ignore[misc]
        title=self.title,
        name=f'{self.name}-{day}',
        location=self.location,
        tags=self.tags,
        filter_keywords=self.filter_keywords,
        start=start,
        end=start + duration,
        timezone=self.timezone,
    )
@property
[docs]
def virtual_occurrence(self) -> Occurrence:
    """ Before the event is accepted, there are no real occurrences stored
    in the database.

    At this time it is useful to be able to generate the latest occurrence
    without storing it.

    The occurrence is spawned for the first of the unlimited occurrence
    dates, linked to the event and then expunged from the event's session.

    """
    for first_start in self.occurrence_dates(limit=False):
        break
    else:
        # there is always at least one occurrence date
        raise AssertionError('unreachable')

    virtual = self.spawn_occurrence(first_start)
    virtual.event = self

    with warnings.catch_warnings():
        # silence the warning about the occurrence not being in the session
        warnings.filterwarnings(
            'ignore', 'Object of type <Occurrence> not in session')

        session = object_session(self)
        session.expunge(virtual)
        session.flush()

    return virtual
[docs]
def _update_occurrences(self) -> None:
""" Updates the occurrences.
Removes all occurrences if the event is not published or no start and
end date/time is set. Only occurrences for this and next year are
created.
"""
# clear old occurrences
self.occurrences = []
# do not create occurrences unless the event is published
if self.state != 'published':
return
# do not create occurrences unless start and end is set
if not self.start or not self.end:
return
# create all occurrences for this and next year
for start in self.occurrence_dates():
self.occurrences.append(self.spawn_occurrence(start))
for occ in self.occurrences:
occ.filter_keywords = self.filter_keywords
[docs]
def submit(self) -> None:
    """ Submit the event.

    Moves the event from 'initiated' to 'submitted'; any other current
    state fails the assertion.

    """
    current = self.state
    assert current == 'initiated'
    self.state = 'submitted'
[docs]
def publish(self) -> None:
    """ Publish the event.

    Publishing the event will generate the occurrences. Only submitted
    or withdrawn events may be published; any other current state fails
    the assertion.

    """
    assert self.state in ('submitted', 'withdrawn')
    self.state = 'published'
[docs]
def withdraw(self) -> None:
    """ Withdraw the event.

    Withdrawing the event will delete the occurrences. Only submitted or
    published events may be withdrawn; any other current state fails the
    assertion.

    """
    withdrawable = ('submitted', 'published')
    assert self.state in withdrawable
    self.state = 'withdrawn'
[docs]
def get_ical_vevents(self, url: str | None = None) -> Iterator[vEvent]:
    """ Returns the event and all its occurrences as icalendar objects.

    If the calendar has a bunch of RDATE's instead of a proper RRULE, we
    return every occurrence as separate event since most calendars don't
    support RDATE's.

    If the recurrence parses to a non-empty rule, only a single vevent is
    yielded (for the first occurrence date) and it carries that rule.

    :param url: optional url added to every vevent.

    """
    modified = self.modified or self.created or utcnow()

    # parsed recurrence, stays None for events without recurrence
    recurrence_rule = None
    if self.recurrence:
        recurrence_rule = vRecur.from_ical(
            self.recurrence.replace('RRULE:', '')
        )

    for occurrence_start in self.occurrence_dates():
        dtstart = to_timezone(occurrence_start, UTC)
        dtend = dtstart + (self.end - self.start)

        properties = (
            ('uid', f'{self.name}-{dtstart.date()}@onegov.event'),
            ('summary', self.title),
            ('dtstart', dtstart),
            ('dtend', dtend),
            ('last-modified', modified),
            ('dtstamp', modified),
            ('location', self.location),
            ('description', self.description),
            ('categories', self.tags),
        )

        vevent = vEvent()
        for key, value in properties:
            vevent.add(key, value)

        if recurrence_rule:
            vevent.add('rrule', recurrence_rule)

        if url:
            vevent.add('url', url)

        if self.coordinates:
            assert isinstance(self.coordinates, Coordinates)
            vevent.add('geo', (self.coordinates.lat, self.coordinates.lon))

        yield vevent

        # the rule already describes all further occurrences
        if recurrence_rule:
            break
[docs]
def as_ical(self, url: str | None = None) -> bytes:
    """ Returns the event and all its occurrences as iCalendar string.

    The calendar consists of the product id, the version and all the
    vevents produced by ``get_ical_vevents``.

    :param url: optional url passed on to ``get_ical_vevents``.

    """
    calendar = vCalendar()

    for key, value in (
        ('prodid', '-//OneGov//onegov.event//'),
        ('version', '2.0'),
    ):
        calendar.add(key, value)

    for component in self.get_ical_vevents(url):
        calendar.add_component(component)

    return calendar.to_ical()