Source code for pas.importer.zug_kub_importer

from __future__ import annotations

from onegov.pas import log
from onegov.pas.models import ImportLog


from typing import TYPE_CHECKING, Any, cast
if TYPE_CHECKING:
    from uuid import UUID
    from collections.abc import Sequence
    from sqlalchemy.orm import Session
    from onegov.pas.importer.json_import import (
        ImportCategoryResult,
        PersonData,
        OrganizationData,
        MembershipData,
    )


[docs] def import_zug_kub_data( session: Session, people_data: Sequence[PersonData], organization_data: Sequence[OrganizationData], membership_data: Sequence[MembershipData], user_id: UUID | None = None, import_type: str = 'cli', ) -> dict[str, ImportCategoryResult]: """ Imports data from KUB JSON files within a single transaction, logs the outcome, and returns details of changes including processed counts. Args: session: Database session people_data: People data to import organization_data: Organization data to import membership_data: Membership data to import user_id: ID of user performing the import (optional) import_type: Type of import ('cli', 'upload', or 'automatic') Returns a dictionary where keys are categories (e.g., 'parliamentarians') and values are dictionaries containing 'created' (list), 'updated' (list), and 'processed' (int). Rolls back changes within this import if an internal error occurs. Logs the attempt regardless of success or failure. """ from onegov.pas.importer.json_import import ( PeopleImporter, OrganizationImporter, MembershipImporter, ) import_details: dict[str, ImportCategoryResult] = {} log_status = 'failed' log_details: dict[str, Any] = {} final_error: Exception | None = None try: # Use a savepoint; rollback occurs automatically on exception with session.begin_nested(): people_importer = PeopleImporter(session) (parliamentarian_map, people_details, people_processed) = ( people_importer.bulk_import(people_data) ) import_details['parliamentarians'] = { 'created': people_details['created'], 'updated': people_details['updated'], 'processed': people_processed, } organization_importer = OrganizationImporter(session) ( commission_map, parliamentary_group_map, party_map, other_organization_map, org_details, org_processed_counts, ) = organization_importer.bulk_import(organization_data) for category, details_list_dict in org_details.items(): processed_count = org_processed_counts.get(category, 0) # Only add categories that have actual ORM objects # (Commissions, Parties) if category in ('commissions', 'parties'): import_details[category] = { 'created': details_list_dict.get('created', []), 'updated': details_list_dict.get('updated', []), 'processed': processed_count, } # Add 'other' processed count to log_details separately log_details['other_organizations_processed'] = ( org_processed_counts.get('other', 0) ) log_details['parliamentary_groups_processed'] = ( org_processed_counts.get('parliamentary_groups', 0) ) membership_importer = MembershipImporter(session) membership_importer.init( session, parliamentarian_map, commission_map, parliamentary_group_map, party_map, other_organization_map, ) (membership_details, membership_processed_counts) = ( membership_importer.bulk_import(membership_data) ) for category, details_dict in membership_details.items(): if category == 'parliamentarians_from_memberships': if isinstance( details_dict, dict ): import_details['parliamentarians']['created'].extend( details_dict.get('created', []) ) import_details['parliamentarians']['updated'].extend( details_dict.get('updated', []) ) elif category in ( 'commission_memberships', 'parliamentarian_roles', ): import_details[category] = cast( 'ImportCategoryResult', details_dict ) log_details['skipped_memberships'] = ( membership_processed_counts.get('skipped', 0) ) log_details['summary'] = {} for k, v in import_details.items(): # v is now guaranteed to be ImportCategoryResult log_details['summary'][k] = { 'created_count': len(v['created']), 'updated_count': len(v['updated']), 'processed_count': v['processed'], } log_details['summary']['other_organizations_processed'] = ( log_details['other_organizations_processed'] ) log_details['summary']['parliamentary_groups_processed'] = ( log_details['parliamentary_groups_processed'] ) log_details['summary']['skipped_memberships'] = log_details[ 'skipped_memberships' ] log_status = 'completed' log.info( 'KUB data import processing successful within transaction.' ) except Exception as e: final_error = e log_status = 'failed' log_details['error'] = str(e) log.error(f'KUB data import failed: {e}', exc_info=True) finally: try: import_log = ImportLog( user_id=user_id, details=log_details, status=log_status, import_type=import_type ) session.add(import_log) if session.is_active: session.flush() log.info( f'KUB data import attempt logged with status: {log_status}' ) except Exception as log_e: log.error( f'Failed to log import status: {log_e}', exc_info=True ) if final_error: raise RuntimeError('KUB data import failed.') from final_error return import_details