pas.importer.orchestrator
Attributes
Classes
KubImporter | KUB API importer that manages shared session and state for both data import and custom data updates.
Functions
_fetch_custom_data_worker | Worker thread that fetches API data for parliamentarians.
create_mock_file_data |
Module Contents
- pas.importer.orchestrator._fetch_custom_data_worker(parliamentarian_queue: queue.Queue[Any], result_queue: queue.Queue[UpdateResult], session: requests.Session, base_url: str) → None
Worker thread that fetches API data for parliamentarians.
- pas.importer.orchestrator.create_mock_file_data(data: list[dict[str, Any]], filename: str) → onegov.core.types.LaxFileDict
- class pas.importer.orchestrator.KubImporter(token: str, base_url: str, output: onegov.pas.log.OutputHandler | None = None)
KUB API importer that manages shared session and state for both data import and custom data updates.
- __exit__(exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: types.TracebackType | None) → None
- _update_address_fields(parliamentarian: Any, addresses: list[dict[str, Any]]) → list[str]
Updates the parliamentarian's address fields from the given address data.
Returns the list of updated field names.
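The "update and report what changed" pattern described above can be sketched as follows. This is only an illustration under stated assumptions: the helper name `apply_field_updates`, the single-dict input, and the field names are hypothetical and are not taken from `_update_address_fields` itself, which receives a list of address dicts.

```python
from types import SimpleNamespace
from typing import Any


def apply_field_updates(
    target: Any,
    source: dict[str, Any],
    field_map: dict[str, str],
) -> list[str]:
    """Copy changed values from `source` onto `target`.

    `field_map` maps keys in the API payload to attribute names on the
    target object (hypothetical names, chosen for this sketch).
    Returns the attribute names whose values actually changed.
    """
    changed: list[str] = []
    for key, attr in field_map.items():
        new_value = source.get(key)
        # Skip missing values and values that are already up to date.
        if new_value is None or getattr(target, attr, None) == new_value:
            continue
        setattr(target, attr, new_value)
        changed.append(attr)
    return changed


# Usage with a stand-in object instead of a real parliamentarian model.
person = SimpleNamespace(street="Old Road 1", city="Zug")
updated = apply_field_updates(
    person,
    {"street": "New Road 2", "city": "Zug"},
    {"street": "street", "city": "city"},
)
```

Returning only the names that changed lets the caller log or count real modifications rather than every field that was inspected.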
- _fetch_api_data_with_pagination(endpoint: str) → list[dict[str, Any]]
Fetches all data from a paginated API endpoint using the shared session.
- Args:
- endpoint: The API endpoint (e.g., 'people', 'organizations', 'memberships')
- Returns:
List of all records from all pages
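A minimal sketch of collecting every page from a paginated endpoint through one shared session. The response shape is an assumption made for this example (a JSON object with a `results` list and a `next` URL that is null on the last page); the actual KUB API format is not documented here. The function name `fetch_all_pages` is likewise hypothetical, and `session` is expected to behave like a `requests.Session`.

```python
from typing import Any


def fetch_all_pages(
    session: Any,
    base_url: str,
    endpoint: str,
) -> list[dict[str, Any]]:
    """Follow 'next' links until the last page and return all records.

    Assumption: each page looks like {"results": [...], "next": url-or-None}.
    """
    records: list[dict[str, Any]] = []
    url = f"{base_url.rstrip('/')}/{endpoint}"
    while url:
        # Reusing one session keeps auth headers and the connection pool.
        response = session.get(url, timeout=30)
        response.raise_for_status()
        page = response.json()
        records.extend(page.get("results", []))
        url = page.get("next")  # None (or missing) ends the loop
    return records
```

With a real `requests.Session`, the call would look like `fetch_all_pages(session, base_url, 'people')`.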
- update_custom_data(request: onegov.pas.request.PasRequest, app: onegov.pas.app.PasApp, max_workers: int = 3, import_log_id: uuid.UUID | None = None) → tuple[int, int, list[dict[str, Any]]]
Multi-threaded function that updates parliamentarians with custom field data.
Uses a queue-based approach where worker threads fetch API data concurrently while the main thread handles all database updates to maintain thread safety.
- Returns:
Tuple of (updated_count, error_count, output_messages)
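The queue-based approach described above can be sketched as follows: worker threads only fetch, and the main thread alone applies the results. All names here (`update_all`, `fetch`, `apply_update`, the `_STOP` sentinel) are hypothetical and chosen for illustration; this is not the module's actual implementation, and the sketch returns only the two counts rather than the full tuple.

```python
import queue
import threading
from typing import Any, Callable

_STOP = object()  # sentinel that tells a worker to exit


def _worker(tasks: queue.Queue, results: queue.Queue, fetch: Callable) -> None:
    """Fetch data for each queued item; never touches shared state."""
    while True:
        item = tasks.get()
        if item is _STOP:
            break
        try:
            results.put((item, fetch(item), None))
        except Exception as exc:  # report the failure instead of crashing
            results.put((item, None, exc))


def update_all(
    items: list[Any],
    fetch: Callable[[Any], Any],
    apply_update: Callable[[Any, Any], None],
    max_workers: int = 3,
) -> tuple[int, int]:
    """Return (updated_count, error_count)."""
    tasks: queue.Queue = queue.Queue()
    results: queue.Queue = queue.Queue()
    for item in items:
        tasks.put(item)
    for _ in range(max_workers):
        tasks.put(_STOP)  # one sentinel per worker

    threads = [
        threading.Thread(target=_worker, args=(tasks, results, fetch), daemon=True)
        for _ in range(max_workers)
    ]
    for thread in threads:
        thread.start()

    updated = errors = 0
    # Every item yields exactly one result, so this loop terminates.
    for _ in range(len(items)):
        item, data, exc = results.get()
        if exc is not None:
            errors += 1
        else:
            apply_update(item, data)  # runs only in the main thread
            updated += 1

    for thread in threads:
        thread.join()
    return updated, errors
```

Keeping `apply_update` in the main thread means a database session that is not thread-safe is never shared between workers; only the I/O-bound fetches run concurrently.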
- import_data(request: onegov.pas.request.PasRequest, app: onegov.pas.app.PasApp, import_type: str = 'automatic') → tuple[dict[str, Any], list[Any], list[Any], list[Any], uuid.UUID]
Performs KUB data import using the shared session.
- Returns:
Tuple of (import_results, people_data, organization_data, membership_data, import_log_id)
- run_full_sync(request: onegov.pas.request.PasRequest, app: onegov.pas.app.PasApp, import_type: str, update_custom: bool = True, max_workers: int = 3) → tuple[dict[str, Any], uuid.UUID]
Performs a complete KUB synchronization, including the data import and the custom data update. This is the main entry point for cronjobs and automated imports.
- Returns:
Tuple of (combined_results, import_log_id)