Source code for agency.data_import

from collections import defaultdict
from datetime import datetime
from markupsafe import Markup

from onegov.agency.collections import (
    ExtendedAgencyCollection, ExtendedPersonCollection)
from onegov.core.csv import CSVFile
from onegov.core.orm.abstract.adjacency_list import numeric_priority
from onegov.core.utils import linkify


from typing import TypeVar
from typing import TypeVarTuple
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from _typeshed import StrOrBytesPath
    from collections.abc import Callable
    from collections.abc import Mapping
    from datetime import date
    from onegov.agency.app import AgencyApp
    from onegov.agency.models import ExtendedAgency
    from onegov.agency.models import ExtendedPerson
    from onegov.agency.request import AgencyRequest
    from onegov.core.csv import DefaultRow
    from onegov.people import AgencyMembership
    from sqlalchemy.orm import Session

# Generic result type of the callables wrapped by the decorators in this
# module (`with_open`, `cleaned`).
T = TypeVar('T')
# Extra positional arguments that `with_open` forwards unchanged to the
# wrapped function.
Ts = TypeVarTuple('Ts')
[docs] def with_open( func: 'Callable[[CSVFile[DefaultRow], *Ts], T]' ) -> 'Callable[[StrOrBytesPath, *Ts], T]': def _read(filename: 'StrOrBytesPath', *args: *Ts) -> T: with open(filename, 'rb') as f: file = CSVFile( f, encoding='iso-8859-1' ) return func(file, *args) return _read
[docs] def v_(string: str | None) -> str | None: if not string or string == 'NULL' or not string.strip(): return None return string.strip()
def cleaned(
    func: 'Callable[[str], T]'
) -> 'Callable[[str | None], T | None]':
    """Decorator that feeds ``func`` a value normalised by ``v_``.

    The wrapped callable returns ``None`` without calling ``func`` when the
    normalised input is empty; otherwise it returns ``func(normalised)``.
    """

    def clean(string: str | None) -> T | None:
        value = v_(string)
        return func(value) if value else None

    return clean
@cleaned
def get_phone(string: str) -> str:
    """Convert a phone number to international notation where possible.

    * ``00...`` becomes ``+...``
    * numbers already starting with ``+`` are returned unchanged
    * a number with a single leading ``0`` and exactly ten characters
      (spaces ignored) is treated as a Swiss national number and gets the
      ``+41`` prefix in place of the leading zero
    * anything else is returned unchanged

    Because of the ``cleaned`` decorator the public callable also accepts
    ``None``/empty/``NULL`` input and returns ``None`` for it.
    """
    if string.startswith('00'):
        return string.replace('00', '+', 1)

    if string.startswith('+'):
        return string

    # Only rewrite a *leading* zero. Replacing the first zero found anywhere
    # would corrupt ten-digit numbers that do not start with 0.
    if string.startswith('0') and len(string.replace(' ', '')) == 10:
        return '+41 ' + string[1:]

    return string
def p(text: str) -> Markup:
    """Wrap ``text`` in a ``<p>`` element, escaping it unless it is markup."""
    paragraph = Markup('<p>{}</p>')
    return paragraph.format(text)
def br(text: str) -> Markup:
    """Append a ``<br>`` to ``text``, escaping it unless it is markup."""
    with_break = Markup('{}<br>')
    return with_break.format(text)
def split_address_on_new_line(
    address: str,
    newline: bool = False
) -> Markup:
    """Turn a comma separated address into ``<br>`` separated markup.

    Each comma separated part is stripped of surrounding whitespace. When
    ``newline`` is true an additional trailing ``<br>`` is appended.
    """
    line_break = Markup('<br>')
    parts = [part.strip() for part in address.split(',')]
    joined = line_break.join(parts)
    if newline:
        return joined + line_break
    return joined
def get_address(line: 'DefaultRow') -> Markup | None:
    """Build the address markup for an agency row.

    Uses the ``standortadresse`` and ``postadresse`` columns. If both are set
    and differ, both are rendered (location first, separated by an extra line
    break); if only one is set, or both are equal, that single address is
    rendered. Returns ``None`` when neither column holds a value.
    """
    location = v_(line.standortadresse)
    postal = v_(line.postadresse)

    # Two distinct addresses: show both, location address first.
    if location and postal and location != postal:
        first = br(split_address_on_new_line(location, True))
        second = br(split_address_on_new_line(postal))
        return first + second

    single = location or postal
    if not single:
        return None
    return br(split_address_on_new_line(single))
