事务和连接管理

管理事务

在版本 1.4 中更改: Session 事务管理已修订得更清晰易用。 特别是,它现在具有“自动开始”操作,这意味着可以控制事务开始的点,而无需使用传统的“自动提交”模式。

Session 每次跟踪单个“虚拟”事务的状态,使用名为 SessionTransaction 的对象。 然后,此对象利用 Session 对象绑定到的底层 Engine 或引擎,以便根据需要使用 Connection 对象启动实际的连接级事务。

这个“虚拟”事务在需要时自动创建,或者也可以使用 Session.begin() 方法启动。 为了尽可能大的程度,在创建 Session 对象级别以及维护 SessionTransaction 的范围级别都支持 Python 上下文管理器。

下面,假设我们从一个 Session 开始

from sqlalchemy.orm import Session

session = Session(engine)

现在,我们可以使用上下文管理器在划定的事务中运行操作

with session.begin():
    session.add(some_object())
    session.add(some_other_object())
# commits transaction at the end, or rolls back if there
# was an exception raised

在上述上下文结束时,假设没有引发异常,任何挂起对象都将被刷新到数据库,并且数据库事务将被提交。 如果在上述块中引发了异常,则事务将被回滚。 在这两种情况下,退出块后的上述 Session 都准备好在后续事务中使用。

Session.begin() 方法是可选的,Session 也可以在随时提交的方法中使用,在这种方法中,它会在需要时自动开始事务; 这些只需要提交或回滚

session = Session(engine)

session.add(some_object())
session.add(some_other_object())

session.commit()  # commits

# will automatically begin again
result = session.execute(text("< some select statement >"))
session.add_all([more_objects, ...])
session.commit()  # commits

session.add(still_another_object)
session.flush()  # flush still_another_object
session.rollback()  # rolls back still_another_object

Session 本身具有 Session.close() 方法。 如果 Session 在尚未提交或回滚的事务中开始,则此方法将取消(即回滚)该事务,并清除 Session 对象状态中包含的所有对象。 如果 Session 的使用方式不能保证调用 Session.commit()Session.rollback()(例如,不在上下文管理器或类似管理器中),则可以使用 close 方法来确保所有资源都已释放

# expunges all objects, releases all transactions unconditionally
# (with rollback), releases all database connections back to their
# engines
session.close()

最后,会话构造/关闭过程本身可以通过上下文管理器运行。 这是确保 Session 对象的使用范围限定在固定块内的最佳方法。 首先通过 Session 构造函数进行说明

with Session(engine) as session:
    session.add(some_object())
    session.add(some_other_object())

    session.commit()  # commits

    session.add(still_another_object)
    session.flush()  # flush still_another_object

    session.commit()  # commits

    result = session.execute(text("<some SELECT statement>"))

# remaining transactional state from the .execute() call is
# discarded

同样,sessionmaker 可以以相同的方式使用

Session = sessionmaker(engine)

with Session() as session:
    with session.begin():
        session.add(some_object)
    # commits

# closes the Session

sessionmaker 本身包含 sessionmaker.begin() 方法,允许同时执行两个操作

with Session.begin() as session:
    session.add(some_object)

使用 SAVEPOINT

SAVEPOINT 事务(如果底层引擎支持)可以使用 Session.begin_nested() 方法来划定

Session = sessionmaker()

with Session.begin() as session:
    session.add(u1)
    session.add(u2)

    nested = session.begin_nested()  # establish a savepoint
    session.add(u3)
    nested.rollback()  # rolls back u3, keeps u1 and u2

# commits u1 and u2

每次调用 Session.begin_nested() 时,都会在当前数据库事务的范围内(如果尚未进行中,则启动一个)向数据库发出新的“BEGIN SAVEPOINT”命令,并返回类型为 SessionTransaction 的对象,该对象表示此 SAVEPOINT 的句柄。 当调用此对象上的 .commit() 方法时,会向数据库发出“RELEASE SAVEPOINT”,如果改为调用 .rollback() 方法,则会发出“ROLLBACK TO SAVEPOINT”。 封闭的数据库事务保持进行中。

Session.begin_nested() 通常用作上下文管理器,在其中可以捕获特定的每个实例错误,并结合为事务状态的该部分发出的回滚,而无需回滚整个事务,如下例所示

for record in records:
    try:
        with session.begin_nested():
            session.merge(record)
    except:
        print("Skipped record %s" % record)
session.commit()

Session.begin_nested() 产生的上下文管理器完成时,它会“提交”保存点,其中包括刷新所有挂起状态的通常行为。 当引发错误时,保存点将被回滚,并且 Session 的状态(对于已更改的对象而言是本地的)将过期。

