Source code for core.datamanager

from __future__ import annotations

import os
import tempfile
import transaction

from onegov.core.utils import safe_move


from typing import TYPE_CHECKING
if TYPE_CHECKING:
    from transaction.interfaces import ITransaction


[docs] class FileDataManager: """ Writes a file when the transaction is commited. """
[docs] transaction_manager = transaction.manager
def __init__(self, data: bytes, path: str, tmp_path: str | None = None):
[docs] self.data = data
[docs] self.path = path
[docs] self.tmp_path = tmp_path
@classmethod
[docs] def write_file( cls, data: bytes, path: str, tmp_path: str | None = None ) -> None: transaction.get().join(cls(data, path, tmp_path))
[docs] def sortKey(self) -> str: return 'files'
[docs] def commit(self, transaction: ITransaction) -> None: with tempfile.NamedTemporaryFile(delete=False) as temp: self.tempfn = temp.name temp.write(self.data)
[docs] def abort(self, transaction: ITransaction) -> None: pass
[docs] def tpc_vote(self, transaction: ITransaction) -> None: if not os.path.exists(self.tempfn): raise ValueError(f'{self.tempfn} doesnt exist') if os.path.exists(self.path): raise ValueError('file already exists')
[docs] def tpc_abort(self, transaction: ITransaction) -> None: # if another DataManager before us in the chain raises a retryable # error before we get to commit, we still have to execute tpc_abort # despite tempfn not existing yet. if not hasattr(self, 'tempfn'): return try: os.remove(self.tempfn) except OSError: pass
[docs] def tpc_begin(self, transaction: ITransaction) -> None: pass
[docs] def tpc_finish(self, transaction: ITransaction) -> None: safe_move(self.tempfn, self.path, tmp_dst=self.tmp_path)