from __future__ import annotations
import click
import transaction
from openpyxl import load_workbook
from onegov.core.cli import command_group
from onegov.translator_directory.collections.translator import (
TranslatorCollection)
from onegov.translator_directory import log
from onegov.translator_directory.models.language import Language
from onegov.translator_directory.models.translator import Translator
from onegov.translator_directory.utils import (
update_drive_distances, geocode_translator_addresses)
from onegov.user import User
from onegov.user.auth.clients import LDAPClient
from onegov.user.auth.provider import ensure_user
from onegov.user.sync import ZugUserSource
from sqlalchemy import or_, and_
from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Callable, Iterator
from ldap3.core.connection import Connection as LDAPConnection
from onegov.translator_directory.app import TranslatorDirectoryApp
from onegov.translator_directory.request import TranslatorAppRequest
from sqlalchemy.orm import Session
from uuid import UUID
[docs]
def fetch_users(
app: TranslatorDirectoryApp,
session: Session,
ldap_server: str,
ldap_username: str,
ldap_password: str,
admin_group: str,
editor_group: str,
verbose: bool = False,
skip_deactivate: bool = False,
dry_run: bool = False
) -> None:
""" Implements the fetch-users cli command. """
admin_group = admin_group.lower()
editor_group = editor_group.lower()
sources = ZugUserSource.factory(verbose=verbose)
translator_coll = TranslatorCollection(app, user_role='admin')
translators = {translator.email for translator in translator_coll.query()}
def users(connection: LDAPConnection) -> Iterator[dict[str, Any]]:
for src in sources:
for base, search_filter, attrs in src.bases_filters_attributes:
success = connection.search(
base, search_filter, attributes=attrs
)
if not success:
log.error('Error importing events', exc_info=True)
raise RuntimeError(
f"Could not query '{base}' "
f"with filter '{search_filter}'"
)
yield from src.map_entries(
connection.entries,
admin_group=admin_group,
editor_group=editor_group,
base=base,
search_filter=search_filter
)
def handle_inactive(synced_ids: list[UUID]) -> None:
inactive = session.query(User).filter(
and_(
User.id.notin_(synced_ids),
or_(
User.source == 'ldap',
User.role == 'member'
)
)
)
for user_ in inactive:
if user_.active:
log.info(f'Deactivating inactive user {user_.username}')
user_.active = False
client = LDAPClient(ldap_server, ldap_username, ldap_password)
client.try_configuration()
count = 0
synced_users = []
for data in users(client.connection):
if data['mail'] in translators:
log.info(f'Skipping {data["mail"]}, translator exists')
continue
if data['type'] == 'ldap':
source = 'ldap'
source_id = data['source_id']
force_role = True
elif data['type'] == 'regular':
source = None
source_id = None
force_role = False
else:
log.error('Unknown auth provider', exc_info=False)
raise NotImplementedError()
user = ensure_user(
source=source,
source_id=source_id,
session=session,
username=data['mail'],
role=data['role'],
force_role=force_role,
force_active=True
)
synced_users.append(user.id)
count += 1
log.info(f'Synchronized {count} users')
if not skip_deactivate:
handle_inactive(synced_users)
if dry_run:
transaction.abort()
@cli.command(name='fetch-users', context_settings={'singular': True})
@click.option('--ldap-server', required=True)
@click.option('--ldap-username', required=True)
@click.option('--ldap-password', required=True)
@click.option('--admin-group', required=True, help='group id for role admin')
@click.option('--editor-group', required=True, help='group id for role editor')
@click.option('--verbose', is_flag=True, default=False)
@click.option('--skip-deactivate', is_flag=True, default=False)
@click.option('--dry-run', is_flag=True, default=False)
[docs]
def fetch_users_cli(
ldap_server: str,
ldap_username: str,
ldap_password: str,
admin_group: str,
editor_group: str,
verbose: bool,
skip_deactivate: bool,
dry_run: bool
) -> Callable[[TranslatorAppRequest, TranslatorDirectoryApp], None]:
r""" Updates the list of users by fetching matching users
from a remote LDAP server.
This is currently highly specific for the Canton of Zug and therefore most
values are hard-coded.
Example:
.. code-block:: bash
onegov-translator --select /translator_directory/zug fetch-users \
--ldap-server 'ldaps://1.2.3.4' \
--ldap-username 'foo' \
--ldap-password 'bar' \
--admin-group 'ou=Admins' \
--editor-group 'ou=Editors'
"""
def execute(
request: TranslatorAppRequest,
app: TranslatorDirectoryApp
) -> None:
fetch_users(
app,
request.session,
ldap_server,
ldap_username,
ldap_password,
admin_group,
editor_group,
verbose,
skip_deactivate,
dry_run
)
return execute
@cli.command(name='update-drive-distance', context_settings={'singular': True})
@click.option('--dry-run/-no-dry-run', default=False)
@click.option('--only-empty/--all', default=True)
@click.option(
'--tolerance-factor',
help='Do not overwrite existing distances if off by +- a factor',
default=0.3,
type=float
)
@click.option(
'--max-tolerance',
type=int,
help='Tolerate this maximum deviation (km) from an old saved distance',
default=15
)
@click.option(
'--max-distance',
type=int,
help='Do accept routes longer than this distance (km)',
default=300
)
[docs]
def drive_distances_cli(
dry_run: bool,
only_empty: bool,
tolerance_factor: float,
max_tolerance: int,
max_distance: int
) -> Callable[[TranslatorAppRequest, TranslatorDirectoryApp], None]:
def get_distances(
request: TranslatorAppRequest,
app: TranslatorDirectoryApp
) -> None:
tot, routes_found, _distance_changed, no_routes, tolerance_failed = (
update_drive_distances(
request,
only_empty,
tolerance_factor,
max_tolerance,
max_distance
)
)
click.secho(f'Directions not found: {len(no_routes)}/{tot}',
fg='yellow')
click.secho(f'Over tolerance: {len(tolerance_failed)}/{routes_found}',
fg='yellow')
if no_routes:
click.secho(
'Listing all translators whose directions could not be found')
for trs in no_routes:
click.secho(f'- {request.link(trs, name="edit")}')
if tolerance_failed:
click.secho(
'Listing all translators who failed distance check')
for trs, new_dist in tolerance_failed:
click.secho(f'- {request.link(trs, name="edit")}')
click.secho(f' old: {trs.drive_distance}; new: {new_dist}')
if dry_run:
transaction.abort()
return get_distances
@cli.command(name='geocode', context_settings={'singular': True})
@click.option('--dry-run/-no-dry-run', default=False)
@click.option('--only-empty/--all', default=True)
[docs]
def geocode_cli(
dry_run: bool,
only_empty: bool
) -> Callable[[TranslatorAppRequest, TranslatorDirectoryApp], None]:
def do_geocode(
request: TranslatorAppRequest,
app: TranslatorDirectoryApp
) -> None:
if not app.mapbox_token:
click.secho('No mapbox token found, aborting...', fg='yellow')
return
trs_total, total, geocoded, skipped, not_found = (
geocode_translator_addresses(
request, only_empty,
bbox=None
)
)
click.secho(f'{total} translators of {trs_total} have an address')
click.secho(f'Changed: {geocoded}/{total - skipped}, '
f'skipped: {skipped}/{total}',
fg='green')
click.secho(f'Coordinates not found: '
f'{len(not_found)}/{total - skipped}',
fg='yellow')
click.secho('Listing all translators whose address could not be found')
for trs in not_found:
click.secho(f'- {request.link(trs, name="edit")}')
if dry_run:
transaction.abort()
return do_geocode
@cli.command(name='update-accounts', context_settings={'singular': True})
@click.option('--dry-run/-no-dry-run', default=False)
[docs]
def update_accounts_cli(
dry_run: bool
) -> Callable[[TranslatorAppRequest, TranslatorDirectoryApp], None]:
""" Updates user accounts for translators. """
def do_update_accounts(
request: TranslatorAppRequest,
app: TranslatorDirectoryApp
) -> None:
translators = TranslatorCollection(request.app, user_role='admin')
for translator in translators.query():
translators.update_user(translator, translator.email)
if dry_run:
transaction.abort()
return do_update_accounts
[docs]
LANGUAGES = (
'Afrikaans',
'Albanisch',
'Amharisch',
'Anyin',
'Arabisch',
'Arabisch (Dialekte)',
'Arabisch (Hocharabisch)',
'Arabisch (Masri)',
'Arabisch (Nahost)',
'Aramäisch',
'Armenisch',
'Aserbaidschanisch',
'Badini',
'Bangla',
'Bengalisch',
'Bilen',
'Bosnisch',
'Bulgarisch',
'Chinesisch',
'Chinesisch (Hokkien)',
'Chinesisch (Mandarin)',
'Dänisch',
'Dari',
'Dari (Afghanistan)',
'Deutsch',
'Diola',
'Edo',
'Englisch',
'Ewe',
'Farsi',
'Farsi (Persisch)',
'Farsi Persisch (Afghanistan Iran)',
'Flämisch',
'Französisch',
'Friesisch',
'Galicisch',
'Gebärdensprache',
'Georgisch',
'Griechisch',
'Gujarati',
'Hebräisch',
'Hindi',
'Ibo',
'Igbo',
'Ijaw',
'Indonesisch',
'Irakisch',
'Iranisch',
'Italienisch',
'Italienisch (Dialekte Süditalien)',
'Itsekiri',
'Japanisch',
'Kabyé',
'Kalabari',
'Kantonesisch',
'Kasachisch',
'Keine schriftlichen Übersetzungen',
'Keine Verdolmetschung',
'Kotokoli',
'Kreolisch',
'Kroatisch',
'Kurdisch',
'Kurdisch (Dialekte)',
'Kurmanci',
'Kyrillisch (Serbien)',
'Lettisch',
'Litauisch',
'Mandarin',
'Mandinka',
'Marathi',
'Marokkanisch',
'Mazedonisch',
'Mina',
'Moldauisch',
'Mongolisch',
'Montenegrinisch',
'Niederländisch',
'Oromo',
'Pakistanisch',
'Panjabi',
'Paschto (Afghanistan, Pakistan)',
'Paschtu',
'Patois',
'Persisch',
'Pidgin',
'Pidgin-Englisch',
'Pidgin-Französisch',
'Pidgin-Nigerianisch',
'Pilipino',
'Polnisch',
'Portugiesisch',
'Portugiesisch (Brasil)',
'Punjabi',
'Rumänisch',
'Russisch',
'Schwedisch',
'Serbisch',
'Serbokroatisch',
'Shandong-Dialekt',
'Shanghai-Dialekt',
'Singhalesisch',
'Slowakisch',
'Somali',
'Sorani',
'Spanisch',
'Suaheli',
'Tadschikisch',
'Tagalog',
'Tamil',
'Tamilisch',
'Telugu',
'Tem',
'Thailändisch',
'Tibetisch',
'Tigrinya',
'Tschechisch',
'Türkisch',
'Türkisch (Dialekte)',
'Turkmenisch',
'Uigurisch',
'Ukrainisch',
'Ungarisch',
'Urdu',
'Usbekisch',
'VG ZG',
'Vietnamesisch',
'Weissrussisch',
'Wolof',
'Yoruba',
'Yue-Chinesisch'
)
@cli.command(name='create-languages', context_settings={'singular': True})
@click.option('--dry-run', is_flag=True, default=False)
[docs]
def create_languages(
dry_run: bool
) -> Callable[[TranslatorAppRequest, TranslatorDirectoryApp], None]:
"""
Create languages for the selected translator schema. Languages get
created if they don't exist to prevent id changes.
This command is useful when new languages were added to the LANGUAGES list.
NOTE: No language will be deleted. If a language is not in the LANGUAGES
list the script will print a message.
Example:
onegov-translator --select /translator_directory/schaffhausen
create-languages --dry-run
"""
def do_create_languages(
request: TranslatorAppRequest,
app: TranslatorDirectoryApp
) -> None:
# Compare existing languages
existing = request.session.query(Language).all()
existing_lang_names = [lang.name for lang in existing]
for language in existing:
if language.name not in LANGUAGES:
click.secho(f"Language '{language.name}' is "
f'unknown. You may delete it if not in use from '
f"'/languages'", fg='yellow')
# create languages if not existing (to prevent id changes)
add_count = 0
for language_name in LANGUAGES:
if language_name not in existing_lang_names:
add_count += 1
lang = Language(name=language_name)
request.session.add(lang)
click.secho(f'Inserted {add_count} languages of total '
f'{len(LANGUAGES)}', fg='green')
if dry_run:
transaction.abort()
click.secho('Aborting transaction', fg='yellow')
else:
request.session.flush()
return do_create_languages
@cli.command(name='force-delete-languages',
context_settings={'singular': True})
@click.option('--dry-run', is_flag=True, default=False)
[docs]
def force_delete_languages(
dry_run: bool
) -> Callable[[TranslatorAppRequest, TranslatorDirectoryApp], None]:
"""
This command forcefully deletes all languages from the database and all
references will be lost.
This command is useful after the languages have changed and
assigned a lot for testing.
Example:
onegov-translator --select /translator_directory/schaffhausen
delete-languages --dry-run
"""
def do_delete_languages(
request: TranslatorAppRequest,
app: TranslatorDirectoryApp
) -> None:
i = input('Are you sure you want to delete all languages and losing '
'all references to it? [y/N]: ')
if i.lower() != 'y':
transaction.abort()
click.secho('Aborting transaction', fg='yellow')
return
del_count = 0
languages = request.session.query(Language)
for lang in languages:
del_count += 1
request.session.delete(lang)
click.secho(f'Deleted {del_count} languages', fg='green')
if dry_run:
transaction.abort()
click.secho('Aborting transaction', fg='yellow')
else:
request.session.flush()
return do_delete_languages
@cli.command(
name='recalculate-travel-details', context_settings={'singular': True}
)
@click.option('--dry-run/-no-dry-run', default=False)
@click.option(
'--status',
type=click.Choice(['pending', 'confirmed', 'all']),
default='all',
help='Which reports to fix',
)
[docs]
def recalculate_travel_details_cli(
dry_run: bool, status: str
) -> Callable[[TranslatorAppRequest, TranslatorDirectoryApp], None]:
r"""Recalculate travel compensation and distance for open time reports.
This fixes incorrect travel calculations due to old logic errors.
Only updates reports that have not yet been exported to accounting.
Example:
onegov-translator --select /translator_directory/zug \
recalculate-travel-details --dry-run
"""
def do_recalculate(
request: TranslatorAppRequest, app: TranslatorDirectoryApp
) -> None:
from decimal import Decimal
from onegov.translator_directory.models.time_report import (
TranslatorTimeReport,
)
from onegov.translator_directory.utils import (
calculate_distance_to_location,
)
# Query open time reports (not exported)
query = request.session.query(TranslatorTimeReport).filter(
TranslatorTimeReport.exported == False
)
# Filter by status if specified
if status == 'pending':
query = query.filter(TranslatorTimeReport.status == 'pending')
elif status == 'confirmed':
query = query.filter(TranslatorTimeReport.status == 'confirmed')
reports = query.all()
updated_count = 0
skipped_count = 0
click.secho(f'Processing {len(reports)} time reports...', fg='blue')
for report in reports:
translator = report.translator
if not translator:
click.secho(
f'Skipping report {report.id}: translator not found',
fg='yellow',
)
skipped_count += 1
continue
# Calculate travel details
compensation = Decimal('0')
distance = None
# Skip travel calculation if requested
if report.assignment_type == 'on-site':
one_way_km = None
# Try to calculate distance if we have coordinates
if translator.coordinates:
location_key = report.assignment_location or ''
one_way_km = calculate_distance_to_location(
request, translator.coordinates, location_key, None
)
# Fall back to translator's drive_distance
if one_way_km is None and translator.drive_distance:
one_way_km = float(translator.drive_distance)
# Calculate compensation based on distance tiers
if one_way_km is not None:
distance = one_way_km
if one_way_km <= 25:
compensation = Decimal('20')
elif one_way_km <= 50:
compensation = Decimal('50')
elif one_way_km <= 100:
compensation = Decimal('100')
else:
compensation = Decimal('150')
# Update the report
old_comp = report.travel_compensation
old_dist = report.travel_distance
report.travel_compensation = compensation
report.travel_distance = distance
# Recalculate total compensation
breakdown = report.calculate_compensation_breakdown()
report.total_compensation = breakdown['total']
if old_comp != compensation or old_dist != distance:
click.secho(
f'✓ {translator.title}: '
f'comp {old_comp}→{compensation}, '
f'dist {old_dist}→{distance}',
fg='green',
)
updated_count += 1
else:
skipped_count += 1
click.secho(f'Updated: {updated_count}/{len(reports)}', fg='green')
if dry_run:
transaction.abort()
click.secho('Dry run: transaction aborted', fg='yellow')
else:
request.session.flush()
click.secho('Changes committed', fg='green')
return do_recalculate
@cli.command(
name='import-contract-numbers', context_settings={'singular': True}
)
@click.option(
'--file',
'-f',
required=True,
type=click.Path(exists=True),
help='Path to Excel file with contract numbers',
)
@click.option('--dry-run/-no-dry-run', default=False)
[docs]
def import_contract_numbers_cli(
file: str, dry_run: bool
) -> Callable[[TranslatorAppRequest, TranslatorDirectoryApp], None]:
r"""Import contract numbers from Excel file and update translators.
The Excel file must have columns:
- 'Ma-Nr.' (personnel number, preferred for matching)
- 'Vertragsnummer' (contract number to update)
- 'Mitarbeiter' (optional, used as fallback for name-based matching)
If translator not found by Ma-Nr., will search by name from Mitarbeiter
column (supports formats: "Lastname, Firstname" or "Firstname Lastname").
Example:
onegov-translator --select /translator_directory/schaffhausen \
import-contract-numbers -f /tmp/file.xlsx --dry-run
"""
def do_import(
request: TranslatorAppRequest, app: TranslatorDirectoryApp
) -> None:
try:
wb = load_workbook(file)
ws = wb.active
if ws is None:
click.secho(
'Fehler: Kein aktives Arbeitsblatt in Excel-Datei '
'gefunden',
fg='red',
)
return
headers = {}
for col_idx, cell in enumerate(ws[1], start=1):
if cell.value:
headers[cell.value] = col_idx
required = {'Ma-Nr.', 'Vertragsnummer'}
if not required.issubset(headers.keys()):
click.secho(
f'Fehler: Excel-Datei muss folgende Spalten enthalten: '
f'{required}',
fg='red',
)
return
ma_nr_col = headers['Ma-Nr.']
contract_col = headers['Vertragsnummer']
mitarbeiter_col = headers.get('Mitarbeiter')
def find_by_name(name_str: str) -> Translator | None:
"""Parse name string and search for translator."""
if not name_str:
return None
parts = [p.strip() for p in name_str.split(',')]
if len(parts) == 2:
last_name, first_name = parts
else:
parts = name_str.split()
if len(parts) < 2:
return None
first_name = ' '.join(parts[:-1])
last_name = parts[-1]
translator = (
request.session.query(Translator)
.filter(
Translator.first_name.ilike(first_name),
Translator.last_name.ilike(last_name),
)
.first()
)
if translator:
return translator
translator = (
request.session.query(Translator)
.filter(
Translator.first_name.ilike(last_name),
Translator.last_name.ilike(first_name),
)
.first()
)
return translator
updated_by_number = 0
updated_by_name = 0
not_found = []
errors = []
for row_idx, row in enumerate(
ws.iter_rows(min_row=2, values_only=False), start=2
):
ma_nr_cell = row[ma_nr_col - 1]
contract_cell = row[contract_col - 1]
mitarbeiter_cell = (
row[mitarbeiter_col - 1] if mitarbeiter_col else None
)
if ma_nr_cell.value is None:
continue
try:
pers_id = int(ma_nr_cell.value) # type: ignore[arg-type]
except (ValueError, TypeError):
errors.append(
f'Zeile {row_idx}: Ungültige Ma-Nr. '
f'"{ma_nr_cell.value}"'
)
continue
contract_number = contract_cell.value
if not contract_number:
continue
found_by_number = (
request.session.query(Translator)
.filter_by(pers_id=pers_id)
.first()
)
found_by_name = None
if mitarbeiter_cell and mitarbeiter_cell.value:
found_by_name = find_by_name(str(mitarbeiter_cell.value))
if found_by_number:
translator = found_by_number
matched_by = 'number'
elif found_by_name:
translator = found_by_name
matched_by = 'name'
else:
not_found.append(
(
pers_id,
(
mitarbeiter_cell.value
if mitarbeiter_cell
else None
),
)
)
continue
old_contract = translator.contract_number
translator.contract_number = str(contract_number)
if matched_by == 'number':
updated_by_number += 1
else:
updated_by_name += 1
click.secho(
f'✓ {translator.title}: contract '
f'{old_contract}→{translator.contract_number}',
fg='green',
)
click.secho('\nImport-Zusammenfassung:')
total_updated = updated_by_number + updated_by_name
total_entries = total_updated + len(not_found)
click.secho(f'Verarbeitete CSV-Zeilen: {total_entries}', fg='blue')
click.secho(f'Aktualisiert: {total_updated}', fg='green')
click.secho(f' - Nach Nummer: {updated_by_number}')
click.secho(f' - Nach Name: {updated_by_name}')
total_translators = request.session.query(Translator).count()
click.secho(
f'\nGesamtzahl Übersetzer im System: {total_translators}',
fg='blue',
)
if not_found:
click.secho(f'Nicht gefunden: {len(not_found)}', fg='red')
for pers_id, name in not_found:
click.secho(f' - Ma-Nr. {pers_id} ({name})')
if errors:
click.secho(f'Fehler: {len(errors)}', fg='red')
for error in errors[:10]:
click.secho(f' - {error}')
if len(errors) > 10:
click.secho(f' ... und {len(errors) - 10} weitere')
without_contract = (
request.session.query(Translator)
.filter(
(Translator.contract_number == None)
| (Translator.contract_number == ''),
Translator.email != None,
)
.order_by(Translator.last_name, Translator.first_name)
.all()
)
if without_contract:
click.secho(
f'\nÜbersetzer ohne Vertragsnummer: '
f'{len(without_contract)}',
fg='yellow',
)
for translator in without_contract:
ma_nr = translator.pers_id or 'N/A'
name = translator.title
click.secho(f' - Ma-Nr. {ma_nr} ({name})')
if dry_run:
transaction.abort()
click.secho('\nTestlauf: Transaktion abgebrochen', fg='yellow')
else:
request.session.flush()
click.secho('\nÄnderungen gespeichert', fg='green')
except Exception as e:
click.secho(f'Fehler beim Lesen der Excel-Datei: {e}', fg='red')
if dry_run:
transaction.abort()
return do_import
@cli.command(name='change-finanzstelle', context_settings={'singular': True})
@click.option(
'--ticket-id',
'-t',
required=True,
type=str,
help='Ticket number (e.g., TRP-2024-00123)',
)
@click.option(
'--finanzstelle',
'-f',
required=True,
type=click.Choice(
[
'migrationsamt_und_passbuero',
'staatsanwaltschaft',
'gefaengnisverwaltung',
'polizei',
'obergericht',
'kantonsgericht',
'verkehrsabteilung_staatsanwaltschaft',
'jugendanwaltschaft',
]
),
help='New finanzstelle key',
)
@click.option('--dry-run/-no-dry-run', default=False)
[docs]
def change_finanzstelle_cli(
ticket_id: str, finanzstelle: str, dry_run: bool
) -> Callable[[TranslatorAppRequest, TranslatorDirectoryApp], None]:
r"""Change the finanzstelle of a time report by ticket ID.
Example:
onegov-translator --select /translator_directory/schaffhausen \
change-finanzstelle -t TRP-2024-00123 -f polizei --dry-run
"""
def do_change(
request: TranslatorAppRequest, app: TranslatorDirectoryApp
) -> None:
from onegov.ticket import Ticket
from onegov.translator_directory.models.time_report import (
TranslatorTimeReport,
)
from onegov.translator_directory.constants import FINANZSTELLE
ticket = (
request.session.query(Ticket)
.filter(Ticket.number == ticket_id)
.first()
)
if not ticket:
click.secho(f'Ticket {ticket_id} not found', fg='red')
return
handler_data = ticket.handler_data.get('handler_data', {})
time_report_id = handler_data.get('time_report_id')
if not time_report_id:
click.secho(f'Ticket {ticket_id} has no time report', fg='red')
return
time_report = (
request.session.query(TranslatorTimeReport)
.filter(TranslatorTimeReport.id == time_report_id)
.first()
)
if not time_report:
click.secho(
f'Time report with ID {time_report_id} not found', fg='red'
)
return
old_finanzstelle = time_report.finanzstelle
new_finanzstelle = finanzstelle
old_name = (
FINANZSTELLE[old_finanzstelle].name
if old_finanzstelle in FINANZSTELLE
else old_finanzstelle
)
new_name = FINANZSTELLE[new_finanzstelle].name
time_report.finanzstelle = new_finanzstelle
click.secho(
f'✓ Ticket {ticket_id}: finanzstelle changed',
fg='green',
)
click.secho(f' From: {old_name} ({old_finanzstelle})')
click.secho(f' To: {new_name} ({new_finanzstelle})')
if dry_run:
transaction.abort()
click.secho('Dry run: transaction aborted', fg='yellow')
else:
request.session.flush()
click.secho('Changes committed', fg='green')
return do_change