from __future__ import annotations
from datetime import date, datetime
from uuid import UUID
from onegov.pas import log
from onegov.pas.models import (
CommissionMembership,
Parliamentarian,
Commission,
ParliamentaryGroup,
Party,
ParliamentarianRole,
ImportLog,
)
from sqlalchemy.orm import selectinload
from typing import Any, Literal, TypedDict, cast
# Define TypedDicts for data structures used in JSON import
# These are defined outside TYPE_CHECKING to be available at runtime
# Because we validate the json structure has actually these fields
[docs]
class OrganizationData(TypedDict):
[docs]
organizationTypeTitle: (
Literal['Kommission', 'Fraktion', 'Kantonsrat', 'Sonstige'] | None
)
[docs]
thirdPartyId: str | None
[docs]
class EmailData(TypedDict):
[docs]
thirdPartyId: str | None
[docs]
class AddressData(TypedDict):
[docs]
organisationNameAddOn1: str
[docs]
organisationNameAddOn2: str
[docs]
postOfficeBox: str
[docs]
thirdPartyId: str | None
[docs]
class PhoneNumberData(TypedDict):
[docs]
otherPhoneCategory: str | None
[docs]
phoneCategoryText: str
[docs]
thirdPartyId: str | None
[docs]
class UrlData(TypedDict):
[docs]
thirdPartyId: str | None
[docs]
class OrganizationDataWithinMembership(TypedDict):
[docs]
organizationTypeTitle: (
Literal['Kommission', 'Fraktion', 'Kantonsrat', 'Sonstige'] | None
)
[docs]
primaryEmail: EmailData | None
[docs]
thirdPartyId: str | None
[docs]
primaryAddress: AddressData | None
[docs]
primaryPhoneNumber: PhoneNumberData | None
[docs]
primaryUrl: UrlData | None
[docs]
class PersonData(TypedDict):
[docs]
personTypeTitle: str | None
[docs]
primaryEmail: EmailData | None
[docs]
class MembershipData(TypedDict):
[docs]
emailReceptionType: str
[docs]
organization: OrganizationDataWithinMembership
[docs]
primaryAddress: AddressData | None
[docs]
primaryEmail: EmailData | None
[docs]
primaryPhoneNumber: PhoneNumberData | None
[docs]
primaryUrl: UrlData | None
[docs]
email: EmailData | None
[docs]
phoneNumber: PhoneNumberData | None
[docs]
address: AddressData | None
[docs]
urlField: UrlData | None
[docs]
start: str | bool | None
[docs]
thirdPartyId: str | None
from typing import TYPE_CHECKING, TypedDict
if TYPE_CHECKING:
from onegov.pas.models.parliamentarian_role import ParliamentaryGroupRole
from onegov.pas.models.parliamentarian_role import PartyRole
from onegov.pas.models.parliamentarian_role import Role
from collections.abc import Sequence
from sqlalchemy.orm import Session
[docs]
class ImportCategoryResult(TypedDict):
[docs]
class DataImporter:
"""Base class for all importers with common functionality."""
def __init__(self, session: Session) -> None:
[docs]
def parse_date(
self,
date_string: str | None,
) -> date | None:
if not date_string:
return None
try:
# Handle potential timezone info if present
if date_string.endswith('Z'):
date_string = date_string[:-1] + '+00:00'
dt = datetime.fromisoformat(date_string)
return dt.date()
except ValueError:
log.warning(f'Could not parse date string: {date_string}')
return None
[docs]
def _bulk_save(self, objects: list[Any], object_type: str) -> int:
"""Save a batch of objects to the database and return the count."""
count = 0
try:
if objects:
count = len(objects)
self.session.add_all(objects)
# Flush is removed, commit at the end of import_zug_kub_data
log.info(f'Saved {count} new {object_type}')
return count
except Exception as e:
log.error(
f'Error saving {count} {object_type}: {e}', exc_info=True
)
self.session.rollback()
raise RuntimeError(f'Error bulk saving {object_type}') from e
[docs]
class PeopleImporter(DataImporter):
"""Importer for Parliamentarian data from api/v2/people endpoint
(People.json)
API Inconsistency Note:
The API design is inconsistent regarding address information.
While 'people.json', which is expected to be the primary source
for person details, *lacks* address information, 'memberships.json'
*does* embed address data within the 'person' object and sometimes
at the membership level. This requires us to extract address data
from 'memberships.json' instead of the more logical 'people.json'
endpoint.
Whatever the reasons for this, we need to be careful to avoid potential
inconsistencies if addresses are not carefully managed across both
endpoints in the source system
"""
[docs]
person_attribute_map: dict[str, str] = {
'id': 'external_kub_id',
'firstName': 'first_name',
'officialName': 'last_name',
'title': 'academic_title',
'salutation': 'salutation',
}
[docs]
def bulk_import(self, people_data: Sequence[PersonData]) -> tuple[
dict[str, Parliamentarian],
dict[str, list[Parliamentarian]],
int, # Add processed count
]:
"""
Imports people from JSON data.
Returns:
A tuple containing:
- A map of external KUB ID to Parliamentarian objects.
- A dictionary with lists of created and updated parliamentarians.
- The total number of people records processed from the input.
"""
new_parliamentarians: list[Parliamentarian] = []
updated_parliamentarians: list[Parliamentarian] = []
processed_count = 0
result_map: dict[str, Parliamentarian] = {}
# Fetch existing parliamentarians by external_kub_id
existing_ids = [p['id'] for p in people_data if p.get('id')]
existing_parliamentarians = (
self.session.query(Parliamentarian)
.filter(Parliamentarian.external_kub_id.in_(existing_ids))
.all()
)
existing_map = {
p.external_kub_id: p
for p in existing_parliamentarians
if p.external_kub_id
}
for person_data in people_data:
processed_count += 1 # Count each record we attempt to process
person_id = person_data.get('id')
if not person_id:
log.warning(
f'Skipping person entry due to missing ID: '
f'{person_data.get("fullName")}'
)
continue
try:
# Check if this person ID has already been processed in this
# batch
if person_id in result_map:
log.warning(
f'Skipping duplicate person ID within import batch: '
f'{person_id}'
)
continue
# Convert person_id string to UUID for DB lookup
try:
person_uuid = UUID(person_id)
except ValueError:
log.exception(
f'Skipping person due to invalid UUID format: '
f'{person_id} - {person_data.get("fullName")}'
)
continue
parliamentarian = existing_map.get(person_uuid)
if parliamentarian:
# Update existing parliamentarian from DB
was_updated = self._update_parliamentarian_attributes(
parliamentarian, person_data
)
log.debug(
f'Updating existing parliamentarian: {person_id}'
)
if was_updated:
updated_parliamentarians.append(parliamentarian)
result_map[person_id] = parliamentarian
else:
parliamentarian = self._create_parliamentarian(person_data)
if not parliamentarian:
continue
log.debug(f'Creating new parliamentarian: {person_id}')
new_parliamentarians.append(parliamentarian)
result_map[person_id] = parliamentarian
# Add to new list, _bulk_save confirms DB insert
except Exception as e:
log.error(
f'Error processing person with id {person_id}: {e}',
exc_info=True,
)
self._bulk_save(new_parliamentarians, 'new parliamentarians')
# Filter out any objects that failed to save
# (though _bulk_save raises on DB error)
created_list = [p for p in new_parliamentarians if p.id]
updated_list = [p for p in updated_parliamentarians if p.id]
details = {'created': created_list, 'updated': updated_list}
log.info(
f'Parliamentarian import details: '
f'Created: {len(details["created"])}, '
f'Updated: {len(details["updated"])}, '
f'Processed: {processed_count}'
)
return result_map, details, processed_count
[docs]
def _update_parliamentarian_attributes(
self, parliamentarian: Parliamentarian, person_data: PersonData
) -> bool:
"""Updates attributes of an existing Parliamentarian object.
Returns True if any attributes were changed, False otherwise."""
changed = False
for json_key, model_attr in self.person_attribute_map.items():
if val := person_data.get(json_key):
# Only update if the value is different to avoid unnecessary
# writes
if getattr(parliamentarian, model_attr) != val:
setattr(parliamentarian, model_attr, val)
changed = True
# Handle nested primaryEmail
primary_email_data = person_data.get('primaryEmail')
new_email = (
primary_email_data.get('email') if primary_email_data else None
)
if new_email and parliamentarian.email_primary != new_email:
parliamentarian.email_primary = new_email
changed = True
return changed
[docs]
def _create_parliamentarian(
self, person_data: PersonData
) -> Parliamentarian | None:
"""Creates a single new Parliamentarian object from person data."""
person_id = person_data.get('id')
person_id = person_data.get('id')
if not person_id:
log.error(
f'Skipping person creation due to missing ID: '
f'{person_data.get("fullName")}'
)
return None
parliamentarian_kwargs = {}
for json_key, model_attr in self.person_attribute_map.items():
if val := person_data.get(json_key):
parliamentarian_kwargs[model_attr] = val
# Handle nested primaryEmail
primary_email_data = person_data.get('primaryEmail')
if primary_email_data and primary_email_data.get('email'):
parliamentarian_kwargs['email_primary'] = primary_email_data[
'email'
]
parliamentarian = Parliamentarian(**parliamentarian_kwargs)
return parliamentarian
[docs]
class OrganizationImporter(DataImporter):
"""Importer for organization data from organizations.json."""
[docs]
def bulk_import(
self, organizations_data: Sequence[OrganizationData]
) -> tuple[
dict[str, Commission],
dict[str, ParliamentaryGroup], # Currently unused, but kept for
# structure
dict[str, Party],
dict[str, Any], # Other orgs
dict[str, dict[str, list[Commission | Party]]], # Details
dict[str, int], # Processed counts per type
]:
"""
Imports organizations from JSON data. Returns maps, details, and
counts.
Returns:
Tuple containing maps of external IDs to:
- Commissions
- Parliamentary Groups
- Parties (created from parliamentary groups)
- Other organizations
- Dictionary containing lists of created/updated objects per type.
"""
commissions_to_create: list[Commission] = []
commissions_to_update: list[Commission] = []
parties_to_create: list[Party] = []
parties_to_update: list[Party] = []
# Details structure to return
details: dict[str, dict[str, list[Commission | Party]]] = {
'commissions': {'created': [], 'updated': []},
'parliamentary_groups': {'created': [], 'updated': []},
'parties': {'created': [], 'updated': []},
# 'other' are not ORM objects, just counted/listed
}
processed_counts = {
'commissions': 0,
'parliamentary_groups': 0, # Fraktion
'parties': 0, # Also Fraktion, mapped to Party
'other': 0, # Kantonsrat, Sonstige, Legislatur
}
# Maps to return (will contain both existing and new objects)
commission_map: dict[str, Commission] = {}
parliamentary_group_map: dict[str, ParliamentaryGroup] = {}
party_map: dict[str, Party] = {}
# Not ORM objects, no update needed
other_organizations: dict[str, Any] = {}
# Fetch existing organizations by external_kub_id
existing_ids = [o['id'] for o in organizations_data if o.get('id')]
existing_commissions = (
self.session.query(Commission)
.filter(Commission.external_kub_id.in_(existing_ids))
.all()
)
existing_parties = (
self.session.query(Party)
.filter(Party.external_kub_id.in_(existing_ids))
.all()
)
existing_commission_map = {
c.external_kub_id: c for c in existing_commissions
}
existing_party_map = {p.external_kub_id: p for p in existing_parties}
for org_data in organizations_data:
org_id = org_data.get('id')
if not org_id:
log.warning( # Downgrade to warning
f'Skipping organization without ID: {org_data.get("name")}'
)
continue
organization_type_title = org_data.get('organizationTypeTitle')
org_name = org_data.get('name')
try:
organization_type_title = org_data.get('organizationTypeTitle')
org_name = org_data.get('name', '')
if organization_type_title == 'Kommission':
processed_counts['commissions'] += 1
try:
org_uuid = UUID(org_id)
except ValueError:
log.exception(
f'Invalid UUID for commission ID: {org_id}'
)
continue
commission = existing_commission_map.get(org_uuid)
if commission:
# Update existing commission found in initial query
updated = False
if commission.name != org_name:
commission.name = org_name
updated = True
if updated:
commissions_to_update.append(commission)
log.debug(
f'Updating commission (from initial query): '
f'{org_id}'
)
# No need to add to save list, session tracks changes
else:
commission = Commission(
external_kub_id=org_uuid,
name=org_name,
type='normal',
)
log.debug(f'Creating new commission: {org_id}')
# Only add *new* commissions to the save list
commissions_to_create.append(commission)
# Add to map regardless
commission_map[org_id] = commission
elif organization_type_title == 'Fraktion':
# Count both as parliamentary_group and party for
# processing
processed_counts['parliamentary_groups'] += 1
processed_counts['parties'] += 1
try:
org_uuid = UUID(org_id)
except ValueError:
log.exception(
f'Invalid UUID for party/Fraktion ID: {org_id}'
)
continue
# Treat 'Fraktion' as Party based on observation
party = existing_party_map.get(org_uuid)
if party:
# Update existing party found in initial query
updated = False
if party.name != org_name:
party.name = org_name
updated = True
if updated:
parties_to_update.append(party)
log.debug(
f'Updating party (from Fraktion, initial '
f'query): {org_id}'
)
# No need to add to save list, session tracks changes
else:
# Create new party (since not found in initial map)
party = Party(external_kub_id=org_uuid, name=org_name)
log.debug(
f'Creating party (from Fraktion): {org_id}'
)
# Only add *new* parties to the save list
parties_to_create.append(party)
# Use org_id for party_map key consistency
party_map[org_id] = party
elif organization_type_title in ('Kantonsrat', 'Sonstige'):
processed_counts['other'] += 1
other_organizations[org_id] = {
'id': org_id,
'name': org_data.get('name', ''),
'type': organization_type_title.lower(),
}
elif org_name is not None and 'Legislatur' in org_name:
# Count Legislatur as 'other'
processed_counts['other'] += 1
else:
# Count unknown types as 'other' as well
processed_counts['other'] += 1
log.warning(
f'Unknown organization type: {organization_type_title}'
f' for {org_name}'
)
except Exception as e:
log.error(
f'Error importing organization '
f'{org_data.get("name")}: {e}',
exc_info=True,
)
# Save newly created objects
if commissions_to_create:
self._bulk_save(commissions_to_create, 'new commissions')
# if parliamentary_groups_to_create: # Not implemented
# self._bulk_save(parliamentary_groups_to_create,
# 'new parliamentary groups')
if parties_to_create:
self._bulk_save(parties_to_create, 'new parties')
# Flush is removed, commit at the end of import_zug_kub_data
# IDs will be assigned after the final commit.
details['commissions']['created'] = list(commissions_to_create)
details['commissions']['updated'] = list(commissions_to_update)
details['parties']['created'] = list(parties_to_create)
details['parties']['updated'] = list(parties_to_update)
log.info(
f'Organization import details: '
f'Commissions(C:{len(details["commissions"]["created"])}, '
f'U:{len(details["commissions"]["updated"])}, '
f'P:{processed_counts["commissions"]}), '
f'Parties(C:{len(details["parties"]["created"])}, '
f'U:{len(details["parties"]["updated"])}, '
f'P:{processed_counts["parties"]}), ' # Note: Parties from Frakt.
f'Other Processed: {processed_counts["other"]}'
)
# Return maps, details, and processed counts
return (
commission_map,
parliamentary_group_map,
party_map,
other_organizations,
details,
processed_counts, # Add processed counts here
)
[docs]
class MembershipImporter(DataImporter):
"""Importer for membership data from memberships.json.
Get an overview of the possible pairs: see extract_role_org_type_pairs:
Role - Organization Type Combinations:
=====================================
Assistentin Leitung Staatskanzlei - Sonstige
Frau Landammann - Sonstige
Landammann - Sonstige
Landschreiber - Sonstige
Mitglied - Kommission
Mitglied des Kantonsrates - Kantonsrat
Nationalrat - Sonstige
Parlamentarier - Sonstige
Parlamentarierin - Sonstige
Protokollführerin - Sonstige
Präsident des Kantonsrates - Kantonsrat
Präsident/-in - Kommission
Regierungsrat - Sonstige
Regierungsrätin - Sonstige
Standesweibel - Sonstige
Standesweibelin - Sonstige
Statthalter - Sonstige
Stv. Landschreiberin - Sonstige
Stv. Protokollführer - Sonstige
Stv. Protokollführerin - Sonstige
Stv. Standesweibelin - Sonstige
Ständerat - Sonstige
Total unique combinations: 22
"""
def __init__(self, session: Session) -> None:
[docs]
self.parliamentarian_map: dict[str, Parliamentarian] = {}
[docs]
self.commission_map: dict[str, Commission] = {}
[docs]
self.parliamentary_group_map: dict[str, ParliamentaryGroup] = {}
# party_map now keyed by external_kub_id for consistency
[docs]
self.party_map: dict[str, Party] = {}
[docs]
self.other_organization_map: dict[str, Any] = {}
[docs]
def init(
self,
session: Session,
parliamentarian_map: dict[str, Parliamentarian],
commission_map: dict[str, Commission],
parliamentary_group_map: dict[str, ParliamentaryGroup],
party_map: dict[str, Party],
other_organization_map: dict[str, Any],
) -> None:
"""Initialize the importer with maps of objects by their external KUB
ID."""
self.session = session
self.parliamentarian_map = parliamentarian_map
self.commission_map = commission_map
self.parliamentary_group_map = parliamentary_group_map
self.party_map = party_map
self.other_organization_map = other_organization_map
[docs]
def _extract_and_update_or_create_missing_parliamentarians(
self, memberships_data: Sequence[MembershipData]
) -> dict[str, list[Parliamentarian]]:
"""
Extracts/updates/creates parliamentarians found only in membership
data.
Returns a dictionary with lists of created and updated
parliamentarians found during this process.
If a parliamentarian is not already known (from people.json import),
check if they exist in the DB. If yes, update them with potentially
missing info (address, email) from membership data. If no, create a
new parliamentarian. Updates self.parliamentarian_map accordingly.
"""
parliamentarians_to_create: list[Parliamentarian] = []
# Use a set for updates to automatically handle duplicates
parliamentarians_to_update: set[Parliamentarian] = set()
# 1. Identify potential missing parliamentarian IDs
missing_person_ids = {
m['person']['id']
for m in memberships_data
if m.get('person')
and m['person'].get('id')
and m['person']['id'] not in self.parliamentarian_map
}
if not missing_person_ids:
log.debug(
'No missing parliamentarians found only in membership data.'
)
return {'created': [], 'updated': []}
existing_db_parliamentarians = (
self.session.query(Parliamentarian)
.filter(Parliamentarian.external_kub_id.in_(missing_person_ids))
.all()
)
existing_db_map = {
p.external_kub_id: p for p in existing_db_parliamentarians
}
log.debug(
f'Found {len(existing_db_map)} existing parliamentarians in DB '
f'matching missing IDs.'
)
for membership in memberships_data:
person_data = membership.get('person')
person_id = None # Initialize person_id
try:
if not person_data:
continue
person_id = person_data.get('id')
if not person_id or person_id in self.parliamentarian_map:
continue
try:
person_uuid = UUID(person_id)
existing_parliamentarian = existing_db_map.get(person_uuid)
except ValueError:
log.warning(
f'Invalid UUID format for person ID: {person_id}'
)
existing_parliamentarian = None
if existing_parliamentarian:
# Update existing DB parliamentarian if needed
updated = self._update_parliamentarian_from_membership(
existing_parliamentarian, person_data, membership
)
if updated:
# Add to set; duplicates are automatically handled
parliamentarians_to_update.add(
existing_parliamentarian
)
log.info(
f'Updated existing parliamentarian '
f'(ID: {person_id}) found via membership data.'
)
# Add to map whether updated or not
self.parliamentarian_map[person_id] = (
existing_parliamentarian
)
else:
# Create new parliamentarian
full_name = person_data.get('fullName', 'N/A')
log.info(
f'Creating new parliamentarian (ID: {person_id}) '
f'found only in membership data: {full_name}'
)
new_parliamentarian = (
self._create_parliamentarian_from_membership(
person_data, membership
)
)
if new_parliamentarian:
parliamentarians_to_create.append(new_parliamentarian)
self.parliamentarian_map[person_id] = (
new_parliamentarian
)
except Exception as e:
# Use the initialized person_id here
log_person_id = person_id if person_id else 'unknown'
log.error(
f'Error processing potential missing parliamentarian from '
f'membership (Person ID: {log_person_id}): {e}',
exc_info=True,
)
# Save new parliamentarians
if parliamentarians_to_create:
self._bulk_save(
parliamentarians_to_create,
'new parliamentarians from memberships',
)
# Flush is removed, commit at the end of import_zug_kub_data
# IDs will be assigned after the final commit.
try:
# We still need to add the newly created objects to the session
# if _bulk_save didn't run (list was empty)
if parliamentarians_to_create:
self.session.add_all(parliamentarians_to_create)
created_count = len(parliamentarians_to_create)
updated_count = len(parliamentarians_to_update)
log.info(
f'Created {created_count} and updated {updated_count} '
f'parliamentarians based *only* on membership data.'
)
# Update map with newly created objects (now with IDs)
for p in parliamentarians_to_create:
if p.external_kub_id: # Should have ID after flush
# Map using external ID before commit assigns internal ID
self.parliamentarian_map[str(p.external_kub_id)] = p
# Filter lists based on object existence, not internal ID yet
created_list = list(parliamentarians_to_create)
# Convert set to list for the return value
updated_list = list(parliamentarians_to_update)
except Exception:
log.error(
'Error flushing parliamentarians from membership data',
exc_info=True,
)
self.session.rollback()
# Return empty lists on error
created_list = []
updated_list = []
details = {'created': created_list, 'updated': updated_list}
return details
[docs]
def _update_parliamentarian_from_membership(
self,
parliamentarian: Parliamentarian,
person_data: PersonData,
membership_data: MembershipData,
) -> bool:
"""
Updates an existing parliamentarian with info from membership data,
focusing on potentially missing fields like email and address.
Returns True if any changes were made, False otherwise.
"""
changed = False
# Update basic info if missing
if not parliamentarian.first_name and person_data.get('firstName'):
parliamentarian.first_name = person_data['firstName']
changed = True
if not parliamentarian.last_name and person_data.get('officialName'):
parliamentarian.last_name = person_data['officialName']
changed = True
if not parliamentarian.salutation and person_data.get('salutation'):
parliamentarian.salutation = person_data['salutation']
changed = True
if not parliamentarian.academic_title and person_data.get('title'):
parliamentarian.academic_title = person_data['title']
changed = True
# Update email if missing or different
primary_email_data = person_data.get('primaryEmail')
new_email = (
primary_email_data.get('email') if primary_email_data else None
)
if new_email and parliamentarian.email_primary != new_email:
parliamentarian.email_primary = new_email
changed = True
# Update address if missing or different
address_data = membership_data.get('primaryAddress')
if address_data:
street = address_data.get('street', '')
house_num = address_data.get('houseNumber', '')
new_address = f'{street} {house_num}'.strip()
new_zip = address_data.get('swissZipCode')
new_city = address_data.get('town')
if new_address and parliamentarian.shipping_address != new_address:
parliamentarian.shipping_address = new_address
changed = True
if (
new_zip
and parliamentarian.shipping_address_zip_code != new_zip
):
parliamentarian.shipping_address_zip_code = new_zip
changed = True
if new_city and parliamentarian.shipping_address_city != new_city:
parliamentarian.shipping_address_city = new_city
changed = True
return changed
[docs]
def _create_parliamentarian_from_membership(
self, person_data: PersonData, membership_data: MembershipData
) -> Parliamentarian | None:
"""
Creates a new Parliamentarian object using data primarily from a
membership record.
"""
person_id = person_data.get('id')
if not person_id:
log.error(
'Cannot create parliamentarian from membership: '
'Missing person ID.'
)
return None
try:
uuid_value = UUID(person_id)
parliamentarian = Parliamentarian(
external_kub_id=uuid_value,
first_name=person_data.get('firstName', ''),
last_name=person_data.get('officialName', ''),
salutation=person_data.get('salutation'),
academic_title=person_data.get('title'),
)
# Handle email
primary_email_data = person_data.get('primaryEmail')
if primary_email_data and primary_email_data.get('email'):
parliamentarian.email_primary = primary_email_data['email']
# Handle address
address_data = membership_data.get('primaryAddress')
if address_data:
street = address_data.get('street', '')
house_num = address_data.get('houseNumber', '')
parliamentarian.shipping_address = (
f'{street} {house_num}'.strip()
)
parliamentarian.shipping_address_zip_code = address_data.get(
'swissZipCode'
)
parliamentarian.shipping_address_city = address_data.get(
'town'
)
return parliamentarian
except Exception as e:
log.error(
f'Error creating parliamentarian from membership data '
f'(Person ID: {person_id}): {e}',
exc_info=True,
)
return None
[docs]
def bulk_import(
self, memberships_data: Sequence[MembershipData]
) -> tuple[
dict[str, ImportCategoryResult | dict[str, list[Any]]], dict[str, int]
]:
"""
Imports memberships from JSON data based on organization type.
Returns:
- A dictionary containing lists of created/updated objects and
processed counts per category.
- A dictionary containing the count of processed items per type.
"""
commission_memberships_to_create: list[CommissionMembership] = []
commission_memberships_to_update: list[CommissionMembership] = []
parliamentarian_roles_to_create: list[ParliamentarianRole] = []
parliamentarian_roles_to_update: list[ParliamentarianRole] = []
# Details structure to return - Use the new TypedDict where applicable
details: dict[str, ImportCategoryResult | dict[str, Any]] = {
'parliamentarians_from_memberships': {
'created': [],
'updated': [],
},
'commission_memberships': {
'created': [],
'updated': [],
'processed': 0,
},
'parliamentarian_roles': {
'created': [],
'updated': [],
'processed': 0,
},
}
processed_counts = {
'commission_memberships': 0,
'parliamentarian_roles': 0, # Covers Fraktion, Kantonsrat,
# Sonstige
'skipped': 0, # Count memberships we couldn't process
}
# Process parliamentarians found only in memberships first
parl_details = (
self._extract_and_update_or_create_missing_parliamentarians(
memberships_data
)
)
details['parliamentarians_from_memberships']['created'] = parl_details[
'created'
]
details['parliamentarians_from_memberships']['updated'] = parl_details[
'updated'
]
# --- Pre-fetch existing memberships and roles ---
parliamentarian_ids = [
p.id for p in self.parliamentarian_map.values() if p.id
]
commission_ids = [c.id for c in self.commission_map.values() if c.id]
[p.id for p in self.party_map.values() if p.id]
[g.id for g in self.parliamentary_group_map.values() if g.id]
existing_commission_memberships_map: dict[
tuple[UUID | None, UUID | None], CommissionMembership
] = {}
if parliamentarian_ids and commission_ids:
existing_cms = (
self.session.query(CommissionMembership)
.filter(
CommissionMembership.parliamentarian_id.in_(
parliamentarian_ids
),
CommissionMembership.commission_id.in_(commission_ids),
)
.all()
)
existing_commission_memberships_map = {
(cm.parliamentarian_id, cm.commission_id): cm
for cm in existing_cms
}
log.debug(
f'Pre-fetched {len(existing_commission_memberships_map)} '
f'existing commission memberships.'
)
# Define the precise structure for the role key tuple
role_key_type = tuple[UUID, UUID | None, UUID | None, str, str | None]
existing_roles_map: dict[role_key_type, ParliamentarianRole] = {}
if parliamentarian_ids:
# Fetch all roles for the relevant parliamentarians
# We'll filter/map them client-side
existing_roles = (
self.session.query(ParliamentarianRole)
.filter(
ParliamentarianRole.parliamentarian_id.in_(
parliamentarian_ids
)
)
.options(
# Eager load related objects to prevent N+1 queries
# during updates.
selectinload(ParliamentarianRole.party),
selectinload(ParliamentarianRole.parliamentary_group),
)
.all()
)
for role_obj in existing_roles:
# Create a unique key based on the role type and relevant IDs
# Use the defined role_key_type hint for clarity
if role_obj.party_id or role_obj.parliamentary_group_id:
# Fraktion/Party Role (assuming role='member')
current_role_key = (
role_obj.parliamentarian_id,
role_obj.party_id,
role_obj.parliamentary_group_id,
'member', # Explicitly add role type for uniqueness
None, # Placeholder for additional_information
)
elif role_obj.additional_information:
# Sonstige Role (assuming role='member')
current_role_key = (
role_obj.parliamentarian_id,
None, # party_id
None, # group_id
'member', # Explicitly add role type
role_obj.additional_information,
)
else:
# Kantonsrat Role (or potentially others without
# party/group/add.info)
current_role_key = (
role_obj.parliamentarian_id,
None, # party_id
None, # group_id
role_obj.role, # Use the actual role
None, # Placeholder for additional_information
)
existing_roles_map[current_role_key] = role_obj
log.debug(
f'Pre-fetched {len(existing_roles_map)} existing '
f'parliamentarian roles.'
)
# --- End Pre-fetching ---
for membership in memberships_data:
person_id = None
processed_membership_type = False # Flag to track if counted
try:
# Safely access nested keys
person_data = membership.get('person')
if not person_data:
log.warning(
'Skipping membership: Missing person data.'
)
continue
person_id = person_data.get('id')
if not person_id:
log.warning(
'Skipping membership: Missing person ID in data.'
)
continue
org_data = membership.get('organization')
if not org_data:
log.warning(
f'Skipping membership for person {person_id}: '
f'Missing organization data.'
)
continue
org_id = org_data.get('id')
if not org_id:
log.warning(
f'Skipping membership for person {person_id}: '
f'Missing organization ID.'
)
continue
parliamentarian = self.parliamentarian_map.get(person_id)
if not parliamentarian:
log.warning(
f'Skipping membership: Parliamentarian with external '
f'KUB ID {person_id} not found.'
)
continue
org_type_title = org_data.get('organizationTypeTitle')
role_text = membership.get('role', '')
org_name = org_data.get('name', '')
if org_type_title == 'Kommission':
processed_counts['commission_memberships'] += 1
processed_membership_type = True
commission = self.commission_map.get(org_id)
if not commission:
log.warning(
f'Skipping commission membership: Commission with '
f'external KUB ID {org_id} not found.'
)
continue
membership_key = (parliamentarian.id, commission.id)
existing_membership = (
existing_commission_memberships_map.get(membership_key)
)
if existing_membership:
# Update existing membership (changes tracked by
# session)
updated = self._update_commission_membership(
existing_membership, membership
)
if updated:
commission_memberships_to_update.append(
existing_membership
)
log.debug(
f'Updating commission membership for '
f'{parliamentarian.id} in {commission.id}'
)
# No need to append, session flush handles updates.
else:
# Create new membership
membership_obj = self._create_commission_membership(
parliamentarian, commission, membership
)
if membership_obj:
commission_memberships_to_create.append(
membership_obj
)
# Add the new membership to the map to prevent
# duplicates within the same import run if data is
# redundant
if parliamentarian.id and commission.id:
new_key = (parliamentarian.id, commission.id)
existing_commission_memberships_map[
new_key
] = membership_obj
log.debug(
f'Creating commission membership for '
f'{parliamentarian.id} in {commission.id}'
)
elif org_type_title == 'Fraktion':
processed_counts['parliamentarian_roles'] += 1
processed_membership_type = True
party = self.party_map.get(org_id)
if not party:
log.warning(
f'Skipping Fraktion role: Party with external KUB '
f'ID {org_id} not found (created from Fraktion).'
)
continue
group = self.parliamentary_group_map.get(org_id)
# Use role_key_type hint for the variable
current_role_key = (
parliamentarian.id,
party.id if party else None,
group.id if group else None,
'member', # Role type
None, # additional_information
)
existing_role = existing_roles_map.get(current_role_key)
start_val = membership.get('start')
start_date_str = (
str(start_val)
if start_val and not isinstance(start_val, bool)
else None
)
end_val = membership.get('end')
end_date_str = (
str(end_val)
if end_val and not isinstance(end_val, bool)
else None
)
if existing_role:
# Update existing role
updated = self._update_parliamentarian_role(
existing_role,
role='member',
parliamentary_group=group,
parliamentary_group_role=self._map_to_parliamentary_group_role(
role_text
),
party=party,
party_role=('member' if party else 'none'),
start_date=start_date_str,
end_date=end_date_str,
)
if updated:
parliamentarian_roles_to_update.append(
existing_role
)
log.debug(
f'Updating Fraktion/Party role for '
f'{parliamentarian.id}'
)
# No need to append, session flush handles updates.
else:
# Create new role
role_obj = self._create_parliamentarian_role(
parliamentarian=parliamentarian,
role='member',
parliamentary_group=group,
parliamentary_group_role=self._map_to_parliamentary_group_role(
role_text
),
party=party,
party_role=('member' if party else 'none'),
start_date=start_date_str,
end_date=end_date_str,
)
if role_obj:
parliamentarian_roles_to_create.append(role_obj)
# Add the new role to the map
# Ensure the key matches RoleKey type
if parliamentarian.id:
current_role_key_after_create: (
role_key_type
) = (
parliamentarian.id,
role_obj.party_id,
role_obj.parliamentary_group_id,
role_obj.role,
role_obj.additional_information,
)
existing_roles_map[
current_role_key_after_create
] = role_obj
log.debug(
f'Creating Fraktion/Party role for '
f'{parliamentarian.id}'
)
elif org_type_title == 'Kantonsrat':
processed_counts['parliamentarian_roles'] += 1
processed_membership_type = True
role = self._map_to_parliamentarian_role(role_text)
start_val = membership.get('start')
start_date_str = (
str(start_val)
if start_val and not isinstance(start_val, bool)
else None
)
end_val = membership.get('end')
end_date_str = (
str(end_val)
if end_val and not isinstance(end_val, bool)
else None
)
current_role_key = (
parliamentarian.id,
None, # party_id
None, # group_id
role, # Actual role from _map_to_parliamentarian_role
None, # additional_information
)
existing_role = existing_roles_map.get(current_role_key)
if existing_role:
updated = self._update_parliamentarian_role(
existing_role,
role=role,
start_date=start_date_str,
end_date=end_date_str,
)
if updated:
parliamentarian_roles_to_update.append(
existing_role
)
log.debug(
f'Updating Kantonsrat role ({role}) for '
f'{parliamentarian.id}'
)
# No need to append, session flush handles updates.
else:
role_obj = self._create_parliamentarian_role(
parliamentarian=parliamentarian,
role=role,
start_date=start_date_str,
end_date=end_date_str,
)
if role_obj:
parliamentarian_roles_to_create.append(role_obj)
# Add the new role to the map
# Ensure the key matches RoleKey type
if parliamentarian.id:
current_role_key_after_create = (
parliamentarian.id,
role_obj.party_id,
role_obj.parliamentary_group_id,
role_obj.role,
role_obj.additional_information,
)
existing_roles_map[
current_role_key_after_create
] = role_obj
log.debug(
f'Creating Kantonsrat role ({role}) for '
f'{parliamentarian.id}'
)
elif org_type_title == 'Sonstige':
processed_counts['parliamentarian_roles'] += 1
processed_membership_type = True
additional_info = f'{role_text} - {org_name}'
start_val = membership.get('start')
start_date_str = (
str(start_val)
if start_val and not isinstance(start_val, bool)
else None
)
end_val = membership.get('end')
end_date_str = (
str(end_val)
if end_val and not isinstance(end_val, bool)
else None
)
current_role_key = ( # type: ignore[assignment]
parliamentarian.id,
None, # party_id
None, # group_id
'member', # Role type
additional_info, # additional_information
)
existing_role = existing_roles_map.get(current_role_key)
if existing_role:
updated = self._update_parliamentarian_role(
existing_role,
role='member',
additional_information=additional_info,
start_date=start_date_str,
end_date=end_date_str,
)
if updated:
parliamentarian_roles_to_update.append(
existing_role
)
log.debug(
f'Updating Sonstige role for '
f'{parliamentarian.id}: {additional_info}'
)
# No need to append, session flush handles updates.
else:
role_obj = self._create_parliamentarian_role(
parliamentarian=parliamentarian,
role='member',
additional_information=additional_info,
start_date=start_date_str,
end_date=end_date_str,
)
if role_obj:
parliamentarian_roles_to_create.append(role_obj)
# Add the new role to the map
# Ensure the key matches RoleKey type
if parliamentarian.id:
current_role_key_after_create = (
parliamentarian.id,
role_obj.party_id,
role_obj.parliamentary_group_id,
role_obj.role,
role_obj.additional_information,
)
existing_roles_map[
current_role_key_after_create
] = role_obj
log.debug(
f'Creating Sonstige role for '
f'{parliamentarian.id}: {additional_info}'
)
else:
# Count skipped/unknown types
if not processed_membership_type:
processed_counts['skipped'] += 1
log.warning(
f'Skipping membership: Unknown organization type '
f"'{org_type_title}' "
f'for organization {org_name}'
)
except Exception as e:
# Use initialized person_id for logging
log_person_id = person_id if person_id else 'unknown'
log.error(
f'Error creating membership for '
f'person_id {log_person_id}: {e}',
exc_info=True,
)
# Count errors as skipped if not already counted
if not processed_membership_type:
processed_counts['skipped'] += 1
# Save newly created memberships and roles
if commission_memberships_to_create:
self._bulk_save(
commission_memberships_to_create, 'new commission memberships'
)
if parliamentarian_roles_to_create:
self._bulk_save(
parliamentarian_roles_to_create, 'new parliamentarian roles'
)
# Flush is removed, commit at the end of import_zug_kub_data
# IDs will be assigned after the final commit.
try:
# Populate details based on object existence, not internal ID yet
details['commission_memberships']['created'] = list(
commission_memberships_to_create
)
details['commission_memberships']['updated'] = list(
commission_memberships_to_update
)
details['parliamentarian_roles']['created'] = list(
parliamentarian_roles_to_create
)
details['parliamentarian_roles']['updated'] = list(
parliamentarian_roles_to_update
)
# Update processed counts in the details dictionary
details['commission_memberships']['processed'] = processed_counts[
'commission_memberships'
]
details['parliamentarian_roles']['processed'] = processed_counts[
'parliamentarian_roles'
]
except Exception:
log.error('Error flushing memberships/roles', exc_info=True)
self.session.rollback()
# Clear lists on error, but keep processed counts
details['commission_memberships'] = {
'created': [],
'updated': [],
'processed': processed_counts['commission_memberships'],
}
details['parliamentarian_roles'] = {
'created': [],
'updated': [],
'processed': processed_counts['parliamentarian_roles'],
}
log.info(
f'Membership/Role import details: '
f'CMs(C:{len(details["commission_memberships"]["created"])}, '
f'U:{len(details["commission_memberships"]["updated"])}, '
f'P:{details["commission_memberships"]["processed"]}), '
f'Roles(C:{len(details["parliamentarian_roles"]["created"])}, '
f'U:{len(details["parliamentarian_roles"]["updated"])}, '
f'P:{details["parliamentarian_roles"]["processed"]}), '
f'Skipped: {processed_counts["skipped"]}'
)
return details, processed_counts
[docs]
def _create_commission_membership(
self,
parliamentarian: Parliamentarian,
commission: Commission,
membership_data: MembershipData,
) -> CommissionMembership | None:
"""Create a CommissionMembership object."""
try:
if not parliamentarian.id or not commission.id:
log.warning(
f'Missing ID: Parliamentarian={parliamentarian.id}, '
f'Commission={commission.id}'
)
return None
role_text = membership_data.get('role', '')
role = self._map_to_commission_role(role_text)
start = membership_data.get('start')
end = membership_data.get('end')
start_date = self.parse_date(
str(start) if start and not isinstance(start, bool) else None
)
end_date = self.parse_date(
str(end) if end and not isinstance(end, bool) else None
)
assert parliamentarian.id is not None
assert commission.id is not None
return CommissionMembership(
parliamentarian=parliamentarian,
parliamentarian_id=parliamentarian.id,
commission=commission,
commission_id=commission.id,
role=role,
start=start_date,
end=end_date,
)
except Exception as e:
log.error(
f'Error creating commission membership: {e}', exc_info=True
)
return None
[docs]
def _update_commission_membership(
self, membership: CommissionMembership, membership_data: MembershipData
) -> bool:
"""
Updates an existing CommissionMembership object.
Returns True if changed.
"""
changed = False
new_role = self._map_to_commission_role(
membership_data.get('role', '')
)
start_val = membership_data.get('start')
new_start = self.parse_date(
str(start_val)
if start_val and not isinstance(start_val, bool)
else None
)
end_val = membership_data.get('end')
new_end = self.parse_date(
str(end_val) if end_val and not isinstance(end_val, bool) else None
)
if membership.role != new_role:
membership.role = new_role
changed = True
if membership.start != new_start:
membership.start = new_start
changed = True
if membership.end != new_end:
membership.end = new_end
changed = True
return changed
[docs]
def _create_parliamentarian_role(
self,
parliamentarian: Parliamentarian,
role: Role,
parliamentary_group: ParliamentaryGroup | None = None,
parliamentary_group_role: ParliamentaryGroupRole | None = None,
party: Party | None = None,
party_role: PartyRole | None = None,
additional_information: str | None = None,
start_date: str | None = None,
end_date: str | None = None,
) -> ParliamentarianRole | None:
try:
# Ensure parliamentarian has an ID
if not parliamentarian.id:
log.warning(
'Skipping parliamentarian role: Parliamentarian missing '
f'ID: {parliamentarian.first_name} '
f'{parliamentarian.last_name}'
)
return None
assert parliamentarian.id is not None
return ParliamentarianRole(
parliamentarian=parliamentarian,
parliamentarian_id=parliamentarian.id,
role=role,
parliamentary_group=parliamentary_group,
parliamentary_group_role=parliamentary_group_role or 'none',
party=party,
party_role=party_role or 'none',
additional_information=additional_information,
start=self.parse_date(start_date),
end=self.parse_date(end_date),
)
except Exception as e:
log.error(
f'Error creating parliamentarian role: {e}', exc_info=True
)
return None
[docs]
def _update_parliamentarian_role(
self,
role_obj: ParliamentarianRole,
role: Role,
parliamentary_group: ParliamentaryGroup | None = None,
parliamentary_group_role: ParliamentaryGroupRole | None = None,
party: Party | None = None,
party_role: PartyRole | None = None,
additional_information: str | None = None,
start_date: str | None = None,
end_date: str | None = None,
) -> bool:
"""
Updates an existing ParliamentarianRole object.
Returns True if changed.
"""
changed = False
new_start = self.parse_date(start_date)
new_end = self.parse_date(end_date)
# Update fields only if they are provided for the specific role type
if role_obj.role != role:
role_obj.role = role
changed = True
# Update related objects if provided
if (
parliamentary_group is not None
and role_obj.parliamentary_group != parliamentary_group
):
role_obj.parliamentary_group = parliamentary_group
changed = True
new_group_role = parliamentary_group_role or 'none'
if role_obj.parliamentary_group_role != new_group_role:
role_obj.parliamentary_group_role = new_group_role
changed = True
if party is not None and role_obj.party != party:
role_obj.party = party
changed = True
new_party_role = party_role or 'none'
if role_obj.party_role != new_party_role:
role_obj.party_role = new_party_role
changed = True
if (
additional_information is not None
and role_obj.additional_information != additional_information
):
role_obj.additional_information = additional_information
changed = True
if role_obj.start != new_start:
role_obj.start = new_start
changed = True
if role_obj.end != new_end:
role_obj.end = new_end
changed = True
return changed
[docs]
def _map_to_commission_role(
self, role_text: str
) -> Literal['president', 'extended_member', 'guest', 'member']:
"""Map a role text to a CommissionMembership role enum value."""
role_text = role_text.lower().strip()
if 'präsident' in role_text:
return 'president'
if 'erweitert' in role_text: # Check specific terms first
return 'extended_member'
if 'gast' in role_text:
return 'guest'
# Default to 'member' if none of the above match
return 'member'
[docs]
def _map_to_parliamentarian_role(self, role_text: str) -> Role:
"""Map a role text to a parliamentarian role enum value."""
role_text = role_text.lower().strip()
# Order matters: check more specific roles first
if 'präsident' in role_text:
return 'president'
if any(
term in role_text for term in ['stimmenzähler', 'vote counter']
):
return 'vote_counter'
# 'vizepräsident' maps to 'member' as per model/data
if any(
term in role_text
for term in [
'vizepräsident',
'vize-präsident',
'vize präsident',
]
):
return 'member'
# Default to 'member' if none of the specific roles match
return 'member'
[docs]
def _map_to_parliamentary_group_role(
self, role_text: str
) -> ParliamentaryGroupRole | None:
"""Map a role text to a ParliamentaryGroupRole enum value."""
role_text = role_text.lower().strip()
# Order matters: check more specific roles first
if 'präsident' in role_text:
return 'president'
if any(
term in role_text for term in ['stimmenzähler', 'vote counter']
):
return 'vote_counter'
# 'vizepräsident' maps to 'member' as per model/data
if any(
term in role_text
for term in [
'vizepräsident',
'vize-präsident',
'vize präsident',
]
):
return 'member'
# Check for 'mitglied' explicitly if needed, else covered by default
if 'mitglied' in role_text:
return 'member'
# Default to 'none' if no specific role is identified
return 'none'
@classmethod
[docs]
def count_unique_fullnames(
people_data: list[Any],
organizations_data: list[Any],
memberships_data: list[Any],
) -> set[str]:
"""Counts unique 'fullName' entries across different data structures."""
all_names: set[str] = set()
def extract_full_names(data: Sequence[Any]) -> None:
"""Recursively extracts 'fullName' from nested dicts/lists."""
for item in data:
if isinstance(item, dict):
if 'fullName' in item and isinstance(item['fullName'], str):
all_names.add(item['fullName'])
# Recurse into values that are lists or dicts
for value in item.values():
if isinstance(value, (dict, list)):
# Pass as sequence if list, wrap dict in list
extract_full_names(
[value] if isinstance(value, dict) else value
)
elif isinstance(item, list):
# If the top-level item is a list, recurse into it
extract_full_names(item)
# Process each top-level data list
extract_full_names(people_data)
extract_full_names(organizations_data)
extract_full_names(memberships_data)
return all_names
[docs]
def import_zug_kub_data(
session: Session,
people_data: Sequence[PersonData],
organization_data: Sequence[OrganizationData],
membership_data: Sequence[MembershipData],
user_id: UUID | None = None,
) -> dict[str, ImportCategoryResult]:
"""
Imports data from KUB JSON files within a single transaction,
logs the outcome, and returns details of changes including processed
counts.
Returns a dictionary where keys are categories (e.g., 'parliamentarians')
and values are dictionaries containing 'created' (list), 'updated' (list),
and 'processed' (int).
Rolls back changes within this import if an internal error occurs.
Logs the attempt regardless of success or failure.
"""
import_details: dict[str, ImportCategoryResult] = {}
log_status = 'failed'
log_details: dict[str, Any] = {}
final_error: Exception | None = None
try:
# Use a savepoint; rollback occurs automatically on exception
with session.begin_nested():
people_importer = PeopleImporter(session)
(parliamentarian_map, people_details, people_processed) = (
people_importer.bulk_import(people_data)
)
import_details['parliamentarians'] = {
'created': people_details['created'],
'updated': people_details['updated'],
'processed': people_processed,
}
organization_importer = OrganizationImporter(session)
(
commission_map,
parliamentary_group_map,
party_map,
other_organization_map,
org_details,
org_processed_counts,
) = organization_importer.bulk_import(organization_data)
for category, details_list_dict in org_details.items():
processed_count = org_processed_counts.get(category, 0)
# Only add categories that have actual ORM objects
# (Commissions, Parties)
if category in ('commissions', 'parties'):
import_details[category] = {
'created': details_list_dict.get('created', []),
'updated': details_list_dict.get('updated', []),
'processed': processed_count,
}
# Add 'other' processed count to log_details separately
log_details['other_organizations_processed'] = (
org_processed_counts.get('other', 0)
)
log_details['parliamentary_groups_processed'] = (
org_processed_counts.get('parliamentary_groups', 0)
)
membership_importer = MembershipImporter(session)
membership_importer.init(
session,
parliamentarian_map,
commission_map,
parliamentary_group_map,
party_map,
other_organization_map,
)
(membership_details, membership_processed_counts) = (
membership_importer.bulk_import(membership_data)
)
for category, details_dict in membership_details.items():
if category == 'parliamentarians_from_memberships':
if isinstance(
details_dict, dict
):
import_details['parliamentarians']['created'].extend(
details_dict.get('created', [])
)
import_details['parliamentarians']['updated'].extend(
details_dict.get('updated', [])
)
elif category in (
'commission_memberships',
'parliamentarian_roles',
):
import_details[category] = cast(
'ImportCategoryResult', details_dict
)
log_details['skipped_memberships'] = (
membership_processed_counts.get('skipped', 0)
)
log_details['summary'] = {}
for k, v in import_details.items():
# v is now guaranteed to be ImportCategoryResult
log_details['summary'][k] = {
'created_count': len(v['created']),
'updated_count': len(v['updated']),
'processed_count': v['processed'],
}
log_details['summary']['other_organizations_processed'] = (
log_details['other_organizations_processed']
)
log_details['summary']['parliamentary_groups_processed'] = (
log_details['parliamentary_groups_processed']
)
log_details['summary']['skipped_memberships'] = log_details[
'skipped_memberships'
]
log_status = 'completed'
log.info(
'KUB data import processing successful within transaction.'
)
except Exception as e:
final_error = e
log_status = 'failed'
log_details['error'] = str(e)
log.error(f'KUB data import failed: {e}', exc_info=True)
finally:
try:
import_log = ImportLog(
user_id=user_id,
details=log_details,
status=log_status,
# source_info is not defined, remove it
)
session.add(import_log)
if session.is_active:
session.flush()
log.info(
f'KUB data import attempt logged with status: {log_status}'
)
except Exception as log_e:
log.error(
f'Failed to log import status: {log_e}', exc_info=True
)
if final_error:
raise RuntimeError('KUB data import failed.') from final_error
return import_details