使用事件跟踪查询、对象和 Session 更改

SQLAlchemy 提供了一个广泛的 事件监听 系统,该系统在 Core 和 ORM 中广泛使用。在 ORM 中,有各种各样的事件监听器钩子,这些钩子在 ORM 事件 中的 API 级别进行了记录。随着时间的推移,这些事件的集合不断增长,包括许多非常有用的新事件,以及一些不再像以前那么相关的旧事件。本节将尝试介绍主要的事件钩子以及它们可能的使用场景。

执行事件

版本 1.4 中新增: Session 现在提供了一个单一的综合钩子,旨在拦截 ORM 代表执行的所有 SELECT 语句,以及批量 UPDATE 和 DELETE 语句。这个钩子取代了以前的 QueryEvents.before_compile() 事件,以及 QueryEvents.before_compile_update()QueryEvents.before_compile_delete()

Session 提供了一个综合系统,通过该系统,所有通过 Session.execute() 方法调用的查询(包括由 Query 发出的所有 SELECT 语句,以及代表列和关系加载器发出的所有 SELECT 语句)都可能被拦截和修改。该系统利用了 SessionEvents.do_orm_execute() 事件钩子,以及 ORMExecuteState 对象来表示事件状态。

基本查询拦截

SessionEvents.do_orm_execute() 首先对任何类型的查询拦截非常有用,包括由 Query 使用 1.x 样式 发出的查询,以及当 ORM 启用的 2.0 样式 select()update()delete() 构造传递给 Session.execute() 时。ORMExecuteState 构造提供了访问器,允许修改语句、参数和选项。

Session = sessionmaker(engine)


@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
    if orm_execute_state.is_select:
        # add populate_existing for all SELECT statements

        orm_execute_state.update_execution_options(populate_existing=True)

        # check if the SELECT is against a certain entity and add an
        # ORDER BY if so
        col_descriptions = orm_execute_state.statement.column_descriptions

        if col_descriptions[0]["entity"] is MyEntity:
            orm_execute_state.statement = statement.order_by(MyEntity.name)

上面的示例说明了对 SELECT 语句的一些简单修改。在这个层面上,SessionEvents.do_orm_execute() 事件钩子旨在取代以前 QueryEvents.before_compile() 事件的使用,该事件并非针对各种类型的加载器一致地触发;此外,QueryEvents.before_compile() 仅适用于 1.x 样式Query 的使用,而不适用于 2.0 样式Session.execute() 的使用。

添加全局 WHERE / ON 条件

最常请求的查询扩展功能之一是能够向所有实体的所有查询添加 WHERE 条件。这可以通过使用 with_loader_criteria() 查询选项来实现,该选项可以单独使用,也可以在 SessionEvents.do_orm_execute() 事件中使用。

from sqlalchemy.orm import with_loader_criteria

Session = sessionmaker(engine)


@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
    if (
        orm_execute_state.is_select
        and not orm_execute_state.is_column_load
        and not orm_execute_state.is_relationship_load
    ):
        orm_execute_state.statement = orm_execute_state.statement.options(
            with_loader_criteria(MyEntity.public == True)
        )

在上面,一个选项被添加到所有 SELECT 语句中,该选项将限制针对 MyEntity 的所有查询,以过滤 public == True 的内容。该条件将应用于在当前查询范围内对该类的所有加载。默认情况下,with_loader_criteria() 选项会自动传播到关系加载器,这将应用于后续的关系加载,包括延迟加载、选择性加载等。

对于一系列都具有某些公共列结构的类,如果这些类使用 声明式混合类 组成,则混合类本身可以与 with_loader_criteria() 选项结合使用,使用 Python lambda。Python lambda 将在查询编译时针对符合条件的特定实体调用。假设一系列基于名为 HasTimestamp 的混合类的类

import datetime


class HasTimestamp:
    timestamp = mapped_column(DateTime, default=datetime.datetime.now)


class SomeEntity(HasTimestamp, Base):
    __tablename__ = "some_entity"
    id = mapped_column(Integer, primary_key=True)


class SomeOtherEntity(HasTimestamp, Base):
    __tablename__ = "some_entity"
    id = mapped_column(Integer, primary_key=True)

上面的类 SomeEntitySomeOtherEntity 都有一个名为 timestamp 的列,该列默认为当前日期和时间。可以使用事件来拦截所有从 HasTimestamp 扩展的类,并过滤其 timestamp 列,该列不早于一个月前。

