Source code for search.integration

from __future__ import annotations

import morepath

from concurrent.futures import ThreadPoolExecutor
from functools import cached_property
from more.transaction.main import transaction_tween_factory
from onegov.search import index_log, Searchable
from onegov.search.indexer import Indexer
from onegov.search.indexer import ORMEventTranslator
from onegov.search.indexer import TypeMappingRegistry
from onegov.search.utils import (
    apply_searchable_polymorphic_filter,
    get_polymorphic_base,
    language_from_locale,
    searchable_sqlalchemy_models,
)
from sqlalchemy.orm import undefer


from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
    from collections.abc import Callable
    from onegov.core.orm import Base, SessionManager
    from onegov.core.request import CoreRequest
    from sqlalchemy.orm import Session
    from webob import Response


class SearchApp(morepath.App):
    """ Provides (postgres based) full-text search integration for
    :class:`onegov.core.framework.Framework` based applications.

    The application must be connected to a database.

    Usage::

        from onegov.core import Framework

        class MyApp(Framework, SearchApp):
            pass

    """

    if TYPE_CHECKING:
        # forward declare required attributes
        # NOTE(review): ``fts_search_enabled``, ``fts_indexer`` and
        # ``fts_orm_events`` are read below but are not assigned anywhere
        # in this class body as shown - presumably they are set while the
        # application is configured; confirm.
        schema: str
        session_manager: SessionManager

        @property
        def session(self) -> Callable[[], Session]: ...
        @property
        def has_database_connection(self) -> bool: ...
        @cached_property
        def locales(self) -> set[str]: ...

    @cached_property
    def fts_languages(self) -> set[str]:
        """ The full-text search languages derived from the application's
        locales (via :func:`language_from_locale`).

        Falls back to ``{'simple'}`` if no language could be derived.

        """
        return {
            language_from_locale(locale)
            for locale in self.locales
        } or {'simple'}

    def indexable_base_models(self) -> set[type[Searchable | Base]]:
        """ Returns the polymorphic base of every searchable SQLAlchemy
        model found in any of the session manager's declarative bases.

        Using the polymorphic bases means each table hierarchy shows up
        once, rather than once per subclass.

        """
        return {
            get_polymorphic_base(model)
            for base in self.session_manager.bases
            for model in searchable_sqlalchemy_models(base)
        }

    def perform_reindex(self, fail: bool = False) -> None:
        """ Re-indexes all content.

        This is a heavy operation and should be run with consideration.

        Does nothing if full-text search is disabled. Otherwise the
        search index of the current schema is deleted, and all objects
        of every indexable base model are queued and then processed.

        An error raised while indexing a single model is logged (with
        traceback), and that model is skipped. The remaining models are
        still indexed.

        :param fail:
            If set, the names of all models which could not be indexed
            are logged once every model has been processed. Errors raised
            outside of the per-model error handling (e.g. while opening a
            session) are then propagated. Without ``fail`` such errors
            are discarded.

        """

        if not self.fts_search_enabled:
            return

        schema = self.schema
        index_log.info(f'Indexing schema {schema}..')

        # psql delete table search_index
        self.fts_indexer.delete_search_index(schema)

        # have no queue limit for reindexing (that we're able to change
        # this here is a bit of a CPython implementation detail) - we can't
        # necessarily always rely on being able to change this property
        original_queue_size = self.fts_orm_events.queue.maxsize
        self.fts_orm_events.queue.maxsize = 0

        def reindex_model(model: type[Base]) -> str | None:
            """ Load all database objects of ``model`` and queue them for
            indexing.

            Returns the name of the model if indexing it failed, ``None``
            otherwise.

            """
            session = self.session()
            try:
                query = session.query(model).options(undefer('*'))
                query = apply_searchable_polymorphic_filter(query, model)

                for obj in query:
                    self.fts_orm_events.index(schema, obj)
                # FIXME: Ideally we process the queue concurrently as well,
                #        but it seems we're leaking connections that way,
                #        we will have to tighten things up, before we can
                #        slightly speed things up a bit here. It might also
                #        be worth bypassing the queue entirely and directly
                #        generating and submitting the batches. Currently
                #        we're only saving around 10% of runtime by processing
                #        the queue concurrently.
            except Exception:
                index_log.info(
                    f"Error indexing model '{model.__name__}'",
                    exc_info=True
                )
                return model.__name__
            finally:
                session.invalidate()
                session.bind.dispose()
            return None

        try:
            with ThreadPoolExecutor() as executor:
                results = executor.map(
                    reindex_model, self.indexable_base_models()
                )
                # Exceptions raised inside the workers only surface when
                # ``results`` is iterated, so without ``fail`` they are
                # discarded.
                if fail:
                    failed = [name for name in results if name is not None]
                    if failed:
                        index_log.info(
                            'Failed reindexing: %s', ', '.join(failed)
                        )

            self.fts_indexer.process()
        finally:
            # restore the queue limit and release connections even if
            # reindexing raised part way through
            self.fts_orm_events.queue.maxsize = original_queue_size
            self.fts_indexer.engine.dispose()
@SearchApp.tween_factory(over=transaction_tween_factory)
def process_indexer_tween_factory(
    app: SearchApp,
    handler: Callable[[CoreRequest], Response]
) -> Callable[[CoreRequest], Response]:
    """ Creates a tween which processes pending search index updates once
    the wrapped handler has produced a response.

    It is registered ``over`` the transaction tween. Per morepath's tween
    ordering, the wrapped ``handler`` therefore includes the transaction
    handling. Nothing is processed for applications without full-text
    search, or if ``handler`` raises.

    """

    def process_indexer_tween(request: CoreRequest) -> Response:
        # use the application serving this request, not the factory's app
        current_app: SearchApp = request.app  # type:ignore[assignment]
        # the setting is looked up before the request is dispatched
        indexing_enabled = current_app.fts_search_enabled

        response = handler(request)
        if indexing_enabled:
            current_app.fts_indexer.process()
        return response

    return process_indexer_tween