这种模式非常适合用于 PostgreSQL 并捕获 IntegrityError 以检测重复行的情况; PostgreSQL 通常会在引发此类错误时中止整个事务,但是,当使用 SAVEPOINT 时,外部事务将得到维护。 在下面的示例中,数据列表被持久化到数据库中,偶尔会跳过“重复主键”记录,而不会回滚整个操作

from sqlalchemy import exc

with session.begin():
    for record in records:
        try:
            with session.begin_nested():
                obj = SomeRecord(id=record["identifier"], name=record["name"])
                session.add(obj)
        except exc.IntegrityError:
            print(f"Skipped record {record} - row already exists")

当调用 Session.begin_nested() 时,Session 首先将所有当前挂起状态刷新到数据库; 这种情况无条件发生,与通常可用于禁用自动刷新的 Session.autoflush 参数的值无关。 这种行为的基本原理是,当此嵌套事务发生回滚时,Session 可能会使在 SAVEPOINT 范围内创建的任何内存中状态过期,同时确保在刷新这些过期的对象时,对象图的状态(在 SAVEPOINT 开始之前)将可用于从数据库重新加载。

在现代版本的 SQLAlchemy 中,当回滚由 Session.begin_nested() 启动的 SAVEPOINT 时,自创建 SAVEPOINT 以来修改的内存中对象状态将过期,但是自 SAVEPOINT 开始以来未更改的其他对象状态将得到维护。 这样做是为了使后续操作可以继续使用原本未受影响的数据,而无需从数据库刷新它。

另请参阅

Connection.begin_nested() - Core SAVEPOINT API

Session 级别 vs. Engine 级别事务控制

Core 中的 Connection 和 ORM 中的 _session.Session 具有等效的事务语义,无论是在 sessionmakerEngine 的级别,还是在 SessionConnection 的级别。 以下部分根据以下方案详细说明了这些情况

ORM                                           Core
-----------------------------------------     -----------------------------------
sessionmaker                                  Engine
Session                                       Connection
sessionmaker.begin()                          Engine.begin()
some_session.commit()                         some_connection.commit()
with some_sessionmaker() as session:          with some_engine.connect() as conn:
with some_sessionmaker.begin() as session:    with some_engine.begin() as conn:
with some_session.begin_nested() as sp:       with some_connection.begin_nested() as sp:

随时提交

SessionConnection 都具有 Connection.commit()Connection.rollback() 方法。 使用 SQLAlchemy 2.0 风格的操作,这些方法在所有情况下都会影响最外层的事务。 对于 Session,假定 Session.autobegin 保留其默认值 True

Engine:

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")

with engine.connect() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
    conn.commit()

Session:

Session = sessionmaker(engine)

with Session() as session:
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
    session.commit()

单次开始

sessionmakerEngine 都具有 Engine.begin() 方法,该方法将获取一个新对象(分别是 SessionConnection)以执行 SQL 语句,然后返回一个上下文管理器,该管理器将维护该对象的开始/提交/回滚上下文。

Engine

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")

with engine.begin() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
# commits and closes automatically

Session

Session = sessionmaker(engine)

with Session.begin() as session:
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
# commits and closes automatically

嵌套事务

当通过 Session.begin_nested()Connection.begin_nested() 方法使用 SAVEPOINT 时,返回的事务对象必须用于提交或回滚 SAVEPOINT。 调用 Session.commit()Connection.commit() 方法将始终提交最外层的事务; 这是 SQLAlchemy 2.0 特有的行为,与 1.x 系列相反。

Engine

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")

with engine.begin() as conn:
    savepoint = conn.begin_nested()
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
    savepoint.commit()  # or rollback

# commits automatically

Session

Session = sessionmaker(engine)

with Session.begin() as session:
    savepoint = session.begin_nested()
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
    savepoint.commit()  # or rollback
# commits automatically

显式开始

Session 具有“自动开始”行为,这意味着一旦操作开始发生,它就会确保 SessionTransaction 存在以跟踪正在进行的操作。 当调用 Session.commit() 时,此事务完成。

通常,尤其是在框架集成中,希望控制“开始”操作发生的点。 为了适应这一点,Session 使用“自动开始”策略,这样可以直接为尚未开始事务的 Session 调用 Session.begin() 方法

Session = sessionmaker(bind=engine)
session = Session()
session.begin()
try:
    item1 = session.get(Item, 1)
    item2 = session.get(Item, 2)
    item1.foo = "bar"
    item2.bar = "foo"
    session.commit()
except:
    session.rollback()
    raise

上述模式更惯用地使用上下文管理器调用。

Session = sessionmaker(bind=engine)
session = Session()
with session.begin():
    item1 = session.get(Item, 1)
    item2 = session.get(Item, 2)
    item1.foo = "bar"
    item2.bar = "foo"

