from __future__ import annotations
import json
import logging
from onegov.core.elements import Link
from onegov.core.security import Private
from onegov.core.utils import dictionary_to_binary
from onegov.org.models import Organisation
from onegov.pas import _, PasApp
from onegov.pas.forms.data_import import DataImportForm
from onegov.pas.importer.json_import import (
import_zug_kub_data,
Commission,
CommissionMembership,
Parliamentarian,
ParliamentarianRole,
Party,
)
from onegov.pas.layouts import ImportLayout
from typing import Any, TYPE_CHECKING, TypedDict
if TYPE_CHECKING:
from onegov.core.types import LaxFileDict, RenderData
from collections.abc import Sequence
from onegov.town6.request import TownRequest
from webob import Response
[docs]
class ImportCategoryResult(TypedDict):
    """Result entry for a single import category.

    Values of this type are the per-category entries of
    ``ImportResultsDict``.

    NOTE(review): the code in this module reads the keys ``'created'``,
    ``'updated'`` (both treated as lists) and ``'processed'`` (treated as
    an int) from these entries -- confirm the declared fields match.
    """
# Type alias for the complete import result: maps a category name to the
# result entry of that category.
# TODO: replace with the actual type returned by the importer.
ImportResultsDict = dict[str, ImportCategoryResult]
[docs]
# Module-level logger shared by the JSON loading helper and the import view.
log = logging.getLogger('onegov.pas.data_import')
[docs]
def load_and_concatenate_json(
    sources: Sequence[LaxFileDict]
) -> list[Any]:
    """
    Loads and concatenates the 'results' list from multiple JSON files.

    Every entry of ``sources`` is turned into bytes with
    ``dictionary_to_binary``, decoded as UTF-8 (a leading byte order mark
    is tolerated) and parsed as JSON. If the parsed document is an object
    whose ``'results'`` value is a list, that list is appended to the
    output.

    Entries are skipped with a logged warning when:

    * one of the keys ``data``, ``filename``, ``mimetype`` or ``size`` is
      missing from the entry, or
    * the parsed JSON is not an object, or has no list under ``'results'``.

    :param sources: The uploaded file dictionaries to read.
    :return: All ``'results'`` items, in the order of ``sources``.
    :raises RuntimeError: If a file cannot be converted to bytes, is not
        valid UTF-8 or is not valid JSON. The original exception is
        chained as the cause.
    """
    required_keys = ('data', 'filename', 'mimetype', 'size')
    all_results: list[Any] = []

    for file_info in sources:
        filename = file_info.get('filename', 'unknown file')

        # Ensure file_info has the expected structure for
        # dictionary_to_binary
        if not all(key in file_info for key in required_keys):
            log.warning(
                f'Skipping invalid file data structure for {filename}. '
                f'Expected keys: data, filename, mimetype, size.'
            )
            continue

        # Only the calls that can actually fail live inside the try block.
        try:
            content_bytes = dictionary_to_binary(file_info)
            # 'utf-8-sig' behaves like 'utf-8' but drops an optional BOM,
            # which json.loads would otherwise reject.
            content_str = content_bytes.decode('utf-8-sig')
            data = json.loads(content_str)
        except json.JSONDecodeError as e:
            log.error(
                f'Error decoding JSON from file {filename}.', exc_info=True
            )
            raise RuntimeError(
                f'Error decoding JSON from file {filename}.'
            ) from e
        except UnicodeDecodeError as e:
            log.error(
                f'Error decoding file {filename} as UTF-8.', exc_info=True
            )
            raise RuntimeError(
                f'Error decoding file {filename} as UTF-8.'
            ) from e
        except Exception as e:
            log.error(
                f'Unexpected error processing file {filename}: {e}',
                exc_info=True
            )
            raise RuntimeError(
                f'Unexpected error processing file {filename}'
            ) from e

        # json.loads may return any JSON type; only an object can carry a
        # 'results' key. Anything else is treated like a missing key
        # instead of failing on ``.get``.
        results_list = data.get('results') if isinstance(data, dict) else None
        if isinstance(results_list, list):
            all_results.extend(results_list)
        else:
            log.warning(
                f'Skipping file {filename}: "results" key not found '
                f'or is not a list in the JSON data.'
            )

    return all_results
def _get_item_display_title(item: Any) -> str:
    """Return a display title for an object created or updated by the import.

    Known model types get a dedicated format. Any other object falls back
    to its string ``title`` or ``name`` attribute, and finally to a
    placeholder that names the object's type.
    """
    if isinstance(item, Parliamentarian):
        return item.title  # Already includes first/last name etc.

    if isinstance(item, (Commission, Party)):
        return item.name

    if isinstance(item, CommissionMembership):
        # Related objects may be missing, so fall back to placeholders
        parl_title = (item.parliamentarian.title
                      if item.parliamentarian else 'Unknown Parl.')
        comm_name = (item.commission.name
                     if item.commission else 'Unknown Comm.')
        return f'{parl_title} in {comm_name} ({item.role})'

    if isinstance(item, ParliamentarianRole):
        parl_title = (item.parliamentarian.title
                      if item.parliamentarian else 'Unknown Parl.')
        role_details: list[str] = [str(item.role)]
        if item.party:
            role_details.append(f'Party: {item.party.name}')
        if item.parliamentary_group:
            role_details.append(f'Group: {item.parliamentary_group.name}')
        if item.additional_information:
            role_details.append(f'({item.additional_information})')
        return f'{parl_title} - {" ".join(role_details)}'

    if hasattr(item, 'title') and isinstance(item.title, str):
        return item.title  # Fallback for unexpected types with a title

    if hasattr(item, 'name') and isinstance(item.name, str):
        return item.name  # Fallback for unexpected types with a name

    return f'Unknown Object (Type: {type(item).__name__})'


