# Source code for agency.collections.agencies

from onegov.agency.models import ExtendedAgency
from onegov.agency.utils import filter_modified_or_created
from onegov.core.collection import GenericCollection, Pagination
from onegov.core.orm.abstract import AdjacencyListCollection
from sqlalchemy import or_, func
from sqlalchemy.orm import joinedload
from sqlalchemy.orm import undefer


from typing import Literal, Self
from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from collections.abc import Collection
    from sqlalchemy.orm import Query
    from sqlalchemy.orm import Session
    from typing import TypedDict
    from typing_extensions import Unpack

    # Only defined for the type checker: ``TypedDict`` and ``Unpack`` are
    # imported solely inside this guard, so the class must stay in here and
    # may only be referenced from string annotations at runtime.
    class FilterParams(TypedDict, total=False):
        """Optional keyword arguments accepted by
        ``PaginatedAgencyCollection.for_filter``.

        ``total=False`` makes every key optional; a key that is left out
        keeps the value of the collection the filter is derived from.
        """

        # whitespace separated search terms matched against the title
        title: str | None
        # bounds passed to ``filter_modified_or_created`` together with
        # the operators '>', '>=', '==', '<=' and '<' respectively
        updated_gt: str | None
        updated_ge: str | None
        updated_eq: str | None
        updated_le: str | None
        updated_lt: str | None
class ExtendedAgencyCollection(AdjacencyListCollection[ExtendedAgency]):
    """Adjacency list collection whose items are ``ExtendedAgency`` rows.

    Besides the session, the collection carries two optional string
    values (``root_pdf_modified`` and ``browse``) that are only stored on
    the instance here; they are not used for querying in this class.
    """

    __listclass__ = ExtendedAgency

    def __init__(
        self,
        session: 'Session',
        # FIXME: These really should be float/int, we just need to add
        #        converters to the path configuration...
        root_pdf_modified: str | None = None,
        browse: str | None = None
    ) -> None:
        """Create the collection.

        :param session: The database session handed to the base class.
        :param root_pdf_modified: Used to create the link for the root pdf
            based on a timestamp.
        :param browse: Stored unchanged on the instance.
            NOTE(review): its meaning is not visible in this module --
            confirm against the path configuration.
        """
        super().__init__(session)

        # Used to create link for root pdf based on timestamp
        self.root_pdf_modified = root_pdf_modified
        self.browse = browse
