from __future__ import annotations
import morepath
from morepath.request import Response
from sqlalchemy import and_, func, type_coerce
from sqlalchemy.orm import undefer
from onegov.core.orm.types import JSON
from onegov.core.security import Public, Private
from onegov.org import _, OrgApp
from onegov.org.elements import Link
from onegov.org.forms import PersonForm
from onegov.org.layout import PersonLayout, PersonCollectionLayout
from onegov.org.models import AtoZ, Topic
from onegov.people import Person, PersonCollection
from onegov.search import SearchIndex
from onegov.search.utils import language_from_locale
from markupsafe import Markup
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Iterable
from onegov.core.types import RenderData
from onegov.org.request import OrgRequest
from sqlalchemy.orm import Query
from webob import Response as BaseResponse
[docs]
def organisations_as_dict(person: Person) -> dict[str, list[str]]:
    """ Groups a person's flat organisation list by top-level organisation.

    The list stored under ``person.content['organisations_multiple']`` is
    read in order. An entry starting with ``-`` is a sub-organisation of the
    most recent entry that does not start with ``-``.

    :param person: Object whose ``content`` mapping holds the flat list.
    :return: Mapping of top-level organisation to its sub-organisations
        (leading dashes stripped), in order of first appearance. A
        top-level organisation without sub-organisations maps to an empty
        list. Sub-organisations that appear before any top-level
        organisation are dropped.

    """
    org_dict: dict[str, list[str]] = {}
    current_org: str = ''

    # ``or ()`` also covers the key being present with an explicit ``None``
    # value, which ``.get(key, [])`` would hand back unchanged.
    for org in person.content.get('organisations_multiple') or ():
        if not org.startswith('-'):
            current_org = org
            org_dict.setdefault(current_org, [])
        elif current_org:
            # a truthy current_org has always been registered above
            org_dict[current_org].append(org.lstrip('-'))

    return org_dict
[docs]
def get_top_level_organisations(
    data: list[dict[str, list[str]] | str]
) -> list[str]:
    """ Lists the top-level organisation names of a hierarchy.

    A plain string entry contributes itself, a dict entry contributes all
    of its keys. Entries of any other type are ignored. The order of
    ``data`` is kept.

    """
    names: list[str] = []

    for entry in data:
        if isinstance(entry, str):
            names.append(entry)
        elif isinstance(entry, dict):
            # iterating a dict yields its keys
            names.extend(entry)

    return names
[docs]
def get_sub_organisations(
    data: list[dict[str, list[str]] | str]
) -> list[str]:
    """ Collects the distinct sub-organisation names of a hierarchy.

    Only dict entries carry sub-organisations (as their values); all other
    entries are skipped. Duplicates are removed and the order of the
    returned list is unspecified.

    """
    unique_names = {
        name
        for entry in data
        if isinstance(entry, dict)
        for names in entry.values()
        for name in names
    }
    return list(unique_names)
[docs]
def people_by_organisation(
    query: Query[Person],
    org: str | None,
    sub_org: str | None,
) -> Query[Person]:
    """ Narrows a person query by organisation and/or sub-organisation.

    Each given value adds one ``jsonb_contains`` filter on the person's
    ``organisations_multiple`` content entry. The organisation is matched
    as-is, the sub-organisation is matched as the entry ``-<sub_org>``.
    Falsy values add no filter.

    """
    wanted_entries: list[str] = []
    if org:
        wanted_entries.append(org)
    if sub_org:
        wanted_entries.append(f'-{sub_org}')

    for entry in wanted_entries:
        contains_entry = func.jsonb_contains(
            Person.content['organisations_multiple'],
            type_coerce([entry], JSON)
        )
        query = query.filter(contains_entry)

    return query
[docs]
def people_by_search_term(
    query: Query[Person],
    search_term: str | None,
    language: str | None = None,
) -> Query[Person]:
    """ Narrows a person query to people matching a full-text search term.

    Without a search term the query is returned untouched. Otherwise the
    query is joined with the search index rows owned by people and
    filtered with ``data_vector @@ websearch_to_tsquery(language, term)``,
    where the language is derived from ``language`` through
    ``language_from_locale``.

    """
    if search_term:
        language = language_from_locale(language)
        ts_query = func.websearch_to_tsquery(language, search_term)
        owned_by_person = and_(
            SearchIndex.owner_id_uuid == Person.id,
            SearchIndex.owner_type == 'Person'
        )
        matches_term = SearchIndex.data_vector.op('@@')(ts_query)
        return query.join(SearchIndex, owned_by_person).filter(matches_term)

    return query