def _extract_category_details(
    import_details: ImportResultsDict
) -> dict[str, dict[str, Any]]:
    """
    Process import details to prepare data for template rendering.

    Extracts and calculates values that were previously defined in TAL.
    For every category the returned entry holds the raw ``created``,
    ``updated`` and ``processed`` values, the two list lengths as
    ``created_count`` / ``updated_count`` (0 if the value is not a list)
    and a ``category_title`` derived from the category name.
    """
    processed_details: dict[str, dict[str, Any]] = {}

    for category_name, details in import_details.items():
        created = details.get('created', [])
        updated = details.get('updated', [])

        processed_details[category_name] = {
            'created': created,
            'updated': updated,
            'processed': details.get('processed', 0),
            # Only count real lists, anything else counts as empty
            'created_count': len(created) if isinstance(created, list) else 0,
            'updated_count': len(updated) if isinstance(updated, list) else 0,
            'category_title': category_name.replace('_', ' ').title(),
        }

    return processed_details


@PasApp.form(
    model=Organisation,
    template='data_import.pt',
    name='pas-import',
    form=DataImportForm,
    permission=Private,
)
def handle_data_import(
    self: Organisation, request: TownRequest, form: DataImportForm
) -> RenderData | Response:
    """Render the data import form and run the import on a valid POST.

    On a valid POST the three uploaded sources (people, organizations,
    memberships) are loaded with ``load_and_concatenate_json`` and handed
    to ``import_zug_kub_data``. The per-category results are prepared for
    the template and a summary message is shown to the user.

    Any exception raised while loading or importing is logged, reported
    to the user as a message and exposed to the template through
    ``error_message``; it is not re-raised.
    """
    layout = ImportLayout(self, request)
    processed_import_details: dict[str, dict[str, Any]] = {}  # For template
    error_message: str | None = None
    total_processed = 0
    total_created = 0
    total_updated = 0

    if request.method == 'POST' and form.validate():
        try:
            # Load and concatenate data from uploaded files
            people_data = load_and_concatenate_json(
                form.people_source.data
            )
            organization_data = load_and_concatenate_json(
                form.organizations_source.data
            )
            membership_data = load_and_concatenate_json(
                form.memberships_source.data
            )

            # Get raw results from the import function
            # The return type is dict[str, ImportCategoryResult]
            import_results = import_zug_kub_data(
                session=request.session,
                people_data=people_data,
                organization_data=organization_data,
                membership_data=membership_data,
                user_id=(request.current_user.id
                         if request.current_user else None)
            )

            if import_results:  # Ensure import_results is not None
                processed_import_details = _extract_category_details(
                    import_results
                )

            # The totals are derived from the prepared details so that the
            # per-category counts are computed in one place only.
            for category in processed_import_details.values():
                total_created += category['created_count']
                total_updated += category['updated_count']
                processed = category['processed']
                # Ensure processed is an int
                if isinstance(processed, int):
                    total_processed += processed

            any_changes = total_created > 0 or total_updated > 0

            # Generate success/info message based on totals
            if any_changes:
                request.message(
                    _(
                        'Data import completed. Processed ${processed} items: '
                        '${created} created, ${updated} updated.',
                        mapping={
                            'processed': total_processed,
                            'created': total_created,
                            'updated': total_updated
                        }
                    ), 'success'
                )
            else:
                request.message(
                    _(
                        'Data import completed. Processed ${processed} items. '
                        'No changes were needed - data is already up to date.',
                        mapping={'processed': total_processed}
                    ), 'info'
                )

            layout.breadcrumbs.append(Link(_('Import result'), '#'))

        except Exception as e:
            log.error(f'Data import failed: {e}', exc_info=True)
            # Provide a more user-friendly error message
            request.message(
                _('Data import failed: ${error}', mapping={'error': str(e)}),
                'warning'
            )
            # Display the exception message, including the underlying
            # cause if the exception was chained
            cause = e.__cause__ or e.__context__
            error_message = f'Error during import: {e}'
            if cause:
                error_message += f'\nCaused by: {cause}'

    return {
        'title': _('Import'),
        'layout': layout,
        'form': form,
        'import_details': processed_import_details,
        'error_message': error_message,
        'get_item_display_title': _get_item_display_title,
        'errors': form.errors
    }