使用事件跟踪查询、对象和 Session 更改

SQLAlchemy 具有广泛的 事件监听 系统,该系统贯穿 Core 和 ORM。在 ORM 中,有各种各样的事件监听器钩子,这些钩子在 ORM 事件 的 API 级别进行了文档化。多年来,此事件集合不断发展,包括许多非常有用的新事件以及一些不如以前那么相关的旧事件。本节将尝试介绍主要的事件钩子以及何时可能使用它们。

执行事件

1.4 版本新增: Session 现在具有一个全面的钩子,旨在拦截代表 ORM 执行的所有 SELECT 语句以及批量 UPDATE 和 DELETE 语句。此钩子取代了之前的 QueryEvents.before_compile() 事件,以及 QueryEvents.before_compile_update()QueryEvents.before_compile_delete()

Session 具有一个全面的系统,通过该系统,所有通过 Session.execute() 方法调用的查询(包括 Query 发出的所有 SELECT 语句以及代表列和关系加载器发出的所有 SELECT 语句)都可以被拦截和修改。该系统使用 SessionEvents.do_orm_execute() 事件钩子以及 ORMExecuteState 对象来表示事件状态。

基本查询拦截

SessionEvents.do_orm_execute() 首先适用于任何类型的查询拦截,包括 Query1.x 样式 发出的查询,以及当启用 ORM 的 2.0 样式 select()update()delete() 构造传递给 Session.execute() 时。ORMExecuteState 构造提供访问器以允许修改语句、参数和选项

Session = sessionmaker(engine)


@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
    if orm_execute_state.is_select:
        # add populate_existing for all SELECT statements

        orm_execute_state.update_execution_options(populate_existing=True)

        # check if the SELECT is against a certain entity and add an
        # ORDER BY if so
        col_descriptions = orm_execute_state.statement.column_descriptions

        if col_descriptions[0]["entity"] is MyEntity:
            orm_execute_state.statement = statement.order_by(MyEntity.name)

上面的示例说明了对 SELECT 语句的一些简单修改。在此级别,SessionEvents.do_orm_execute() 事件钩子旨在取代以前使用的 QueryEvents.before_compile() 事件,该事件对于各种加载器而言并非始终如一地触发;此外,QueryEvents.before_compile() 仅适用于 1.x 样式Query 的使用,而不适用于 2.0 样式Session.execute() 的使用。

添加全局 WHERE / ON 条件

最常请求的查询扩展功能之一是能够将 WHERE 条件添加到所有查询中实体的所有出现项。这可以通过使用 with_loader_criteria() 查询选项来实现,该选项可以单独使用,或者非常适合在 SessionEvents.do_orm_execute() 事件中使用

from sqlalchemy.orm import with_loader_criteria

Session = sessionmaker(engine)


@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
    if (
        orm_execute_state.is_select
        and not orm_execute_state.is_column_load
        and not orm_execute_state.is_relationship_load
    ):
        orm_execute_state.statement = orm_execute_state.statement.options(
            with_loader_criteria(MyEntity.public == True)
        )

上面,一个选项被添加到所有 SELECT 语句中,这将限制针对 MyEntity 的所有查询,以按 public == True 进行过滤。该条件将应用于立即查询范围内该类的所有加载。with_loader_criteria() 选项默认情况下也会自动传播到关系加载器,这将应用于后续关系加载,包括延迟加载、selectinloads 等。

对于一系列都具有某些公共列结构的类,如果这些类是使用 声明性混入 组成的,则混入类本身可以与 with_loader_criteria() 选项结合使用,方法是使用 Python lambda。Python lambda 将在查询编译时针对与条件匹配的特定实体调用。给定一系列基于名为 HasTimestamp 的混入的类

import datetime


class HasTimestamp:
    timestamp = mapped_column(DateTime, default=datetime.datetime.now)


class SomeEntity(HasTimestamp, Base):
    __tablename__ = "some_entity"
    id = mapped_column(Integer, primary_key=True)


class SomeOtherEntity(HasTimestamp, Base):
    __tablename__ = "some_entity"
    id = mapped_column(Integer, primary_key=True)

上面的类 SomeEntitySomeOtherEntity 将各自具有一个列 timestamp,该列默认为当前日期和时间。可以使用事件来拦截所有从 HasTimestamp 扩展的对象,并在不早于一个月前的日期过滤其 timestamp

