from collections import defaultdict
from datetime import datetime
from email_validator import validate_email, EmailNotValidError, \
EmailUndeliverableError
from markupsafe import Markup
from onegov.agency.collections import (
ExtendedAgencyCollection, ExtendedPersonCollection)
from onegov.core.csv import CSVFile
from onegov.core.orm.abstract.adjacency_list import numeric_priority
from onegov.core.utils import linkify
from typing import TypeVar, Any
from typing import TypeVarTuple
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from _typeshed import StrOrBytesPath
from collections.abc import Callable, Iterable
from collections.abc import Mapping
from datetime import date
from onegov.agency.app import AgencyApp
from onegov.agency.models import ExtendedAgency
from onegov.agency.models import ExtendedPerson
from onegov.agency.request import AgencyRequest
from onegov.core.csv import DefaultRow
from onegov.people import AgencyMembership
from sqlalchemy.orm import Session
[docs]
# Variadic type variable: stands for the extra positional arguments that
# `with_open` forwards unchanged from the wrapper to the wrapped function.
Ts = TypeVarTuple('Ts')
[docs]
def with_open(
func: 'Callable[[CSVFile[DefaultRow], *Ts], T]'
) -> 'Callable[[StrOrBytesPath, *Ts], T]':
def _read(filename: 'StrOrBytesPath', *args: *Ts) -> T:
with open(filename, 'rb') as f:
file = CSVFile(
f,
# encoding='iso-8859-1'
encoding='utf-8', # lu
)
return func(file, *args)
return _read
[docs]
def v_(string: str | None) -> str | None:
if not string or string == 'NULL' or not string.strip():
return None
return string.strip()
[docs]
def cleaned(
    func: 'Callable[[str], T]'
) -> 'Callable[[str | None], T | None]':
    """Decorator that cleans the single string argument before calling.

    The wrapper passes its argument through ``v_``; when nothing usable is
    left it returns ``None`` without calling ``func``, otherwise it returns
    ``func`` applied to the cleaned value.
    """
    def clean(string: str | None) -> T | None:
        value = v_(string)
        return func(value) if value else None

    return clean
@cleaned
[docs]
def get_phone(string: str) -> str:
    """Normalize a phone number towards the international ``+41`` form.

    In this module the function is wrapped by ``cleaned``, so it only sees
    stripped, non-empty strings.

    * ``00…``  -> the leading ``00`` becomes ``+``.
    * ``+…``   -> returned unchanged.
    * 10 characters (spaces ignored) starting with ``0`` -> the leading
      ``0`` is replaced by ``+41 ``, the rest is kept as written.
    * 9 characters (spaces ignored) -> treated as a number without the
      leading zero and formatted as ``+41 xx xxx xx xx``.
    * anything else is returned unchanged.
    """
    if string.startswith('00'):
        return '+' + string[2:]
    if string.startswith('+'):
        return string

    compact = string.replace(' ', '')
    # Only swap a *leading* zero; replacing the first zero found anywhere
    # would corrupt numbers that do not start with one.
    if len(compact) == 10 and string.startswith('0'):
        return '+41 ' + string[1:]
    # lu adds country digits; slice the space-free form so numbers that
    # already contain spaces are grouped correctly.
    if len(compact) == 9:
        return (f'+41 {compact[0:2]} {compact[2:5]} '
                f'{compact[5:7]} {compact[7:9]}')
    return string
[docs]
def p(text: str) -> Markup:
    """Return ``text`` wrapped in a ``<p>`` element via ``Markup.format``."""
    paragraph = Markup('<p>{}</p>')
    return paragraph.format(text)
[docs]
def br(text: str) -> Markup:
    """Return ``text`` followed by a ``<br>`` tag via ``Markup.format``."""
    with_break = Markup('{}<br>')
    return with_break.format(text)
[docs]
def split_address_on_new_line(
    address: str,
    newline: bool = False
) -> Markup:
    """Turn a comma separated address into ``<br>`` separated markup.

    Each comma separated part is stripped. With ``newline=True`` an extra
    ``<br>`` is appended after the last part.
    """
    line_break = Markup('<br>')
    parts = [part.strip() for part in address.split(',')]
    joined = line_break.join(parts)
    if newline:
        return joined + line_break
    return joined
