import click
import transaction
from onegov.agency.collections import ExtendedAgencyCollection
from onegov.agency.data_import import (import_bs_data,
import_membership_titles,
import_lu_data)
from onegov.agency.excel_export import export_person_xlsx
from onegov.agency.models import ExtendedAgencyMembership, ExtendedPerson
from onegov.core.cli import command_group
from onegov.core.cli import pass_group_context
from onegov.people import Agency, Person, AgencyMembership
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Callable
from onegov.agency.app import AgencyApp
from onegov.agency.request import AgencyRequest
from onegov.core.cli.core import GroupContext
from sqlalchemy.orm import Session
@cli.command('consolidate', context_settings={'singular': True})
@click.option('--based-on', required=True, default='email')
# NOTE(review): single-name flag whose default is already True; confirm that
# passing --ignore_case has the intended effect with the installed click.
@click.option('--ignore_case', is_flag=True, required=True, default=True)
@click.option('--dry-run', is_flag=True, default=False)
@click.option('--verbose', is_flag=True, default=False, help='Verbose mode')
def consolidate_cli(
    based_on: str,
    ignore_case: bool,
    dry_run: bool,
    verbose: bool
) -> 'Callable[[AgencyRequest, AgencyApp], None]':
    """
    Consolidates double entries of person objects based on the property
    `based_on`. Property must be convertible to string.

    For every value of `based_on` the first person found is kept. Its empty
    attributes are filled from the other persons sharing that value, their
    memberships are moved to it and the other persons are deleted.
    """
    # The indexers are triggered on the first duplicate group and then on
    # every `buffer`-th group.
    buffer = 100

    def find_double_entries(session: 'Session') -> tuple[
        dict[str, ExtendedPerson],
        dict[str, list[ExtendedPerson]]
    ]:
        """Groups persons by the string value of their `based_on` attribute.

        Returns a pair ``(seen, to_consolidate)``: ``seen`` maps each key to
        the first person carrying it, ``to_consolidate`` maps a key to all
        further persons carrying the same key. Persons with a falsy value
        are skipped. Keys are lower-cased if `ignore_case` is set.
        """
        seen: dict[str, ExtendedPerson] = {}
        to_consolidate: dict[str, list[ExtendedPerson]] = {}
        for person in session.query(ExtendedPerson):
            identifier = getattr(person, based_on)
            if not identifier:
                continue
            identifier = str(identifier)
            id_mod = identifier.lower() if ignore_case else identifier
            if id_mod in seen:
                to_consolidate.setdefault(id_mod, []).append(person)
            else:
                seen[id_mod] = person
        return seen, to_consolidate

    def consolidate_persons(
        person: ExtendedPerson,
        persons: list[ExtendedPerson],
        identifier: str
    ) -> ExtendedPerson:
        """Completes `person` with attribute values taken from `persons`.

        An attribute is only copied if it is empty on `person` and set on
        the duplicate. `identifier` is the *name* of the attribute the
        duplicates were matched on; it is excluded from the comparison.
        Returns the (mutated) `person`.
        """
        attributes = (
            'salutation', 'academic_title', 'first_name', 'last_name', 'born',
            'profession', 'function', 'political_party', 'parliamentary_group',
            'picture_url', 'email', 'phone', 'phone_direct', 'website',
            'address', 'notes', 'access'
        )
        # Invariant: find_double_entries never produces an empty group.
        assert persons
        for current in persons:
            for attr in attributes:
                if attr == identifier:
                    continue
                value = getattr(current, attr)
                existing = getattr(person, attr)
                if not existing and value:
                    if verbose:
                        print(f'Setting {person.title}: {attr}={value}')
                    setattr(person, attr, value)
        return person

    def consolidate_memberships(
        session: 'Session',
        person: ExtendedPerson,
        persons: list[ExtendedPerson]
    ) -> None:
        """Moves the memberships of `persons` to `person` and deletes the
        emptied persons."""
        assert person not in persons
        for p in persons:
            for membership in p.memberships:
                membership.person_id = person.id
            # Flush the re-assigned memberships before deleting their
            # former owner.
            session.flush()
            session.delete(p)

    def do_consolidate(request: 'AgencyRequest', app: 'AgencyApp') -> None:
        """Runs the consolidation and checks that the total number of
        memberships is unchanged afterwards."""
        session = request.session
        first_seen, to_consolidate = find_double_entries(session)
        print(f'Double entries found based on '
              f'{based_on}: {len(to_consolidate)}')
        count = session.query(ExtendedAgencyMembership).count()
        for ix, (id_, persons) in enumerate(to_consolidate.items()):
            person = first_seen[id_]
            # Pass the attribute name (`based_on`), not the matched value
            # (`id_`): consolidate_persons compares it to attribute names.
            person = consolidate_persons(person, persons, based_on)
            session.flush()
            consolidate_memberships(session, person, persons)
            if ix % buffer == 0:
                app.es_indexer.process()
                app.psql_indexer.bulk_process(session)
        count_after = session.query(ExtendedAgencyMembership).count()
        assert count == count_after, f'before: {count}, after {count_after}'
        if dry_run:
            transaction.abort()
            click.secho('Aborting transaction', fg='yellow')

    return do_consolidate