@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
    if (
        orm_execute_state.is_select
        and not orm_execute_state.is_column_load
        and not orm_execute_state.is_relationship_load
    ):
        one_month_ago = datetime.datetime.today() - datetime.timedelta(months=1)

        orm_execute_state.statement = orm_execute_state.statement.options(
            with_loader_criteria(
                HasTimestamp,
                lambda cls: cls.timestamp >= one_month_ago,
                include_aliases=True,
            )
        )

警告

在调用 with_loader_criteria() 中使用 lambda 表达式,每个唯一的类仅会调用 **一次**。 自定义函数不应该在该 lambda 表达式内调用。 有关“lambda SQL”功能的概述,请参见 使用 Lambda 来显著提高语句生成的速度,此功能仅供高级用户使用。

另见

ORM 查询事件 - 包括上述 with_loader_criteria() 方法的示例。

重新执行语句

深度炼金术

语句重新执行功能涉及一个略微复杂的递归序列,旨在解决将 SQL 语句的执行重新路由到各种非 SQL 上下文中的相当困难的问题。 以下链接的“Dogpile 缓存”和“水平分片”的双重示例应该用作在何时使用此相当高级功能的指南。

ORMExecuteState 能够控制给定语句的执行; 这包括能够完全不调用语句,允许改为返回从缓存中检索的预构造结果集,以及能够使用不同的状态重复调用同一个语句,例如在多个数据库连接上调用它,然后将结果合并到内存中。 这两种高级模式都在 SQLAlchemy 的示例集中展示,如下所述。

SessionEvents.do_orm_execute() 事件挂钩内,该 ORMExecuteState.invoke_statement() 方法可用于使用 Session.execute() 的新嵌套调用来调用语句,这将抢先执行当前正在执行的后续处理,而改为返回由内部执行返回的 Result。 在此过程中,在此嵌套调用中也会跳过针对 SessionEvents.do_orm_execute() 挂钩迄今为止调用的事件处理程序。

ORMExecuteState.invoke_statement() 方法返回一个 Result 对象; 该对象然后具有将自身“冻结”为可缓存格式和“解冻”为新的 Result 对象的能力,以及将自身数据与其他 Result 对象的数据合并的能力。

例如,使用 SessionEvents.do_orm_execute() 来实现缓存

from sqlalchemy.orm import loading

cache = {}


@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
    if "my_cache_key" in orm_execute_state.execution_options:
        cache_key = orm_execute_state.execution_options["my_cache_key"]

        if cache_key in cache:
            frozen_result = cache[cache_key]
        else:
            frozen_result = orm_execute_state.invoke_statement().freeze()
            cache[cache_key] = frozen_result

        return loading.merge_frozen_result(
            orm_execute_state.session,
            orm_execute_state.statement,
            frozen_result,
            load=False,
        )

有了上述挂钩,使用缓存的示例将如下所示

stmt = (
    select(User).where(User.name == "sandy").execution_options(my_cache_key="key_sandy")
)

result = session.execute(stmt)

上面,自定义执行选项传递给 Select.execution_options(),以建立一个“缓存键”,然后由 SessionEvents.do_orm_execute() 挂钩拦截。 然后将此缓存键与可能存在于缓存中的 FrozenResult 对象匹配,如果存在,则会重新使用该对象。 该方法使用 Result.freeze() 方法来“冻结”一个 Result 对象,该对象上面将包含 ORM 结果,以便它可以存储在缓存中并使用多次。 为了从“冻结”的结果中返回实时结果,merge_frozen_result() 函数用于将“冻结”的数据从结果对象合并到当前会话中。

上面的示例在 Dogpile 缓存 中作为一个完整的示例实现。

ORMExecuteState.invoke_statement() 方法也可以被调用多次,将不同的信息传递给 ORMExecuteState.invoke_statement.bind_arguments 参数,以便该 Session 每次都会使用不同的 Engine 对象。 这将每次返回一个不同的 Result 对象; 这些结果可以使用 Result.merge() 方法合并在一起。 这是 水平分片 扩展采用的技术; 请参阅源代码以熟悉。

持久性事件

可能最广泛使用的事件系列是“持久性”事件,它们对应于 刷新过程。 刷新是在对象的所有更改都会被确定并以 INSERT、UPDATE 和 DELETE 语句的形式发送到数据库的地方。

before_flush()