class PaginatedAgencyCollection(
    GenericCollection[ExtendedAgency],
    Pagination[ExtendedAgency]
):
    """Paginated, filterable collection of ``ExtendedAgency`` rows.

    The filter state (parent, title, ``updated_*`` bounds, visibility and
    loader options) is kept on the instance and applied in :meth:`query`.
    Derived collections created through :meth:`page_by_index` and
    :meth:`for_filter` carry the complete state over.
    """

    def __init__(
        self,
        session: 'Session',
        page: int = 0,
        parent: str | Literal[False] | None = None,
        exclude_hidden: bool = True,
        joinedload: 'Collection[str] | None' = None,
        title: str | None = None,
        updated_gt: str | None = None,
        updated_ge: str | None = None,
        updated_eq: str | None = None,
        updated_le: str | None = None,
        updated_lt: str | None = None,
        undefer: 'Collection[str] | None' = None
    ) -> None:
        """Create the collection.

        :param session: The database session handed to the base class.
        :param page: Index of the page this collection represents.
        :param parent: ``False`` restricts the query to agencies without a
            parent, a truthy value to agencies whose ``parent_id`` equals
            it, ``None`` applies no parent filter.
        :param exclude_hidden: If true, only published agencies whose
            ``meta['access']`` is ``'public'`` or unset are returned.
        :param joinedload: Names of ``ExtendedAgency`` attributes to load
            with ``joinedload``.
        :param title: Whitespace separated search terms; an agency matches
            if its title contains any one of them.
        :param updated_gt: Value for the ``>`` modified/created filter.
        :param updated_ge: Value for the ``>=`` modified/created filter.
        :param updated_eq: Value for the ``==`` modified/created filter.
        :param updated_le: Value for the ``<=`` modified/created filter.
        :param updated_lt: Value for the ``<`` modified/created filter.
        :param undefer: Names of ``ExtendedAgency`` attributes to load
            with ``undefer``.
        """
        super().__init__(session)
        self.page = page

        # filter keywords
        self.parent = parent
        self.title = title
        self.updated_gt = updated_gt
        self.updated_ge = updated_ge
        self.updated_eq = updated_eq
        self.updated_le = updated_le
        self.updated_lt = updated_lt
        # end filter keywords

        self.exclude_hidden = exclude_hidden
        self.joinedload = joinedload or []
        self.undefer = undefer or []

    @property
    def model_class(self) -> type[ExtendedAgency]:
        """The model class queried by this collection."""
        return ExtendedAgency

    def __eq__(self, other: object) -> bool:
        """Two collections are equal if they are of the same class and
        agree on ``page`` and ``parent``; the other filters are ignored.
        """
        return (
            isinstance(other, self.__class__)
            and other.page == self.page
            and other.parent == self.parent
        )

    def subset(self) -> 'Query[ExtendedAgency]':
        """Return the filtered query that is being paginated."""
        return self.query()

    @property
    def page_index(self) -> int:
        """The index of the current page."""
        return self.page

    def page_by_index(self, index: int) -> Self:
        """Return the same collection positioned at page ``index``.

        All filters and loader options are carried over, so the result
        differs from ``self`` in nothing but the page.
        """
        return self.__class__(
            self.session,
            page=index,
            parent=self.parent,
            exclude_hidden=self.exclude_hidden,
            joinedload=self.joinedload,
            title=self.title,
            updated_gt=self.updated_gt,
            updated_ge=self.updated_ge,
            updated_eq=self.updated_eq,
            updated_le=self.updated_le,
            updated_lt=self.updated_lt,
            undefer=self.undefer,
        )

    def for_filter(self, **kwargs: 'Unpack[FilterParams]') -> Self:
        """Return a copy of the collection with some filters replaced.

        Filters that are not passed keep their current value; parent,
        visibility and loader options are carried over unchanged. The
        page is not passed on, so the copy starts at the default page.
        """
        return self.__class__(
            session=self.session,
            parent=self.parent,
            exclude_hidden=self.exclude_hidden,
            joinedload=self.joinedload,
            title=kwargs.get('title', self.title),
            updated_gt=kwargs.get('updated_gt', self.updated_gt),
            updated_ge=kwargs.get('updated_ge', self.updated_ge),
            updated_eq=kwargs.get('updated_eq', self.updated_eq),
            updated_le=kwargs.get('updated_le', self.updated_le),
            updated_lt=kwargs.get('updated_lt', self.updated_lt),
            undefer=self.undefer,
        )

    def query(self) -> 'Query[ExtendedAgency]':
        """Build the query with all configured options and filters."""
        query = super().query()

        for attribute in self.undefer:
            query = query.options(
                undefer(getattr(ExtendedAgency, attribute))
            )

        for attribute in self.joinedload:
            query = query.options(
                joinedload(getattr(ExtendedAgency, attribute))
            )

        if self.exclude_hidden:
            query = query.filter(
                or_(
                    ExtendedAgency.meta['access'] == 'public',
                    # ``== None`` is intentional, it builds the SQL
                    # expression instead of comparing in Python
                    ExtendedAgency.meta['access'] == None,  # noqa: E711
                ),
                ExtendedAgency.published.is_(True)
            )

        if self.parent is False:
            query = query.filter(
                ExtendedAgency.parent_id == None  # noqa: E711
            )
        elif self.parent:
            query = query.filter(ExtendedAgency.parent_id == self.parent)

        if self.title:
            # if multiple words in search filter for title we 'or' link
            # them using ilike
            # NOTE(review): the terms go into the LIKE pattern unescaped,
            # so '%' and '_' in the search text act as wildcards.
            terms = self.title.split()
            # a whitespace-only title yields no terms; skip the filter
            # rather than calling ``or_()`` without arguments
            if terms:
                query = query.filter(or_(*(
                    func.lower(
                        func.unaccent(ExtendedAgency.title)
                    ).ilike(f'%{element}%')
                    for element in terms
                )))

        # applied in a fixed order; unset (falsy) bounds are skipped
        for operator, value in (
            ('>', self.updated_gt),
            ('>=', self.updated_ge),
            ('==', self.updated_eq),
            ('<=', self.updated_le),
            ('<', self.updated_lt),
        ):
            if value:
                query = filter_modified_or_created(
                    query, operator, value, ExtendedAgency
                )

        return query