from __future__ import annotations
from bleach.sanitizer import Cleaner
from functools import cached_property
from datetime import timedelta
from dateutil.parser import parse
from onegov.gis import Coordinates
from sedate import replace_timezone
from typing import overload
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Callable
from collections.abc import Iterator
from collections.abc import Mapping
from collections.abc import Sequence
from lxml.etree import _Element
from onegov.gis.models.coordinates import RealCoordinates
from typing import TypeVar
[docs]
class GuidleBase:
""" Base class for parsing guidle exports containing general helpers. """
def __init__(self, root: _Element) -> None:
[docs]
self.nsmap = {'guidle': 'http://www.guidle.com'}
[docs]
self.cleaner = Cleaner(
tags=[],
attributes={},
strip=True,
filters=[]
)
[docs]
def find(
self,
path: str,
root: _Element | None = None
) -> list[_Element]:
""" Gets the elements with the given path. """
root = root if root is not None else self.root
return root.xpath(path, namespaces=self.nsmap) or []
@overload
[docs]
def get(
self,
path: str,
root: _Element | None = None,
joiner: str = ' ',
parser: None = None
) -> str: ...
@overload
def get(
self,
path: str,
root: _Element | None = None,
joiner: str = ' ',
*,
parser: Callable[[str], _T]
) -> _T | None: ...
def get(
self,
path: str,
root: _Element | None = None,
joiner: str = ' ',
parser: Callable[[str], _T] | None = None
) -> _T | str | None:
""" Returns the text of the elements with the given path.
Allows to specifiy a joining character and optionally a parser. If no
parser is given, the text is HTML-cleaned.
"""
result = joiner.join(
(element.text or '').strip()
for element in self.find(path, root)
).strip()
if parser:
return parser(result) if result else None
else:
return self.cleaner.clean(result) if result else ''
[docs]
def join(self, texts: Sequence[str], joiner: str = ', ') -> str:
""" Joins a set of text, skips duplicate and empty texts while
preserving the order. """
deduplicated = []
for text in texts:
if text and text not in deduplicated:
deduplicated.append(text)
retval = joiner.join(deduplicated)
return retval
[docs]
class GuidleExportData(GuidleBase):
""" Represents a whole guidle export. """
[docs]
def offers(self) -> Iterator[GuidleOffer]:
for offer in self.find('.//guidle:offer'):
yield GuidleOffer(offer)
[docs]
class GuidleOffer(GuidleBase):
""" Represents a single offer containing some description and dates. """
@cached_property
[docs]
def uid(self) -> str | None:
return self.root.get('id')
@cached_property
[docs]
def last_update(self) -> str:
return self.get('guidle:lastUpdateDate')
@cached_property
[docs]
def title(self) -> str:
title = self.get('guidle:offerDetail/guidle:title')
# titles are rendered as unsafe html downstream, so we can
# losen the rules one tiny bit here.
# we'll still have '>,' and '<', but those are probably
# used very rarely in a title, so we can ignore that
title = title.replace('&', '&')
return title
@cached_property
[docs]
def description(self) -> str:
short = self.get('guidle:offerDetail/guidle:shortDescription')
long = self.get('guidle:offerDetail/guidle:longDescription')
if long.startswith(short):
short = ''
return self.join((
short,
long,
self.get('guidle:offerDetail/guidle:openingHours'),
self.get('guidle:offerDetail/guidle:externalLink'),
self.get('guidle:offerDetail/guidle:homepage'),
self.get('guidle:offerDetail/guidle:ticketingUrl')
), '\n\n')
@cached_property
[docs]
def price(self) -> str:
return self.get('guidle:offerDetail/guidle:priceInformation')
@cached_property
[docs]
def organizer(self) -> str:
return self.join((
self.get('guidle:contact/guidle:company'),
self.get('guidle:contact/guidle:name'),
self.get('guidle:contact/guidle:telephone_1'),
))
@cached_property
[docs]
def organizer_email(self) -> str:
return self.get('guidle:contact/guidle:email')
@cached_property
[docs]
def location(self) -> str:
return self.join((
self.get('guidle:address/guidle:company'),
self.get('guidle:address/guidle:street'),
' '.join((
self.get('guidle:address/guidle:zip'),
self.get('guidle:address/guidle:city'),
)).strip(),
))
[docs]
def image(self, size: str) -> tuple[str, str] | tuple[None, None]:
""" Returns the image url for the offer with the given size, together
with the filename, or (None, None).
"""
xpath = (
f"guidle:offerDetail/"
f"guidle:images/"
f"guidle:image/"
f"guidle:size[@label='{size}']"
)
images = self.find(f'({xpath})[1]')
if not len(images):
return None, None
url = images[0].attrib['url']
return url, url.rsplit('/', 1)[-1]
[docs]
def pdf(self) -> tuple[str, str] | tuple[None, None]:
""" Returns the first attachment that is a pdf, together with
the filename, or (None, None).
"""
for attachment in self.find('guidle:offerDetail//guidle:attachment'):
url = self.get('guidle:url', root=attachment)
if not url.endswith('.pdf'):
return None, None
name = self.get('guidle:description', root=attachment)
name = name.strip().split('\n')[0]
return url, f'{name}.pdf'
return None, None
@cached_property
[docs]
def coordinates(self) -> RealCoordinates | None:
lat = self.get('guidle:address/guidle:latitude', parser=float)
lon = self.get('guidle:address/guidle:longitude', parser=float)
if lat is not None and lon is not None:
return Coordinates(lat, lon)
return None
[docs]
def schedules(self) -> Iterator[GuidleScheduleDate]:
for schedule in self.find('guidle:schedules/guidle:date'):
yield GuidleScheduleDate(schedule)
[docs]
class GuidleScheduleDate(GuidleBase):
""" Represents a single schedule date of an offer. """
def __init__(self, root: _Element) -> None:
super().__init__(root)
# Parse start date, end date and recurrence
start = self.get('guidle:startDate', parser=parse)
end = self.get('guidle:endDate', parser=parse)
recurrence = ''
weekdays = self.get('guidle:weekdays/guidle:day', joiner=',')
if weekdays:
recurrence = f'RRULE:FREQ=WEEKLY;BYDAY={weekdays.upper()}'
if end and not recurrence:
if start != end:
until = end + timedelta(days=1)
recurrence = (
f'RRULE:FREQ=WEEKLY;'
f'BYDAY=MO,TU,WE,TH,FR,SA,SU;'
f'UNTIL={until:%Y%m%dT%H%M00Z}'
)
end = start
else:
if recurrence:
if not end:
raise AssertionError('End date is required if recurrence '
'is set for event')
until = end + timedelta(days=1)
recurrence += f';UNTIL={until:%Y%m%dT%H%M00Z}'
end = start
# Parse start and end times
start_time = self.get(
'guidle:startTime', parser=lambda x: parse(x, default=start)
)
end_time = self.get(
'guidle:endTime', parser=lambda x: parse(x, default=end)
)
if start_time == end_time and recurrence:
start_time = None
end_time = None
if start_time:
start = start_time
if end_time:
end = end_time
assert end is not None and start is not None
if end <= start:
end = end + timedelta(days=1) - timedelta(microseconds=1)
# Add timezones
if start:
start = replace_timezone(start, self.timezone)
if end:
end = replace_timezone(end, self.timezone)
[docs]
self.recurrence = recurrence
@cached_property
[docs]
def timezone(self) -> str:
return 'Europe/Zurich'