SessionEvents.before_flush() 挂钩是当应用程序希望确保在刷新进行时对数据库进行额外的持久性更改时,最普遍有用的事件。 使用 SessionEvents.before_flush() 来操作对象以验证其状态,以及在持久化对象之前组合额外的对象和引用。 在此事件中,**可以安全地操作会话的状态**,即,可以将新对象附加到会话,可以删除对象,可以自由地更改对象上的单个属性,并且这些更改将在事件挂钩完成后被拉入刷新过程。

典型的 SessionEvents.before_flush() 挂钩将负责扫描集合 Session.newSession.dirtySession.deleted,以查找正在发生更改的对象。

有关 SessionEvents.before_flush() 的说明,请参阅 使用历史记录表进行版本控制使用时间戳行进行版本控制 等示例。

after_flush()

SessionEvents.after_flush() 挂钩在为刷新过程发出 SQL 之后调用,但在**更改**被刷新的对象的 state 之前调用。 也就是说,你仍然可以检查 Session.newSession.dirtySession.deleted 集合以查看刚刚刷新的内容,你还可以使用历史记录跟踪功能(例如 AttributeState 提供的功能)来查看刚刚持久化的更改。 在 SessionEvents.after_flush() 事件中,可以根据观察到的更改对数据库发出额外的 SQL。

after_flush_postexec()

SessionEvents.after_flush_postexec() 在调用 SessionEvents.after_flush() 之后不久被调用,但它是在修改对象状态以反映刚完成的刷新的操作之后执行的。此时,Session.newSession.dirtySession.deleted 集合通常都完全为空。可以使用 SessionEvents.after_flush_postexec() 检查标识映射以获取最终对象,并可能发出额外的 SQL 语句。在此钩子中,可以对对象进行新的更改,这意味着 Session 再次进入“脏”状态;如果在 Session.commit() 的上下文中调用了刷新,那么 Session 的机制将导致它再次刷新,如果在此钩子中检测到新的更改;否则,挂起的更改将作为下一次正常刷新的部分打包。当钩子在 Session.commit() 内检测到新的更改时,一个计数器将确保在这种情况下,在 100 次迭代后停止无限循环,以防 SessionEvents.after_flush_postexec() 钩子每次被调用时都在不断添加要刷新的新状态。

映射器级别的刷新事件

除了刷新级别的钩子外,还有一套更细粒度的钩子,它们在每个对象的基础上被调用,并且根据刷新过程中的 INSERT、UPDATE 或 DELETE 被分解。这些是映射器持久化钩子,它们也很受欢迎,但是这些事件需要更谨慎地处理,因为它们在正在进行的刷新过程的上下文中进行;许多操作在此是不安全的。

这些事件是

注意

重要的是要注意,这些事件适用于 会话刷新操作适用于 ORM 启用的 INSERT、UPDATE 和 DELETE 语句 中描述的 ORM 级别的 INSERT/UPDATE/DELETE 功能。要拦截 ORM 级别的 DML,请使用 SessionEvents.do_orm_execute() 事件。

每个事件都将传递 Mapper、映射对象本身以及正在用于发出 INSERT、UPDATE 或 DELETE 语句的 Connection。这些事件的吸引力是显而易见的,如果应用程序想要将某些活动绑定到特定类型的对象使用 INSERT 持久化时,则该钩子非常具体;与 SessionEvents.before_flush() 事件不同,不需要搜索 Session.new 等集合以查找目标。但是,当调用这些事件时,表示要发出的每个 INSERT、UPDATE、DELETE 语句的完整列表的刷新计划已经确定,并且在此阶段不能进行任何更改。因此,唯一可能对给定对象进行的更改是在对象行本地的属性上。对对象或其他对象进行的任何其他更改都会影响 Session 的状态,这将无法正常运行。

在这些映射器级别的持久化事件中不支持的操作包括

  • Session.add()

  • Session.delete()

  • 映射集合附加、添加、删除、删除、丢弃等。

  • 映射关系属性设置/删除事件,例如 someobject.related = someotherobject

传递 Connection 的原因是,鼓励在此执行简单的 SQL 操作,直接在 Connection 上执行,例如增加计数器或在日志表中插入额外的行。