@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
    if (
        orm_execute_state.is_select
        and not orm_execute_state.is_column_load
        and not orm_execute_state.is_relationship_load
    ):
        one_month_ago = datetime.datetime.today() - datetime.timedelta(months=1)

        orm_execute_state.statement = orm_execute_state.statement.options(
            with_loader_criteria(
                HasTimestamp,
                lambda cls: cls.timestamp >= one_month_ago,
                include_aliases=True,
            )
        )

警告

在调用 with_loader_criteria() 内部使用 lambda 仅对每个唯一类调用一次。自定义函数不应在此 lambda 中调用。有关“lambda SQL”功能的概述,请参阅 使用 Lambdas 为语句生成添加显著的速度提升,该功能仅供高级使用。

另请参阅

ORM 查询事件 - 包括上述 with_loader_criteria() 食谱的工作示例。

重新执行语句

深度炼金术

语句重新执行功能涉及稍微复杂的递归序列,旨在解决将 SQL 语句的执行重新路由到各种非 SQL 上下文中的相当困难的问题。下面链接的“dogpile 缓存”和“水平分片”的双重示例应用作指导,以了解何时适合使用此相当高级的功能。

ORMExecuteState 能够控制给定语句的执行;这包括完全不调用语句的能力(允许返回从缓存检索的预先构造的结果集),以及使用不同的状态重复调用同一语句的能力,例如针对多个数据库连接调用它,然后在内存中合并结果。SQLAlchemy 的示例套件中详细说明了这两种高级模式,如下所示。

当在 SessionEvents.do_orm_execute() 事件钩子内部时,可以使用 ORMExecuteState.invoke_statement() 方法来调用语句,方法是使用 Session.execute() 的新嵌套调用,这将抢占正在进行的当前执行的后续处理,而是返回内部执行返回的 Result。到目前为止在此过程中为 SessionEvents.do_orm_execute() 钩子调用的事件处理程序也将在这种嵌套调用中被跳过。

ORMExecuteState.invoke_statement() 方法返回一个 Result 对象;然后,该对象具有将其“冻结”为可缓存格式并“解冻”为新的 Result 对象的能力,以及将其数据与其他 Result 对象的数据合并的能力。

例如,使用 SessionEvents.do_orm_execute() 来实现缓存

from sqlalchemy.orm import loading

cache = {}


@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
    if "my_cache_key" in orm_execute_state.execution_options:
        cache_key = orm_execute_state.execution_options["my_cache_key"]

        if cache_key in cache:
            frozen_result = cache[cache_key]
        else:
            frozen_result = orm_execute_state.invoke_statement().freeze()
            cache[cache_key] = frozen_result

        return loading.merge_frozen_result(
            orm_execute_state.session,
            orm_execute_state.statement,
            frozen_result,
            load=False,
        )

使用上面的钩子到位,使用缓存的示例将如下所示

stmt = (
    select(User).where(User.name == "sandy").execution_options(my_cache_key="key_sandy")
)

result = session.execute(stmt)

上面,自定义执行选项被传递给 Select.execution_options(),以便建立一个“缓存键”,然后该缓存键将被 SessionEvents.do_orm_execute() 钩子拦截。然后,此缓存键与可能存在于缓存中的 FrozenResult 对象匹配,如果存在,则重新使用该对象。该配方使用 Result.freeze() 方法来“冻结”一个 Result 对象,该对象在上面将包含 ORM 结果,以便可以将其存储在缓存中并多次使用。为了从“冻结”结果返回实时结果,merge_frozen_result() 函数用于将结果对象中的“冻结”数据合并到当前会话中。

上面的示例在 Dogpile 缓存 中作为完整示例实现。

ORMExecuteState.invoke_statement() 方法也可以多次调用,将不同的信息传递给 ORMExecuteState.invoke_statement.bind_arguments 参数,以便 Session 每次都将使用不同的 Engine 对象。这将每次返回一个不同的 Result 对象;可以使用 Result.merge() 方法将这些结果合并在一起。这是 水平分片 扩展采用的技术;请参阅源代码以熟悉。

持久化事件

可能最广泛使用的一系列事件是“持久化”事件,这些事件与 flush 过程 相对应。flush 是在其中做出有关对象挂起更改的所有决策,然后以 INSERT、UPDATE 和 DELETE 语句的形式发出到数据库。

before_flush()

