Source code for event.collections.events

import hashlib

from datetime import date
from datetime import datetime
from datetime import timedelta
from icalendar import Calendar as vCalendar
from icalendar.prop import vCategory
from lxml import etree
from lxml.etree import SubElement, CDATA
from markupsafe import escape
from onegov.core.collection import Pagination
from onegov.core.utils import increment_name
from onegov.core.utils import normalize_for_url
from onegov.event.models import Event
from onegov.gis import Coordinates
from pytz import UTC
from sedate import as_datetime
from sedate import replace_timezone
from sedate import standardize_date
from sedate import to_timezone
from sedate import utcnow
from sqlalchemy import and_
from sqlalchemy import or_
from uuid import uuid4


from typing import Any
from typing import IO
from typing import NamedTuple
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from collections.abc import Iterable
    from collections.abc import Mapping
    from onegov.core.request import CoreRequest
    from onegov.event.models.event import EventState
    from sqlalchemy.orm import Query
    from sqlalchemy.orm import Session
    from typing import Self
    from uuid import UUID


[docs] class EventImportItem(NamedTuple):
[docs] event: Event
[docs] image: IO[bytes] | None
[docs] image_filename: str | None
[docs] pdf: IO[bytes] | None
[docs] pdf_filename: str | None
[docs] class EventCollection(Pagination[Event]): """ Manage a list of events. """ def __init__( self, session: 'Session', page: int = 0, state: 'EventState | None' = None ) -> None: super().__init__(page)
[docs] self.session = session
[docs] self.state = state
[docs] def __eq__(self, other: object) -> bool: return ( isinstance(other, self.__class__) and self.state == other.state and self.page == other.page )
[docs] def subset(self) -> 'Query[Event]': return self.query()
@property
[docs] def page_index(self) -> int: return self.page
[docs] def page_by_index(self, index: int) -> 'Self': return self.__class__(self.session, index, self.state)
[docs] def for_state(self, state: 'EventState | None') -> 'Self': """ Returns a new instance of the collection with the given state. """ return self.__class__(self.session, 0, state)
[docs] def query(self) -> 'Query[Event]': query = self.session.query(Event) if self.state: query = query.filter(Event.state == self.state) query = query.order_by(Event.start) return query
[docs] def _get_unique_name(self, name: str) -> str: """ Create a unique, URL-friendly name. """ # it's possible for `normalize_for_url` to return an empty string... name = normalize_for_url(name) or 'event' session = self.session while session.query(Event.name).filter(Event.name == name).first(): name = increment_name(name) return name
[docs] def add( self, title: str, start: datetime, end: datetime, timezone: str, autoclean: bool = True, **optional: Any ) -> Event: """ Add a new event. A unique, URL-friendly name is created automatically for this event using the title and optionally numbers for duplicate names. Every time a new event is added, old, stale events are deleted unless disabled with the ``autoclean`` option. Returns the created event or None if already automatically deleted. """ if not start.tzinfo: start = replace_timezone(start, timezone) if not end.tzinfo: end = replace_timezone(end, timezone) event = Event( state='initiated', title=title, start=start, end=end, timezone=timezone, name=self._get_unique_name(title), **optional ) self.session.add(event) self.session.flush() if autoclean: self.remove_stale_events() return event
[docs] def delete(self, event: Event) -> None: """ Delete an event. """ self.session.delete(event) self.session.flush()
[docs] def remove_stale_events(self, max_stale: datetime | None = None) -> None: """ Remove events which have never been submitted and are created more than five days ago. """ if max_stale is None: max_stale = utcnow() - timedelta(days=5) events = self.session.query(Event).filter( Event.state == 'initiated', and_( or_(Event.created < max_stale, Event.created.is_(None)), or_(Event.modified < max_stale, Event.modified.is_(None)) ) ) for event in events: self.session.delete(event) self.session.flush()
[docs] def by_name(self, name: str) -> Event | None: """ Returns an event by its URL-friendly name. """ query = self.session.query(Event).filter(Event.name == name) return query.first()
[docs] def by_id(self, id: 'UUID') -> Event | None: """ Return an event by its id. Hex representations work as well. """ query = self.session.query(Event).filter(Event.id == id) return query.first()
[docs] def from_import( self, items: 'Iterable[EventImportItem | str]', purge: str | None = None, publish_immediately: bool = True, valid_state_transfers: 'Mapping[str, str] | None' = None, published_only: bool = False, future_events_only: bool = False ) -> tuple[list[Event], list[Event], list['UUID']]: """ Add or updates the given events. Only updates events which have changed. Uses ``Event.source_updated`` if available, falls back to comparing all relevant attributes. Doesn't change the states of events allowing to permanently withdraw imported events. :param items: A list of ``EventImportItem``'s or event sources to keep from purging. :param purge: Optionally removes all events with the given meta-source-prefix not present in the given events. :param publish_immediately: Set newly added events to published, else let them be initiated. :param valid_state_transfers: Dict of existing : remote state should be considered when updating. Example: {'published': 'withdrawn'} would update locally published events if the remote has been withdrawn. for any {'valid_state': 'withdrawn'} that lead to an update of the local state, an existing ticket will be close automatically. Be aware, that transferring state and creating tickets might lead to inconsistencies. So adjust the script in that case to handle the tickets automatically. :param published_only: Do not import unpublished events. Still do not ignore state like withdrawn. :param future_events_only: If set only events in the future will be imported """ if purge: query = self.session.query(Event.meta['source'].label('source')) query = query.filter(Event.meta['source'].astext.startswith(purge)) purged = {r.source for r in query} else: purged = set() added = [] updated = [] valid_state_transfers = valid_state_transfers or {} for item in items: if isinstance(item, str): purged = {x for x in purged if not x.startswith(item)} continue # skip past events if option is set if future_events_only and item.event.end < utcnow(): continue event = item.event existing = self.session.query(Event).filter( Event.meta['source'] == event.meta['source'] ).first() if purge: purged -= {event.source} if existing: update_state = valid_state_transfers.get( existing.state) == event.state if existing.source_updated: changed = existing.source_updated != event.source_updated else: # No information on provided, figure it out ourselves! image_changed = ( (existing.image and not item.image) or (not existing.image and item.image) ) if existing.image and item.image: image_changed = ( existing.image.checksum != hashlib.new( 'md5', item.image.read(), usedforsecurity=False ).hexdigest() ) item.image.seek(0) changed = True if ( existing.title != event.title or existing.location != event.location or set(existing.tags) != set(event.tags) or existing.filter_keywords != event.filter_keywords or existing.timezone != event.timezone or existing.start != event.start or existing.end != event.end or existing.content != event.content or existing.coordinates != event.coordinates or existing.recurrence != event.recurrence or image_changed ) else False if changed: state = existing.state # avoid updating occurrences existing.state = 'initiated' existing.title = event.title existing.location = event.location existing.tags = event.tags existing.filter_keywords = event.filter_keywords existing.timezone = event.timezone existing.start = event.start existing.end = event.end existing.content = event.content existing.coordinates = event.coordinates existing.recurrence = event.recurrence existing.state = state existing.set_image(item.image, item.image_filename) existing.set_pdf(item.pdf, item.pdf_filename) if update_state: existing.state = event.state if changed or update_state: updated.append(existing) else: if published_only and event.state != 'published': continue event.id = uuid4() event.name = self._get_unique_name(event.title) event.state = 'initiated' event.set_image(item.image, item.image_filename) event.set_pdf(item.pdf, item.pdf_filename) self.session.add(event) if publish_immediately: event.submit() event.publish() added.append(event) purged_event_ids = [] if purged: query = self.session.query(Event) query = query.filter(Event.meta['source'].in_(purged)) for event in query: event.state = 'withdrawn' # remove occurrences purged_event_ids.append(event.id) self.session.delete(event) self.session.flush() return added, updated, purged_event_ids
[docs] def from_ical( self, ical: str, future_events_only: bool = False, event_image: 'IO[bytes] | None' = None, event_image_name: str | None = None, default_categories: list[str] | None = None, default_filter_keywords: dict[str, list[str]] | None = None ) -> tuple[list[Event], list[Event], list['UUID']]: """ Imports the events from an iCalender string. We assume the timezone to be Europe/Zurich! :param ical: ical to be imported :type ical: str :param future_events_only: if set only events in the future will be imported :type future_events_only: bool :param event_image: image file :param event_image_name: image name :type event_image_name: str :param default_categories: categories applied if non is given in ical :type default_categories: [str] :param default_filter_keywords: default filter keywords, see event filter settings app.org.event_filter_type :type default_filter_keywords: dict(str, [str] | None) """ items = [] if default_categories: assert isinstance(default_categories, list) cal = vCalendar.from_ical(ical) for vevent in cal.walk('vevent'): timezone = 'Europe/Zurich' start = vevent.get('dtstart') start = start.dt if start else None if type(start) is date: start = replace_timezone(as_datetime(start), timezone) elif type(start) is datetime: if start.tzinfo is None: start = standardize_date(start, timezone) else: start = to_timezone(start, UTC) end = vevent.get('dtend') end = end.dt if end else None if type(end) is date: end = replace_timezone(as_datetime(end), timezone) end = end + timedelta(days=1, minutes=-1) elif type(end) is datetime: if end.tzinfo is None: end = standardize_date(end, timezone) else: end = to_timezone(end, UTC) duration = vevent.get('duration') duration = duration.dt if duration else None if start and not end and duration: end = start + duration if start and not end: # assume event duration is 1 hour end = start + timedelta(hours=1) if not start or not end: raise (ValueError('Invalid date')) recurrence = vevent.get('rrule', '') if recurrence: recurrence = 'RRULE:{}'.format(recurrence.to_ical(). decode()) coordinates = vevent.get('geo') if coordinates: coordinates = Coordinates( coordinates.latitude, coordinates.longitude ) if default_categories is None: default_categories = [] tags = (vevent.get('categories') or vCategory(default_categories)) if tags: # categories may be in lists or they may be single values # whose 'cats' member contains the texts if ( not hasattr(tags, '__iter__') # v6 added an __iter__ method to vCategory # but it's not what we want to ierate over or isinstance(tags, vCategory) ): tags = [tags] tags = [ str(c) for tag in tags for c in tag.cats if c ] uid = str(vevent.get('uid', '')) title = str(escape(vevent.get('summary', ''))) description = str(escape(vevent.get('description', ''))) organizer = str(escape(vevent.get('organizer', ''))) location = str(escape(vevent.get('location', ''))) items.append( EventImportItem( event=Event( # type:ignore[misc] state='initiated', title=title, start=start, end=end, timezone=timezone, recurrence=recurrence, description=description, organizer=organizer, location=location, coordinates=coordinates, tags=tags or [], filter_keywords=default_filter_keywords, source=f'ical-{uid}', ), image=event_image, image_filename=event_image_name, pdf=None, pdf_filename=None, ) ) return self.from_import(items, publish_immediately=True, future_events_only=future_events_only)
[docs] def as_anthrazit_xml( self, request: 'CoreRequest', future_events_only: bool = True ) -> str: """ Returns all published occurrences as xml for Winterthur. Anthrazit format according https://doc.anthrazit.org/ext/XML_Schnittstelle:: <?xml version="1.0" encoding="UTF-8" standalone="yes"?> <import partner="???" partnerid"???" passwort"???" importid="??"> <item status="1" suchbar="1" mutationsdatum="2023-08-18 08:23:30"> <id>01</id> <titel>Titel der Seite</titel> <textmobile>2-3 Sätze des Text Feldes packed in CDATA</textmobile> <termin allday="1"> <von>2011-08-06 00:00:00</von> <bis>2011-08-06 23:59:00</bis> </termin> <termin> ... </termin> <url_web>url</url_web> <url_bild>bild</url_bild> <hauptrubrik name="Naturmusuem"> <rubrik>tag_1</rubrik> <rubrik>tag_2</rubrik> </hauptrubrik> <email></email> <telefon1></telefon1> <sf01>Veranstaltungspreis packed in CDATA</sf01> <veranstaltungsort> <title></title> <longitude></longitude> <latitude></latitude> </veranstaltungsort> ... </item> <item> ... </item> </import> :param future_events_only: if set, only future events will be returned, all events otherwise :rtype: str :return: xml string """ xml = ('<import partner="" partnerid="" passwort="" importid="">' '</import>') root = etree.XML(xml) query = self.session.query(Event) for e in query: if e.state != 'published': continue if future_events_only and not e.future_occurrences().all(): continue last_change = e.last_change.strftime('%Y-%m-%d %H:%M:%S') event = SubElement(root, 'item', attrib={ 'status': '1', 'suchbar': '1', 'mutationsdatum': last_change, }) id = SubElement(event, 'id') id.text = str(e.id) title = SubElement(event, 'titel') title.text = e.title text_mobile = SubElement(event, 'textmobile') if e.description: desc = e.description if len(e.description) > 10000: desc = e.description[:9995] + '..' text_mobile.text = CDATA( # type: ignore[assignment] desc.replace('\r\n', '<br>')) for occ in e.occurrences: termin = SubElement(event, 'termin') event_start = SubElement(termin, 'von') event_start.text = ( str(occ.localized_start.replace(tzinfo=None))) event_end = SubElement(termin, 'bis') event_end.text = str(occ.localized_end.replace(tzinfo=None)) if e.price: price = SubElement(event, 'sf01') price.text = CDATA( # type: ignore[assignment] e.price.replace('\r\n', '<br>')) if e.external_event_url: url = SubElement(event, 'url_web') url.text = e.external_event_url if e.event_registration_url: registration = SubElement(event, 'url_web') registration.text = e.event_registration_url if e.image: image = SubElement(event, 'url_bild') image.text = request.link(e.image) hr_text = '' tags = [] if e.tags: tags = e.tags if e.filter_keywords: for k, v in e.filter_keywords.items(): if k in ['kalender']: assert isinstance(v, str) hr_text = v else: if isinstance(v, list): tags.extend(v) else: tags.append(v) top_category = SubElement(event, 'hauptrubrik', attrib={'name': hr_text} if hr_text else None) for tag in tags: category = SubElement(top_category, 'rubrik') category.text = tag if e.organizer_email: email = SubElement(event, 'email') email.text = e.organizer_email if e.organizer_phone: phone = SubElement(event, 'telefon1') phone.text = e.organizer_phone location = SubElement(event, 'veranstaltungsort') location_title = SubElement(location, 'titel') location_title.text = e.location if e.coordinates: assert isinstance(e.coordinates, Coordinates) longitude = SubElement(location, 'longitude') longitude.text = str(e.coordinates.lon) latitude = SubElement(location, 'latitude') latitude.text = str(e.coordinates.lat) return etree.tostring(root, encoding='utf-8', xml_declaration=True, pretty_print=True).decode()