Source code for search.integration

from __future__ import annotations

import morepath

from concurrent.futures import ThreadPoolExecutor
from functools import cached_property
from onegov.search import index_log, Searchable
from onegov.search.indexer import Indexer
from onegov.search.indexer import ORMEventTranslator
from onegov.search.indexer import TypeMappingRegistry
from onegov.search.utils import (
    apply_searchable_polymorphic_filter,
    get_polymorphic_base,
    language_from_locale,
    searchable_sqlalchemy_models,
)
from sqlalchemy import text
from sqlalchemy.orm import undefer


from typing import Any, TYPE_CHECKING
if TYPE_CHECKING:
    from collections.abc import Callable
    from onegov.core.orm import Base, SessionManager
    from onegov.core.request import CoreRequest
    from sqlalchemy.orm import Session


class SearchApp(morepath.App):
    """ Provides elasticsearch and postgres integration for
    :class:`onegov.core.framework.Framework` based applications.

    The application must be connected to a database.

    Usage::

        from onegov.core import Framework

        class MyApp(Framework, SearchApp):
            pass

    """

    if TYPE_CHECKING:
        # forward declare required attributes
        schema: str
        session_manager: SessionManager

        @property
        def session(self) -> Callable[[], Session]: ...
        @property
        def has_database_connection(self) -> bool: ...
        @cached_property
        def locales(self) -> set[str]: ...

    @cached_property
    def fts_languages(self) -> set[str]:
        """ The full-text search languages derived from the application's
        locales.

        Every locale in :attr:`locales` is passed through
        ``language_from_locale``. If that yields an empty set (no locales
        configured), ``{'simple'}`` is returned instead.

        """
        return {
            language_from_locale(locale)
            for locale in self.locales
        } or {'simple'}

    def indexable_base_models(self) -> set[type[Searchable | Base]]:
        """ Returns the polymorphic base of every searchable model found
        on any of the session manager's declarative bases.

        The result is a set, so models sharing a polymorphic base only
        contribute that base once.

        """
        return {
            get_polymorphic_base(model)
            for base in self.session_manager.bases
            for model in searchable_sqlalchemy_models(base)
        }

    def perform_reindex(self) -> None:
        """ Re-indexes all content.

        This is a heavy operation and should be run with consideration.

        Does nothing if ``fts_search_enabled`` is falsy. Otherwise the
        existing search index is deleted and every model returned by
        :meth:`indexable_base_models` is re-indexed on a thread pool.

        By default, all exceptions during reindex are silently ignored:
        a failure while indexing one model is logged (with traceback) to
        ``index_log`` and does not abort the remaining models.

        """
        if not self.fts_search_enabled:
            return

        schema = self.schema
        session = self.session()

        def reindex_model(model: type[Base]) -> None:
            """ Load all database objects and index them. """
            # NOTE(review): each worker asks for its own session; this
            # presumably relies on self.session() being thread-local,
            # confirm against the session manager.
            session = self.session()
            try:
                # load every deferred column up front
                query = session.query(model).options(undefer('*'))
                query = apply_searchable_polymorphic_filter(
                    query,
                    model,
                    order_by_polymorphic_identity=True
                )
                # NOTE: we bypass the normal transaction machinery for speed
                self.fts_indexer.process((
                    task
                    for obj in query
                    if (
                        task := self.fts_orm_events.index_task(schema, obj)
                    ) is not None
                ), session)
                session.execute(text('COMMIT'))
            except Exception:
                index_log.info(
                    f"Error indexing model '{model.__name__}' "
                    f"in schema {schema}",
                    exc_info=True
                )
            finally:
                session.invalidate()
                session.bind.dispose()

        try:
            self.fts_indexer.delete_search_index(session)

            # Leaving the ``with`` block waits for all submitted tasks.
            # The iterator returned by ``map`` is intentionally not
            # consumed, so nothing raised inside a worker propagates
            # here.
            with ThreadPoolExecutor() as executor:
                executor.map(reindex_model, self.indexable_base_models())
        finally:
            # mirror the per-model cleanup so the outer session is
            # released even if deleting the index or the pool fails
            session.invalidate()
            session.bind.dispose()