@cli.command('import-bs-membership-title', context_settings={'singular': True})
@click.argument('agency-file', type=click.Path(exists=True))
@click.argument('people-file', type=click.Path(exists=True))
@click.option('--dry-run', is_flag=True, default=False)
def import_bs_function(
    agency_file: str,
    people_file: str,
    dry_run: bool
) -> 'Callable[[AgencyRequest, AgencyApp], None]':
    """
    Imports membership titles from the given files and sets every title
    that is still blank afterwards to 'Mitglied'.
    """

    def execute(request: 'AgencyRequest', app: 'AgencyApp') -> None:
        import_membership_titles(agency_file, people_file, request, app)

        # Give every membership whose title is empty or whitespace-only
        # the default title and count how many were touched.
        corrected = 0
        memberships = request.session.query(ExtendedAgencyMembership)
        for entry in memberships:
            if entry.title.strip():
                continue
            entry.title = 'Mitglied'
            corrected += 1
        print('Corrected remaining empty titles: ', corrected)

        if not dry_run:
            return
        transaction.abort()
        click.secho('Aborting transaction', fg='yellow')

    return execute
@cli.command('import-bs-data', context_settings={'singular': True})
@click.argument('agency-file', type=click.Path(exists=True))
@click.argument('people-file', type=click.Path(exists=True))
@click.option('--dry-run', is_flag=True, default=False)
@click.option('--clean', is_flag=True, default=False)
def import_bs_data_files(
    agency_file: str,
    people_file: str,
    dry_run: bool,
    clean: bool
) -> 'Callable[[AgencyRequest, AgencyApp], None]':
    """
    Imports agencies and persons from the given files. With `--clean`, all
    memberships, persons and agencies are removed instead and nothing is
    imported. With `--dry-run` the transaction is aborted in either case.

    Usage:
    onegov-agency --select /onegov_agency/bs import-bs-data \
    $agency_file $people_file
    """
    # The indexers are triggered on the first deleted object of each model
    # and then on every `buffer`-th one.
    buffer = 100

    def abort_if_dry_run() -> None:
        """Aborts the current transaction when running with --dry-run."""
        if dry_run:
            transaction.abort()
            click.secho('Aborting transaction', fg='yellow')

    def execute(request: 'AgencyRequest', app: 'AgencyApp') -> None:
        if clean:
            session = request.session
            # Memberships go first, then persons, then agencies.
            for model in (AgencyMembership, Person, Agency):
                for ix, obj in enumerate(session.query(model)):
                    session.delete(obj)
                    if ix % buffer == 0:
                        app.es_indexer.process()
                        app.psql_indexer.bulk_process(session)
            session.flush()
            click.secho(
                'All Memberships, Agencies and Persons removed', fg='green')
            # Honour --dry-run on the clean path as well; previously the
            # early return skipped the abort and the deletion went through.
            # NOTE(review): the indexer calls above run before the abort;
            # confirm they do not persist anything outside the transaction.
            abort_if_dry_run()
            click.secho('Exiting...')
            return

        agencies, people = import_bs_data(
            agency_file, people_file, request, app)
        click.secho(f'Imported {len(agencies.keys())} agencies '
                    f'and {len(people)} persons',
                    fg='green')
        abort_if_dry_run()

    return execute
@cli.command('import-lu-data', context_settings={'singular': True})
@click.argument('data-file', type=click.Path(exists=True))
@click.option('--dry-run', is_flag=True, default=False)
@click.option('--clean', is_flag=True, default=False)
def import_lu_data_files(
    data_file: str,
    dry_run: bool,
    clean: bool
) -> 'Callable[[AgencyRequest, AgencyApp], None]':
    """
    Imports persons and agencies from the given file. With `--clean`, all
    persons and agencies are removed instead and nothing is imported. With
    `--dry-run` the transaction is aborted in either case.

    Usage:
    onegov-agency --select /onegov_agency/lu import-lu-data $people_file
    """
    # The indexers are triggered on the first deleted object of each model
    # and then on every `buffer`-th one.
    buffer = 100

    def abort_if_dry_run() -> None:
        """Aborts the current transaction when running with --dry-run."""
        if dry_run:
            transaction.abort()
            click.secho('Aborting transaction', fg='yellow')

    def execute(request: 'AgencyRequest', app: 'AgencyApp') -> None:
        if clean:
            session = request.session
            # Persons are removed (and flushed) before agencies.
            steps = (
                (Person, 'All Persons removed'),
                (Agency, 'All Agencies removed'),
            )
            for model, message in steps:
                for ix, obj in enumerate(session.query(model)):
                    session.delete(obj)
                    if ix % buffer == 0:
                        app.es_indexer.process()
                        app.psql_indexer.bulk_process(session)
                session.flush()
                click.secho(message, fg='green')
            # Honour --dry-run on the clean path as well; previously the
            # early return skipped the abort and the deletion went through.
            # NOTE(review): the indexer calls above run before the abort;
            # confirm they do not persist anything outside the transaction.
            abort_if_dry_run()
            click.secho('Exiting...')
            return

        agencies, people = import_lu_data(data_file, request, app)
        click.secho(f'Imported {len(people)} persons and '
                    f'{len(agencies)} agencies', fg='green')
        abort_if_dry_run()

    return execute
