Source code for fsi.ims_import

"""
Dateien:

- Kurse: Die Liste der Kurse
- Personen -> importiert aus ldap
- Teilnehmer: repräsentiert die Anmeldungen mit referenz zu
  Personen und Durchführungen
- Ausführungen: die Listen der Durchführungen.


Verlinkungen

- Teilnehmer.TEILNEHMER_ID is ForeignKey für Personen.OBJ_ID

Im folgenden eine Beschreibung der Beobachtungen zu den Files

Personen.txt:

(Daten sind sortiert nach Nachname/Vorname)

- Es gibt Email-Adressen wie user@sg.ch.local, und dann in den Anmeldungen
  ohne .local.
- Was bedeutet das Feld P_VALID_TILL? Relevant?
- es gibt viele Einträge ohne email und/oder Kürzel (P_USERID), z.B. Zeilen
- es gibt viele fehlende Vornamen/Nachnamen (evtl auffindbar in Teilnehmer.txt)
- Neu erstellte Einträge mit gleicher Email und sonst gleichen Informationen:
  Beispiel Zeilen 2215/2216. Wie vorgehen, wenn es alte Anmeldungen zu Kursen
  gibt, die auf den alten Eintrag zeigen?

Fragen:

- Sind in Personen.txt auch externe Benutzer erfasst?
- Kann ich externe Benutzer über den Eintrag P_VERWALTUNG identifizieren?
  Hier gibt es

  - [spezielle/ungültige/fehlerhafte]
  - Dritte
  - Gemeinde
  - Kantonale Verwaltung Zug
  - Staatskanzlei

  Welche von denen sind in unserem LDAP? Das sollte ich in etwa wissen.

- Was ist zu tun mit 'alten' Benutzern? Und wie definieren wir alte Benutzer?
- Ist ein alter Benutzer, der sich nicht mehr einloggen kann?
- Inwieweit sollen 'alte' Benutzer noch in die Datenbank rein?
- Sollen damit Anmeldungen von 'alten' Benutzern nicht importiert werden?

Ausführungen.txt:

- Wir haben Start und/oder Ende einer Durchführung fehlend, bei verschiedenen
  manchmal mit BUCHUNGSTATUS = Keine Buchung, kann aber auch
  Erfasst oder Abgesagt sein, ein Datum muss in der neuen Datenbank
  eingetragen werden.

- Fehlende Namen der Kursleitung: Entweder 'Unbekannter Referent' als default
  oder Datenbank abändern (in Formularen weiterhin Pflichtfeld).

Teilnehmer.txt

- Field ANWESEND oft leer, soll Ja oder Nein gewählt werden?
- Einträge ohne TEILNEHMER_ID (Link zu Personen.txt) sind externe Benutzer
  z.B.

FAZIT

Vorgeschlagenes Vorgehen:

Die "single source of truth" bildet das bereits umegesetzte Verzeichnis
der gültigen Benutzer aus dem LDAP kombiniert mit den weiteren Verzeichnissen.
Es sollen nur Anmeldungen von mittels LDAP identifizierbaren Personen
importiert werden.

Es werden nicht importiert:

- Personen, ohne Kürzel und Email sowie deren alte Anmeldungen
- Durchführungen mit BUCHUNGS_STATUS = Keine Buchung and Start/Ende fehlend
  werden auch nicht mehr importiert.
- Durchführungen ohne Start UND Ende unabhängig von Status

Vervollständigungen

- .local in Email-Adressen wird herausgenommen
- Fehlender Wert für ANWESEND bei alten Anmeldungen wird mit JA gefüllt.
- Referent Kurs: Standard-Wert einfügen "Unbekannter Referent" sofern fehlend

Gemäss dem code (src/onegov/fsi/ims_import.py:368) gibt es keine Benutzer
in Personen.txt die weder email noch code haben und von den Teilnehmer.txt
her aus vervollständigt werden müssten.

Anmeldungen in Teilnehmer.txt die keine Referenz zu Personen.txt haben,
wurden berücksichtigt, sofern eine Email vorlag.

"""
import dateutil.parser
from collections import OrderedDict, defaultdict
from datetime import datetime
from sedate import replace_timezone
from sqlalchemy import cast, Date
from uuid import uuid4

from onegov.core.csv import CSVFile
from onegov.core.html import sanitize_html
from onegov.fsi.collections.subscription import SubscriptionsCollection
from onegov.fsi.models import CourseEvent, Course, CourseSubscription
from onegov.fsi.models.course_notification_template import (
    InfoTemplate, SubscriptionTemplate, CancellationTemplate, ReminderTemplate)
