Source code for pas.utils

from __future__ import annotations

from onegov.pas.collections import AttendenceCollection
from onegov.pas.models.attendence import Attendence
from onegov.pas.models.commission import PASCommission
from onegov.pas.models.commission_membership import PASCommissionMembership
from onegov.pas.models.party import Party
from onegov.pas.models.parliamentarian import PASParliamentarian
from onegov.pas.models.parliamentarian_role import PASParliamentarianRole
from decimal import Decimal
from babel.numbers import format_decimal
from datetime import date
from uuid import UUID


from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from onegov.pas.models import SettlementRun
    from onegov.pas.models.attendence import Attendence
    from onegov.town6.request import TownRequest
    from onegov.user import User
    from sqlalchemy.orm import Session


[docs] def format_swiss_number(value: Decimal | int) -> str: if not isinstance(value, (Decimal, int)): raise TypeError(f'Expected Decimal or int, got {type(value).__name__}') if isinstance(value, int): value = Decimal(value) return format_decimal(value, format='#,##0.00', locale='de_CH')
def is_commission_president(
    parliamentarian: PASParliamentarian,
    attendance_or_commission_id: Attendence | UUID,
    settlement_run: SettlementRun
) -> bool:
    """Check whether a parliamentarian presides over a commission during
    a settlement run.

    :param parliamentarian: The parliamentarian whose
        ``commission_memberships`` are inspected.
    :param attendance_or_commission_id: Either the commission id itself
        (a ``UUID``) or an attendance whose ``commission`` identifies the
        commission.
    :param settlement_run: Supplies the ``start`` and ``end`` of the
        period a membership has to overlap.
    :return: ``True`` if any membership of the commission that overlaps
        the settlement run period has the role ``'president'``. ``False``
        otherwise, including when an attendance has no commission.
    """
    if isinstance(attendance_or_commission_id, UUID):
        commission_id = attendance_or_commission_id
    else:
        commission = attendance_or_commission_id.commission
        if not commission:
            # Without a commission no membership can match.
            return False
        commission_id = commission.id

    # A missing start/end is treated as open-ended on that side.
    return any(
        cm.role == 'president'
        for cm in parliamentarian.commission_memberships
        if (
            cm.commission_id == commission_id
            and (cm.end is None or cm.end >= settlement_run.start)
            and (cm.start is None or cm.start <= settlement_run.end)
        )
    )
def get_parliamentarians_with_settlements(
    session: Session,
    start_date: date,
    end_date: date
) -> list[PASParliamentarian]:
    """Return the parliamentarians that have a role overlapping the given
    period and at least one attendance dated within it.

    :param session: The database session used for both queries.
    :param start_date: First day of the period (inclusive).
    :param end_date: Last day of the period (inclusive).
    :return: Matching parliamentarians ordered by last name, then first
        name.
    """
    # A role overlaps the period when it starts no later than the period
    # ends and ends no earlier than the period starts; a missing bound is
    # treated as open-ended.
    role_started_in_time = (
        PASParliamentarianRole.start.is_(None)
        | (PASParliamentarianRole.start <= end_date)
    )
    role_not_yet_ended = (
        PASParliamentarianRole.end.is_(None)
        | (PASParliamentarianRole.end >= start_date)
    )
    ids_with_overlapping_role = session.query(
        PASParliamentarianRole.parliamentarian_id
    ).filter(role_started_in_time, role_not_yet_ended)

    candidates = (
        session.query(PASParliamentarian)
        .filter(PASParliamentarian.id.in_(ids_with_overlapping_role))
        .order_by(
            PASParliamentarian.last_name,
            PASParliamentarian.first_name
        )
        .all()
    )

    # Collect the ids of everyone with an attendance in a single query
    attendance_rows = (
        session.query(Attendence.parliamentarian_id)
        .filter(
            Attendence.date >= start_date,
            Attendence.date <= end_date
        )
        .distinct()
    )
    attended_ids = {row[0] for row in attendance_rows}

    # Keep the query ordering, drop candidates without any attendance
    return [
        candidate for candidate in candidates
        if candidate.id in attended_ids
    ]