还有许多不需要在刷新事件中处理的每个对象操作。最常见的替代方法是在对象的 __init__() 方法内部简单地建立额外的状态以及对象,例如创建要与新对象关联的额外对象。使用 简单验证器 中描述的验证器是另一种方法;这些函数可以拦截对属性的更改,并根据属性更改对目标对象建立额外的状态更改。使用这两种方法,对象在进入刷新步骤之前都处于正确状态。

对象生命周期事件

事件的另一个用例是跟踪对象的生命周期。这指的是在 对象状态快速介绍 中首次介绍的状态。

以上所有状态都可以使用事件完全跟踪。每个事件都代表一个不同的状态转换,这意味着起始状态和目标状态都是跟踪的一部分。除了初始瞬态事件之外,所有事件都与 Session 对象或类有关,这意味着它们可以与特定 Session 对象关联

from sqlalchemy import event
from sqlalchemy.orm import Session

session = Session()


@event.listens_for(session, "transient_to_pending")
def object_is_pending(session, obj):
    print("new pending: %s" % obj)

或者与 Session 类本身关联,以及与特定 sessionmaker 关联,这可能是最实用的形式

from sqlalchemy import event
from sqlalchemy.orm import sessionmaker

maker = sessionmaker()


@event.listens_for(maker, "transient_to_pending")
def object_is_pending(session, obj):
    print("new pending: %s" % obj)

监听器当然可以堆叠在一个函数之上,这可能是很常见的。例如,要跟踪所有进入持久状态的对象

@event.listens_for(maker, "pending_to_persistent")
@event.listens_for(maker, "deleted_to_persistent")
@event.listens_for(maker, "detached_to_persistent")
@event.listens_for(maker, "loaded_as_persistent")
def detect_all_persistent(session, instance):
    print("object is now persistent: %s" % instance)

瞬态

所有映射对象在首次构造时都作为 瞬态 开始。在这种状态下,对象单独存在,并且与任何 Session 没有关联。对于此初始状态,没有特定的“转换”事件,因为没有 Session,但是如果要拦截任何瞬态对象的创建,InstanceEvents.init() 方法可能是最好的事件。此事件应用于特定类或超类。例如,要拦截特定声明式基类的所有新对象

from sqlalchemy.orm import DeclarativeBase
from sqlalchemy import event


class Base(DeclarativeBase):
    pass


@event.listens_for(Base, "init", propagate=True)
def intercept_init(instance, args, kwargs):
    print("new transient: %s" % instance)

瞬态到挂起

瞬态对象在首次通过 Session.add()Session.add_all() 方法与 Session 关联时变为 挂起。对象也可能由于来自显式添加的引用对象的 “级联” 而成为 Session 的一部分。可以使用 SessionEvents.transient_to_pending() 事件检测瞬态到挂起的转换

@event.listens_for(sessionmaker, "transient_to_pending")
def intercept_transient_to_pending(session, object_):
    print("transient to pending: %s" % object_)

挂起到持久

当 flush 操作进行并对该实例执行 INSERT 语句时,pending 对象会变为 persistent。现在该对象具有标识键。使用 SessionEvents.pending_to_persistent() 事件跟踪 pending 到 persistent 的转换。

@event.listens_for(sessionmaker, "pending_to_persistent")
def intercept_pending_to_persistent(session, object_):
    print("pending to persistent: %s" % object_)

Pending 到 Transient

如果在 pending 对象被 flush 之前调用了 Session.rollback() 方法,或者在 pending 对象被 flush 之前对该对象调用了 Session.expunge() 方法,则 pending 对象可以恢复为 transient 状态。使用 SessionEvents.pending_to_transient() 事件跟踪 pending 到 transient 的转换。

@event.listens_for(sessionmaker, "pending_to_transient")
def intercept_pending_to_transient(session, object_):
    print("transient to pending: %s" % object_)

加载为 Persistent

当从数据库中加载对象时,它们可以直接在 Session 中以 persistent 状态出现。跟踪此状态转换等同于跟踪对象被加载时的状态,并且等同于使用 InstanceEvents.load() 实例级事件。但是,提供了 SessionEvents.loaded_as_persistent() 事件作为以会话为中心的钩子,用于拦截通过此特定途径进入 persistent 状态的对象。

@event.listens_for(sessionmaker, "loaded_as_persistent")
def intercept_loaded_as_persistent(session, object_):
    print("object loaded into persistent state: %s" % object_)

Persistent 到 Transient