from onegov.user import User


from typing import TypeVarTuple, TYPE_CHECKING
if TYPE_CHECKING:
    from collections.abc import Callable
    from onegov.core.csv import DefaultRow
    from onegov.fsi.models import CourseAttendee
    from onegov.fsi.models.course_event import EventStatusType
    from onegov.fsi.request import FsiRequest
    from sqlalchemy.orm import Session
    from typing import TypedDict, NotRequired
    from typing_extensions import TypeVar
    from uuid import UUID

[docs] T = TypeVar('T')
DefaultT = TypeVar('DefaultT', default=None) class SubscriptionDict(TypedDict): course_event_id: UUID completed: bool class PersonDict(TypedDict): obj_id: str email: str | None code: str | None first_name: str | None last_name: str | None subscriptions: NotRequired[list[SubscriptionDict]] class UserDict(TypedDict): id: UUID email: str | None organisation: str | None first_name: str | None last_name: str | None subscriptions: list[SubscriptionDict]
[docs] Ts = TypeVarTuple('Ts')
[docs] class InconsistencyError(BaseException): def __init__( self, msg: str, file: str, rownumber: int | None = None ) -> None:
[docs] self.msg = msg
[docs] self.file = file
[docs] self.rownumber = rownumber
[docs] def __str__(self) -> str: return f'{self.file}:{self.rownumber} - {self.msg}'
[docs] def parse_email(email: str) -> str | None: # some mail lack .org email = email.strip().replace('.local', '').lower() if not email: return None if email[-3:] != '.ch' and email[-3:] == '@ẑg': email += '.ch' return email
[docs] def parse_date( val: str | None, # NOTE: Seems like PEP-696 does not allow assigning a TypeVar to its # default value yet, this should probably be allowed default: 'DefaultT' = None # type:ignore[assignment] ) -> 'datetime | DefaultT': if not val: return default date_ = dateutil.parser.parse(val) return replace_timezone(date_, 'UTC')
[docs] def parse_status(val: str) -> 'EventStatusType': mapping: dict[str, EventStatusType] = { 'Abgeschlossen': 'confirmed', 'Abgesagt': 'canceled', 'Erfasst': 'created' } return mapping[val]
[docs] def validate_integer( val: str | None, treat_none_as_default: bool = True, default: int = 0 ) -> int: if not val: if treat_none_as_default: return default raise ValueError(f'Empty value: {val}') return int(val)
[docs] def with_open( func: 'Callable[[CSVFile[DefaultRow], *Ts], T]' ) -> 'Callable[[str, *Ts], T]': def _read(filename: str, /, *args: *Ts) -> 'T': with open(filename, 'rb') as f: file = CSVFile( f, dialect='excel', encoding='iso-8859-1' ) return func(file, *args) return _read
@with_open
[docs] def import_teacher_data( csvfile: CSVFile['DefaultRow'], request: 'FsiRequest', clean: bool = False ) -> None: session = request.session sgk = session.query(Course).filter_by( name='Sicherheitsgrundkurs (SGK)').one() subscriptions = SubscriptionsCollection(session) total_lines = 0 matched_count = 0 deleted_count = 0 for ix, line in enumerate(csvfile.lines): total_lines += 1 email = line.emailadresse.lower() course_name = line.kursbezeichnung if 'Sicherheitsgrundkurs' not in course_name: continue course_date = datetime.strptime(line.kursdatum, '%d.%m.%Y').date() confirmed = line.besucht == 'J' and True or False matched_teacher = session.query(User).filter_by(username=email).first() if not matched_teacher: print(f'{email} not found') continue assert hasattr(matched_teacher, 'attendee') matched_event = sgk.events.filter( cast(CourseEvent.start, Date) == course_date ).order_by(CourseEvent.start).first() if matched_event: if not confirmed: print(f'{email} for {course_date!s} not confirmed') # print(f'Found {email} in database and event that day') matched_count += 1 if not clean: subscriptions.add( course_event_id=matched_event.id, attendee_id=matched_teacher.attendee.id, event_completed=confirmed ) else: attendee_id = matched_teacher.attendee.id subs = session.query(CourseSubscription).filter( CourseSubscription.course_event_id == matched_event.id, CourseSubscription.attendee_id == attendee_id ).first() if subs: deleted_count += 1 session.delete(subs) print(f'Total lines: {total_lines}') print(f'Matched emails/events: {matched_count}') if clean: print(f'Deleted {deleted_count} subscriptions')
[docs] def parse_completed(val: str | None) -> bool: return False if val == 'N' else True
@with_open
[docs] def parse_persons(csvfile: CSVFile['DefaultRow']) -> dict[str, 'PersonDict']: """Pure extracting information""" persons: dict[str, PersonDict] = OrderedDict() print('-- parse_persons --') for line in csvfile.lines: obj_id = line.obj_id assert obj_id # valid_till = parse_date(line.p_valid_till) persons.setdefault(obj_id, { 'obj_id': obj_id, 'email': parse_email(line.p_email), 'code': line.p_userid, 'first_name': line.p_vorname, 'last_name': line.p_name, }) return persons
@with_open
[docs] def parse_courses( csvfile: CSVFile['DefaultRow'] ) -> tuple[dict[int, str], dict[str, Course]]: errors = OrderedDict() courses = OrderedDict() print('-- parse_courses --') for line in csvfile.lines: try: courses[line.vorgangsnr] = Course( id=uuid4(), name=line.kurzbeschreibung, description=sanitize_html(line.detailbeschreibung) ) except Exception as e: errors[line.rownumber] = e.args[0] continue return errors, courses
@with_open
[docs] def parse_events( csvfile: CSVFile['DefaultRow'], courses: dict[str, Course] ) -> tuple[dict[int, str], dict[str, CourseEvent]]: events = OrderedDict() errors = OrderedDict() print('-- parse_events --') for line in csvfile.lines: # Block accepts empty end values, but fails if the start is not set parent_course = courses[line.proc_id_kv] if not parent_course: print(f'Skipping {line.rownumber}: parent id {line.proc_id_kv} ' f'not found in Personen.txt') continue date_lacking = not line.kurs_von or not line.kurs_bis if line.buchungsstatus == 'Keine Buchung' and date_lacking: print(f'Skipping line {line.rownumber}: ' f'status: Buchungsstatus = Keine Buchung and lacking date') continue try: start = parse_date(line.kurs_von) assert start is not None except Exception as e: errors[line.rownumber] = e.args[0] print(f'Skipping {line.rownumber}: {e.args[0]}') continue try: # Setting end to start in case end is empty end = parse_date(line.kurs_bis, default=start) except TypeError: print(f'{line.rownumber}: invalid date, setting end = start') end = start default_name = 'Unbekannter Referent' default_company = 'Unbekannte Firma' min_att = validate_integer(line.teilnehmer_min) max_att = validate_integer(line.teilnehmer_max) event = CourseEvent( id=uuid4(), course_id=parent_course.id, location=line.kurslokal, start=start, end=end, presenter_name=line.referent_name or default_name, presenter_company=line.referent_firma or default_company, min_attendees=min_att, max_attendees=max_att, status=parse_status(line.status), ) events[line.proc_id] = event return errors, events
@with_open
[docs] def parse_subscriptions( csvfile: CSVFile['DefaultRow'], persons: dict[str, 'PersonDict'], events: dict[str, CourseEvent] ) -> tuple[dict[int, str], dict[str, 'PersonDict'], dict[str, 'UserDict']]: """ :param csvfile: :param persons: dict of dicts of person records :param events: dict of CourseEvent classes :return: subscriptions within persons and within maybe_external_in_ldap """ print('-- parse_subscriptions --') # The selection of valid subscriptions/subscriptions errors: dict[int, str] = OrderedDict() maybe_external_in_ldap: dict[str, UserDict] = OrderedDict() emails_by_teilnehmer_id = defaultdict(list) # new_emails_for_existing = defaultdict(list) for line in csvfile.lines: teilnehmer_id = line.teilnehmer_id course_event = events.get(line.proc_id) completed = parse_completed(line.anwesend) if not course_event: print(f'Skipping {line.rownumber}: drop since no course event') continue email = parse_email(line.teilnehmer_email) if teilnehmer_id: # Deal with the persons object person_obj = persons.get(line.teilnehmer_id) if not person_obj: print(f'Skipping {line.rownumber}: orphaned subscription') # skip orphaned subscriptions continue # Analyze possible identifiers code = person_obj['code'] current_email = person_obj['email'] # first_name, last_name from Persons first_name = person_obj['first_name'] last_name = person_obj['last_name'] complete_record = all((current_email, code, first_name, last_name)) # skip subscriptions for persons without email and code if not current_email and not code: print(f'Skipping {line.rownumber}: ' f'person obj without email and code') continue subscriptions = person_obj.setdefault( 'subscriptions', []) if code: # Add to valid subscriptions subscriptions.append({ 'course_event_id': course_event.id, 'completed': completed }) continue if current_email: # just current_email if current_email != email: # Assert full profile of the record in Personen.txt assert complete_record subscriptions.append({ 'course_event_id': course_event.id, 'completed': completed }) continue if email: # Just a check, empty will be assert beneath emails_by_teilnehmer_id[teilnehmer_id].append(email) elif line.teilnehmer_firma == 'Intern': print(f'Skipping {line.rownumber}: ' f'Firma intern but no link to Persons') continue else: # no teilnehmer_id so is external from a school or elsewhere first_name = line.teilnehmer_vorname last_name = line.teilnehmer_nachname if not (email and first_name and last_name): print(f'Skipping {line.rownumber}: ' 'Subscription misses one of ' 'email, firstname or lastname') continue external = maybe_external_in_ldap.setdefault(email, { 'id': uuid4(), 'first_name': first_name, 'last_name': last_name, 'organisation': line.teilnehmer_firma, 'subscriptions': [], 'email': email }) external['subscriptions'].append({ 'course_event_id': course_event.id, 'completed': completed }) assert not emails_by_teilnehmer_id return errors, persons, maybe_external_in_ldap
[docs] def map_persons_to_known_ldap_user( person_record: 'PersonDict | UserDict', session: 'Session' ) -> 'CourseAttendee | None': """ Since the exported persons table contains records without email from various sources, we have to try to map it to an existing record or create a new one. The ldap fetched users have an attendee created with first_name, last_name and email set on the attendee as well. Returns an attendee or None """ code = person_record.get('code', None) email = person_record['email'] if code: user = session.query(User).filter_by(source_id=code).first() if user: if email and user.username != email: # set email to most up-to-date email = user.username assert hasattr(user, 'attendee') if not user.attendee: raise InconsistencyError( f'{email} - {code}: user should have an attendee', 'Personen.txt', ) return user.attendee print(f'LDAP CODE: {code} not found, Email {email} not searched') elif email: user = session.query(User).filter_by(username=email).first() if user: assert hasattr(user, 'attendee') if not user.attendee: raise InconsistencyError( f'{email}: user should have an attendee', 'Personen.txt', ) return user.attendee print(f'LDAP EMAIL: {email} not found') else: identifier = person_record.get('obj_id') or email print(f'No identifier for user with OBJ_ID {identifier}') return None
[docs] def parse_ims_data( subscriptions_file: str, events_file: str, courses_file: str, persons_file: str, ) -> tuple[ dict[str, dict[int, str]], dict[str, 'PersonDict'] | None, dict[str, Course] | None, dict[str, CourseEvent] | None, dict[str, 'UserDict'] | None ]: gathered_errors = {} # raw extraction raw_persons = parse_persons(persons_file) errors, courses = parse_courses(courses_file) if errors: gathered_errors['parse_courses'] = errors return gathered_errors, None, None, None, None errors, events = parse_events(events_file, courses) if errors: gathered_errors['parse_events'] = errors return gathered_errors, None, None, None, None errors, persons, possible_ldap_users = parse_subscriptions( subscriptions_file, raw_persons, events) if errors: gathered_errors['parse_subscriptions'] = errors return gathered_errors, persons, courses, events, possible_ldap_users
[docs] def import_ims_data( session: 'Session', persons: dict[str, 'PersonDict'], courses: dict[str, Course], events: dict[str, CourseEvent], possible_ldap_users: dict[str, 'UserDict'] ) -> dict[str, int]: print('-- Import IMS DATA to database with LDAP --') statistics = { 'LDAP_found': 0, 'LDAP_not_found': 0, 'external_found': 0, 'external_not_found': 0, } session.add_all(courses.values()) for event in events.values(): session.add(event) data = {'course_event_id': event.id} session.add_all(( InfoTemplate(**data), SubscriptionTemplate(**data), CancellationTemplate(**data), ReminderTemplate(**data) )) for person in persons.values(): attendee = map_persons_to_known_ldap_user(person, session) if not attendee: statistics['LDAP_not_found'] += 1 continue statistics['LDAP_found'] += 1 subscriptions = person['subscriptions'] if not subscriptions: continue session.add_all( CourseSubscription( attendee_id=attendee.id, course_event_id=r['course_event_id'], event_completed=r['completed'] ) for r in subscriptions ) for record in possible_ldap_users.values(): attendee = map_persons_to_known_ldap_user(record, session) if not attendee: statistics['external_not_found'] += 1 continue statistics['external_found'] += 1 session.add_all( CourseSubscription( attendee_id=attendee.id, course_event_id=r['course_event_id'] ) for r in record['subscriptions'] ) return statistics