def get_parties_with_settlements(
    session: Session,
    start_date: date,
    end_date: date
) -> list[Party]:
    """Return the parties that had members with attendances in the period.

    A party qualifies when one of its parliamentarian roles covers the
    date of an attendance of that parliamentarian, and that attendance
    lies within the given period. Because the role is matched against the
    date of each individual attendance, parliamentarians who switched
    parties are attributed to the party they belonged to at that time.

    :param session: The database session used for the query.
    :param start_date: First day of the period (inclusive).
    :param end_date: Last day of the period (inclusive).
    :return: Matching parties ordered by name.
    """
    attendance_in_period = (
        Attendence.date >= start_date,
        Attendence.date <= end_date,
    )
    # The role has to be in effect on the day of the attendance; a
    # missing start or end is treated as open-ended.
    role_covers_attendance = (
        (
            PASParliamentarianRole.start.is_(None)
            | (PASParliamentarianRole.start <= Attendence.date)
        ),
        (
            PASParliamentarianRole.end.is_(None)
            | (PASParliamentarianRole.end >= Attendence.date)
        ),
    )

    party_ids = (
        session.query(PASParliamentarianRole.party_id)
        .join(PASParliamentarian)
        .join(
            Attendence,
            PASParliamentarian.id == Attendence.parliamentarian_id,
        )
        .filter(
            *attendance_in_period,
            PASParliamentarianRole.party_id.isnot(None),
            *role_covers_attendance,
        )
        .distinct()
    )

    parties = session.query(Party).filter(Party.id.in_(party_ids))
    return parties.order_by(Party.name).all()
def is_parliamentarian(user: User | None) -> bool:
    """Tell whether the user's role is one of the parliamentarian roles.

    :param user: The user to check; may be ``None``.
    :return: ``True`` if ``user`` is truthy and its ``role`` is either
        ``'parliamentarian'`` or ``'commission_president'``.
    """
    parliamentarian_roles = {'parliamentarian', 'commission_president'}
    return bool(user) and user.role in parliamentarian_roles
[docs] def is_parliamentarian_role(role: str | None) -> bool: """Check if a role is a parliamentarian role.""" parls = {'parliamentarian', 'commission_president'} return role in parls if role else False
def get_active_commission_memberships(
    user: User | None
) -> list[PASCommissionMembership]:
    """Get the active commission memberships for a parliamentarian user.

    A membership counts as active when it has no end date or when its end
    date is today or later.

    :param user: The user to inspect; may be ``None``.
    :return: The active memberships of the user's parliamentarian. Empty
        if there is no user, the user has no ``parliamentarian``
        attribute, or that attribute is falsy.
    """
    parliamentarian = getattr(user, 'parliamentarian', None) if user else None
    if not parliamentarian:
        return []

    # Evaluate the reference date once so that every membership is
    # compared against the same day.
    today = date.today()
    return [
        membership
        for membership in parliamentarian.commission_memberships
        if not membership.end or membership.end >= today
    ]
def get_commissions_with_memberships(
    session: Session,
    start_date: date,
    end_date: date
) -> list[PASCommission]:
    """Return the commissions with a membership overlapping the period.

    A membership overlaps the period when it starts no later than
    ``end_date`` and ends no earlier than ``start_date``. A missing start
    or end is treated as open-ended.

    :param session: The database session used for the query.
    :param start_date: First day of the period (inclusive).
    :param end_date: Last day of the period (inclusive).
    :return: Matching commissions ordered by name.
    """
    started_in_time = (
        PASCommissionMembership.start.is_(None)
        | (PASCommissionMembership.start <= end_date)
    )
    not_yet_ended = (
        PASCommissionMembership.end.is_(None)
        | (PASCommissionMembership.end >= start_date)
    )

    commission_ids = (
        session.query(PASCommissionMembership.commission_id)
        .filter(started_in_time, not_yet_ended)
        .distinct()
    )

    commissions = session.query(PASCommission).filter(
        PASCommission.id.in_(commission_ids)
    )
    return commissions.order_by(PASCommission.name).all()
