Source code for winterthur.directory_search_widgets

from __future__ import annotations

from functools import cached_property
from itertools import islice
from onegov.core.templates import render_macro
from onegov.directory import DirectoryEntry
from onegov.form import as_internal_id
from onegov.org.models.directory import ExtendedDirectoryEntryCollection
from onegov.org.models.search import Search
from onegov.search.search_index import SearchIndex
from onegov.winterthur.app import WinterthurApp
from sqlalchemy import func
from sqlalchemy.dialects.postgresql import array


from typing import ClassVar, Literal, TYPE_CHECKING
if TYPE_CHECKING:
    from collections.abc import Iterator
    from markupsafe import Markup
    from onegov.org.layout import DefaultLayout
    from onegov.org.models import ExtendedDirectory
    from onegov.winterthur.request import WinterthurRequest
    from sqlalchemy.orm import Query
    from typing import TypeVar
    from uuid import UUID

if TYPE_CHECKING:
    # ``TypeVar`` is only imported while type checking, so the type variable
    # has to live behind the same guard; defining it unconditionally would
    # raise a NameError when the module is imported at runtime. It is only
    # referenced from annotations, which are lazy in this module thanks to
    # ``from __future__ import annotations``.
    T = TypeVar('T')
[docs] def lines(value: str | tuple[str, ...] | list[str]) -> Iterator[str]: if isinstance(value, (tuple, list)): yield from value yield from str(value).split('\n')
@WinterthurApp.directory_search_widget('inline')
class InlineDirectorySearch:
    """Directory search widget registered under the name ``'inline'``.

    The widget reads an optional ``term`` from the search query, looks up
    the matching entries of one directory through the generic search and
    can narrow and order an entry query accordingly.
    """

    # Only declared here, never assigned in this class body.
    # NOTE(review): presumably set by the registration decorator -- confirm.
    name: ClassVar[Literal['inline']]

    def __init__(
        self,
        request: WinterthurRequest,
        directory: ExtendedDirectory,
        search_query: dict[str, str] | None
    ) -> None:
        """Store the request, its app, the directory and the raw query.

        :param request: The current request; its ``app`` is kept as well.
        :param directory: The directory whose entries are searched.
        :param search_query: The search parameters, or ``None``.
        """
        self.app = request.app
        self.request = request
        self.directory = directory
        self.search_query = search_query

    @cached_property
    def term(self) -> str | None:
        """The ``'term'`` value of the search query, ``None`` if absent."""
        params = self.search_query or {}
        return params.get('term', None)

    @cached_property
    def searchable(self) -> tuple[str, ...]:
        """The searchable field names from the directory configuration."""
        configured = self.directory.configuration.searchable
        return tuple(configured or ())

    @cached_property
    def entry_ids(self) -> tuple[UUID, ...]:
        """The ids of this directory's entries that match the term.

        Returns an empty tuple without querying when there is no term.
        At most 100 ids are returned.
        """
        if not self.term:
            return ()

        search = Search(self.request, query=self.term, page=0)

        # Restrict the generic search hits to entries of this directory.
        belongs_to_directory = (
            (SearchIndex.owner_id_uuid == DirectoryEntry.id)
            & (DirectoryEntry.directory_id == self.directory.id)
        )
        hits = search.generic_search().join(
            DirectoryEntry, belongs_to_directory
        )

        # TODO: We may be able to get rid of this limit
        hits = hits.limit(100)

        id_rows = hits.with_entities(SearchIndex.owner_id_uuid)
        return tuple(row[0] for row in id_rows)

    def html(self, layout: DefaultLayout) -> Markup:
        """Render the ``inline_search`` macro of *layout* for this widget."""
        action = self.request.class_link(
            ExtendedDirectoryEntryCollection,
            variables={
                'directory_name': self.directory.name,
                'search': self.name
            }
        )
        context = {
            'term': self.term,
            'directory': self.directory,
            'title': self.directory.title,
            'action': action
        }
        return render_macro(
            layout.macros['inline_search'], self.request, context
        )

    def fragments(
        self,
        entry: DirectoryEntry
    ) -> Iterator[tuple[str, tuple[str, ...]]]:
        """Yield ``(field name, fragments)`` for fields containing the term.

        For every searchable field, up to three lines of the entry's value
        that contain the term are turned into ``'<name>: <line>'`` strings,
        with leading spaces and dashes stripped from the line. Fields
        without a matching line are skipped.
        """
        term = self.term
        assert term is not None

        for name in self.searchable:
            key = as_internal_id(name)
            matching = (
                line for line in lines(entry.values[key])
                if term in line
            )
            # Only the first three matching lines are consumed.
            fragments = tuple(
                f'{name}: {line.lstrip(" -")}'
                for line in islice(matching, 3)
            )
            if fragments:
                yield name, fragments

    # FIXME: I think these fragments can contain Markup, so for now
    #        we are being potentially unsafe here. The documentation
    #        is unclear about what we get back here, but we used to
    #        render this with the structure keyword
    def lead(
        self,
        layout: DefaultLayout,
        entry: DirectoryEntry
    ) -> str | None:
        """Return the lead text for *entry*; currently always ``None``."""
        if not self.term:
            return None

        # FIXME: Implement result highlighting using Postgres
        return None

    def adapt(self, query: Query[T]) -> Query[T]:
        """Limit *query* to the matching entries, in search order.

        Without a term the query is returned unchanged. Otherwise it is
        filtered to the ids in ``entry_ids``; when there are any, the
        existing ordering is replaced by the position of each id within
        that tuple.
        """
        if not self.term:
            return query

        ids = self.entry_ids
        narrowed = query.filter(DirectoryEntry.id.in_(ids))
        if not ids:
            return narrowed

        # Drop the previous ordering before sorting by search position.
        narrowed = narrowed.order_by(False)
        return narrowed.order_by(
            func.array_position(
                array(ids),  # type:ignore[call-overload]
                DirectoryEntry.id
            )
        )