Session.begin() 方法和会话的“自动开始”过程使用相同的步骤序列来开始事务。 这包括在发生时调用 SessionEvents.after_transaction_create() 事件; 框架使用此挂钩,以便将其自身的事务处理过程与 ORM Session 的事务处理过程集成。

启用两阶段提交

对于支持两阶段操作的后端(目前为 MySQL 和 PostgreSQL),可以指示会话使用两阶段提交语义。 这将协调跨数据库的事务提交,以便在所有数据库中提交或回滚事务。 您还可以 Session.prepare() 会话以与 SQLAlchemy 未管理的事务进行交互。 要使用两阶段事务,请在会话上设置 twophase=True 标志

engine1 = create_engine("postgresql+psycopg2://db1")
engine2 = create_engine("postgresql+psycopg2://db2")

Session = sessionmaker(twophase=True)

# bind User operations to engine 1, Account operations to engine 2
Session.configure(binds={User: engine1, Account: engine2})

session = Session()

# .... work with accounts and users

# commit.  session will issue a flush to all DBs, and a prepare step to all DBs,
# before committing both transactions
session.commit()

设置事务隔离级别 / DBAPI AUTOCOMMIT

大多数 DBAPI 都支持可配置事务隔离级别的概念。 传统上,这些级别是“READ UNCOMMITTED”、“READ COMMITTED”、“REPEATABLE READ”和“SERIALIZABLE”这四个级别。 这些通常在 DBAPI 连接开始新事务之前应用于 DBAPI 连接,请注意,大多数 DBAPI 将在首次发出 SQL 语句时隐式开始此事务。

支持隔离级别的 DBAPI 通常也支持真正的“自动提交”的概念,这意味着 DBAPI 连接本身将被置于非事务性自动提交模式。 这通常意味着不再发生自动向数据库发出“BEGIN”的典型 DBAPI 行为,但也可能包括其他指令。 当使用此模式时,DBAPI 在任何情况下都不使用事务。 SQLAlchemy 方法(如 .begin(), .commit().rollback())会静默传递。

SQLAlchemy 的方言支持在每个 Engine 或每个 Connection 的基础上设置隔离模式,在 create_engine() 级别以及在 Connection.execution_options() 级别都使用标志。

当使用 ORM Session 时,它充当引擎和连接的门面,但不直接公开事务隔离。 因此,为了影响事务隔离级别,我们需要根据需要对 EngineConnection 执行操作。

另请参阅

设置事务隔离级别,包括 DBAPI 自动提交 - 请务必查看隔离级别如何在 SQLAlchemy Connection 对象级别工作。

为 Sessionmaker / Engine 级别设置隔离

要全局设置具有特定隔离级别的 Sessionsessionmaker,第一种技术是可以在所有情况下针对特定隔离级别构造 Engine,然后将其用作 Session 和/或 sessionmaker 的连接源

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

eng = create_engine(
    "postgresql+psycopg2://scott:tiger@localhost/test",
    isolation_level="REPEATABLE READ",
)

Session = sessionmaker(eng)

另一种选择(如果同时存在两个具有不同隔离级别的引擎,则很有用)是使用 Engine.execution_options() 方法,该方法将生成原始 Engine 的浅表副本,该副本与父引擎共享相同的连接池。 当操作将分为“事务性”和“自动提交”操作时,这通常是首选方法

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")

autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")

transactional_session = sessionmaker(eng)
autocommit_session = sessionmaker(autocommit_engine)

上面,“eng”和“autocommit_engine”都共享相同的方言和连接池。 但是,当从 autocommit_engine 获取连接时,将在连接上设置“AUTOCOMMIT”模式。 然后,两个 sessionmaker 对象“transactional_session”和“autocommit_session”在与数据库连接一起工作时会继承这些特征。

autocommit_session”继续具有事务语义,包括 Session.commit()Session.rollback() 仍然认为它们是在“提交”和“回滚”对象,但是事务将静默地不存在。 因此,通常(虽然不是严格要求)具有 AUTOCOMMIT 隔离的 Session 以只读方式使用,即

with autocommit_session() as session:
    some_objects = session.execute(text("<statement>"))
    some_other_objects = session.execute(text("<statement>"))

# closes connection

为单个 Session 设置隔离

当我们创建一个新的 Session 时,无论是直接使用构造函数还是当我们调用 sessionmaker 生成的可调用对象时,我们都可以直接传递 bind 参数,从而覆盖预先存在的绑定。 例如,我们可以从默认 sessionmaker 创建我们的 Session,并传递为自动提交设置的引擎

plain_engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")

autocommit_engine = plain_engine.execution_options(isolation_level="AUTOCOMMIT")

