事务和连接管理

管理事务

更改于版本 1.4: Session 事务管理已修改为更清晰易用。特别是,它现在具有“autobegin”操作,这意味着可以控制事务开始的点,而无需使用传统的“autocommit”模式。

Session 跟踪单个“虚拟”事务的状态,使用一个名为 SessionTransaction 的对象。这个对象然后利用底层的 EngineSession 对象绑定的引擎,以便使用 Connection 对象根据需要启动实际的连接级别事务。

这个“虚拟”事务是在需要时自动创建的,或者可以使用 Session.begin() 方法手动启动。在尽可能大的程度上,Python 上下文管理器使用在创建 Session 对象级别以及维护 SessionTransaction 的作用域方面都得到了支持。

下面,假设我们从一个 Session 开始

from sqlalchemy.orm import Session

session = Session(engine)

我们现在可以使用上下文管理器在划定的事务中运行操作

with session.begin():
    session.add(some_object())
    session.add(some_other_object())
# commits transaction at the end, or rolls back if there
# was an exception raised

在上面的上下文结束时,假设没有引发异常,任何挂起的对象都将被刷新到数据库中,并且数据库事务将被提交。如果在上面的代码块中引发了异常,那么事务将被回滚。在这两种情况下,上面的 Session 在退出代码块后将准备好用于后续事务。

Session.begin() 方法是可选的,Session 也可以用于随用随提交的方式,在这种方式下,它将根据需要自动启动事务;这些事务只需要被提交或回滚

session = Session(engine)

session.add(some_object())
session.add(some_other_object())

session.commit()  # commits

# will automatically begin again
result = session.execute(text("< some select statement >"))
session.add_all([more_objects, ...])
session.commit()  # commits

session.add(still_another_object)
session.flush()  # flush still_another_object
session.rollback()  # rolls back still_another_object

Session 本身有一个 Session.close() 方法。如果 Session 在一个尚未提交或回滚的事务中启动,此方法将取消(即回滚)该事务,并将清除 Session 对象状态中的所有对象。如果 Session 正在被使用,并且无法保证调用 Session.commit()Session.rollback() (例如,不在上下文管理器或类似环境中),可以使用 close 方法确保所有资源都已释放

# expunges all objects, releases all transactions unconditionally
# (with rollback), releases all database connections back to their
# engines
session.close()

最后,Session 构造/关闭过程本身可以通过上下文管理器运行。这是确保 Session 对象使用的作用域在固定代码块内的一种最佳方式。首先通过 Session 构造函数说明

with Session(engine) as session:
    session.add(some_object())
    session.add(some_other_object())

    session.commit()  # commits

    session.add(still_another_object)
    session.flush()  # flush still_another_object

    session.commit()  # commits

    result = session.execute(text("<some SELECT statement>"))

# remaining transactional state from the .execute() call is
# discarded

类似地,sessionmaker 可以以相同的方式使用

Session = sessionmaker(engine)

with Session() as session:
    with session.begin():
        session.add(some_object)
    # commits

# closes the Session

sessionmaker 本身包含一个 sessionmaker.begin() 方法,以允许这两个操作同时进行

with Session.begin() as session:
    session.add(some_object)

使用 SAVEPOINT

SAVEPOINT 事务(如果底层引擎支持)可以使用 Session.begin_nested() 方法进行划分

Session = sessionmaker()

with Session.begin() as session:
    session.add(u1)
    session.add(u2)

    nested = session.begin_nested()  # establish a savepoint
    session.add(u3)
    nested.rollback()  # rolls back u3, keeps u1 and u2

# commits u1 and u2

每次调用 Session.begin_nested() 时,在当前数据库事务范围内(如果尚未进行,则启动一个事务),会向数据库发出一个新的“BEGIN SAVEPOINT”命令,并返回一个类型为 SessionTransaction 的对象,该对象表示此 SAVEPOINT 的句柄。当在该对象上调用 .commit() 方法时,会向数据库发出“RELEASE SAVEPOINT”,而如果调用 .rollback() 方法,则会发出“ROLLBACK TO SAVEPOINT”。外层数据库事务将保持进行状态。

Session.begin_nested() 通常用作上下文管理器,在其中可以捕获特定实例错误,并结合对该部分事务状态的回滚,而不会回滚整个事务,如下例所示

for record in records:
    try:
        with session.begin_nested():
            session.merge(record)
    except:
        print("Skipped record %s" % record)
session.commit()

Session.begin_nested() 生成的上下文管理器完成时,它会“提交”保存点,这包括将所有挂起状态刷新的正常行为。当引发错误时,保存点会被回滚,并且 Session 中与被更改对象相关的本地状态会失效。