def get_agency_portrait(line: 'DefaultRow') -> Markup | None:
    """Assemble the HTML portrait of an agency from its CSV row.

    The portrait consists of the following parts, each followed by a line
    break and only included when the corresponding column holds a value:
    address, central phone, central fax, neutral e-mail, homepage link,
    city-map link, opening hours, election period and remark. The result is
    wrapped in a paragraph; ``None`` is returned if nothing was collected.
    """
    fragments: list[Markup] = []

    if address := get_address(line):
        fragments.append(br(address))

    if phone := v_(line.telzentrale):
        phone_link = linkify(get_phone(phone))
        fragments.append(br(Markup('Tel.: {}').format(phone_link)))

    if fax := v_(line.faxzentrale):
        fax_link = linkify(get_phone(fax))
        fragments.append(br(Markup('Fax: {}').format(fax_link)))

    if email := v_(line.emailneutral):
        fragments.append(br(linkify(email)))

    if homepage := v_(line.homepageurl):
        fragments.append(
            br(Markup('<a href="{}">Homepage</a>').format(homepage))
        )

    if city_map := v_(line.stadtplanurl):
        fragments.append(
            br(Markup('<a href="{}">Standort</a>').format(city_map))
        )

    if opening_hours := v_(line.oeffnungszeiten):
        fragments.append(br(f'Öffnungszeiten:\n{opening_hours}'))

    if election_period := v_(line.wahlperiode):
        fragments.append(br(f'Wahlperiode: {election_period}'))

    if remark := v_(line.bemerkung):
        fragments.append(br(remark))

    portrait = Markup('').join(fragments).strip()
    if not portrait:
        return None
    return p(portrait)
@with_open
def import_bs_agencies(
    csvfile: CSVFile['DefaultRow'],
    session: 'Session',
    app: 'AgencyApp'
) -> dict[str, 'ExtendedAgency']:
    """Import the agency hierarchy from a CSV file.

    Because of ``with_open`` callers pass a file path as first argument.

    Rows are linked through ``verzorgeinheitid`` (own id) and
    ``verzvorfahreoeid`` (parent id). Agencies are added parent-first by
    recursing from the root rows down through their children.

    :param csvfile: the opened CSV file with one agency per row.
    :param session: session handed to ``ExtendedAgencyCollection`` and to the
        PostgreSQL indexer.
    :param app: application whose ``es_indexer`` / ``psql_indexer`` are
        triggered after every 50 added agencies.
    :return: mapping of ``verzorgeinheitid`` to the agency created for it.
    """
    agencies = ExtendedAgencyCollection(session)
    lines_by_id = {line.verzorgeinheitid: line for line in csvfile.lines}

    # Rows whose parent id does not refer to any row in the file.
    treat_as_root = tuple(
        line.verzorgeinheitid for line in csvfile.lines
        if line.verzvorfahreoeid not in lines_by_id.keys()
    )
    if len(treat_as_root) == 1:
        # Use the first level as root
        treat_as_root = tuple(
            line.verzorgeinheitid for line in csvfile.lines
            if line.verzvorfahreoeid in treat_as_root
        )
    added_agencies = {}
    children = defaultdict(list)
    roots = []
    added_count = 0
    print('Treated as root agencies: ', ', '.join(treat_as_root))

    # Build the parent -> children index and collect the ids to start from.
    # NOTE(review): the nesting below was reconstructed from a source listing
    # without indentation - confirm against the repository. As written, a row
    # with an empty parent id is never appended to `roots`.
    for line in csvfile.lines:
        parent_id = line.verzvorfahreoeid or None
        basisid = line.verzorgeinheitid
        if parent_id:
            if basisid in treat_as_root:
                parent_id = None
                roots.append(basisid)
            children[parent_id].append(basisid)

    def parse_agency(
        line: 'DefaultRow',
        parent: 'ExtendedAgency | None' = None
    ) -> 'ExtendedAgency':
        # Create one agency below `parent` and remember it under its CSV id.
        portrait = get_agency_portrait(line)
        agency = agencies.add(
            parent=parent,
            title=line.bezeichnung.strip(),
            description=None,
            portrait=portrait,
            order=numeric_priority(v_(line.anzeigeprio)),
            export_fields=['person.title', 'person.phone']
        )
        added_agencies[line.verzorgeinheitid] = agency
        return agency

    def add_children(
        basisid: str,
        parent: 'ExtendedAgency | None' = None
    ) -> None:
        # Add the agency for `basisid`, then recurse into its children.
        nonlocal added_count
        added_count += 1
        # Trigger the indexers after every 50th agency
        # (presumably to flush queued indexing work - TODO confirm).
        if added_count % 50 == 0:
            app.es_indexer.process()
            app.psql_indexer.bulk_process(session)
        line = lines_by_id[basisid]
        agency = parse_agency(line, parent=parent)
        for child_id in children.get(line.verzorgeinheitid, []):
            add_children(child_id, parent=agency)

    for basisid in roots:
        add_children(basisid)
    return added_agencies
