Source code for org.views.people

from __future__ import annotations

import morepath
from morepath.request import Response
from sqlalchemy import and_, func, type_coerce
from sqlalchemy.orm import undefer
from onegov.core.orm.types import JSON
from onegov.core.security import Public, Private
from onegov.org import _, OrgApp
from onegov.org.elements import Link
from onegov.org.forms import PersonForm
from onegov.org.layout import PersonLayout, PersonCollectionLayout
from onegov.org.models import AtoZ, Topic
from onegov.people import Person, PersonCollection
from onegov.search import SearchIndex
from onegov.search.utils import language_from_locale
from markupsafe import Markup


from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from collections.abc import Iterable
    from onegov.core.types import RenderData
    from onegov.org.request import OrgRequest
    from sqlalchemy.orm import Query
    from webob import Response as BaseResponse


def organisations_as_dict(person: Person) -> dict[str, list[str]]:
    """ Groups a person's flat organisation list into a parent/children map.

    The list stored under ``person.content['organisations_multiple']`` is
    read in order: an entry starting with ``-`` is a sub-organisation of the
    closest preceding entry that does not start with ``-``.

    :param person: Object whose ``content`` mapping holds the list.
    :return: Top-level organisation names mapped to the names of their
        sub-organisations (leading dashes stripped), in order of first
        appearance. Sub-organisations that appear before any top-level
        entry are dropped.

    """
    # The key may be absent *or* explicitly stored as ``None``; both mean
    # "no organisations" and must not break the iteration below.
    entries = person.content.get('organisations_multiple') or []

    current_org = ''
    org_dict: dict[str, list[str]] = {}
    for org in entries:
        if not org.startswith('-'):
            # A new top-level organisation; keep an existing bucket if the
            # same name shows up more than once.
            current_org = org
            org_dict.setdefault(current_org, [])
        elif current_org:
            # ``current_org`` is only truthy after its bucket was created.
            org_dict[current_org].append(org.lstrip('-'))
    return org_dict
def get_top_level_organisations(
    data: list[dict[str, list[str]] | str]
) -> list[str]:
    """ Returns the names of all top-level organisations in ``data``.

    Plain strings are taken as-is, dictionaries contribute their keys.
    Entries of any other type are ignored. The input order is kept.

    """
    names: list[str] = []
    for entry in data:
        if isinstance(entry, str):
            names.append(entry)
        elif isinstance(entry, dict):
            # iterating a dict yields its keys
            names.extend(entry)
    return names
def get_sub_organisations(
    data: list[dict[str, list[str]] | str]
) -> list[str]:
    """ Returns the distinct sub-organisation names found in ``data``.

    Only dictionary entries are considered; every value of such a
    dictionary is expected to be an iterable of names. Duplicates are
    removed through a set, so the order of the result is unspecified.

    """
    unique_names: set[str] = {
        name
        for entry in data
        if isinstance(entry, dict)
        for children in entry.values()
        for name in children
    }
    return list(unique_names)
def people_by_organisation(
    query: Query[Person],
    org: str | None,
    sub_org: str | None,
) -> Query[Person]:
    """ Restricts ``query`` to people of the given (sub-)organisation.

    Each given value adds one ``jsonb_contains`` filter on
    ``Person.content['organisations_multiple']``. Sub-organisations are
    matched with a leading ``-``. Empty or missing values add no filter.

    """
    wanted: list[str] = []
    if org:
        wanted.append(org)
    if sub_org:
        wanted.append(f'-{sub_org}')

    for value in wanted:
        contains_value = func.jsonb_contains(
            Person.content['organisations_multiple'],
            type_coerce([value], JSON)
        )
        query = query.filter(contains_value)

    return query
