Source code for pas.cli
from __future__ import annotations
import click
import logging
import requests
import urllib3
import json
from onegov.core.cli import command_group
from onegov.pas.excel_header_constants import (
commission_expected_headers_variant_1,
commission_expected_headers_variant_2,
)
from onegov.pas.data_import import import_commissions_excel
from onegov.pas.views.data_import import load_and_concatenate_json
from onegov.pas.importer.zug_kub_importer import import_zug_kub_data
from onegov.pas.collections.import_log import ImportLogCollection
from onegov.core.utils import binary_to_dictionary
from onegov.pas.log import ClickOutputHandler
from onegov.pas.log import OutputHandler
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from onegov.core.types import LaxFileDict
from collections.abc import Callable
from onegov.pas.app import PasApp
from onegov.pas.app import TownRequest
from typing import TypeAlias
@cli.command('import-commission-data')
@click.argument('excel_file', type=click.Path(exists=True))
[docs]
def import_commission_data(
excel_file: str,
) -> Processor:
"""Import commission data from an Excel or csv file.
Assumes that the name of the commission is the filename.
Each row of this import contains a single line, which is a single
parliamentarian and all the information about them.
Example:
onegov-pas --select '/onegov_pas/zug' import-commission-data \
"Kommission_Gesundheit_und_Soziales.xlsx"
"""
def import_data(request: TownRequest, app: PasApp) -> None:
try:
import_commissions_excel(
excel_file,
request.session,
excel_file,
expected_headers=commission_expected_headers_variant_1,
)
click.echo('Ok.')
except Exception:
click.echo('Trying the other variant of headers...')
import_commissions_excel(
excel_file,
request.session,
excel_file,
expected_headers=commission_expected_headers_variant_2,
)
click.echo('Ok.')
return import_data
@cli.command('check-api')
@click.option('--url', default='',
help='API endpoint to check')
@click.option('--token', default='', help='Authorization token')
[docs]
def check_api(url: str, token: str) -> None:
""" Check if the KuB API is reachable.
Example:
onegov-pas check-api --token "your-token-here"
"""
# Disable SSL warnings (self signed)
urllib3.disable_warnings(
urllib3.exceptions.InsecureRequestWarning)
headers = {
'Authorization': f'Token {token}',
'Accept': 'application/json'
}
try:
response = requests.get(
url, headers=headers, verify=False, timeout=10 # nosec: B501
)
click.echo(f'Status Code: {response.status_code}')
click.echo(f'Response: {response.text}')
if response.status_code == 200:
click.echo('✓ API is reachable')
else:
click.echo('✗ API returned non-200 status')
except requests.exceptions.RequestException as e:
click.echo(f'✗ Failed to reach API: {e}')
[docs]
def fetch_api_data_with_pagination(
endpoint: str, token: str, base_url: str
) -> list[dict[str, Any]]:
"""
Fetches all data from a paginated API endpoint.
Args:
endpoint: The API endpoint (e.g., 'people', 'organizations',
'memberships')
token: Authorization token
base_url: Base URL for the API
Returns:
List of all records from all pages
"""
# Disable SSL warnings (self signed)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
headers = {
'Authorization': f'Token {token}',
'Accept': 'application/json'
}
all_results = []
url = f'{base_url}/{endpoint}'
while url:
try:
response = requests.get(
url, headers=headers, verify=False, timeout=30 # nosec: B501
)
response.raise_for_status()
data = response.json()
results = data.get('results', [])
all_results.extend(results)
# Get next page URL
url = data.get('next')
except requests.exceptions.RequestException as e:
raise click.ClickException(
f'Failed to fetch from {endpoint}: {e}'
) from e
except ValueError as e:
raise click.ClickException(
f'Invalid JSON response from {endpoint}: {e}'
) from e
return all_results
[docs]
def create_mock_file_data(
data: list[dict[str, Any]], filename: str
) -> LaxFileDict:
json_content = {'results': data}
json_str = json.dumps(json_content, ensure_ascii=False)
json_bytes = json_str.encode('utf-8')
file_dict = binary_to_dictionary(json_bytes, filename)
return file_dict # type: ignore[return-value]
[docs]
def perform_kub_import(
request: TownRequest,
app: PasApp,
token: str,
base_url: str,
output: OutputHandler | None = None
) -> tuple[dict[str, Any], list[Any], list[Any], list[Any]]:
"""
Reusable function that performs KUB data import.
Args:
request: TownRequest object
app: PasApp object
token: Authorization token for KUB API
base_url: Base URL for the KUB API
output: Optional output handler for progress messages
Returns:
Tuple of (import_results, people_data, organization_data,
membership_data)
"""
try:
if output:
output.info('Checking API accessibility...')
# Check API accessibility
headers = {
'Authorization': f'Token {token}',
'Accept': 'application/json'
}
test_url = f'{base_url}/people'
try:
response = requests.get(
test_url, headers=headers, verify=False, timeout=10 # nosec: B501
)
if response.status_code != 200:
raise RuntimeError(
f'API check failed: {response.status_code} - '
f'{response.text}'
)
if output:
output.success('API is accessible')
except requests.exceptions.RequestException as e:
raise RuntimeError(f'API check failed: {e}') from e
if output:
output.info('Fetching people data...')
people_raw = fetch_api_data_with_pagination('people', token, base_url)
if output:
output.success(f'Fetched {len(people_raw)} people records')
if output:
output.info('Fetching organizations data...')
organizations_raw = fetch_api_data_with_pagination(
'organizations', token, base_url
)
if output:
output.success(
f'Fetched {len(organizations_raw)} organization records'
)
if output:
output.info('Fetching memberships data...')
memberships_raw = fetch_api_data_with_pagination(
'memberships', token, base_url
)
if output:
output.success(
f'Fetched {len(memberships_raw)} membership records'
)
# Create mock file data structures for load_and_concatenate_json
people_mock_files = [create_mock_file_data(people_raw, 'people.json')]
organizations_mock_files = [
create_mock_file_data(organizations_raw, 'organizations.json')
]
memberships_mock_files = [
create_mock_file_data(memberships_raw, 'memberships.json')
]
# Use load_and_concatenate_json to process the data
if output:
output.info('Processing people data...')
people_data = load_and_concatenate_json(people_mock_files)
if output:
output.info('Processing organizations data...')
organization_data = load_and_concatenate_json(organizations_mock_files)
if output:
output.info('Processing memberships data...')
membership_data = load_and_concatenate_json(memberships_mock_files)
# Perform the import
if output:
output.info('Starting data import...')
import_results = import_zug_kub_data(
session=request.session,
people_data=people_data,
organization_data=organization_data,
membership_data=membership_data,
user_id=request.current_user.id if request.current_user else None
)
# Display results
if output:
total_created = 0
total_updated = 0
total_processed = 0
for category, details in import_results.items():
created_count = len(details.get('created', []))
updated_count = len(details.get('updated', []))
processed_count = details.get('processed', 0)
total_created += created_count
total_updated += updated_count
total_processed += processed_count
if (created_count > 0 or updated_count > 0 or
processed_count > 0):
output.info(
f'{category}: {created_count} created, '
f'{updated_count} updated, {processed_count} processed'
)
output.success(
f'Import completed successfully! '
f'Total: {total_created} created, {total_updated} updated, '
f'{total_processed} processed'
)
return import_results, people_data, organization_data, membership_data
except Exception as e:
if output:
output.error(f'Import failed: {e}')
raise
@cli.command('import-kub-data')
@click.option('--token', required=True, help='Authorization token for KUB API')
@click.option('--base-url', help='Base URL for the KUB API, ending in /api/v2')
[docs]
def import_kub_data(token: str, base_url: str) -> Processor:
"""
Import data from the KUB API endpoints.
Fetches data from /people, /organizations, and /memberships endpoints
and imports them using the existing import logic.
Example:
onegov-pas --select '/onegov_pas/zug' import-kub-data \
--token "your-token-here"
"""
def cli_wrapper(request: TownRequest, app: PasApp) -> None:
"""CLI wrapper that calls the reusable import function."""
output_handler = ClickOutputHandler()
try:
import_results, people_data, organization_data, membership_data = (
perform_kub_import(
request, app, token, base_url, output_handler
)
)
# Add the concatenated data to the most recent ImportLog entry
output_handler.info('Saving concatenated data to import log...')
import_log_collection = ImportLogCollection(request.session)
most_recent_log = (
import_log_collection.most_recent_completed_cli_import()
)
if most_recent_log:
most_recent_log.people_source = people_data
most_recent_log.organizations_source = organization_data
most_recent_log.memberships_source = membership_data
request.session.flush()
output_handler.success('Concatenated data saved to import log')
# Display detailed import results
output_handler.info('Import Results Summary:')
for category, details in import_results.items():
created_count = len(details.get('created', []))
updated_count = len(details.get('updated', []))
processed_count = details.get('processed', 0)
if (created_count > 0 or updated_count > 0
or processed_count > 0):
output_handler.info(
f' {category}: {created_count} created, '
f'{updated_count} updated, {processed_count} processed'
)
except Exception as e:
output_handler.error(f'Import failed: {e}')
raise
return cli_wrapper
@cli.command('update-custom-data')
@click.option('--token', required=True, help='Authorization token for KUB API')
@click.option('--base-url', required=True,
help='Base URL for the KUB API, ending in /api/v2')
[docs]
def update_custom_data(token: str, base_url: str) -> Processor:
"""
Update parliamentarians with customFields data which
somehow is not included in the main /people api.
Needs to be run after import_kub_data
Example:
onegov-pas --select '/onegov_pas/zug' update-custom-data \
--token "your-token-here" \
--base-url "url-ending-in/api/v2"
"""
def update_data(request: TownRequest, app: PasApp) -> None:
from onegov.parliament.models import Parliamentarian
field_mappings = {
'personalnummer': 'personnel_number',
'vertragsnummer': 'contract_number',
'wahlkreis': 'district',
'beruf': 'occupation',
'adress_anrede': 'salutation_for_address',
'brief_anrede': 'salutation_for_letter'
}
output_handler = ClickOutputHandler()
# Disable SSL warnings (self signed)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
headers = {
'Authorization': f'Token {token}',
'Accept': 'application/json'
}
# Get all parliamentarians with external_kub_id
parliamentarians = request.session.query(Parliamentarian).filter(
Parliamentarian.external_kub_id.isnot(None)
).all()
if not parliamentarians:
output_handler.info(
'No parliamentarians with external_kub_id found'
)
return
output_handler.info(
f'Found {len(parliamentarians)} parliamentarians to update'
)
updated_count = 0
error_count = 0
for parliamentarian in parliamentarians:
try:
person_id = parliamentarian.external_kub_id
url = f'{base_url}/people/{person_id}'
output_handler.info(
f'Fetching data for {parliamentarian.title} ({person_id})'
)
response = requests.get(
url, headers=headers, verify=False, timeout=30 # nosec: B501
)
response.raise_for_status()
person_data = response.json()
custom_values = person_data.get('customValues', {})
# Update parliamentarian with custom values
updated_fields = []
for custom_key, attr_name in field_mappings.items():
if custom_key in custom_values:
setattr(
parliamentarian, attr_name,
custom_values[custom_key]
)
updated_fields.append(attr_name)
if updated_fields:
updated_count += 1
output_handler.success(
f'Updated {parliamentarian.title}: '
f'{", ".join(updated_fields)}'
)
else:
output_handler.info(
f'No custom data found for {parliamentarian.title}'
)
except requests.exceptions.RequestException as e:
error_count += 1
output_handler.error(
f'Failed to fetch data for {parliamentarian.title}: {e}'
)
except Exception as e:
error_count += 1
output_handler.error(
f'Error updating {parliamentarian.title}: {e}'
)
# Commit the changes
try:
request.session.commit()
output_handler.success(
f'Update completed: {updated_count} updated, '
f'{error_count} errors'
)
except Exception as e:
request.session.rollback()
output_handler.error(f'Failed to commit changes: {e}')
raise
return update_data