Source code for winterthur.directory_search_widgets

from __future__ import annotations

from functools import cached_property
from itertools import islice
from onegov.core.templates import render_macro
from onegov.directory import DirectoryEntry
from onegov.form import as_internal_id
from onegov.org.models.directory import ExtendedDirectoryEntryCollection
from onegov.org.models.search import Search
from onegov.search.search_index import SearchIndex
from onegov.winterthur.app import WinterthurApp
from sqlalchemy import func
from sqlalchemy import exc
from sqlalchemy.dialects.postgresql import array


from typing import ClassVar, Literal, TYPE_CHECKING
if TYPE_CHECKING:
    from collections.abc import Iterator
    from markupsafe import Markup
    from onegov.org.layout import DefaultLayout
    from onegov.org.models import ExtendedDirectory
    from onegov.winterthur.request import WinterthurRequest
    from sqlalchemy.orm import Query
    from uuid import UUID


[docs] def lines(value: str | tuple[str, ...] | list[str]) -> Iterator[str]: if isinstance(value, (tuple, list)): yield from value yield from str(value).split('\n')
@WinterthurApp.directory_search_widget('inline')
[docs] class InlineDirectorySearch:
[docs] name: ClassVar[Literal['inline']]
def __init__( self, request: WinterthurRequest, directory: ExtendedDirectory, search_query: dict[str, str] | None ) -> None:
[docs] self.app = request.app
[docs] self.request = request
[docs] self.directory = directory
[docs] self.search_query = search_query
@cached_property
[docs] def term(self) -> str | None: return (self.search_query or {}).get('term', None)
@cached_property
[docs] def searchable(self) -> tuple[str, ...]: return tuple(self.directory.configuration.searchable or ())
@cached_property
[docs] def entry_ids(self) -> tuple[UUID, ...]: if not self.term: return () search = Search( self.request, query=self.term, page=0 ) query = search.generic_search().join( DirectoryEntry, (SearchIndex.owner_id_uuid == DirectoryEntry.id) & (DirectoryEntry.directory_id == self.directory.id) ).limit(100) # TODO: We may be able to get rid of this limit try: return tuple( entry_id for entry_id, in query.with_entities( SearchIndex.owner_id_uuid ) ) except exc.InternalError: self.request.session.rollback() return ()
[docs] def html(self, layout: DefaultLayout) -> Markup: return render_macro(layout.macros['inline_search'], self.request, { 'term': self.term, 'directory': self.directory, 'title': self.directory.title, 'action': self.request.class_link( ExtendedDirectoryEntryCollection, variables={ 'directory_name': self.directory.name, 'search': self.name } ) })
[docs] def fragments( self, entry: DirectoryEntry ) -> Iterator[tuple[str, tuple[str, ...]]]: assert self.term is not None for name in self.searchable: key = as_internal_id(name) fragment_iter = ( f'{name}: {line.lstrip(" -")}' for line in lines(entry.values[key]) if self.term in line ) fragments = tuple(islice(fragment_iter, 3)) if fragments: yield name, fragments
# FIXME: I think these fragments can contain Markup, so for now # we are being potentially unsafe here. The documentation # is unclear about what we get back here, but we used to # render this with the structure keyword
[docs] def lead( self, layout: DefaultLayout, entry: DirectoryEntry ) -> str | None: if not self.term: return None # FIXME: Implement result highlighting using Postgres return None
[docs] def adapt[T](self, query: Query[T]) -> Query[T]: if not self.term: return query ids = self.entry_ids query = query.filter(DirectoryEntry.id.in_(ids)) if ids: query = query.order_by(False) query = query.order_by( func.array_position( array(ids), DirectoryEntry.id ) ) return query