Source code for search.indexer

from __future__ import annotations

import re

from itertools import groupby
from onegov.core.utils import is_non_string_iterable
from onegov.search import index_log, log, Searchable, utils
from onegov.search.datamanager import IndexerDataManager
from onegov.search.search_index import SearchIndex
from onegov.search.utils import language_from_locale, normalize_text
from operator import itemgetter
from sqlalchemy import and_, bindparam, func, text, String
from sqlalchemy.orm import object_session
from sqlalchemy.orm.exc import ObjectDeletedError
from sqlalchemy.dialects.postgresql import insert, ARRAY
from uuid import UUID


from typing import Any, Literal, TYPE_CHECKING
if TYPE_CHECKING:
    from collections.abc import Iterable, Iterator, Sequence
    from datetime import datetime
    from sqlalchemy.orm import Session
    from sqlalchemy.sql import ColumnElement
    from typing import TypeAlias
    from typing import TypedDict

    class IndexTask(TypedDict):
        action: Literal['index']
        id: UUID | str | int
        id_key: str
        schema: str
        tablename: str
        owner_type: str
        language: str
        access: str
        public: bool
        suggestion: list[str]
        tags: list[str]
        last_change: datetime | None
        publication_start: datetime | None
        publication_end: datetime | None
        properties: dict[str, str]

    class DeleteTask(TypedDict):
        action: Literal['delete']
        id: UUID | str | int
        schema: str
        owner_type: str
        tablename: str

    Task: TypeAlias = IndexTask | DeleteTask
    PKColumn: TypeAlias = (
        ColumnElement[UUID | None]
        | ColumnElement[int | None]
        | ColumnElement[str | None]
    )
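
# A minimal sketch of an IndexTask payload, for orientation only (the
# values below are made-up examples, not taken from real data):
#
#     task: IndexTask = {
#         'action': 'index',
#         'id': 1,
#         'id_key': 'id',
#         'schema': 'example_schema',
#         'tablename': 'pages',
#         'owner_type': 'Page',
#         'language': 'de',
#         'access': 'public',
#         'public': True,
#         'suggestion': ['Town Hall'],
#         'tags': ['administration'],
#         'last_change': None,
#         'publication_start': None,
#         'publication_end': None,
#         'properties': {'title': 'Town Hall'},
#     }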


class Indexer:

    def __init__(
        self,
        mappings: TypeMappingRegistry,
        languages: set[str] | None = None
    ) -> None:
        self.mappings = mappings
        self.languages = languages or {'simple'}

    def index(
        self,
        tasks: list[IndexTask] | IndexTask,
        session: Session,
    ) -> bool:
        """ Updates the 'search_index' table (the full text search index)
        for the given task(s).

        When given a batch of tasks, we assume they all originate from the
        same schema and table, so the indexing process can be optimized.

        The action is performed in a savepoint within the given session's
        transaction context, rather than in a transaction of our own.

        :param tasks: A list of tasks to index
        :param session: Supply an active session
        :return: True if the indexing was successful, False otherwise

        """
        params_dict = {}
        if not isinstance(tasks, list):
            tasks = [tasks]

        if not tasks:
            # nothing to do
            return True

        tablename: str | None = None
        schema: str | None = None
        owner_id_column: PKColumn | None = None
        try:
            for task in tasks:
                if schema is not None:
                    if schema != task['schema']:
                        index_log.error(
                            'Received mixed schemas in search index tasks.'
                        )
                        return False
                else:
                    schema = task['schema']

                if tablename is not None:
                    if tablename != task['tablename']:
                        index_log.error(
                            'Received mixed tables in search index tasks.'
                        )
                        return False
                else:
                    tablename = task['tablename']

                _owner_type = task['owner_type']  # class name
                _mapping = self.mappings[_owner_type]
                _owner_id = task['id']
                _owner_id_column: PKColumn
                if isinstance(_owner_id, UUID):
                    _owner_id_column = SearchIndex.owner_id_uuid
                elif isinstance(_owner_id, int):
                    _owner_id_column = SearchIndex.owner_id_int
                elif isinstance(_owner_id, str):
                    _owner_id_column = SearchIndex.owner_id_str

                if owner_id_column is not None:
                    if owner_id_column is not _owner_id_column:
                        index_log.error(
                            'Received mixed id types in search index tasks.'
                        )
                        return False
                else:
                    owner_id_column = _owner_id_column

                detected_language = language_from_locale(task['language'])
                if detected_language not in self.languages:
                    if len(self.languages) == 1:
                        language = next(iter(self.languages))
                    elif 'german' in self.languages:
                        language = 'german'
                    elif 'french' in self.languages:
                        language = 'french'
                    else:
                        # HACK: just take one
                        language = next(iter(self.languages), 'simple')
                else:
                    language = detected_language

                _properties = task['properties']
                _tags = task['tags']

                # NOTE: We use a dictionary to avoid duplicate updates for
                #       the same model, only the latest update will count
                params_dict[_owner_id] = {
                    '_data': _properties,
                    '_owner_id': _owner_id,
                    '_owner_type': _owner_type,
                    '_owner_tablename': tablename,
                    '_public': task['public'],
                    '_access': task.get('access', 'public'),
                    '_last_change': task['last_change'],
                    '_tags': _tags,
                    '_suggestion': task['suggestion'],
                    '_publication_start': task.get('publication_start', None),
                    '_publication_end': task.get('publication_end', None),
                    **{
                        f'_lang__{lang}': lang
                        for lang in self.languages
                    },
                    **{
                        f'_{k}': v
                        for k, v in _properties.items()
                    }
                }
                for field in _properties.keys():
                    _config = _mapping.mapping.get(field, {})
                    _weight = _config.get('weight')
                    if _weight not in ('A', 'B', 'C', 'D'):
                        index_log.warning(
                            f'Invalid weight for property "{field}" on type '
                            f'"{_owner_type}", falling back to weight "C".'
                        )
                        _weight = 'C'

                    for lang in self.languages:
                        params_dict[_owner_id][
                            f'_weight__{field}__{lang}'
                        ] = chr(ord(_weight) + 1) if (
                            'localized' in _config.get('type', '')
                            and lang != language
                            # TODO: Do we want to emit a warning if we can't
                            #       de-prioritize non-matching languages?
                            and _weight != 'D'
                        ) else _weight

            assert schema is not None
            assert owner_id_column is not None

            combined_vector = func.setweight(
                func.array_to_tsvector(
                    bindparam('_tags', type_=ARRAY(String))
                ),
                'A'
            )
            for field in tasks[0]['properties'].keys():
                for language in self.languages:
                    combined_vector = combined_vector.op('||')(
                        func.setweight(
                            func.to_tsvector(
                                bindparam(f'_lang__{language}', type_=String),
                                bindparam(f'_{field}', type_=String)
                            ),
                            bindparam(f'_weight__{field}__{language}')
                        )
                    )

            stmt = (
                insert(SearchIndex.__table__)
                .values(
                    {
                        owner_id_column: bindparam('_owner_id'),
                        SearchIndex.owner_type: bindparam('_owner_type'),
                        SearchIndex.owner_tablename:
                            bindparam('_owner_tablename'),
                        SearchIndex.publication_start:
                            bindparam('_publication_start'),
                        SearchIndex.publication_end:
                            bindparam('_publication_end'),
                        SearchIndex.public: bindparam('_public'),
                        SearchIndex.access: bindparam('_access'),
                        SearchIndex.last_change: bindparam('_last_change'),
                        SearchIndex._tags:
                            bindparam('_tags', type_=ARRAY(String)),
                        SearchIndex.suggestion: bindparam('_suggestion'),
                        SearchIndex.fts_idx_data: bindparam('_data'),
                        SearchIndex.fts_idx: combined_vector,
                    }
                )
                # we may have already indexed this model
                # so perform an update instead
                .on_conflict_do_update(
                    index_elements=[
                        SearchIndex.owner_tablename,
                        owner_id_column
                    ],
                    set_={
                        # the owner_type can change, although uncommon
                        'owner_type': bindparam('_owner_type'),
                        'publication_start': bindparam('_publication_start'),
                        'publication_end': bindparam('_publication_end'),
                        'public': bindparam('_public'),
                        'access': bindparam('_access'),
                        'last_change': bindparam('_last_change'),
                        'tags': bindparam('_tags', type_=ARRAY(String)),
                        'suggestion': bindparam('_suggestion'),
                        'fts_idx_data': bindparam('_data'),
                        'fts_idx': combined_vector,
                    },
                    # since our unique constraints are partial indices
                    # we need this index_where clause, otherwise postgres
                    # will not be able to infer the matching constraint
                    index_where=owner_id_column.isnot(None)  # type: ignore[no-untyped-call]
                )
            )

            params = list(params_dict.values())
            with session.begin_nested():
                session.execute(stmt, params)
        except Exception:
            index_log.exception(
                f'Error creating index schema {schema} of '
                f'table {tablename}, tasks: {[t["id"] for t in tasks]}',
            )
            return False
        return True
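
    # Usage sketch for `index` (illustrative assumptions: an active
    # `session` and the hypothetical 'Page' type and `task` from the
    # example payload above):
    #
    #     registry = TypeMappingRegistry()
    #     registry.register_type('Page', {
    #         'title': {'type': 'localized', 'weight': 'A'},
    #     })
    #     indexer = Indexer(registry, languages={'german', 'french'})
    #     indexer.index(task, session)  # upserts one row in search_index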

    def delete(
        self,
        tasks: list[DeleteTask] | DeleteTask,
        session: Session
    ) -> bool:
        if not isinstance(tasks, list):
            tasks = [tasks]

        schema: str | None = None
        tablename: str | None = None
        owner_ids: set[UUID | int | str] = set()
        owner_id_column: PKColumn | None = None
        for task in tasks:
            if schema is not None:
                if schema != task['schema']:
                    index_log.error(
                        'Received mixed schemas in search delete tasks.'
                    )
                    return False
            else:
                schema = task['schema']

            if tablename is not None:
                if tablename != task['tablename']:
                    index_log.error(
                        'Received mixed tables in search delete tasks.'
                    )
                    return False
            else:
                tablename = task['tablename']

            _owner_id = task['id']
            _owner_id_column: PKColumn
            owner_ids.add(_owner_id)
            if isinstance(_owner_id, UUID):
                _owner_id_column = SearchIndex.owner_id_uuid
            elif isinstance(_owner_id, int):
                _owner_id_column = SearchIndex.owner_id_int
            elif isinstance(_owner_id, str):
                _owner_id_column = SearchIndex.owner_id_str

            if owner_id_column is not None:
                if owner_id_column is not _owner_id_column:
                    index_log.error(
                        'Received mixed id types in search delete tasks.'
                    )
                    return False
            else:
                owner_id_column = _owner_id_column

        if not owner_ids:
            # nothing to delete
            return True

        try:
            assert schema is not None
            assert tablename is not None
            assert owner_id_column is not None
            stmt = (
                SearchIndex.__table__.delete()
                .where(and_(
                    SearchIndex.owner_tablename == tablename,
                    owner_id_column.in_(owner_ids)
                ))
            )
            with session.begin_nested():
                session.execute(stmt)
        except Exception:
            index_log.exception(
                f'Error deleting index schema {schema} tasks {tasks}:'
            )
            return False
        return True

    def process(
        self,
        tasks: Iterable[Task],
        session: Session
    ) -> int:
        """ Processes the queue in bulk. Gathers all tasks and groups
        consecutive tasks by schema, action and owner type.

        Returns the number of successfully processed batches.

        """
        grouped_tasks = groupby(
            tasks,
            # NOTE: We could group by tablename for delete actions
            #       which could yield slightly larger batches in
            #       some cases, but for indexing we currently
            #       can't do that, because properties may differ
            #       between multiple polymorphic variants
            key=itemgetter('schema', 'action', 'owner_type')
        )
        success = 0
        for (_, action, _), group in grouped_tasks:
            task_list = list(group)
            if action == 'index':
                success += self.index(
                    task_list,  # type: ignore[arg-type]
                    session
                )
            elif action == 'delete':
                success += self.delete(
                    task_list,  # type: ignore[arg-type]
                    session
                )
            else:
                raise NotImplementedError(
                    f"Action '{action}' not implemented for {self.__class__}")
        return success
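
    # Batching sketch: `groupby` only merges *consecutive* tasks sharing
    # the same (schema, action, owner_type) key, so a hypothetical queue
    #
    #     [index_page_1, index_page_2, delete_news_9]
    #
    # yields two batches, index([index_page_1, index_page_2]) and
    # delete([delete_news_9]); `process` returns the number of batches
    # that succeeded, here at most 2.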

    def delete_search_index(self, session: Session) -> None:
        """ Immediately deletes all records in the search index table. """
        session.execute(text("""
            TRUNCATE search_index;
            COMMIT;
        """))


class TypeMapping:

    __slots__ = ('name', 'mapping', 'model')

    def __init__(
        self,
        name: str,
        mapping: dict[str, Any],
        model: type[Searchable] | None = None
    ) -> None:
        self.name = name
        self.mapping = mapping
        self.model = model


class TypeMappingRegistry:

    mappings: dict[str, TypeMapping]

    def __init__(self) -> None:
        self.mappings = {}

    def __getitem__(self, key: str) -> TypeMapping:
        return self.mappings[key]

    def __iter__(self) -> Iterator[TypeMapping]:
        yield from self.mappings.values()

    def register_orm_base(self, base: type[object]) -> None:
        """ Takes the given SQLAlchemy base and registers all
        :class:`~onegov.search.mixins.Searchable` objects.

        """
        for model in utils.searchable_sqlalchemy_models(base):
            self.register_type(model.__name__, model.fts_properties, model)
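
    # Sketch of a model that `register_orm_base` would pick up; the class,
    # columns and base are invented for illustration:
    #
    #     class Page(Base, Searchable):
    #         __tablename__ = 'pages'
    #
    #         id = Column(Integer, primary_key=True)
    #         title = Column(Text, nullable=False)
    #
    #         fts_properties = {
    #             'title': {'type': 'localized', 'weight': 'A'},
    #         }
    #
    #     registry.register_orm_base(Base)  # registers 'Page'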

    def register_type(
        self,
        type_name: str,
        mapping: dict[str, Any],
        model: type[Searchable] | None = None
    ) -> None:
        """ Registers the given type with the given mapping. The mapping is
        a dictionary representing the part below the ``mappings/type_name``.

        See:

        `<https://www.elastic.co/guide/en/elasticsearch/reference/current/\
indices-create-index.html#mappings>`_

        When the mapping changes, a new index is created internally and the
        alias to this index (the external name of the index) is pointed to
        this new index.

        As a consequence, a change in the mapping requires a reindex.

        """
        assert type_name not in self.mappings, (
            f"Type '{type_name}' already registered")
        self.mappings[type_name] = TypeMapping(type_name, mapping, model)
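
    # Direct registration sketch; the mapping keys mirror what
    # `Indexer.index` reads (an optional `weight` of 'A'-'D' and a `type`
    # that may contain 'localized'):
    #
    #     registry = TypeMappingRegistry()
    #     registry.register_type('Page', {
    #         'title': {'type': 'localized', 'weight': 'A'},
    #         'content': {'type': 'localized', 'weight': 'B'},
    #     })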

    @property
    def registered_fields(self) -> set[str]:
        """ Goes through all the registered types and returns a set with
        all fields used by the mappings.

        """
        return {key for mapping in self for key in mapping.mapping.keys()}


class ORMLanguageDetector(utils.LanguageDetector):

    html_strip_expression = re.compile(r'<[^<]+?>')

    def localized_properties(self, obj: Searchable) -> Iterator[str]:
        for key, definition in obj.fts_properties.items():
            if definition.get('type', '').startswith('localized'):
                yield key

    def localized_texts(
        self,
        obj: Searchable,
        max_chars: int | None = None
    ) -> Iterator[str]:
        chars = 0
        for p in self.localized_properties(obj):
            text = getattr(obj, p, '')
            if not isinstance(text, str):
                continue

            yield text.strip()

            chars += len(text)
            if max_chars is not None and max_chars <= chars:
                break

    def detect_object_language(self, obj: Searchable) -> str:
        # NOTE: localized_properties yields lazily and a generator is
        #       always truthy, so materialize it before the empty check
        properties = list(self.localized_properties(obj))
        if not properties:
            # here, the mapping will be the same for all languages
            return self.supported_languages[0]

        text = ' '.join(self.localized_texts(obj, max_chars=1024))
        text = self.html_strip_expression.sub('', text).strip()
        if not text:
            return self.supported_languages[0]

        return self.detect(text)
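
    # Detection sketch with a hypothetical `page` object whose localized
    # properties hold German text: up to ~1024 characters are joined,
    # HTML tags are stripped, and the rest is passed to `self.detect`:
    #
    #     detector = ORMLanguageDetector(['de', 'fr', 'en'])
    #     detector.detect_object_language(page)  # e.g. 'de'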


class ORMEventTranslator:
    """ Handles the onegov.core orm events, translates them into indexing
    actions and puts the result into a queue for the indexer to consume.

    The queue may be limited. Once the limit is reached, new events are no
    longer processed and an error is logged.

    """

    def __init__(
        self,
        indexer: Indexer,
        max_queue_size: int = 0,
        languages: Sequence[str] = ('de', 'fr', 'en')
    ) -> None:
        self.indexer = indexer
        self.mappings = indexer.mappings
        self.detector = ORMLanguageDetector(languages)
        self.max_queue_size = max_queue_size
        self.stopped = False

    def on_insert(self, schema: str, obj: object) -> None:
        if not self.stopped:
            if isinstance(obj, Searchable):
                self.index(schema, obj)

    def on_update(self, schema: str, obj: object) -> None:
        if not self.stopped:
            if isinstance(obj, Searchable):
                if obj.fts_skip:
                    # NOTE: We need to emit a delete in case this
                    #       model previously wasn't skipped
                    self.delete(schema, obj)
                else:
                    self.index(schema, obj)

    def on_delete(self, schema: str, session: Session, obj: object) -> None:
        if not self.stopped:
            if isinstance(obj, Searchable):
                self.delete(schema, obj, session)

    def on_transaction_join(self, schema: str, session: Session) -> None:
        if not self.stopped:
            # NOTE: This ensures IndexerDataManager gets created before
            #       the transaction is in its `Committing` state, where
            #       it no longer can be joined.
            IndexerDataManager.get_queue(
                session, self.indexer, self.max_queue_size
            )

    def put(self, session: Session, translation: Task) -> None:
        queue = IndexerDataManager.get_queue(
            session, self.indexer, self.max_queue_size
        )
        if queue is None:
            log.error(
                'Tried to put events into the ORM translation queue '
                'while the transaction was in an invalid state.'
            )
            return

        queue.append(translation)

    def index_task(
        self,
        schema: str,
        obj: Searchable,
    ) -> IndexTask | None:
        try:
            if obj.fts_skip:
                return None

            if obj.fts_language == 'auto':
                language = self.detector.detect_object_language(obj)
            else:
                language = obj.fts_language

            _owner_type = obj.__class__.__name__
            translation: IndexTask = {
                'action': 'index',
                'id': getattr(obj, obj.fts_id),
                'id_key': obj.fts_id,
                'schema': schema,
                'tablename': obj.__tablename__,
                'owner_type': _owner_type,
                'language': language,
                'access': obj.fts_access,
                'public': obj.fts_public,
                'suggestion': [],
                'tags': obj.fts_tags or [],
                'last_change': obj.fts_last_change,
                'publication_start': obj.fts_publication_start,
                'publication_end': obj.fts_publication_end,
                'properties': {},
            }

            mapping_ = self.mappings[_owner_type]
            for prop in mapping_.mapping.keys():
                # FIXME: If we treat lists and dictionaries as documents
                #        then we may create some unintended combinations
                #        that are ranked highly because there are no stop
                #        words separating them. We could try to switch to
                #        `JSONB` input for `to_tsvector` which inserts at
                #        least one stop word between every value, but we
                #        might want more than one stop word of separation.
                raw = getattr(obj, prop)
                if raw is None:
                    value = ''
                elif isinstance(raw, dict):
                    # FIXME: Do we want to unnest nested structures?
                    value = ' '.join(
                        str(value)
                        for value in raw.values()
                        if value is not None
                    )
                elif is_non_string_iterable(raw):
                    # FIXME: Do we want to unnest nested structures?
                    value = ' '.join(
                        str(value)
                        for value in raw
                        if value is not None
                    )
                else:
                    value = str(raw)
                translation['properties'][prop] = normalize_text(value)

            suggestion = obj.fts_suggestion
            if suggestion:
                if is_non_string_iterable(suggestion):
                    suggestion = list(suggestion)
                else:
                    suggestion = [str(suggestion)]
                translation['suggestion'] = suggestion

            return translation
        except ObjectDeletedError:
            obj_id = getattr(obj, 'id', obj)
            log.info(
                f'Object {obj_id} was deleted before indexing:',
                exc_info=True
            )
            return None
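
    # Flattening sketch for `index_task` (hypothetical values): non-string
    # properties are joined into plain text before `normalize_text` runs:
    #
    #     {'street': 'Main St', 'nr': 4}  ->  'Main St 4'
    #     ['alpha', None, 'beta']         ->  'alpha beta'
    #     None                            ->  ''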

    def delete_task(self, schema: str, obj: Searchable) -> DeleteTask:
        return {
            'action': 'delete',
            'schema': schema,
            'tablename': obj.__tablename__,
            'owner_type': obj.__class__.__name__,
            'id': getattr(obj, obj.fts_id)
        }

    def index(
        self,
        schema: str,
        obj: Searchable,
        session: Session | None = None,
    ) -> None:
        """ Creates or updates the index for the given object. """
        if session is None:
            session = object_session(obj)

        task = self.index_task(schema, obj)
        if task is not None:
            self.put(session, task)

    def delete(
        self,
        schema: str,
        obj: Searchable,
        session: Session | None = None,
    ) -> None:
        """ Deletes the index of the given object. """
        if session is None:
            session = object_session(obj)

        self.put(session, self.delete_task(schema, obj))
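
# End-to-end sketch; `Base`, `session` and `page` are assumed to come from
# the application, and the actual event wiring lives in onegov.core:
#
#     registry = TypeMappingRegistry()
#     registry.register_orm_base(Base)
#     indexer = Indexer(registry, languages={'german', 'french', 'english'})
#     translator = ORMEventTranslator(indexer)
#
#     translator.on_insert('example_schema', page)           # queues an IndexTask
#     translator.on_delete('example_schema', session, page)  # queues a DeleteTask
#
# The queued tasks are consumed by the indexer when the transaction
# commits (see onegov.search.datamanager.IndexerDataManager).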