[docs]
def get_address(line: 'DefaultRow') -> Markup | None:
    """Build the address markup for an agency row.

    Uses the cleaned ``standortadresse`` and ``postadresse`` columns. When
    both are present and differ, both are rendered (location first, with an
    extra line break in between). When only one is present, or both are
    equal, that single address is rendered. Returns ``None`` when neither
    column holds a value.
    """
    location = v_(line.standortadresse)
    postal = v_(line.postadresse)

    if location and postal and location != postal:
        first = br(split_address_on_new_line(location, True))
        second = br(split_address_on_new_line(postal))
        return first + second

    single = location or postal
    if single:
        return br(split_address_on_new_line(single))
    return None
[docs]
def get_agency_portrait(line: 'DefaultRow') -> Markup | None:
    """Assemble the HTML portrait of an agency from its CSV row.

    Appends, in this order and each followed by ``<br>``: address, central
    phone, central fax, neutral e-mail, homepage link, map link, opening
    hours, election period and remark. Columns without a usable value are
    skipped. Returns the result wrapped in ``<p>``, or ``None`` when nothing
    was added.
    """
    portrait = Markup('')

    if (address := get_address(line)):
        portrait += br(address)

    if (central_phone := v_(line.telzentrale)):
        phone_link = linkify(get_phone(central_phone))
        portrait += br(Markup('Tel.: {}').format(phone_link))

    if (central_fax := v_(line.faxzentrale)):
        fax_link = linkify(get_phone(central_fax))
        portrait += br(Markup('Fax: {}').format(fax_link))

    if (neutral_email := v_(line.emailneutral)):
        portrait += br(linkify(neutral_email))

    if (homepage := v_(line.homepageurl)):
        portrait += br(Markup('<a href="{}">Homepage</a>').format(homepage))

    if (city_map := v_(line.stadtplanurl)):
        portrait += br(Markup('<a href="{}">Standort</a>').format(city_map))

    if (opening_hours := v_(line.oeffnungszeiten)):
        portrait += br(f'Öffnungszeiten:\n{opening_hours}')

    if (election_period := v_(line.wahlperiode)):
        portrait += br(f'Wahlperiode: {election_period}')

    if (remark := v_(line.bemerkung)):
        portrait += br(remark)

    portrait = portrait.strip()
    if not portrait:
        return None
    return p(portrait)
