import sqlalchemy_utils.observer
from dectate.tool import resolve_dotted_name
from functools import wraps
from sqlalchemy.event import contains, listen, remove
from typing import Any, ClassVar, TypeVar, TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Callable
from onegov.core.framework import Framework
from sqlalchemy.orm import Mapper
[docs]
_F = TypeVar('_F', bound=Callable[..., Any])
[docs]
class ScopedPropertyObserver(sqlalchemy_utils.observer.PropertyObserver):
"""
This subclass of `PropertyObserver` doesn't register itself globally.
Having all observers listen for each application is both wasteful and
can lead to bugs if some tables only exist for some applications.
We should never use the base class, since that will always cause all
observers to trigger regardless of whether we used the scoped observer
or not.
"""
[docs]
_global_observer: ClassVar['ScopedPropertyObserver']
[docs]
_scoped_observers: ClassVar[dict[str, 'ScopedPropertyObserver']]
_scoped_observers = {}
def __new__(cls, dotted_name: str | None) -> 'ScopedPropertyObserver':
# special case global scope
if dotted_name is None:
if not hasattr(cls, '_global_observer'):
cls._global_observer = super().__new__(cls)
return cls._global_observer
if dotted_name not in cls._scoped_observers:
cls._scoped_observers[dotted_name] = super().__new__(cls)
return cls._scoped_observers[dotted_name]
def __init__(self, dotted_name: str | None) -> None:
"""
In order to get ourselves out of circular dependency hell we
accept a dotted_name in place of the application class that
defines the scope of the observer.
The dotted_name will be resolved once enter_scope is called.
"""
super().__init__()
[docs]
self.dotted_name = dotted_name
if dotted_name is None:
# global scope is always active
self.activate()
@property
[docs]
def scope(self) -> type[object]:
assert self.dotted_name is not None
application_cls = resolve_dotted_name(self.dotted_name)
assert isinstance(application_cls, type)
# reify this so we only look the scope up once
self.__dict__['scope'] = application_cls
return application_cls
[docs]
def register_listeners(self) -> None:
for cls, event, func in self.listener_args:
# don't register before_flush
if event == 'before_flush':
continue
if not contains(cls, event, func):
listen(cls, event, func)
[docs]
def activate(self) -> None:
for cls, event, func in self.listener_args:
# only register before_flush
if event != 'before_flush':
continue
if not contains(cls, event, func):
listen(cls, event, func)
[docs]
def deactivate(self) -> None:
for cls, event, func in self.listener_args:
# only deregister before_flush
if event != 'before_flush':
continue
if contains(cls, event, func):
remove(cls, event, func)
[docs]
def update_generator_registry(
self,
mapper: 'Mapper',
class_: type[Any]
) -> None:
for generator in class_.__dict__.values():
if getattr(
generator,
'__observes_scope__',
MISSING
) == self.dotted_name:
self.generator_registry[class_].append(
generator
)
@classmethod
[docs]
def enter_scope(cls, application: 'Framework') -> None:
for observer in cls._scoped_observers.values():
if isinstance(application, observer.scope):
observer.activate()
else:
observer.deactivate()
@classmethod
[docs]
def enter_class_scope(cls, application_cls: type['Framework']) -> None:
for observer in cls._scoped_observers.values():
if issubclass(application_cls, observer.scope):
observer.activate()
else:
observer.deactivate()
[docs]
def __repr__(self) -> str:
return '<ScopedPropertyObserver>'
[docs]
def observes(
*paths: str,
scope: str | None = None
) -> 'Callable[[_F], _F]':
observer = ScopedPropertyObserver(scope)
observer.register_listeners()
def decorator(func: '_F') -> '_F':
@wraps(func)
def wrapper(self: Any, *args: Any, **kwargs: Any) -> Any:
return func(self, *args, **kwargs)
wrapper.__observes__ = paths # type:ignore[attr-defined]
wrapper.__observes_scope__ = scope # type:ignore[attr-defined]
return wrapper # type:ignore[return-value]
return decorator
if hasattr(sqlalchemy_utils.observer, 'observer'):
# make sure this observer doesn't mess with us
# remove_listeners doesn't check if the listeners are there
# so we call register_listeners to make sure we can remove them
sqlalchemy_utils.observer.observer.register_listeners()
sqlalchemy_utils.observer.observer.remove_listeners()
delattr(sqlalchemy_utils.observer, 'observer')