from __future__ import annotations
import platform
import re
from copy import deepcopy
from elasticsearch.exceptions import NotFoundError
from elasticsearch.helpers import streaming_bulk
from itertools import groupby
from queue import Queue, Empty, Full
from onegov.core.utils import hash_dictionary, is_non_string_iterable
from onegov.search import index_log, log, Searchable, utils
from onegov.search.errors import SearchOfflineError
from onegov.search.search_index import SearchIndex
from onegov.search.utils import language_from_locale
from operator import itemgetter
from sqlalchemy import and_, bindparam, func, String, Table, MetaData
from sqlalchemy.orm.exc import ObjectDeletedError
from sqlalchemy.dialects.postgresql import insert, HSTORE
from unidecode import unidecode
from uuid import UUID
from typing import Any, Literal, NamedTuple, TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Callable, Iterable, Iterator, Sequence
from datetime import datetime
from elasticsearch import Elasticsearch
from sqlalchemy.engine import Engine
from sqlalchemy.orm import Session
from sqlalchemy.sql import ColumnElement
from sqlalchemy.sql.expression import Executable
from typing import TypeAlias
from typing import TypedDict
class IndexTask(TypedDict):
action: Literal['index']
id: UUID | str | int
id_key: str
schema: str
type_name: str
tablename: str
owner_type: str
language: str
access: str
suggestion: list[str]
publication_start: datetime | None
publication_end: datetime | None
# TODO: We only need the raw properties once we get rid of ES
raw_properties: dict[str, Any]
properties: dict[str, Any]
class DeleteTask(TypedDict):
action: Literal['delete']
id: UUID | str | int
schema: str
type_name: str
owner_type: str
tablename: str
Task: TypeAlias = IndexTask | DeleteTask
PKColumn: TypeAlias = (
ColumnElement[UUID | None]
| ColumnElement[int | None]
| ColumnElement[str | None]
)
ES_ANALYZER_MAP = {
'en': 'english',
'de': 'german',
'fr': 'french',
'en_html': 'english_html',
'de_html': 'german_html',
'fr_html': 'french_html',
}
ANALYSIS_CONFIG = {
'filter': {
'english_stop': {
'type': 'stop',
'stopwords': '_english_'
},
'english_stemmer': {
'type': 'stemmer',
'language': 'english'
},
'english_possessive_stemmer': {
'type': 'stemmer',
'language': 'possessive_english'
},
'german_stop': {
'type': 'stop',
'stopwords': '_german_'
},
'german_stemmer': {
'type': 'stemmer',
'language': 'light_german'
},
'french_elision': {
'type': 'elision',
'articles_case': True,
'articles': [
'l', 'm', 't', 'qu', 'n', 's',
'j', 'd', 'c', 'jusqu', 'quoiqu',
'lorsqu', 'puisqu'
]
},
'french_stop': {
'type': 'stop',
'stopwords': '_french_'
},
'french_keywords': {
'type': 'keyword_marker',
'keywords': ['Exemple']
},
'french_stemmer': {
'type': 'stemmer',
'language': 'light_french'
}
},
'analyzer': {
'english_html': {
'tokenizer': 'standard',
'char_filter': [
'html_strip'
],
'filter': [
'english_possessive_stemmer',
'lowercase',
'english_stop',
'english_stemmer'
]
},
'german_html': {
'tokenizer': 'standard',
'char_filter': [
'html_strip'
],
'filter': [
'lowercase',
'german_stop',
'german_normalization',
'german_stemmer'
]
},
'french_html': {
'tokenizer': 'standard',
'char_filter': [
'html_strip'
],
'filter': [
'french_elision',
'lowercase',
'french_stop',
'french_keywords',
'french_stemmer'
]
},
'autocomplete': {
'type': 'custom',
'char_filter': ['html_strip'],
'tokenizer': 'standard',
'filter': ['lowercase']
},
'tags': {
'type': 'custom',
'tokenizer': 'keyword',
'filter': ['lowercase']
},
}
}
class IndexParts(NamedTuple):
hostname: str | None
schema: str | None
language: str | None
type_name: str | None
version: str | None
def parse_index_name(index_name: str) -> IndexParts:
""" Takes the given index name and returns the hostname, schema,
language and type_name in a dictionary.
* If the index_name doesn't match the pattern, all values are None.
* If the index_name has no version, the version is None.
"""
parts = index_name.split('-')
if len(parts) == 4:
hostname, schema, language, type_name = parts
version = None
elif len(parts) == 5:
hostname, schema, language, type_name, version = parts
else:
hostname = None
schema = None
language = None
type_name = None
version = None
return IndexParts(
hostname=hostname,
schema=schema,
language=language,
type_name=type_name,
version=version
)
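# A quick illustration of the naming scheme parsed above (hostname,
# schema and version values below are made up for the example):
#
#     parse_index_name('myhost-onegov_town-de-events-1a2b3c')
#     # -> IndexParts('myhost', 'onegov_town', 'de', 'events', '1a2b3c')
#
#     parse_index_name('garbage')
#     # -> IndexParts(None, None, None, None, None)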
class IndexerBase:
failed_task: Task | None = None
def process(
self,
block: bool = False,
timeout: float | None = None
) -> int:
""" Processes the queue until it is empty or until there's an error.
If there's an error, the next call to this function will try to
execute the failed task again. This is mainly meant for elasticsearch
outages.
:block:
If True, the call blocks until an item is available on the queue.
Useful if you run this in a separate thread.
:timeout:
How long the blocking call should block. Has no effect if
``block`` is False.
:return: The number of successfully processed items
"""
try:
processed = 0
while True:
# get the previously failed task or a new one
task = self.failed_task or self.queue.get(block, timeout)
self.failed_task = None
if self.process_task(task):
processed += 1
else:
# if the task failed, keep it for the next run and give up
self.failed_task = task
return processed
except Empty:
pass
return processed
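# Usage sketch: a worker wired to the same queue as the ORM event
# translator can simply loop over process() (names are illustrative):
#
#     while not shutdown_requested:
#         indexer.process(block=True, timeout=1.0)
#
# A failed task is kept in self.failed_task and retried on the next call,
# so a temporary search-cluster outage delays indexing instead of
# dropping tasks.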
def process_task(self, task: Task) -> bool:
try:
getattr(self, task['action'])(task)
except SearchOfflineError:
return False
self.queue.task_done()
return True
class Indexer(IndexerBase):
""" Takes actions from a queue and executes them on the elasticsearch
cluster. Depends on :class:`IndexManager` for index management and expects
to have the same :class:`TypeMappingRegistry` as :class:`ORMEventTranslator`.
The idea is that this class does the indexing/deindexing, the index manager
sets up the indices and the orm event translator listens for changes in
the ORM.
A queue is used so the indexer can be run in a separate thread.
"""
def __init__(
self,
mappings: TypeMappingRegistry,
queue: Queue[Task],
es_client: Elasticsearch,
hostname: str | None = None
) -> None:
self.es_client = es_client
self.queue = queue
self.hostname = hostname or platform.node()
self.ixmgr = IndexManager(self.hostname, es_client=self.es_client)
self.mappings = mappings
def bulk_process(self) -> None:
""" Processes the queue in bulk. This offers better performance but it
is less safe at the moment and should only be used as part of
reindexing.
"""
def actions() -> Iterator[dict[str, Any]]:
try:
# drain the whole queue, streaming_bulk consumes this generator
while True:
task = self.queue.get(block=False, timeout=None)
if task['action'] == 'index':
yield {
'_op_type': 'index',
'_index': self.ensure_index(task),
'_id': task['id'],
'doc': task['properties']
}
else:
raise NotImplementedError
except Empty:
pass
for success, info in streaming_bulk(self.es_client, actions()):
if success:
self.queue.task_done()
def ensure_index(self, task: IndexTask) -> str:
return self.ixmgr.ensure_index(
task['schema'],
task['language'],
self.mappings[task['type_name']],
return_index='internal'
)
def index(self, task: IndexTask) -> None:
index = self.ensure_index(task)
self.es_client.index(
index=index,
id=task['id'], # type:ignore[arg-type]
document=task['properties']
)
def delete(self, task: DeleteTask) -> None:
# get all the types this model could be stored under (considering
# polymorphic identities) - this could be many
mapping = self.mappings[task['type_name']]
if mapping.model:
types = utils.related_types(mapping.model)
else:
types = {mapping.name}
# delete the document from all languages (because we don't know
# which one anymore) - and delete from all related types (polymorphic)
for type in types:
ix = self.ixmgr.get_external_index_name(
schema=task['schema'],
language='*',
type_name=type
)
# for the delete operation we need the internal index names
for internal in self.es_client.indices.get_alias(index=ix).keys():
try:
self.es_client.delete(
index=internal,
id=task['id'] # type:ignore[arg-type]
)
except NotFoundError:
pass
class PostgresIndexer(IndexerBase):
queue: Queue[Task]
def __init__(
self,
mappings: TypeMappingRegistry,
queue: Queue[Task],
engine: Engine,
languages: set[str] | None = None
) -> None:
self.mappings = mappings
self.queue = queue
self.engine = engine
self.languages = languages or {'simple'}
def index(
self,
tasks: list[IndexTask] | IndexTask,
session: Session | None = None,
) -> bool:
""" Update the 'search_index' table (full text search index) of
the given object(s)/task(s).
In case of a bunch of tasks we are assuming they are all from the
same schema and table in order to optimize the indexing process.
When a session is passed we use that session's transaction context
and use a savepoint instead of our own transaction to perform the
action.
:param tasks: A list of tasks to index
:param session: Supply an active session
:return: True if the indexing was successful, False otherwise
"""
params_dict = {}
if not isinstance(tasks, list):
tasks = [tasks]
if not tasks:
# nothing to do
return True
tablename: str | None = None
schema: str | None = None
owner_id_column: PKColumn | None = None
try:
for task in tasks:
if schema is not None:
if schema != task['schema']:
index_log.error(
'Received mixed schemas in search index tasks.'
)
return False
else:
schema = task['schema']
if tablename is not None:
if tablename != task['tablename']:
index_log.error(
'Received mixed tables in search index tasks.'
)
return False
else:
tablename = task['tablename']
_mapping = self.mappings[task['type_name']]
_owner_id = task['id']
_owner_type = task['owner_type'] # class name
_owner_id_column: PKColumn
if isinstance(_owner_id, UUID):
_owner_id_column = SearchIndex.owner_id_uuid
elif isinstance(_owner_id, int):
_owner_id_column = SearchIndex.owner_id_int
elif isinstance(_owner_id, str):
_owner_id_column = SearchIndex.owner_id_str
if owner_id_column is not None:
if owner_id_column is not _owner_id_column:
index_log.error(
'Received mixed id types in search index tasks.'
)
return False
else:
owner_id_column = _owner_id_column
detected_language = language_from_locale(task['language'])
if detected_language not in self.languages:
if len(self.languages) == 1:
language = next(iter(self.languages))
elif 'german' in self.languages:
language = 'german'
elif 'french' in self.languages:
language = 'french'
else:
# HACK: just take one
language = next(iter(self.languages), 'simple')
else:
language = detected_language
data = {
k: unidecode(
' '.join(v) if isinstance(v, list) else str(v)
) if v else ''
for k, v in task['raw_properties'].items()
if k == 'es_tags' or not k.startswith('es_')
}
_tags_list = task['raw_properties'].get('es_tags')
_tags = dict.fromkeys(_tags_list, '') if _tags_list else None
# NOTE: We use a dictionary to avoid duplicate updates for
# the same model, only the latest update will count
params_dict[_owner_id] = {
'_data': data,
'_owner_id': _owner_id,
'_owner_type': _owner_type,
'_owner_tablename': tablename,
'_public': task['raw_properties']['es_public'],
'_access': task.get('access', 'public'),
'_last_change': task['raw_properties']['es_last_change'],
'_tags': _tags,
'_suggestion': task['suggestion'],
'_publication_start':
task.get('publication_start', None),
'_publication_end':
task.get('publication_end', None),
**{
f'_lang__{lang}': lang
for lang in self.languages
},
**{
f'_{k}': v
for k, v in data.items()
}
}
for field in data.keys():
_config = _mapping.raw_mapping.get(field, {})
_weight = _config.get('weight')
if _weight not in ('A', 'B', 'C', 'D'):
index_log.warning(
f'Invalid weight for property "{field}" on type '
f'"{_owner_type}", falling back to weight "C".'
)
_weight = 'C'
for lang in self.languages:
params_dict[_owner_id][
f'_weight__{field}__{lang}'
] = chr(ord(_weight) + 1) if (
'localized' in _config.get('type', '')
and lang != language
# TODO: Do we want to emit a warning if we can't
# deprioritize non-matching languages?
and _weight != 'D'
) else _weight
assert schema is not None
assert owner_id_column is not None
weighted_vector = [
func.setweight(
func.to_tsvector(
bindparam(f'_lang__{language}', type_=String),
bindparam(f'_{field}', type_=String)
),
bindparam(f'_weight__{field}__{language}')
)
for field in tasks[0]['raw_properties'].keys()
if field == 'es_tags' or not field.startswith('es_')
for language in self.languages
]
combined_vector = weighted_vector[0]
for vector in weighted_vector[1:]:
combined_vector = combined_vector.op('||')(vector)
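# For an illustrative field 'title' and languages {'german', 'french'},
# the comprehension above builds an expression roughly equivalent to:
#
#     setweight(to_tsvector(:_lang__german, :_title), :_weight__title__german)
#     || setweight(to_tsvector(:_lang__french, :_title), :_weight__title__french)
#
# i.e. one weighted tsvector per (field, language) pair, concatenated
# with the '||' operator.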
stmt = (
insert(SearchIndex.__table__)
.values(
{
owner_id_column: bindparam('_owner_id'),
SearchIndex.owner_type: bindparam('_owner_type'),
SearchIndex.owner_tablename:
bindparam('_owner_tablename'),
SearchIndex.publication_start:
bindparam('_publication_start'),
SearchIndex.publication_end:
bindparam('_publication_end'),
SearchIndex.public: bindparam('_public'),
SearchIndex.access: bindparam('_access'),
SearchIndex.last_change: bindparam('_last_change'),
SearchIndex._tags: bindparam('_tags', type_=HSTORE),
SearchIndex.suggestion: bindparam('_suggestion'),
SearchIndex.fts_idx_data: bindparam('_data'),
SearchIndex.fts_idx: combined_vector,
}
)
# we may have already indexed this model
# so perform an update instead
.on_conflict_do_update(
index_elements=[
SearchIndex.owner_tablename,
owner_id_column
],
set_={
# the owner_type can change, although uncommon
'owner_type': bindparam('_owner_type'),
'publication_start': bindparam('_publication_start'),
'publication_end': bindparam('_publication_end'),
'public': bindparam('_public'),
'access': bindparam('_access'),
'last_change': bindparam('_last_change'),
'tags': bindparam('_tags', type_=HSTORE),
'suggestion': bindparam('_suggestion'),
'fts_idx_data': bindparam('_data'),
'fts_idx': combined_vector,
},
# since our unique constraints are partial indices
# we need this index_where clause, otherwise postgres
# will not be able to infer the matching constraint
index_where=owner_id_column.isnot(None) # type: ignore[no-untyped-call]
)
)
params = list(params_dict.values())
self.execute_statement(session, schema, stmt, params)
except Exception:
index_log.exception(
f'Error indexing schema {schema}, '
f'table {tablename}, tasks: {[t["id"] for t in tasks]}',
)
return False
return True
def execute_statement(
self,
session: Session | None,
schema: str,
stmt: Executable,
params: list[dict[str, Any]] | None = None
) -> None:
if session is None:
connection = self.engine.connect()
connection = connection.execution_options(
schema_translate_map={None: schema}
)
with connection.begin():
connection.execute(stmt, params or [{}])
else:
# use a savepoint instead
with session.begin_nested():
session.execute(stmt, params or [{}])
def delete(
self,
tasks: list[IndexTask] | IndexTask,
session: Session | None = None
) -> bool:
if not isinstance(tasks, list):
tasks = [tasks]
schema: str | None = None
tablename: str | None = None
owner_ids: set[UUID | int | str] = set()
owner_id_column: PKColumn | None = None
for task in tasks:
if schema is not None:
if schema != task['schema']:
index_log.error(
'Received mixed schemas in search delete tasks.'
)
return False
else:
schema = task['schema']
if tablename is not None:
if tablename != task['tablename']:
index_log.error(
'Received mixed tables in search delete tasks.'
)
return False
else:
tablename = task['tablename']
_owner_id = task['id']
_owner_id_column: PKColumn
owner_ids.add(_owner_id)
if isinstance(_owner_id, UUID):
_owner_id_column = SearchIndex.owner_id_uuid
elif isinstance(_owner_id, int):
_owner_id_column = SearchIndex.owner_id_int
elif isinstance(_owner_id, str):
_owner_id_column = SearchIndex.owner_id_str
if owner_id_column is not None:
if owner_id_column is not _owner_id_column:
index_log.error(
'Received mixed id types in search delete tasks.'
)
return False
else:
owner_id_column = _owner_id_column
if not owner_ids:
# nothing to delete
return True
try:
assert schema is not None
assert tablename is not None
assert owner_id_column is not None
stmt = (
SearchIndex.__table__.delete()
.where(and_(
SearchIndex.owner_tablename == tablename,
owner_id_column.in_(owner_ids)
))
)
self.execute_statement(session, schema, stmt)
except Exception:
index_log.exception(
f'Error deleting from search index, schema {schema}, tasks {tasks}:'
)
return False
return True
# FIXME: bulk_process should probably be the only function we use for
# the Postgres indexer, we don't have to worry about individual
# transactions failing as much
def bulk_process(self, session: Session | None = None) -> None:
""" Processes the queue in bulk.
Gathers all tasks and groups them by action and owner type
"""
def task_generator() -> Iterator[Task]:
while not self.queue.empty():
task = self.queue.get(block=False, timeout=None)
self.queue.task_done()
yield task
grouped_tasks = groupby(
task_generator(),
# NOTE: We could group by tablename for delete actions
# which could yield slightly larger batches in
# some cases, but for indexing we currently
# can't do that, because properties may differ
# between multiple polymorphic variants
key=itemgetter('schema', 'action', 'owner_type')
)
for (_, action, _), tasks in grouped_tasks:
task_list = list(tasks)
if action == 'index':
self.index(task_list, session) # type: ignore[arg-type]
elif action == 'delete':
self.delete(task_list, session) # type: ignore[arg-type]
else:
raise NotImplementedError(
f"Action '{action}' not implemented for {self.__class__}")
def delete_search_index(self, schema: str) -> None:
""" Delete all records in search index table of the given `schema`. """
metadata = MetaData(schema=schema)
search_index_table = Table(SearchIndex.__tablename__, metadata)
stmt = search_index_table.delete()
connection = self.engine.connect()
with connection.begin():
connection.execute(stmt)
class TypeMapping:
__slots__ = ('name', 'mapping', 'raw_mapping', 'version', 'model')
def __init__(
self,
name: str,
mapping: dict[str, Any],
model: type[Searchable] | None = None
) -> None:
self.name = name
self.mapping = self.add_defaults(mapping)
self.raw_mapping = mapping
self.raw_mapping['es_tags'] = {'type': 'text', 'weight': 'A'}
self.version = hash_dictionary(mapping)
self.model = model
def add_defaults(self, mapping: dict[str, Any]) -> dict[str, Any]:
# HACK: remove the weight key for ElasticSearch, eventually
# we can refactor all of this when ElasticSearch goes
# away.
mapping = {
key: {
config_key: value
for config_key, value in config.items()
if config_key != 'weight'
}
for key, config in mapping.items()
}
mapping['es_public'] = {
'type': 'boolean'
}
mapping['es_last_change'] = {
'type': 'date'
}
mapping['es_suggestion'] = {
'analyzer': 'autocomplete',
'type': 'completion',
'contexts': [
{
'name': 'es_suggestion_context',
'type': 'category'
}
]
}
mapping['es_tags'] = {
'analyzer': 'tags',
'type': 'text',
}
return mapping
def for_language(self, language: str) -> dict[str, Any]:
""" Returns the mapping for the given language. Mappings can
be slightly different for each language. That is, the analyzer
changes.
Because the :class:`IndexManager` puts each language into its own
index we do not have to worry about creating different versions
of the same mapping here.
"""
return self.supplement_analyzer(deepcopy(self.mapping), language)
def supplement_analyzer(
self,
dictionary: dict[str, Any],
language: str
) -> dict[str, Any]:
""" Iterate through the dictionary found in the type mapping and
replace the 'localized' type with a 'text' type that includes a
language specific analyzer.
"""
supplement = None
for key, value in dictionary.items():
if hasattr(value, 'items'):
dictionary[key] = self.supplement_analyzer(value, language)
elif key == 'type' and value.startswith('localized'):
supplement = value.replace('localized', language)
break
if supplement:
assert 'analyzer' not in dictionary
dictionary[key] = 'text'
dictionary['analyzer'] = ES_ANALYZER_MAP[supplement]
return dictionary
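# Example of the substitution performed by supplement_analyzer: a property
# declared as {'type': 'localized_html'} becomes, for language 'de',
# {'type': 'text', 'analyzer': 'german_html'} (see ES_ANALYZER_MAP above).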
class TypeMappingRegistry:
mappings: dict[str, TypeMapping]
def __init__(self) -> None:
self.mappings = {}
def __getitem__(self, key: str) -> TypeMapping:
return self.mappings[key]
def __iter__(self) -> Iterator[TypeMapping]:
yield from self.mappings.values()
def register_orm_base(self, base: type[object]) -> None:
""" Takes the given SQLAlchemy base and registers all
:class:`~onegov.search.mixins.Searchable` objects.
"""
for model in utils.searchable_sqlalchemy_models(base):
self.register_type(model.es_type_name, model.es_properties, model)
def register_type(
self,
type_name: str,
mapping: dict[str, Any],
model: type[Searchable] | None = None
) -> None:
""" Registers the given type with the given mapping. The mapping is
as dictionary representing the part below the ``mappings/type_name``.
See:
`<https://www.elastic.co/guide/en/elasticsearch/reference/current/\
indices-create-index.html#mappings>`_
When the mapping changes, a new index is created internally and the
alias to this index (the external name of the index) is pointed to
this new index.
As a consequence, a change in the mapping requires a reindex.
"""
assert type_name not in self.mappings, \
f"Type '{type_name}' already registered"
self.mappings[type_name] = TypeMapping(type_name, mapping, model)
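# Registration sketch (the type name and mapping below are made up, they
# do not correspond to an actual onegov model):
#
#     registry = TypeMappingRegistry()
#     registry.register_type('pages', {
#         'title': {'type': 'localized', 'weight': 'A'},
#         'body': {'type': 'localized_html', 'weight': 'C'},
#     })
#     registry['pages'].version  # hash of the mapping, used in index names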
@property
def registered_fields(self) -> set[str]:
""" Goes through all the registered types and returns the a set with
all fields used by the mappings.
"""
return {key for mapping in self for key in mapping.mapping.keys()}
class IndexManager:
""" Manages the creation/destruction of indices. The indices it creates
have an internal name and an external alias. To facilitate that, versions
are used.
"""
created_indices: set[str]
def __init__(self, hostname: str, es_client: Elasticsearch) -> None:
assert hostname and es_client
self.hostname = hostname
self.es_client = es_client
self.created_indices = set()
@property
def normalized_hostname(self) -> str:
return utils.normalize_index_segment(
self.hostname, allow_wildcards=False)
def query_indices(self) -> set[str]:
""" Queryies the elasticsearch cluster for indices belonging to this
hostname. """
return set(
self.es_client.cat.indices( # type:ignore[union-attr]
index=f'{self.normalized_hostname}-*', h='index'
).splitlines()
)
def query_aliases(self) -> set[str]:
""" Queryies the elasticsearch cluster for aliases belonging to this
hostname. """
result = set()
infos = self.es_client.indices.get_alias(
index='{}-*'.format(self.normalized_hostname)
)
for info in infos.values():
result.update(info['aliases'])
return result
def ensure_index(
self,
schema: str,
language: str,
mapping: TypeMapping,
return_index: Literal['external', 'internal'] = 'external'
) -> str:
""" Takes the given database schema, language and type name and
creates an internal index with a version number and an external
alias without the version number.
:schema:
The database schema this index is based on.
:language:
The language in ISO 639-1 format.
:mapping:
The :class:`TypeMapping` mapping used in this index.
:return_index:
The index name to return. Either 'external' or 'internal'.
:return:
The (external/aliased) name of the created index.
"""
assert schema and language and mapping
assert len(language) == 2
assert return_index == 'external' or return_index == 'internal'
external = self.get_external_index_name(schema, language, mapping.name)
internal = self.get_internal_index_name(
schema, language, mapping.name, mapping.version)
return_value = external if return_index == 'external' else internal
if internal in self.created_indices:
return return_value
if self.es_client.indices.exists(index=internal):
self.created_indices.add(internal)
return return_value
# create the index
self.es_client.indices.create(
index=internal,
mappings={
'properties': mapping.for_language(language)
},
settings={
'analysis': ANALYSIS_CONFIG,
'index': {
'number_of_shards': 1,
'number_of_replicas': 0,
'refresh_interval': '5s'
}
}
)
# point the alias to the new index
self.es_client.indices.put_alias(name=external, index=internal)
# cache the result
self.created_indices.add(internal)
return return_value
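# Illustration (values are made up): for schema 'onegov_town', language
# 'de' and a 'pages' mapping whose version hashes to 'abc123', this
# creates the internal index
#     '<hostname>-onegov_town-de-pages-abc123'
# and points the external alias
#     '<hostname>-onegov_town-de-pages'
# at it, caching the internal name in self.created_indices.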
def remove_expired_indices(
self,
current_mappings: Iterable[TypeMapping]
) -> int:
""" Removes all expired indices. An index is expired if it's version
number is no longer known in the current mappings.
:return: The number of indices that were deleted.
"""
active_versions = {m.version for m in current_mappings}
count = 0
for index in self.query_indices():
info = parse_index_name(index)
if info.version and info.version not in active_versions:
self.es_client.indices.delete(index=index)
self.created_indices.discard(index)
count += 1
return count
def get_managed_indices_wildcard(self, schema: str) -> str:
""" Returns a wildcard index name for all indices managed. """
return '-'.join((
utils.normalize_index_segment(
self.hostname, allow_wildcards=False),
utils.normalize_index_segment(
schema, allow_wildcards=False),
'*'
))
def get_external_index_names(
self,
schema: str,
languages: Iterable[str] = '*',
types: Iterable[str] = '*'
) -> str:
""" Returns a comma separated string of external index names that
match the given arguments. Useful to pass on to elasticsearch when
targeting multiple indices.
"""
return ','.join(
self.get_external_index_name(schema, language, type_name)
for language in languages
for type_name in types
)
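# e.g. get_external_index_names('onegov_town', ('de', 'fr'), ('pages', ))
# returns '<host>-onegov_town-de-pages,<host>-onegov_town-fr-pages'
# (hostname shown as '<host>' for illustration), which can be passed
# directly as the index argument of a cross-index elasticsearch search.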
def get_external_index_name(
self,
schema: str,
language: str,
type_name: str
) -> str:
""" Generates the external index name from the given parameters. """
return '-'.join(
utils.normalize_index_segment(segment, allow_wildcards=True)
for segment in (self.hostname, schema, language, type_name)
)
def get_internal_index_name(
self,
schema: str,
language: str,
type_name: str,
version: str
) -> str:
""" Generates the internal index name from the given parameters. """
return '-'.join((
self.get_external_index_name(schema, language, type_name),
utils.normalize_index_segment(version, allow_wildcards=False)
))
class ORMLanguageDetector(utils.LanguageDetector):
html_strip_expression = re.compile(r'<[^<]+?>')
def localized_properties(self, obj: Searchable) -> Iterator[str]:
for key, definition in obj.es_properties.items():
if definition.get('type', '').startswith('localized'):
yield key
def localized_texts(
self,
obj: Searchable,
max_chars: int | None = None
) -> Iterator[str]:
chars = 0
for p in self.localized_properties(obj):
text = getattr(obj, p, '')
if not isinstance(text, str):
continue
yield text.strip()
chars += len(text)
if max_chars is not None and max_chars <= chars:
break
def detect_object_language(self, obj: Searchable) -> str:
properties = tuple(self.localized_properties(obj))
if not properties:
# here, the mapping will be the same for all languages
return self.supported_languages[0]
text = ' '.join(self.localized_texts(obj, max_chars=1024))
text = self.html_strip_expression.sub('', text).strip()
if not text:
return self.supported_languages[0]
return self.detect(text)
class ORMEventTranslator:
""" Handles the onegov.core orm events, translates them into indexing
actions and puts the result into a queue for the indexer to consume.
The queue may be limited. Once the limit is reached, new events are no
longer processed and an error is logged.
"""
converters: dict[str, Callable[[Any], Any]] = {
'date': lambda dt: dt and dt.isoformat(),
}
es_queue: Queue[Task]
psql_queue: Queue[Task]
def __init__(
self,
mappings: TypeMappingRegistry,
max_queue_size: int = 0,
languages: Sequence[str] = ('de', 'fr', 'en')
) -> None:
self.mappings = mappings
self.es_queue = Queue(maxsize=max_queue_size)
self.psql_queue = Queue(maxsize=max_queue_size)
self.detector = ORMLanguageDetector(languages)
# when True, ORM events are ignored by the on_* handlers below
self.stopped = False
def on_insert(self, schema: str, obj: object) -> None:
if not self.stopped:
if isinstance(obj, Searchable):
self.index(schema, obj)
def on_update(self, schema: str, obj: object) -> None:
if not self.stopped:
if isinstance(obj, Searchable):
# FIXME: We should be able to get rid of this delete
# once we get rid of ES, since we implemented
# upserting for our Postgres indexer.
self.delete(schema, obj)
self.index(schema, obj)
def on_delete(self, schema: str, obj: object) -> None:
if not self.stopped:
if isinstance(obj, Searchable):
self.delete(schema, obj)
def put(self, translation: Task) -> None:
try:
self.es_queue.put_nowait(translation)
except Full:
log.error('The es orm event translator queue is full!')
try:
self.psql_queue.put_nowait(translation)
except Full:
log.error('The psql orm event translator queue is full!')
def index(
self,
schema: str,
obj: Searchable,
) -> None:
"""
Creates or updates index for the given object
"""
try:
if obj.es_skip:
return
if obj.es_language == 'auto':
language = self.detector.detect_object_language(obj)
else:
language = obj.es_language
translation: IndexTask = {
'action': 'index',
'id': getattr(obj, obj.es_id),
'id_key': obj.es_id,
'schema': schema,
'type_name': obj.es_type_name,
'tablename': obj.__tablename__,
'owner_type': obj.__class__.__name__,
'language': language,
'access': getattr(obj, 'access', 'public'),
'suggestion': [],
'publication_start': getattr(obj, 'publication_start', None),
'publication_end': getattr(obj, 'publication_end', None),
'raw_properties': {},
'properties': {},
}
mapping_ = self.mappings[obj.es_type_name].for_language(language)
for prop, mapping in mapping_.items():
if prop == 'es_suggestion':
continue
convert = self.converters.get(mapping['type'], lambda v: v)
raw = getattr(obj, prop)
if is_non_string_iterable(raw):
translation['properties'][prop] = [convert(v) for v in raw]
# TODO: Do we actually need to coerce to list?
# that seems potentially dangerous for dictionaries
translation['raw_properties'][prop] = list(raw)
else:
translation['properties'][prop] = convert(raw)
translation['raw_properties'][prop] = raw
if obj.es_public:
contexts = {'es_suggestion_context': ['public']}
else:
contexts = {'es_suggestion_context': ['private']}
suggestion = obj.es_suggestion
if suggestion:
if is_non_string_iterable(suggestion):
suggestion = list(suggestion)
else:
suggestion = [str(suggestion)]
translation['suggestion'] = suggestion
translation['properties']['es_suggestion'] = {
'input': suggestion,
'contexts': contexts
}
self.put(translation)
except ObjectDeletedError:
obj_id = getattr(obj, 'id', obj)
log.info(
f'Object {obj_id} was deleted before indexing:',
exc_info=True
)
def delete(self, schema: str, obj: Searchable) -> None:
"""
Deletes index of the given object
"""
translation: DeleteTask = {
'action': 'delete',
'schema': schema,
'type_name': obj.es_type_name,
'tablename': obj.__tablename__,
'owner_type': obj.__class__.__name__,
'id': getattr(obj, obj.es_id)
}
self.put(translation)