@with_open
[docs]
def import_bs_agencies(
    csvfile: CSVFile['DefaultRow'],
    session: 'Session',
    app: 'AgencyApp'
) -> dict[str, 'ExtendedAgency']:
    """Create the agency tree described by a CSV export.

    Rows reference each other through ``verzorgeinheitid`` (own id) and
    ``verzvorfahreoeid`` (parent id). Root agencies are added first, then
    their children recursively.

    :param csvfile: CSV file whose rows describe one agency each.
    :param session: database session used by the agency collection and
        passed to the PSQL indexer.
    :param app: application whose ``es_indexer`` and ``psql_indexer`` are
        processed after every 50 added agencies.
    :return: mapping of ``verzorgeinheitid`` to the agency created for it.
    """
    agencies = ExtendedAgencyCollection(session)
    lines_by_id = {line.verzorgeinheitid: line for line in csvfile.lines}
    # Rows whose parent id is not the id of any row in the file.
    treat_as_root = tuple(
        line.verzorgeinheitid for line in csvfile.lines
        if line.verzvorfahreoeid not in lines_by_id.keys()
    )
    if len(treat_as_root) == 1:
        # Use the first level as root
        treat_as_root = tuple(
            line.verzorgeinheitid for line in csvfile.lines
            if line.verzvorfahreoeid in treat_as_root
        )
    added_agencies = {}
    # parent id -> ids of its direct children, in file order
    children = defaultdict(list)
    roots = []
    added_count = 0
    print('Treated as root agencies: ', ', '.join(treat_as_root))
    for line in csvfile.lines:
        parent_id = line.verzvorfahreoeid or None
        basisid = line.verzorgeinheitid
        # NOTE(review): rows with an empty parent id are neither registered
        # as root nor as child here -- confirm this is intended.
        if parent_id:
            if basisid in treat_as_root:
                parent_id = None
                roots.append(basisid)
            children[parent_id].append(basisid)

    def parse_agency(
        line: 'DefaultRow',
        parent: 'ExtendedAgency | None' = None
    ) -> 'ExtendedAgency':
        # Adds one agency below ``parent`` and records it under its CSV id.
        portrait = get_agency_portrait(line)
        agency = agencies.add(
            parent=parent,
            title=line.bezeichnung.strip(),
            description=None,
            portrait=portrait,
            order=numeric_priority(v_(line.anzeigeprio)),
            export_fields=['person.title', 'person.phone']
        )
        added_agencies[line.verzorgeinheitid] = agency
        return agency

    def add_children(
        basisid: str,
        parent: 'ExtendedAgency | None' = None
    ) -> None:
        # Depth-first: add the agency for ``basisid``, then its children.
        nonlocal added_count
        added_count += 1
        # Process the indexers in batches of 50 added agencies.
        if added_count % 50 == 0:
            app.es_indexer.process()
            app.psql_indexer.bulk_process(session)
        line = lines_by_id[basisid]
        agency = parse_agency(line, parent=parent)
        for child_id in children.get(line.verzorgeinheitid, []):
            add_children(child_id, parent=agency)

    for basisid in roots:
        add_children(basisid)
    return added_agencies
@with_open
[docs]
def import_bs_persons(
    csvfile: CSVFile['DefaultRow'],
    agencies: 'Mapping[str, ExtendedAgency]',
    session: 'Session',
    app: 'AgencyApp'
) -> list['ExtendedPerson']:
    """Import people from a CSV export and attach them to their agency.

    Every row becomes one person. When the row's ``verzorgeinheitid`` is a
    key of ``agencies`` the person is added to that agency with the title
    ``'Mitglied'``; an unknown id is reported on stdout.

    :param csvfile: CSV file with one person per row.
    :param agencies: mapping of CSV agency id to the imported agency.
    :param session: database session used by the person collection and
        passed to the PSQL indexer.
    :param app: application whose ``es_indexer`` and ``psql_indexer`` are
        processed every 50 rows.
    :return: the created persons in file order.
    """
    people = ExtendedPersonCollection(session)
    persons = []

    def parse_person(line: 'DefaultRow') -> None:
        # The three free-text columns are merged into one note.
        note = '\n'.join(
            part for part in (
                v_(line.bemerkung), v_(line.notiz), v_(line.sprechstunde)
            ) if part
        )
        agency_id = line.verzorgeinheitid
        person_ = people.add(
            last_name=v_(line.name) or 'NACHNAME',
            first_name=v_(line.vorname) or 'VORNAME',
            salutation=v_(line.anrede),
            academic_title=v_(line.titel),
            function=v_(line.funktion),
            email=v_(line.email),
            phone=get_phone(line.telextern),
            phone_direct=get_phone(line.telmobil),
            website=v_(line.url1) or v_(line.url2) or None,
            notes=note or None,
            born=v_(line.geburtsdatum),
            political_party=v_(line.partei),
            address=v_(line.privatadresse),
            access='public'
        )
        persons.append(person_)

        # A person has only one membership
        if not agency_id:
            return
        agency = agencies.get(agency_id)
        if not agency:
            print(f'agency id {agency_id} not found in agencies')
            return
        agency.add_person(
            person_.id,
            title='Mitglied',
            since=None,
            prefix=None,
            addition=None,
            note=None,
        )

    for ix, line in enumerate(csvfile.lines):
        # Process the indexers in batches of 50 rows.
        if ix % 50 == 0:
            app.es_indexer.process()
            app.psql_indexer.bulk_process(session)
        parse_person(line)
    return persons