@cli.command('create-pdf')
@pass_group_context
@click.option('--root/--no-root', default=True)
@click.option('--recursive/--no-recursive', default=True)
def create_pdf(
    group_context: 'GroupContext',
    root: bool,
    recursive: bool
) -> 'Callable[[AgencyRequest, AgencyApp], None]':
    """
    Creates the root PDF covering the root agencies (`--root`) and one PDF
    for every agency in the collection (`--recursive`).
    """

    def _create_pdf(request: 'AgencyRequest', app: 'AgencyApp') -> None:
        session = app.session()
        agencies = ExtendedAgencyCollection(session)

        if root:
            root_options = {
                'agencies': agencies.roots,
                'title': app.org.name,
                'toc': True,
                'exclude': app.org.hidden_people_fields,
                'page_break_on_level': int(
                    app.org.meta.get('page_break_on_level_root_pdf', 1)
                ),
                'link_color': app.org.meta.get('pdf_link_color'),
                'underline_links': app.org.meta.get(
                    'pdf_underline_links', False
                ),
            }
            # FIXME: asymmetric property
            app.root_pdf = app.pdf_class.from_agencies(  # type:ignore
                **root_options
            )
            click.secho('Root PDF created', fg='green')

        if not recursive:
            return

        for agency in agencies.query():
            # The settings are read again for every agency, one PDF each.
            agency_options = {
                'agencies': [agency],
                'title': agency.title,
                'toc': False,
                'exclude': app.org.hidden_people_fields,
                'page_break_on_level': int(
                    app.org.meta.get('page_break_on_level_org_pdf', 1)
                ),
                'link_color': app.org.meta.get('pdf_link_color'),
                'underline_links': app.org.meta.get(
                    'pdf_underline_links', False
                ),
            }
            # FIXME: asymmetric property
            agency.pdf_file = app.pdf_class.from_agencies(  # type:ignore
                **agency_options
            )
            click.secho(f"Created PDF of '{agency.title}'", fg='green')

    return _create_pdf
@cli.command('export-xlsx')
@pass_group_context
@click.option('--people', default=True, is_flag=True)
def export_xlsx(
    group_context: 'GroupContext',
    people: bool
) -> 'Callable[[AgencyRequest, AgencyApp], None]':
    """
    Creates the XLSX export of all people and stores it on the application.
    """

    def _export_xlsx(request: 'AgencyRequest', app: 'AgencyApp') -> None:
        session = app.session()
        if people:
            xlsx = export_person_xlsx(session)
            # FIXME: asymmetric property
            app.people_xlsx = xlsx  # type:ignore[assignment]
            # The message used to end in a stray, unbalanced apostrophe.
            click.secho('Created XLSX for people', fg='green')

    return _export_xlsx
@cli.command('enable-yubikey')
@pass_group_context
def enable_yubikey(
    group_context: 'GroupContext'
) -> 'Callable[[AgencyRequest, AgencyApp], None]':
    """Sets the `enable_yubikey` entry of `app.org.meta` to True."""

    def _enable_yubikey(request: 'AgencyRequest', app: 'AgencyApp') -> None:
        org = app.org
        # The confirmation is printed whether or not an organisation exists.
        if org:
            org.meta['enable_yubikey'] = True
        click.secho('YubiKey enabled', fg='green')

    return _enable_yubikey
@cli.command('disable-yubikey')
@pass_group_context
def disable_yubikey(
    group_context: 'GroupContext'
) -> 'Callable[[AgencyRequest, AgencyApp], None]':
    """Sets the `enable_yubikey` entry of `app.org.meta` to False."""

    def _disable_yubikey(request: 'AgencyRequest', app: 'AgencyApp') -> None:
        org = app.org
        # The confirmation is printed whether or not an organisation exists.
        if org:
            org.meta['enable_yubikey'] = False
        click.secho('YubiKey disabled', fg='green')

    return _disable_yubikey