Source code for search.dsl

from elasticsearch_dsl import Search as BaseSearch  # type:ignore
from elasticsearch_dsl.response import Hit as BaseHit  # type:ignore
from elasticsearch_dsl.response import Response as BaseResponse


from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
    from collections.abc import Iterator
    from onegov.search.indexer import TypeMappingRegistry
    from onegov.search.mixins import Searchable
    from sqlalchemy.orm import Query, Session
    from typing import type_check_only
    from typing import Self


def type_from_hit(hit: BaseHit) -> str:
    """ Returns the type of the given hit, taken from its index name.

    The index name found on ``hit.meta.index`` is split on dashes and the
    second to last segment is returned. An index name without any dash
    results in an ``IndexError``.

    NOTE(review): presumably index names end in ``-<type>-<version>`` --
    confirm against the code that creates the indices.

    """

    # only the last two segments matter, so split from the right
    segments = hit.meta.index.rsplit('-', 2)
    return segments[-2]
class Response(BaseResponse):
    """ A search response (list of results) which additionally offers
    methods to get to the SQLAlchemy models behind the results.

    The methods read ``session``, ``mappings`` and ``explain`` from the
    instance. Those attributes are provided by the subclass which
    :meth:`bind` creates.

    """

    @classmethod
    def bind(
        cls,
        session: 'Session | None',
        mappings: 'TypeMappingRegistry | None',
        explain: bool
    ) -> type['BoundResponse']:
        """ Creates a subclass of this class which has the given session,
        mappings and explain flag set as class attributes.

        """

        class BoundResponse(cls):  # type:ignore[valid-type,misc]
            pass

        BoundResponse.explain = explain
        BoundResponse.mappings = mappings
        BoundResponse.session = session

        return BoundResponse

    def hits_by_type(self, type: str) -> 'Iterator[BaseHit]':
        """ Yields the hits of this response whose type (as determined by
        :func:`type_from_hit`) is equal to the given type.

        """

        yield from (hit for hit in self.hits if type_from_hit(hit) == type)

    def query(self, type: str) -> 'Query[Any] | None':
        """ Returns an SQLAlchemy query for the hits of the given type.

        A type is required as a single query can't span multiple unrelated
        tables. The model is looked up in the mappings and the query is
        limited to the records whose ``es_id`` attribute is amongst the ids
        of the matching hits.

        Returns None if no hit matches the type.

        """

        matching = list(self.hits_by_type(type))

        if not matching:
            return None

        model = self.mappings[type].model
        base = self.session.query(model)
        wanted_ids = (match.meta.id for match in matching)

        return base.filter(getattr(model, model.es_id).in_(wanted_ids))

    def load(self) -> list[Any]:
        """ Loads the models behind all hits from the SQLAlchemy session
        and returns them in the order given by elasticsearch.

        The returned list may contain None values, since elasticsearch
        might return results for which there is no record in the database
        (the data is then out of sync).

        If ``explain`` is set, each loaded record gets an ``explanation``
        attribute holding the raw explanation, the score and the values
        extracted by :func:`explanation_value`.

        """

        # remember at which position each (type, id) pair was found...
        slots = {}
        seen_types = set()

        for position, hit in enumerate(self.hits):
            hit_type = type_from_hit(hit)
            slots[(hit_type, str(hit.meta.id))] = position
            seen_types.add(hit_type)

        loaded: list[Any] = [None] * len(slots)
        capacity = len(loaded)

        # ...which lets us query the database once per type instead of
        # once per hit, potentially resulting in fewer queries
        for hit_type in seen_types:
            records = self.query(hit_type)

            for record in records:  # type:ignore[union-attr]
                record_id = str(getattr(record, record.es_id))
                position = slots[(hit_type, record_id)]

                if self.explain:
                    meta = self.hits[position].meta
                    details = meta.explanation

                    record.explanation = {
                        'raw': details.__dict__,
                        'score': meta.score,
                        'term-frequency': explanation_value(
                            details, 'termFreq'
                        ),
                        'inverse-document-frequency': explanation_value(
                            details, 'idf'
                        ),
                        'field-norm': explanation_value(
                            details, 'fieldNorm'
                        )
                    }

                # positions refer to the hits, whereas the list is sized
                # by the number of distinct (type, id) pairs, so a
                # position may lie outside of the list
                if position < capacity:
                    loaded[position] = record

        return loaded
[docs] def explanation_value(explanation: Any, text: str) -> dict[str, Any] | None: """ Gets the value from the explanation for descriptions starting with the given text. """ if explanation.description.startswith(text): return { 'description': explanation.description, 'value': explanation.value } for detail in getattr(explanation, 'details', []): result = explanation_value(detail, text) if result: return result return None
[docs] class Hit(BaseHit): """ Extends a single result with additional methods to query the SQLAlchemy models behind the results. """ @classmethod
[docs] def bind( cls, model: type['Searchable'] | None, session: 'Session | None' ) -> type['BoundHit']: class BoundHit(cls): # type:ignore[valid-type,misc] pass BoundHit.model = model BoundHit.session = session return BoundHit
[docs] def query(self) -> 'Query[Any]': """ Returns the SQLAlchemy query for this result. """ query = self.session.query(self.model) model_id = getattr(self.model, self.model.es_id) query = query.filter(model_id == self.meta.id) return query
[docs] def load(self) -> Any: """ Loads this result from the SQLAlchemy session. """ return self.query().one()
if TYPE_CHECKING:
    # The classes below are only declared for the type checker. They
    # describe the attributes which are set on the subclasses created by
    # Response.bind and Hit.bind.

    @type_check_only
    class BoundResponse(Response):
        """ The type of the class returned by ``Response.bind``. """

        session: Session | None
        mappings: TypeMappingRegistry | None
        explain: bool

    @type_check_only
    class BoundHit(Hit):
        """ The type of the class returned by ``Hit.bind``. """

        model: type[Searchable] | None
        session: Session | None