如果对包含该对象首次被添加为 pending 的事务调用了 Session.rollback() 方法,则 persistent 对象可以恢复到 transient 状态。在 ROLLBACK 的情况下,使该对象变为 persistent 的 INSERT 语句会被回滚,并且该对象会被从 Session 中逐出,再次变为 transient 状态。使用 SessionEvents.persistent_to_transient() 事件钩子跟踪从 persistent 恢复到 transient 状态的对象。

@event.listens_for(sessionmaker, "persistent_to_transient")
def intercept_persistent_to_transient(session, object_):
    print("persistent to transient: %s" % object_)

Persistent 到 Deleted

当在 flush 过程中从数据库中删除一个标记为删除的对象时,persistent 对象会进入 deleted 状态。请注意,这与为目标对象调用 Session.delete() 方法 不同Session.delete() 方法仅 标记 对象为删除,实际的 DELETE 语句要等到 flush 进行后才会发出。在 flush 操作之后,目标对象才会处于“deleted”状态。

在“deleted”状态下,对象只是与 Session 有一点关联。它不在身份映射中,也不在 Session.deleted 集合中,该集合指的是它被标记为删除时的情况。

从“deleted”状态,对象可以在事务提交时进入 detached 状态,或者在事务回滚时恢复到 persistent 状态。

使用 SessionEvents.persistent_to_deleted() 跟踪 persistent 到 deleted 的转换。

@event.listens_for(sessionmaker, "persistent_to_deleted")
def intercept_persistent_to_deleted(session, object_):
    print("object was DELETEd, is now in deleted state: %s" % object_)

Deleted 到 Detached

当会话的事务提交时,deleted 对象会变为 detached 状态。调用了 Session.commit() 方法之后,数据库事务会最终完成,Session 会完全丢弃 deleted 对象并移除与它的所有关联。使用 SessionEvents.deleted_to_detached() 跟踪 deleted 到 detached 的转换。

@event.listens_for(sessionmaker, "deleted_to_detached")
def intercept_deleted_to_detached(session, object_):
    print("deleted to detached: %s" % object_)

注意

当对象处于 deleted 状态时,InstanceState.deleted 属性(可以通过 inspect(object).deleted 访问)会返回 True。但是,当对象被 detached 时,InstanceState.deleted 会再次返回 False。要检测对象是否被删除(无论它是否被 detached),请使用 InstanceState.was_deleted 访问器。

Persistent 到 Detached

当通过 Session.expunge()Session.expunge_all()Session.close() 方法将对象与 Session 解关联时,persistent 对象会变为 detached 状态。

注意

如果对象所属的 Session 被应用程序取消引用并由于垃圾回收而被丢弃,则对象也会变为 隐式 detached。在这种情况下,不会发出任何事件

使用 SessionEvents.persistent_to_detached() 事件跟踪对象从 persistent 到 detached 的转换。

@event.listens_for(sessionmaker, "persistent_to_detached")
def intercept_persistent_to_detached(session, object_):
    print("object became detached: %s" % object_)

Detached 到 Persistent

当使用 Session.add() 或等效方法将 detached 对象与会话重新关联时,detached 对象会变为 persistent 状态。使用 SessionEvents.detached_to_persistent() 事件跟踪对象从 detached 恢复到 persistent 的转换。

@event.listens_for(sessionmaker, "detached_to_persistent")
def intercept_detached_to_persistent(session, object_):
    print("object became persistent again: %s" % object_)

Deleted 到 Persistent

当使用 Session.rollback() 方法回滚删除该对象的 DELETEd 操作的事务时,deleted 对象可以恢复到 persistent 状态。使用 SessionEvents.deleted_to_persistent() 事件跟踪 deleted 对象恢复到 persistent 状态。

@event.listens_for(sessionmaker, "deleted_to_persistent")
def intercept_deleted_to_persistent(session, object_):
    print("deleted to persistent: %s" % object_)

事务事件

事务事件允许应用程序在 Session 级别发生事务边界时以及当 Session 更改 Connection 对象上的事务状态时收到通知。

属性更改事件

属性更改事件允许拦截对象上的特定属性何时被修改。这些事件包括AttributeEvents.set()AttributeEvents.append()以及AttributeEvents.remove()。这些事件非常有用,特别是对于每个对象的验证操作;然而,使用“验证器”钩子通常更方便,它在幕后使用这些钩子;有关此背景信息,请参见简单验证器。属性事件还在反向引用机制的背后。说明使用属性事件的示例在属性检测中。