core.orm.session_manager ======================== .. py:module:: core.orm.session_manager Attributes ---------- .. autoapisummary:: core.orm.session_manager.CONNECTION_LIFETIME Classes ------- .. autoapisummary:: core.orm.session_manager.ForceFetchQueryClass core.orm.session_manager.SessionManager Functions --------- .. autoapisummary:: core.orm.session_manager.query_schemas core.orm.session_manager.get_warning_stacklevel Module Contents --------------- .. py:class:: ForceFetchQueryClass[T](entities: Union[sqlalchemy.sql._typing._ColumnsClauseArgument[Any], Sequence[sqlalchemy.sql._typing._ColumnsClauseArgument[Any]]], session: Optional[sqlalchemy.orm.session.Session] = None) Bases: :py:obj:`sqlalchemy.orm.query.Query`\ [\ :py:obj:`T`\ ] Alters the builtin query class, ensuring that the delete query always fetches the data first, which is important for the bulk delete events to get the actual objects affected by the change. .. py:method:: delete(synchronize_session: Any = None, delete_args: dict[str, Any] | None = None) -> int Perform a DELETE with an arbitrary WHERE clause. Deletes rows matched by this query from the database. E.g.:: sess.query(User).filter(User.age == 25).delete(synchronize_session=False) sess.query(User).filter(User.age == 25).delete( synchronize_session="evaluate" ) .. warning:: See the section :ref:`orm_expression_update_delete` for important caveats and warnings, including limitations when using bulk UPDATE and DELETE with mapper inheritance configurations. :param synchronize_session: chooses the strategy to update the attributes on objects in the session. See the section :ref:`orm_expression_update_delete` for a discussion of these strategies. :param delete_args: Optional dictionary, if present will be passed to the underlying :func:`_expression.delete` construct as the ``**kw`` for the object. May be used to pass dialect-specific arguments such as ``mysql_limit``. .. versionadded:: 2.0.37 :return: the count of rows matched as returned by the database's "row count" feature. .. seealso:: :ref:`orm_expression_update_delete` .. py:data:: CONNECTION_LIFETIME :value: 3600 .. py:function:: query_schemas(connection: sqlalchemy.engine.Connection, namespace: str | None = None) -> collections.abc.Iterator[str] Yields all schemas or the ones with the given namespace. .. py:function:: get_warning_stacklevel() -> int .. py:class:: SessionManager(dsn: str, base: type[sqlalchemy.orm.DeclarativeBase], engine_config: dict[str, Any] | None = None, session_config: dict[str, Any] | None = None, pool_config: dict[str, Any] | None = None) Holds sessions and creates schemas before binding sessions to schemas. Threadsafe in theory, but not tested or well thought out. No global state though, so two different session managers will manage different sessions/schemas. .. py:attribute:: _valid_schema_name .. py:attribute:: _invalid_prefixes .. py:attribute:: _reserved_schemas .. py:attribute:: _thread_bound .. py:attribute:: dsn .. py:attribute:: bases .. py:attribute:: created_schemas :type: set[str] .. py:attribute:: current_schema :type: str | None :value: None .. py:attribute:: _ignore_bulk_updates :value: False .. py:attribute:: _ignore_bulk_deletes :value: False .. py:attribute:: on_schema_init .. py:attribute:: on_transaction_join .. py:attribute:: on_insert .. py:attribute:: on_update .. py:attribute:: on_delete .. py:attribute:: required_extensions .. py:attribute:: created_extensions :type: set[str] .. py:attribute:: engine :value: None .. py:attribute:: session_factory .. py:method:: ignore_bulk_updates() -> collections.abc.Iterator[Self] Ensures bulk updates don't get blocked when we can't emit update events for each changed object. In some cases we know that a bulk update is fine, even if it doesn't fully propagate through our event hooks. This function should only be rarely used however, where the speed increase outweighs the potential synchrocity issues. .. py:method:: ignore_bulk_deletes() -> collections.abc.Iterator[Self] Ensures bulk delete don't get blocked when we can't emit delete events for each changed object. In some cases we know that a bulk delete is fine, even if it doesn't fully propagate through our event hooks. This function should only be rarely used however, where the speed increase outweighs the potential synchrocity issues. .. py:method:: register_engine(engine: sqlalchemy.engine.Engine) -> None Takes the given engine and registers it with the schema switching mechanism. Maybe used to register external engines with the session manager. If used like this, make sure to call :meth:`bind_session` before using the session provided by the external engine. .. py:method:: register_session(session: sqlalchemy.orm.session.Session | sqlalchemy.orm.scoped_session[Any]) -> None Takes the given session and registers it with zope.sqlalchemy and various orm events. .. py:method:: activate() -> None Sets the currently active session_manager - this is basically a global variable we require because we hook into the orm/query events where we don't know yet which session is going to be used and therefore cannot be sure about the used session manager either For example, the Document model below needs access to the current session to get the current locale, but since it is instantiated without any session information, this can't be known without a global variable: document = Document(localized_title="Das Dokument") We can only work around that with a global variable: session_manager.activate() document = Document(localized_title="Das Dokument") As a result session managers need to be activated, or they can't be used (at least for translated columns). To do so in tests, use: session_manager.activate() To ease the use of testing the last session_manager is however also automatically activated when the schema is set, which covers most use-cases outside of testing .. py:method:: deactivate() -> None Sets the currently active session manager to None, if it is equal to self. .. py:method:: set_active(session_manager: SessionManager | None) -> None :classmethod: .. py:method:: get_active() -> SessionManager | None :classmethod: .. py:method:: set_locale(default_locale: str | None, current_locale: str | None) -> None Sets the default locale and the current locale so it may be shared with the translated ORM columns. Note that the locales may be NONE. It is up to the consumer of these settings to assert their existence. .. py:method:: _scopefunc() -> tuple[threading.Thread, str | None] Returns the scope of the scoped_session used to create new sessions. Relies on self.current_schema being set before the session is created. This function is as internal as they come and it exists only because we otherwise would have to create different session factories for each schema. .. py:method:: dispose() -> None Closes the connection to the server and cleans up. This only needed for testing. .. py:method:: is_valid_schema(schema: str | None) -> bool Returns True if the given schema looks valid enough to be created on the database with no danger of SQL injection or other unwanted sideeffects. .. py:method:: set_current_schema(schema: str) -> None Sets the current schema in use. The current schema is then used to bind the session to a schema. Note that this can't be done in a functional way. We need the current schema to generate a new scope. I would very much prefer to bind this to the session but this is not at all straight-forward with SQLAlchemy. I tried a solution like `this one `_, but it's not good enough, because it still relies on some kind of global stage, even if it's set locally. Ideally a value could be bound to the session and an event would trigger every time the connection is used with that session. We could then set the schema on the connection every time that happens. For now, the global option is okay though, because in practice we only set the schema once per request and we don't do threading anyway. .. py:method:: create_schema(schema: str, validate: bool = True) -> None Creates the given schema. If said schema exists, expect this method to throw an error. If you use :meth:`set_current_schema`, this is invoked automatically if needed. So you usually shouldn't have to care about this function. .. py:method:: create_schema_if_not_exists(schema: str, validate: bool = True) -> None Creates the given schema if it doesn't exist yet. .. py:method:: bind_session[T: sqlalchemy.orm.session.Session](session: T) -> T Bind the session to the current schema. .. py:method:: session() -> sqlalchemy.orm.session.Session Returns a new session or an existing session. Sessions with different schemas are kept independent, though they might reuse each others connections. This means that a session retrieved thusly:: mgr = SessionManager('postgres://...') mgr.set_current_schema('foo') session = mgr.session() Will not see objects attached to this session:: mgr.set_current_schema('bar') session = mgr.session() .. py:method:: list_schemas(namespace: str | None = None) -> list[str] Returns a tuple containing *all* schemas defined in the current database. .. py:method:: is_schema_found_on_database(schema: str) -> bool Returns True if the given schema exists on the database. .. py:method:: create_required_extensions() -> None Creates the required extensions once per lifetime of the manager. .. py:method:: ensure_schema_exists(schema: str) -> None Makes sure the schema exists on the database. If it doesn't, it is created.