Source code for pas.cronjobs

from __future__ import annotations

import glob
import logging
from onegov.pas.app import PasApp
from onegov.pas.collections.parliamentarian import (
    PASParliamentarianCollection,
)
from onegov.pas.importer.orchestrator import KubImporter
from onegov.pas.log import CompositeOutputHandler, LogOutputHandler
from onegov.pas.importer.output_handlers import DatabaseOutputHandler
from onegov.pas.models import ImportLog
from sqlalchemy.orm.attributes import flag_modified

from typing import Any, TYPE_CHECKING, cast
if TYPE_CHECKING:
    from onegov.pas.app import PasApp as PasAppType
    from onegov.pas.request import PasRequest

[docs] log = logging.getLogger('onegov.pas.cronjobs')
[docs] def _resolve_cert(cert_dir: str) -> tuple[str, str]: crt = glob.glob(f'{cert_dir}/*.crt') key = glob.glob(f'{cert_dir}/*.key') if len(crt) != 1 or len(key) != 1: raise FileNotFoundError( f'Expected exactly one .crt and one .key in {cert_dir}, ' f'found {len(crt)} .crt and {len(key)} .key' ) return crt[0], key[0]
@PasApp.cronjob(hour='*', minute=0, timezone='UTC')
[docs] def hourly_kub_data_import(request: PasRequest) -> None: try: trigger_kub_data_import(request, import_type='automatic') except ValueError as e: log.warning(f'KUB import skipped: {e}') except Exception: log.exception('KUB import failed') raise
[docs] def trigger_kub_data_import( request: PasRequest, import_type: str ) -> dict[str, Any] | None: app = request.app if not (kub_token := getattr(app, 'kub_api_token', None)): return None if not (kub_base_url := getattr(app, 'kub_base_url', None)): return None cert_dir = getattr(app, 'kub_cert_dir', None) if not cert_dir: return None cert = _resolve_cert(cert_dir) db_handler = DatabaseOutputHandler() log_handler = LogOutputHandler() output_handler = CompositeOutputHandler(db_handler, log_handler) with KubImporter( kub_token, kub_base_url, output_handler, cert=cert ) as importer: combined_results, import_log_id = importer.run_full_sync( request, cast('PasAppType', app), import_type ) # Sync user accounts right after import collection = PASParliamentarianCollection(app) sync_result = collection.sync_user_accounts() combined_results['user_sync'] = sync_result log.info( f'User account sync: {sync_result["synced"]} synced, ' f'{sync_result["skipped"]} skipped' ) if sync_result['created']: log.info( f'Created {len(sync_result["created"])} user ' f'accounts: {", ".join(sync_result["created"])}' ) # Store user sync results in ImportLog import_log = request.session.get(ImportLog, import_log_id) if import_log: import_log.details['user_sync'] = sync_result flag_modified(import_log, 'details') request.session.flush() return { 'import_results': combined_results.get('import', {}), 'custom_results': combined_results.get('custom_data'), 'user_sync': sync_result, 'import_log_id': import_log_id }