Source code for core.orm

import psycopg2

from markupsafe import escape, Markup
from onegov.core.orm.cache import orm_cached, request_cached
from onegov.core.orm.observer import observes
from onegov.core.orm.session_manager import SessionManager, query_schemas
from onegov.core.orm.sql import as_selectable, as_selectable_from_path
from sqlalchemy import event, inspect
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import object_session
from sqlalchemy.orm import Query
from sqlalchemy_utils import TranslationHybrid
from zope.sqlalchemy import mark_changed
from sqlalchemy.exc import InterfaceError, OperationalError


from typing import overload, Any, ClassVar, TypeVar, TYPE_CHECKING
if TYPE_CHECKING:
    from collections.abc import Callable, Iterator
    from sqlalchemy import Column
    from sqlalchemy_utils.i18n import _TranslatableColumn
    from typing import Self

_T = TypeVar('_T')


MISSING = object()

DB_CONNECTION_ERRORS = (
    OperationalError,
    InterfaceError,
    psycopg2.OperationalError,
    psycopg2.InterfaceError,
)


#: The base for all OneGov Core ORM Models
class ModelBase:

    #: set by :class:`onegov.core.orm.cache.OrmCacheDescriptor`, this attribute
    #: indicates if the current model was loaded from cache
    is_cached = False

    # FIXME: These are temporary and help mypy know that these attributes
    #        exist on the Base ORM class
    __tablename__: ClassVar[str]

    @overload
    @classmethod
    def get_polymorphic_class(cls, identity_value: str) -> type['Self']: ...

    @overload
    @classmethod
    def get_polymorphic_class(cls, identity_value: str, default: _T
                              ) -> type['Self'] | _T: ...

    @classmethod
    def get_polymorphic_class(
        cls,
        identity_value: str,
        default: _T = MISSING  # type:ignore[assignment]
    ) -> type['Self'] | _T:
        """ Returns the polymorphic class if it exists, given the value
        of the polymorphic identity.

        Asserts that the identity is actually found, unless a default is
        provided.

        """
        mapper = inspect(cls).polymorphic_map.get(identity_value)

        if default is MISSING:
            assert mapper, 'No such polymorphic_identity: {}'.format(
                identity_value
            )

        return mapper and mapper.class_ or default

    @property
    def session_manager(self) -> SessionManager | None:
        # FIXME: Should we assert that there is an active SessionManager
        #        when we access this property? This would allow us not
        #        having to check that there is one everywhere, but there
        #        may some existing code that relies on this possibly
        #        returning `None`, so let's leave it for now
        return SessionManager.get_active()


[docs] Base = declarative_base(cls=ModelBase)
#: A translation hybrid integrated with OneGov Core. See also: #: http://sqlalchemy-utils.readthedocs.org/en/latest/internationalization.html
[docs] translation_hybrid = TranslationHybrid( current_locale=lambda: SessionManager.get_active().current_locale, # type:ignore default_locale=lambda: SessionManager.get_active().default_locale, # type:ignore )
class TranslationMarkupHybrid(TranslationHybrid): """ A TranslationHybrid that stores `markupsafe.Markup`. """ def getter_factory( self, attr: '_TranslatableColumn' ) -> 'Callable[[object], Markup | None]': original_getter = super().getter_factory(attr) def getter(obj: object) -> Markup | None: value = original_getter(obj) if value is self.default_value and isinstance(value, str): # NOTE: The default may be a plain string so we need # to escape it return escape(self.default_value) # NOTE: Need to wrap in Markup, we may consider sanitizing # this in the future, to guard against stored values # that somehow bypassed the sanitization, but this will # be expensive return Markup(value) # noqa: RUF035 return getter def setter_factory( self, attr: '_TranslatableColumn' ) -> 'Callable[[object, str | None], None]': original_setter = super().setter_factory(attr) def setter(obj: object, value: str | None) -> None: if value is not None: value = escape(value) original_setter(obj, value) return setter if TYPE_CHECKING: # FIXME: In SQLAlchemy 2.0 this should return a hybrid_property def __call__( # type: ignore[override] self, attr: _TranslatableColumn ) -> Column[Markup | None]: pass #: A translation markup hybrid integrated with OneGov Core. translation_markup_hybrid = TranslationMarkupHybrid( current_locale=lambda: SessionManager.get_active().current_locale, # type:ignore default_locale=lambda: SessionManager.get_active().default_locale, # type:ignore )
[docs] def find_models( base: type[_T], is_match: 'Callable[[type[_T]], bool]' ) -> 'Iterator[type[_T]]': """ Finds the ORM models in the given ORM base class that match a filter. The filter is called with each class in the instance and it is supposed to return True if it matches. For example, find all SQLAlchemy models that use :class:`~onegov.core.orm.mixins.ContentMixin`:: from onegov.core.orm.mixins import ContentMixin find_models(base, is_match=lambda cls: issubclass(cls, ContentMixin)) """ for cls in base.__subclasses__(): if is_match(cls): yield cls yield from find_models(cls, is_match)
def configure_listener( cls: type[Base], key: str, instance: Base ) -> None: """ The zope.sqlalchemy transaction mechanism doesn't recognize changes to cached objects. The following code intercepts all object changes and marks the transaction as changed if there was a change to a cached object. """ def mark_as_changed(obj: Base, *args: Any, **kwargs: Any) -> None: if obj.is_cached: mark_changed(object_session(obj)) event.listen(instance, 'append', mark_as_changed) event.listen(instance, 'remove', mark_as_changed) event.listen(instance, 'set', mark_as_changed) event.listen(instance, 'init_collection', mark_as_changed) event.listen(instance, 'dispose_collection', mark_as_changed) event.listen(ModelBase, 'attribute_instrument', configure_listener) def share_session_manager(query: 'Query[Any]') -> None: session_manager = SessionManager.get_active() for desc in query.column_descriptions: desc['type'].session_manager = session_manager event.listen(Query, 'before_compile', share_session_manager, retval=False) __all__ = [ 'Base', 'SessionManager', 'as_selectable', 'as_selectable_from_path', 'translation_hybrid', 'find_models', 'observes', 'orm_cached', 'query_schemas', 'request_cached' ]