@with_open
def import_bs_persons(
    csvfile: CSVFile['DefaultRow'],
    agencies: 'Mapping[str, ExtendedAgency]',
    session: 'Session',
    app: 'AgencyApp'
) -> list['ExtendedPerson']:
    """Import people from a CSV file and attach them to their agency.

    Because of ``with_open`` callers pass a file path as first argument.

    Every row creates one person. If the row names an agency id that is
    present in ``agencies``, the person is added to that agency with the
    membership title ``Mitglied``; unknown ids are reported on stdout.

    :param csvfile: the opened CSV file with one person per row.
    :param agencies: mapping of CSV agency id to the imported agency.
    :param session: session handed to ``ExtendedPersonCollection`` and to the
        PostgreSQL indexer.
    :param app: application whose indexers are triggered every 50 rows.
    :return: the created persons in file order.
    """
    collection = ExtendedPersonCollection(session)
    imported = []

    # NOTE: not called anywhere in this function; `born` is stored as the raw
    # cleaned string below.
    def parse_date(date_string: str | None) -> 'date | None':
        if not date_string:
            return None
        return datetime.strptime(date_string, '%d.%m.%Y').date()

    def parse_person(line: 'DefaultRow') -> None:
        # Remark, note and consultation hours are merged into a single note.
        note_parts = (
            v_(line.bemerkung),
            v_(line.notiz),
            v_(line.sprechstunde),
        )
        note = '\n'.join(filter(None, note_parts))
        agency_id = line.verzorgeinheitid

        person = collection.add(
            last_name=v_(line.name) or 'NACHNAME',
            first_name=v_(line.vorname) or 'VORNAME',
            salutation=v_(line.anrede),
            academic_title=v_(line.titel),
            function=v_(line.funktion),
            email=v_(line.email),
            phone=get_phone(line.telextern),
            phone_direct=get_phone(line.telmobil),
            website=v_(line.url1) or v_(line.url2),
            notes=note or None,
            born=v_(line.geburtsdatum),
            political_party=v_(line.partei),
            address=v_(line.privatadresse),
            access='public'
        )
        imported.append(person)

        # A person has only one membership
        if not agency_id:
            return

        agency = agencies.get(agency_id)
        if not agency:
            print(f'agency id {agency_id} not found in agencies')
            return

        agency.add_person(
            person.id,
            title='Mitglied',
            since=None,
            prefix=None,
            addition=None,
            note=None,
        )

    for index, row in enumerate(csvfile.lines):
        # Trigger the indexers on every 50th row, starting with the first.
        if not index % 50:
            app.es_indexer.process()
            app.psql_indexer.bulk_process(session)
        parse_person(row)

    return imported
def import_bs_data(
    agency_file: 'StrOrBytesPath',
    person_file: 'StrOrBytesPath',
    request: 'AgencyRequest',
    app: 'AgencyApp'
) -> tuple[dict[str, 'ExtendedAgency'], list['ExtendedPerson']]:
    """Import agencies and people from the two given CSV files.

    Agencies are imported first so that people can be attached to them;
    afterwards ``sort_relationships`` is called on every imported agency.

    :return: the imported agencies (by CSV id) and the imported persons.
    """
    db_session = request.session

    imported_agencies = import_bs_agencies(agency_file, db_session, app)
    imported_persons = import_bs_persons(
        person_file, imported_agencies, db_session, app
    )

    for imported_agency in imported_agencies.values():
        imported_agency.sort_relationships()

    return imported_agencies, imported_persons
@with_open
def parse_agencies(csvfile: CSVFile['DefaultRow']) -> dict[str, str]:
    """Read the agency CSV and map each agency id to its stripped title.

    Because of ``with_open`` callers pass a file path as first argument.
    Nothing is written to the database; the ids treated as roots are only
    determined in order to print them.
    """
    known_ids = {row.verzorgeinheitid for row in csvfile.lines}

    # Rows whose parent id does not refer to any row in the file.
    root_ids = tuple(
        row.verzorgeinheitid
        for row in csvfile.lines
        if row.verzvorfahreoeid not in known_ids
    )
    if len(root_ids) == 1:
        # Use the first level as root
        root_ids = tuple(
            row.verzorgeinheitid
            for row in csvfile.lines
            if row.verzvorfahreoeid in root_ids
        )

    print('Treated as root agencies: ', ', '.join(root_ids))

    return {
        row.verzorgeinheitid: row.bezeichnung.strip()
        for row in csvfile.lines
    }
