from __future__ import annotations
from markupsafe import Markup
from onegov.core.crypto import random_token
from onegov.core.orm.abstract import AdjacencyList
from onegov.core.orm.abstract import associated
from onegov.core.orm.mixins import ContentMixin
from onegov.core.orm.mixins import TimestampMixin
from onegov.core.orm.mixins import UTCPublicationMixin
from onegov.core.utils import normalize_for_url
from onegov.file import File
from onegov.file.utils import as_fileintent
from onegov.file.utils import content_type_from_fileobj
from onegov.file.utils import extension_for_content_type
from onegov.gis import CoordinatesMixin
from onegov.people.models.membership import AgencyMembership
from onegov.search import ORMSearchable
from decimal import Decimal
from sqlalchemy import func
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import object_session
from sqlalchemy.orm import relationship
from sqlalchemy.orm import DynamicMapped
from sqlalchemy.orm import Mapped
from translationstring import TranslationString
from typing import Any
from typing import IO
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from _typeshed import SupportsRichComparison
from collections.abc import Callable
from collections.abc import Iterator
from depot.io.interfaces import StoredFile
from typing import TypeAlias
from uuid import UUID
[docs]
# Key function accepted by ``Agency.sort_children``: it receives an agency
# and returns a value that supports rich comparison (i.e. can be sorted).
AgencySortKey: TypeAlias = Callable[['Agency'], SupportsRichComparison]
# Key function accepted by ``Agency.sort_relationships``: it receives an
# agency membership and returns a value that supports rich comparison.
AgencyMembershipSortKey: TypeAlias = Callable[
[AgencyMembership],
SupportsRichComparison
]
[docs]
class AgencyOrganigram(File):
""" The organigram (organization chart) file attached to an agency.

A ``File`` subclass which is stored with the polymorphic identity
``agency_organigram``.

"""
[docs]
__mapper_args__ = {'polymorphic_identity': 'agency_organigram'}
[docs]
class Agency(AdjacencyList, ContentMixin, TimestampMixin, ORMSearchable,
UTCPublicationMixin, CoordinatesMixin):
""" An agency (organization) containing people through memberships.

Agencies are nested through the ``parent``/``children`` attributes
inherited from ``AdjacencyList`` and are polymorphic on the ``type``
column.

"""
#: the name of the database table backing this model
[docs]
__tablename__ = 'agencies'
#: the type of the item, this can be used to create custom polymorphic
#: subclasses of this class. See
#: `<https://docs.sqlalchemy.org/en/improve_toc/\
#: orm/extensions/declarative/inheritance.html>`_.
[docs]
type: Mapped[str] = mapped_column(default=lambda: 'generic')
# The ``type`` column selects the mapped class; agencies created through
# this class directly are stored with the 'generic' identity.
[docs]
__mapper_args__ = {
'polymorphic_on': type,
'polymorphic_identity': 'generic',
}
# HACK: We don't want to set up translations in this module for this single
# string, we know we already have a translation in a different domain
# so we just manually specify it for now.
[docs]
fts_type_title = TranslationString('Agencies', domain='onegov.agency')
# The attribute holding the title for the full-text search
# (NOTE(review): semantics are defined by ``ORMSearchable`` - confirm there).
[docs]
fts_title_property = 'title'
# The attributes made available to the full-text search, each with the
# type and weight it is declared with.
[docs]
fts_properties = {
'title': {'type': 'text', 'weight': 'A'},
'description': {'type': 'localized', 'weight': 'B'},
'portrait': {'type': 'localized', 'weight': 'B'},
}
# NOTE: When an agency was last changed should not influence how
# relevant it is in the search results
@property
[docs]
def fts_last_change(self) -> None:
    """ Always ``None``.

    The time an agency was last changed is deliberately not reported, so
    that it cannot influence the relevance of the agency in the search
    results.

    """
    return None
