Source code for pas.collections.parliamentarian

from __future__ import annotations

import logging
from datetime import date
from email_validator import EmailNotValidError, validate_email
from onegov.core.utils import toggle
from onegov.core.crypto import random_password
from onegov.parliament.collections import ParliamentarianCollection
from onegov.pas.models import PASParliamentarian
from onegov.user import UserCollection
from sqlalchemy.orm import selectinload

[docs] log = logging.getLogger('onegov.pas.collections.parliamentarian')
from typing import TYPE_CHECKING if TYPE_CHECKING: from typing import Any, Self from onegov.core import Framework
[docs] class PASParliamentarianCollection( ParliamentarianCollection[PASParliamentarian] ): def __init__(self, app: Framework, **kwargs: Any) -> None: super().__init__(app.session(), **kwargs)
[docs] self.app = app
[docs] def for_filter( self, active: bool | None = None, party: str | None = None, ) -> Self: active_ = toggle(self.active, active) party_ = toggle(self.party, party) return self.__class__( self.app, active=active_, party=party_ )
@property
[docs] def model_class(self) -> type[PASParliamentarian]: return PASParliamentarian
[docs] def add(self, **kwargs: Any) -> PASParliamentarian: item = super().add(**kwargs) if not item.email_primary: log.warning( f'Creating parliamentarian {item.title} without' 'email_primary. This will prevent user account' 'creation and may cause permission-related failures.' ) self.update_user(item, item.email_primary) self.session.flush() return item
[docs] def delete(self, item: PASParliamentarian) -> None: self.update_user(item, None) self.session.delete(item) self.session.flush()
[docs] def _is_current_commission_president( self, item: PASParliamentarian ) -> bool: """Check if the parliamentarian is currently a president of any commission.""" today = date.today() return any( membership.role == 'president' and (membership.start is None or membership.start <= today) and (membership.end is None or membership.end >= today) for membership in item.commission_memberships )
[docs] def _representatives_by_email( self, parliamentarians: list[PASParliamentarian], ) -> dict[str, PASParliamentarian]: """Pick one parliamentarian per unique email, prioritizing commission presidents.""" by_email: dict[str, PASParliamentarian] = {} for parl in parliamentarians: if not parl.email_primary: continue key = parl.email_primary.lower() existing = by_email.get(key) if existing is None or ( self._is_current_commission_president(parl) and not self._is_current_commission_president(existing) ): by_email[key] = parl return by_email
[docs] def update_user( self, item: PASParliamentarian, new_email: str | None, users_cache: dict[str, Any] | None = None, ) -> None: """Keep the parliamentarian and its user account in sync. * Creates a new user account if an email address is set (if not already existing). * Disable user accounts if an email has been deleted. * Change usernames if an email has changed. * Make sure used user accounts have the right role. * Make sure used user accounts are activated. * Make sure the password is changed if activated or disabled. Optional users_cache parameter allows to pre-fetch the users to avoid N+1 queries. """ old_email = item.email_primary users = UserCollection(self.session) if users_cache is not None: old_user = ( users_cache.get(old_email.lower()) if old_email else None ) new_user = ( users_cache.get(new_email.lower()) if new_email else None ) else: old_user = users.by_username(old_email) if old_email else None new_user = users.by_username(new_email) if new_email else None create = False enable = None disable = [] if not new_email: # email has been unset: disable obsolete users disable.extend([old_user, new_user]) else: if new_email == old_email: # email has not changed, old_user == new_user if not old_user: create = True else: enable = old_user else: # email has changed: ensure user exist if old_user and new_user: disable.append(old_user) enable = new_user elif not old_user and not new_user: create = True else: enable = old_user if old_user else new_user if create: assert new_email is not None role = ( 'commission_president' if self._is_current_commission_president(item) else 'parliamentarian' ) log.info(f'Creating user {new_email} with role {role}') # NOTE: Explicitly mark them as inactive *first*. Only in the SSO # login via on_ensure_user callback we finally set active to True. new_user_obj = users.add( new_email, random_password(16), role=role, realname=item.title, active=False, ) if users_cache is not None: users_cache[new_email.lower()] = new_user_obj if enable: if enable.role == 'admin': return role = ( 'commission_president' if self._is_current_commission_president(item) else 'parliamentarian' ) saml_sources = {'saml2', 'ldap'} corrections = { 'username': new_email, 'role': role, 'active': True, **( {} if enable.source in saml_sources else {'source': None, 'source_id': None} ), } corrections = { attribute: value for attribute, value in corrections.items() if getattr(enable, attribute) != value } if corrections: log.info( f'Correcting user {enable.username} to {corrections}' ) for attribute, value in corrections.items(): setattr(enable, attribute, value) enable.logout_all_sessions(self.app) for user in disable: if user: log.info(f'Deactivating user {user.username}') user.active = False user.logout_all_sessions(self.app)
[docs] def sync_user_accounts(self) -> dict[str, Any]: """Sync user accounts for all parliamentarians. Groups by email, picks one representative per email to avoid role conflicts. Prioritizes commission presidents. Returns dict with 'synced', 'skipped', and 'created' (list of new user emails). """ parliamentarians = ( self.query() .options(selectinload(self.model_class.commission_memberships)) .all() ) # We use username.lower() to avoid potential # onegov.user.errors.ExistingUserError users_cache = { user.username.lower(): user for user in UserCollection(self.session).query() } representatives = self._representatives_by_email(parliamentarians) synced = 0 skipped = 0 created: list[str] = [] for parl in representatives.values(): email = parl.email_primary assert email is not None try: validate_email(email, check_deliverability=False) except EmailNotValidError as e: log.warning( f'Skipping {parl.title} ' f'with invalid email {email}: {e}' ) skipped += 1 continue is_new = email.lower() not in users_cache self.update_user(parl, email, users_cache) if is_new: created.append(email) synced += 1 self.session.flush() return { 'synced': synced, 'skipped': skipped, 'created': created, }