@with_open
def match_person_membership_title(
    csvfile: CSVFile['DefaultRow'],
    agencies: 'Mapping[str, str]',
    request: 'AgencyRequest',
    app: 'AgencyApp'
) -> None:
    """Set membership titles of existing people from the person CSV.

    Because of ``with_open`` callers pass a file path as first argument.

    For every row the matching persons are looked up (by e-mail first, then
    by first and last name). For each of their memberships in an agency whose
    title equals the row's agency name, the membership title is set to the
    row's ``funktion`` - unless a title other than ``Mitglied`` is already
    present. Memberships without any title and without a function get the
    title ``Mitglied``.

    Statistics are printed to stdout. People and agencies that could not be
    found are written, sorted and UTF-8 encoded, to a log file.

    :param csvfile: the opened CSV file with one person per row.
    :param agencies: mapping of CSV agency id to agency title.
    :param request: request providing the database session.
    :param app: application whose indexers are triggered every 50 rows.
    """
    session = request.session
    people = ExtendedPersonCollection(session)
    agency_coll = ExtendedAgencyCollection(session)
    person_query = session.query(people.model_class)
    total_entries = 0
    person_not_found = []
    agency_by_name_not_found = []
    updated_memberships = []

    def find_persons(line: 'DefaultRow') -> list['ExtendedPerson']:
        # Look up by e-mail (as given and lower-cased), falling back to the
        # full name; record the row if nobody matches.
        email = v_(line.email)
        fn = v_(line.vorname)
        ln = v_(line.name)
        persons = []

        if email:
            persons = person_query.filter(
                people.model_class.email.in_((email, email.lower()))
            ).all()

        if not persons and fn and ln:
            persons = person_query.filter(
                people.model_class.first_name == fn,
                people.model_class.last_name == ln
            ).all()

        if not persons:
            person_not_found.append(f'{email}, {fn} {ln}')
        return persons

    def get_agencies_by_name(name: str) -> list['ExtendedAgency']:
        return agency_coll.query().filter_by(title=name).all()

    def set_membership_title(
        membership: 'AgencyMembership',
        name: str | None
    ) -> None:
        title = membership.title.strip()
        name = name.strip() if name else None

        if not name:
            if title:
                # No function given but a title is set: keep it.
                return
            membership.title = 'Mitglied'
            updated_memberships.append(membership)
            return

        if title and title != 'Mitglied':
            # title already set
            return

        membership.title = name
        updated_memberships.append(membership)

    def match_membership_title(
        line: 'DefaultRow',
        agencies: 'Mapping[str, str]'
    ) -> None:
        persons = find_persons(line)
        agency_id = line.verzorgeinheitid
        if not persons:
            return

        if agency_id not in agencies:
            # actually we don't come here with our data for a solid export
            return

        agency_name = agencies[agency_id]
        agencies_by_name = get_agencies_by_name(agency_name)
        if not agencies_by_name:
            agency_by_name_not_found.append(agency_name)
            return

        function = v_(line.funktion)
        for person in persons:
            for membership in person.memberships:
                if membership.agency in agencies_by_name:
                    set_membership_title(membership, function)

    for ix, line in enumerate(csvfile.lines):
        # Trigger the indexers on every 50th row, starting with the first.
        if ix % 50 == 0:
            app.es_indexer.process()
            app.psql_indexer.bulk_process(session)
        total_entries += 1
        match_membership_title(line, agencies)

    uq_person_not_found = set(person_not_found)
    uq_agency_by_name_not_found = set(agency_by_name_not_found)

    print('---- STATISTICS ----')
    print('Total rows: ', total_entries)
    print('Unique People not found: ', len(uq_person_not_found))
    print(
        'Unique Agencies by name not found: ',
        len(uq_agency_by_name_not_found)
    )
    print('Updated memberships: ', len(updated_memberships))

    log_file_path = '/var/lib/onegov-cloud/staka_bs_memberships_title.log'
    # Explicit UTF-8: names routinely contain umlauts and the default
    # encoding depends on the locale of the process. Entries are sorted so
    # that repeated runs yield comparable log files.
    with open(log_file_path, 'w', encoding='utf-8') as f:
        f.write('PEOPLE NOT FOUND\n')
        f.write('\n'.join(sorted(uq_person_not_found)))
        f.write('\n\nAGENCIES NOT FOUND\n')
        f.write('\n'.join(sorted(uq_agency_by_name_not_found)))
    print('Find the logfile in ' + log_file_path)
def import_membership_titles(
    agency_file: 'StrOrBytesPath',
    person_file: 'StrOrBytesPath',
    request: 'AgencyRequest',
    app: 'AgencyApp'
) -> None:
    """Update membership titles from the agency and person CSV files.

    The agency file is only read to resolve agency ids to titles; the person
    file then drives the actual title matching.
    """
    titles_by_agency_id = parse_agencies(agency_file)
    match_person_membership_title(
        person_file, titles_by_agency_id, request, app
    )