from __future__ import annotations
from functools import cached_property
from itertools import islice
from onegov.core.templates import render_macro
from onegov.directory import DirectoryEntry
from onegov.form import as_internal_id
from onegov.org.models.directory import ExtendedDirectoryEntryCollection
from onegov.org.models.search import Search
from onegov.search.search_index import SearchIndex
from onegov.winterthur.app import WinterthurApp
from sqlalchemy import func
from sqlalchemy import exc
from sqlalchemy.dialects.postgresql import array
from typing import ClassVar, Literal, TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Iterator
from markupsafe import Markup
from onegov.org.layout import DefaultLayout
from onegov.org.models import ExtendedDirectory
from onegov.winterthur.request import WinterthurRequest
from sqlalchemy.orm import Query
from uuid import UUID
[docs]
def lines(value: str | tuple[str, ...] | list[str]) -> Iterator[str]:
if isinstance(value, (tuple, list)):
yield from value
yield from str(value).split('\n')
@WinterthurApp.directory_search_widget('inline')
[docs]
class InlineDirectorySearch:
[docs]
name: ClassVar[Literal['inline']]
def __init__(
self,
request: WinterthurRequest,
directory: ExtendedDirectory,
search_query: dict[str, str] | None
) -> None:
[docs]
self.directory = directory
[docs]
self.search_query = search_query
@cached_property
[docs]
def term(self) -> str | None:
return (self.search_query or {}).get('term', None)
@cached_property
[docs]
def searchable(self) -> tuple[str, ...]:
return tuple(self.directory.configuration.searchable or ())
@cached_property
[docs]
def entry_ids(self) -> tuple[UUID, ...]:
if not self.term:
return ()
search = Search(
self.request,
query=self.term,
page=0
)
query = search.generic_search().join(
DirectoryEntry,
(SearchIndex.owner_id_uuid == DirectoryEntry.id)
& (DirectoryEntry.directory_id == self.directory.id)
).limit(100) # TODO: We may be able to get rid of this limit
try:
return tuple(
entry_id
for entry_id, in query.with_entities(
SearchIndex.owner_id_uuid
)
)
except exc.InternalError:
self.request.session.rollback()
return ()
[docs]
def html(self, layout: DefaultLayout) -> Markup:
return render_macro(layout.macros['inline_search'],
self.request, {
'term': self.term,
'directory': self.directory,
'title': self.directory.title,
'action': self.request.class_link(
ExtendedDirectoryEntryCollection,
variables={
'directory_name': self.directory.name,
'search': self.name
}
)
})
[docs]
def fragments(
self,
entry: DirectoryEntry
) -> Iterator[tuple[str, tuple[str, ...]]]:
assert self.term is not None
for name in self.searchable:
key = as_internal_id(name)
fragment_iter = (
f'{name}: {line.lstrip(" -")}'
for line in lines(entry.values[key])
if self.term in line
)
fragments = tuple(islice(fragment_iter, 3))
if fragments:
yield name, fragments
# FIXME: I think these fragments can contain Markup, so for now
# we are being potentially unsafe here. The documentation
# is unclear about what we get back here, but we used to
# render this with the structure keyword
[docs]
def lead(
self,
layout: DefaultLayout,
entry: DirectoryEntry
) -> str | None:
if not self.term:
return None
# FIXME: Implement result highlighting using Postgres
return None
[docs]
def adapt[T](self, query: Query[T]) -> Query[T]:
if not self.term:
return query
ids = self.entry_ids
query = query.filter(DirectoryEntry.id.in_(ids))
if ids:
query = query.order_by(False)
query = query.order_by(
func.array_position(
array(ids),
DirectoryEntry.id
)
)
return query