SessionEvents.before_flush() 钩子是应用程序希望确保在 flush 继续进行时对数据库进行其他持久性更改时,迄今为止最常用的事件。使用 SessionEvents.before_flush() 以对对象进行操作以验证其状态,以及在对象持久化之前组合其他对象和引用。在此事件中,可以安全地操作 Session 的状态,也就是说,可以将新对象附加到它,可以删除对象,并且可以自由更改对象上的各个属性,并且当事件钩子完成时,这些更改将被拉入 flush 过程。

典型的 SessionEvents.before_flush() 钩子将负责扫描集合 Session.newSession.dirtySession.deleted,以查找将要发生某些事情的对象。

有关 SessionEvents.before_flush() 的说明,请参见诸如 使用历史记录表进行版本控制使用时间行进行版本控制 之类的示例。

after_flush()

SessionEvents.after_flush() 钩子在 flush 过程的 SQL 发出后调用,但在之前刷新对象的对象状态已更改。也就是说,您仍然可以检查 Session.newSession.dirtySession.deleted 集合,以查看刚刚刷新了什么,并且您还可以使用历史记录跟踪功能(例如 AttributeState 提供的功能)来查看刚刚持久化了哪些更改。在 SessionEvents.after_flush() 事件中,可以根据观察到的更改向数据库发出其他 SQL。

after_flush_postexec()

SessionEvents.after_flush_postexec()SessionEvents.after_flush() 之后不久被调用,但在之后修改了对象的状态以说明刚刚发生的 flush。Session.newSession.dirtySession.deleted 集合通常在此处完全为空。使用 SessionEvents.after_flush_postexec() 来检查已完成对象的标识映射,并可能发出其他 SQL。在此钩子中,可以对对象进行新的更改,这意味着 Session 将再次进入“脏”状态;如果在此钩子中检测到新更改(如果 flush 是在 Session.commit() 的上下文中调用的),则 Session 的机制将导致它再次刷新;否则,挂起的更改将捆绑为下一个正常 flush 的一部分。当钩子在 Session.commit() 中检测到新更改时,计数器将确保在此方面在 100 次迭代后停止无休止的循环,以防 SessionEvents.after_flush_postexec() 钩子每次调用时都不断添加要刷新的新状态。

Mapper 级别 Flush 事件

除了 flush 级别钩子之外,还有一套更细粒度的钩子,因为它们是基于每个对象调用的,并且在 flush 过程中根据 INSERT、UPDATE 或 DELETE 分解。这些是 mapper 持久性钩子,它们也很受欢迎,但是需要更加谨慎地对待这些事件,因为它们在已经进行的 flush 过程的上下文中进行;许多操作在这里进行是不安全的。

事件是

注意

重要的是要注意,这些事件仅适用于 会话 flush 操作,而适用于 ORM 启用 INSERT、UPDATE 和 DELETE 语句 中描述的 ORM 级别 INSERT/UPDATE/DELETE 功能。要拦截 ORM 级别 DML,请使用 SessionEvents.do_orm_execute() 事件。

每个事件都传递 Mapper、映射的对象本身以及用于发出 INSERT、UPDATE 或 DELETE 语句的 Connection。这些事件的吸引力显而易见,因为如果应用程序希望将某些活动与特定类型的对象使用 INSERT 持久化时联系起来,则钩子非常具体;与 SessionEvents.before_flush() 事件不同,无需搜索诸如 Session.new 之类的集合以查找目标。但是,当调用这些事件时,表示要发出的每个 INSERT、UPDATE、DELETE 语句的完整列表的 flush 计划已经确定,并且在此阶段不得进行任何更改。因此,对给定对象唯一可能进行的更改是对象行的本地属性。对对象或其他对象的任何其他更改都将影响 Session 的状态,这将导致其无法正常运行。

在这些 mapper 级别持久性事件中不支持的操作包括

  • Session.add()

  • Session.delete()

  • 映射的集合 append、add、remove、delete、discard 等。

  • 映射的关系属性 set/del 事件,即 someobject.related = someotherobject

传递 Connection 的原因是鼓励在此处直接在 Connection 上执行简单 SQL 操作,例如递增计数器或在日志表中插入额外的行。

还有许多基于每个对象的操作根本不需要在 flush 事件中处理。最常见的替代方法是简单地在其 __init__() 方法中与对象一起建立其他状态,例如创建要与新对象关联的其他对象。使用 简单验证器 中描述的验证器是另一种方法;这些函数可以拦截对属性的更改,并响应属性更改在目标对象上建立其他状态更改。通过这两种方法,对象在进入 flush 步骤之前处于正确的状态。

