Source code for core.orm.session_manager

import threading
import re
import weakref
import zope.sqlalchemy

from blinker import Signal
from onegov.core.cache import lru_cache
from onegov.core.custom import json
from sqlalchemy import create_engine, event, text
from sqlalchemy.orm import scoped_session, sessionmaker
from sqlalchemy.pool import QueuePool
from sqlalchemy.orm.query import Query
from sqlalchemy_utils.aggregates import manager as aggregates_manager

from typing import Any, TypeVar, TYPE_CHECKING

if TYPE_CHECKING:
    # imports used for annotations only - kept out of the runtime import
    # graph to avoid unnecessary (and potentially circular) dependencies
    from collections.abc import Iterator
    from sqlalchemy.engine import Connection, Engine
    from sqlalchemy.orm.session import Session
_T = TypeVar('_T')

# quoted, since Session is only imported during type checking
_S = TypeVar('_S', bound='Session')

if TYPE_CHECKING:
    # FIXME: we can lift this out of the type checking context in
    #        SQLAlchemy 2.0
    class ForceFetchQueryClass(Query[_T]):
        def delete(self, synchronize_session: Any = None) -> int: ...
else:
    class ForceFetchQueryClass(Query):
        """ Alters the builtin query class, ensuring that the delete query
        always fetches the data first, which is important for the bulk
        delete events to get the actual objects affected by the change.

        """

        def delete(self, synchronize_session: Any = None) -> int:
            # the passed synchronize_session value is deliberately ignored -
            # 'fetch' is forced so bulk delete events see the actual objects
            return super().delete('fetch')


# Limits the lifetime of sessions for n seconds. Postgres will automatically
# reap connections which are unused for longer and we will automatically
# recreate connections which are activated after that.
#
# This boils down to connections being recycled once an hour. As each
# connection is handled by a separate process by Postgres - a process which
# may grow quite large - this alleviates memory fragmentation/high water mark
# issues that we've been seeing on some servers.
CONNECTION_LIFETIME = 60 * 60
def query_schemas(
    connection: 'Connection | Engine',
    namespace: str | None = None
) -> 'Iterator[str]':
    """ Yields all schemas, or only the ones that belong to the given
    namespace (schemas named ``{namespace}-...``).

    :connection: the connection or engine to query with
    :namespace: optional namespace used as a ``{namespace}-`` prefix filter

    """

    query = text("""
        SELECT schema_name
        FROM information_schema.schemata
        ORDER BY schema_name
    """)

    # None if no namespace was given -> all schemas match
    prefix = namespace and f'{namespace}-'

    for (schema, ) in connection.execute(query):
        if not prefix or schema.startswith(prefix):
            yield schema
class SessionManager:
    """ Holds sessions and creates schemas before binding sessions to
    schemas.

    Threadsafe in theory, but not tested or well thought out.

    No global state though, so two different session managers will manage
    different sessions/schemas.

    """

    # describes the accepted characters in a schema name
    _valid_schema_name = re.compile(r'^[a-z0-9_-]+$')

    # describes the prefixes with which a schema may not begin
    _invalid_prefixes = re.compile(r'^([0-9]|pg_)+')

    # holds schemas that may never be used:
    # - information_schema is a sql standard schema that's for internal use
    # - public is the default schema name which should not be used for
    #   security reasons. Someone could figure out a way to set the
    #   search_path to default and to add an application that has the right
    #   to change the public schema.
    # - extensions is reserved for the shared postgres extensions schema
    _reserved_schemas = {'information_schema', 'public', 'extensions'}

    # holds thread local data (i.e. the active session manager per thread)
    _thread_bound = threading.local()