此模式非常适合以下情况:使用 PostgreSQL 并捕获 IntegrityError 以检测重复行;PostgreSQL 通常在引发此类错误时会中止整个事务,但是当使用 SAVEPOINT 时,外层事务会保持进行状态。在下面的示例中,一个数据列表会持久化到数据库中,偶尔会跳过“重复主键”记录,而不会回滚整个操作

from sqlalchemy import exc

with session.begin():
    for record in records:
        try:
            with session.begin_nested():
                obj = SomeRecord(id=record["identifier"], name=record["name"])
                session.add(obj)
        except exc.IntegrityError:
            print(f"Skipped record {record} - row already exists")

当调用 Session.begin_nested() 时,Session 首先将所有当前挂起的状态刷新到数据库;这将无条件发生,无论 Session.autoflush 参数的值是什么,该参数通常可以用来禁用自动刷新。这种行为背后的原因是,当此嵌套事务回滚时,Session 可以使在 SAVEPOINT 范围内创建的所有内存中状态失效,同时确保当这些失效的对象被刷新时,SAVEPOINT 开始之前对象图的状态将可用于从数据库中重新加载。

在现代版本的 SQLAlchemy 中,当由 Session.begin_nested() 启动的 SAVEPOINT 回滚时,自 SAVEPOINT 创建以来被修改的内存中对象状态将失效,但是自 SAVEPOINT 开始以来未被修改的其他对象状态将被保留。这样,后续操作可以继续使用未受影响的数据,而无需从数据库中刷新它们。

另请参阅

Connection.begin_nested() - 核心 SAVEPOINT API

会话级与引擎级事务控制

核心中的 Connection 和 ORM 中的 _session.Session 具有等效的事务语义,无论是在 sessionmakerEngine 级别,还是 SessionConnection 级别。以下部分将根据以下方案详细介绍这些场景

ORM                                           Core
-----------------------------------------     -----------------------------------
sessionmaker                                  Engine
Session                                       Connection
sessionmaker.begin()                          Engine.begin()
some_session.commit()                         some_connection.commit()
with some_sessionmaker() as session:          with some_engine.connect() as conn:
with some_sessionmaker.begin() as session:    with some_engine.begin() as conn:
with some_session.begin_nested() as sp:       with some_connection.begin_nested() as sp:

边走边提交

无论 Session 还是 Connection,都具有 Connection.commit()Connection.rollback() 方法。使用 SQLAlchemy 2.0 样式的操作,这些方法在所有情况下都会影响最外层事务。对于 Session,假设 Session.autobegin 保留其默认值 True

引擎:

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")

with engine.connect() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
    conn.commit()

会话:

Session = sessionmaker(engine)

with Session() as session:
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
    session.commit()

只开始一次

无论 sessionmaker 还是 Engine,都具有 Engine.begin() 方法,该方法将同时获取一个新的对象来执行 SQL 语句(分别为 SessionConnection),然后返回一个上下文管理器,该管理器将为该对象维护一个 begin/commit/rollback 上下文。

引擎

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")

with engine.begin() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
# commits and closes automatically

会话

Session = sessionmaker(engine)

with Session.begin() as session:
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
# commits and closes automatically

嵌套事务

当使用 Session.begin_nested()Connection.begin_nested() 方法通过 SAVEPOINT 时,必须使用返回的事务对象来提交或回滚 SAVEPOINT。调用 Session.commit()Connection.commit() 方法将始终提交最外层事务;这是 SQLAlchemy 2.0 的特定行为,它与 1.x 系列相反。

引擎

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")

with engine.begin() as conn:
    savepoint = conn.begin_nested()
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
    savepoint.commit()  # or rollback

# commits automatically

会话

Session = sessionmaker(engine)

with Session.begin() as session:
    savepoint = session.begin_nested()
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
    savepoint.commit()  # or rollback
# commits automatically

显式开始

Session 具有“autobegin”行为,这意味着只要操作开始进行,它就会确保存在 SessionTransaction 来跟踪正在进行的操作。当调用 Session.commit() 时,此事务将完成。

通常情况下,尤其是在框架集成中,控制“begin”操作发生的点是可取的。为了适应这一点,Session 使用“autobegin”策略,这样就可以直接为尚未开始事务的 Session 调用 Session.begin() 方法

Session = sessionmaker(bind=engine)
session = Session()
session.begin()
try:
    item1 = session.get(Item, 1)
    item2 = session.get(Item, 2)
    item1.foo = "bar"
    item2.bar = "foo"
    session.commit()
except:
    session.rollback()
    raise

上面的模式更习惯地使用上下文管理器调用

Session = sessionmaker(bind=engine)
session = Session()
with session.begin():
    item1 = session.get(Item, 1)
    item2 = session.get(Item, 2)
    item1.foo = "bar"
    item2.bar = "foo"