@OrgApp.html(model=PersonCollection, template='people.pt', permission=Public)
def view_people(
    self: PersonCollection,
    request: OrgRequest,
    layout: PersonCollectionLayout | None = None
) -> RenderData:
    """ Renders the list of people, optionally filtered.

    Recognised query parameters are ``organisation``, ``sub_organisation``
    and, only if ``request.app.fts_search_enabled`` is set, ``search``.
    Empty or non-string parameter values are treated as absent.

    The offered sub-organisations are those of the selected organisation
    if the hierarchy lists it with sub-organisations, otherwise all
    sub-organisations of the hierarchy. An ``organisation`` value that is
    not part of the hierarchy is tolerated: it is still used as a filter
    but does not narrow the offered sub-organisations.

    """

    def text_param(name: str) -> str | None:
        value = request.params.get(name)
        return value if isinstance(value, str) and value else None

    selected_org = text_param('organisation')
    selected_sub_org = text_param('sub_organisation')
    selected_search = (
        text_param('search') if request.app.fts_search_enabled else None)

    hierarchy = request.app.org.organisation_hierarchy or []
    top_orgs = get_top_level_organisations(hierarchy)
    sub_orgs = get_sub_organisations(hierarchy)

    if selected_org:
        # Search the hierarchy itself rather than indexing it with a
        # position taken from the flattened ``top_orgs`` list: the two only
        # line up if every entry contributes exactly one name, and
        # ``list.index`` raises ValueError for names that are not present
        # (the value comes straight from the query string).
        for entry in hierarchy:
            if isinstance(entry, dict):
                if selected_org in entry:
                    # copy, the list below may be appended to and must not
                    # alter the hierarchy stored on the organisation
                    sub_orgs = list(entry[selected_org] or [])
                    break
            elif entry == selected_org:
                # listed without sub-organisations, keep offering all
                break

    # keep a selected but unlisted sub-organisation among the offered ones
    if selected_sub_org and selected_sub_org not in sub_orgs:
        sub_orgs.append(selected_sub_org)

    query = self.query().order_by(Person.last_name, Person.first_name)
    query = people_by_organisation(query, selected_org, selected_sub_org)
    query = people_by_search_term(query, selected_search, request.locale)
    people = query.all()

    class AtoZPeople(AtoZ[Person]):

        def get_title(self, item: Person) -> str:
            return item.title

        def get_items(self) -> list[Person]:
            return people

    return {
        'title': _('People'),
        'count': len(people),
        'people': AtoZPeople(request).get_items_by_letter().items(),
        'layout': layout or PersonCollectionLayout(self, request),
        'organisations_as_dict': organisations_as_dict,
        'organisations': sorted(top_orgs),
        'sub_organisations': sorted(sub_orgs),
        'selected_organisation': selected_org,
        'selected_sub_organisation': selected_sub_org,
        'selected_search': selected_search,
    }
@OrgApp.html(model=Person, template='person.pt', permission=Public)
def view_person(
    self: Person,
    request: OrgRequest,
    layout: PersonLayout | None = None
) -> RenderData:
    """ Renders the detail page of a single person.

    All topics are queried (with their ``content`` undeferred) and handed
    to ``person_functions_by_organization`` to find the person's
    context-specific functions.

    """
    topics = request.session.query(Topic).options(undefer(Topic.content))
    functions = person_functions_by_organization(self, topics, request)

    return {
        'title': self.title,
        'person': self,
        'layout': layout or PersonLayout(self, request),
        'organization_to_function': functions,
        'organisations_as_dict': organisations_as_dict,
    }