[docs]
def import_bs_data(
    agency_file: 'StrOrBytesPath',
    person_file: 'StrOrBytesPath',
    request: 'AgencyRequest',
    app: 'AgencyApp'
) -> tuple[dict[str, 'ExtendedAgency'], list['ExtendedPerson']]:
    """Import agencies, then people, then sort each agency's relationships.

    :return: the imported agencies keyed by CSV id and the imported persons.
    """
    db_session = request.session
    imported_agencies = import_bs_agencies(agency_file, db_session, app)
    imported_persons = import_bs_persons(
        person_file, imported_agencies, db_session, app)

    # Memberships exist only after the people import, so sort afterwards.
    for imported in imported_agencies.values():
        imported.sort_relationships()

    return imported_agencies, imported_persons
[docs]
def get_plz_city(plz: str | None, ort: str | None) -> str | None:
if plz and ort:
return f'{plz} {ort}'
if ort:
return ort
if plz:
return plz
return None
[docs]
def get_web_address(internet_adresse: str) -> str | None:
if not internet_adresse:
return None
if internet_adresse.startswith('http'):
return internet_adresse
return f'http://{internet_adresse}'
[docs]
def get_email(line: 'DefaultRow') -> str | None:
    """Return the first usable e-mail address of the row, if any.

    The ``e_mail_adresse`` cell is split on spaces. Placeholder values and
    tokens without ``@`` are ignored; the first token accepted by
    ``validate_email`` is returned. Invalid addresses are reported on
    stdout, undeliverable ones are skipped silently.
    """
    email = v_(line.e_mail_adresse)
    if not email:
        return None

    # only keep valid generic email address, but not the name placeholder
    placeholders = ('vorname.name@lu.ch', '@lu.ch')
    for candidate in email.split(' '):
        if candidate in placeholders or '@' not in candidate:
            continue
        try:
            validate_email(candidate)
        except EmailUndeliverableError:
            continue
        except EmailNotValidError:
            print(f'Error importing person with invalid email {candidate}; '
                  f'line {line.rownumber}')
            continue
        else:
            return candidate
    return None
[docs]
def check_skip(line: 'DefaultRow') -> bool:
    """Tell whether a row must be ignored by the import.

    A row is skipped when its department is ``'zNeu'``, when the first or
    last name contains one of the marker substrings below, or when both
    names are empty.
    """
    first_name_markers = ('Zi.', 'Korr.', 'test')
    last_name_markers = (
        'WG', 'WH', 'W3', 'W5', 'frei neuer MA', 'AAL Picket')

    return (
        line.department == 'zNeu'
        or any(marker in line.vorname for marker in first_name_markers)
        or any(marker in line.nachname for marker in last_name_markers)
        # skip empty lines
        or (line.nachname == '' and line.vorname == '')
    )
[docs]
def check_skip_people(line: 'DefaultRow') -> bool:
    """Tell whether a row is a phone entry rather than a person.

    Rows mentioning ``'Telefon'`` in last name, first name or function are
    skipped, unless one of those fields mentions ``'Telefonist'`` (which
    also covers ``'Telefonistin'``).
    """
    fields = (line.nachname, line.vorname, line.funktion)

    if any('Telefonist' in field for field in fields):
        return False
    return any('Telefon' in field for field in fields)
[docs]
def agency_id_agency_lu(words: 'Iterable[Any]') -> str:
    """
    Builds an agency id from the names of an organisation and its sub
    organisations: falsy entries are dropped, the rest is lower-cased and
    joined with a double underscore.
    """
    parts = [str(word).lower() for word in words if word]
    return '__'.join(parts)
[docs]
def agency_id_person_lu(line: 'DefaultRow') -> str:
    """
    Builds the agency id a person row belongs to, from the row's department,
    dienststelle, abteilung, unterabteilung and unterabteilung_2 columns.
    """
    hierarchy = (
        line.department,
        line.dienststelle,
        line.abteilung,
        line.unterabteilung,
        line.unterabteilung_2,
    )
    return agency_id_agency_lu(hierarchy)
