from __future__ import annotations
from onegov.pas.collections import AttendenceCollection
from onegov.pas.models.attendence import Attendence
from onegov.pas.models.commission import PASCommission
from onegov.pas.models.commission_membership import PASCommissionMembership
from onegov.pas.models.party import Party
from onegov.pas.models.parliamentarian import PASParliamentarian
from onegov.pas.models.parliamentarian_role import PASParliamentarianRole
from decimal import Decimal
from babel.numbers import format_decimal
from datetime import date
from uuid import UUID
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from onegov.pas.models import SettlementRun
from onegov.pas.models.attendence import Attendence
from onegov.town6.request import TownRequest
from onegov.user import User
from sqlalchemy.orm import Session
[docs]
def is_commission_president(
parliamentarian: PASParliamentarian,
attendance_or_commission_id: Attendence | UUID,
settlement_run: SettlementRun
) -> bool:
"""
Check if a parliamentarian is president of the commission for the given
attendance or commission_id during the settlement run period.
"""
if isinstance(attendance_or_commission_id, UUID):
commission_id = attendance_or_commission_id
return any(
cm.role == 'president'
for cm in parliamentarian.commission_memberships
if (
cm.commission_id == commission_id and (
cm.end is None or cm.end >= settlement_run.start
) and (
cm.start is None or cm.start <= settlement_run.end
)
)
)
else:
attendance = attendance_or_commission_id
return any(
cm.role == 'president'
for cm in parliamentarian.commission_memberships
if (
attendance.commission and
cm.commission_id == attendance.commission.id and (
cm.end is None or cm.end >= settlement_run.start
) and (
cm.start is None or cm.start <= settlement_run.end
)
)
)
[docs]
def get_parliamentarians_with_settlements(
session: Session,
start_date: date,
end_date: date
) -> list[PASParliamentarian]:
"""
Get all parliamentarians who were active and had settlements during the
specified period.
"""
active_parliamentarians = session.query(PASParliamentarian).filter(
PASParliamentarian.id.in_(
session.query(PASParliamentarianRole.parliamentarian_id).filter(
(PASParliamentarianRole.start.is_(None) | (
PASParliamentarianRole.start <= end_date)),
(PASParliamentarianRole.end.is_(None) | (
PASParliamentarianRole.end >= start_date))
)
)
).order_by(
PASParliamentarian.last_name,
PASParliamentarian.first_name
).all()
# Get all parliamentarians with attendances in one query
parliamentarians_with_attendances = {
pid[0] for pid in
session.query(Attendence.parliamentarian_id).filter(
Attendence.date >= start_date,
Attendence.date <= end_date
).distinct()
}
# Filter the active parliamentarians to only those with attendances
return [
p for p in active_parliamentarians
if p.id in parliamentarians_with_attendances
]
[docs]
def get_parties_with_settlements(
session: Session,
start_date: date,
end_date: date
) -> list[Party]:
"""
Get all parties that had active members with attendances during the
specified period.
This function ensures accurate party filtering by checking that
parliamentarians were active members of their party at the time of each
attendance, properly handling cases where parliamentarians switch parties
or have changing membership dates.
"""
return (
session.query(Party)
.filter(
Party.id.in_(
session.query(PASParliamentarianRole.party_id)
.join(PASParliamentarian)
.join(
Attendence,
PASParliamentarian.id == Attendence.parliamentarian_id,
)
.filter(
Attendence.date >= start_date,
Attendence.date <= end_date,
PASParliamentarianRole.party_id.isnot(None),
(
PASParliamentarianRole.start.is_(None)
| (PASParliamentarianRole.start <= Attendence.date)
),
(
PASParliamentarianRole.end.is_(None)
| (PASParliamentarianRole.end >= Attendence.date)
),
)
.distinct()
)
)
.order_by(Party.name)
.all()
)
[docs]
def is_parliamentarian(user: User | None) -> bool:
"""Check if a user has parliamentarian role."""
parls = {'parliamentarian', 'commission_president'}
if not user:
return False
if user.role in parls:
return True
return False
[docs]
def is_parliamentarian_role(role: str | None) -> bool:
"""Check if a role is a parliamentarian role."""
parls = {'parliamentarian', 'commission_president'}
return role in parls if role else False
[docs]
def get_active_commission_memberships(
user: User | None
) -> list[PASCommissionMembership]:
"""Get active commission memberships for a parliamentarian user."""
if (not user or not hasattr(user, 'parliamentarian')
or not user.parliamentarian):
return []
parliamentarian = user.parliamentarian
return [
m for m in parliamentarian.commission_memberships
if not m.end or m.end >= date.today()
]
[docs]
def get_commissions_with_memberships(
session: Session,
start_date: date,
end_date: date
) -> list[PASCommission]:
"""
Get all commissions that had active memberships during the
specified period.
This function ensures accurate commission filtering by checking that
commissions had active members during the period, properly handling
cases where memberships have changing dates.
"""
return (
session.query(PASCommission)
.filter(
PASCommission.id.in_(
session.query(PASCommissionMembership.commission_id)
.filter(
(
PASCommissionMembership.start.is_(None)
| (PASCommissionMembership.start <= end_date)
),
(
PASCommissionMembership.end.is_(None)
| (PASCommissionMembership.end >= start_date)
),
)
.distinct()
)
)
.order_by(PASCommission.name)
.all()
)
# FIXME: Should these two functions be a CLI command instead? Maybe switch
# to `click.echo` from `print` depending on the answer.
[docs]
def debug_party_export(
settlement_run: SettlementRun,
request: TownRequest,
party: Party
) -> None:
"""Debug function to trace party export data retrieval"""
session = request.session
# 1. Check basic party info
print(f'Party ID: {party.id}, Name: {party.name}') # noqa: T201
# 2. Check date range
print(f'Date range: {settlement_run.start} to {settlement_run.end}') # noqa: T201
# 3. Get all attendances without party filter first
base_attendances = (
AttendenceCollection(session)
.query()
.filter(
Attendence.date >= settlement_run.start,
Attendence.date <= settlement_run.end
)
.all()
)
print(f'Total attendances in date range: {len(base_attendances)}') # noqa: T201
# 4. Check parliamentarian roles
for attendance in base_attendances:
parl = attendance.parliamentarian
print(f'\nParliamentarian: {parl.first_name} {parl.last_name}') # noqa: T201
print(f'Attendance date: {attendance.date}') # noqa: T201
roles = session.query(PASParliamentarianRole).filter(
PASParliamentarianRole.parliamentarian_id == parl.id,
PASParliamentarianRole.party_id == party.id,
).all()
print('Roles:') # noqa: T201
for role in roles:
print(f'- Start: {role.start}, End: {role.end}') # noqa: T201
# Check if this attendance should be included
should_include = any(
(role.start is None or role.start <= attendance.date) and
(role.end is None or role.end >= attendance.date)
for role in roles
)
print(f'Should include: {should_include}') # noqa: T201
# 5. Try the actual party filter
party_attendances = (
AttendenceCollection(session)
.by_party(
party_id=str(party.id),
start_date=settlement_run.start,
end_date=settlement_run.end
)
.query()
.all()
)
print(f'\nFinal filtered attendances: {len(party_attendances)}') # noqa: T201
[docs]
def debug_party_export2(
request: TownRequest,
party: Party
) -> None:
session = request.session
print(f'Party ID: {party.id}') # noqa: T201
# Check roles directly
all_roles = session.query(PASParliamentarianRole).filter(
PASParliamentarianRole.party_id == party.id
).all()
print(f'\nTotal roles for party: {len(all_roles)}') # noqa: T201
for role in all_roles:
print(f'Role: {role.party_id} -> {role.parliamentarian_id}') # noqa: T201
# Check all parties
all_parties = session.query(Party).all()
print('\nAll parties:') # noqa: T201
for p in all_parties:
print(f'ID: {p.id}, Name: {p.name}') # noqa: T201