Session.begin() 方法和会话的“autobegin”过程使用相同的步骤序列来开始事务。这包括当它发生时调用 SessionEvents.after_transaction_create() 事件;此钩子由框架使用,以便将它们自己的事务过程与 ORM Session 的事务过程集成。

启用两阶段提交

对于支持两阶段操作的后端(目前为 MySQL 和 PostgreSQL),会话可以被指示使用两阶段提交语义。这将协调跨数据库的提交事务,以便事务在所有数据库中都已提交或回滚。你还可以 Session.prepare() 会话以与未由 SQLAlchemy 管理的事务进行交互。要使用两阶段事务,请在会话上设置标志 twophase=True

engine1 = create_engine("postgresql+psycopg2://db1")
engine2 = create_engine("postgresql+psycopg2://db2")

Session = sessionmaker(twophase=True)

# bind User operations to engine 1, Account operations to engine 2
Session.configure(binds={User: engine1, Account: engine2})

session = Session()

# .... work with accounts and users

# commit.  session will issue a flush to all DBs, and a prepare step to all DBs,
# before committing both transactions
session.commit()

设置事务隔离级别/DBAPI AUTOCOMMIT

大多数 DBAPI 支持可配置事务 隔离 级别的概念。这些通常是四个级别“READ UNCOMMITTED”、“READ COMMITTED”、“REPEATABLE READ”和“SERIALIZABLE”。这些通常应用于 DBAPI 连接,在它开始新的事务之前,要注意的是,大多数 DBAPI 会在第一次发出 SQL 语句时隐式开始此事务。

支持隔离级别的 DBAPI 通常也支持真正的“autocommit”概念,这意味着 DBAPI 连接本身将被置于非事务性自动提交模式。这通常意味着 DBAPI 自动向数据库发出“BEGIN”的典型行为不再发生,但也可能包括其他指令。在这种模式下,DBAPI 在任何情况下都不会使用事务。SQLAlchemy 方法(如 .begin().commit().rollback())将静默通过。

SQLAlchemy 的方言支持在每个 Engine 或每个 Connection 基础上设置可设置的隔离模式,使用 create_engine() 级别和 Connection.execution_options() 级别的标志。

当使用 ORM Session 时,它充当引擎和连接的外观,但不直接公开事务隔离。因此,为了影响事务隔离级别,我们需要根据情况对 EngineConnection 进行操作。

另请参阅

设置事务隔离级别,包括 DBAPI 自动提交 - 请务必查看隔离级别如何在 SQLAlchemy Connection 对象级别工作。

为 Sessionmaker / Engine 全局设置隔离

要全局设置具有特定隔离级别的 Sessionsessionmaker,第一个技巧是在所有情况下针对特定隔离级别构建一个 Engine,然后将其用作 Session 和/或 sessionmaker 的连接来源。

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

eng = create_engine(
    "postgresql+psycopg2://scott:tiger@localhost/test",
    isolation_level="REPEATABLE READ",
)

Session = sessionmaker(eng)

另一个选项,如果需要同时使用两个具有不同隔离级别的引擎,就是使用 Engine.execution_options() 方法,该方法将生成原始 Engine 的浅层副本,该副本与父引擎共享相同的连接池。当操作将被分离到“事务性”和“自动提交”操作时,这通常是比较好的选择。

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")

autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")

transactional_session = sessionmaker(eng)
autocommit_session = sessionmaker(autocommit_engine)

上面,“eng” 和 "autocommit_engine" 共享相同的方言和连接池。但是,当从 autocommit_engine 获取连接时,将为这些连接设置“AUTOCOMMIT”模式。这两个 sessionmaker 对象,“transactional_session” 和 “autocommit_session" 随后在与数据库连接交互时继承这些特性。

autocommit_session” **仍然具有事务语义**,包括 Session.commit()Session.rollback() 仍然将自己视为“提交”和“回滚”对象,但是事务将被静默地忽略。因此,**通常(尽管不是严格要求)将具有 AUTOCOMMIT 隔离级别的 Session 用于只读方式**,即

with autocommit_session() as session:
    some_objects = session.execute(text("<statement>"))
    some_other_objects = session.execute(text("<statement>"))

# closes connection

为单个 Session 设置隔离

当我们创建一个新的 Session 时,无论是直接使用构造函数还是调用 sessionmaker 生成的可调用对象,我们都可以直接传递 bind 参数,从而覆盖预先存在的绑定。例如,我们可以从默认的 sessionmaker 创建我们的 Session 并传递为自动提交设置的引擎。

plain_engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")

autocommit_engine = plain_engine.execution_options(isolation_level="AUTOCOMMIT")

