SQLAlchemy 2.0 文档
- 上一个: 上下文/线程本地 Session
- 下一个: Session API
- 上级: 首页
- 本页目录
使用事件跟踪查询、对象和 Session 更改¶
SQLAlchemy 具有广泛的 事件监听 系统,该系统贯穿 Core 和 ORM。在 ORM 中,有各种各样的事件监听器钩子,这些钩子在 ORM 事件 的 API 级别进行了文档化。多年来,此事件集合不断发展,包括许多非常有用的新事件以及一些不如以前那么相关的旧事件。本节将尝试介绍主要的事件钩子以及何时可能使用它们。
执行事件¶
1.4 版本新增: Session
现在具有一个全面的钩子,旨在拦截代表 ORM 执行的所有 SELECT 语句以及批量 UPDATE 和 DELETE 语句。此钩子取代了之前的 QueryEvents.before_compile()
事件,以及 QueryEvents.before_compile_update()
和 QueryEvents.before_compile_delete()
。
Session
具有一个全面的系统,通过该系统,所有通过 Session.execute()
方法调用的查询(包括 Query
发出的所有 SELECT 语句以及代表列和关系加载器发出的所有 SELECT 语句)都可以被拦截和修改。该系统使用 SessionEvents.do_orm_execute()
事件钩子以及 ORMExecuteState
对象来表示事件状态。
基本查询拦截¶
SessionEvents.do_orm_execute()
首先适用于任何类型的查询拦截,包括 Query
与 1.x 样式 发出的查询,以及当启用 ORM 的 2.0 样式 select()
、update()
或 delete()
构造传递给 Session.execute()
时。ORMExecuteState
构造提供访问器以允许修改语句、参数和选项
Session = sessionmaker(engine)
@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
if orm_execute_state.is_select:
# add populate_existing for all SELECT statements
orm_execute_state.update_execution_options(populate_existing=True)
# check if the SELECT is against a certain entity and add an
# ORDER BY if so
col_descriptions = orm_execute_state.statement.column_descriptions
if col_descriptions[0]["entity"] is MyEntity:
orm_execute_state.statement = statement.order_by(MyEntity.name)
上面的示例说明了对 SELECT 语句的一些简单修改。在此级别,SessionEvents.do_orm_execute()
事件钩子旨在取代以前使用的 QueryEvents.before_compile()
事件,该事件对于各种加载器而言并非始终如一地触发;此外,QueryEvents.before_compile()
仅适用于 1.x 样式 与 Query
的使用,而不适用于 2.0 样式 与 Session.execute()
的使用。
添加全局 WHERE / ON 条件¶
最常请求的查询扩展功能之一是能够将 WHERE 条件添加到所有查询中实体的所有出现项。这可以通过使用 with_loader_criteria()
查询选项来实现,该选项可以单独使用,或者非常适合在 SessionEvents.do_orm_execute()
事件中使用
from sqlalchemy.orm import with_loader_criteria
Session = sessionmaker(engine)
@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
if (
orm_execute_state.is_select
and not orm_execute_state.is_column_load
and not orm_execute_state.is_relationship_load
):
orm_execute_state.statement = orm_execute_state.statement.options(
with_loader_criteria(MyEntity.public == True)
)
上面,一个选项被添加到所有 SELECT 语句中,这将限制针对 MyEntity
的所有查询,以按 public == True
进行过滤。该条件将应用于立即查询范围内该类的所有加载。with_loader_criteria()
选项默认情况下也会自动传播到关系加载器,这将应用于后续关系加载,包括延迟加载、selectinloads 等。
对于一系列都具有某些公共列结构的类,如果这些类是使用 声明性混入 组成的,则混入类本身可以与 with_loader_criteria()
选项结合使用,方法是使用 Python lambda。Python lambda 将在查询编译时针对与条件匹配的特定实体调用。给定一系列基于名为 HasTimestamp
的混入的类
import datetime
class HasTimestamp:
timestamp = mapped_column(DateTime, default=datetime.datetime.now)
class SomeEntity(HasTimestamp, Base):
__tablename__ = "some_entity"
id = mapped_column(Integer, primary_key=True)
class SomeOtherEntity(HasTimestamp, Base):
__tablename__ = "some_entity"
id = mapped_column(Integer, primary_key=True)
上面的类 SomeEntity
和 SomeOtherEntity
将各自具有一个列 timestamp
,该列默认为当前日期和时间。可以使用事件来拦截所有从 HasTimestamp
扩展的对象,并在不早于一个月前的日期过滤其 timestamp
列
@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
if (
orm_execute_state.is_select
and not orm_execute_state.is_column_load
and not orm_execute_state.is_relationship_load
):
one_month_ago = datetime.datetime.today() - datetime.timedelta(months=1)
orm_execute_state.statement = orm_execute_state.statement.options(
with_loader_criteria(
HasTimestamp,
lambda cls: cls.timestamp >= one_month_ago,
include_aliases=True,
)
)
警告
在调用 with_loader_criteria()
内部使用 lambda 仅对每个唯一类调用一次。自定义函数不应在此 lambda 中调用。有关“lambda SQL”功能的概述,请参阅 使用 Lambdas 为语句生成添加显著的速度提升,该功能仅供高级使用。
另请参阅
ORM 查询事件 - 包括上述 with_loader_criteria()
食谱的工作示例。
重新执行语句¶
深度炼金术
语句重新执行功能涉及稍微复杂的递归序列,旨在解决将 SQL 语句的执行重新路由到各种非 SQL 上下文中的相当困难的问题。下面链接的“dogpile 缓存”和“水平分片”的双重示例应用作指导,以了解何时适合使用此相当高级的功能。
ORMExecuteState
能够控制给定语句的执行;这包括完全不调用语句的能力(允许返回从缓存检索的预先构造的结果集),以及使用不同的状态重复调用同一语句的能力,例如针对多个数据库连接调用它,然后在内存中合并结果。SQLAlchemy 的示例套件中详细说明了这两种高级模式,如下所示。
当在 SessionEvents.do_orm_execute()
事件钩子内部时,可以使用 ORMExecuteState.invoke_statement()
方法来调用语句,方法是使用 Session.execute()
的新嵌套调用,这将抢占正在进行的当前执行的后续处理,而是返回内部执行返回的 Result
。到目前为止在此过程中为 SessionEvents.do_orm_execute()
钩子调用的事件处理程序也将在这种嵌套调用中被跳过。
ORMExecuteState.invoke_statement()
方法返回一个 Result
对象;然后,该对象具有将其“冻结”为可缓存格式并“解冻”为新的 Result
对象的能力,以及将其数据与其他 Result
对象的数据合并的能力。
例如,使用 SessionEvents.do_orm_execute()
来实现缓存
from sqlalchemy.orm import loading
cache = {}
@event.listens_for(Session, "do_orm_execute")
def _do_orm_execute(orm_execute_state):
if "my_cache_key" in orm_execute_state.execution_options:
cache_key = orm_execute_state.execution_options["my_cache_key"]
if cache_key in cache:
frozen_result = cache[cache_key]
else:
frozen_result = orm_execute_state.invoke_statement().freeze()
cache[cache_key] = frozen_result
return loading.merge_frozen_result(
orm_execute_state.session,
orm_execute_state.statement,
frozen_result,
load=False,
)
使用上面的钩子到位,使用缓存的示例将如下所示
stmt = (
select(User).where(User.name == "sandy").execution_options(my_cache_key="key_sandy")
)
result = session.execute(stmt)
上面,自定义执行选项被传递给 Select.execution_options()
,以便建立一个“缓存键”,然后该缓存键将被 SessionEvents.do_orm_execute()
钩子拦截。然后,此缓存键与可能存在于缓存中的 FrozenResult
对象匹配,如果存在,则重新使用该对象。该配方使用 Result.freeze()
方法来“冻结”一个 Result
对象,该对象在上面将包含 ORM 结果,以便可以将其存储在缓存中并多次使用。为了从“冻结”结果返回实时结果,merge_frozen_result()
函数用于将结果对象中的“冻结”数据合并到当前会话中。
上面的示例在 Dogpile 缓存 中作为完整示例实现。
ORMExecuteState.invoke_statement()
方法也可以多次调用,将不同的信息传递给 ORMExecuteState.invoke_statement.bind_arguments
参数,以便 Session
每次都将使用不同的 Engine
对象。这将每次返回一个不同的 Result
对象;可以使用 Result.merge()
方法将这些结果合并在一起。这是 水平分片 扩展采用的技术;请参阅源代码以熟悉。
持久化事件¶
可能最广泛使用的一系列事件是“持久化”事件,这些事件与 flush 过程 相对应。flush 是在其中做出有关对象挂起更改的所有决策,然后以 INSERT、UPDATE 和 DELETE 语句的形式发出到数据库。
before_flush()
¶
SessionEvents.before_flush()
钩子是应用程序希望确保在 flush 继续进行时对数据库进行其他持久性更改时,迄今为止最常用的事件。使用 SessionEvents.before_flush()
以对对象进行操作以验证其状态,以及在对象持久化之前组合其他对象和引用。在此事件中,可以安全地操作 Session 的状态,也就是说,可以将新对象附加到它,可以删除对象,并且可以自由更改对象上的各个属性,并且当事件钩子完成时,这些更改将被拉入 flush 过程。
典型的 SessionEvents.before_flush()
钩子将负责扫描集合 Session.new
、Session.dirty
和 Session.deleted
,以查找将要发生某些事情的对象。
有关 SessionEvents.before_flush()
的说明,请参见诸如 使用历史记录表进行版本控制 和 使用时间行进行版本控制 之类的示例。
after_flush()
¶
SessionEvents.after_flush()
钩子在 flush 过程的 SQL 发出后调用,但在之前刷新对象的对象状态已更改。也就是说,您仍然可以检查 Session.new
、Session.dirty
和 Session.deleted
集合,以查看刚刚刷新了什么,并且您还可以使用历史记录跟踪功能(例如 AttributeState
提供的功能)来查看刚刚持久化了哪些更改。在 SessionEvents.after_flush()
事件中,可以根据观察到的更改向数据库发出其他 SQL。
after_flush_postexec()
¶
SessionEvents.after_flush_postexec()
在 SessionEvents.after_flush()
之后不久被调用,但在之后修改了对象的状态以说明刚刚发生的 flush。Session.new
、Session.dirty
和 Session.deleted
集合通常在此处完全为空。使用 SessionEvents.after_flush_postexec()
来检查已完成对象的标识映射,并可能发出其他 SQL。在此钩子中,可以对对象进行新的更改,这意味着 Session
将再次进入“脏”状态;如果在此钩子中检测到新更改(如果 flush 是在 Session.commit()
的上下文中调用的),则 Session
的机制将导致它再次刷新;否则,挂起的更改将捆绑为下一个正常 flush 的一部分。当钩子在 Session.commit()
中检测到新更改时,计数器将确保在此方面在 100 次迭代后停止无休止的循环,以防 SessionEvents.after_flush_postexec()
钩子每次调用时都不断添加要刷新的新状态。
Mapper 级别 Flush 事件¶
除了 flush 级别钩子之外,还有一套更细粒度的钩子,因为它们是基于每个对象调用的,并且在 flush 过程中根据 INSERT、UPDATE 或 DELETE 分解。这些是 mapper 持久性钩子,它们也很受欢迎,但是需要更加谨慎地对待这些事件,因为它们在已经进行的 flush 过程的上下文中进行;许多操作在这里进行是不安全的。
事件是
注意
重要的是要注意,这些事件仅适用于 会话 flush 操作,而不适用于 ORM 启用 INSERT、UPDATE 和 DELETE 语句 中描述的 ORM 级别 INSERT/UPDATE/DELETE 功能。要拦截 ORM 级别 DML,请使用 SessionEvents.do_orm_execute()
事件。
每个事件都传递 Mapper
、映射的对象本身以及用于发出 INSERT、UPDATE 或 DELETE 语句的 Connection
。这些事件的吸引力显而易见,因为如果应用程序希望将某些活动与特定类型的对象使用 INSERT 持久化时联系起来,则钩子非常具体;与 SessionEvents.before_flush()
事件不同,无需搜索诸如 Session.new
之类的集合以查找目标。但是,当调用这些事件时,表示要发出的每个 INSERT、UPDATE、DELETE 语句的完整列表的 flush 计划已经确定,并且在此阶段不得进行任何更改。因此,对给定对象唯一可能进行的更改是对象行的本地属性。对对象或其他对象的任何其他更改都将影响 Session
的状态,这将导致其无法正常运行。
在这些 mapper 级别持久性事件中不支持的操作包括
映射的集合 append、add、remove、delete、discard 等。
映射的关系属性 set/del 事件,即
someobject.related = someotherobject
传递 Connection
的原因是鼓励在此处直接在 Connection
上执行简单 SQL 操作,例如递增计数器或在日志表中插入额外的行。
还有许多基于每个对象的操作根本不需要在 flush 事件中处理。最常见的替代方法是简单地在其 __init__()
方法中与对象一起建立其他状态,例如创建要与新对象关联的其他对象。使用 简单验证器 中描述的验证器是另一种方法;这些函数可以拦截对属性的更改,并响应属性更改在目标对象上建立其他状态更改。通过这两种方法,对象在进入 flush 步骤之前处于正确的状态。
对象生命周期事件¶
事件的另一个用例是跟踪对象的生命周期。这指的是在 对象状态快速入门 中首次介绍的状态。
上面的所有状态都可以使用事件完全跟踪。每个事件都代表一个不同的状态转换,这意味着,起始状态和目标状态都是跟踪内容的一部分。除了初始瞬态事件外,所有事件都以 Session
对象或类为单位,这意味着它们可以与特定的 Session
对象关联
from sqlalchemy import event
from sqlalchemy.orm import Session
session = Session()
@event.listens_for(session, "transient_to_pending")
def object_is_pending(session, obj):
print("new pending: %s" % obj)
或与 Session
类本身关联,以及与特定的 sessionmaker
关联,这可能是最有用的形式
from sqlalchemy import event
from sqlalchemy.orm import sessionmaker
maker = sessionmaker()
@event.listens_for(maker, "transient_to_pending")
def object_is_pending(session, obj):
print("new pending: %s" % obj)
监听器当然可以堆叠在一个函数之上,这很可能是常见的。例如,要跟踪所有正在进入持久状态的对象
@event.listens_for(maker, "pending_to_persistent")
@event.listens_for(maker, "deleted_to_persistent")
@event.listens_for(maker, "detached_to_persistent")
@event.listens_for(maker, "loaded_as_persistent")
def detect_all_persistent(session, instance):
print("object is now persistent: %s" % instance)
瞬态¶
所有映射对象在首次构建时都以瞬态 (transient)状态开始。在这种状态下,对象独立存在,并且不与任何 Session
关联。对于这种初始状态,没有特定的“转换”事件,因为没有 Session
。但是,如果想要拦截任何瞬态对象被创建的时刻,InstanceEvents.init()
方法可能是最好的事件。此事件应用于特定的类或超类。例如,要拦截特定声明基类的所有新对象
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy import event
class Base(DeclarativeBase):
pass
@event.listens_for(Base, "init", propagate=True)
def intercept_init(instance, args, kwargs):
print("new transient: %s" % instance)
瞬态到待定 (Pending)¶
当瞬态对象首次通过 Session.add()
或 Session.add_all()
方法与 Session
关联时,它会变为 待定 (pending) 状态。对象也可能由于来自显式添加的引用对象的 “级联 (cascade)” 而成为 Session
的一部分。可以使用 SessionEvents.transient_to_pending()
事件检测从瞬态到待定的转换
@event.listens_for(sessionmaker, "transient_to_pending")
def intercept_transient_to_pending(session, object_):
print("transient to pending: %s" % object_)
待定到持久 (Persistent)¶
当 flush 操作继续并且为实例执行 INSERT 语句时,待定 (pending) 对象变为 持久 (persistent) 状态。对象现在具有身份键 (identity key)。使用 SessionEvents.pending_to_persistent()
事件跟踪从待定到持久的转换
@event.listens_for(sessionmaker, "pending_to_persistent")
def intercept_pending_to_persistent(session, object_):
print("pending to persistent: %s" % object_)
待定到瞬态 (Transient)¶
如果在待定对象被 flush 之前调用了 Session.rollback()
方法,或者在对象被 flush 之前为对象调用了 Session.expunge()
方法,则 待定 (pending) 对象可以恢复为 瞬态 (transient) 状态。使用 SessionEvents.pending_to_transient()
事件跟踪从待定到瞬态的转换
@event.listens_for(sessionmaker, "pending_to_transient")
def intercept_pending_to_transient(session, object_):
print("transient to pending: %s" % object_)
加载为持久 (Persistent)¶
当对象从数据库加载时,它们可以直接以 持久 (persistent) 状态出现在 Session
中。跟踪此状态转换与跟踪对象的加载过程同义,并且与使用 InstanceEvents.load()
实例级事件同义。但是,SessionEvents.loaded_as_persistent()
事件作为会话中心 (session-centric) 的钩子提供,用于拦截通过此特定途径进入持久状态的对象
@event.listens_for(sessionmaker, "loaded_as_persistent")
def intercept_loaded_as_persistent(session, object_):
print("object loaded into persistent state: %s" % object_)
持久到瞬态 (Transient)¶
如果为首次将对象添加为待定状态的事务调用了 Session.rollback()
方法,则持久对象可以恢复为瞬态状态。在 ROLLBACK 的情况下,使此对象变为持久状态的 INSERT 语句被回滚,并且该对象从 Session
中逐出,再次变为瞬态状态。使用 SessionEvents.persistent_to_transient()
事件钩子跟踪从持久状态恢复为瞬态状态的对象
@event.listens_for(sessionmaker, "persistent_to_transient")
def intercept_persistent_to_transient(session, object_):
print("persistent to transient: %s" % object_)
持久到已删除 (Deleted)¶
当标记为删除的对象在 flush 过程中从数据库中删除时,持久对象进入 已删除 (deleted) 状态。请注意,这与为目标对象调用 Session.delete()
方法是 **不相同的**。Session.delete()
方法仅 **标记** 对象为删除;实际的 DELETE 语句直到 flush 操作继续后才发出。在 flush 操作之后,目标对象才处于“已删除”状态。
在“已删除”状态下,对象仅与 Session
有轻微关联。它既不在身份映射 (identity map) 中,也不在 Session.deleted
集合中,该集合指的是对象何时待删除 (pending for deletion)。
从“已删除”状态,对象可以在事务提交时变为分离 (detached) 状态,或者在事务回滚时返回到持久状态。
使用 SessionEvents.persistent_to_deleted()
跟踪从持久到已删除的转换
@event.listens_for(sessionmaker, "persistent_to_deleted")
def intercept_persistent_to_deleted(session, object_):
print("object was DELETEd, is now in deleted state: %s" % object_)
已删除到分离 (Detached)¶
当会话的事务提交时,已删除对象变为 分离 (detached) 状态。在调用 Session.commit()
方法后,数据库事务最终完成,并且 Session
现在完全丢弃已删除的对象并移除与之的所有关联。使用 SessionEvents.deleted_to_detached()
跟踪从已删除到分离的转换
@event.listens_for(sessionmaker, "deleted_to_detached")
def intercept_deleted_to_detached(session, object_):
print("deleted to detached: %s" % object_)
注意
当对象处于已删除状态时,可以使用 inspect(object).deleted
访问的 InstanceState.deleted
属性返回 True。但是,当对象分离时,InstanceState.deleted
将再次返回 False。要检测对象是否已被删除,无论它是否已分离,请使用 InstanceState.was_deleted
访问器。
持久到分离 (Detached)¶
当对象通过 Session.expunge()
、Session.expunge_all()
或 Session.close()
方法与 Session
取消关联时,持久对象变为 分离 (detached) 状态。
注意
如果应用程序取消引用其所属的 Session
并由于垃圾回收而被丢弃,则对象也可能 **隐式地分离 (implicitly detached)**。在这种情况下,**不会发出任何事件**。
使用 SessionEvents.persistent_to_detached()
事件跟踪对象从持久状态移动到分离状态
@event.listens_for(sessionmaker, "persistent_to_detached")
def intercept_persistent_to_detached(session, object_):
print("object became detached: %s" % object_)
分离到持久 (Persistent)¶
当分离对象使用 Session.add()
或等效方法重新与会话关联时,它将变为持久状态。使用 SessionEvents.detached_to_persistent()
事件跟踪对象从分离状态返回到持久状态
@event.listens_for(sessionmaker, "detached_to_persistent")
def intercept_detached_to_persistent(session, object_):
print("object became persistent again: %s" % object_)
已删除到持久 (Persistent)¶
当使用 Session.rollback()
方法回滚其中 DELETEd 对象的事务时,已删除 (deleted) 对象可以恢复为 持久 (persistent) 状态。使用 SessionEvents.deleted_to_persistent()
事件跟踪从已删除状态返回到持久状态的对象
@event.listens_for(sessionmaker, "deleted_to_persistent")
def intercept_deleted_to_persistent(session, object_):
print("deleted to persistent: %s" % object_)
事务事件 (Transaction Events)¶
事务事件允许应用程序在 Session
级别以及当 Session
更改 Connection
对象上的事务状态时收到通知。
SessionEvents.after_transaction_create()
,SessionEvents.after_transaction_end()
- 这些事件以不特定于单个数据库连接的方式跟踪Session
的逻辑事务范围。这些事件旨在帮助集成事务跟踪系统,例如zope.sqlalchemy
。当应用程序需要将一些外部范围与Session
的事务范围对齐时,请使用这些事件。这些钩子反映了Session
的“嵌套”事务行为,因为它们跟踪逻辑“子事务”以及“嵌套”事务(例如 SAVEPOINT)。SessionEvents.before_commit()
,SessionEvents.after_commit()
,SessionEvents.after_begin()
,SessionEvents.after_rollback()
,SessionEvents.after_soft_rollback()
- 这些事件允许从数据库连接的角度跟踪事务事件。SessionEvents.after_begin()
尤其是一个按连接的事件;维护多个连接的Session
将为每个连接单独发出此事件,因为这些连接在当前事务中被使用。然后,rollback 和 commit 事件指的是 DBAPI 连接本身何时直接收到 rollback 或 commit 指令。
属性更改事件 (Attribute Change Events)¶
属性更改事件允许拦截对象上特定属性何时被修改。这些事件包括 AttributeEvents.set()
, AttributeEvents.append()
, 和 AttributeEvents.remove()
。这些事件非常有用,特别是对于每个对象的验证操作;但是,使用“验证器 (validator)”钩子通常更方便,它在幕后使用这些钩子;有关此背景信息,请参阅 简单验证器 (Simple Validators)。属性事件也位于反向引用 (backreferences) 的机制之后。 属性检测 (Attribute Instrumentation) 中的示例说明了属性事件的使用。
flambé! 龙和 The Alchemist 图像设计由 Rotem Yaari 创作并慷慨捐赠。
使用 Sphinx 7.2.6 创建。文档最后生成时间:Tue 11 Mar 2025 02:40:17 PM EDT