pas.importer.orchestrator

Attributes

log

Classes

UpdateResult

KubImporter

KUB API importer that manages shared session and state for both data import and custom data updates.

Functions

_fetch_custom_data_worker(→ None)

Worker thread that fetches API data for parliamentarians.

create_mock_file_data(→ onegov.core.types.LaxFileDict)

Module Contents

pas.importer.orchestrator.log
class pas.importer.orchestrator.UpdateResult
parliamentarian_id: str
title: str
custom_values: dict[str, str]
addresses: list[dict[str, Any]]
sex: int | None
error: str | None = None
pas.importer.orchestrator._fetch_custom_data_worker(parliamentarian_queue: queue.Queue[Any], result_queue: queue.Queue[UpdateResult], session: requests.Session, base_url: str) → None

Worker thread that fetches API data for parliamentarians.

pas.importer.orchestrator.create_mock_file_data(data: list[dict[str, Any]], filename: str) → onegov.core.types.LaxFileDict
class pas.importer.orchestrator.KubImporter(token: str, base_url: str, output: onegov.pas.log.OutputHandler | None = None)

KUB API importer that manages shared session and state for both data import and custom data updates.

token
base_url
output = None
session
__enter__() → Self
__exit__(exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: types.TracebackType | None) → None
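The `__enter__`/`__exit__` pair means the importer can be used in a `with` block. The following is only a minimal sketch of that pattern, not the actual implementation: `FakeSession`, `SharedSessionClient`, the `Authorization` header format, and the assumption that `__exit__` closes the session are all hypothetical.

```python
from __future__ import annotations

from types import TracebackType


class FakeSession:
    """Hypothetical stand-in for an HTTP session object."""

    def __init__(self) -> None:
        self.headers: dict[str, str] = {}
        self.closed = False

    def close(self) -> None:
        self.closed = True


class SharedSessionClient:
    """Holds one session that every request made by the client reuses."""

    def __init__(self, token: str, base_url: str) -> None:
        self.token = token
        self.base_url = base_url.rstrip('/')
        self.session = FakeSession()
        # Assumed header format; the real API may expect something else.
        self.session.headers['Authorization'] = f'Token {token}'

    def __enter__(self) -> SharedSessionClient:
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        # Release the session even if the block raised; returning None
        # lets any exception propagate to the caller.
        self.session.close()


with SharedSessionClient('secret', 'https://example.invalid/api/') as client:
    open_inside_block = not client.session.closed

closed_after_block = client.session.closed
```

Entering the block hands back the client itself, so several calls inside the block can share one session, and the session is released once the block exits.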
_check_api_accessibility() → None

Check if the API is accessible.

_update_address_fields(parliamentarian: Any, addresses: list[dict[str, Any]]) → list[str]

Update parliamentarian address fields based on addresses data.

Returns list of updated field names.

_update_gender_field(parliamentarian: Any, sex: int) → bool
_fetch_api_data_with_pagination(endpoint: str) → list[dict[str, Any]]

Fetches all data from a paginated API endpoint using the shared session.

Args:
endpoint: The API endpoint (e.g., ‘people’, ‘organizations’, ‘memberships’)

Returns:

List of all records from all pages
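A paginated fetch of this kind can be sketched as a loop that follows a "next" link until none is left. This is an illustration only: the `results` and `next` keys, the `fetch_all_pages` helper, and the in-memory `FAKE_PAGES` data are assumptions, since the actual KUB response format is not described here.

```python
from __future__ import annotations

from typing import Any, Callable


def fetch_all_pages(
    get_page: Callable[[str], dict[str, Any]],
    first_url: str,
) -> list[dict[str, Any]]:
    """Collect the records of every page, following 'next' links."""
    records: list[dict[str, Any]] = []
    seen: set[str] = set()
    url: str | None = first_url
    while url is not None and url not in seen:
        seen.add(url)  # guard against a cyclic 'next' link
        page = get_page(url)
        records.extend(page.get('results', []))
        url = page.get('next')
    return records


# Hypothetical in-memory pages standing in for HTTP responses.
FAKE_PAGES: dict[str, dict[str, Any]] = {
    'people?page=1': {
        'results': [{'id': 'a'}, {'id': 'b'}],
        'next': 'people?page=2',
    },
    'people?page=2': {'results': [{'id': 'c'}], 'next': None},
}

all_people = fetch_all_pages(FAKE_PAGES.__getitem__, 'people?page=1')
```

With a shared `requests.Session`, `get_page` could be something like `lambda url: session.get(url).json()`, so every page request reuses the same connection and headers.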

update_custom_data(request: onegov.pas.request.PasRequest, app: onegov.pas.app.PasApp, max_workers: int = 3, import_log_id: uuid.UUID | None = None) → tuple[int, int, list[dict[str, Any]]]

Multi-threaded function that updates parliamentarians with custom field data.

Uses a queue-based approach where worker threads fetch API data concurrently while the main thread handles all database updates to maintain thread safety.

Returns:

Tuple of (updated_count, error_count, output_messages)
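The queue-based division of labour described above can be sketched as follows. This is not the module's code: `Result`, `fetch`, `worker`, and `run` are hypothetical names, `fetch` stands in for the API call, and a plain dict stands in for the database that only the main thread touches.

```python
from __future__ import annotations

import queue
import threading
from dataclasses import dataclass


@dataclass
class Result:
    item_id: str
    value: str | None = None
    error: str | None = None


def fetch(item_id: str) -> str:
    # Hypothetical stand-in for the per-parliamentarian API request.
    if item_id == 'bad':
        raise ValueError('simulated API failure')
    return item_id.upper()


def worker(tasks: queue.Queue[str], results: queue.Queue[Result]) -> None:
    """Drain the task queue; emit exactly one Result per task."""
    while True:
        try:
            item_id = tasks.get_nowait()
        except queue.Empty:
            return
        try:
            results.put(Result(item_id, value=fetch(item_id)))
        except Exception as exc:
            results.put(Result(item_id, error=str(exc)))


def run(ids: list[str], max_workers: int = 3) -> tuple[int, int, dict[str, str]]:
    tasks: queue.Queue[str] = queue.Queue()
    results: queue.Queue[Result] = queue.Queue()
    for item_id in ids:
        tasks.put(item_id)

    threads = [
        threading.Thread(target=worker, args=(tasks, results))
        for _ in range(max_workers)
    ]
    for thread in threads:
        thread.start()

    store: dict[str, str] = {}  # stand-in for the database
    updated = errors = 0
    # Only the main thread writes to the store, one result per task.
    for _ in ids:
        result = results.get()
        if result.error is None and result.value is not None:
            store[result.item_id] = result.value
            updated += 1
        else:
            errors += 1

    for thread in threads:
        thread.join()
    return updated, errors, store


updated_count, error_count, stored = run(['a', 'bad', 'b'])
```

Because each task yields exactly one result, the main thread knows how many results to wait for, and failures in a worker are reported as data rather than crashing the thread.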

import_data(request: onegov.pas.request.PasRequest, app: onegov.pas.app.PasApp, import_type: str = 'automatic') → tuple[dict[str, Any], list[Any], list[Any], list[Any], uuid.UUID]

Performs KUB data import using the shared session.

Returns:

Tuple of (import_results, people_data, organization_data, membership_data, import_log_id)

run_full_sync(request: onegov.pas.request.PasRequest, app: onegov.pas.app.PasApp, import_type: str, update_custom: bool = True, max_workers: int = 3) → tuple[dict[str, Any], uuid.UUID]

Performs the complete KUB synchronization, including the import and the custom data update. This is the main entry point for cronjobs and automated imports.

Returns:

Tuple of (combined_results, import_log_id)