Source code for search.datamanager

from __future__ import annotations

import transaction

from onegov.search import log
from weakref import WeakKeyDictionary


from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from onegov.search.indexer import Indexer, Task
    from sqlalchemy.orm import Session
    from transaction.interfaces import ITransaction


[docs] class TaskQueue(list['Task']): def __init__(self, max_size: int = 0) -> None: super().__init__()
[docs] self.max_size = max_size
[docs] def append(self, value: Task) -> None: if self.max_size and len(self) >= self.max_size: log.error('The orm event translator queue is full!') return super().append(value)
# NOTE: Disallow extend/insert/__setitem__ __setitem__ = extend = insert = None # type: ignore[assignment]
_DATAMANAGERS: WeakKeyDictionary[Session, IndexerDataManager]
[docs] _DATAMANAGERS = WeakKeyDictionary()
[docs] class IndexerDataManager: """ Flushes pending indexing tasks to the database. """
[docs] transaction_manager = transaction.manager
[docs] queue: TaskQueue | None
[docs] session: Session | None
[docs] indexer: Indexer | None
def __init__( self, session: Session, indexer: Indexer, max_queue_size: int = 0 ) -> None: self.session = session self.indexer = indexer self.queue = TaskQueue(max_queue_size) transaction.get().join(self) _DATAMANAGERS[session] = self @classmethod
[docs] def get_queue( cls, session: Session | None, indexer: Indexer, max_queue_size: int = 0 ) -> TaskQueue | None: if session is None: return None instance = _DATAMANAGERS.get(session) if instance is None: instance = cls(session, indexer, max_queue_size) return instance.queue
[docs] def _finish(self) -> None: assert self.session is not None assert self.queue is not None del _DATAMANAGERS[self.session] self.queue.clear() self.session = self.indexer = self.queue = None
[docs] def sortKey(self) -> str: # we want to sort close to the end but before zope.sqlalchemy return '~indexer'
[docs] def commit(self, transaction: ITransaction) -> None: pass
[docs] def abort(self, transaction: ITransaction) -> None: if self.session is not None: self._finish()
[docs] def tpc_begin(self, transaction: ITransaction) -> None: pass
[docs] def tpc_vote(self, transaction: ITransaction) -> None: # NOTE: This is the best stage to submit our tasks to the indexer # since zope.sqlalchemy will flush in tpc_begin and start # commiting things in tpc_vote, but our tpc_vote will run # before theirs. if self.session is None: return assert self.indexer is not None assert self.queue is not None self.indexer.process(self.queue, self.session)
[docs] def tpc_abort(self, transaction: ITransaction) -> None: if self.session is not None: self._finish()
[docs] def tpc_finish(self, transaction: ITransaction) -> None: if self.session is not None: self._finish()
[docs] def savepoint(self) -> IndexerSavepoint: assert self.queue is not None assert self.session is not None # this ensures everything up to the savepoint has been recorded self.session.flush() return IndexerSavepoint(self.queue)
[docs] class IndexerSavepoint: def __init__(self, queue: TaskQueue) -> None:
[docs] self.queue = queue
[docs] self.saved_size = len(queue)
[docs] def rollback(self) -> None: del self.queue[self.saved_size:]