def people_by_search_term(
    query: Query[Person],
    search_term: str | None,
    language: str | None = None,
) -> Query[Person]:
    """ Restricts ``query`` to people matching a full-text search term.

    Without a search term the query is returned unchanged. Otherwise the
    query is joined with the ``SearchIndex`` rows owned by each person and
    filtered with ``data_vector @@ websearch_to_tsquery(...)``, using the
    language that ``language_from_locale`` returns for ``language``.

    """
    if search_term:
        ts_language = language_from_locale(language)
        owned_by_person = and_(
            SearchIndex.owner_id_uuid == Person.id,
            SearchIndex.owner_type == 'Person'
        )
        ts_query = func.websearch_to_tsquery(ts_language, search_term)
        joined = query.join(SearchIndex, owned_by_person)
        return joined.filter(SearchIndex.data_vector.op('@@')(ts_query))

    return query
@OrgApp.html(model=PersonCollection, template='people.pt', permission=Public)
def view_people(
    self: PersonCollection,
    request: OrgRequest,
    layout: PersonCollectionLayout | None = None
) -> RenderData:
    """ Collects the data for the ``people.pt`` template.

    The request parameters ``organisation``, ``sub_organisation`` and (only
    if ``request.app.fts_search_enabled``) ``search`` narrow down the list
    of people. Parameters that are missing, empty or not strings are
    ignored.

    :param self: The collection the people are queried from.
    :param request: The current request.
    :param layout: Optional layout, defaults to a fresh
        ``PersonCollectionLayout``.
    :return: The template variables.

    """

    def text_param(name: str) -> str | None:
        # only non-empty string values count as a selection
        value = request.params.get(name)
        return value if isinstance(value, str) and value else None

    selected_org = text_param('organisation')
    selected_sub_org = text_param('sub_organisation')
    selected_search = (
        text_param('search') if request.app.fts_search_enabled else None)

    hierarchy = request.app.org.organisation_hierarchy or []
    top_orgs = get_top_level_organisations(hierarchy)
    sub_orgs = get_sub_organisations(hierarchy)

    if selected_org:
        # Look the entry up directly instead of going through an index into
        # the flattened ``top_orgs``: the parameter is user input and may
        # not be part of the hierarchy at all, and the flattened index does
        # not line up with the hierarchy once a dict has several keys.
        # An unknown organisation leaves the sub-organisations unrestricted.
        for entry in hierarchy:
            if isinstance(entry, dict) and selected_org in entry:
                # Copy, so the append below never modifies the hierarchy
                # stored on the organisation.
                sub_orgs = list(entry[selected_org] or [])
                break

    if selected_sub_org and selected_sub_org not in sub_orgs:
        # keep the current selection visible even if it is not configured
        sub_orgs.append(selected_sub_org)

    query = self.query().order_by(Person.last_name, Person.first_name)
    query = people_by_organisation(query, selected_org, selected_sub_org)
    query = people_by_search_term(query, selected_search, request.locale)
    people = query.all()

    class AtoZPeople(AtoZ[Person]):

        def get_title(self, item: Person) -> str:
            return item.title

        def get_items(self) -> list[Person]:
            return people

    return {
        'title': _('People'),
        'count': len(people),
        'people': AtoZPeople(request).get_items_by_letter().items(),
        'layout': layout or PersonCollectionLayout(self, request),
        'organisations_as_dict': organisations_as_dict,
        'organisations': sorted(top_orgs),
        'sub_organisations': sorted(sub_orgs),
        'selected_organisation': selected_org,
        'selected_sub_organisation': selected_sub_org,
        'selected_search': selected_search,
    }
@OrgApp.html(model=Person, template='person.pt', permission=Public)
def view_person(
    self: Person,
    request: OrgRequest,
    layout: PersonLayout | None = None
) -> RenderData:
    """ Collects the data for the ``person.pt`` template.

    All topics are queried with their ``content`` undeferred and handed to
    ``person_functions_by_organization`` to find the functions this person
    has on them.

    """
    topics = request.session.query(Topic).options(undefer(Topic.content))
    functions = person_functions_by_organization(self, topics, request)

    return {
        'title': self.title,
        'person': self,
        'layout': layout or PersonLayout(self, request),
        'organization_to_function': functions,
        'organisations_as_dict': organisations_as_dict,
    }