#: a short description of the agency
[docs]
description: Mapped[str | None]
#: the portrait of the agency, a longer text stored as markup
[docs]
portrait: Mapped[Markup | None]
#: location address (street name and number) of agency
[docs]
location_address: Mapped[str | None]
#: location code and city of agency
[docs]
location_code_city: Mapped[str | None]
#: postal address (street name and number) of agency
[docs]
postal_address: Mapped[str | None]
#: postal code and city of agency
[docs]
postal_code_city: Mapped[str | None]
#: the phone number of agency
[docs]
phone: Mapped[str | None]
#: the direct phone number of agency
[docs]
phone_direct: Mapped[str | None]
#: the email of agency
[docs]
email: Mapped[str | None]
#: the website related to agency
[docs]
website: Mapped[str | None]
#: opening hours of agency
[docs]
opening_hours: Mapped[str | None]
#: the organization chart, a one-to-one association with an
#: :class:`AgencyOrganigram` file
[docs]
organigram = associated(AgencyOrganigram, 'organigram', 'one-to-one')
#: the memberships of this agency as a dynamic relationship, ordered by
#: ``AgencyMembership.order_within_agency``; the ``all, delete-orphan``
#: cascade removes memberships together with the agency and when they are
#: detached from it
[docs]
memberships: DynamicMapped[AgencyMembership] = relationship(
back_populates='agency',
cascade='all, delete-orphan',
order_by='AgencyMembership.order_within_agency'
)
if TYPE_CHECKING:
# override the attributes from AdjacencyList
#
# These declarations are only evaluated by static type checkers. They
# re-declare the attributes inherited from AdjacencyList so that they are
# typed as ``Agency`` instead of the generic base class.
[docs]
parent: Mapped[Agency | None]
children: Mapped[list[Agency]]
@property
def root(self) -> Agency: ...
@property
def ancestors(self) -> Iterator[Agency]: ...
@property
[docs]
def organigram_file(self) -> StoredFile | None:
    """ Returns the file-like content of the organigram.

    Evaluates to ``None`` if the agency has no organigram, or if reading
    the stored file fails for any reason (best effort, errors are not
    propagated).

    """
    try:
        attachment = self.organigram
        if not attachment:
            return None
        return attachment.reference.file
    except Exception:
        # OSError is a subclass of Exception, so this single clause covers
        # both storage errors and everything else
        return None
@organigram_file.setter
def organigram_file(self, value: IO[bytes]) -> None:
    """ Sets the organigram, expects a file-like value.

    The file is stored as ``organigram-<normalized title>.<extension>``,
    the extension being derived from the detected content type. An
    existing organigram is updated in place, otherwise a new
    :class:`AgencyOrganigram` is created and attached.

    """
    assert value is not None

    slug = normalize_for_url(self.title)
    extension = extension_for_content_type(content_type_from_fileobj(value))
    filename = f'organigram-{slug}.{extension}'

    existing = self.organigram
    if existing:
        target = existing
    else:
        target = AgencyOrganigram(id=random_token())

    target.reference = as_fileintent(value, filename)
    target.name = filename

    if not existing:
        # only a freshly created organigram needs to be attached
        self.organigram = target
[docs]
def add_person(
    self,
    person_id: UUID,
    title: str,
    *,
    order_within_agency: int = 2 ** 16,
    # FIXME: Specify the arguments supported by AgencyMembership
    **kwargs: Any
) -> AgencyMembership:
    """ Appends a person to the agency with the given title.

    :param person_id: the id of the person to add.
    :param title: the title of the new membership.
    :param order_within_agency: the desired position within the agency.
        If it lies beyond the last membership, the next free position is
        used. Otherwise the memberships at or after that position are
        moved up by one to make room.
    :param kwargs: passed on to :class:`AgencyMembership`.
    :return: the new membership, after the session has been flushed.

    """
    session = object_session(self)
    assert session is not None

    # the new membership comes last in the person's list of memberships
    highest_within_person = func.coalesce(
        func.max(AgencyMembership.order_within_person), -1
    )
    order_within_person = session.query(
        highest_within_person
    ).filter_by(person_id=person_id).scalar() + 1

    # the first position not yet taken within this agency
    highest_within_agency = func.coalesce(
        func.max(AgencyMembership.order_within_agency), -1
    )
    first_free_position = session.query(
        highest_within_agency
    ).filter_by(agency_id=self.id).scalar() + 1

    if order_within_agency <= first_free_position:
        # move everyone after the desired position up by one
        for existing in self.memberships:
            if existing.order_within_agency >= order_within_agency:
                existing.order_within_agency += 1
    else:
        # just use the next available number
        order_within_agency = first_free_position

    membership = AgencyMembership(
        person_id=person_id,
        title=title,
        order_within_agency=order_within_agency,
        order_within_person=order_within_person,
        **kwargs
    )
    self.memberships.append(membership)
    session.flush()
    return membership
[docs]
def sort_children(
    self,
    sortkey: AgencySortKey | None = None
) -> None:
    """ Sorts the suborganizations.

    The children are ordered by the given key function and their ``order``
    is renumbered from zero upwards. Without a key function, the children
    are sorted by their normalized title.

    """
    def by_title(agency: Agency) -> str:
        return normalize_for_url(agency.title)

    ranked = list(self.children)
    ranked.sort(key=by_title if sortkey is None else sortkey)

    for position, child in enumerate(ranked):
        child.order = Decimal(position)
[docs]
def sort_relationships(
    self,
    sortkey: AgencyMembershipSortKey | None = None
) -> None:
    """ Sorts the relationships.

    The memberships are ordered by the given key function and their
    ``order_within_agency`` is renumbered from zero upwards. Without a key
    function, the memberships are sorted by the normalized title of the
    person (presumably last name, first name).

    """
    def by_person_title(membership: AgencyMembership) -> str:
        return normalize_for_url(membership.person.title)

    ranked = list(self.memberships)
    ranked.sort(key=by_person_title if sortkey is None else sortkey)

    for position, membership in enumerate(ranked):
        membership.order_within_agency = position