Source code for winterthur.roadwork

import isodate
import pycurl
import sedate

from datetime import datetime, timedelta
from dogpile.cache.api import NO_VALUE
from functools import cached_property
from io import BytesIO
from onegov.core.custom import json
from operator import attrgetter
from pathlib import Path
from purl import URL
from sedate import utcnow


from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
    from collections.abc import Iterator
    from onegov.core.cache import RedisCacheRegion
    from typing import Self


[docs] class RoadworkError(Exception): pass
[docs] class RoadworkConnectionError(RoadworkError): pass
[docs] class RoadworkConfig: """ Looks at ~/.pdb.secret and /etc/pdb.secret (in this order), to extract the configuration used for the RoadworkClient class. The configuration is as follows:: HOSTNAME: pdb.example.org ENDPOINT: 127.0.0.1:6004 USERNAME: username PASSWORD: password * The HOSTNAME is the address of the PDB service. * The ENDPOINT is the optional address of the tcp-proxy used. * The USERNAME is the NTLM password. * The PASSWORD is the NTLM password. """ def __init__( self, hostname: str | None, endpoint: str | None, username: str | None, password: str | None ) -> None:
[docs] self.hostname = hostname
[docs] self.endpoint = endpoint
[docs] self.username = username
[docs] self.password = password
@classmethod
[docs] def lookup_paths(cls) -> 'Iterator[Path]': yield Path('~/.pdb.secret').expanduser() yield Path('/etc/pdb.secret')
@classmethod
[docs] def lookup(cls) -> 'Self': for path in cls.lookup_paths(): if path.exists(): return cls(**cls.parse(path)) paths = ', '.join(str(p) for p in cls.lookup_paths()) raise RoadworkError( f'No pdb configuration found in {paths}')
@classmethod
[docs] def parse(cls, path: Path) -> dict[str, str | None]: result: dict[str, str | None] = { 'hostname': None, 'endpoint': None, 'username': None, 'password': None, } with path.open('r') as file: for line in file: line = line.strip() if not line: continue if ':' not in line: continue if line.startswith('#'): continue k, v = line.split(':', maxsplit=1) k = k.strip().lower() v = v.strip() if k in result: result[k] = v return result
[docs] class RoadworkClient: """ A proxy to Winterthur's internal roadworks service. Uses redis as a caching mechanism to ensure performance and reliability. Since the roadworks service can only be reached inside Winterthur's network, we rely on a proxy connection during development/testing. To not expose any information unwittingly to the public, the description of how to connect to that proxy is kept at docs.seantis.ch. """ def __init__( self, cache: 'RedisCacheRegion', hostname: str, username: str, password: str, endpoint: str | None = None ) -> None:
[docs] self.cache = cache
[docs] self.hostname = hostname
[docs] self.username = username
[docs] self.password = password
[docs] self.endpoint = endpoint or hostname
@cached_property
[docs] def curl(self) -> pycurl.Curl: curl = pycurl.Curl() curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_NTLM) curl.setopt(pycurl.USERPWD, f'{self.username}:{self.password}') curl.setopt(pycurl.HTTPHEADER, [f'HOST: {self.hostname}']) curl.setopt(pycurl.VERBOSE, True) # This is is not really a good idea as it disables TLS certificate # validation! curl.setopt(pycurl.SSL_VERIFYPEER, 0) curl.setopt(pycurl.SSL_VERIFYHOST, 0) return curl
[docs] def url(self, path: str) -> str: return f'https://{self.endpoint}/{path}'
[docs] def get( self, path: str, lifetime: float = 5 * 60, downtime: float = 60 * 60 ) -> Any: """ Requests the given path, returning the resulting json if successful. A cache is used in two stages: * At the lifetime stage, the cache is returned unconditionally. * At the end of the lifetime, the cache is refreshed if possible. * At the end of the downtime stage the cache forcefully refreshed. During its lifetime the object is basically up to 5 minutes out of date. But since the backend may not be available when that time expires we operate with a downtime that is higher (1 hour). This means that a downtime in the backend will not result in evicted caches, even if the lifetime is up. Once the downtime limit is up we do however evict the cache forcefully, raising an error if we cannot connect to the backend. """ path = path.lstrip('/') cached = self.cache.get(path) def refresh() -> Any: try: status, body = self.get_uncached(path) except pycurl.error as exception: raise RoadworkConnectionError( f'Could not connect to {self.hostname}' ) from exception if status == 200: self.cache.set(path, { 'created': utcnow(), 'status': status, 'body': body }) return body raise RoadworkError(f'{path} returned {status}') # no cache yet, return result and cache it if cached is NO_VALUE: return refresh() now = utcnow() lifetime_horizon = cached['created'] + timedelta(seconds=lifetime) downtime_horizon = cached['created'] + timedelta(seconds=downtime) # within cache lifetime, return cached value if now <= lifetime_horizon: return cached['body'] # outside cache lifetime, but still in downtime horizon, try to # refresh the value but ignore errors if lifetime_horizon < now < downtime_horizon: try: return refresh() except RoadworkConnectionError: return cached['body'] # outside the downtime lifetime, force refresh and raise errors return refresh()
[docs] def get_uncached(self, path: str) -> tuple[int, Any]: body = BytesIO() self.curl.setopt(pycurl.URL, self.url(path)) self.curl.setopt(pycurl.WRITEFUNCTION, body.write) self.curl.perform() status = self.curl.getinfo(pycurl.RESPONSE_CODE) body_str = body.getvalue().decode('utf-8') if status == 200: return status, json.loads(body_str) return status, body_str
[docs] def is_cacheable(self, response: tuple[int, Any]) -> bool: return response[0] == 200
[docs] class RoadworkCollection: def __init__( self, client: RoadworkClient, letter: str | None = None, query: str | None = None ) -> None:
[docs] self.client = client
[docs] self.query = None
[docs] self.letter = None
if query: self.query = query.lower() elif letter: self.letter = letter.lower() @property
[docs] def letters(self) -> list[str]: letters = set() for roadwork in self.by_letter(None).roadwork: for letter in roadwork.letters: letters.add(letter) return sorted(letters)
[docs] def by_filter(self, filter: str) -> list['Roadwork']: # note: addGisLink doesn't work here url = ( URL('odata/Baustellen') .query_param('addGisLink', 'False') .query_param('$filter', filter) ) records = self.client.get(url.as_string()).get('value', ()) return sorted(( Roadwork(r) for r in records if r['Internet'] ), key=attrgetter('title'))
@property
[docs] def roadwork(self) -> list['Roadwork']: date = datetime.today() roadwork = self.by_filter(filter=' and '.join(( f'DauerVon le {date.strftime("%Y-%m-%d")}', f'DauerBis ge {date.strftime("%Y-%m-%d")}', ))) # The backend supports searches/filters, but the used dataset is # so small that it makes little sense to use that feature, since it # would lead to a lot more cache-misses on our end. # # Instead we simply loop through the results and filter them out. if self.query: roadwork = [ r for r in roadwork if self.query in r.title.lower() ] elif self.letter: roadwork = [ r for r in roadwork if self.letter in r.letters ] return roadwork
[docs] def by_id(self, id: int) -> 'Roadwork | None': url = ( URL(f'odata/Baustellen({id})') .query_param('addGisLink', 'True')) work = tuple( Roadwork(r) for r in self.client.get( url.as_string()).get('value', ())) if work: return work[0] # secondary lookup is against the subsections.. this probably calls # for an index eventually for r in self.roadwork: for section in r.sections: if section.id == id: return section return None
[docs] def by_letter(self, letter: str | None) -> 'Self': return self.__class__(self.client, letter=letter, query=None)
[docs] class Roadwork: def __init__(self, data: dict[str, Any]) -> None:
[docs] self.data = data
[docs] self.convertors = { 'DauerVon': lambda v: v and isodate.parse_datetime(v), 'DauerBis': lambda v: v and isodate.parse_datetime(v), }
@property
[docs] def id(self) -> int: return self['Id']
@property
[docs] def letters(self) -> 'Iterator[str]': for key in ('ProjektBezeichnung', 'ProjektBereich'): if value := self[key]: letter = value[0].lower() if 97 <= ord(letter) <= 122: yield letter
@property
[docs] def title(self) -> str: parts = (self[key] for key in ('ProjektBezeichnung', 'ProjektBereich')) parts = (p.strip() for p in parts if p) parts = (p for p in parts) return ' '.join(parts)
@property
[docs] def sections(self) -> list['Self']: now = sedate.utcnow() sections = ( self.__class__({ 'Id': r['TeilbaustelleId'], 'Teilbaustellen': [], **r }) for r in self['Teilbaustellen'] ) sections = (s for s in sections if s['DauerVon']) sections = (s for s in sections if s['DauerVon'] <= now) sections = (s for s in sections if now <= (s['DauerBis'] or now)) return list(sections)
[docs] def __getitem__(self, key: str) -> Any: value = self.data[key] if key in self.convertors: return self.convertors[key](value) return value
[docs] def __contains__(self, key: str) -> bool: return key in self.data