def person_functions_by_organization(
    subject_person: Person,
    topics: Iterable[Topic],
    request: OrgRequest
) -> Iterable[Markup]:
    """ Collects the context-specific functions of a person per organization.

    Organizations are the topics (pages) on which `subject_person` is
    listed as a person. An entry is only included if the listed person has
    a `context_specific_function` and `display_function_in_person_directory`
    is not `False`.

    Returns an empty tuple if there is nothing to show, otherwise a lazy
    iterable of Markup, ordered by topic title, in the form:

    - Organization 1: Function A
    - Organization 2: Function B
    - ...

    This is not necessarily the same as person.function!

    """
    matches = []
    for topic in topics:
        for listed in (topic.people or []):
            if listed.id != subject_person.id:
                continue

            role = getattr(listed, 'context_specific_function', None)
            if role is None:
                continue

            visible = getattr(
                listed, 'display_function_in_person_directory', False)
            if visible is False:
                continue

            matches.append((role, topic))

    if not matches:
        # a generator is always truthy, so signal "nothing" explicitly
        return ()

    matches.sort(key=lambda match: match[1].title)

    return (
        Markup('<span><a href="{url}">{title}</a>: {function}</span>').format(
            url=request.link(topic),
            title=topic.title,
            function=role
        )
        for role, topic in matches
    )
@OrgApp.form(
    model=PersonCollection,
    name='new',
    template='form.pt',
    permission=Private,
    form=PersonForm
)
def handle_new_person(
    self: PersonCollection,
    request: OrgRequest,
    form: PersonForm,
    layout: PersonCollectionLayout | None = None
) -> RenderData | BaseResponse:
    """ Shows the form for a new person and adds the person on submission.

    A submitted form adds a person to the collection from the form's
    useful data and redirects to that person. Otherwise the template
    variables for the form are returned.

    """
    if not form.submitted(request):
        layout = layout or PersonCollectionLayout(self, request)
        layout.breadcrumbs.append(Link(_('New'), '#'))
        layout.include_editor()
        layout.edit_mode = True

        return {
            'layout': layout,
            'title': _('New person'),
            'form': form
        }

    new_person = self.add(**form.get_useful_data())
    request.success(_('Added a new person'))

    return morepath.redirect(request.link(new_person))
@OrgApp.form(
    model=Person,
    name='edit',
    template='form.pt',
    permission=Private,
    form=PersonForm
)
def handle_edit_person(
    self: Person,
    request: OrgRequest,
    form: PersonForm,
    layout: PersonLayout | None = None
) -> RenderData | BaseResponse:
    """ Shows the edit form of a person and stores the submitted changes.

    A submitted form is written to the person, followed by a redirect to
    the person. Otherwise the form is processed with the person as its
    object and the template variables for the form are returned.

    """
    if form.submitted(request):
        form.populate_obj(self)
        request.success(_('Your changes were saved'))

        return morepath.redirect(request.link(self))

    # not submitted: process the form with the person as its object
    form.process(obj=self)

    page_layout = layout or PersonLayout(self, request)
    page_layout.breadcrumbs.append(Link(_('Edit'), '#'))
    page_layout.include_editor()
    page_layout.edit_mode = True

    return {
        'layout': page_layout,
        'title': self.title,
        'form': form
    }
@OrgApp.view(model=Person, request_method='DELETE', permission=Private)
def handle_delete_person(self: Person, request: OrgRequest) -> None:
    """ Deletes the person after the CSRF token has been checked. """

    request.assert_valid_csrf_token()

    people = PersonCollection(request.session)
    people.delete(self)
@OrgApp.view(model=Person, name='vcard', permission=Public)
def vcard_export_person(self: Person, request: OrgRequest) -> Response:
    """ Returns the person's vCard.

    The fields excluded by the organisation for this request are left out,
    and so are the notes.

    """
    hidden_fields = list(request.app.org.excluded_person_fields(request))
    hidden_fields.append('notes')

    return Response(
        self.vcard(hidden_fields),
        content_type='text/vcard',
        content_disposition='inline; filename=card.vcf'
    )