对象生命周期事件

事件的另一个用例是跟踪对象的生命周期。这指的是在 对象状态快速入门 中首次介绍的状态。

上面的所有状态都可以使用事件完全跟踪。每个事件都代表一个不同的状态转换,这意味着,起始状态和目标状态都是跟踪内容的一部分。除了初始瞬态事件外,所有事件都以 Session 对象或类为单位,这意味着它们可以与特定的 Session 对象关联

from sqlalchemy import event
from sqlalchemy.orm import Session

session = Session()


@event.listens_for(session, "transient_to_pending")
def object_is_pending(session, obj):
    print("new pending: %s" % obj)

或与 Session 类本身关联,以及与特定的 sessionmaker 关联,这可能是最有用的形式

from sqlalchemy import event
from sqlalchemy.orm import sessionmaker

maker = sessionmaker()


@event.listens_for(maker, "transient_to_pending")
def object_is_pending(session, obj):
    print("new pending: %s" % obj)

监听器当然可以堆叠在一个函数之上,这很可能是常见的。例如,要跟踪所有正在进入持久状态的对象

@event.listens_for(maker, "pending_to_persistent")
@event.listens_for(maker, "deleted_to_persistent")
@event.listens_for(maker, "detached_to_persistent")
@event.listens_for(maker, "loaded_as_persistent")
def detect_all_persistent(session, instance):
    print("object is now persistent: %s" % instance)

瞬态

所有映射对象在首次构建时都以瞬态 (transient)状态开始。在这种状态下,对象独立存在,并且不与任何 Session 关联。对于这种初始状态,没有特定的“转换”事件,因为没有 Session。但是,如果想要拦截任何瞬态对象被创建的时刻,InstanceEvents.init() 方法可能是最好的事件。此事件应用于特定的类或超类。例如,要拦截特定声明基类的所有新对象

from sqlalchemy.orm import DeclarativeBase
from sqlalchemy import event


class Base(DeclarativeBase):
    pass


@event.listens_for(Base, "init", propagate=True)
def intercept_init(instance, args, kwargs):
    print("new transient: %s" % instance)

瞬态到待定 (Pending)

当瞬态对象首次通过 Session.add()Session.add_all() 方法与 Session 关联时,它会变为 待定 (pending) 状态。对象也可能由于来自显式添加的引用对象的 “级联 (cascade)” 而成为 Session 的一部分。可以使用 SessionEvents.transient_to_pending() 事件检测从瞬态到待定的转换

@event.listens_for(sessionmaker, "transient_to_pending")
def intercept_transient_to_pending(session, object_):
    print("transient to pending: %s" % object_)

待定到持久 (Persistent)

当 flush 操作继续并且为实例执行 INSERT 语句时,待定 (pending) 对象变为 持久 (persistent) 状态。对象现在具有身份键 (identity key)。使用 SessionEvents.pending_to_persistent() 事件跟踪从待定到持久的转换

@event.listens_for(sessionmaker, "pending_to_persistent")
def intercept_pending_to_persistent(session, object_):
    print("pending to persistent: %s" % object_)

待定到瞬态 (Transient)

如果在待定对象被 flush 之前调用了 Session.rollback() 方法,或者在对象被 flush 之前为对象调用了 Session.expunge() 方法,则 待定 (pending) 对象可以恢复为 瞬态 (transient) 状态。使用 SessionEvents.pending_to_transient() 事件跟踪从待定到瞬态的转换

@event.listens_for(sessionmaker, "pending_to_transient")
def intercept_pending_to_transient(session, object_):
    print("transient to pending: %s" % object_)

加载为持久 (Persistent)

当对象从数据库加载时,它们可以直接以 持久 (persistent) 状态出现在 Session 中。跟踪此状态转换与跟踪对象的加载过程同义,并且与使用 InstanceEvents.load() 实例级事件同义。但是,SessionEvents.loaded_as_persistent() 事件作为会话中心 (session-centric) 的钩子提供,用于拦截通过此特定途径进入持久状态的对象

@event.listens_for(sessionmaker, "loaded_as_persistent")
def intercept_loaded_as_persistent(session, object_):
    print("object loaded into persistent state: %s" % object_)

持久到瞬态 (Transient)

如果为首次将对象添加为待定状态的事务调用了 Session.rollback() 方法,则持久对象可以恢复为瞬态状态。在 ROLLBACK 的情况下,使此对象变为持久状态的 INSERT 语句被回滚,并且该对象从 Session 中逐出,再次变为瞬态状态。使用 SessionEvents.persistent_to_transient() 事件钩子跟踪从持久状态恢复为瞬态状态的对象