# FIXME: Should these two functions be a CLI command instead? Maybe switch
#        to `click.echo` from `print` depending on the answer.
def debug_party_export(
    settlement_run: SettlementRun,
    request: TownRequest,
    party: Party
) -> None:
    """Print a trace of how the export data of a party is retrieved.

    The output consists of the party, the settlement run period, every
    attendance within that period together with the roles tying its
    parliamentarian to the party, and finally the number of attendances
    returned by ``AttendenceCollection.by_party``.

    :param settlement_run: Supplies the ``start`` and ``end`` of the period.
    :param request: Supplies the database session.
    :param party: The party whose export is traced.
    """
    session = request.session

    # 1. Basic party info
    print(f'Party ID: {party.id}, Name: {party.name}')  # noqa: T201

    # 2. Date range of the settlement run
    print(f'Date range: {settlement_run.start} to {settlement_run.end}')  # noqa: T201

    # 3. Every attendance in the period, regardless of party
    unfiltered_query = AttendenceCollection(session).query().filter(
        Attendence.date >= settlement_run.start,
        Attendence.date <= settlement_run.end
    )
    base_attendances = unfiltered_query.all()
    print(f'Total attendances in date range: {len(base_attendances)}')  # noqa: T201

    # 4. Roles linking each attendance's parliamentarian to the party
    for attendance in base_attendances:
        member = attendance.parliamentarian
        print(f'\nParliamentarian: {member.first_name} {member.last_name}')  # noqa: T201
        print(f'Attendance date: {attendance.date}')  # noqa: T201

        party_roles = (
            session.query(PASParliamentarianRole)
            .filter(
                PASParliamentarianRole.parliamentarian_id == member.id,
                PASParliamentarianRole.party_id == party.id,
            )
            .all()
        )

        print('Roles:')  # noqa: T201
        for party_role in party_roles:
            print(f'- Start: {party_role.start}, End: {party_role.end}')  # noqa: T201

        # The attendance belongs to the party if any role covers its date
        should_include = any(
            (candidate.start is None or candidate.start <= attendance.date)
            and (candidate.end is None or candidate.end >= attendance.date)
            for candidate in party_roles
        )
        print(f'Should include: {should_include}')  # noqa: T201

    # 5. The party filter that is actually used
    filtered_collection = AttendenceCollection(session).by_party(
        party_id=str(party.id),
        start_date=settlement_run.start,
        end_date=settlement_run.end
    )
    party_attendances = filtered_collection.query().all()
    print(f'\nFinal filtered attendances: {len(party_attendances)}')  # noqa: T201
def debug_party_export2(
    request: TownRequest,
    party: Party
) -> None:
    """Print the roles stored for a party, followed by all parties.

    :param request: Supplies the database session.
    :param party: The party whose roles are listed.
    """
    session = request.session

    print(f'Party ID: {party.id}')  # noqa: T201

    # Look at the roles directly
    roles_query = session.query(PASParliamentarianRole).filter(
        PASParliamentarianRole.party_id == party.id
    )
    party_roles = roles_query.all()
    print(f'\nTotal roles for party: {len(party_roles)}')  # noqa: T201
    for party_role in party_roles:
        print(f'Role: {party_role.party_id} -> {party_role.parliamentarian_id}')  # noqa: T201

    # List every party for comparison
    known_parties = session.query(Party).all()
    print('\nAll parties:')  # noqa: T201
    for known_party in known_parties:
        print(f'ID: {known_party.id}, Name: {known_party.name}')  # noqa: T201