@with_open
[docs]
def import_lu_people(
    csvfile: CSVFile['DefaultRow'],
    agencies: 'Mapping[str, ExtendedAgency]',
    session: 'Session',
    app: 'AgencyApp'
) -> list['ExtendedPerson']:
    """Import people from the LU export and add their memberships.

    Rows rejected by ``check_skip`` or ``check_skip_people`` are ignored.
    For every other row a person is added (or an existing one with the same
    name is reused) and attached to the agency identified by
    ``agency_id_person_lu``.

    :param csvfile: CSV file with one membership per row.
    :param agencies: mapping of agency id to the imported agency.
    :param session: database session used by the person collection and
        passed to the PSQL indexer.
    :param app: application whose ``es_indexer`` and ``psql_indexer`` are
        processed every 100 rows.
    :return: one entry per imported row; a person can appear repeatedly.
    """
    people = ExtendedPersonCollection(session)
    persons = []

    def parse_membership(
        line: 'DefaultRow',
        person: 'ExtendedPerson',
        function: str
    ) -> None:
        agency_id = agency_id_person_lu(line)
        hi_code = v_(line.hi_code)
        order = int(hi_code) if hi_code else 0
        if not agency_id:
            return

        agency = agencies.get(agency_id)
        if not agency:
            print(f'Error agency id {agency_id} not found')
            return

        title = function or 'Mitglied'
        if order:
            agency.add_person(person.id,
                              title=title,
                              order_within_agency=order)
        else:
            agency.add_person(person.id, title=title)

    def parse_person(line: 'DefaultRow') -> None:
        vorname = v_(line.vorname) or ''
        if vorname and vorname[-1].isdigit():
            # some people have a number at the end of their first name
            # indicating another membership
            vorname = ' '.join(vorname.split(' ')[:-1])
        function = v_(line.funktion) or ''

        person = people.add_or_get(
            last_name=v_(line.nachname) or ' ',
            first_name=vorname,
            salutation=None,
            academic_title=v_(line.akad__titel),
            function=function,
            email=get_email(line),
            phone=get_phone(line.isdn_nummer),
            phone_direct=get_phone(line.mobil),
            # Clean the raw cells *before* composing, so 'NULL' or blank
            # cells do not end up as 'http://NULL' or 'NULL <ort>'.
            website=get_web_address(v_(line.internet_adresse) or ''),
            location_address=v_(line.adresse),
            location_code_city=get_plz_city(v_(line.plz), v_(line.ort)),
            access='public',
            compare_names_only=True
        )
        persons.append(person)
        parse_membership(line, person, function)

    for ix, line in enumerate(csvfile.lines):
        # Process the indexers in batches of 100 rows.
        if ix % 100 == 0:
            app.es_indexer.process()
            app.psql_indexer.bulk_process(session)
        if not check_skip(line) and not check_skip_people(line):
            parse_person(line)
    return persons