def __init__( self, dsn: str, base: type[Any], # FIXME: use DeclarativeBase in SQLAlchemy 2.0 engine_config: dict[str, Any] | None = None, # TODO: TypedDict? session_config: dict[str, Any] | None = None, # TODO: TypedDict? pool_config: dict[str, Any] | None = None # TODO: TypedDict? ): """ Configures the data source name/dsn/database url and sets up the connection to the database. :dsn: Database connection string including user, password, host, port and database name. See: `<\ #database-urls>`_ :base: Declarative base used to define the SQLAlchemy database models. Extra bases may be added to the session manager after __init__:: mgr.bases.append(MyBase) The tables in these additional schemas are created on the schema alongside the primary base. :engine_config: Additional configuration passed to SQLAlchemy's `create_engine`. See: `<\ #sqlalchemy.create_engine>` :session_config: Additional configuration passed to SQLAlchemy's sessionmaker. See: `<\ #sqlalchemy.orm.session.sessionmaker>` :pool_config: Custom pool configuration to be used in lieue of the default pool. Only use under special circumstances. Note, to connect to another database you need to create a new SessionManager instance. **Postgres Extensions** The session manager supports the setup of postgres extensions. Currently those extensions are hard-coded and they are all added to the extensions schema. The extensions schema is then added to the search path of each query. Since extensions can only be created by superusers, this is something you might want to do in a separate step in deployment. We don't advise you to use a superuser connection for your onegov cloud deployment. You may therefore use the list of the required extensions below and create a schema 'extensions' with those extensions inside. **Signals** The session manager supports the following signals: ``self.on_insert`` Called with the schema and each object when an insert happens. 
``self.on_update`` Called with the schema and each object when a change happens. ``self.on_delete`` Called with the scheam and each object when a change happens. Note that because those objects basically do not exist anymore, only the primary keys of those objects contain actual values! Signals abstracts SQLAlchemy ORM events. Since those handle events of the Session, only things happening on sessions provided by this sessionmanager are caught. Raw SQL queries, other sessions and other outside changes to the database are not caught! The signals are implemented using blinker: `<>`_ Note that signal handler functions are by default attached as weakrefs, that means if you create your signal handling function inside another function, the signal will stop working once the outer function completes. To avoid this behavior use ``connect_via``. See examples below: Examples:: mgr = SessionManager(dsn) # connect to the insert signal, disconnecting automatically once # the handler function is garbage collected @mgr.on_insert.connect def on_insert(schema, obj): print("Inserted {} @ {}".format(obj, schema)) # connect to the update signal, limited to a specific schema, also # disconnecting automatically once the handler function falls out # of scope @mgr.on_update.connect_via('my-schema') def on_update(schema, obj): print("Updated {} @ {}".format(obj, schema)) # connect to the delete signal for all schemas -> here, the # handler is strongly referenced and may be used in a closure from blinker import ANY @mgr.on_delete.connect_via(ANY, weak=False) def on_delete(schema, obj): print("Deleted {} @ {}".format(obj, schema)) """ assert 'postgres' in dsn, "Onegov only supports Postgres!" 
self.activate() engine_config = engine_config or {} session_config = session_config or {} pool_config = pool_config or {} self.dsn = dsn self.bases = [base] self.created_schemas: set[str] = set() self.current_schema: str | None = None self.on_insert = Signal() self.on_update = Signal() self.on_delete = Signal() # onegov.core creates extensions that it requires in a separate schema # in the future, this might become something we can configure through # the setuptools entry_points -> modules could advertise what they need # and the core would install the extensions the modules require self.required_extensions = {'btree_gist', 'hstore', 'unaccent'} self.created_extensions: set[str] = set() # override the isolation level in any case, we cannot allow another engine_config['isolation_level'] = 'SERIALIZABLE' # provide our custom serializer to the engine assert 'json_serializer' not in engine_config assert 'json_deserializer' not in engine_config engine_config['json_serializer'] = json.dumps engine_config['json_deserializer'] = json.loads if pool_config: engine_config.update(pool_config) else: # the default pool config should work for all onegov applications # # currently most applications use one single connection that is # reused between requests as well as one extra connection for each # instance (cronjob thread with session-based lock) engine_config['poolclass'] = QueuePool # connections which are kept open when returned to the pool engine_config['pool_size'] = 5 engine_config['pool_recycle'] = CONNECTION_LIFETIME - 1 # connections which are closed when returned (unlimited) engine_config['max_overflow'] = -1 self.engine = create_engine(self.dsn, **engine_config) self.register_engine(self.engine) self.session_factory = scoped_session( sessionmaker( bind=self.engine, query_cls=ForceFetchQueryClass, **session_config ), scopefunc=self._scopefunc, ) self.register_session(self.session_factory)
[docs] def register_engine(self, engine: 'Engine') -> None: """ Takes the given engine and registers it with the schema switching mechanism. Maybe used to register external engines with the session manager. If used like this, make sure to call :meth:`bind_session` before using the session provided by the external engine. """ @event.listens_for(engine, "before_cursor_execute") def activate_schema( connection: 'Connection', cursor: Any, *args: Any, **kwargs: Any ) -> None: """ Share the 'info' dictionary of Session with Connection objects. """ # execution options have priority! if 'schema' in connection._execution_options: # type:ignore schema = connection._execution_options['schema'] # type:ignore else: if 'session' in schema =['session'].info['schema'] else: schema = None if schema is not None: cursor.execute("SET search_path TO %s, extensions", (schema, )) @event.listens_for(engine, "before_cursor_execute") def limit_session_lifetime( connection: 'Connection', cursor: Any, *args: Any, **kwargs: Any ) -> None: """ Kills idle sessions after a while, freeing up memory. """ cursor.execute( "SET SESSION idle_in_transaction_session_timeout = %s", (f'{CONNECTION_LIFETIME}s', ) )
[docs] def register_session(self, session: 'Session') -> None: """ Takes the given session and registers it with zope.sqlalchemy and various orm events. """ signals = ( (self.on_insert, lambda session:, (self.on_update, lambda session: session.dirty), (self.on_delete, lambda session: session.deleted) ) # SQLAlchemy-Utils' aggregates decorator doesn't work correctly # with our sessions - we need to setup the appropriate event handler # manually. There's a test that makes sure that this is not done # twice. The cause of this has to be investigated still. event.listen( target=session, identifier='after_flush', fn=aggregates_manager.construct_aggregate_queries ) # This check we need in any case, since aggregates don't work with # bulk udpdates/deletes, which is something we make sure can't # happen by accident (it'll lead to hard to debug errors) cache_size = min(len(aggregates_manager.generator_registry), 32) @lru_cache(cache_size) def prevent_bulk_changes_on_aggregate_modules( module_class: type[Any] ) -> None: for registered in aggregates_manager.generator_registry: assert not issubclass(module_class, registered), """ Bulk queries run on models that use sqlalchemy-utils aggregates won't lead to a proper update. It's impossible to have both aggregates and bulk updates/deletes. """ @event.listens_for(session, 'after_flush') def on_after_flush( session: 'Session', flush_context: Any ) -> None: for signal, get_objects in signals: if signal.receivers: for obj in get_objects(session): signal.send(self.current_schema, obj=obj) @event.listens_for(session, 'after_bulk_update') def on_after_bulk_update(update_context: Any) -> None: prevent_bulk_changes_on_aggregate_modules( update_context.mapper.class_) if self.on_update.receivers: assert hasattr(update_context, 'matched_objects'), """ Bulk queries which use synchronize_session=False or synchronize_session='fetch' cannot be supported because it is unclear what rows were affected. 
Manually update values instead (even though it's way slower). There's no better solution at the moment. """ for obj in update_context.matched_objects: self.on_update.send(self.current_schema, obj=obj) @event.listens_for(session, 'after_bulk_delete') def on_after_bulk_delete(delete_context: Any) -> None: prevent_bulk_changes_on_aggregate_modules( delete_context.mapper.class_) if self.on_delete.receivers: for row in delete_context.matched_rows: obj = delete_context.mapper.class_(**{ v for c, v in zip(delete_context.mapper.primary_key, row) }) self.on_delete.send(self.current_schema, obj=obj) zope.sqlalchemy.register(session)
[docs] def activate(self) -> None: """ Sets the currently active session_manager - this is basically a global variable we require because we hook into the orm/query events where we don't know yet which session is going to be used and therefore cannot be sure about the used session manager either For example, the Document model below needs access to the current session to get the current locale, but since it is instantiated without any session information, this can't be known without a global variable: document = Document(localized_title="Das Dokument") We can only work around that with a global variable: session_manager.activate() document = Document(localized_title="Das Dokument") As a result session managers need to be activated, or they can't be used (at least for translated columns). To do so in tests, use: session_manager.activate() To ease the use of testing the last session_manager is however also automatically activated when the schema is set, which covers most use-cases outside of testing """ self.__class__.set_active(self)
[docs] def deactivate(self) -> None: """ Sets the currently active session manager to None, if it is equal to self. """ active = self.__class__.get_active() if active is None: return if active.__repr__.__self__ is self: # type:ignore[attr-defined] self.set_active(None)
[docs] def set_active(cls, session_manager: 'SessionManager | None') -> None: if session_manager: cls._thread_bound._active = weakref.proxy(session_manager) else: cls._thread_bound._active = None
[docs] def get_active(cls) -> 'SessionManager | None': try: return cls._thread_bound._active except (AttributeError, ReferenceError): return None
[docs] def set_locale( self, default_locale: str | None, current_locale: str | None ) -> None: """ Sets the default locale and the current locale so it may be shared with the translated ORM columns. Note that the locales may be NONE. It is up to the consumer of these settings to assert their existence. """ self.default_locale = default_locale self.current_locale = current_locale
[docs] def _scopefunc(self) -> tuple[threading.Thread, str | None]: """ Returns the scope of the scoped_session used to create new sessions. Relies on self.current_schema being set before the session is created. This function is as internal as they come and it exists only because we otherwise would have to create different session factories for each schema. """ return (threading.current_thread(), self.current_schema)
[docs] def dispose(self) -> None: """ Closes the connection to the server and cleans up. This only needed for testing. """ self.engine.raw_connection().invalidate() self.engine.dispose() self.deactivate()
[docs] def is_valid_schema(self, schema: str | None) -> bool: """ Returns True if the given schema looks valid enough to be created on the database with no danger of SQL injection or other unwanted sideeffects. """ if not schema: return False if schema in self._reserved_schemas: return False if self._invalid_prefixes.match(schema): return False # only one consecutive '-' is allowed (-- constitues a comment) if '--' in schema: return False return self._valid_schema_name.match(schema) and True or False
[docs] def set_current_schema(self, schema: str) -> None: """ Sets the current schema in use. The current schema is then used to bind the session to a schema. Note that this can't be done in a functional way. We need the current schema to generate a new scope. I would very much prefer to bind this to the session but this is not at all straight-forward with SQLAlchemy. I tried a solution like `this one <\ sqlalchemy/wiki/UsageRecipes/SessionModifiedSQL>`_, but it's not good enough, because it still relies on some kind of global stage, even if it's set locally. Ideally a value could be bound to the session and an event would trigger every time the connection is used with that session. We could then set the schema on the connection every time that happens. For now, the global option is okay though, because in practice we only set the schema once per request and we don't do threading anyway. """ assert self.is_valid_schema(schema) self.current_schema = schema self.ensure_schema_exists(schema) self.activate()
[docs] def create_schema(self, schema: str, validate: bool = True) -> None: """ Creates the given schema. If said schema exists, expect this method to throw an error. If you use :meth:`set_current_schema`, this is invoked automatically if needed. So you usually shouldn't have to care about this function. """ if validate: assert self.is_valid_schema(schema) # psycopg2 doesn't know how to correctly build a CREATE # SCHEMA statement, so we need to do it manually. # self.is_valid_schema should've checked that no sql # injections are possible. # # this is the *only* place where this happens - if anyone # knows how to do this using sqlalchemy/psycopg2, come forward! conn = self.engine.execution_options(schema=None) conn.execute('CREATE SCHEMA "{}"'.format(schema)) conn.execute('COMMIT')
[docs] def create_schema_if_not_exists( self, schema: str, validate: bool = True ) -> None: """ Creates the given schema if it doesn't exist yet. """ if not self.is_schema_found_on_database(schema): self.create_schema(schema, validate)
[docs] def bind_session(self, session: '_S') -> '_S': """ Bind the session to the current schema. """['schema'] = self.current_schema session.connection().info['session'] = weakref.proxy(session) return session
[docs] def session(self) -> 'Session': """ Returns a new session or an existing session. Sessions with different schemas are kept independent, though they might reuse each others connections. This means that a session retrieved thusly:: mgr = SessionManager('postgres://...') mgr.set_current_schema('foo') session = mgr.session() Will not see objects attached to this session:: mgr.set_current_schema('bar') session = mgr.session() """ return self.bind_session(self.session_factory())
[docs] def list_schemas(self, namespace: str | None = None) -> list[str]: """ Returns a tuple containing *all* schemas defined in the current database. """ return list(query_schemas(self.engine, namespace))
[docs] def is_schema_found_on_database(self, schema: str) -> bool: """ Returns True if the given schema exists on the database. """ conn = self.engine.execution_options(schema=None) result = conn.execute(text( "SELECT EXISTS(SELECT 1 FROM information_schema.schemata " "WHERE schema_name = :schema)" ), schema=schema) return result.first()[0]
[docs] def create_required_extensions(self) -> None: """ Creates the required extensions once per lifetime of the manager. """ if self.required_extensions != self.created_extensions: # extensions are a all added to a shared schema (a reserved schema) self.create_schema_if_not_exists('extensions', validate=False) conn = self.engine.execution_options(schema='extensions') for ext in self.required_extensions - self.created_extensions: conn.execute( 'CREATE EXTENSION IF NOT EXISTS "{}" ' 'SCHEMA "extensions"'.format(ext) ) conn.execute('COMMIT') self.created_extensions.add(ext)
[docs] def ensure_schema_exists(self, schema: str) -> None: """ Makes sure the schema exists on the database. If it doesn't, it is created. """ assert self.engine is not None if schema not in self.created_schemas: # XXX circular dependencies from onegov.core import upgrade # this is important because CREATE SCHEMA is done in a possibly # dangerous way! assert self.is_valid_schema(schema) # setup the extensions right before we activate our first schema self.create_required_extensions() self.create_schema_if_not_exists(schema) conn = self.engine.execution_options(schema=schema) declared_classes = set() try: for base in self.bases: base.metadata.schema = schema base.metadata.create_all(conn) declared_classes.update(base._decl_class_registry.values()) conn.execute('COMMIT') finally: # reset the schema on the global base variable - this state # sticks around otherwise and haunts us in the tests for base in self.bases: base.metadata.schema = None # if we have an upgrade state table, we want to prefill it with # all the current modules/tasks, to get the correct initial update # state (see # # we usually want to do that, but during testing, those upgrade # state classes may not exist, that's why we check if upgrade.UpgradeState in declared_classes: # we use a special session that is not registered with the # zope transaction extension -> we do this during a request # and we don't want the whole request to be commited, just # this small change. session = sessionmaker(bind=self.engine)() upgrade.register_all_modules_and_tasks(session) session.commit() self.created_schemas.add(schema)