# will normally use plain_engine
Session = sessionmaker(plain_engine)

# make a specific Session that will use the "autocommit" engine
with Session(bind=autocommit_engine) as session:
    # work with session
    ...

对于 Sessionsessionmaker 配置了多个“绑定”的情况,我们可以完全重新指定 binds 参数,或者如果我们只想替换特定绑定,我们可以使用 Session.bind_mapper()Session.bind_table() 方法。

with Session() as session:
    session.bind_mapper(User, autocommit_engine)

为单个事务设置隔离

关于隔离级别的一个关键注意事项是,在已经开始事务的 Connection 上无法安全地修改该设置。数据库无法更改正在进行的事务的隔离级别,并且某些 DBAPI 和 SQLAlchemy 方言在该领域的行为不一致。

因此,最好使用预先绑定到具有所需隔离级别的引擎的 Session。但是,可以通过在事务开始时使用 Session.connection() 方法来影响每个连接的隔离级别。

from sqlalchemy.orm import Session

# assume session just constructed
sess = Session(bind=engine)

# call connection() with options before any other operations proceed.
# this will procure a new connection from the bound engine and begin a real
# database transaction.
sess.connection(execution_options={"isolation_level": "SERIALIZABLE"})

# ... work with session in SERIALIZABLE isolation level...

# commit transaction.  the connection is released
# and reverted to its previous isolation level.
sess.commit()

# subsequent to commit() above, a new transaction may be begun if desired,
# which will proceed with the previous default isolation level unless
# it is set again.

在上面,我们首先使用构造函数或 sessionmaker 生成一个 Session。然后,我们通过调用 Session.connection() 明确设置数据库级事务的开始,该方法提供了将在数据库级事务开始之前传递给连接的执行选项。事务将使用此选定的隔离级别进行。当事务完成时,连接上的隔离级别将重置为默认值,然后连接将返回到连接池。

Session.begin() 方法也可用于启动 Session 级别事务;在该调用之后调用 Session.connection() 可用于设置每个连接事务的隔离级别。

sess = Session(bind=engine)

with sess.begin():
    # call connection() with options before any other operations proceed.
    # this will procure a new connection from the bound engine and begin a
    # real database transaction.
    sess.connection(execution_options={"isolation_level": "SERIALIZABLE"})

    # ... work with session in SERIALIZABLE isolation level...

# outside the block, the transaction has been committed.  the connection is
# released and reverted to its previous isolation level.

使用事件跟踪事务状态

有关会话事务状态更改的可用事件挂钩概述,请参阅 事务事件 部分。

将 Session 加入外部事务(例如用于测试套件)

如果使用的是已处于事务状态(即已建立 Transaction)的 Connection,则可以通过将 Session 绑定到该 Connection 来使 Session 在该事务中参与。这样做通常的理由是测试套件,该套件允许 ORM 代码与 Session 自由地工作,包括调用 Session.commit() 的能力,之后整个数据库交互将回滚。

在版本 2.0 中变更: 2.0 中,“加入外部事务”的方案再次得到了改进;不再需要“重置”嵌套事务的事件处理程序。

该方法通过在事务中建立一个Connection,并可选地建立一个 SAVEPOINT,然后将它作为“bind”传递给一个Session来实现。Session.join_transaction_mode参数被设置为"create_savepoint",这表明为了实现Session的 BEGIN/COMMIT/ROLLBACK 操作,应该创建新的 SAVEPOINT,这将保持外部事务处于它被传递时的相同状态。

当测试结束时,外部事务被回滚,以便在整个测试期间所做的任何数据更改都被恢复。

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from unittest import TestCase

# global application scope.  create Session class, engine
Session = sessionmaker()

engine = create_engine("postgresql+psycopg2://...")


class SomeTest(TestCase):
    def setUp(self):
        # connect to the database
        self.connection = engine.connect()

        # begin a non-ORM transaction
        self.trans = self.connection.begin()

        # bind an individual Session to the connection, selecting
        # "create_savepoint" join_transaction_mode
        self.session = Session(
            bind=self.connection, join_transaction_mode="create_savepoint"
        )

    def test_something(self):
        # use the session in tests.

        self.session.add(Foo())
        self.session.commit()

    def test_something_with_rollbacks(self):
        self.session.add(Bar())
        self.session.flush()
        self.session.rollback()

        self.session.add(Foo())
        self.session.commit()

    def tearDown(self):
        self.session.close()

        # rollback - everything that happened with the
        # Session above (including calls to commit())
        # is rolled back.
        self.trans.rollback()

        # return connection to the Engine
        self.connection.close()

上述方法是 SQLAlchemy 自身 CI 的一部分,以确保它按预期工作。