@with_open
[docs]
def import_lu_agencies(
    csvfile: CSVFile['DefaultRow'],
    session: 'Session',
    app: 'AgencyApp'
) -> dict[str, 'ExtendedAgency']:
    """Create the LU agency hierarchy from the combined people/agency export.

    Every row names up to five nested organisation levels. Each named level
    is added below the previous one (or fetched if it already exists) and
    registered under the id built by ``agency_id_agency_lu`` from the names
    down to that level.

    Rows mentioning ``'Telefon'`` in last name, first name or function carry
    the phone number and address of the deepest level named in that row.

    :param csvfile: CSV file, one row per person or phone entry.
    :param session: database session used by the agency collection and
        passed to the PSQL indexer.
    :param app: application whose ``es_indexer`` and ``psql_indexer`` are
        processed every 100 rows.
    :return: mapping of agency id to agency; the first agency registered
        for an id wins.
    :raises AssertionError: when a level is named but its parent level is
        missing in the same row.
    """
    added_agencies = {}
    agencies = ExtendedAgencyCollection(session)

    # Hierarchy, top-down. The names are both the CSV columns and the
    # labels used in the assertion messages.
    hierarchy = ('department', 'dienststelle', 'abteilung',
                 'unterabteilung', 'unterabteilung_2')
    kw = 'Telefon'

    for ix, line in enumerate(csvfile.lines):
        # Process the indexers in batches of 100 rows.
        if ix % 100 == 0:
            app.es_indexer.process()
            app.psql_indexer.bulk_process(session)
        if check_skip(line):
            continue

        export_fields = ['person.title', 'person.phone']
        names = [v_(getattr(line, column)) for column in hierarchy]

        # Phone rows describe the deepest unit named in the row.
        phone = None
        phone_level = None
        if any(kw in field
               for field in (line.nachname, line.vorname, line.funktion)):
            phone = get_phone(line.isdn_nummer)
            phone_level = max(
                (level for level, name in enumerate(names) if name),
                default=None
            )

        # ``parent`` is reset for every row, so a row can never attach to
        # an agency left over from the previous row.
        parent = None
        path = []
        for level, name in enumerate(names):
            if not name:
                parent = None
                continue
            if level:
                assert parent, (
                    f'Error adding agency with no {hierarchy[level - 1]}; '
                    f'line {line.rownumber}, {line.nachname}')

            agency = agencies.add_or_get(
                parent, name, export_fields=export_fields)
            if phone and level == phone_level:
                agency.phone = phone
                agency.location_address = v_(line.adresse)
                agency.location_code_city = get_plz_city(
                    v_(line.plz), v_(line.ort))

            path.append(name)
            agency_id = agency_id_agency_lu(path)
            if agency_id not in added_agencies:
                added_agencies[agency_id] = agency
            parent = agency

    return added_agencies
[docs]
def import_lu_data(
    data_file: 'StrOrBytesPath',
    request: 'AgencyRequest',
    app: 'AgencyApp'
) -> tuple[dict[str, 'ExtendedAgency'], list['ExtendedPerson']]:
    """Import agencies and then people from the same LU export file.

    :return: the imported agencies keyed by agency id and the imported
        persons.
    """
    db_session = request.session
    # The file is read twice: agencies must exist before memberships.
    imported_agencies = import_lu_agencies(data_file, db_session, app)
    imported_people = import_lu_people(
        data_file, imported_agencies, db_session, app)
    return imported_agencies, imported_people
@with_open
[docs]
def parse_agencies(csvfile: CSVFile['DefaultRow']) -> dict[str, str]:
    """Map every agency id of the export to its stripped title.

    Nothing is written to the database. The ids treated as root agencies
    are only printed.
    """
    known_ids = {row.verzorgeinheitid for row in csvfile.lines}
    root_ids = tuple(
        row.verzorgeinheitid for row in csvfile.lines
        if row.verzvorfahreoeid not in known_ids
    )
    if len(root_ids) == 1:
        # Use the first level as root
        root_ids = tuple(
            row.verzorgeinheitid for row in csvfile.lines
            if row.verzvorfahreoeid in root_ids
        )
    print('Treated as root agencies: ', ', '.join(root_ids))

    return {
        row.verzorgeinheitid: row.bezeichnung.strip()
        for row in csvfile.lines
    }