[docs]
def person_functions_by_organization(
    subject_person: Person,
    topics: Iterable[Topic],
    request: OrgRequest
) -> Iterable[Markup]:
    """ Pairs each organization with the function `subject_person` holds
    there. Organizations are the topics (pages) that list the person.

    A listing is used only if it has the id of `subject_person`, carries a
    `context_specific_function` other than None and does not have
    `display_function_in_person_directory` set to False (or missing).

    The result is ordered by topic title and yields Markup of the form:

    - Organization 1: Function A
    - Organization 2: Function B
    - ...

    where the organization links to the topic. Note that the function
    shown may differ from person.function! Without any match an empty
    tuple is returned.

    """
    matches: list[tuple[str, Topic]] = []

    for topic in topics:
        for listed in (topic.people or []):
            if listed.id != subject_person.id:
                continue

            role = getattr(listed, 'context_specific_function', None)
            if role is None:
                continue

            shown = getattr(
                listed, 'display_function_in_person_directory', False)
            if shown is False:
                continue

            matches.append((role, topic))

    if not matches:
        return ()

    # list.sort is stable, topics with equal titles keep their input order
    matches.sort(key=lambda pair: pair[1].title)

    template = Markup('<span><a href="{url}">{title}</a>: {function}</span>')
    return (
        template.format(
            url=request.link(topic),
            title=topic.title,
            function=role
        )
        for role, topic in matches
    )
@OrgApp.form(
    model=PersonCollection,
    name='new',
    template='form.pt',
    permission=Private,
    form=PersonForm
)
def handle_new_person(
    self: PersonCollection,
    request: OrgRequest,
    form: PersonForm,
    layout: PersonCollectionLayout | None = None
) -> RenderData | BaseResponse:
    """ Shows the form for a new person and adds the person on submission.

    After a submitted form the person is added to the collection from the
    form's useful data, a success message is issued and the response
    redirects to the new person. Otherwise the form is rendered.

    """
    if not form.submitted(request):
        layout = layout or PersonCollectionLayout(self, request)
        layout.breadcrumbs.append(Link(_('New'), '#'))
        layout.include_editor()
        layout.edit_mode = True

        return {
            'layout': layout,
            'title': _('New person'),
            'form': form
        }

    person = self.add(**form.get_useful_data())
    request.success(_('Added a new person'))
    return morepath.redirect(request.link(person))
@OrgApp.form(
    model=Person,
    name='edit',
    template='form.pt',
    permission=Private,
    form=PersonForm
)
def handle_edit_person(
    self: Person,
    request: OrgRequest,
    form: PersonForm,
    layout: PersonLayout | None = None
) -> RenderData | BaseResponse:
    """ Shows the edit form of a person and stores the changes on submission.

    After a submitted form the person is populated from the form, a success
    message is issued and the response redirects to the person. In every
    other case the form is (re-)processed from the person before it is
    rendered.

    """
    if form.submitted(request):
        form.populate_obj(self)
        request.success(_('Your changes were saved'))
        return morepath.redirect(request.link(self))

    # not submitted: load the form from the stored person
    form.process(obj=self)

    page_layout = layout or PersonLayout(self, request)
    page_layout.breadcrumbs.append(Link(_('Edit'), '#'))
    page_layout.include_editor()
    page_layout.edit_mode = True

    return {
        'layout': page_layout,
        'title': self.title,
        'form': form
    }
@OrgApp.view(model=Person, request_method='DELETE', permission=Private)
def handle_delete_person(self: Person, request: OrgRequest) -> None:
    """ Deletes the person through a ``PersonCollection`` bound to the
    request's session, after ``request.assert_valid_csrf_token()`` passed.

    """
    request.assert_valid_csrf_token()

    people = PersonCollection(request.session)
    people.delete(self)
@OrgApp.view(model=Person, name='vcard', permission=Public)
def vcard_export_person(self: Person, request: OrgRequest) -> Response:
    """ Serves the person's vCard as an inline ``text/vcard`` response.

    The fields the organisation excludes for this request are left out of
    the card, and so is ``notes``, which is always excluded.

    """
    hidden_fields = list(request.app.org.excluded_person_fields(request))
    hidden_fields.append('notes')

    card = self.vcard(hidden_fields)
    return Response(
        card,
        content_type='text/vcard',
        content_disposition='inline; filename=card.vcf'
    )