SQLAlchemy 2.0 文档
事务和连接管理¶
管理事务¶
更改于版本 1.4: Session 事务管理已修改为更清晰易用。特别是,它现在具有“autobegin”操作,这意味着可以控制事务开始的点,而无需使用传统的“autocommit”模式。
Session
跟踪单个“虚拟”事务的状态,使用一个名为 SessionTransaction
的对象。这个对象然后利用底层的 Engine
或 Session
对象绑定的引擎,以便使用 Connection
对象根据需要启动实际的连接级别事务。
这个“虚拟”事务是在需要时自动创建的,或者可以使用 Session.begin()
方法手动启动。在尽可能大的程度上,Python 上下文管理器使用在创建 Session
对象级别以及维护 SessionTransaction
的作用域方面都得到了支持。
下面,假设我们从一个 Session
开始
from sqlalchemy.orm import Session
session = Session(engine)
我们现在可以使用上下文管理器在划定的事务中运行操作
with session.begin():
session.add(some_object())
session.add(some_other_object())
# commits transaction at the end, or rolls back if there
# was an exception raised
在上面的上下文结束时,假设没有引发异常,任何挂起的对象都将被刷新到数据库中,并且数据库事务将被提交。如果在上面的代码块中引发了异常,那么事务将被回滚。在这两种情况下,上面的 Session
在退出代码块后将准备好用于后续事务。
Session.begin()
方法是可选的,Session
也可以用于随用随提交的方式,在这种方式下,它将根据需要自动启动事务;这些事务只需要被提交或回滚
session = Session(engine)
session.add(some_object())
session.add(some_other_object())
session.commit() # commits
# will automatically begin again
result = session.execute(text("< some select statement >"))
session.add_all([more_objects, ...])
session.commit() # commits
session.add(still_another_object)
session.flush() # flush still_another_object
session.rollback() # rolls back still_another_object
Session
本身有一个 Session.close()
方法。如果 Session
在一个尚未提交或回滚的事务中启动,此方法将取消(即回滚)该事务,并将清除 Session
对象状态中的所有对象。如果 Session
正在被使用,并且无法保证调用 Session.commit()
或 Session.rollback()
(例如,不在上下文管理器或类似环境中),可以使用 close
方法确保所有资源都已释放
# expunges all objects, releases all transactions unconditionally
# (with rollback), releases all database connections back to their
# engines
session.close()
最后,Session 构造/关闭过程本身可以通过上下文管理器运行。这是确保 Session
对象使用的作用域在固定代码块内的一种最佳方式。首先通过 Session
构造函数说明
with Session(engine) as session:
session.add(some_object())
session.add(some_other_object())
session.commit() # commits
session.add(still_another_object)
session.flush() # flush still_another_object
session.commit() # commits
result = session.execute(text("<some SELECT statement>"))
# remaining transactional state from the .execute() call is
# discarded
类似地,sessionmaker
可以以相同的方式使用
Session = sessionmaker(engine)
with Session() as session:
with session.begin():
session.add(some_object)
# commits
# closes the Session
sessionmaker
本身包含一个 sessionmaker.begin()
方法,以允许这两个操作同时进行
with Session.begin() as session:
session.add(some_object)
使用 SAVEPOINT¶
SAVEPOINT 事务(如果底层引擎支持)可以使用 Session.begin_nested()
方法进行划分
Session = sessionmaker()
with Session.begin() as session:
session.add(u1)
session.add(u2)
nested = session.begin_nested() # establish a savepoint
session.add(u3)
nested.rollback() # rolls back u3, keeps u1 and u2
# commits u1 and u2
每次调用 Session.begin_nested()
时,在当前数据库事务范围内(如果尚未进行,则启动一个事务),会向数据库发出一个新的“BEGIN SAVEPOINT”命令,并返回一个类型为 SessionTransaction
的对象,该对象表示此 SAVEPOINT 的句柄。当在该对象上调用 .commit()
方法时,会向数据库发出“RELEASE SAVEPOINT”,而如果调用 .rollback()
方法,则会发出“ROLLBACK TO SAVEPOINT”。外层数据库事务将保持进行状态。
Session.begin_nested()
通常用作上下文管理器,在其中可以捕获特定实例错误,并结合对该部分事务状态的回滚,而不会回滚整个事务,如下例所示
for record in records:
try:
with session.begin_nested():
session.merge(record)
except:
print("Skipped record %s" % record)
session.commit()
当 Session.begin_nested()
生成的上下文管理器完成时,它会“提交”保存点,这包括将所有挂起状态刷新的正常行为。当引发错误时,保存点会被回滚,并且 Session
中与被更改对象相关的本地状态会失效。
此模式非常适合以下情况:使用 PostgreSQL 并捕获 IntegrityError
以检测重复行;PostgreSQL 通常在引发此类错误时会中止整个事务,但是当使用 SAVEPOINT 时,外层事务会保持进行状态。在下面的示例中,一个数据列表会持久化到数据库中,偶尔会跳过“重复主键”记录,而不会回滚整个操作
from sqlalchemy import exc
with session.begin():
for record in records:
try:
with session.begin_nested():
obj = SomeRecord(id=record["identifier"], name=record["name"])
session.add(obj)
except exc.IntegrityError:
print(f"Skipped record {record} - row already exists")
当调用 Session.begin_nested()
时,Session
首先将所有当前挂起的状态刷新到数据库;这将无条件发生,无论 Session.autoflush
参数的值是什么,该参数通常可以用来禁用自动刷新。这种行为背后的原因是,当此嵌套事务回滚时,Session
可以使在 SAVEPOINT 范围内创建的所有内存中状态失效,同时确保当这些失效的对象被刷新时,SAVEPOINT 开始之前对象图的状态将可用于从数据库中重新加载。
在现代版本的 SQLAlchemy 中,当由 Session.begin_nested()
启动的 SAVEPOINT 回滚时,自 SAVEPOINT 创建以来被修改的内存中对象状态将失效,但是自 SAVEPOINT 开始以来未被修改的其他对象状态将被保留。这样,后续操作可以继续使用未受影响的数据,而无需从数据库中刷新它们。
另请参阅
Connection.begin_nested()
- 核心 SAVEPOINT API
会话级与引擎级事务控制¶
核心中的 Connection
和 ORM 中的 _session.Session
具有等效的事务语义,无论是在 sessionmaker
与 Engine
级别,还是 Session
与 Connection
级别。以下部分将根据以下方案详细介绍这些场景
ORM Core
----------------------------------------- -----------------------------------
sessionmaker Engine
Session Connection
sessionmaker.begin() Engine.begin()
some_session.commit() some_connection.commit()
with some_sessionmaker() as session: with some_engine.connect() as conn:
with some_sessionmaker.begin() as session: with some_engine.begin() as conn:
with some_session.begin_nested() as sp: with some_connection.begin_nested() as sp:
边走边提交¶
无论 Session
还是 Connection
,都具有 Connection.commit()
和 Connection.rollback()
方法。使用 SQLAlchemy 2.0 样式的操作,这些方法在所有情况下都会影响最外层事务。对于 Session
,假设 Session.autobegin
保留其默认值 True
。
引擎
:
engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")
with engine.connect() as conn:
conn.execute(
some_table.insert(),
[
{"data": "some data one"},
{"data": "some data two"},
{"data": "some data three"},
],
)
conn.commit()
会话
:
Session = sessionmaker(engine)
with Session() as session:
session.add_all(
[
SomeClass(data="some data one"),
SomeClass(data="some data two"),
SomeClass(data="some data three"),
]
)
session.commit()
只开始一次¶
无论 sessionmaker
还是 Engine
,都具有 Engine.begin()
方法,该方法将同时获取一个新的对象来执行 SQL 语句(分别为 Session
和 Connection
),然后返回一个上下文管理器,该管理器将为该对象维护一个 begin/commit/rollback 上下文。
引擎
engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")
with engine.begin() as conn:
conn.execute(
some_table.insert(),
[
{"data": "some data one"},
{"data": "some data two"},
{"data": "some data three"},
],
)
# commits and closes automatically
会话
Session = sessionmaker(engine)
with Session.begin() as session:
session.add_all(
[
SomeClass(data="some data one"),
SomeClass(data="some data two"),
SomeClass(data="some data three"),
]
)
# commits and closes automatically
嵌套事务¶
当使用 Session.begin_nested()
或 Connection.begin_nested()
方法通过 SAVEPOINT 时,必须使用返回的事务对象来提交或回滚 SAVEPOINT。调用 Session.commit()
或 Connection.commit()
方法将始终提交最外层事务;这是 SQLAlchemy 2.0 的特定行为,它与 1.x 系列相反。
引擎
engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")
with engine.begin() as conn:
savepoint = conn.begin_nested()
conn.execute(
some_table.insert(),
[
{"data": "some data one"},
{"data": "some data two"},
{"data": "some data three"},
],
)
savepoint.commit() # or rollback
# commits automatically
会话
Session = sessionmaker(engine)
with Session.begin() as session:
savepoint = session.begin_nested()
session.add_all(
[
SomeClass(data="some data one"),
SomeClass(data="some data two"),
SomeClass(data="some data three"),
]
)
savepoint.commit() # or rollback
# commits automatically
显式开始¶
Session
具有“autobegin”行为,这意味着只要操作开始进行,它就会确保存在 SessionTransaction
来跟踪正在进行的操作。当调用 Session.commit()
时,此事务将完成。
通常情况下,尤其是在框架集成中,控制“begin”操作发生的点是可取的。为了适应这一点,Session
使用“autobegin”策略,这样就可以直接为尚未开始事务的 Session
调用 Session.begin()
方法
Session = sessionmaker(bind=engine)
session = Session()
session.begin()
try:
item1 = session.get(Item, 1)
item2 = session.get(Item, 2)
item1.foo = "bar"
item2.bar = "foo"
session.commit()
except:
session.rollback()
raise
上面的模式更习惯地使用上下文管理器调用
Session = sessionmaker(bind=engine)
session = Session()
with session.begin():
item1 = session.get(Item, 1)
item2 = session.get(Item, 2)
item1.foo = "bar"
item2.bar = "foo"
Session.begin()
方法和会话的“autobegin”过程使用相同的步骤序列来开始事务。这包括当它发生时调用 SessionEvents.after_transaction_create()
事件;此钩子由框架使用,以便将它们自己的事务过程与 ORM Session
的事务过程集成。
启用两阶段提交¶
对于支持两阶段操作的后端(目前为 MySQL 和 PostgreSQL),会话可以被指示使用两阶段提交语义。这将协调跨数据库的提交事务,以便事务在所有数据库中都已提交或回滚。你还可以 Session.prepare()
会话以与未由 SQLAlchemy 管理的事务进行交互。要使用两阶段事务,请在会话上设置标志 twophase=True
engine1 = create_engine("postgresql+psycopg2://db1")
engine2 = create_engine("postgresql+psycopg2://db2")
Session = sessionmaker(twophase=True)
# bind User operations to engine 1, Account operations to engine 2
Session.configure(binds={User: engine1, Account: engine2})
session = Session()
# .... work with accounts and users
# commit. session will issue a flush to all DBs, and a prepare step to all DBs,
# before committing both transactions
session.commit()
设置事务隔离级别/DBAPI AUTOCOMMIT¶
大多数 DBAPI 支持可配置事务 隔离 级别的概念。这些通常是四个级别“READ UNCOMMITTED”、“READ COMMITTED”、“REPEATABLE READ”和“SERIALIZABLE”。这些通常应用于 DBAPI 连接,在它开始新的事务之前,要注意的是,大多数 DBAPI 会在第一次发出 SQL 语句时隐式开始此事务。
支持隔离级别的 DBAPI 通常也支持真正的“autocommit”概念,这意味着 DBAPI 连接本身将被置于非事务性自动提交模式。这通常意味着 DBAPI 自动向数据库发出“BEGIN”的典型行为不再发生,但也可能包括其他指令。在这种模式下,DBAPI 在任何情况下都不会使用事务。SQLAlchemy 方法(如 .begin()
、.commit()
和 .rollback()
)将静默通过。
SQLAlchemy 的方言支持在每个 Engine
或每个 Connection
基础上设置可设置的隔离模式,使用 create_engine()
级别和 Connection.execution_options()
级别的标志。
当使用 ORM Session
时,它充当引擎和连接的外观,但不直接公开事务隔离。因此,为了影响事务隔离级别,我们需要根据情况对 Engine
或 Connection
进行操作。
另请参阅
设置事务隔离级别,包括 DBAPI 自动提交 - 请务必查看隔离级别如何在 SQLAlchemy Connection
对象级别工作。
为 Sessionmaker / Engine 全局设置隔离 ¶
要全局设置具有特定隔离级别的 Session
或 sessionmaker
,第一个技巧是在所有情况下针对特定隔离级别构建一个 Engine
,然后将其用作 Session
和/或 sessionmaker
的连接来源。
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
eng = create_engine(
"postgresql+psycopg2://scott:tiger@localhost/test",
isolation_level="REPEATABLE READ",
)
Session = sessionmaker(eng)
另一个选项,如果需要同时使用两个具有不同隔离级别的引擎,就是使用 Engine.execution_options()
方法,该方法将生成原始 Engine
的浅层副本,该副本与父引擎共享相同的连接池。当操作将被分离到“事务性”和“自动提交”操作时,这通常是比较好的选择。
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")
transactional_session = sessionmaker(eng)
autocommit_session = sessionmaker(autocommit_engine)
上面,“eng
” 和 "autocommit_engine"
共享相同的方言和连接池。但是,当从 autocommit_engine
获取连接时,将为这些连接设置“AUTOCOMMIT”模式。这两个 sessionmaker
对象,“transactional_session
” 和 “autocommit_session"
随后在与数据库连接交互时继承这些特性。
“autocommit_session
” **仍然具有事务语义**,包括 Session.commit()
和 Session.rollback()
仍然将自己视为“提交”和“回滚”对象,但是事务将被静默地忽略。因此,**通常(尽管不是严格要求)将具有 AUTOCOMMIT 隔离级别的 Session 用于只读方式**,即
with autocommit_session() as session:
some_objects = session.execute(text("<statement>"))
some_other_objects = session.execute(text("<statement>"))
# closes connection
为单个 Session 设置隔离 ¶
当我们创建一个新的 Session
时,无论是直接使用构造函数还是调用 sessionmaker
生成的可调用对象,我们都可以直接传递 bind
参数,从而覆盖预先存在的绑定。例如,我们可以从默认的 sessionmaker
创建我们的 Session
并传递为自动提交设置的引擎。
plain_engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
autocommit_engine = plain_engine.execution_options(isolation_level="AUTOCOMMIT")
# will normally use plain_engine
Session = sessionmaker(plain_engine)
# make a specific Session that will use the "autocommit" engine
with Session(bind=autocommit_engine) as session:
# work with session
...
对于 Session
或 sessionmaker
配置了多个“绑定”的情况,我们可以完全重新指定 binds
参数,或者如果我们只想替换特定绑定,我们可以使用 Session.bind_mapper()
或 Session.bind_table()
方法。
with Session() as session:
session.bind_mapper(User, autocommit_engine)
为单个事务设置隔离 ¶
关于隔离级别的一个关键注意事项是,在已经开始事务的 Connection
上无法安全地修改该设置。数据库无法更改正在进行的事务的隔离级别,并且某些 DBAPI 和 SQLAlchemy 方言在该领域的行为不一致。
因此,最好使用预先绑定到具有所需隔离级别的引擎的 Session
。但是,可以通过在事务开始时使用 Session.connection()
方法来影响每个连接的隔离级别。
from sqlalchemy.orm import Session
# assume session just constructed
sess = Session(bind=engine)
# call connection() with options before any other operations proceed.
# this will procure a new connection from the bound engine and begin a real
# database transaction.
sess.connection(execution_options={"isolation_level": "SERIALIZABLE"})
# ... work with session in SERIALIZABLE isolation level...
# commit transaction. the connection is released
# and reverted to its previous isolation level.
sess.commit()
# subsequent to commit() above, a new transaction may be begun if desired,
# which will proceed with the previous default isolation level unless
# it is set again.
在上面,我们首先使用构造函数或 sessionmaker
生成一个 Session
。然后,我们通过调用 Session.connection()
明确设置数据库级事务的开始,该方法提供了将在数据库级事务开始之前传递给连接的执行选项。事务将使用此选定的隔离级别进行。当事务完成时,连接上的隔离级别将重置为默认值,然后连接将返回到连接池。
Session.begin()
方法也可用于启动 Session
级别事务;在该调用之后调用 Session.connection()
可用于设置每个连接事务的隔离级别。
sess = Session(bind=engine)
with sess.begin():
# call connection() with options before any other operations proceed.
# this will procure a new connection from the bound engine and begin a
# real database transaction.
sess.connection(execution_options={"isolation_level": "SERIALIZABLE"})
# ... work with session in SERIALIZABLE isolation level...
# outside the block, the transaction has been committed. the connection is
# released and reverted to its previous isolation level.
使用事件跟踪事务状态 ¶
有关会话事务状态更改的可用事件挂钩概述,请参阅 事务事件 部分。
将 Session 加入外部事务(例如用于测试套件) ¶
如果使用的是已处于事务状态(即已建立 Transaction
)的 Connection
,则可以通过将 Session
绑定到该 Connection
来使 Session
在该事务中参与。这样做通常的理由是测试套件,该套件允许 ORM 代码与 Session
自由地工作,包括调用 Session.commit()
的能力,之后整个数据库交互将回滚。
在版本 2.0 中变更: 2.0 中,“加入外部事务”的方案再次得到了改进;不再需要“重置”嵌套事务的事件处理程序。
该方法通过在事务中建立一个Connection
,并可选地建立一个 SAVEPOINT,然后将它作为“bind”传递给一个Session
来实现。Session.join_transaction_mode
参数被设置为"create_savepoint"
,这表明为了实现Session
的 BEGIN/COMMIT/ROLLBACK 操作,应该创建新的 SAVEPOINT,这将保持外部事务处于它被传递时的相同状态。
当测试结束时,外部事务被回滚,以便在整个测试期间所做的任何数据更改都被恢复。
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from unittest import TestCase
# global application scope. create Session class, engine
Session = sessionmaker()
engine = create_engine("postgresql+psycopg2://...")
class SomeTest(TestCase):
def setUp(self):
# connect to the database
self.connection = engine.connect()
# begin a non-ORM transaction
self.trans = self.connection.begin()
# bind an individual Session to the connection, selecting
# "create_savepoint" join_transaction_mode
self.session = Session(
bind=self.connection, join_transaction_mode="create_savepoint"
)
def test_something(self):
# use the session in tests.
self.session.add(Foo())
self.session.commit()
def test_something_with_rollbacks(self):
self.session.add(Bar())
self.session.flush()
self.session.rollback()
self.session.add(Foo())
self.session.commit()
def tearDown(self):
self.session.close()
# rollback - everything that happened with the
# Session above (including calls to commit())
# is rolled back.
self.trans.rollback()
# return connection to the Engine
self.connection.close()
上述方法是 SQLAlchemy 自身 CI 的一部分,以确保它按预期工作。
flambé! 龙和炼金术士图像设计由Rotem Yaari创建并慷慨捐赠。
使用Sphinx 7.2.6 创建。最后生成文档: 2024 年 11 月 8 日星期五 08:41:19 AM EST