# will normally use plain_engine
Session = sessionmaker(plain_engine)

# make a specific Session that will use the "autocommit" engine
with Session(bind=autocommit_engine) as session:
    # work with session
    ...

对于配置了多个 “binds” 的 Sessionsessionmaker 的情况,我们可以完全重新指定 binds 参数,或者如果我们只想替换特定的 binds,我们可以使用 Session.bind_mapper()Session.bind_table() 方法。

with Session() as session:
    session.bind_mapper(User, autocommit_engine)

为单个事务设置隔离级别

关于隔离级别的一个关键注意事项是,不能在已启动事务的 Connection 上安全地修改设置。数据库无法更改正在进行中的事务的隔离级别,并且某些 DBAPI 和 SQLAlchemy 方言在此区域的行为不一致。

因此,最好使用预先绑定到具有所需隔离级别的引擎的 Session。但是,可以使用 Session.connection() 方法在事务开始时影响每个连接的隔离级别。

from sqlalchemy.orm import Session

# assume session just constructed
sess = Session(bind=engine)

# call connection() with options before any other operations proceed.
# this will procure a new connection from the bound engine and begin a real
# database transaction.
sess.connection(execution_options={"isolation_level": "SERIALIZABLE"})

# ... work with session in SERIALIZABLE isolation level...

# commit transaction.  the connection is released
# and reverted to its previous isolation level.
sess.commit()

# subsequent to commit() above, a new transaction may be begun if desired,
# which will proceed with the previous default isolation level unless
# it is set again.

上面,我们首先使用构造函数或 sessionmaker 生成一个 Session。然后,我们通过调用 Session.connection() 显式设置数据库级别事务的开始,这提供了将在数据库级别事务开始之前传递给连接的执行选项。事务以选定的隔离级别进行。当事务完成时,连接上的隔离级别将重置为其默认值,然后连接返回到连接池。

Session.begin() 方法也可用于开始 Session 级别的事务;在该调用之后调用 Session.connection() 可用于设置每个连接事务的隔离级别。

sess = Session(bind=engine)

with sess.begin():
    # call connection() with options before any other operations proceed.
    # this will procure a new connection from the bound engine and begin a
    # real database transaction.
    sess.connection(execution_options={"isolation_level": "SERIALIZABLE"})

    # ... work with session in SERIALIZABLE isolation level...

# outside the block, the transaction has been committed.  the connection is
# released and reverted to its previous isolation level.

使用事件跟踪事务状态

有关会话事务状态更改的可用事件钩子的概述,请参阅 事务事件 部分。

将 Session 加入到外部事务中(例如用于测试套件)

如果正在使用的 Connection 已经处于事务状态(即已建立 Transaction),则可以通过将 Session 绑定到该 Connection,使 Session 参与到该事务中。通常的理由是测试套件允许 ORM 代码自由地使用 Session,包括调用 Session.commit() 的能力,之后整个数据库交互将被回滚。

在 2.0 版本中更改: “加入外部事务” 的方法在 2.0 版本中再次得到改进;不再需要 “重置” 嵌套事务的事件处理程序。

此方法的工作原理是在事务内建立一个 Connection,并可选择性地建立一个 SAVEPOINT,然后将其作为 “bind” 传递给 SessionSession.join_transaction_mode 参数传递了 "create_savepoint" 设置,这表明应该创建新的 SAVEPOINT 以实现 Session 的 BEGIN/COMMIT/ROLLBACK,这将使外部事务保持在其传递时的状态。

当测试结束时,外部事务将被回滚,以便在整个测试过程中对数据的任何更改都将被还原。

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from unittest import TestCase

# global application scope.  create Session class, engine
Session = sessionmaker()

engine = create_engine("postgresql+psycopg2://...")


class SomeTest(TestCase):
    def setUp(self):
        # connect to the database
        self.connection = engine.connect()

        # begin a non-ORM transaction
        self.trans = self.connection.begin()

        # bind an individual Session to the connection, selecting
        # "create_savepoint" join_transaction_mode
        self.session = Session(
            bind=self.connection, join_transaction_mode="create_savepoint"
        )

    def test_something(self):
        # use the session in tests.

        self.session.add(Foo())
        self.session.commit()

    def test_something_with_rollbacks(self):
        self.session.add(Bar())
        self.session.flush()
        self.session.rollback()

        self.session.add(Foo())
        self.session.commit()

    def tearDown(self):
        self.session.close()

        # rollback - everything that happened with the
        # Session above (including calls to commit())
        # is rolled back.
        self.trans.rollback()

        # return connection to the Engine
        self.connection.close()

上述方法是 SQLAlchemy 自身 CI 的一部分,以确保它保持预期工作状态。