Source code for pas.cli

from __future__ import annotations

import click
import transaction
import logging
import warnings
from onegov.core.cli import command_group
from onegov.pas.collections.parliamentarian import PASParliamentarianCollection
from onegov.pas.excel_header_constants import (
    commission_expected_headers_variant_1,
    commission_expected_headers_variant_2,
)
from onegov.pas.data_import import import_commissions_excel
from onegov.pas.log import ClickOutputHandler, CompositeOutputHandler
from onegov.pas.importer.output_handlers import DatabaseOutputHandler
from onegov.pas.importer.orchestrator import (
    KubImporter
)


from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from collections.abc import Callable
    from onegov.pas.app import PasApp
    from onegov.pas.request import PasRequest

[docs] type Processor = Callable[[PasRequest, PasApp], None]
[docs] log = logging.getLogger('onegov.org.cli')
[docs] cli = command_group()
# Try to suppress Fontconfig error messages warnings.filterwarnings('ignore', message='.*Fontconfig error.*') warnings.filterwarnings('ignore', message='.*No writable cache directories.*') @cli.command('import-commission-data') @click.argument('excel_file', type=click.Path(exists=True))
[docs] def import_commission_data( excel_file: str, ) -> Processor: """ Note: This is deprecated, not really used, as we have the JSON import. Import commission data from an Excel or csv file. Assumes that the name of the commission is the filename. Each row of this import contains a single line, which is a single parliamentarian and all the information about them. Example: onegov-pas --select '/onegov_pas/zug' import-commission-data \ "Kommission_Gesundheit_und_Soziales.xlsx" """ def import_data(request: PasRequest, app: PasApp) -> None: try: import_commissions_excel( excel_file, request.session, excel_file, expected_headers=commission_expected_headers_variant_1, ) click.echo('Ok.') except Exception: click.echo('Trying the other variant of headers...') import_commissions_excel( excel_file, request.session, excel_file, expected_headers=commission_expected_headers_variant_2, ) click.echo('Ok.') return import_data
@cli.command(name='update-accounts', context_settings={'singular': True}) @click.option('--dry-run/-no-dry-run', default=False)
[docs] def update_accounts_cli(dry_run: bool) -> Processor: """ Updates user accounts for parliamentarians. """ def do_update_accounts(request: PasRequest, app: PasApp) -> None: parliamentarians = PASParliamentarianCollection(app) for parliamentarian in parliamentarians.query(): if not parliamentarian.email_primary: click.echo( f'Skipping {parliamentarian.title}, no primary email.' ) continue parliamentarians.update_user( parliamentarian, parliamentarian.email_primary ) if dry_run: transaction.abort() return do_update_accounts
@cli.command(name='update-account-single', context_settings={'singular': True}) @click.option('--email', required=True, help='Email of the parliamentarian') @click.option('--dry-run/-no-dry-run', default=False)
[docs] def update_account_single_cli(email: str, dry_run: bool) -> Processor: """Updates user account for a single parliamentarian by email.""" def do_update_account(request: PasRequest, app: PasApp) -> None: from onegov.pas.models import PASParliamentarian parliamentarians = PASParliamentarianCollection(app) parliamentarian = ( parliamentarians.query() .filter(PASParliamentarian.email_primary == email) .first() ) if not parliamentarian: click.secho(f'No parliamentarian found: {email}', fg='red') transaction.abort() return parliamentarians.update_user( parliamentarian, parliamentarian.email_primary ) click.secho( f'Updated account for parliamentarian: {email}', fg='green' ) if dry_run: transaction.abort() click.secho('Dry run - changes aborted', fg='yellow') return do_update_account
@cli.command('import-kub-data') @click.option('--token', required=True, help='Authorization token for KUB API') @click.option('--base-url', help='Base URL for the KUB API, ending in /api/v2') @click.option('--cert-dir', required=True, type=click.Path(exists=True, file_okay=False), help='Directory containing .crt and .key files') @click.option('--update-custom/--no-update-custom', default=True, help='Update parliamentarians with custom field data ' 'after import (default: enabled)') @click.option('--max-workers', default=3, type=int, help='Maximum number of concurrent workers for ' 'custom data update (default: 3)')
[docs] def import_kub_data( token: str, base_url: str, cert_dir: str, update_custom: bool, max_workers: int, ) -> Processor: """ Import data from the KUB API endpoints. Fetches data from /people, /organizations, and /memberships endpoints and imports them using the existing import logic. Optionally updates parliamentarians with custom field data from individual API calls using multi-threaded processing for improved performance. Example: onegov-pas --select '/onegov_pas/zug' import-kub-data \ --token "your-token-here" # Skip custom data update: onegov-pas --select '/onegov_pas/zug' import-kub-data \ --token "your-token-here" --no-update-custom # Use more workers for faster custom data processing: onegov-pas --select '/onegov_pas/zug' import-kub-data \ --token "your-token-here" --max-workers 5 """ def cli_wrapper(request: PasRequest, app: PasApp) -> None: """CLI wrapper that calls the orchestrator.""" # Create composite output handler for both CLI and database click_handler = ClickOutputHandler() db_handler = DatabaseOutputHandler() output_handler = CompositeOutputHandler(click_handler, db_handler) from onegov.pas.cronjobs import _resolve_cert cert = _resolve_cert(cert_dir) try: with KubImporter( token, base_url, output_handler, cert=cert ) as importer: combined_results, import_log_id = importer.run_full_sync( request, app, 'cli', update_custom, max_workers ) # Display summary import_results = combined_results.get('import', {}) custom_results = combined_results.get('custom_data') click_handler.info('Import Results Summary:') for category, details in import_results.items(): created_count = len(details.get('created', [])) updated_count = len(details.get('updated', [])) processed_count = details.get('processed', 0) if (created_count > 0 or updated_count > 0 or processed_count > 0): click_handler.info( f' {category}: {created_count} created, ' f'{updated_count} updated, {processed_count} processed' ) if custom_results and 'error' not in custom_results: click_handler.success( f'Custom data update: {custom_results["updated"]} ' f'updated, {custom_results["errors"]} errors' ) click_handler.success(f'Import log ID: {import_log_id}') except Exception as e: click_handler.error(f'Import failed: {e}') raise return cli_wrapper
@cli.command('update-custom-data') @click.option('--token', required=True, help='Authorization token for KUB API') @click.option('--base-url', required=True, help='Base URL for the KUB API, ending in /api/v2') @click.option('--cert-dir', required=True, type=click.Path(exists=True, file_okay=False), help='Directory containing .crt and .key files') @click.option('--max-workers', default=3, type=int, help='Maximum number of concurrent workers ' '(default: 3)')
[docs] def update_custom_data( token: str, base_url: str, cert_dir: str, max_workers: int, ) -> Processor: """ Update parliamentarians with customFields data which somehow is not included in the main /people api. Needs to be run after import_kub_data Uses multi-threading to fetch custom data concurrently from the API while maintaining thread-safe database operations. Example: onegov-pas --select '/onegov_pas/zug' update-custom-data \ --token "your-token-here" \ --base-url "url-ending-in/api/v2" # Use more workers for faster processing: onegov-pas --select '/onegov_pas/zug' update-custom-data \ --token "your-token-here" \ --base-url "url-ending-in/api/v2" \ --max-workers 5 """ def update_data(request: PasRequest, app: PasApp) -> None: # Create composite output handler for both CLI and database click_handler = ClickOutputHandler() db_handler = DatabaseOutputHandler() output_handler = CompositeOutputHandler(click_handler, db_handler) from onegov.pas.cronjobs import _resolve_cert cert = _resolve_cert(cert_dir) try: with KubImporter( token, base_url, output_handler, cert=cert ) as importer: updated_count, error_count, _output_messages = ( importer.update_custom_data( request, app, max_workers ) ) click_handler.success( f'Update completed: {updated_count} updated, ' f'{error_count} errors' ) except Exception as e: click_handler.error(f'Custom data update failed: {e}') raise return update_data