@event.listens_for(sessionmaker, "persistent_to_transient")
def intercept_persistent_to_transient(session, object_):
    print("persistent to transient: %s" % object_)

持久到已删除 (Deleted)

当标记为删除的对象在 flush 过程中从数据库中删除时,持久对象进入 已删除 (deleted) 状态。请注意,这与为目标对象调用 Session.delete() 方法是 **不相同的**。Session.delete() 方法仅 **标记** 对象为删除;实际的 DELETE 语句直到 flush 操作继续后才发出。在 flush 操作之后,目标对象才处于“已删除”状态。

在“已删除”状态下,对象仅与 Session 有轻微关联。它既不在身份映射 (identity map) 中,也不在 Session.deleted 集合中,该集合指的是对象何时待删除 (pending for deletion)。

从“已删除”状态,对象可以在事务提交时变为分离 (detached) 状态,或者在事务回滚时返回到持久状态。

使用 SessionEvents.persistent_to_deleted() 跟踪从持久到已删除的转换

@event.listens_for(sessionmaker, "persistent_to_deleted")
def intercept_persistent_to_deleted(session, object_):
    print("object was DELETEd, is now in deleted state: %s" % object_)

已删除到分离 (Detached)

当会话的事务提交时,已删除对象变为 分离 (detached) 状态。在调用 Session.commit() 方法后,数据库事务最终完成,并且 Session 现在完全丢弃已删除的对象并移除与之的所有关联。使用 SessionEvents.deleted_to_detached() 跟踪从已删除到分离的转换

@event.listens_for(sessionmaker, "deleted_to_detached")
def intercept_deleted_to_detached(session, object_):
    print("deleted to detached: %s" % object_)

注意

当对象处于已删除状态时,可以使用 inspect(object).deleted 访问的 InstanceState.deleted 属性返回 True。但是,当对象分离时,InstanceState.deleted 将再次返回 False。要检测对象是否已被删除,无论它是否已分离,请使用 InstanceState.was_deleted 访问器。

持久到分离 (Detached)

当对象通过 Session.expunge()Session.expunge_all()Session.close() 方法与 Session 取消关联时,持久对象变为 分离 (detached) 状态。

注意

如果应用程序取消引用其所属的 Session 并由于垃圾回收而被丢弃,则对象也可能 **隐式地分离 (implicitly detached)**。在这种情况下,**不会发出任何事件**。

使用 SessionEvents.persistent_to_detached() 事件跟踪对象从持久状态移动到分离状态

@event.listens_for(sessionmaker, "persistent_to_detached")
def intercept_persistent_to_detached(session, object_):
    print("object became detached: %s" % object_)

分离到持久 (Persistent)

当分离对象使用 Session.add() 或等效方法重新与会话关联时,它将变为持久状态。使用 SessionEvents.detached_to_persistent() 事件跟踪对象从分离状态返回到持久状态

@event.listens_for(sessionmaker, "detached_to_persistent")
def intercept_detached_to_persistent(session, object_):
    print("object became persistent again: %s" % object_)

已删除到持久 (Persistent)

当使用 Session.rollback() 方法回滚其中 DELETEd 对象的事务时,已删除 (deleted) 对象可以恢复为 持久 (persistent) 状态。使用 SessionEvents.deleted_to_persistent() 事件跟踪从已删除状态返回到持久状态的对象

@event.listens_for(sessionmaker, "deleted_to_persistent")
def intercept_deleted_to_persistent(session, object_):
    print("deleted to persistent: %s" % object_)

事务事件 (Transaction Events)

事务事件允许应用程序在 Session 级别以及当 Session 更改 Connection 对象上的事务状态时收到通知。

属性更改事件 (Attribute Change Events)

属性更改事件允许拦截对象上特定属性何时被修改。这些事件包括 AttributeEvents.set(), AttributeEvents.append(), 和 AttributeEvents.remove()。这些事件非常有用,特别是对于每个对象的验证操作;但是,使用“验证器 (validator)”钩子通常更方便,它在幕后使用这些钩子;有关此背景信息,请参阅 简单验证器 (Simple Validators)。属性事件也位于反向引用 (backreferences) 的机制之后。 属性检测 (Attribute Instrumentation) 中的示例说明了属性事件的使用。