@with_open
[docs]
def match_person_membership_title(
    csvfile: CSVFile['DefaultRow'],
    agencies: 'Mapping[str, str]',
    request: 'AgencyRequest',
    app: 'AgencyApp'
) -> None:
    """Set membership titles of existing people from a person CSV export.

    For every row the matching persons are looked up (by e-mail, otherwise
    by first and last name) and the agencies are looked up by the title
    stored in ``agencies`` for the row's ``verzorgeinheitid``. Memberships
    of those persons in those agencies get the row's ``funktion`` as title,
    unless they already carry a title other than ``'Mitglied'``.

    Prints statistics and writes the people and agencies that could not be
    found to a log file at a fixed path.

    :param csvfile: CSV file with one person per row.
    :param agencies: mapping of CSV agency id to agency title.
    :param request: request providing the database session.
    :param app: application whose ``es_indexer`` and ``psql_indexer`` are
        processed every 50 rows.
    """
    session = request.session
    people = ExtendedPersonCollection(session)
    agency_coll = ExtendedAgencyCollection(session)
    person_query = session.query(people.model_class)
    total_entries = 0
    # Collected for the statistics and the log file written at the end.
    person_not_found = []
    agency_by_name_not_found = []
    updated_memberships = []

    def find_persons(line: 'DefaultRow') -> list['ExtendedPerson']:
        # E-mail match first (as given or lower-cased); exact first and
        # last name match as fallback.
        nonlocal person_not_found
        email = v_(line.email)
        fn = v_(line.vorname)
        ln = v_(line.name)
        persons = []
        if email:
            persons = person_query.filter(people.model_class.email.in_(
                (email, email.lower())
            )).all()
        if not persons and fn and ln:
            persons = person_query.filter(
                people.model_class.first_name == fn,
                people.model_class.last_name == ln
            ).all()
        if not persons:
            person_not_found.append(f'{email}, {fn} {ln}')
        return persons

    def get_agencies_by_name(name: str) -> list['ExtendedAgency']:
        # Titles need not be unique, hence a list.
        return agency_coll.query().filter_by(title=name).all()

    def match_membership_title(
        line: 'DefaultRow',
        agencies: 'Mapping[str, str]'
    ) -> None:
        nonlocal agency_by_name_not_found
        persons = find_persons(line)
        agency_id = line.verzorgeinheitid
        if not persons:
            return
        if agency_id not in agencies:
            # actually we don't come here with our data for a solid export
            return
        agency_name = agencies[agency_id]
        agencies_by_name = get_agencies_by_name(agency_name)
        if not agencies_by_name:
            agency_by_name_not_found.append(agency_name)
            return

        def set_membership_title(
            membership: 'AgencyMembership',
            name: str | None
        ) -> None:
            # Never overwrites a title other than 'Mitglied'; an empty
            # title becomes 'Mitglied' when no function is given.
            nonlocal updated_memberships
            title = membership.title.strip()
            name = name.strip() if name else None
            if not name:
                if title:
                    # print('No function given but title set')
                    return
                membership.title = 'Mitglied'
                updated_memberships.append(membership)
                return
            if title and title != 'Mitglied':
                # title already set
                return
            membership.title = name
            updated_memberships.append(membership)

        function = v_(line.funktion)
        for person in persons:
            for membership in person.memberships:
                agency = membership.agency
                # Only touch memberships in an agency with the row's title.
                if agency in agencies_by_name:
                    set_membership_title(membership, function)

    for ix, line in enumerate(csvfile.lines):
        # Process the indexers in batches of 50 rows.
        if ix % 50 == 0:
            app.es_indexer.process()
            app.psql_indexer.bulk_process(session)
        total_entries += 1
        match_membership_title(line, agencies)

    uq_person_not_found = set(person_not_found)
    uq_agency_by_name_not_found = set(agency_by_name_not_found)
    print('---- STATISTICS ----')
    print('Total rows: ', total_entries)
    print('Unique People not found: ', len(uq_person_not_found))
    print(
        'Unique Agencies by name not found: ',
        len(uq_agency_by_name_not_found)
    )
    print('Updated memberships: ', len(updated_memberships))
    # NOTE(review): fixed absolute path; the directory must exist and be
    # writable or the open() below raises -- confirm for the target host.
    log_file_path = '/var/lib/onegov-cloud/staka_bs_memberships_title.log'
    with open(str(log_file_path), 'w') as f:
        f.write('PEOPLE NOT FOUND\n')
        f.write('\n'.join(uq_person_not_found))
        f.write('\n\nAGENCIES NOT FOUND\n')
        f.write('\n'.join(uq_agency_by_name_not_found))
    print('Find the logfile in ' + log_file_path)
[docs]
def import_membership_titles(
    agency_file: 'StrOrBytesPath',
    person_file: 'StrOrBytesPath',
    request: 'AgencyRequest',
    app: 'AgencyApp'
) -> None:
    """Update membership titles from an agency file and a person file.

    The agency file supplies the id-to-title mapping, which is then used to
    match the rows of the person file.
    """
    titles_by_agency_id = parse_agencies(agency_file)
    match_person_membership_title(
        person_file, titles_by_agency_id, request, app)