Source code for pas.importer.zug_kub_importer

from __future__ import annotations

from onegov.pas.log import log


from typing import TYPE_CHECKING, Any, cast
if TYPE_CHECKING:
    import logging
    from uuid import UUID
    from collections.abc import Sequence
    from sqlalchemy.orm import Session
    from onegov.pas.importer.json_import import (
        ImportCategoryResult,
        PersonData,
        OrganizationData,
        MembershipData,
    )


[docs] def _serialize_model_objects(obj_list: list[Any]) -> list[dict[str, Any]]: """Convert model objects to JSON-serializable dictionaries.""" serialized = [] for obj in obj_list: if hasattr(obj, '__tablename__'): # SQLAlchemy model object result = { 'id': str(obj.id) if obj.id else None, 'type': obj.__class__.__name__, } # Add identifying fields based on model type if hasattr(obj, 'last_name') and hasattr(obj, 'first_name'): result.update({ 'last_name': obj.last_name, 'first_name': obj.first_name, 'email': getattr(obj, 'email', None) }) elif hasattr(obj, 'name'): result['name'] = obj.name elif hasattr(obj, 'title'): result['title'] = obj.title serialized.append(result) else: # For non-model objects, keep as-is serialized.append(obj) return serialized
[docs] def import_zug_kub_data( session: Session, people_data: Sequence[PersonData], organization_data: Sequence[OrganizationData], membership_data: Sequence[MembershipData], user_id: UUID | None = None, import_type: str = 'cli', logger: logging.Logger | None = None, create_import_log: bool = True, ) -> dict[str, ImportCategoryResult]: """ Imports data from KUB JSON files within a single transaction, logs the outcome, and returns details of changes including processed counts. Args: session: Database session people_data: People data to import organization_data: Organization data to import membership_data: Membership data to import user_id: ID of user performing the import (optional) import_type: Type of import logger: Optional logger to use instead of module logger create_import_log: Whether to create ImportLog (True for web form, False for CLI/orchestrator) Returns a dictionary where keys are categories (e.g., 'parliamentarians') and values are dictionaries containing 'created' (list), 'updated' (list), and 'processed' (int). Rolls back changes within this import if an internal error occurs. Logs the attempt regardless of success or failure. """ from onegov.pas.importer.json_import import ( PeopleImporter, OrganizationImporter, MembershipImporter, ) import_details: dict[str, ImportCategoryResult] = {} log_status = 'failed' log_details: dict[str, Any] = {} final_error: Exception | None = None # Use provided logger or default to module logger if logger is None: logger = log try: # Use a savepoint; rollback occurs automatically on exception with session.begin_nested(): people_importer = PeopleImporter(session, logger) (parliamentarian_map, people_details, people_processed) = ( people_importer.bulk_import(people_data) ) import_details['parliamentarians'] = { 'created': _serialize_model_objects(people_details['created']), 'updated': _serialize_model_objects(people_details['updated']), 'processed': people_processed, } organization_importer = OrganizationImporter(session, logger) ( commission_map, parliamentary_group_map, party_map, other_organization_map, org_details, org_processed_counts, ) = organization_importer.bulk_import(organization_data) for category, details_list_dict in org_details.items(): processed_count = org_processed_counts.get(category, 0) # Only add categories that have actual ORM objects # (Commissions, Parties) if category in ('commissions', 'parties'): created_objs = details_list_dict.get('created', []) updated_objs = details_list_dict.get('updated', []) import_details[category] = { 'created': _serialize_model_objects(created_objs), 'updated': _serialize_model_objects(updated_objs), 'processed': processed_count, } # Add 'other' processed count to log_details separately log_details['other_organizations_processed'] = ( org_processed_counts.get('other', 0) ) log_details['parliamentary_groups_processed'] = ( org_processed_counts.get('parliamentary_groups', 0) ) membership_importer = MembershipImporter(session, logger) membership_importer.init( session, parliamentarian_map, commission_map, parliamentary_group_map, party_map, other_organization_map, ) (membership_details, membership_processed_counts) = ( membership_importer.bulk_import(membership_data) ) for category, details_dict in membership_details.items(): if category == 'parliamentarians_from_memberships': if isinstance( details_dict, dict ): created_parl = details_dict.get('created', []) updated_parl = details_dict.get('updated', []) import_details['parliamentarians']['created'].extend( _serialize_model_objects(created_parl) ) import_details['parliamentarians']['updated'].extend( _serialize_model_objects(updated_parl) ) elif category in ( 'commission_memberships', 'parliamentarian_roles', ): created_items = details_dict.get('created', []) updated_items = details_dict.get('updated', []) import_details[category] = cast( 'ImportCategoryResult', { 'created': _serialize_model_objects(created_items), 'updated': _serialize_model_objects(updated_items), 'processed': details_dict.get('processed', 0) } ) log_details['skipped_memberships'] = ( membership_processed_counts.get('skipped', 0) ) log_details['summary'] = {} for k, v in import_details.items(): # v is now guaranteed to be ImportCategoryResult log_details['summary'][k] = { 'created_count': len(v['created']), 'updated_count': len(v['updated']), 'processed_count': v['processed'], } log_details['summary']['other_organizations_processed'] = ( log_details['other_organizations_processed'] ) log_details['summary']['parliamentary_groups_processed'] = ( log_details['parliamentary_groups_processed'] ) log_details['summary']['skipped_memberships'] = log_details[ 'skipped_memberships' ] log_status = 'completed' logger.info( 'KUB data import processing successful within transaction.' ) except Exception as e: final_error = e log_status = 'failed' log_details['error'] = str(e) logger.exception('KUB data import failed') finally: # Create ImportLog if requested (for web form imports) if create_import_log: try: from onegov.pas.models import ImportLog import_log = ImportLog( user_id=user_id, details=log_details, status=log_status, import_type=import_type ) session.add(import_log) if session.is_active: session.flush() # Add import log ID to import details for redirect import_details['_import_log_id'] = import_log.id # type: ignore logger.info( f'KUB data import attempt logged with status: {log_status}' ) except Exception: logger.exception( 'Failed to log import status' ) else: # ImportLog creation is handled by the orchestrator logger.info( f'KUB data import completed with status: {log_status}' ) if final_error: raise RuntimeError('KUB data import failed.') from final_error return import_details