from __future__ import annotations
import json
from onegov.core.security import Private
from morepath import redirect
from onegov.pas import PasApp, _
from onegov.pas.collections import ImportLogCollection
from onegov.pas.cronjobs import trigger_kub_data_import
from onegov.pas.layouts.import_log import (
ImportLogCollectionLayout,
ImportLogLayout
)
from onegov.pas.models import ImportLog
from webob import Response
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from onegov.pas.request import PasRequest
from onegov.core.types import RenderData
from webob import Response
@PasApp.html(
model=ImportLogCollection,
template='import_logs.pt',
permission=Private
)
[docs]
def view_import_logs(
self: ImportLogCollection, request: PasRequest
) -> RenderData:
request.include('importlog')
layout = ImportLogCollectionLayout(self, request)
# Prepare logs with translated fields
logs = [
{
'log': log,
'translated_status': request.translate(_(log.status)),
'translated_import_type': request.translate(_(log.import_type))
}
for log in self.for_listing().all()
]
return {
'layout': layout,
'title': _('Import History'),
'logs': logs
}
@PasApp.html(
model=ImportLog,
template='import_log.pt',
permission=Private
)
[docs]
def view_import_log(
self: ImportLog, request: PasRequest
) -> RenderData:
request.include('importlog')
layout = ImportLogLayout(self, request)
details_formatted = json.dumps(
self.details, indent=2, sort_keys=True, ensure_ascii=False
)
details = self.details or {}
summary = details.get('summary', {})
output_messages = details.get('output_messages', [])
import_categories = []
import_results = details.get('import_results', {})
if import_results:
for category, stats in import_results.items():
if category != 'total_import_summary':
updated_count = len(stats.get('updated', []))
created_count = len(stats.get('created', []))
processed_count = stats.get('processed', 0)
import_categories.append({
'name': category,
'display_text': f'Updated: {updated_count}, '
f'Created: {created_count}, '
f'Processed: {processed_count}'
})
processed_logs = []
for log_entry in output_messages:
if isinstance(log_entry, dict) and 'level' in log_entry:
level = log_entry.get('level', 'info')
message = log_entry.get('message', '')
processed_logs.append({
'level': level.upper(),
'css_class': f'log-level log-{level}',
'message': message
})
# Translate status for display
translated_status = request.translate(_(self.status))
# Translate overall_status if it exists in summary
translated_overall_status = None
if 'overall_status' in summary:
translated_overall_status = request.translate(
_(summary['overall_status'])
)
return {
'layout': layout,
'title': _('Import Log Details'),
'log': self,
'translated_status': translated_status,
'translated_overall_status': translated_overall_status,
'details_formatted': details_formatted,
'summary': summary,
'import_categories': import_categories,
'processed_logs': processed_logs,
}
@PasApp.view(
model=ImportLogCollection,
name='trigger-import',
permission=Private,
request_method='POST'
)
[docs]
def trigger_import_view(
self: ImportLogCollection, request: PasRequest
) -> Response:
"""Trigger manual KUB data import."""
try:
trigger_kub_data_import(request, import_type='upload')
request.success(_(
'Import triggered successfully. The process may take up to 30 '
'seconds to complete.'
))
except ValueError as e:
request.alert(str(e))
except Exception as e:
request.alert(_('Import failed: ${error}', mapping={'error': str(e)}))
return redirect(request.link(self))
@PasApp.view(
model=ImportLog,
name='download-source',
permission=Private
)
[docs]
def download_source_data(
self: ImportLog, request: PasRequest
) -> Response:
"""Download source JSON data based on type parameter."""
source_type = request.params.get('type')
# Load only the specific deferred column we need
if source_type in ('people', 'organizations', 'memberships'):
if source_type == 'people':
result = request.session.query(ImportLog.people_source).filter(
ImportLog.id == self.id
).scalar()
self.people_source = result
elif source_type == 'organizations':
result = request.session.query(
ImportLog.organizations_source
).filter(ImportLog.id == self.id).scalar()
self.organizations_source = result
elif source_type == 'memberships':
result = request.session.query(
ImportLog.memberships_source
).filter(ImportLog.id == self.id).scalar()
self.memberships_source = result
if source_type == 'people':
if not self.people_source:
return Response(
status=404, text='No people source data available'
)
json_data = json.dumps(
self.people_source, indent=2, ensure_ascii=False
)
filename = f'import-log-{self.id}-people-source.json'
elif source_type == 'organizations':
if not self.organizations_source:
return Response(
status=404, text='No organizations source data available'
)
json_data = json.dumps(
self.organizations_source, indent=2, ensure_ascii=False
)
filename = f'import-log-{self.id}-organizations-source.json'
elif source_type == 'memberships':
if not self.memberships_source:
return Response(
status=404, text='No memberships source data available'
)
json_data = json.dumps(
self.memberships_source, indent=2, ensure_ascii=False
)
filename = f'import-log-{self.id}-memberships-source.json'
else:
return Response(status=400, text='Invalid source type')
return Response(
body=json_data.encode('utf-8'),
content_type='application/json; charset=utf-8',
content_disposition=f'attachment; filename="{filename}"'
)