pas.importer.orchestrator
=========================

.. py:module:: pas.importer.orchestrator


Attributes
----------

.. autoapisummary::

   pas.importer.orchestrator.log


Classes
-------

.. autoapisummary::

   pas.importer.orchestrator.UpdateResult
   pas.importer.orchestrator.KubImporter


Functions
---------

.. autoapisummary::

   pas.importer.orchestrator._fetch_custom_data_worker
   pas.importer.orchestrator.create_mock_file_data


Module Contents
---------------

.. py:data:: log

.. py:class:: UpdateResult

   .. py:attribute:: parliamentarian_id
      :type:  str


   .. py:attribute:: title
      :type:  str


   .. py:attribute:: custom_values
      :type:  dict[str, str]


   .. py:attribute:: addresses
      :type:  list[dict[str, Any]]


   .. py:attribute:: sex
      :type:  int | None


   .. py:attribute:: error
      :type:  str | None
      :value: None


.. py:function:: _fetch_custom_data_worker(parliamentarian_queue: queue.Queue[Any], result_queue: queue.Queue[UpdateResult], session: requests.Session, base_url: str) -> None

   Worker thread that fetches API data for parliamentarians.


.. py:function:: create_mock_file_data(data: list[dict[str, Any]], filename: str) -> onegov.core.types.LaxFileDict

.. py:class:: KubImporter(token: str, base_url: str, output: onegov.pas.log.OutputHandler | None = None)

   KUB API importer that manages shared session and state for both
   data import and custom data updates.


   .. py:attribute:: token


   .. py:attribute:: base_url


   .. py:attribute:: output
      :value: None


   .. py:attribute:: session


   .. py:method:: __enter__() -> Self


   .. py:method:: __exit__(exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: types.TracebackType | None) -> None


   .. py:method:: _check_api_accessibility() -> None

      Check if the API is accessible.



   .. py:method:: _update_address_fields(parliamentarian: Any, addresses: list[dict[str, Any]]) -> list[str]

      Update parliamentarian address fields based on addresses data.
      Returns list of updated field names.



   .. py:method:: _update_gender_field(parliamentarian: Any, sex: int) -> bool


   .. py:method:: _fetch_api_data_with_pagination(endpoint: str) -> list[dict[str, Any]]

      Fetches all data from a paginated API endpoint using the shared session.

      Args:
          endpoint: The API endpoint (e.g., 'people', 'organizations',
              'memberships')

      Returns:
          List of all records from all pages



   .. py:method:: update_custom_data(request: onegov.pas.request.PasRequest, app: onegov.pas.app.PasApp, max_workers: int = 3, import_log_id: uuid.UUID | None = None) -> tuple[int, int, list[dict[str, Any]]]

      Multi-threaded function that updates parliamentarians with custom
      field data.

      Uses a queue-based approach where worker threads fetch API data
      concurrently while the main thread handles all database updates
      to maintain thread safety.

      Returns:
          Tuple of (updated_count, error_count, output_messages)



   .. py:method:: import_data(request: onegov.pas.request.PasRequest, app: onegov.pas.app.PasApp, import_type: str = 'automatic') -> tuple[dict[str, Any], list[Any], list[Any], list[Any], uuid.UUID]

      Performs KUB data import using the shared session.

      Returns:
          Tuple of (import_results, people_data, organization_data,
          membership_data, import_log_id)



   .. py:method:: run_full_sync(request: onegov.pas.request.PasRequest, app: onegov.pas.app.PasApp, import_type: str, update_custom: bool = True, max_workers: int = 3) -> tuple[dict[str, Any], uuid.UUID]

      This is the main entry point for cronjobs and automated imports.

      Complete KUB synchronization including import and custom data update.

      Returns:
          Tuple of (combined_results, import_log_id)
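
The queue-based approach described for ``update_custom_data`` can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the module's actual implementation: ``FetchResult``, ``fake_fetch``, ``worker`` and ``run`` are hypothetical names, the HTTP request is replaced by a stub, and a plain dict stands in for the database. The point it shows is the division of labour: worker threads only fetch and enqueue results, while the main thread alone performs the writes.

```python
from __future__ import annotations

import queue
import threading
from dataclasses import dataclass


@dataclass
class FetchResult:
    """Hypothetical, reduced stand-in for UpdateResult."""
    item_id: str
    payload: dict[str, str]
    error: str | None = None


def fake_fetch(item_id: str) -> dict[str, str]:
    """Stub for the real API request; fails for one id on purpose."""
    if item_id == 'p3':
        raise RuntimeError('simulated API failure')
    return {'title': f'Title of {item_id}'}


def worker(todo: queue.Queue[str], done: queue.Queue[FetchResult]) -> None:
    """Drain the input queue, emitting exactly one result per item."""
    while True:
        try:
            item_id = todo.get_nowait()
        except queue.Empty:
            return  # nothing left to fetch
        try:
            done.put(FetchResult(item_id, fake_fetch(item_id)))
        except Exception as exc:
            # Report the failure instead of letting the thread die.
            done.put(FetchResult(item_id, {}, error=str(exc)))


def run(
    item_ids: list[str],
    max_workers: int = 3,
) -> tuple[int, int, dict[str, dict[str, str]]]:
    todo: queue.Queue[str] = queue.Queue()
    done: queue.Queue[FetchResult] = queue.Queue()
    for item_id in item_ids:
        todo.put(item_id)

    threads = [
        threading.Thread(target=worker, args=(todo, done))
        for _ in range(max_workers)
    ]
    for thread in threads:
        thread.start()

    # Only the main thread touches the "database" (a dict here).
    store: dict[str, dict[str, str]] = {}
    updated = errors = 0
    for _ in item_ids:
        result = done.get()  # blocks until a worker delivers a result
        if result.error is not None:
            errors += 1
        else:
            store[result.item_id] = result.payload
            updated += 1

    for thread in threads:
        thread.join()
    return updated, errors, store
```

Calling ``run(['p1', 'p2', 'p3', 'p4'])`` in this sketch yields three updates and one error, because the stub rejects ``'p3'``. Keeping all writes in the main thread avoids sharing a database session across threads, which is the thread-safety concern the docstring mentions.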