pas.importer.orchestrator
Attributes
Exceptions
| Raised when the KUB API is not accessible. | 
Classes
| KUB API importer that manages shared session and state for both | 
Functions
| 
 | Worker thread that fetches API data for parliamentarians. | 
| 
 | 
Module Contents
- exception pas.importer.orchestrator.APIAccessibilityError[source]
- Bases: - Exception- Raised when the KUB API is not accessible. 
- pas.importer.orchestrator._fetch_custom_data_worker(parliamentarian_queue: queue.Queue[Any], result_queue: queue.Queue[UpdateResult], session: requests.Session, base_url: str) None[source]
- Worker thread that fetches API data for parliamentarians. 
- pas.importer.orchestrator.create_mock_file_data(data: list[dict[str, Any]], filename: str) onegov.core.types.LaxFileDict[source]
- class pas.importer.orchestrator.KubImporter(token: str, base_url: str, output: onegov.pas.log.OutputHandler | None = None)[source]
- KUB API importer that manages shared session and state for both data import and custom data updates. - __exit__(exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: types.TracebackType | None) None[source]
 - _update_address_fields(parliamentarian: Any, addresses: list[dict[str, Any]]) list[str][source]
- Update parliamentarian address fields based on addresses data. - Returns list of updated field names. 
 - _fetch_api_data_with_pagination(endpoint: str) list[dict[str, Any]][source]
- Fetches all data from a paginated API endpoint using the shared session. - Args:
- endpoint: The API endpoint (e.g., ‘people’, ‘organizations’,
- ‘memberships’) 
 
- Returns:
- List of all records from all pages 
 
 - update_custom_data(request: onegov.pas.request.PasRequest, app: onegov.pas.app.PasApp, max_workers: int = 3, import_log_id: uuid.UUID | None = None) tuple[int, int, list[dict[str, Any]]][source]
- Multi-threaded function that updates parliamentarians with custom field data. - Uses a queue-based approach where worker threads fetch API data concurrently while the main thread handles all database updates to maintain thread safety. - Returns:
- Tuple of (updated_count, error_count, output_messages) 
 
 - import_data(request: onegov.pas.request.PasRequest, app: onegov.pas.app.PasApp, import_type: str = 'automatic') tuple[dict[str, Any], list[Any], list[Any], list[Any], uuid.UUID][source]
- Performs KUB data import using the shared session. - Returns:
- Tuple of (import_results, people_data, organization_data,
- membership_data, import_log_id) 
 
 
 - run_full_sync(request: onegov.pas.request.PasRequest, app: onegov.pas.app.PasApp, import_type: str, update_custom: bool = True, max_workers: int = 3) tuple[dict[str, Any], uuid.UUID][source]
- This is the main entry point for cronjobs and automated imports. Complete KUB synchronization including import and custom data update. - Returns:
- Tuple of (combined_results, import_log_id)