# Source code for winterthur.collections.address

from __future__ import annotations

from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
from onegov.core.collection import GenericCollection
from onegov.core.csv import CSVFile
from onegov.core.orm import as_selectable
from onegov.winterthur.models import WinterthurAddress
from pycurl import Curl, URL, WRITEDATA
from sedate import utcnow
from sqlalchemy import select, func


from typing import Literal, TYPE_CHECKING
if TYPE_CHECKING:
    from datetime import datetime
    from onegov.core.csv import DefaultRow
    from sqlalchemy.orm import Query, Session
    from typing import NamedTuple

    # NOTE: this class has to stay inside the TYPE_CHECKING guard, its base
    # class NamedTuple is only imported here and is not available at runtime.
    class StreetRow(NamedTuple):
        """Typing-only shape of the rows selected by
        ``AddressCollection.streets``.
        """

        # upper-cased, unaccented first character of the street name
        letter: str
        # name of the street
        street: str
# Base URL of the site that serves the street directory CSV files.
HOST = 'https://stadt.winterthur.ch'

# Default download locations used by AddressCollection.update: the first
# file is passed to the import as the street list, the second as the
# address list.
STREETS = f'{HOST}/_static/strassenverzeichnis/gswpl_strver_str.csv'
ADDRESSES = f'{HOST}/_static/strassenverzeichnis/gswpl_strver_adr.csv'
class AddressCollection(GenericCollection[WinterthurAddress]):
    """Collection of all ``WinterthurAddress`` records.

    Besides querying, the collection can replace its whole content with
    the data of two remote CSV files (streets and addresses), see
    :meth:`update`.
    """

    @property
    def model_class(self) -> type[WinterthurAddress]:
        """The model class handled by this collection."""
        return WinterthurAddress

    def streets(self) -> Query[StreetRow]:
        """Selects every distinct street together with its index letter.

        Each row has a ``letter`` (upper-cased, unaccented first character
        of the street name) and a ``street`` column. The rows are ordered
        by the unaccented street name.

        :return: The result of executing the select on the session.
        """
        # The trailing ``-- Text`` comments are presumably read by
        # ``as_selectable`` to type the columns (TODO confirm). Each one has
        # to end its own line, otherwise the SQL comment would swallow the
        # remainder of the statement.
        query = as_selectable("""
            SELECT
                UPPER(UNACCENT(LEFT(street, 1))) AS letter, -- Text
                street                                      -- Text
            FROM
                winterthur_addresses
            GROUP BY
                street
            ORDER BY
                unaccent(street)
        """)

        return self.session.execute(select(query.c))

    def last_updated(self) -> datetime | None:
        """Returns the ``modified`` value of the first record of the
        collection, or ``None`` if the collection is empty.
        """
        result = self.query().first()
        return result.modified if result else None

    def update_state(self) -> Literal['failed', 'ok']:
        """Reports whether the stored data is recent enough.

        :return: ``'failed'`` if there is no update timestamp or if it is
            more than 24 hours old, ``'ok'`` otherwise.
        """
        last_updated = self.last_updated()

        if not last_updated:
            return 'failed'

        diff = utcnow() - last_updated
        diff_hours = (diff.days * 24) + (diff.seconds / 3600)

        if diff_hours > 24:
            return 'failed'

        return 'ok'

    def update(
        self,
        streets: str = STREETS,
        addresses: str = ADDRESSES
    ) -> None:
        """Replaces all records with the content of the given CSV files.

        :param streets: URL of the CSV file listing the streets.
        :param addresses: URL of the CSV file listing the addresses.
        """
        self.delete_existing()
        self.import_from_csv(*self.load_urls(streets, addresses))

    def delete_existing(self) -> None:
        """Deletes every record of the collection through the session."""
        for address in self.query():
            self.session.delete(address)

    def import_from_csv(
        self,
        streets: CSVFile[DefaultRow],
        addresses: CSVFile[DefaultRow]
    ) -> None:
        """Adds one record per address row and flushes the session.

        :param streets: Street list, the columns ``strc`` (street code) and
            ``bez`` (street name) are read.
        :param addresses: Address list, the columns ``einid``, ``strc``,
            ``hnr``, ``hnrzu``, ``plz``, ``plzzu``, ``ort``, ``kreisname``
            and ``quartiername`` are read.
        :raises KeyError: If an address row refers to a street code that is
            not part of ``streets``.
        """
        streets_d = {s.strc: s.bez for s in streets.lines}

        # street codes for which no address row has been seen (yet)
        addressless = set(streets_d)
        max_id = 0

        for r in addresses.lines:
            addressless.discard(r.strc)

            address = WinterthurAddress()
            address.id = int(r.einid)
            address.street_id = int(r.strc)
            address.street = streets_d[r.strc]
            address.house_number = int(r.hnr)
            address.house_extra = r.hnrzu
            address.zipcode = int(r.plz)
            address.zipcode_extra = None if r.plzzu is None else int(r.plzzu)
            address.place = r.ort
            address.district = r.kreisname
            address.neighbourhood = r.quartiername

            self.session.add(address)
            max_id = max(max_id, address.id)

        # some streets do not have addresses -> we write a special record for
        # those streets so they still show up in our UI
        #
        # not the most elegant solution, but better than introducing a separate
        # table at least for now
        #
        # the ids of these records continue after the highest imported id, so
        # they cannot collide with an id taken from the address file
        for address_id, key in enumerate(addressless, start=max_id + 1):
            address = self.model_class.as_addressless(int(key), streets_d[key])
            address.id = address_id
            self.session.add(address)

        self.session.flush()

    def load_urls(self, *urls: str) -> tuple[CSVFile[DefaultRow], ...]:
        """Downloads the given URLs using up to two worker threads.

        :param urls: The URLs to download.
        :return: One CSV file per URL, in the order the URLs were given.
        """
        with ThreadPoolExecutor(max_workers=2) as executor:
            # The futures have to be created eagerly. With a lazy generator
            # each download would only be submitted once the result of the
            # previous one has been awaited, i.e. the downloads would run
            # one after another.
            futures = [executor.submit(self.load_url, url) for url in urls]
            return tuple(f.result() for f in futures)

    def load_url(self, url: str) -> CSVFile[DefaultRow]:
        """Downloads a single URL into memory and wraps it in a CSV file.

        :param url: The URL to download.
        :return: A ``CSVFile`` built from the downloaded bytes.
        """
        buffer = BytesIO()

        c = Curl()
        try:
            c.setopt(URL, url)
            c.setopt(WRITEDATA, buffer)
            c.perform()
        finally:
            # release the handle even if the transfer raises
            c.close()

        return CSVFile(buffer)
class AddressSubsetCollection(GenericCollection[WinterthurAddress]):
    """Collection of the ``WinterthurAddress`` records of a single street."""

    def __init__(self, session: Session, street: str) -> None:
        """
        :param session: Session handed on to the generic collection.
        :param street: Street name the subset is filtered by.
        """
        super().__init__(session)

        # value compared against the ``street`` column in subset()
        self.street = street

    @property
    def model_class(self) -> type[WinterthurAddress]:
        """The model class handled by this collection."""
        return WinterthurAddress

    def subset(self) -> Query[WinterthurAddress]:
        """Returns the query of all records whose street equals
        ``self.street``, ordered by unaccented street name, house number
        and house number extra.
        """
        ordering = (
            func.unaccent(WinterthurAddress.street),
            WinterthurAddress.house_number,
            WinterthurAddress.house_extra
        )

        matching = self.query().filter_by(street=self.street)
        return matching.order_by(*ordering)

    def exists(self) -> bool:
        """Returns whether the subset contains at least one record."""
        has_records = self.subset().exists()
        return self.session.query(has_records).scalar()