Source code for pas.collections.attendence

from __future__ import annotations

from sqlalchemy import desc, or_
from sqlalchemy.orm import joinedload

from onegov.core.collection import GenericCollection
from onegov.pas.models import (
    Attendence,
    PASParliamentarian,
    PASParliamentarianRole,
    SettlementRun,
)

from typing import TYPE_CHECKING, Self

if TYPE_CHECKING:
    from datetime import date
    from sqlalchemy.orm import Query, Session
    from onegov.pas.request import PasRequest


class AttendenceCollection(GenericCollection[Attendence]):
    """Collection of attendences narrowed by a set of optional filters.

    Every filter defaults to ``None`` (inactive). All active filters are
    combined with AND in :meth:`query`.
    """

    def __init__(
        self,
        session: Session,
        settlement_run_id: str | None = None,
        date_from: date | None = None,
        date_to: date | None = None,
        type: str | None = None,
        parliamentarian_id: str | None = None,
        commission_id: str | None = None,
        party_id: str | None = None,
    ) -> None:
        """Store the session and the filter values used by :meth:`query`.

        :param session: Database session handed to the base collection.
        :param settlement_run_id: Restrict to the start/end dates of this
            settlement run.
        :param date_from: Inclusive lower bound on the attendence date.
        :param date_to: Inclusive upper bound on the attendence date.
        :param type: Restrict to attendences of this type.
        :param parliamentarian_id: Restrict to this parliamentarian.
        :param commission_id: Restrict to this commission.
        :param party_id: Restrict to parliamentarians holding a role for
            this party on the attendence date.
        """
        super().__init__(session)
        self.settlement_run_id = settlement_run_id
        self.date_from = date_from
        self.date_to = date_to
        self.type = type
        self.parliamentarian_id = parliamentarian_id
        self.commission_id = commission_id
        self.party_id = party_id

    @property
    def model_class(self) -> type[Attendence]:
        """The model class managed by this collection."""
        return Attendence

    def query(self) -> Query[Attendence]:
        """Build the query with all active filters applied.

        :return: Query over attendences, newest date first.
        """
        query = super().query()

        # Eagerly load related data to prevent N+1 queries
        query = query.options(
            joinedload(Attendence.parliamentarian).joinedload(
                PASParliamentarian.roles
            ),
            joinedload(Attendence.commission),
        )

        if self.settlement_run_id:
            settlement_run = self.session.query(SettlementRun).get(
                self.settlement_run_id
            )
            # An id that resolves to no settlement run adds no date
            # restriction at all.
            if settlement_run:
                query = query.filter(
                    Attendence.date >= settlement_run.start,
                    Attendence.date <= settlement_run.end,
                )

        if self.date_from:
            query = query.filter(Attendence.date >= self.date_from)

        if self.date_to:
            query = query.filter(Attendence.date <= self.date_to)

        if self.type:
            query = query.filter(Attendence.type == self.type)

        if self.parliamentarian_id:
            query = query.filter(
                Attendence.parliamentarian_id == self.parliamentarian_id
            )

        if self.commission_id:
            query = query.filter(
                Attendence.commission_id == self.commission_id
            )

        if self.party_id:
            # Keep attendences whose date lies inside a role of the
            # parliamentarian for this party; a missing role start/end
            # is treated as open-ended.
            query = (
                query.join(Attendence.parliamentarian)
                .join(PASParliamentarian.roles)
                .filter(
                    PASParliamentarianRole.party_id == self.party_id,
                    or_(
                        PASParliamentarianRole.start.is_(None),
                        PASParliamentarianRole.start <= Attendence.date,
                    ),
                    or_(
                        PASParliamentarianRole.end.is_(None),
                        PASParliamentarianRole.end >= Attendence.date,
                    ),
                )
            )

        return query.order_by(desc(Attendence.date))

    def for_filter(
        self,
        settlement_run_id: str | None = None,
        date_from: date | None = None,
        date_to: date | None = None,
        type: str | None = None,
        parliamentarian_id: str | None = None,
        commission_id: str | None = None,
        party_id: str | None = None,
    ) -> Self:
        """Return a new collection using exactly the given filters.

        Filters of the current collection are not carried over; anything
        not passed here is reset to ``None``.
        """
        return self.__class__(
            self.session,
            settlement_run_id=settlement_run_id,
            date_from=date_from,
            date_to=date_to,
            type=type,
            parliamentarian_id=parliamentarian_id,
            commission_id=commission_id,
            party_id=party_id,
        )

    def by_party(
        self,
        party_id: str,
        start_date: date,
        end_date: date
    ) -> Self:
        """Return a collection filtered by party and date range.

        The settlement run, type, parliamentarian and commission filters
        of this collection are kept; the date range and party are
        replaced by the arguments. The resulting query keeps attendences
        dated between ``start_date`` and ``end_date`` (inclusive) whose
        parliamentarian holds a role for the party on the attendence
        date.
        """
        return self.for_filter(
            settlement_run_id=self.settlement_run_id,
            date_from=start_date,
            date_to=end_date,
            type=self.type,
            parliamentarian_id=self.parliamentarian_id,
            commission_id=self.commission_id,
            party_id=party_id,
        )

    def for_parliamentarian(self, parliamentarian_id: str) -> Self:
        """Return a collection for a specific parliamentarian only.

        Note that every other filter of this collection is reset, since
        only ``parliamentarian_id`` is passed on to :meth:`for_filter`.
        """
        return self.for_filter(parliamentarian_id=parliamentarian_id)

    def for_commission_president(
        self,
        parliamentarian_id: str,
        active_commission_ids: list[str]
    ) -> Query[Attendence]:
        """Return the query narrowed to what a commission president sees.

        On top of the filters applied by :meth:`query`, an attendence is
        kept when it either

        - belongs to ``parliamentarian_id`` (their own attendences), or
        - has a commission id contained in ``active_commission_ids``.
        """
        query = self.query()
        return query.filter(
            (Attendence.parliamentarian_id == parliamentarian_id)
            | (Attendence.commission_id.in_(active_commission_ids))
        )

    def view_for_parliamentarian(
        self,
        request: PasRequest
    ) -> list[Attendence]:
        """Return the attendences the current user is allowed to see.

        The active filters of this collection are honoured for every
        role; the role only adds a further restriction:

        - requests not flagged as parliamentarian get every attendence
          matching the filters,
        - commission presidents with at least one active presidency get
          their own attendences plus those of the presided commissions,
        - all other parliamentarians get their own attendences only,
        - a parliamentarian request without a linked parliamentarian
          gets an empty list.
        """
        user = request.current_user

        if not request.is_parliamentarian:
            # Not a parliamentarian (e.g. an admin): no role restriction
            return self.query().all()

        parliamentarian = (
            getattr(user, 'parliamentarian', None) if user else None
        )
        if not parliamentarian:
            return []

        own_id = str(parliamentarian.id)

        if user.role == 'commission_president':
            from datetime import date

            today = date.today()
            # Usually a single commission, but every presidency that has
            # not ended yet is taken into account.
            active_presidencies = [
                cm.commission_id
                for cm in parliamentarian.commission_memberships
                if cm.role == 'president'
                and (cm.end is None or cm.end >= today)
            ]
            if active_presidencies:
                return self.for_commission_president(
                    own_id, active_presidencies
                ).all()

        # Regular parliamentarians, and presidents without an active
        # presidency, see only their own attendences. The restriction is
        # applied on top of self.query() so the collection's filters stay
        # in effect, exactly as in the branches above.
        return self.query().filter(
            Attendence.parliamentarian_id == own_id
        ).all()