search.indexer
Attributes
Classes
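- IndexTask: dict() -> new empty dictionary
- Indexer: Takes actions from a queue and executes them on the elasticsearch cluster.
- IndexManager: Manages the creation/destruction of indices. The indices it creates have an internal name and an external alias.
- ORMLanguageDetector: Detects languages with the help of langdetect.
- ORMEventTranslator: Handles the onegov.core orm events, translates them into indexing actions and puts the result into a queue for the indexer to consume.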
Functions
- parse_index_name: Takes the given index name and returns the hostname, schema, language and type_name in a dictionary.
Module Contents
- class search.indexer.IndexTask[source]
Bases:
TypedDict
- dict() -> new empty dictionary
- dict(mapping) -> new dictionary initialized from a mapping object’s (key, value) pairs
- dict(iterable) -> new dictionary initialized as if via:
    d = {}
    for k, v in iterable:
        d[k] = v
- dict(**kwargs) -> new dictionary initialized with the name=value pairs in the keyword argument list. For example: dict(one=1, two=2)
- search.indexer.parse_index_name(index_name: str) IndexParts [source]
Takes the given index name and returns the hostname, schema, language and type_name in a dictionary.
If the index_name doesn’t match the pattern, all values are None.
If the index_name has no version, the version is None.
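The matching rules above can be sketched with a regular expression. This is a minimal sketch, not the module's implementation: the hyphen-separated layout `hostname-schema-language-type_name[-version]`, the `version` key and the name `parse_index_name_sketch` are assumptions made for illustration.

```python
import re
from typing import Optional, TypedDict


class IndexParts(TypedDict):
    # Hypothetical shape of the result; the real IndexParts may differ.
    hostname: Optional[str]
    schema: Optional[str]
    language: Optional[str]
    type_name: Optional[str]
    version: Optional[str]


# Assumed layout: hostname-schema-language-type_name[-version]
INDEX_NAME = re.compile(
    r'^(?P<hostname>[^-]+)-(?P<schema>[^-]+)-(?P<language>[^-]+)'
    r'-(?P<type_name>[^-]+)(?:-(?P<version>[^-]+))?$'
)


def parse_index_name_sketch(index_name: str) -> IndexParts:
    match = INDEX_NAME.match(index_name)
    if match is None:
        # No match: every value is None.
        return IndexParts(
            hostname=None, schema=None, language=None,
            type_name=None, version=None,
        )
    # An unmatched optional group (the version) is reported as None.
    return IndexParts(**match.groupdict())
```

With this pattern, a name without the trailing version part still matches and only `version` is None, while a name that does not follow the layout yields None for every key.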
- class search.indexer.IndexerBase[source]
- process(block: bool = False, timeout: float | None = None) int [source]
Processes the queue until it is empty or until there’s an error.
If there’s an error, the next call to this function will try to execute the failed task again. This is mainly meant for elasticsearch outages.
- Block:
If True, the process waits for the queue to be available. Useful if you run this in a separate thread.
- Timeout:
How long the blocking call should block. Has no effect if block is False.
- Returns:
The number of successfully processed items.
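The retry behaviour described for process() could look roughly like the following. It is a sketch under stated assumptions: `RetryingProcessor`, its `execute` callback and the `failed_task` attribute are hypothetical names, and the real IndexerBase may keep track of failures differently.

```python
import queue
from typing import Callable, Optional


class RetryingProcessor:
    """Hypothetical stand-in for illustration, not the actual IndexerBase."""

    def __init__(self, tasks: queue.Queue, execute: Callable[[dict], bool]):
        self.queue = tasks
        self.execute = execute  # returns False if the task failed
        self.failed_task: Optional[dict] = None

    def process(self, block: bool = False,
                timeout: Optional[float] = None) -> int:
        processed = 0
        while True:
            # A task that failed during the previous call is retried first.
            task = self.failed_task
            if task is None:
                try:
                    task = self.queue.get(block=block, timeout=timeout)
                except queue.Empty:
                    return processed
            if not self.execute(task):
                # Remember the task and stop; the next call tries it again.
                self.failed_task = task
                return processed
            self.failed_task = None
            processed += 1
```

Keeping the failed task outside the queue preserves the original order, so a task that failed during an outage is the first one executed once the backend is reachable again.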
- class search.indexer.Indexer(mappings: TypeMappingRegistry, queue: queue.Queue[Task], es_client: elasticsearch.Elasticsearch, hostname: str | None = None)[source]
Bases:
IndexerBase
Takes actions from a queue and executes them on the elasticsearch cluster. Depends on IndexManager for index management and expects to have the same TypeMappingRegistry as ORMEventTranslator.
The idea is that this class does the indexing/deindexing, the index manager sets up the indices and the orm event translator listens for changes in the ORM.
A queue is used so the indexer can be run in a separate thread.
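The queue-plus-thread arrangement can be illustrated with the standard library alone. This sketch does not use the Indexer class: the `worker` function, the task dictionaries and the `None` stop sentinel are assumptions chosen for the example.

```python
import queue
import threading

tasks: queue.Queue = queue.Queue()
indexed: list = []


def worker() -> None:
    # Consume tasks until the (assumed) None sentinel arrives.
    while True:
        task = tasks.get()
        if task is None:
            break
        indexed.append(task)  # a real indexer would talk to the cluster here


thread = threading.Thread(target=worker, daemon=True)
thread.start()

# The producer side (for example the ORM event handler) only puts tasks.
tasks.put({'action': 'index', 'id': 1})
tasks.put({'action': 'delete', 'id': 2})
tasks.put(None)
thread.join(timeout=5)
```

The producer never waits for the cluster; it only enqueues, which is what allows the indexing work to happen away from the request-handling thread.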
- class search.indexer.PostgresIndexer(queue: queue.Queue[IndexTask], engine: sqlalchemy.engine.Engine)[source]
Bases:
IndexerBase
- index(tasks: list[IndexTask] | IndexTask, session: sqlalchemy.orm.Session | None = None) bool [source]
Update the ‘fts_idx’ column (full text search index) of the given object(s)/task(s).
In case of a bunch of tasks we are assuming they are all from the same schema and table in order to optimize the indexing process.
When a session is passed we use that session’s transaction context and use a savepoint instead of our own transaction to perform the action.
- Parameters:
tasks – A list of tasks to index
session – Supply an active session
- Returns:
True if the indexing was successful, False otherwise
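The difference between running in an own transaction and nesting inside a caller's transaction through a savepoint can be sketched as follows. To stay self-contained, the sketch swaps in the standard library's sqlite3 for PostgreSQL and SQLAlchemy; the `docs` table, the `nested` flag and the lower-cased text standing in for the full text search value are all assumptions.

```python
import sqlite3
from typing import Union


def index_sketch(conn: sqlite3.Connection,
                 tasks: Union[list, dict],
                 nested: bool) -> bool:
    # Accept a single task as well as a list of tasks.
    if isinstance(tasks, dict):
        tasks = [tasks]

    # Inside a caller's transaction only a savepoint is opened,
    # otherwise the function manages its own transaction.
    if nested:
        begin, commit, rollback = (
            'SAVEPOINT fts', 'RELEASE fts', 'ROLLBACK TO fts')
    else:
        begin, commit, rollback = 'BEGIN', 'COMMIT', 'ROLLBACK'

    conn.execute(begin)
    try:
        for task in tasks:
            conn.execute(
                'UPDATE docs SET fts_idx = ? WHERE id = ?',
                (task['text'].lower(), task['id']),
            )
        conn.execute(commit)
        return True
    except (sqlite3.Error, KeyError):
        conn.execute(rollback)
        if nested:
            # ROLLBACK TO keeps the savepoint on the stack; drop it.
            conn.execute('RELEASE fts')
        return False


# isolation_level=None: transactions are controlled explicitly.
conn = sqlite3.connect(':memory:', isolation_level=None)
conn.execute('CREATE TABLE docs (id INTEGER PRIMARY KEY, fts_idx TEXT)')
conn.executemany('INSERT INTO docs (id) VALUES (?)', [(1,), (2,)])
```

With the savepoint variant, a failed batch is undone without discarding changes the caller already made in the surrounding transaction.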
- class search.indexer.TypeMapping(name: str, mapping: dict[str, Any], model: type[onegov.search.Searchable] | None = None)[source]
- for_language(language: str) dict[str, Any] [source]
Returns the mapping for the given language. Mappings can be slightly different for each language. That is, the analyzer changes.
Because the IndexManager puts each language into its own index, we do not have to worry about creating different versions of the same mapping here.
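A per-language mapping in which only the analyzer changes might be derived like this. The sketch is illustrative: the `localized` placeholder type and the `ANALYZERS` table are assumptions, while `german`, `french` and `english` are names of built-in Elasticsearch language analyzers.

```python
from copy import deepcopy
from typing import Any

# Assumed language-to-analyzer table.
ANALYZERS = {'de': 'german', 'fr': 'french', 'en': 'english'}


def for_language_sketch(mapping: dict[str, Any],
                        language: str) -> dict[str, Any]:
    # Work on a copy so the language-neutral mapping stays untouched.
    result = deepcopy(mapping)
    for field in result.values():
        # 'localized' is a hypothetical marker for language-specific text.
        if field.get('type') == 'localized':
            field['type'] = 'text'
            field['analyzer'] = ANALYZERS[language]
    return result
```

Fields that are not language-specific, such as keyword fields, come out identical for every language.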
- class search.indexer.TypeMappingRegistry[source]
- mappings: dict[str, TypeMapping][source]
- __getitem__(key: str) TypeMapping [source]
- __iter__() collections.abc.Iterator[TypeMapping] [source]
- register_orm_base(base: type[object]) None [source]
Takes the given SQLAlchemy base and registers all Searchable objects.
- register_type(type_name: str, mapping: dict[str, Any], model: type[onegov.search.Searchable] | None = None) None [source]
Registers the given type with the given mapping. The mapping is a dictionary representing the part below the mappings/type_name.
See:
https://www.elastic.co/guide/en/elasticsearch/reference/current/indices-create-index.html#mappings
When the mapping changes, a new index is created internally and the alias to this index (the external name of the index) is pointed to this new index.
As a consequence, a change in the mapping requires a reindex.
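One way to detect that a mapping has changed is to derive a version from its content. The following is a sketch of that idea only; whether the module computes versions this way is an assumption, and `mapping_version` is a hypothetical helper.

```python
import hashlib
import json
from typing import Any


def mapping_version(mapping: dict[str, Any]) -> str:
    # Sorting the keys makes the version independent of dict ordering.
    serialized = json.dumps(mapping, sort_keys=True)
    return hashlib.sha1(serialized.encode('utf-8')).hexdigest()
```

Under this scheme, any change to the mapping yields a different version and therefore a different internal index name, which is why existing documents have to be reindexed into the new index.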
- class search.indexer.IndexManager(hostname: str, es_client: elasticsearch.Elasticsearch)[source]
Manages the creation/destruction of indices. The indices it creates have an internal name and an external alias. To facilitate that, versions are used.
- query_indices() set[str] [source]
Queries the elasticsearch cluster for indices belonging to this hostname.
- query_aliases() set[str] [source]
Queries the elasticsearch cluster for aliases belonging to this hostname.
- ensure_index(schema: str, language: str, mapping: TypeMapping, return_index: Literal['external', 'internal'] = 'external') str [source]
Takes the given database schema, language and type name and creates an internal index with a version number and an external alias without the version number.
- Schema:
The database schema this index is based on.
- Language:
The language in ISO 639-1 format.
- Mapping:
The TypeMapping mapping used in this index.
- Return_index:
The index name to return. Either ‘external’ or ‘internal’.
- Returns:
The (external/aliased) name of the created index.
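The relationship between the versioned internal index and the unversioned external alias can be sketched against an in-memory stand-in for the cluster. `FakeCluster`, the hyphen-separated name layout and the explicit `type_name` and `version` arguments are assumptions; the real method receives a TypeMapping and talks to Elasticsearch.

```python
from typing import Literal


class FakeCluster:
    """Hypothetical in-memory stand-in for an Elasticsearch cluster."""

    def __init__(self) -> None:
        self.indices: set[str] = set()
        self.aliases: dict[str, str] = {}  # external alias -> internal index


def ensure_index_sketch(
    cluster: FakeCluster,
    hostname: str,
    schema: str,
    language: str,
    type_name: str,
    version: str,
    return_index: Literal['external', 'internal'] = 'external',
) -> str:
    # Assumed layout: the alias lacks the version, the index carries it.
    external = '-'.join((hostname, schema, language, type_name))
    internal = f'{external}-{version}'

    if internal not in cluster.indices:
        cluster.indices.add(internal)

    # Point the alias at the index matching the current mapping version.
    cluster.aliases[external] = internal
    return external if return_index == 'external' else internal
```

Callers that only know the external name keep working when the version changes, because the alias is repointed to the new internal index.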
- remove_expired_indices(current_mappings: collections.abc.Iterable[TypeMapping]) int [source]
Removes all expired indices. An index is expired if its version number is no longer known in the current mappings.
- Returns:
The number of indices that were deleted.
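The expiry rule can be expressed as a simple set filter. This sketch assumes that the version is the last hyphen-separated part of the internal index name; `expired_indices` is a hypothetical helper, not part of the module.

```python
def expired_indices(indices: set[str],
                    current_versions: set[str]) -> set[str]:
    # An index is expired if its version is unknown to the current mappings.
    return {
        name for name in indices
        if name.rsplit('-', 1)[-1] not in current_versions
    }
```

The number of deleted indices reported by the method would then correspond to the size of this set.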
- get_managed_indices_wildcard(schema: str) str [source]
Returns a wildcard index name for all indices managed.
- get_external_index_names(schema: str, languages: collections.abc.Iterable[str] = '*', types: collections.abc.Iterable[str] = '*') str [source]
Returns a comma-separated string of external index names that match the given arguments. Useful to pass on to elasticsearch when targeting multiple indices.
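Building such a string amounts to combining every language with every type. The sketch below assumes the hyphen-separated layout `hostname-schema-language-type_name` and takes the hostname as an explicit argument; `external_index_names` is a hypothetical free function.

```python
from collections.abc import Iterable
from itertools import product


def external_index_names(
    hostname: str,
    schema: str,
    languages: Iterable[str] = '*',
    types: Iterable[str] = '*',
) -> str:
    # Iterating over the default '*' yields the single wildcard character.
    return ','.join(
        f'{hostname}-{schema}-{language}-{type_name}'
        for language, type_name in product(languages, types)
    )
```

With the defaults, the result is a single wildcard pattern covering all languages and types of the schema.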
- class search.indexer.ORMLanguageDetector(supported_languages: collections.abc.Sequence[str])[source]
Bases:
onegov.search.utils.LanguageDetector
Detects languages with the help of langdetect.
Unlike langdetect this detector may be limited to a subset of all supported languages, which may improve accuracy if the subset is known and saves some memory.
- class search.indexer.ORMEventTranslator(mappings: TypeMappingRegistry, max_queue_size: int = 0, languages: collections.abc.Sequence[str] = ('de', 'fr', 'en'))[source]
Handles the onegov.core orm events, translates them into indexing actions and puts the result into a queue for the indexer to consume.
The queue may be limited. Once the limit is reached, new events are no longer processed and an error is logged.
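The behaviour of a limited queue can be sketched with the standard library. `put_event` and the logger name are hypothetical; the sketch only shows the general pattern of dropping an event and logging an error when the queue is full.

```python
import logging
import queue

log = logging.getLogger('search.sketch')


def put_event(events: queue.Queue, event: dict) -> bool:
    try:
        # Never block the caller: fail immediately if the queue is full.
        events.put_nowait(event)
    except queue.Full:
        log.error('The event queue is full, dropping %s', event)
        return False
    return True
```

If max_queue_size is passed on to queue.Queue as maxsize (an assumption), the default of 0 would mean an unbounded queue, since queue.Queue treats a maxsize of zero or less as infinite.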