Source code for wtfs.cli

import click
import sedate

from datetime import datetime
from functools import cached_property
from io import BytesIO
from onegov.core.cli import abort
from onegov.core.cli import command_group
from onegov.core.cli import pass_group_context
from onegov.core.crypto import hash_password, random_password
from onegov.core.csv import CSVFile, convert_excel_to_csv
from onegov.core.utils import Bunch
from onegov.user import User, UserGroupCollection
from onegov.wtfs.models import PickupDate, ScanJob
from pathlib import Path
from sqlalchemy import create_engine
from sqlalchemy.orm.session import close_all_sessions


from typing import Any, IO, Literal, TYPE_CHECKING
if TYPE_CHECKING:
    from collections.abc import Callable
    from onegov.core.cli.core import GroupContext
    from onegov.core.request import CoreRequest
    from onegov.wtfs.app import WtfsApp
    from onegov.wtfs.models import Municipality
    from sqlalchemy.orm import Session


[docs] cli = command_group()
@cli.command(context_settings={'creates_path': True}) @pass_group_context
[docs] def add( group_context: 'GroupContext' ) -> 'Callable[[CoreRequest, WtfsApp], None]': """ Adds an instance to the database. For example: .. code-block:: bash onegov-wtfs --select '/onegov_wtfs/wtfs' add """ def add_instance(request: 'CoreRequest', app: 'WtfsApp') -> None: app.cache.flush() app.add_initial_content() click.echo('Instance was created successfully') return add_instance
@cli.command() @pass_group_context
[docs] def delete( group_context: 'GroupContext' ) -> 'Callable[[CoreRequest, WtfsApp], None]': """ Deletes an instance from the database. For example: .. code-block:: bash onegov-wtfs --select '/onegov_wtfs/wtfs' delete """ def delete_instance(request: 'CoreRequest', app: 'WtfsApp') -> None: confirmation = 'Do you really want to DELETE {}?'.format(app.schema) if not click.confirm(confirmation): abort('Deletion process aborted') assert app.has_database_connection assert app.session_manager.is_valid_schema(app.schema) close_all_sessions() dsn = app.session_manager.dsn app.session_manager.dispose() engine = create_engine(dsn) engine.execute('DROP SCHEMA "{}" CASCADE'.format(app.schema)) engine.raw_connection().invalidate() engine.dispose() click.echo('Instance was deleted successfully') return delete_instance
@cli.command(name='import', context_settings={'singular': True}) @click.option('--path', type=click.Path(exists=True), required=True)
[docs] def import_users(path: str) -> 'Callable[[CoreRequest, WtfsApp], None]': """ Imports the wtfs live data from the legacy system. """ # we use a single random password for all accounts, which is known to # nobody (users are meant to request a new password after import) # # this speeds up the user import by an order of magnitude # # don't blindly copy this! password_hash = hash_password(random_password(128)) roles = { 'Admin': 'admin', 'Gemeinde Admin': 'editor', 'Benutzer': 'member', } types: dict[str, Literal['normal', 'express']] = { '1': 'normal', '2': 'express', } def fix(string: bytes) -> bytes: # one of the CSV files has a line that seems to elude our parser, # so we just fix it up return string.replace( b'""violett; Def. an Scan-Center""', b"'violett; Def. an Scan-Center'" ) def slurp(file: IO[bytes]) -> BytesIO: return BytesIO(fix(file.read())) def as_csv(path: Path) -> 'CSVFile[Any]': adapt: Callable[[IO[bytes]], BytesIO] if path.name.endswith('xlsx'): adapt = convert_excel_to_csv else: adapt = slurp with open(path, 'rb') as f: return CSVFile(adapt(f)) def load_files(path: Path) -> Bunch: prefix = 'tx_winscan_domain_model' files = Bunch() files.users = as_csv(path / 'User_Scanauftrag_neu.xlsx') files.bill = as_csv(path / f'{prefix}_bill.csv') files.date = as_csv(path / f'{prefix}_date.csv') files.paymenttype = as_csv(path / f'{prefix}_paymenttype.csv') files.township = as_csv(path / f'{prefix}_township.csv') files.transportorder = as_csv(path / f'{prefix}_transportorder.csv') files.transporttype = as_csv(path / f'{prefix}_transporttype.csv') return files files = load_files(Path(path)) class Context: def __init__(self, session: 'Session'): self.session = session @cached_property def groups(self) -> UserGroupCollection['Municipality']: return UserGroupCollection(self.session, type='wtfs') def town_payment_type(town: Any) -> str: return town.payment_type == '1' and 'normal' or 'spezial' def parse_datetime(dt: str | float | int) -> datetime: d = datetime.utcfromtimestamp(int(dt)) d = sedate.replace_timezone(d, 'UTC') d = sedate.to_timezone(d, 'Europe/Zurich') return d def handle_import(request: 'CoreRequest', app: 'WtfsApp') -> None: context = Context(request.session) created = Bunch(towns={}, users=[], dates=[], jobs=[]) townids = {} deleted = set() for record in files.township: if record.deleted == '1': deleted.add(record.uid) continue # towns double as user groups group = context.groups.add( name=record.name, bfs_number=record.bfs_nr, address_supplement=record.address_extension, gpn_number=record.gp_nr.isdigit() and int(record.gp_nr), payment_type=town_payment_type(record), type='wtfs') townids[record.uid] = record.bfs_nr assert group.bfs_number not in created.towns created.towns[group.bfs_number] = group print(f'✓ Imported {len(created.towns)} towns') for record in files.users: user = User( username=record.email, role=roles[record.rolle], realname=record.name, active=True, second_factor=None, signup_token=None, password_hash=password_hash, group_id=record.bfs and created.towns[record.bfs].id or None, data={'contact': record.kontakt == 'j'}) context.session.add(user) created.users.append(user) print(f'✓ Imported {len(created.users)} users') for record in files.date: if record.township in deleted: continue if record.deleted == '1' or record.township == '0': continue dt = parse_datetime(record.date).date() pickup_date = PickupDate( date=dt, municipality_id=created.towns[townids[record.township]].id) context.session.add(pickup_date) created.dates.append(pickup_date) print(f'✓ Imported {len(created.dates)} dates') for record in files.transportorder: dispatch_date = parse_datetime(record.distribution_date).date() return_date = parse_datetime(record.return_date).date() if record.deleted == '1' or record.township == '0': continue if record.township in deleted: continue job = ScanJob( municipality_id=created.towns[townids[record.township]].id, type=types[record.transport_type], delivery_number=record.delivery_bill_number, # dispatch (in) dispatch_date=dispatch_date, dispatch_note=record.comment_delivery, dispatch_boxes=int(record.box_in), dispatch_tax_forms_current_year=int( record.tax_current_year_in), dispatch_tax_forms_last_year=int( record.tax_last_year_in), dispatch_tax_forms_older=int( record.tax_old_in), dispatch_single_documents=int( record.single_voucher_in), # targets (ribbon stands for "Bändliweg" I think) dispatch_cantonal_tax_office=int( record.ribbon_out), dispatch_cantonal_scan_center=int( record.cantonal_scan_center), # return (out) return_date=return_date, return_note=record.comment_handover, return_boxes=record.box_out, return_tax_forms_current_year=int( record.tax_current_year_out), return_tax_forms_last_year=int( record.tax_last_year_out), return_tax_forms_older=int( record.tax_old_out), return_single_documents=int( record.single_voucher_out), return_unscanned_tax_forms_current_year=int( record.not_scanned_current_year), return_unscanned_tax_forms_last_year=int( record.not_scanned_last_year), return_unscanned_tax_forms_older=int( record.not_scanned_old), return_unscanned_single_documents=int( record.not_scanned_voucher) ) context.session.add(job) created.jobs.append(job) print(f'✓ Imported {len(created.jobs)} jobs') return handle_import