from __future__ import annotations
import json
import logging
import queue
import requests
import threading
import urllib3
import uuid
from dataclasses import dataclass
from urllib.parse import urlparse, urlunparse
from onegov.core.utils import binary_to_dictionary
from onegov.pas.models.import_log import ImportLog
from onegov.pas.views.data_import import load_and_concatenate_json
from onegov.pas.importer.zug_kub_importer import import_zug_kub_data
from onegov.pas.importer.types import OutputLogHandler
from sqlalchemy.orm.attributes import flag_modified
from typing import TYPE_CHECKING, Any, Self
if TYPE_CHECKING:
from types import TracebackType
from onegov.core.types import LaxFileDict
from onegov.pas.app import PasApp
from onegov.pas.request import PasRequest
from onegov.pas.log import OutputHandler
log = logging.getLogger('onegov.pas.orchestrator')
# Disable SSL warnings for self-signed certificates
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@dataclass
class UpdateResult:
    parliamentarian_id: str
    title: str
    custom_values: dict[str, str]
    addresses: list[dict[str, Any]]
    sex: int | None = None
    error: str | None = None
def _fetch_custom_data_worker(
parliamentarian_queue: queue.Queue[Any],
result_queue: queue.Queue[UpdateResult],
session: requests.Session,
base_url: str
) -> None:
"""Worker thread that fetches API data for parliamentarians."""
while True:
try:
parliamentarian = parliamentarian_queue.get_nowait()
except queue.Empty:
break
try:
person_id = parliamentarian.external_kub_id
url = f'{base_url}/people/{person_id}'
response = session.get(url, timeout=30)
response.raise_for_status()
person_data = response.json()
custom_values = person_data.get('customValues', {})
addresses = person_data.get('addresses', [])
sex = person_data.get('sex')
result_queue.put(UpdateResult(
parliamentarian_id=parliamentarian.id,
title=parliamentarian.title,
custom_values=custom_values,
addresses=addresses,
sex=sex
))
except Exception as e:
result_queue.put(UpdateResult(
parliamentarian_id=parliamentarian.id,
title=parliamentarian.title,
custom_values={},
addresses=[],
sex=None,
error=str(e)
))
finally:
parliamentarian_queue.task_done()
def create_mock_file_data(
    data: list[dict[str, Any]], filename: str
) -> LaxFileDict:
    """Wrap raw API records in an in-memory file dict that mirrors an
    uploaded JSON file, so API data can flow through
    load_and_concatenate_json exactly like a manual form upload."""
    json_content = {'results': data}
    json_str = json.dumps(json_content, ensure_ascii=False)
    json_bytes = json_str.encode('utf-8')
    file_dict = binary_to_dictionary(json_bytes, filename)
    return file_dict  # type: ignore[return-value]
class KubImporter:
"""
KUB API importer that manages shared session and state for both
data import and custom data updates.
"""
    def __init__(
        self,
        token: str,
        base_url: str,
        output: OutputHandler | None = None
    ):
        self.base_url = base_url.rstrip('/')  # Normalize trailing slash
        self.output = output
        # Shared requests session for connection pooling
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Token {token}',
            'Accept': 'application/json'
        })
        # Disable SSL verification for self-signed certificates
        self.session.verify = False
def __enter__(self) -> Self:
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None
) -> None:
self.session.close()
def _check_api_accessibility(self) -> None:
"""Check if the API is accessible."""
if self.output:
self.output.info('Checking API accessibility...')
test_url = f'{self.base_url}/people'
try:
response = self.session.get(test_url, timeout=10)
if response.status_code != 200:
raise RuntimeError(
f'API check failed: {response.status_code} - '
f'{response.text}'
)
if self.output:
self.output.success('API is accessible')
except requests.exceptions.RequestException as e:
raise RuntimeError(f'API check failed: {e}') from e
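    def _fetch_api_data_with_pagination(
        self, endpoint: str
    ) -> list[dict[str, Any]]:
        """
        Fetch all records for an endpoint, following pagination links.

        Minimal sketch: assumes DRF-style pages with ``results`` and
        ``next`` keys, matching the ``{'results': ...}`` wrapper used
        by create_mock_file_data; adjust if the API paginates
        differently.
        """
        records: list[dict[str, Any]] = []
        url: str | None = f'{self.base_url}/{endpoint}'
        while url:
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            payload = response.json()
            records.extend(payload.get('results', []))
            next_url = payload.get('next')
            if next_url:
                # Assumption: re-root pagination links onto our base host
                # in case the API advertises an internal hostname.
                base = urlparse(self.base_url)
                nxt = urlparse(next_url)
                next_url = urlunparse(
                    nxt._replace(scheme=base.scheme, netloc=base.netloc)
                )
            url = next_url
        return records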
def _update_address_fields(
self,
parliamentarian: Any,
addresses: list[dict[str, Any]]
) -> list[str]:
"""
Update parliamentarian address fields based on addresses data.
Returns list of updated field names.
"""
updated_fields: list[str] = []
# Find default address (isDefault=True) or first address
default_address = None
for address in addresses:
if address.get('isDefault', False):
default_address = address
break
if not default_address and addresses:
default_address = addresses[0]
if not default_address:
return updated_fields
# Update shipping address (street + house number)
street = default_address.get('street', '')
house_number = default_address.get('houseNumber', '')
if street or house_number:
if street and house_number:
address_value = f'{street} {house_number}'
elif street:
address_value = street
else:
address_value = house_number
current_value = getattr(parliamentarian, 'shipping_address', None)
if current_value != address_value:
parliamentarian.shipping_address = address_value
updated_fields.append('shipping_address')
# Update address addition (addressLine1)
address_line1 = default_address.get('addressLine1', '')
if address_line1:
current_value = getattr(
parliamentarian, 'shipping_address_addition', None
)
if current_value != address_line1:
parliamentarian.shipping_address_addition = address_line1
updated_fields.append('shipping_address_addition')
# Update zip code
zip_code = default_address.get('swissZipCode', '')
if zip_code:
current_value = getattr(
parliamentarian, 'shipping_address_zip_code', None
)
if current_value != zip_code:
parliamentarian.shipping_address_zip_code = zip_code
updated_fields.append('shipping_address_zip_code')
# Update city
city = default_address.get('town', '')
if city:
current_value = getattr(
parliamentarian, 'shipping_address_city', None
)
if current_value != city:
parliamentarian.shipping_address_city = city
updated_fields.append('shipping_address_city')
return updated_fields
    def _update_gender_field(
        self, parliamentarian: Any, sex: int
    ) -> bool:
        """
        Map the numeric KUB sex code onto the gender field.

        Returns True if the value changed. The codes appear to follow
        the common ISO/IEC 5218 convention (1 = male, 2 = female);
        other codes are left untouched.
        """
        gender_mapping = {
            1: 'male',
            2: 'female'
        }
new_gender = gender_mapping.get(sex)
if new_gender is None:
return False
current_gender = getattr(parliamentarian, 'gender', None)
if current_gender != new_gender:
parliamentarian.gender = new_gender
return True
return False
def update_custom_data(
self,
request: PasRequest,
app: PasApp,
max_workers: int = 3,
import_log_id: uuid.UUID | None = None
) -> tuple[int, int, list[dict[str, Any]]]:
"""
Multi-threaded function that updates parliamentarians with custom
field data.
Uses a queue-based approach where worker threads fetch API data
concurrently while the main thread handles all database updates to
maintain thread safety.
Returns:
Tuple of (updated_count, error_count, output_messages)
"""
from onegov.parliament.models import Parliamentarian
field_mappings = {
'personalnummer': 'personnel_number',
'vertragsnummer': 'contract_number',
'wahlkreis': 'district',
'beruf': 'occupation',
'adress_anrede': 'salutation_for_address',
'brief_anrede': 'salutation_for_letter'
}
all_parliamentarians = request.session.query(Parliamentarian).all()
parliamentarians_with_id = [
p for p in all_parliamentarians if p.external_kub_id is not None
]
parliamentarians_without_id = [
p for p in all_parliamentarians if p.external_kub_id is None
]
# Log warning if parliamentarians without external_kub_id exist
if parliamentarians_without_id:
missing_names = [p.title for p in parliamentarians_without_id]
warning_msg = (
f'Warning: {len(parliamentarians_without_id)} '
f'parliamentarians found without external_kub_id: '
f'{", ".join(missing_names)}. These will not be synchronized.'
)
if self.output:
self.output.error(warning_msg)
log.warning(
f'Found {len(parliamentarians_without_id)} parliamentarians '
f'without external_kub_id: {missing_names}. '
f'These will not be synchronized.'
)
if not parliamentarians_with_id:
if self.output:
self.output.info(
'No parliamentarians with external_kub_id found'
)
return 0, 0, []
parliamentarians = parliamentarians_with_id
if self.output:
self.output.info(
f'Found {len(parliamentarians)} parliamentarians to update '
f'with custom data using {max_workers} workers'
)
parliamentarian_queue: queue.Queue[Any] = queue.Queue()
result_queue: queue.Queue[UpdateResult] = queue.Queue()
for p in parliamentarians:
parliamentarian_queue.put(p)
        # Start worker threads (API fetching only). Daemon threads act as
        # a safety net so a hung worker cannot keep the process alive past
        # the join timeout below.
        threads = []
        for _ in range(max_workers):
            t = threading.Thread(
                target=_fetch_custom_data_worker,
                args=(
                    parliamentarian_queue, result_queue,
                    self.session, self.base_url
                ),
                daemon=True
            )
            t.start()
            threads.append(t)
# Main thread handles all DB updates (single session, thread-safe)
updated_count = 0
error_count = 0
processed = 0
# Process results as they come in from worker threads
while processed < len(parliamentarians):
try:
# Wait for next result with timeout
result = result_queue.get(timeout=120) # 2 min timeout
processed += 1
if result.error:
error_count += 1
if self.output:
self.output.error(
f'✗ Failed to fetch {result.title}: '
f'{result.error}'
)
else:
# Update parliamentarian in main thread's session
parliamentarian = request.session.query(
Parliamentarian).filter(
Parliamentarian.id == result.parliamentarian_id
).first()
if parliamentarian:
updated_fields = []
for custom_key, attr_name in field_mappings.items():
if custom_key in result.custom_values:
setattr(
parliamentarian, attr_name,
result.custom_values[custom_key]
)
updated_fields.append(attr_name)
# Process address data
if result.addresses:
address_updated = self._update_address_fields(
parliamentarian, result.addresses
)
updated_fields.extend(address_updated)
# Process gender from sex field
if result.sex is not None:
gender_updated = self._update_gender_field(
parliamentarian, result.sex
)
if gender_updated:
updated_fields.append('gender')
if updated_fields:
updated_count += 1
if self.output:
self.output.success(
f'✓ Updated {result.title}: '
f'{", ".join(updated_fields)}'
)
else:
if self.output:
self.output.info(
f'No custom data found for {result.title}'
)
result_queue.task_done()
except queue.Empty:
if self.output:
self.output.error('Timeout waiting for API results')
error_count += (len(parliamentarians) - processed)
break
# Wait for all worker threads to complete
timed_out = False
for t in threads:
t.join(timeout=30)
# Check if any threads timed out
for t in threads:
if t.is_alive():
timed_out = True
if self.output:
self.output.error('Warning: Some worker threads timed out')
break
# Update ImportLog if provided
output_messages = []
if import_log_id:
import_log = request.session.query(ImportLog).filter(
ImportLog.id == import_log_id
).first()
if import_log and hasattr(self.output, 'get_messages'):
# DatabaseOutputHandler
output_messages = self.output.get_messages() # type: ignore
# Update ImportLog with custom data results
if not import_log.details:
import_log.details = {}
import_log.details['custom_data_update'] = {
'updated': updated_count,
'errors': error_count,
'processed': len(parliamentarians)
}
import_log.details.setdefault('output_messages', []).extend(
output_messages
)
flag_modified(import_log, 'details')
# If timeout occurred, update the status to 'timeout'
if timed_out:
import_log.status = 'timeout'
request.session.flush()
return updated_count, error_count, output_messages
def import_data(
self, request: PasRequest, app: PasApp, import_type: str = 'automatic'
) -> tuple[dict[str, Any], list[Any], list[Any], list[Any], uuid.UUID]:
"""
Performs KUB data import using the shared session.
Returns:
Tuple of (import_results, people_data, organization_data,
membership_data, import_log_id)
"""
people_data: list[Any] = []
organization_data: list[Any] = []
membership_data: list[Any] = []
import_results: dict[str, Any] = {}
import_log = ImportLog(
user_id=(
request.current_user.id if request.current_user else None
),
details={'status': 'started'},
status='in_progress',
import_type=import_type,
)
request.session.add(import_log)
request.session.flush()
import_log_id = import_log.id
try:
self._check_api_accessibility()
if self.output:
self.output.info('Fetching people data...')
people_raw = self._fetch_api_data_with_pagination('people')
if self.output:
self.output.success(
f'Fetched {len(people_raw)} people records'
)
if self.output:
self.output.info('Fetching organizations data...')
organizations_raw = self._fetch_api_data_with_pagination(
'organizations'
)
if self.output:
self.output.success(
f'Fetched {len(organizations_raw)} organization records'
)
if self.output:
self.output.info('Fetching memberships data...')
memberships_raw = self._fetch_api_data_with_pagination(
'memberships'
)
if self.output:
self.output.success(
f'Fetched {len(memberships_raw)} membership records'
)
# Process raw data through load_and_concatenate_json
# We wrap API responses to use the same method as form upload
if self.output:
self.output.info('Processing people data...')
people_data = load_and_concatenate_json(
[create_mock_file_data(people_raw, 'people.json')]
)
if self.output:
self.output.info('Processing organizations data...')
organization_data = load_and_concatenate_json(
[
create_mock_file_data(
organizations_raw, 'organizations.json'
)
]
)
if self.output:
self.output.info('Processing memberships data...')
membership_data = load_and_concatenate_json(
[create_mock_file_data(memberships_raw, 'memberships.json')]
)
# Perform the import
if self.output:
self.output.info('Starting data import...')
# Create logger that forwards to output handler
import_logger = None
if self.output:
import_logger = logging.getLogger('onegov.pas.import.temp')
import_logger.setLevel(logging.DEBUG)
# Clear any existing handlers
import_logger.handlers.clear()
# Add our custom handler
handler = OutputLogHandler(self.output)
handler.setLevel(logging.DEBUG)
import_logger.addHandler(handler)
import_logger.propagate = False
try:
import_results = import_zug_kub_data(
session=request.session,
people_data=people_data,
organization_data=organization_data,
membership_data=membership_data,
user_id=(
request.current_user.id
if request.current_user
else None
),
logger=import_logger,
create_import_log=False # Orchestrator handles ImportLog
)
finally:
# Clean up the temporary logger
if import_logger:
import_logger.handlers.clear()
# Update ImportLog with results
import_log.details.update({
'import_results': import_results,
'status': 'completed'
})
flag_modified(import_log, 'details')
import_log.status = 'completed'
# Display results
if self.output:
total_created = 0
total_updated = 0
total_processed = 0
for category, details in import_results.items():
created_count = len(details.get('created', []))
updated_count = len(details.get('updated', []))
processed_count = details.get('processed', 0)
total_created += created_count
total_updated += updated_count
total_processed += processed_count
if (created_count > 0 or updated_count > 0 or
processed_count > 0):
self.output.info(
f'{category}: {created_count} created, '
f'{updated_count} updated, '
f'{processed_count} processed'
)
self.output.success(
f'Import completed successfully! '
f'Total: {total_created} created, '
f'{total_updated} updated, '
f'{total_processed} processed'
)
except Exception as e:
import_log.details.update({
'error': str(e),
'status': 'failed'
})
flag_modified(import_log, 'details')
import_log.status = 'failed'
if self.output:
self.output.error(f'Import failed: {e}')
raise
finally:
# Always store output messages and source data for debugging
if hasattr(self.output, 'get_messages'):
output_messages = self.output.get_messages() # type: ignore
import_log.details['output_messages'] = output_messages
flag_modified(import_log, 'details')
if import_log.import_type != 'automatic':
# Store response json for finding issues quickly, but
# not the automatic ones, as this would just fill up
# disk space
import_log.people_source = people_data
import_log.organizations_source = organization_data
import_log.memberships_source = membership_data
request.session.flush()
return (
import_results,
people_data,
organization_data,
membership_data,
import_log_id,
)
def run_full_sync(
self,
request: PasRequest,
app: PasApp,
import_type: str,
update_custom: bool = True,
max_workers: int = 3,
) -> tuple[dict[str, Any], uuid.UUID]:
"""
This is the main entry point for cronjobs and automated imports.
Complete KUB synchronization including import and custom data update.
Returns:
Tuple of (combined_results, import_log_id)
"""
# Perform main import
import_results, _people_data, _org_data, _membership_data, log_id = (
self.import_data(request, app, import_type=import_type)
)
combined_results = {
'import': import_results,
'custom_data': None
}
# Update custom data if requested
if update_custom:
if self.output:
self.output.info('Starting custom data update...')
try:
updated_count, error_count, _output_messages = (
self.update_custom_data(
request, app, max_workers, log_id
)
)
combined_results['custom_data'] = {
'updated': updated_count,
'errors': error_count
}
if self.output:
self.output.success(
f'Custom data update completed: {updated_count} '
f'updated, {error_count} errors'
)
except Exception as e:
combined_results['custom_data'] = {
'error': str(e)
}
if self.output:
self.output.error(f'Custom data update failed: {e}')
raise
else:
if self.output:
self.output.info('Skipping custom data update')
return combined_results, log_id
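
# Usage sketch (hypothetical values): KubImporter is a context manager, so
# the shared requests.Session is closed even if the sync raises.
#
#     with KubImporter(token, 'https://kub.example.org/api/v2',
#                      output=output_handler) as importer:
#         results, log_id = importer.run_full_sync(
#             request, app, import_type='automatic'
#         )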