使用引擎和连接

本节详细介绍了 EngineConnection 和相关对象的直接用法。重要的是要注意,当使用 SQLAlchemy ORM 时,通常不会访问这些对象;相反,Session 对象用作数据库的接口。但是,对于围绕直接使用文本 SQL 语句和/或 SQL 表达式构造而构建的应用程序,而无需 ORM 的更高级别管理服务的参与,EngineConnection 是王者(和女王?)- 继续阅读。

基本用法

引擎配置 回忆一下,Engine 是通过 create_engine() 调用创建的

engine = create_engine("mysql+mysqldb://scott:tiger@localhost/test")

create_engine() 的典型用法是每个特定的数据库 URL 一次,在单个应用程序进程的生命周期内全局持有。单个 Engine 代表进程管理许多单独的 DBAPI 连接,并且旨在以并发方式调用。 Engine 与 DBAPI connect() 函数不同步,后者仅表示一个连接资源 - Engine 在应用程序的模块级别创建一次时效率最高,而不是每个对象或每个函数调用。

Engine 最基本的功能是提供对 Connection 的访问,然后可以调用 SQL 语句。向数据库发出文本语句如下所示

from sqlalchemy import text

with engine.connect() as connection:
    result = connection.execute(text("select username from users"))
    for row in result:
        print("username:", row.username)

上面,Engine.connect() 方法返回一个 Connection 对象,并通过在 Python 上下文管理器(例如 with: 语句)中使用它,Connection.close() 方法在块的末尾自动调用。 Connection 是实际 DBAPI 连接的代理对象。 DBAPI 连接在创建 Connection 时从连接池中检索。

返回的对象称为 CursorResult,它引用 DBAPI 游标并提供类似于 DBAPI 游标的方法来获取行。当 CursorResult 的所有结果行(如果有)都耗尽时,DBAPI 游标将由 CursorResult 关闭。不返回行的 CursorResult,例如 UPDATE 语句(没有任何返回行),会在构造后立即释放游标资源。

Connectionwith: 块的末尾关闭时,引用的 DBAPI 连接将释放到连接池。从数据库本身的视角来看,假设池有空间存储此连接以供下次使用,则连接池实际上不会“关闭”连接。当连接返回到池以供重用时,池机制会在 DBAPI 连接上发出 rollback() 调用,以便删除任何事务状态或锁(这称为 返回时重置),并且连接已准备好下次使用。

上面的示例说明了文本 SQL 字符串的执行,应通过使用 text() 构造来调用,以指示我们想使用文本 SQL。 Connection.execute() 方法当然可以容纳更多内容;有关教程,请参阅 使用数据 中的 SQLAlchemy 统一教程

使用事务

注意

本节介绍如何在直接使用 EngineConnection 对象时使用事务。当使用 SQLAlchemy ORM 时,事务控制的公共 API 是通过 Session 对象,它在内部使用 Transaction 对象。有关更多信息,请参阅 管理事务

边走边提交

Connection 对象始终在事务块的上下文中发出 SQL 语句。第一次调用 Connection.execute() 方法来执行 SQL 语句时,此事务会自动开始,使用称为自动开始的行为。事务在 Connection 对象的范围内保持不变,直到调用 Connection.commit()Connection.rollback() 方法。在事务结束后,Connection 等待再次调用 Connection.execute() 方法,此时它再次自动开始。

这种调用风格称为边走边提交,并在下面的示例中说明

with engine.connect() as connection:
    connection.execute(some_table.insert(), {"x": 7, "y": "this is some data"})
    connection.execute(
        some_other_table.insert(), {"q": 8, "p": "this is some more data"}
    )

    connection.commit()  # commit the transaction

在“边走边提交”样式中,我们可以在正在进行的语句序列中使用 Connection.commit()Connection.rollback() 方法,这些语句是使用 Connection.execute() 发出的;每次事务结束,并且发出新语句时,都会隐式开始新事务

with engine.connect() as connection:
    connection.execute(text("<some statement>"))
    connection.commit()  # commits "some statement"

    # new transaction starts
    connection.execute(text("<some other statement>"))
    connection.rollback()  # rolls back "some other statement"

    # new transaction starts
    connection.execute(text("<a third statement>"))
    connection.commit()  # commits "a third statement"

2.0 版本新增: “边走边提交”样式是 SQLAlchemy 2.0 的新功能。当使用“future”样式引擎时,它也可在 SQLAlchemy 1.4 的“过渡”模式中使用。

一次开始

Connection 对象提供了一种更显式的事务管理样式,称为一次开始。与“边走边提交”相比,“一次开始”允许显式声明事务的起点,并允许将事务本身框架为上下文管理器块,以便事务的结束是隐式的。要使用“一次开始”,请使用 Connection.begin() 方法,该方法返回一个 Transaction 对象,该对象表示 DBAPI 事务。此对象还通过其自身的 Transaction.commit()Transaction.rollback() 方法支持显式管理,但作为首选做法,也支持上下文管理器接口,在该接口中,它将在块正常结束时自行提交,并在引发异常时发出回滚,然后再向外传播异常。下面说明了“一次开始”块的形式

with engine.connect() as connection:
    with connection.begin():
        connection.execute(some_table.insert(), {"x": 7, "y": "this is some data"})
        connection.execute(
            some_other_table.insert(), {"q": 8, "p": "this is some more data"}
        )

    # transaction is committed

从引擎连接并一次开始

上述“一次开始”块的便捷简写形式是使用源 Engine 对象级别的 Engine.begin() 方法,而不是执行 Engine.connect()Connection.begin() 的两个单独步骤; Engine.begin() 方法返回一个特殊的上下文管理器,该管理器在内部维护 Connection 的上下文管理器以及 Transaction 的上下文管理器,通常由 Connection.begin() 方法返回

with engine.begin() as connection:
    connection.execute(some_table.insert(), {"x": 7, "y": "this is some data"})
    connection.execute(
        some_other_table.insert(), {"q": 8, "p": "this is some more data"}
    )

# transaction is committed, and Connection is released to the connection
# pool

提示

Engine.begin() 块中,我们可以调用 Connection.commit()Connection.rollback() 方法,这将提前结束块正常划分的事务。但是,如果我们这样做,在块结束之前,可能无法在 Connection 上发出进一步的 SQL 操作

>>> from sqlalchemy import create_engine
>>> e = create_engine("sqlite://", echo=True)
>>> with e.begin() as conn:
...     conn.commit()
...     conn.begin()
2021-11-08 09:49:07,517 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2021-11-08 09:49:07,517 INFO sqlalchemy.engine.Engine COMMIT
Traceback (most recent call last):
...
sqlalchemy.exc.InvalidRequestError: Can't operate on closed transaction inside
context manager.  Please complete the context manager before emitting
further commands.

混合风格

“边走边提交”和“一次开始”样式可以在单个 Engine.connect() 块中自由混合,前提是 Connection.begin() 的调用不与“自动开始”行为冲突。为了实现这一点,Connection.begin() 应该仅在发出任何 SQL 语句之前或直接在先前调用 Connection.commit()Connection.rollback() 之后调用

with engine.connect() as connection:
    with connection.begin():
        # run statements in a "begin once" block
        connection.execute(some_table.insert(), {"x": 7, "y": "this is some data"})

    # transaction is committed

    # run a new statement outside of a block. The connection
    # autobegins
    connection.execute(
        some_other_table.insert(), {"q": 8, "p": "this is some more data"}
    )

    # commit explicitly
    connection.commit()

    # can use a "begin once" block here
    with connection.begin():
        # run more statements
        connection.execute(...)

当开发使用“一次开始”的代码时,如果事务已“自动开始”,则库将引发 InvalidRequestError

设置事务隔离级别,包括 DBAPI 自动提交

大多数 DBAPI 支持可配置事务隔离级别的概念。传统上,这些是四个级别“READ UNCOMMITTED”、“READ COMMITTED”、“REPEATABLE READ”和“SERIALIZABLE”。这些通常在 DBAPI 连接开始新事务之前应用于 DBAPI 连接,注意大多数 DBAPI 将在首次发出 SQL 语句时隐式开始此事务。

支持隔离级别的 DBAPI 通常也支持真正的“自动提交”的概念,这意味着 DBAPI 连接本身将置于非事务性自动提交模式。这通常意味着数据库不再自动发出“BEGIN”的典型 DBAPI 行为,但也可能包括其他指令。 SQLAlchemy 将“自动提交”的概念视为任何其他隔离级别;因为它是一种隔离级别,不仅会丢失“读取已提交”,还会丢失原子性。

提示

重要的是要注意,正如将在下面 理解 DBAPI 级别的自动提交隔离级别 部分进一步讨论的那样,“自动提交”隔离级别与任何其他隔离级别一样,影响 Connection 对象的“事务性”行为,该对象继续调用 DBAPI .commit().rollback() 方法(它们在自动提交下不起作用),并且 .begin() 方法假定 DBAPI 将隐式启动事务(这意味着 SQLAlchemy 的“begin”不会更改自动提交模式)。

SQLAlchemy 方言应尽可能大地支持这些隔离级别以及自动提交。

为连接设置隔离级别或 DBAPI 自动提交

对于从 Engine.connect() 获取的单个 Connection 对象,可以使用 Connection.execution_options() 方法为该 Connection 对象持续时间设置隔离级别。参数称为 Connection.execution_options.isolation_level,值是字符串,通常是以下名称的子集

# possible values for Connection.execution_options(isolation_level="<value>")

"AUTOCOMMIT"
"READ COMMITTED"
"READ UNCOMMITTED"
"REPEATABLE READ"
"SERIALIZABLE"

并非每个 DBAPI 都支持每个值;如果某个后端使用了不支持的值,则会引发错误。

例如,要在特定连接上强制 REPEATABLE READ,然后开始事务

with engine.connect().execution_options(
    isolation_level="REPEATABLE READ"
) as connection:
    with connection.begin():
        connection.execute(text("<statement>"))

提示

Connection.execution_options() 方法的返回值是调用该方法的同一 Connection 对象,这意味着它会就地修改 Connection 对象的状态。这是 SQLAlchemy 2.0 的新行为。此行为不适用于 Engine.execution_options() 方法;该方法仍然返回 Engine 的副本,如下所述,可用于构造具有不同执行选项的多个 Engine 对象,这些对象仍然共享相同的方言和连接池。

注意

Connection.execution_options.isolation_level 参数不一定适用于语句级别选项,例如 Executable.execution_options(),如果在该级别设置,将被拒绝。这是因为该选项必须在每个事务的基础上在 DBAPI 连接上设置。

为引擎设置隔离级别或 DBAPI 自动提交

Connection.execution_options.isolation_level 选项也可以在引擎范围内设置,这通常是首选的。这可以通过将 create_engine.isolation_level 参数传递给 create_engine() 来实现

from sqlalchemy import create_engine

eng = create_engine(
    "postgresql://scott:tiger@localhost/test", isolation_level="REPEATABLE READ"
)

使用上述设置,每个新的 DBAPI 连接在创建的瞬间都将被设置为对所有后续操作使用 "REPEATABLE READ" 隔离级别设置。

为单个引擎维护多个隔离级别

隔离级别也可以按引擎设置,从而提供更大的灵活性,可以使用 create_engine.execution_options 参数传递给 create_engine() 函数,或者使用 Engine.execution_options() 方法。后者会创建 Engine 的副本,该副本与原始引擎共享方言和连接池,但具有其自己的每个连接的隔离级别设置。

from sqlalchemy import create_engine

eng = create_engine(
    "postgresql+psycopg2://scott:tiger@localhost/test",
    execution_options={"isolation_level": "REPEATABLE READ"},
)

使用上述设置,DBAPI 连接将被设置为对每个新开始的事务使用 "REPEATABLE READ" 隔离级别设置;但连接在被池化后,将被重置为连接首次建立时存在的原始隔离级别。create_engine() 级别的最终效果与使用 create_engine.isolation_level 参数没有任何不同。

但是,频繁选择在不同隔离级别下运行操作的应用程序可能希望创建一个主 Engine 的多个“子引擎”,每个子引擎都配置为不同的隔离级别。一个这样的用例是应用程序将操作分为“事务性”和“只读”操作,一个单独的 Engine,它使用 "AUTOCOMMIT",可以从主引擎中分离出来。

from sqlalchemy import create_engine

eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")

autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")

在上面,Engine.execution_options() 方法创建了原始 Engine 的浅拷贝。engautocommit_engine 都共享相同的方言和连接池。但是,“AUTOCOMMIT”模式将在从 autocommit_engine 获取连接时设置。

无论隔离级别设置为何,当连接返回连接池时,都会无条件地恢复。

理解 DBAPI 级别的自动提交隔离级别

在父章节中,我们介绍了 Connection.execution_options.isolation_level 参数的概念,以及如何使用它来设置数据库隔离级别,包括 DBAPI 级别的“自动提交”,SQLAlchemy 将其视为另一种事务隔离级别。在本节中,我们将尝试阐明这种方法的含义。

如果我们想检出一个 Connection 对象并在“自动提交”模式下使用它,我们将按如下步骤进行:

with engine.connect() as connection:
    connection.execution_options(isolation_level="AUTOCOMMIT")
    connection.execute(text("<statement>"))
    connection.execute(text("<statement>"))

上面演示了“DBAPI 自动提交”模式的正常用法。无需使用诸如 Connection.begin()Connection.commit() 之类的方法,因为所有语句都会立即提交到数据库。当代码块结束时,Connection 对象将恢复“自动提交”隔离级别,并且 DBAPI 连接被释放到连接池,DBAPI connection.rollback() 方法通常会在那里被调用,但由于上述语句已经提交,因此此回滚对数据库的状态没有改变。

重要的是要注意,“自动提交”模式即使在调用 Connection.begin() 方法时仍然存在;DBAPI 不会向数据库发出任何 BEGIN,也不会在调用 Connection.commit() 时发出 COMMIT。这种用法也不是错误场景,因为预期“自动提交”隔离级别可能会应用于原本假设事务上下文编写的代码;毕竟,“隔离级别”就像任何其他隔离级别一样,只是事务的配置细节。

在下面的示例中,语句保持自动提交状态,而与 SQLAlchemy 级别的事务块无关:

with engine.connect() as connection:
    connection = connection.execution_options(isolation_level="AUTOCOMMIT")

    # this begin() does not affect the DBAPI connection, isolation stays at AUTOCOMMIT
    with connection.begin() as trans:
        connection.execute(text("<statement>"))
        connection.execute(text("<statement>"))

当我们在启用日志记录的情况下运行像上面这样的代码块时,日志记录将尝试指示,虽然调用了 DBAPI 级别的 .commit(),但由于自动提交模式,它可能不会产生任何效果。

INFO sqlalchemy.engine.Engine BEGIN (implicit)
...
INFO sqlalchemy.engine.Engine COMMIT using DBAPI connection.commit(), DBAPI should ignore due to autocommit mode

与此同时,即使我们正在使用“DBAPI 自动提交”,SQLAlchemy 的事务语义,即 Connection.begin() 的 Python 内部行为以及“自动开始”的行为,仍然存在,即使这些行为不影响 DBAPI 连接本身。为了说明这一点,下面的代码将引发错误,因为在自动开始已经发生后调用了 Connection.begin()

with engine.connect() as connection:
    connection = connection.execution_options(isolation_level="AUTOCOMMIT")

    # "transaction" is autobegin (but has no effect due to autocommit)
    connection.execute(text("<statement>"))

    # this will raise; "transaction" is already begun
    with connection.begin() as trans:
        connection.execute(text("<statement>"))

上面的例子还演示了相同的主题,“自动提交”隔离级别是底层数据库事务的配置细节,并且独立于 SQLAlchemy Connection 对象的 begin/commit 行为。“自动提交”模式不会以任何方式与 Connection.begin() 交互,并且 Connection 在执行与其事务相关的自身状态更改时不会查询此状态(除了在引擎日志记录中建议这些块实际上没有提交)。这种设计的理由是保持与 Connection 完全一致的用法模式,其中 DBAPI 自动提交模式可以独立更改,而无需指示其他地方的任何代码更改。

在隔离级别之间切换

隔离级别设置(包括自动提交模式)在连接释放回连接池时会自动重置。因此,最好避免尝试在单个 Connection 对象上切换隔离级别,因为这会导致过多的冗余。

为了说明如何在单个 Connection 检出范围内以临时模式使用“自动提交”,必须使用先前的隔离级别重新应用 Connection.execution_options.isolation_level 参数。上一节演示了尝试调用 Connection.begin() 以在自动提交发生时启动事务;我们可以重写该示例以实际执行此操作,方法是在调用 Connection.begin() 之前首先恢复隔离级别。

# if we wanted to flip autocommit on and off on a single connection/
# which... we usually don't.

with engine.connect() as connection:
    connection.execution_options(isolation_level="AUTOCOMMIT")

    # run statement(s) in autocommit mode
    connection.execute(text("<statement>"))

    # "commit" the autobegun "transaction"
    connection.commit()

    # switch to default isolation level
    connection.execution_options(isolation_level=connection.default_isolation_level)

    # use a begin block
    with connection.begin() as trans:
        connection.execute(text("<statement>"))

在上面,为了手动恢复隔离级别,我们使用了 Connection.default_isolation_level 来恢复默认隔离级别(假设这是我们在这里想要的)。但是,最好使用 Connection 的架构,它已经在检入时自动处理隔离级别的重置。编写上述代码的首选方法是使用两个代码块:

# use an autocommit block
with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as connection:
    # run statement in autocommit mode
    connection.execute(text("<statement>"))

# use a regular block
with engine.begin() as connection:
    connection.execute(text("<statement>"))

总结一下:

  1. “DBAPI 级别自动提交”隔离级别完全独立于 Connection 对象的“begin”和“commit”概念。

  2. 每个隔离级别使用单独的 Connection 检出。避免尝试在单个连接检出上在“自动提交”之间来回切换;让引擎完成恢复默认隔离级别的工作。

使用服务器端游标(又名流式结果)

一些后端显式支持“服务器端游标”与“客户端游标”的概念。这里的客户端游标意味着数据库驱动程序在从语句执行返回之前,完全将结果集中的所有行提取到内存中。PostgreSQL 和 MySQL/MariaDB 等驱动程序通常默认使用客户端游标。相比之下,服务器端游标表明结果行在数据库服务器的状态中保持挂起,因为结果行被客户端使用。例如,Oracle 数据库的驱动程序通常使用“服务器端”模型,而 SQLite 方言虽然不使用真正的“客户端/服务器”架构,但仍然使用非缓冲结果提取方法,这将使结果行在被使用之前保留在进程内存之外。

从这个基本架构来看,“服务器端游标”在提取非常大的结果集时更节省内存,同时可能会在客户端/服务器通信过程中引入更多复杂性,并且对于小型结果集(通常少于 10000 行)效率较低。

对于那些有条件支持缓冲或非缓冲结果的方言,通常在使用“非缓冲”或服务器端游标模式时存在注意事项。例如,当使用 psycopg2 方言时,如果服务器端游标与任何类型的 DML 或 DDL 语句一起使用,则会引发错误。当使用带有服务器端游标的 MySQL 驱动程序时,DBAPI 连接处于更脆弱的状态,并且不能像从错误条件中那样优雅地恢复,也不允许回滚继续进行,直到游标完全关闭。

因此,SQLAlchemy 的方言将始终默认为错误较少的游标版本,这意味着对于 PostgreSQL 和 MySQL 方言,它默认为缓冲的“客户端”游标,其中在从游标调用任何提取方法之前,整个结果集都会被拉入内存。这种操作模式在绝大多数情况下都是合适的;除非在应用程序分块提取大量行(在处理这些行可以在提取更多行之前完成的情况下)这种不常见的情况下,否则非缓冲游标通常没有用处。

对于提供客户端和服务器端游标选项的数据库驱动程序,Connection.execution_options.stream_resultsConnection.execution_options.yield_per 执行选项提供了在每个 Connection 或每个语句的基础上访问“服务器端游标”的能力。当使用 ORM Session 时,也存在类似的选项。

通过 yield_per 进行固定缓冲区流式传输

由于使用完全非缓冲的服务器端游标的单个行提取操作通常比一次提取批量的行更昂贵,因此 Connection.execution_options.yield_per 执行选项配置 Connection 或语句以使用可用的服务器端游标,同时配置固定大小的行缓冲区,该缓冲区将在行被使用时分批从服务器检索行。此参数可以是正整数值,使用 Connection.execution_options() 方法在 Connection 上或使用 Executable.execution_options() 方法在语句上。

1.4.40 版本新增: Connection.execution_options.yield_per 作为仅核心选项在 SQLAlchemy 1.4.40 中新增;对于之前的 1.4 版本,请结合 Result.yield_per() 直接使用 Connection.execution_options.stream_results

使用此选项等效于手动设置 Connection.execution_options.stream_results 选项(在下一节中描述),然后在 Result 对象上使用给定的整数值调用 Result.yield_per() 方法。在这两种情况下,此组合的效果包括:

  • 为给定后端选择服务器端游标模式(如果可用且不是该后端的默认行为)。

  • 当提取结果行时,它们将被分批缓冲,其中每个批次的大小(直到最后一个批次)将等于传递给 Connection.execution_options.yield_per 选项或 Result.yield_per() 方法的整数参数;最后一个批次的大小将根据剩余的少于此大小的行数确定。

  • 如果使用 Result.partitions() 方法,则其使用的默认分区大小也将等于此整数大小。

以下示例说明了这三种行为:

with engine.connect() as conn:
    with conn.execution_options(yield_per=100).execute(
        text("select * from table")
    ) as result:
        for partition in result.partitions():
            # partition is an iterable that will be at most 100 items
            for row in partition:
                print(f"{row}")

上面的示例说明了 yield_per=100 与使用 Result.partitions() 方法相结合,以在与从服务器提取的大小匹配的批次中运行行处理。Result.partitions() 的使用是可选的,如果直接迭代 Result,则每提取 100 行将缓冲一批新的行。不应使用诸如 Result.all() 之类的方法,因为这将一次性完全提取所有剩余行,并破坏使用 yield_per 的目的。

提示

如上例所示,Result 对象可以用作上下文管理器。当使用服务器端游标迭代时,这是确保 Result 对象被关闭的最佳方法,即使在迭代过程中引发异常也是如此。

Connection.execution_options.yield_per 选项也可移植到 ORM,由 Session 用于获取 ORM 对象,它也限制了一次生成的 ORM 对象数量。请参阅 使用 Yield Per 获取大型结果集 章节 - 在 ORM 查询指南 中,了解有关将 Connection.execution_options.yield_per 与 ORM 一起使用的更多背景信息。

1.4.40 版本新增: 添加了 Connection.execution_options.yield_per 作为核心级别的执行选项,以方便地一次性设置流式结果、缓冲区大小和分区大小,这种方式可以转移到 ORM 的类似用例中。

使用 stream_results 进行动态增长缓冲区流式传输

要在没有特定分区大小的情况下启用服务器端游标,可以使用 Connection.execution_options.stream_results 选项,它像 Connection.execution_options.yield_per 一样,可以在 Connection 对象或语句对象上调用。

当使用 Connection.execution_options.stream_results 选项传递的 Result 对象被直接迭代时,行在内部被提取,使用默认的缓冲方案,该方案首先缓冲一小组行,然后在每次提取时缓冲更大和更大的缓冲区,直到达到预配置的 1000 行限制。可以使用 Connection.execution_options.max_row_buffer 执行选项影响此缓冲区的最大大小。

with engine.connect() as conn:
    with conn.execution_options(stream_results=True, max_row_buffer=100).execute(
        text("select * from table")
    ) as result:
        for row in result:
            print(f"{row}")

虽然 Connection.execution_options.stream_results 选项可以与 Result.partitions() 方法结合使用,但应将特定分区大小传递给 Result.partitions(),以便不提取整个结果。在设置使用 Result.partitions() 方法时,通常更直接地使用 Connection.execution_options.yield_per 选项。

模式名称的转换

为了支持将通用表集分布到多个模式中的多租户应用程序,可以使用 Connection.execution_options.schema_translate_map 执行选项,以便在不进行任何更改的情况下,将一组 Table 对象重新用于在不同的模式名称下呈现。

给定一个表:

user_table = Table(
    "user",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
)

Table 的 “schema”(模式) 由 Table.schema 属性定义为 NoneConnection.execution_options.schema_translate_map 可以指定将所有 schema 为 NoneTable 对象渲染为 schema user_schema_one

connection = engine.connect().execution_options(
    schema_translate_map={None: "user_schema_one"}
)

result = connection.execute(user_table.select())

以上代码将在数据库上调用如下形式的 SQL:

SELECT user_schema_one.user.id, user_schema_one.user.name FROM
user_schema_one.user

也就是说,schema 名称被替换为我们翻译后的名称。该映射可以指定任意数量的目标->目标 schema。

connection = engine.connect().execution_options(
    schema_translate_map={
        None: "user_schema_one",  # no schema name -> "user_schema_one"
        "special": "special_schema",  # schema="special" becomes "special_schema"
        "public": None,  # Table objects with schema="public" will render with no schema
    }
)

Connection.execution_options.schema_translate_map 参数会影响从 SQL 表达式语言生成的所有 DDL 和 SQL 构造,这些构造都派生自 TableSequence 对象。它不会影响通过 text() 构造或传递给 Connection.execute() 的纯字符串 SQL。

此功能仅在 schema 名称直接从 TableSequence 的名称派生时生效;它不会影响直接传递字符串 schema 名称的方法。按照这种模式,它在诸如 MetaData.create_all()MetaData.drop_all() 等方法执行的 “can create”(可以创建)/“can drop”(可以删除)检查中生效,并且在使用给定 Table 对象的表反射时生效。但是,它不会影响 Inspector 对象上的操作,因为 schema 名称被显式传递给这些方法。

提示

要将 schema 翻译功能与 ORM Session 一起使用,请在 Engine 级别设置此选项,然后将该 engine 传递给 SessionSession 为每个事务使用一个新的 Connection

schema_engine = engine.execution_options(schema_translate_map={...})

session = Session(schema_engine)

...

警告

当使用没有扩展的 ORM Session 时,schema 翻译功能仅支持每个 Session 一个 schema 翻译映射。如果为每个语句提供不同的 schema 翻译映射,则将无法工作,因为 ORM Session 不会将当前的 schema 翻译值考虑在单个对象中。

要将单个 Session 与多个 schema_translate_map 配置一起使用,可以使用 Horizontal Sharding 扩展。请参阅 Horizontal Sharding 中的示例。

SQL 编译缓存

1.4 版本新增: SQLAlchemy 现在具有透明查询缓存系统,可大幅降低将 SQL 语句构造转换为 SQL 字符串所涉及的 Python 计算开销,包括 Core 和 ORM。请参阅 透明 SQL 编译缓存添加到 Core、ORM 中的所有 DQL、DML 语句 中的介绍。

SQLAlchemy 包含用于 SQL 编译器及其 ORM 变体的综合缓存系统。此缓存系统在 Engine 中是透明的,并确保给定的 Core 或 ORM SQL 语句的 SQL 编译过程,以及为该语句组装结果获取机制的相关计算,对于该语句对象和所有具有相同结构的其他对象,在特定结构保留在 engine 的 “compiled cache”(编译缓存) 中的期间内,只会发生一次。 “具有相同结构的语句对象” 通常对应于在函数中构造并在每次函数运行时构建的 SQL 语句。

def run_my_statement(connection, parameter):
    stmt = select(table)
    stmt = stmt.where(table.c.col == parameter)
    stmt = stmt.order_by(table.c.id)
    return connection.execute(stmt)

上面的语句将生成类似于 SELECT id, col FROM table WHERE col = :col ORDER BY id 的 SQL,请注意,虽然 parameter 的值是一个普通的 Python 对象(例如字符串或整数),但语句的字符串 SQL 形式不包含此值,因为它使用了绑定参数。后续调用上述 run_my_statement() 函数将在 connection.execute() 调用的范围内使用缓存的编译构造,以提高性能。

注意

重要的是要注意,SQL 编译缓存缓存的仅是传递给数据库的 SQL 字符串,而不是查询返回的数据。它绝不是数据缓存,也不影响特定 SQL 语句返回的结果,也不意味着任何与获取结果行相关的内存使用。

虽然 SQLAlchemy 自早期的 1.x 系列以来就有一个基本的语句缓存,并且还为 ORM 提供了 “Baked Query”(烘焙查询) 扩展,但这两种系统都需要高度特殊的 API 使用才能使缓存有效。 1.4 版本的新缓存是完全自动的,并且无需更改编程风格即可生效。

缓存自动使用,无需任何配置更改,也无需任何特殊步骤即可启用它。以下各节详细介绍了缓存的配置和高级使用模式。

配置

缓存本身是一个类似字典的对象,称为 LRUCache,它是 SQLAlchemy 的内部字典子类,它跟踪特定键的使用情况,并具有定期 “pruning”(修剪)步骤,当缓存大小达到一定阈值时,它会删除最近最少使用的项目。此缓存的大小默认为 500,可以使用 create_engine.query_cache_size 参数进行配置。

engine = create_engine(
    "postgresql+psycopg2://scott:tiger@localhost/test", query_cache_size=1200
)

缓存的大小可以增长到给定大小的 150%,然后才会被修剪回目标大小。因此,上面大小为 1200 的缓存可以增长到 1800 个元素,此时它将被修剪为 1200 个。

缓存的大小基于每个引擎渲染的每个唯一 SQL 语句的单个条目。来自 Core 和 ORM 的 SQL 语句都被平等对待。DDL 语句通常不会被缓存。为了确定缓存正在做什么,引擎日志记录将包含有关缓存行为的详细信息,这将在下一节中介绍。

使用日志记录估计缓存性能

上面 1200 的缓存大小实际上相当大。对于小型应用程序,100 的大小可能就足够了。为了估计缓存的最佳大小(假设目标主机上有足够的内存),缓存的大小应基于可能为正在使用的目标引擎渲染的唯一 SQL 字符串的数量。查看此情况最便捷的方法是使用 SQL 回显,这可以通过使用 create_engine.echo 标志或使用 Python 日志记录来最直接地启用;有关日志记录配置的背景信息,请参阅 配置日志记录 部分。

例如,我们将检查以下程序生成的日志记录。

from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session

Base = declarative_base()


class A(Base):
    __tablename__ = "a"

    id = Column(Integer, primary_key=True)
    data = Column(String)
    bs = relationship("B")


class B(Base):
    __tablename__ = "b"
    id = Column(Integer, primary_key=True)
    a_id = Column(ForeignKey("a.id"))
    data = Column(String)


e = create_engine("sqlite://", echo=True)
Base.metadata.create_all(e)

s = Session(e)

s.add_all([A(bs=[B(), B(), B()]), A(bs=[B(), B(), B()]), A(bs=[B(), B(), B()])])
s.commit()

for a_rec in s.scalars(select(A)):
    print(a_rec.bs)

运行时,记录的每个 SQL 语句将在传递的参数左侧包含一个带括号的缓存统计信息徽章。我们可能会看到的四种消息类型总结如下:

  • [raw sql] - 驱动程序或最终用户使用 Connection.exec_driver_sql() 发出了原始 SQL - 缓存不适用。

  • [no key] - 语句对象是不缓存的 DDL 语句,或者语句对象包含不可缓存的元素,例如用户定义的构造或任意大的 VALUES 子句。

  • [generated in Xs] - 语句是缓存未命中,必须编译,然后存储在缓存中。生成编译后的构造花费了 X 秒。数字 X 将以小数秒为单位。

  • [cached since Xs ago] - 语句是缓存命中,无需重新编译。该语句自 X 秒前就已存储在缓存中。数字 X 将与应用程序运行的时间以及语句被缓存的时间成正比,例如,24 小时周期将为 86400。

每个徽章将在下面详细描述。

我们看到的上述程序的第一个语句将是 SQLite 方言检查 “a” 和 “b” 表是否存在。

INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("a")
INFO sqlalchemy.engine.Engine [raw sql] ()
INFO sqlalchemy.engine.Engine PRAGMA main.table_info("b")
INFO sqlalchemy.engine.Engine [raw sql] ()

对于上述两个 SQLite PRAGMA 语句,徽章显示 [raw sql],这表示驱动程序正在使用 Connection.exec_driver_sql() 将 Python 字符串直接发送到数据库。缓存不适用于此类语句,因为它们已经以字符串形式存在,并且由于 SQLAlchemy 不会提前解析 SQL 字符串,因此无法知道将返回哪些类型的的结果行。

我们接下来看到的语句是 CREATE TABLE 语句。

INFO sqlalchemy.engine.Engine
CREATE TABLE a (
  id INTEGER NOT NULL,
  data VARCHAR,
  PRIMARY KEY (id)
)

INFO sqlalchemy.engine.Engine [no key 0.00007s] ()
INFO sqlalchemy.engine.Engine
CREATE TABLE b (
  id INTEGER NOT NULL,
  a_id INTEGER,
  data VARCHAR,
  PRIMARY KEY (id),
  FOREIGN KEY(a_id) REFERENCES a (id)
)

INFO sqlalchemy.engine.Engine [no key 0.00006s] ()

对于这些语句中的每一个,徽章都显示 [no key 0.00006s]。这表示对于这两个特定语句,由于面向 DDL 的 CreateTable 构造未生成缓存键,因此未发生缓存。 DDL 构造通常不参与缓存,因为它们通常不会被重复第二次,而且 DDL 也是数据库配置步骤,性能并不那么关键。

[no key] 徽章的另一个重要原因是,它可以为可缓存的 SQL 语句生成,但由于某些特定的子构造当前不可缓存。这方面的示例包括未定义缓存参数的自定义用户定义的 SQL 元素,以及一些生成任意长且不可重现的 SQL 字符串的构造,主要示例是 Values 构造以及将 “multivalued inserts”(多值插入)与 Insert.values() 方法一起使用时。

到目前为止,我们的缓存仍然是空的。但是,接下来的语句将被缓存,一个片段如下所示:

INFO sqlalchemy.engine.Engine INSERT INTO a (data) VALUES (?)
INFO sqlalchemy.engine.Engine [generated in 0.00011s] (None,)
INFO sqlalchemy.engine.Engine INSERT INTO a (data) VALUES (?)
INFO sqlalchemy.engine.Engine [cached since 0.0003533s ago] (None,)
INFO sqlalchemy.engine.Engine INSERT INTO a (data) VALUES (?)
INFO sqlalchemy.engine.Engine [cached since 0.0005326s ago] (None,)
INFO sqlalchemy.engine.Engine INSERT INTO b (a_id, data) VALUES (?, ?)
INFO sqlalchemy.engine.Engine [generated in 0.00010s] (1, None)
INFO sqlalchemy.engine.Engine INSERT INTO b (a_id, data) VALUES (?, ?)
INFO sqlalchemy.engine.Engine [cached since 0.0003232s ago] (1, None)
INFO sqlalchemy.engine.Engine INSERT INTO b (a_id, data) VALUES (?, ?)
INFO sqlalchemy.engine.Engine [cached since 0.0004887s ago] (1, None)

上面,我们基本上看到了两个唯一的 SQL 字符串:"INSERT INTO a (data) VALUES (?)""INSERT INTO b (a_id, data) VALUES (?, ?)"。由于 SQLAlchemy 对所有文字值都使用绑定参数,因此即使这些语句针对不同的对象重复多次,但由于参数是分开的,因此实际的 SQL 字符串保持不变。

注意

上面的两个语句是由 ORM 单元的工作过程生成的,实际上会将它们缓存在每个映射器本地的单独缓存中。但是,机制和术语是相同的。禁用或使用备用字典来缓存某些(或所有)语句 部分将在下面介绍用户代码如何在每个语句的基础上使用备用缓存容器。

我们看到的这两个语句的第一个出现的缓存徽章是 [generated in 0.00011s]。这表示该语句不在缓存中,在 0.00011 秒内编译成字符串,然后被缓存。当我们看到 [generated] 徽章时,我们知道这意味着存在缓存未命中。对于特定语句的第一次出现,这是预期的。但是,如果在长时间运行的应用程序中观察到大量新的 [generated] 徽章,而该应用程序通常一遍又一遍地使用同一系列 SQL 语句,则这可能表明 create_engine.query_cache_size 参数太小。当缓存的语句由于 LRU 缓存修剪较少使用的项目而从缓存中逐出时,下次使用时将显示 [generated] 徽章。

然后我们看到的这两个语句的后续出现的缓存徽章看起来像 [cached since 0.0003533s ago]。这表示该语句在缓存中找到,并且最初在 0.0003533 秒前放入缓存中。重要的是要注意,虽然 [generated][cached since] 徽章都引用了秒数,但它们的含义不同;在 [generated] 的情况下,该数字是编译语句所用时间的粗略计时,并且将是一个非常小的时长。在 [cached since] 的情况下,这是语句在缓存中存在的总时间。对于已运行六个小时的应用程序,此数字可能显示为 [cached since 21600 seconds ago],这是一件好事。“cached since”(自缓存以来)的数字很高表明这些语句很长时间以来没有受到缓存未命中的影响。即使应用程序已运行很长时间,但 “cached since”(自缓存以来)的数字仍然很低的语句可能表明这些语句太频繁地受到缓存未命中的影响,并且可能需要增加 create_engine.query_cache_size

我们的示例程序然后执行一些 SELECT 操作,我们可以在其中看到 “generated”(已生成)然后 “cached”(已缓存)的相同模式,用于 “a” 表的 SELECT 以及后续 “b” 表的惰性加载。

INFO sqlalchemy.engine.Engine SELECT a.id AS a_id, a.data AS a_data
FROM a
INFO sqlalchemy.engine.Engine [generated in 0.00009s] ()
INFO sqlalchemy.engine.Engine SELECT b.id AS b_id, b.a_id AS b_a_id, b.data AS b_data
FROM b
WHERE ? = b.a_id
INFO sqlalchemy.engine.Engine [generated in 0.00010s] (1,)
INFO sqlalchemy.engine.Engine SELECT b.id AS b_id, b.a_id AS b_a_id, b.data AS b_data
FROM b
WHERE ? = b.a_id
INFO sqlalchemy.engine.Engine [cached since 0.0005922s ago] (2,)
INFO sqlalchemy.engine.Engine SELECT b.id AS b_id, b.a_id AS b_a_id, b.data AS b_data
FROM b
WHERE ? = b.a_id

从上面的程序中,完整运行显示总共缓存了四个不同的 SQL 字符串。这表明缓存大小为 four 就足够了。这显然是一个非常小的尺寸,默认尺寸 500 可以保持默认值。

缓存使用多少内存?

上一节详细介绍了一些检查 create_engine.query_cache_size 是否需要更大的技术。我们如何知道缓存是否不太大?我们可能希望将 create_engine.query_cache_size 设置为不超过某个数字的原因可能是因为我们的应用程序可能会使用大量不同的语句,例如从搜索 UX 动态构建查询的应用程序,并且我们不希望我们的主机耗尽内存,例如,在过去的 24 小时内运行了 10 万个不同的查询,并且它们都被缓存了。

极其难以衡量 Python 数据结构占用了多少内存,但是,使用通过 top 测量内存增长的过程,因为连续的 250 个新语句被添加到缓存中,这表明中等 Core 语句占用约 12K,而小型 ORM 语句占用约 20K,包括用于 ORM 的结果获取结构,这将更大。

禁用或使用备用字典来缓存某些(或所有)语句

内部使用的缓存称为 LRUCache,但这主要只是一个字典。任何字典都可以用作任何语句系列的缓存,方法是将 Connection.execution_options.compiled_cache 选项用作执行选项。执行选项可以在语句、EngineConnection 上设置,以及在使用 SQLAlchemy-2.0 样式调用的 ORM Session.execute() 方法时设置。例如,要运行一系列 SQL 语句并将它们缓存在特定字典中:

my_cache = {}
with engine.connect().execution_options(compiled_cache=my_cache) as conn:
    conn.execute(table.select())

SQLAlchemy ORM 使用上述技术来保持工作单元 “flush”(刷新)过程中的每个映射器缓存,这些缓存与 Engine 上配置的默认缓存以及某些关系加载器查询分开。

也可以使用此参数禁用缓存,方法是发送 None 值。

# disable caching for this connection
with engine.connect().execution_options(compiled_cache=None) as conn:
    conn.execute(table.select())

第三方方言的缓存

缓存功能要求方言的编译器生成的 SQL 字符串对于许多语句调用是安全的,前提是给定的缓存键与该 SQL 字符串相关联。这意味着语句中的任何文字值,例如 SELECT 的 LIMIT/OFFSET 值,都不能在方言的编译方案中硬编码,因为编译后的字符串将不可重用。 SQLAlchemy 支持使用 BindParameter.render_literal_execute() 方法渲染绑定参数,该方法可以应用于现有 Select._limit_clauseSelect._offset_clause 属性,由自定义编译器完成,这将在本节后面进行说明。

由于存在许多第三方方言,其中许多方言可能正在从 SQL 语句生成文字值,而没有利用较新的 “literal execute”(文字执行)功能,因此 SQLAlchemy 从 1.4.5 版本开始,已向方言添加了一个名为 Dialect.supports_statement_cache 的属性。在运行时,会直接在特定方言的类上检查此属性是否存在,即使它已经存在于超类上,因此即使是子类化现有可缓存 SQLAlchemy 方言(例如 sqlalchemy.dialects.postgresql.PGDialect)的第三方方言,也必须显式包含此属性才能启用缓存。只有在根据需要更改方言并测试了具有不同参数的编译后的 SQL 语句的可重用性之后,才应启用该属性。

对于所有不支持此属性的第三方方言,此类方言的日志记录将指示 dialect does not support caching。(方言不支持缓存)。

当方言已经过缓存测试时,特别是 SQL 编译器已更新为不在 SQL 字符串中直接渲染任何文字 LIMIT/OFFSET 时,方言作者可以按如下方式应用该属性:

from sqlalchemy.engine.default import DefaultDialect


class MyDialect(DefaultDialect):
    supports_statement_cache = True

该标志也需要应用于方言的所有子类。

class MyDBAPIForMyDialect(MyDialect):
    supports_statement_cache = True

1.4.5 版本新增: 添加了 Dialect.supports_statement_cache 属性。

方言修改的典型情况如下。

示例:使用后编译参数渲染 LIMIT/OFFSET

例如,假设方言覆盖了 SQLCompiler.limit_clause() 方法,该方法生成 SQL 语句的 “LIMIT/OFFSET” 子句,如下所示:

# pre 1.4 style code
def limit_clause(self, select, **kw):
    text = ""
    if select._limit is not None:
        text += " \n LIMIT %d" % (select._limit,)
    if select._offset is not None:
        text += " \n OFFSET %d" % (select._offset,)
    return text

上面的例程将 Select._limitSelect._offset 整数值渲染为嵌入在 SQL 语句中的文字整数。对于不支持在 SELECT 语句的 LIMIT/OFFSET 子句中使用绑定参数的数据库来说,这是一个常见的需求。但是,在初始编译阶段渲染整数值与缓存直接不兼容,因为 Select 对象的 limit 和 offset 整数值不是缓存键的一部分,因此许多具有不同 limit/offset 值的 Select 语句将无法以正确的值渲染。

上述代码的更正方法是将文字整数移动到 SQLAlchemy 的 后编译 功能中,该功能将在初始编译阶段之外渲染文字整数,而是在执行时,在语句发送到 DBAPI 之前渲染。在编译阶段可以使用 BindParameter.render_literal_execute() 方法访问此功能,并结合使用 Select._limit_clauseSelect._offset_clause 属性,这些属性将 LIMIT/OFFSET 表示为完整的 SQL 表达式,如下所示:

# 1.4 cache-compatible code
def limit_clause(self, select, **kw):
    text = ""

    limit_clause = select._limit_clause
    offset_clause = select._offset_clause

    if select._simple_int_clause(limit_clause):
        text += " \n LIMIT %s" % (
            self.process(limit_clause.render_literal_execute(), **kw)
        )
    elif limit_clause is not None:
        # assuming the DB doesn't support SQL expressions for LIMIT.
        # Otherwise render here normally
        raise exc.CompileError(
            "dialect 'mydialect' can only render simple integers for LIMIT"
        )
    if select._simple_int_clause(offset_clause):
        text += " \n OFFSET %s" % (
            self.process(offset_clause.render_literal_execute(), **kw)
        )
    elif offset_clause is not None:
        # assuming the DB doesn't support SQL expressions for OFFSET.
        # Otherwise render here normally
        raise exc.CompileError(
            "dialect 'mydialect' can only render simple integers for OFFSET"
        )

    return text

上面的方法将生成一个编译后的 SELECT 语句,如下所示:

SELECT x FROM y
LIMIT __[POSTCOMPILE_param_1]
OFFSET __[POSTCOMPILE_param_2]

在上面,__[POSTCOMPILE_param_1]__[POSTCOMPILE_param_2] 指示符将在语句执行时,在 SQL 字符串从缓存中检索出来后,填充其对应的整数值。

在根据需要进行上述更改后,应将 Dialect.supports_statement_cache 标志设置为 True。强烈建议第三方方言使用 方言第三方测试套件,该套件将断言诸如带有 LIMIT/OFFSET 的 SELECT 等操作是否已正确渲染和缓存。

使用 Lambda 为语句生成增加显著的速度增益

深入炼金术

除非在性能非常密集的情况下,否则此技术通常不是必需的,并且适用于经验丰富的 Python 程序员。虽然相当简单,但它涉及元编程概念,这些概念不适合 Python 初学者。 lambda 方法可以在以后以最少的工作量应用于现有代码。

Python 函数(通常表示为 lambda)可用于生成 SQL 表达式,这些表达式可以根据 lambda 函数本身的 Python 代码位置以及 lambda 中的闭包变量进行缓存。基本原理不仅允许缓存 SQL 表达式构造的 SQL 字符串编译形式(这是 SQLAlchemy 在不使用 lambda 系统时的正常行为),而且还允许缓存 SQL 表达式构造本身的 Python 组合,这也具有一定程度的 Python 开销。

lambda SQL 表达式功能作为一种性能增强功能提供,并且在 with_loader_criteria() ORM 选项中也有可选使用,以便提供通用 SQL 片段。

概要

Lambda 语句是使用 lambda_stmt() 函数构造的,该函数返回 StatementLambdaElement 的实例,它本身是一个可执行的语句构造。可以使用 Python 加法运算符 +StatementLambdaElement.add_criteria() 方法将其他修饰符和条件添加到对象,该方法允许更多选项。

假设 lambda_stmt() 构造是在一个封闭函数或方法中调用的,该函数或方法期望在应用程序中多次使用,以便第一次执行之后的后续执行可以利用已缓存的编译后的 SQL。当 lambda 在 Python 的封闭函数内部构造时,它也会受到闭包变量的影响,这对于整个方法至关重要。

from sqlalchemy import lambda_stmt


def run_my_statement(connection, parameter):
    stmt = lambda_stmt(lambda: select(table))
    stmt += lambda s: s.where(table.c.col == parameter)
    stmt += lambda s: s.order_by(table.c.id)

    return connection.execute(stmt)


with engine.connect() as conn:
    result = run_my_statement(some_connection, "some parameter")

在上面,用于定义 SELECT 语句结构的三个 lambda 可调用对象仅被调用一次,并且生成的 SQL 字符串缓存在引擎的编译缓存中。从那时起,可以多次调用 run_my_statement() 函数,并且其中的 lambda 可调用对象将不会被调用,仅用作缓存键以检索已编译的 SQL。

注意

重要的是要注意,当不使用 lambda 系统时,已经存在 SQL 缓存。 lambda 系统仅通过缓存 SQL 构造本身的构建并使用更简单的缓存键,为每个调用的 SQL 语句增加了额外的工作量减少层。

Lambda 的快速指南

最重要的是,lambda SQL 系统中的重点是确保为 lambda 生成的缓存键与其将生成的 SQL 字符串之间永远不会存在不匹配。LambdaElement 和相关对象将运行并分析给定的 lambda,以便计算每次运行时应如何缓存它,并尝试检测任何潜在问题。基本指南包括:

  • 任何类型的语句均受支持 - 虽然期望 select() 构造是 lambda_stmt() 的主要用例,但 DML 语句(如 insert()update())同样适用

    def upd(id_, newname):
        stmt = lambda_stmt(lambda: users.update())
        stmt += lambda s: s.values(name=newname)
        stmt += lambda s: s.where(users.c.id == id_)
        return stmt
    
    
    with engine.begin() as conn:
        conn.execute(upd(7, "foo"))
  • ORM 用例也直接支持 - lambda_stmt() 可以完全容纳 ORM 功能,并可直接与 Session.execute() 一起使用

    def select_user(session, name):
        stmt = lambda_stmt(lambda: select(User))
        stmt += lambda s: s.where(User.name == name)
    
        row = session.execute(stmt).first()
        return row
  • 绑定参数自动适应 - 与 SQLAlchemy 之前的“baked query”系统相比,lambda SQL 系统可以适应 Python 字面值,这些字面值会自动成为 SQL 绑定参数。这意味着,即使给定的 lambda 只运行一次,成为绑定参数的值也会在每次运行时从 lambda 的闭包中提取

    >>> def my_stmt(x, y):
    ...     stmt = lambda_stmt(lambda: select(func.max(x, y)))
    ...     return stmt
    >>> engine = create_engine("sqlite://", echo=True)
    >>> with engine.connect() as conn:
    ...     print(conn.scalar(my_stmt(5, 10)))
    ...     print(conn.scalar(my_stmt(12, 8)))
    
    SELECT max(?, ?) AS max_1 [generated in 0.00057s] (5, 10)
    10
    SELECT max(?, ?) AS max_1 [cached since 0.002059s ago] (12, 8)
    12

    在上面,StatementLambdaElement 从每次调用 my_stmt() 时生成的 lambda 的闭包中提取了 xy 的值;这些值被替换到缓存的 SQL 构造中作为参数的值。

  • lambda 理想情况下应在所有情况下生成相同的 SQL 结构 - 避免在 lambda 内部使用条件语句或自定义可调用对象,这些语句或对象可能会使其根据输入生成不同的 SQL;如果一个函数可能根据条件使用两个不同的 SQL 片段,请使用两个单独的 lambda

    # **Don't** do this:
    
    
    def my_stmt(parameter, thing=False):
        stmt = lambda_stmt(lambda: select(table))
        stmt += lambda s: (
            s.where(table.c.x > parameter) if thing else s.where(table.c.y == parameter)
        )
        return stmt
    
    
    # **Do** do this:
    
    
    def my_stmt(parameter, thing=False):
        stmt = lambda_stmt(lambda: select(table))
        if thing:
            stmt += lambda s: s.where(table.c.x > parameter)
        else:
            stmt += lambda s: s.where(table.c.y == parameter)
        return stmt

    如果 lambda 没有生成一致的 SQL 构造,可能会发生各种故障,并且有些故障目前不容易检测到。

  • 不要在 lambda 内部使用函数来生成绑定值 - 绑定值跟踪方法要求 SQL 语句中要使用的实际值必须本地存在于 lambda 的闭包中。如果值是从其他函数生成的,则不可能做到这一点,并且如果尝试这样做,LambdaElement 通常应引发错误

    >>> def my_stmt(x, y):
    ...     def get_x():
    ...         return x
    ...
    ...     def get_y():
    ...         return y
    ...
    ...     stmt = lambda_stmt(lambda: select(func.max(get_x(), get_y())))
    ...     return stmt
    >>> with engine.connect() as conn:
    ...     print(conn.scalar(my_stmt(5, 10)))
    Traceback (most recent call last):
      # ...
    sqlalchemy.exc.InvalidRequestError: Can't invoke Python callable get_x()
    inside of lambda expression argument at
    <code object <lambda> at 0x7fed15f350e0, file "<stdin>", line 6>;
    lambda SQL constructs should not invoke functions from closure variables
    to produce literal values since the lambda SQL system normally extracts
    bound values without actually invoking the lambda or any functions within it.

    在上面,如果必须使用 get_x()get_y(),则应在 lambda 外部发生,并分配给本地闭包变量

    >>> def my_stmt(x, y):
    ...     def get_x():
    ...         return x
    ...
    ...     def get_y():
    ...         return y
    ...
    ...     x_param, y_param = get_x(), get_y()
    ...     stmt = lambda_stmt(lambda: select(func.max(x_param, y_param)))
    ...     return stmt
  • 避免在 lambda 内部引用非 SQL 构造,因为它们默认不可缓存 - 此问题指的是 LambdaElement 如何从语句中的其他闭包变量创建缓存键。为了提供最准确的缓存键保证,lambda 闭包中的所有对象都被认为是重要的,并且默认情况下不会假定任何对象适合作为缓存键。因此,以下示例也会引发相当详细的错误消息

    >>> class Foo:
    ...     def __init__(self, x, y):
    ...         self.x = x
    ...         self.y = y
    >>> def my_stmt(foo):
    ...     stmt = lambda_stmt(lambda: select(func.max(foo.x, foo.y)))
    ...     return stmt
    >>> with engine.connect() as conn:
    ...     print(conn.scalar(my_stmt(Foo(5, 10))))
    Traceback (most recent call last):
      # ...
    sqlalchemy.exc.InvalidRequestError: Closure variable named 'foo' inside of
    lambda callable <code object <lambda> at 0x7fed15f35450, file
    "<stdin>", line 2> does not refer to a cacheable SQL element, and also
    does not appear to be serving as a SQL literal bound value based on the
    default SQL expression returned by the function.  This variable needs to
    remain outside the scope of a SQL-generating lambda so that a proper cache
    key may be generated from the lambda's state.  Evaluate this variable
    outside of the lambda, set track_on=[<elements>] to explicitly select
    closure elements to track, or set track_closure_variables=False to exclude
    closure variables from being part of the cache key.

    上面的错误表明 LambdaElement 不会假设传入的 Foo 对象在所有情况下都会继续表现相同。它也不会假设它可以将 Foo 用作缓存键的一部分;如果它要使用 Foo 对象作为缓存键的一部分,如果存在许多不同的 Foo 对象,这将用重复信息填充缓存,并且还会长期持有对所有这些对象的引用。

    解决上述情况的最佳方法是不在 lambda 内部引用 foo,而是在外部引用它

    >>> def my_stmt(foo):
    ...     x_param, y_param = foo.x, foo.y
    ...     stmt = lambda_stmt(lambda: select(func.max(x_param, y_param)))
    ...     return stmt

    在某些情况下,如果 lambda 的 SQL 结构保证永远不会根据输入而更改,则传递 track_closure_variables=False 将禁用除用于绑定参数的闭包变量之外的任何闭包变量的跟踪

    >>> def my_stmt(foo):
    ...     stmt = lambda_stmt(
    ...         lambda: select(func.max(foo.x, foo.y)), track_closure_variables=False
    ...     )
    ...     return stmt

    还可以使用 track_on 参数将对象添加到元素中,以显式构成缓存键的一部分;使用此参数允许特定值用作缓存键,并且还会阻止考虑其他闭包变量。这对于 SQL 构造的一部分源自某种可能具有许多不同值的上下文对象的情况很有用。在下面的示例中,SELECT 语句的第一段将禁用对 foo 变量的跟踪,而第二段将显式跟踪 self 作为缓存键的一部分

    >>> def my_stmt(self, foo):
    ...     stmt = lambda_stmt(
    ...         lambda: select(*self.column_expressions), track_closure_variables=False
    ...     )
    ...     stmt = stmt.add_criteria(lambda: self.where_criteria, track_on=[self])
    ...     return stmt

    使用 track_on 意味着给定的对象将长期存储在 lambda 的内部缓存中,并且只要缓存不清除这些对象(默认使用 1000 个条目的 LRU 方案),就将具有强引用。

缓存键生成

为了理解 lambda SQL 构造发生的一些选项和行为,了解缓存系统很有帮助。

SQLAlchemy 的缓存系统通常通过生成一个表示构造内部所有状态的结构,从给定的 SQL 表达式构造生成缓存键

>>> from sqlalchemy import select, column
>>> stmt = select(column("q"))
>>> cache_key = stmt._generate_cache_key()
>>> print(cache_key)  # somewhat paraphrased
CacheKey(key=(
  '0',
  <class 'sqlalchemy.sql.selectable.Select'>,
  '_raw_columns',
  (
    (
      '1',
      <class 'sqlalchemy.sql.elements.ColumnClause'>,
      'name',
      'q',
      'type',
      (
        <class 'sqlalchemy.sql.sqltypes.NullType'>,
      ),
    ),
  ),
  # a few more elements are here, and many more for a more
  # complicated SELECT statement
),)

上面的键存储在本质上是字典的缓存中,值是一个构造,除其他外,还存储 SQL 语句的字符串形式,在本例中为短语“SELECT q”。我们可以观察到,即使对于非常短的查询,缓存键也相当冗长,因为它必须表示可能变化的关于正在呈现和可能执行的所有内容。

相比之下,lambda 构造系统创建了一种不同类型的缓存键

>>> from sqlalchemy import lambda_stmt
>>> stmt = lambda_stmt(lambda: select(column("q")))
>>> cache_key = stmt._generate_cache_key()
>>> print(cache_key)
CacheKey(key=(
  <code object <lambda> at 0x7fed1617c710, file "<stdin>", line 1>,
  <class 'sqlalchemy.sql.lambdas.StatementLambdaElement'>,
),)

在上面,我们看到缓存键比非 lambda 语句的缓存键短得多,此外,甚至没有必要生成 select(column("q")) 构造本身;Python lambda 本身包含一个名为 __code__ 的属性,该属性引用一个 Python 代码对象,该对象在应用程序的运行时是不可变的和永久的。

当 lambda 还包括闭包变量时,在通常情况下,如果这些变量引用 SQL 构造(例如列对象),它们将成为缓存键的一部分,或者如果它们引用将成为绑定参数的字面值,则它们将被放置在缓存键的单独元素中

>>> def my_stmt(parameter):
...     col = column("q")
...     stmt = lambda_stmt(lambda: select(col))
...     stmt += lambda s: s.where(col == parameter)
...     return stmt

上面的 StatementLambdaElement 包括两个 lambda,它们都引用 col 闭包变量,因此缓存键将表示这两个段以及 column() 对象

>>> stmt = my_stmt(5)
>>> key = stmt._generate_cache_key()
>>> print(key)
CacheKey(key=(
  <code object <lambda> at 0x7f07323c50e0, file "<stdin>", line 3>,
  (
    '0',
    <class 'sqlalchemy.sql.elements.ColumnClause'>,
    'name',
    'q',
    'type',
    (
      <class 'sqlalchemy.sql.sqltypes.NullType'>,
    ),
  ),
  <code object <lambda> at 0x7f07323c5190, file "<stdin>", line 4>,
  <class 'sqlalchemy.sql.lambdas.LinkedLambdaElement'>,
  (
    '0',
    <class 'sqlalchemy.sql.elements.ColumnClause'>,
    'name',
    'q',
    'type',
    (
      <class 'sqlalchemy.sql.sqltypes.NullType'>,
    ),
  ),
  (
    '0',
    <class 'sqlalchemy.sql.elements.ColumnClause'>,
    'name',
    'q',
    'type',
    (
      <class 'sqlalchemy.sql.sqltypes.NullType'>,
    ),
  ),
),)

缓存键的第二部分检索了语句被调用时将使用的绑定参数

>>> key.bindparams
[BindParameter('%(139668884281280 parameter)s', 5, type_=Integer())]

有关“lambda”缓存和性能比较的一系列示例,请参阅 性能 性能示例中的“short_selects”测试套件。

INSERT 语句的“插入多个值”行为

2.0 版本新增: 有关更改的背景信息,包括示例性能测试,请参阅 针对 MySQL 以外的所有后端实现的优化 ORM 批量插入

提示

insertmanyvalues 功能是一个透明可用的性能功能,它不需要最终用户干预即可根据需要发生。本节介绍了该功能的架构,以及如何衡量其性能和调整其行为,以优化批量 INSERT 语句的速度,特别是 ORM 使用的批量 INSERT 语句。

随着越来越多的数据库添加了对 INSERT..RETURNING 的支持,SQLAlchemy 在处理需要获取服务器生成的值的 INSERT 语句主题方面经历了重大变化,其中最重要的是服务器生成的主键值,这些值允许在新行中引用后续操作。特别是,长期以来,这种情况一直是 ORM 中的一个重大性能问题,ORM 依赖于能够检索服务器生成的主键值,以便正确填充 标识映射

随着最近 SQLite 和 MariaDB 添加了对 RETURNING 的支持,SQLAlchemy 不再需要依赖于 cursor.lastrowid 属性(由大多数后端 DBAPI 提供,且仅限于单行);现在,RETURNING 可以用于除 MySQL 之外的所有 SQLAlchemy 包含的 后端。对于大多数后端,cursor.executemany() DBAPI 方法不允许获取行的剩余性能限制已得到解决,方法是放弃使用 executemany(),而是重组单个 INSERT 语句,使每个语句在单个语句中容纳大量行,该语句使用 cursor.execute() 调用。这种方法源于 psycopg2 DBAPI 的 psycopg2 快速执行助手 功能,SQLAlchemy 在最近的版本系列中逐步增加了对该功能的支持。

当前支持

该功能已为 SQLAlchemy 中包含的所有支持 RETURNING 的后端启用,但 Oracle 数据库除外,对于 Oracle 数据库,python-oracledb 和 cx_Oracle 驱动程序都提供了自己的等效功能。该功能通常在使用 Insert.returning() 方法的 Insert 构造并结合 executemany 执行时发生,当将字典列表传递给 Connection.execute.parameters 参数的 Connection.execute()Session.execute() 方法(以及 asyncio 下的等效方法和简写方法,如 Session.scalars())时发生。当在 ORM 工作单元 进程中使用 Session.add()Session.add_all() 等方法添加行时,也会发生这种情况。

对于 SQLAlchemy 包含的方言,当前的支持或等效支持如下

  • SQLite - 支持 SQLite 3.35 及更高版本

  • PostgreSQL - 所有受支持的 Postgresql 版本(9 及更高版本)

  • SQL Server - 所有受支持的 SQL Server 版本 [1]

  • MariaDB - 支持 MariaDB 10.5 及更高版本

  • MySQL - 不支持,不存在 RETURNING 功能

  • Oracle 数据库 - 使用本机 python-oracledb / cx_Oracle API 支持使用 executemany 的 RETURNING,适用于所有受支持的 Oracle 数据库版本 9 及更高版本,使用多行 OUT 参数。这与“executemanyvalues”的实现不同,但具有相同的使用模式和等效的性能优势。

在 2.0.10 版本中更改

禁用该功能

要为给定的 Engine 整体禁用给定后端的“insertmanyvalues”功能,请将 create_engine.use_insertmanyvalues 参数作为 False 传递给 create_engine()

engine = create_engine(
    "mariadb+mariadbconnector://scott:tiger@host/db", use_insertmanyvalues=False
)

也可以通过将 Table.implicit_returning 参数作为 False 传递来禁用特定 Table 对象隐式使用该功能

t = Table(
    "t",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("x", Integer),
    implicit_returning=False,
)

可能想要为特定表禁用 RETURNING 的原因是解决后端特定的限制。

批量模式操作

该功能有两种操作模式,它们是基于每个方言、每个 Table 透明地选择的。一种是批量模式,它通过重写 INSERT 语句来减少数据库往返次数,其形式为

INSERT INTO a (data, x, y) VALUES (%(data)s, %(x)s, %(y)s) RETURNING a.id

转换为“批量”形式,例如

INSERT INTO a (data, x, y) VALUES
    (%(data_0)s, %(x_0)s, %(y_0)s),
    (%(data_1)s, %(x_1)s, %(y_1)s),
    (%(data_2)s, %(x_2)s, %(y_2)s),
    ...
    (%(data_78)s, %(x_78)s, %(y_78)s)
RETURNING a.id

其中,上面的语句是针对输入数据的子集(“批次”)组织的,批次的大小由数据库后端以及每个批次中的参数数量决定,以对应于语句大小/参数数量的已知限制。然后,该功能为每个批次的输入数据执行一次 INSERT 语句,直到所有记录都被消耗完,并将每个批次的 RETURNING 结果连接成一个大的行集,该行集可从单个 Result 对象获得。

这种“批量”形式允许使用更少的数据库往返次数插入多行,并且已证明可以在大多数支持它的后端实现显着的性能改进。

将 RETURNING 行与参数集关联

2.0.10 版本新增。

上一节中说明的“批量”模式查询不保证返回记录的顺序与输入数据的顺序相对应。当 SQLAlchemy ORM 工作单元 进程以及将返回的服务器生成的值与输入数据关联的应用程序使用时,Insert.returning()UpdateBase.return_defaults() 方法包括一个选项 Insert.returning.sort_by_parameter_order,该选项指示“insertmanyvalues”模式应保证此对应关系。这与数据库后端实际 INSERT 记录的顺序无关,在任何情况下都假定顺序;仅假定返回的记录在接收回时应按照原始输入数据传递的顺序组织。

当存在 Insert.returning.sort_by_parameter_order 参数时,对于使用服务器生成的整数主键值(如 IDENTITY、PostgreSQL SERIAL、MariaDB AUTO_INCREMENT 或 SQLite 的 ROWID 方案)的表,“批量”模式可能会选择使用更复杂的 INSERT..RETURNING 形式,并结合基于返回值的执行后行排序,或者如果此类形式不可用,“insertmanyvalues”功能可能会优雅地降级为“非批量”模式,该模式为每个参数集运行单个 INSERT 语句。

例如,在 SQL Server 上,当自动递增 IDENTITY 列用作主键时,使用以下 SQL 形式

INSERT INTO a (data, x, y)
OUTPUT inserted.id, inserted.id AS id__1
SELECT p0, p1, p2 FROM (VALUES
    (?, ?, ?, 0), (?, ?, ?, 1), (?, ?, ?, 2),
    ...
    (?, ?, ?, 77)
) AS imp_sen(p0, p1, p2, sen_counter) ORDER BY sen_counter

当主键列使用 SERIAL 或 IDENTITY 时,PostgreSQL 也使用类似的形式。上面的形式保证行的插入顺序。但是,它确实保证 IDENTITY 或 SERIAL 值将按照每个参数集的顺序创建 [2]。“insertmanyvalues”功能然后按递增整数标识对上述 INSERT 语句的返回行进行排序。

对于 SQLite 数据库,没有合适的 INSERT 形式可以将新 ROWID 值的生成与参数集传递的顺序相关联。因此,当使用服务器生成的主键值时,当请求有序 RETURNING 时,SQLite 后端将降级为“非批量”模式。对于 MariaDB,insertmanyvalues 使用的默认 INSERT 形式就足够了,因为当使用 InnoDB [3] 时,此数据库后端会将 AUTO_INCREMENT 的顺序与输入数据的顺序对齐。

对于客户端生成的主键,例如当使用 Python uuid.uuid4() 函数为 Uuid 列生成新值时,“insertmanyvalues”功能透明地将此列包含在 RETURNING 记录中,并将其值与给定的输入记录的值相关联,从而保持输入记录和结果行之间的对应关系。由此可见,当使用客户端生成的主键值时,所有后端都允许批量、参数相关的 RETURNING 顺序。

“insertmanyvalues” “批量”模式如何确定要用作输入参数和 RETURNING 行之间对应点的列称为 插入哨兵,这是一个或多个用于跟踪此类值的特定列。“插入哨兵”通常是自动选择的,但也可能是用户配置的,用于非常特殊的情况;配置哨兵列 部分对此进行了描述。

对于不提供合适的 INSERT 形式的后端,该形式可以确定性地与输入值对齐地传递服务器生成的值,或者对于具有其他类型的服务器生成的主键值的 Table 配置,当请求保证 RETURNING 排序时,“insertmanyvalues”模式将使用非批量模式。

另请参阅

非批量模式操作

对于不具有客户端主键值,并且提供数据库无法以确定性或可排序方式相对于多个参数集调用的服务器生成的主键值(或没有主键)的 Table 配置,当“insertmanyvalues”功能被要求满足 Insert.returning.sort_by_parameter_orderInsert 语句的要求时,可能会选择使用非批量模式

在这种模式下,INSERT 的原始 SQL 形式保持不变,“insertmanyvalues”功能将改为针对每个参数集单独运行给定的语句,并将返回的行组织成一个完整的结果集。与之前的 SQLAlchemy 版本不同,它在一个紧密的循环中执行此操作,从而最大限度地减少 Python 开销。在某些情况下,例如在 SQLite 上,“非批量”模式的性能与“批量”模式完全相同。

语句执行模型

对于“批量”和“非批量”模式,该功能都必须在对 Core 级别 Connection.execute() 方法的单个调用范围内,使用 DBAPI cursor.execute() 方法调用多个 INSERT 语句,每个语句最多包含固定数量的参数集。此限制是可配置的,如下面的 控制批次大小 所述。对 cursor.execute() 的单独调用会被单独记录,并且也会单独传递给事件侦听器,例如 ConnectionEvents.before_cursor_execute()(请参阅下面的 日志记录和事件)。

配置哨兵列

在典型情况下,“insertmanyvalues”功能为了提供具有确定性行顺序的 INSERT..RETURNING,将自动从给定表的主键中确定哨兵列,如果无法识别,则优雅地降级为“一次一行”模式。作为一个完全可选的功能,为了获得服务器生成的主键的表的完整“insertmanyvalues”批量性能,其默认生成器函数与“哨兵”用例不兼容,其他非主键列可以标记为“哨兵”列,前提是它们满足某些要求。一个典型的例子是具有客户端默认值(如 Python uuid.uuid4() 函数)的非主键 Uuid 列。还有一个构造可以创建简单的整数列,其中客户端整数计数器面向“insertmanyvalues”用例。

可以通过将 Column.insert_sentinel 添加到符合条件的列来指示哨兵列。最基本的“符合条件”的列是一个非空、唯一的列,具有客户端默认值,例如 UUID 列,如下所示

import uuid

from sqlalchemy import Column
from sqlalchemy import FetchedValue
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import Uuid

my_table = Table(
    "some_table",
    metadata,
    # assume some arbitrary server-side function generates
    # primary key values, so cannot be tracked by a bulk insert
    Column("id", String(50), server_default=FetchedValue(), primary_key=True),
    Column("data", String(50)),
    Column(
        "uniqueid",
        Uuid(),
        default=uuid.uuid4,
        nullable=False,
        unique=True,
        insert_sentinel=True,
    ),
)

当使用 ORM 声明式模型时,可以使用 mapped_column 构造来使用相同的形式

import uuid

from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column


class Base(DeclarativeBase):
    pass


class MyClass(Base):
    __tablename__ = "my_table"

    id: Mapped[str] = mapped_column(primary_key=True, server_default=FetchedValue())
    data: Mapped[str] = mapped_column(String(50))
    uniqueid: Mapped[uuid.UUID] = mapped_column(
        default=uuid.uuid4, unique=True, insert_sentinel=True
    )

虽然默认生成器生成的值必须是唯一的,但上述“哨兵”列上的实际 UNIQUE 约束(由 unique=True 参数指示)本身是可选的,如果不需要,可以省略。

还有一种特殊的“插入哨兵”形式,它是一个专用的可为空整数列,它使用仅在“insertmanyvalues”操作期间使用的特殊默认整数计数器;作为附加行为,该列将从 SQL 语句和结果集中省略自身,并以几乎透明的方式运行。但是,它确实需要物理存在于实际数据库表中。这种样式的 Column 可以使用函数 insert_sentinel() 构造

from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import Uuid
from sqlalchemy import insert_sentinel

Table(
    "some_table",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("data", String(50)),
    insert_sentinel("sentinel"),
)

当使用 ORM 声明式时,insert_sentinel() 的声明式友好版本称为 orm_insert_sentinel(),它可以在 Base 类或 mixin 上使用;如果使用 declared_attr() 打包,则该列将应用于所有表绑定的子类,包括连接继承层次结构中

from sqlalchemy.orm import declared_attr
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import orm_insert_sentinel


class Base(DeclarativeBase):
    @declared_attr
    def _sentinel(cls) -> Mapped[int]:
        return orm_insert_sentinel()


class MyClass(Base):
    __tablename__ = "my_table"

    id: Mapped[str] = mapped_column(primary_key=True, server_default=FetchedValue())
    data: Mapped[str] = mapped_column(String(50))


class MySubClass(MyClass):
    __tablename__ = "sub_table"

    id: Mapped[str] = mapped_column(ForeignKey("my_table.id"), primary_key=True)


class MySingleInhClass(MyClass):
    pass

在上面的示例中,“my_table”和“sub_table”都将有一个额外的整数列,名为“_sentinel”,可以由“insertmanyvalues”功能使用,以帮助优化 ORM 使用的批量插入。

控制批次大小

“insertmanyvalues” 的一个关键特性是,INSERT 语句的大小受限于固定的最大 “values” 子句数量,以及特定方言的固定总绑定参数数量,这些参数可能一次性在一个 INSERT 语句中表示。当给定的参数字典数量超过固定限制,或者在单个 INSERT 语句中呈现的总绑定参数数量超过固定限制(这两个固定限制是分开的)时,将在单个 Connection.execute() 调用的范围内调用多个 INSERT 语句,每个语句容纳一部分参数字典,这部分参数字典被称为一个 “批次”(“batch”)。每个 “批次” 中表示的参数字典数量则被称为 “批次大小”(“batch size”)。例如,批次大小为 500 意味着发出的每个 INSERT 语句最多会 INSERT 500 行。

调整批次大小可能很重要,因为对于值集本身相对较小的 INSERT 操作,较大的批次大小可能性能更高,而对于使用非常大的值集的 INSERT 操作,较小的批次大小可能更合适,因为在这种情况下,渲染的 SQL 大小以及在单个语句中传递的总数据大小都可能因后端行为和内存约束而受益于被限制在一定大小内。因此,批次大小可以在每个 Engine 以及每个语句的基础上进行配置。另一方面,参数限制是根据所用数据库的已知特性固定的。

对于大多数后端,批次大小默认为 1000,此外,每个方言的 “最大参数数量” 限制因素可能会进一步减小每个语句的批次大小。最大参数数量因方言和服务器版本而异;最大值为 32700(选择此值是为了与 PostgreSQL 的 32767 和 SQLite 的现代限制 32766 保持适当的距离,同时为语句中的其他参数以及 DBAPI 的怪异之处留出空间)。旧版本的 SQLite(3.32.0 之前)会将此值设置为 999。MariaDB 没有确定的限制,但 32700 仍然是 SQL 消息大小的限制因素。

“批次大小” 的值可以通过 Engine 范围内的 create_engine.insertmanyvalues_page_size 参数来影响。例如,要影响 INSERT 语句,使其每个语句最多包含 100 个参数集,可以这样做:

e = create_engine("sqlite://", insertmanyvalues_page_size=100)

批次大小也可以在每个语句的基础上受到 Connection.execution_options.insertmanyvalues_page_size 执行选项的影响,例如,对于每次执行:

with e.begin() as conn:
    result = conn.execute(
        table.insert().returning(table.c.id),
        parameterlist,
        execution_options={"insertmanyvalues_page_size": 100},
    )

或者在语句本身上配置:

stmt = (
    table.insert()
    .returning(table.c.id)
    .execution_options(insertmanyvalues_page_size=100)
)
with e.begin() as conn:
    result = conn.execute(stmt, parameterlist)

日志记录和事件

“insertmanyvalues” 功能与 SQLAlchemy 的 语句日志记录 以及游标事件(例如 ConnectionEvents.before_cursor_execute())完全集成。当参数列表被分成多个批次时,**每个 INSERT 语句都会被单独记录并传递给事件处理程序**。这与 SQLAlchemy 之前的 1.x 系列中仅限 psycopg2 的功能的工作方式相比是一个重大变化,在之前的版本中,多个 INSERT 语句的生成对日志记录和事件是隐藏的。日志显示将截断长参数列表以提高可读性,并且还将指示每个语句的特定批次。下面的示例说明了此日志记录的摘录:

INSERT INTO a (data, x, y) VALUES (?, ?, ?), ... 795 characters truncated ...  (?, ?, ?), (?, ?, ?) RETURNING id
[generated in 0.00177s (insertmanyvalues) 1/10 (unordered)] ('d0', 0, 0, 'd1',  ...
INSERT INTO a (data, x, y) VALUES (?, ?, ?), ... 795 characters truncated ...  (?, ?, ?), (?, ?, ?) RETURNING id
[insertmanyvalues 2/10 (unordered)] ('d100', 100, 1000, 'd101', ...

...

INSERT INTO a (data, x, y) VALUES (?, ?, ?), ... 795 characters truncated ...  (?, ?, ?), (?, ?, ?) RETURNING id
[insertmanyvalues 10/10 (unordered)] ('d900', 900, 9000, 'd901', ...

非批次模式 发生时,日志记录将指示这一点以及 insertmanyvalues 消息:

...

INSERT INTO a (data, x, y) VALUES (?, ?, ?) RETURNING id
[insertmanyvalues 67/78 (ordered; batch not supported)] ('d66', 66, 66)
INSERT INTO a (data, x, y) VALUES (?, ?, ?) RETURNING id
[insertmanyvalues 68/78 (ordered; batch not supported)] ('d67', 67, 67)
INSERT INTO a (data, x, y) VALUES (?, ?, ?) RETURNING id
[insertmanyvalues 69/78 (ordered; batch not supported)] ('d68', 68, 68)
INSERT INTO a (data, x, y) VALUES (?, ?, ?) RETURNING id
[insertmanyvalues 70/78 (ordered; batch not supported)] ('d69', 69, 69)

...

另请参阅

配置日志记录

Upsert 支持

PostgreSQL、SQLite 和 MariaDB 方言提供了特定于后端的 “upsert” 构造 insert()insert()insert(),它们都是 Insert 构造,具有额外的方法,例如 on_conflict_do_update()` ``on_duplicate_key()。当这些构造与 RETURNING 一起使用时,它们也支持 “insertmanyvalues” 行为,从而允许高效地执行带有 RETURNING 的 upsert 操作。

Engine 处置

Engine 指的是连接池,这意味着在正常情况下,当 Engine 对象仍然驻留在内存中时,会存在打开的数据库连接。当 Engine 被垃圾回收时,它的连接池不再被该 Engine 引用,并且假设其连接都没有被检出,则连接池及其连接也将被垃圾回收,这将具有关闭实际数据库连接的效果。但除此之外,Engine 将持有打开的数据库连接,假设它使用通常默认的 QueuePool 池实现。

Engine 通常旨在成为一个永久的固定装置,预先建立并在应用程序的整个生命周期中维护。它**不是**旨在在每个连接的基础上创建和处置;相反,它是一个注册表,既维护连接池,又维护有关所用数据库和 DBAPI 的配置信息,以及一定程度的每个数据库资源的内部缓存。

但是,在许多情况下,希望完全关闭 Engine 引用的所有连接资源。通常,在这种情况下,依靠 Python 垃圾回收来发生这种情况不是一个好主意;相反,可以使用 Engine.dispose() 方法显式处置 Engine。这将处置引擎的底层连接池,并将其替换为一个新的空连接池。如果在此时丢弃 Engine 并且不再使用,那么它引用的所有**检入**连接也将被完全关闭。

调用 Engine.dispose() 的有效用例包括:

  • 当程序想要释放连接池持有的任何剩余检入连接,并且期望在任何未来的操作中完全不再连接到该数据库时。

  • 当程序使用多进程或 fork(),并且 Engine 对象被复制到子进程时,应调用 Engine.dispose(),以便引擎创建特定于该 fork 的全新数据库连接。数据库连接通常**不会**跨进程边界传递。在这种情况下,请使用设置为 False 的 Engine.dispose.close 参数。有关此用例的更多背景信息,请参阅 将连接池与多进程或 os.fork() 一起使用 部分。

  • 在测试套件或多租户场景中,可能会创建和处置许多临时的、短期的 Engine 对象。

当引擎被处置或垃圾回收时,**检出**的连接**不会**被丢弃,因为这些连接仍然被应用程序在其他地方强烈引用。但是,在调用 Engine.dispose() 之后,这些连接不再与该 Engine 关联;当它们被关闭时,它们将被返回到它们现在孤立的连接池,一旦所有引用它的连接也不再在任何地方被引用,该连接池最终将被垃圾回收。由于此过程不易控制,因此强烈建议仅在所有检出的连接都已检入或以其他方式与其池解除关联后才调用 Engine.dispose()

对于受到 Engine 对象使用连接池的负面影响的应用程序,另一种选择是完全禁用连接池。这通常只会对新连接的使用产生轻微的性能影响,并且意味着当连接被检入时,它将被完全关闭,并且不会保存在内存中。请参阅 切换池实现 以获取有关如何禁用连接池的指南。

使用驱动程序 SQL 和原始 DBAPI 连接

关于使用 Connection.execute() 的介绍使用了 text() 构造,以说明如何调用文本 SQL 语句。在使用 SQLAlchemy 时,文本 SQL 实际上更多的是例外而不是常态,因为 Core 表达式语言和 ORM 都抽象掉了 SQL 的文本表示。但是,text() 构造本身也提供了一些文本 SQL 的抽象,因为它规范化了绑定参数的传递方式,并且它支持参数和结果集行的数据类型行为。

直接调用驱动程序的 SQL 字符串

对于希望直接调用传递给底层驱动程序(称为 DBAPI)的文本 SQL,而无需 text() 构造的任何干预的用例,可以使用 Connection.exec_driver_sql() 方法:

with engine.connect() as conn:
    conn.exec_driver_sql("SET param='bar'")

1.4 版本新增: 添加了 Connection.exec_driver_sql() 方法。

直接使用 DBAPI 游标

在某些情况下,SQLAlchemy 没有提供通用化的方法来访问某些 DBAPI 函数,例如调用存储过程以及处理多个结果集。在这些情况下,直接处理原始 DBAPI 连接同样方便。

访问原始 DBAPI 连接的最常见方法是从已存在的 Connection 对象直接获取。它通过 Connection.connection 属性存在:

connection = engine.connect()
dbapi_conn = connection.connection

这里的 DBAPI 连接实际上是相对于原始连接池的 “代理”,但这在大多数情况下是可以忽略的实现细节。由于此 DBAPI 连接仍然包含在拥有的 Connection 对象的范围内,因此最好使用 Connection 对象来执行大多数功能,例如事务控制以及调用 Connection.close() 方法;如果在 DBAPI 连接上直接执行这些操作,则拥有的 Connection 将不会意识到这些状态变化。

为了克服由拥有的 Connection 维护的 DBAPI 连接施加的限制,还可以使用 Engine.raw_connection() 方法从 Engine 获取 DBAPI 连接,而无需首先获取 Connection

dbapi_conn = engine.raw_connection()

此 DBAPI 连接再次是 “代理” 形式,与之前的情况相同。现在这种代理的目的显而易见,因为当我们调用此连接的 .close() 方法时,DBAPI 连接通常不会实际关闭,而是 释放 回到引擎的连接池:

dbapi_conn.close()

虽然 SQLAlchemy 未来可能会为更多的 DBAPI 用例添加内置模式,但收益递减,因为这些用例往往很少需要,并且它们也高度依赖于所使用的 DBAPI 类型,因此在任何情况下,直接 DBAPI 调用模式始终可用于那些需要它的情况。

另请参阅

当使用 Engine 时,如何访问原始 DBAPI 连接? - 包括有关如何访问 DBAPI 连接以及使用 asyncio 驱动程序时的 “驱动程序” 连接的更多详细信息。

以下是一些 DBAPI 连接使用的方法。

调用存储过程和用户定义函数

SQLAlchemy 支持多种调用存储过程和用户定义函数的方式。请注意,所有 DBAPI 都有不同的实践,因此您必须查阅底层 DBAPI 的文档,以了解与您的特定用法相关的具体信息。以下示例是假设性的,可能不适用于您的底层 DBAPI。

对于具有特殊语法或参数问题的存储过程或函数,可能会将 DBAPI 级别的 callproc 与您的 DBAPI 一起使用。此模式的一个示例是:

connection = engine.raw_connection()
try:
    cursor_obj = connection.cursor()
    cursor_obj.callproc("my_procedure", ["x", "y", "z"])
    results = list(cursor_obj.fetchall())
    cursor_obj.close()
    connection.commit()
finally:
    connection.close()

注意

并非所有 DBAPI 都使用 callproc,总体用法细节将有所不同。上面的示例仅说明了如何使用特定的 DBAPI 函数。

您的 DBAPI 可能没有 callproc 要求,或者 可能需要使用另一种模式(例如正常的 SQLAlchemy 连接用法)来调用存储过程或用户定义函数。这种用法模式的一个示例是,在编写本文档时,使用 psycopg2 DBAPI 在 PostgreSQL 数据库中执行存储过程,应该使用正常的连接用法来调用:

connection.execute("CALL my_procedure();")

上面的示例是假设性的。底层数据库不保证在这些情况下支持 “CALL” 或 “SELECT”,并且关键字可能会因函数是存储过程还是用户定义函数而异。在这些情况下,您应该查阅底层 DBAPI 和数据库文档,以确定要使用的正确语法和模式。

多个结果集

可以使用原始 DBAPI 游标的 nextset 方法获得对多个结果集的支持:

connection = engine.raw_connection()
try:
    cursor_obj = connection.cursor()
    cursor_obj.execute("select * from table1; select * from table2")
    results_one = cursor_obj.fetchall()
    cursor_obj.nextset()
    results_two = cursor_obj.fetchall()
    cursor_obj.close()
finally:
    connection.close()

注册新方言

create_engine() 函数调用使用 setuptools 入口点来定位给定的方言。可以在 setup.py 脚本中为第三方方言建立这些入口点。例如,要创建新的方言 “foodialect://”,步骤如下:

  1. 创建一个名为 foodialect 的包。

  2. 该包应具有一个模块,其中包含方言类,该类通常是 sqlalchemy.engine.default.DefaultDialect 的子类。在本示例中,假设它被称为 FooDialect,并且可以通过 foodialect.dialect 访问其模块。

  3. 可以在 setup.cfg 中建立入口点,如下所示:

    [options.entry_points]
    sqlalchemy.dialects =
        foodialect = foodialect.dialect:FooDialect

如果该方言正在为现有 SQLAlchemy 支持的数据库之上的特定 DBAPI 提供支持,则可以提供包括数据库限定符的名称。例如,如果 FooDialect 实际上是 MySQL 方言,则可以像这样建立入口点:

[options.entry_points]
sqlalchemy.dialects
    mysql.foodialect = foodialect.dialect:FooDialect

然后,上面的入口点将作为 create_engine("mysql+foodialect://") 访问。

在进程中注册方言

SQLAlchemy 还允许在当前进程中注册方言,从而绕过单独安装的需要。使用 register() 函数,如下所示:

from sqlalchemy.dialects import registry


registry.register("mysql.foodialect", "myapp.dialect", "MyMySQLDialect")

以上操作将响应 create_engine("mysql+foodialect://"),并从 myapp.dialect 模块加载 MyMySQLDialect 类。

连接/Engine API

对象名称 描述

连接

为包装的 DB-API 连接提供高级功能。

CreateEnginePlugin

一组旨在增强基于 URL 中的入口点名称构造 Engine 对象的功能的钩子。

Engine

PoolDialect 连接在一起,以提供数据库连接和行为的来源。

ExceptionContext

封装有关正在进行的错误条件的信息。

NestedTransaction

表示 “嵌套” 或 SAVEPOINT 事务。

RootTransaction

表示 Connection 上的 “根” 事务。

Transaction

表示正在进行的数据库事务。

TwoPhaseTransaction

表示两阶段事务。

class sqlalchemy.engine.Connection

为包装的 DB-API 连接提供高级功能。

Connection 对象通过调用 Engine.connect() 方法从 Engine 对象获取,并提供用于执行 SQL 语句以及事务控制的服务。

Connection 对象**不是**线程安全的。虽然可以使用适当的同步访问在线程之间共享 Connection,但底层 DBAPI 连接仍然可能不支持线程之间的共享访问。请查看 DBAPI 文档以了解详细信息。

Connection 对象表示从连接池中检出的单个 DBAPI 连接。在此状态下,连接池对连接没有影响,包括其过期或超时状态。为了使连接池正确管理连接,应在连接不使用时将其返回到连接池(即 connection.close())。

类签名

class sqlalchemy.engine.Connection (sqlalchemy.engine.interfaces.ConnectionEventsTarget, sqlalchemy.inspection.Inspectable)

method sqlalchemy.engine.Connection.__init__(engine: Engine, connection: PoolProxiedConnection | None = None, _has_events: bool | None = None, _allow_revalidate: bool = True, _allow_autobegin: bool = True)

构造一个新的 Connection。

方法 sqlalchemy.engine.Connection.begin() RootTransaction

在自动开始事务之前,先开始一个事务。

例如:

with engine.connect() as conn:
    with conn.begin() as trans:
        conn.execute(table.insert(), {"username": "sandy"})

返回的对象是 RootTransaction 的一个实例。此对象代表事务的“作用域”,当调用 Transaction.rollback()Transaction.commit() 方法时,事务即完成;该对象也可以作为上下文管理器使用,如上所示。

Connection.begin() 方法开始一个事务,该事务通常会在首次使用连接执行语句时开始。使用此方法的原因可能是为了在特定时间调用 ConnectionEvents.begin() 事件,或者为了在连接检出的作用域内,以上下文管理的代码块来组织代码,例如:

with engine.connect() as conn:
    with conn.begin():
        conn.execute(...)
        conn.execute(...)

    with conn.begin():
        conn.execute(...)
        conn.execute(...)

上述代码在行为上与以下不使用 Connection.begin() 的代码没有根本区别;以下风格被称为“边走边提交”风格:

with engine.connect() as conn:
    conn.execute(...)
    conn.execute(...)
    conn.commit()

    conn.execute(...)
    conn.execute(...)
    conn.commit()

从数据库的角度来看,Connection.begin() 方法不会发出任何 SQL 或以任何方式更改底层 DBAPI 连接的状态;Python DBAPI 没有显式事务开始的概念。

另请参阅

使用事务和 DBAPI - 在 SQLAlchemy 统一教程

Connection.begin_nested() - 使用 SAVEPOINT

Connection.begin_twophase() - 使用两阶段/XID 事务

Engine.begin() - 可从 Engine 获取的上下文管理器

方法 sqlalchemy.engine.Connection.begin_nested() NestedTransaction

开始一个嵌套事务(即 SAVEPOINT)并返回一个事务句柄,该句柄控制 SAVEPOINT 的作用域。

例如:

with engine.begin() as connection:
    with connection.begin_nested():
        connection.execute(table.insert(), {"username": "sandy"})

返回的对象是 NestedTransaction 的一个实例,它包括事务方法 NestedTransaction.commit()NestedTransaction.rollback();对于嵌套事务,这些方法对应于操作 “RELEASE SAVEPOINT <name>” 和 “ROLLBACK TO SAVEPOINT <name>”。保存点 (savepoint) 的名称对于 NestedTransaction 对象是本地的,并且是自动生成的。与任何其他 Transaction 一样,NestedTransaction 可以用作上下文管理器,如上所示,这将根据代码块内的操作是否成功或引发异常来 “release” 或 “rollback”。

嵌套事务需要在底层数据库中支持 SAVEPOINT,否则行为未定义。SAVEPOINT 通常用于在可能失败的事务中运行操作,同时继续外部事务。例如:

from sqlalchemy import exc

with engine.begin() as connection:
    trans = connection.begin_nested()
    try:
        connection.execute(table.insert(), {"username": "sandy"})
        trans.commit()
    except exc.IntegrityError:  # catch for duplicate username
        trans.rollback()  # rollback to savepoint

    # outer transaction continues
    connection.execute(...)

如果在没有首先调用 Connection.begin()Engine.begin() 的情况下调用 Connection.begin_nested(),则 Connection 对象将首先“自动开始”外部事务。可以使用“边走边提交”风格提交此外部事务,例如:

with engine.connect() as connection:  # begin() wasn't called

    with connection.begin_nested():  # will auto-"begin()" first
        connection.execute(...)
    # savepoint is released

    connection.execute(...)

    # explicitly commit outer transaction
    connection.commit()

    # can continue working with connection here

在版本 2.0 中变更: Connection.begin_nested() 现在将参与连接的“自动开始”行为,这是 2.0 版本/ 1.4 版本中的 “future” 风格连接的新特性。

另请参阅

Connection.begin()

使用 SAVEPOINT - ORM 对 SAVEPOINT 的支持

方法 sqlalchemy.engine.Connection.begin_twophase(xid: Any | None = None) TwoPhaseTransaction

开始一个两阶段或 XA 事务并返回一个事务句柄。

返回的对象是 TwoPhaseTransaction 的一个实例,除了 Transaction 提供的方法之外,还提供了一个 TwoPhaseTransaction.prepare() 方法。

参数:

xid – 两阶段事务 ID。如果未提供,将生成一个随机 ID。

方法 sqlalchemy.engine.Connection.close() None

关闭此 Connection

这将导致释放底层数据库资源,即内部引用的 DBAPI 连接。DBAPI 连接通常会恢复到 Pool 连接池中,该连接池由生成此 ConnectionEngine 引用。DBAPI 连接上存在的任何事务状态也会通过 DBAPI 连接的 rollback() 方法无条件释放,无论任何可能与此 Connection 相关的 Transaction 对象如何。

这也会产生调用 Connection.rollback() 的效果,如果存在任何事务。

在调用 Connection.close() 后,Connection 将永久处于关闭状态,并且不允许进一步操作。

属性 sqlalchemy.engine.Connection.closed

如果此连接已关闭,则返回 True。

方法 sqlalchemy.engine.Connection.commit() None

提交当前正在进行的事务。

如果已启动事务,则此方法会提交当前事务。如果未启动事务,则该方法无效,假设连接处于非无效状态。

当首次执行语句时,或当调用 Connection.begin() 方法时,事务会在 Connection 上自动开始。

注意

Connection.commit() 方法仅作用于链接到 Connection 对象的主数据库事务。它不操作可能从 Connection.begin_nested() 方法调用的 SAVEPOINT;要控制 SAVEPOINT,请在 NestedTransaction 上调用 NestedTransaction.commit(),该对象由 Connection.begin_nested() 方法本身返回。

属性 sqlalchemy.engine.Connection.connection

此 Connection 管理的底层 DB-API 连接。

这是一个 SQLAlchemy 连接池代理连接,然后它具有属性 _ConnectionFairy.dbapi_connection,该属性引用实际的驱动程序连接。

属性 sqlalchemy.engine.Connection.default_isolation_level

与正在使用的 Dialect 关联的初始连接时隔离级别。

此值独立于 Connection.execution_options.isolation_levelEngine.execution_options.isolation_level 执行选项,并且由 Dialect 在创建第一个连接时确定,方法是在发出任何其他命令之前对数据库执行 SQL 查询以获取当前隔离级别。

调用此访问器不会调用任何新的 SQL 查询。

另请参阅

Connection.get_isolation_level() - 查看当前实际隔离级别

create_engine.isolation_level - 设置每个 Engine 的隔离级别

Connection.execution_options.isolation_level - 设置每个 Connection 的隔离级别

方法 sqlalchemy.engine.Connection.detach() None

将底层 DB-API 连接从其连接池中分离。

例如:

with engine.connect() as conn:
    conn.detach()
    conn.execute(text("SET search_path TO schema1, schema2"))

    # work with connection

# connection is fully closed (since we used "with:", can
# also call .close())

Connection 实例将保持可用。当关闭(或从上面的上下文管理器上下文中退出)时,DB-API 连接将被字面意义上地关闭,并且不会返回到其原始池。

此方法可用于将应用程序的其余部分与连接上的修改状态(例如事务隔离级别或类似状态)隔离。

方法 sqlalchemy.engine.Connection.exec_driver_sql(statement: str, parameters: _DBAPIAnyExecuteParams | None = None, execution_options: CoreExecuteOptionsParameter | None = None) CursorResult[Any]

直接在 DBAPI 光标上执行字符串 SQL 语句,无需任何 SQL 编译步骤。

这可以用于将任何字符串直接传递给正在使用的 DBAPI 的 cursor.execute() 方法。

参数:
  • statement – 要执行的语句字符串。绑定参数必须使用底层 DBAPI 的 paramstyle,例如 “qmark”、“pyformat”、“format” 等。

  • parameters – 表示要在执行中使用的绑定参数值。格式为以下之一:命名参数字典、位置参数元组,或包含字典或元组的列表,以支持多重执行。

返回值:

一个 CursorResult

例如:多个字典

conn.exec_driver_sql(
    "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
    [{"id": 1, "value": "v1"}, {"id": 2, "value": "v2"}],
)

单个字典

conn.exec_driver_sql(
    "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
    dict(id=1, value="v1"),
)

单个元组

conn.exec_driver_sql(
    "INSERT INTO table (id, value) VALUES (?, ?)", (1, "v1")
)

另请参阅

PEP 249

方法 sqlalchemy.engine.Connection.execute(statement: Executable, parameters: _CoreAnyExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) CursorResult[Any]

执行 SQL 语句构造并返回 CursorResult

参数:
  • statement

    要执行的语句。这始终是一个同时位于 ClauseElementExecutable 层次结构中的对象,包括:

  • parameters – 将绑定到语句中的参数。这可以是参数名称到值的字典,也可以是字典的可变序列(例如,列表)。当传递字典列表时,底层语句执行将使用 DBAPI cursor.executemany() 方法。当传递单个字典时,将使用 DBAPI cursor.execute() 方法。

  • execution_options – 可选的执行选项字典,它将与语句执行关联。此字典可以提供 Connection.execution_options() 接受的选项的子集。

返回值:

一个 Result 对象。

方法 sqlalchemy.engine.Connection.execution_options(**opt: Any) Connection

为此连接设置非 SQL 选项,这些选项在执行期间生效。

此方法就地修改此 Connection;返回值是调用该方法的同一 Connection 对象。请注意,这与其他对象(如 Engine.execution_options()Executable.execution_options())上的 execution_options 方法的行为形成对比。其基本原理是,许多此类执行选项在任何情况下都必然会修改基本 DBAPI 连接的状态,因此没有可行的方法将此类选项的效果局限于“子”连接。

在版本 2.0 中变更: 与具有此方法的其他对象形成对比,Connection.execution_options() 方法就地修改连接,而无需创建其副本。

正如其他地方讨论的那样,Connection.execution_options() 方法接受任何任意参数,包括用户定义的名称。给定的所有参数都可以通过多种方式使用,包括使用 Connection.get_execution_options() 方法。 请参阅 Executable.execution_options()Engine.execution_options() 中的示例。

SQLAlchemy 本身当前识别的关键字包括 Executable.execution_options() 下列出的所有关键字,以及其他特定于 Connection 的关键字。

参数:
method sqlalchemy.engine.Connection.get_execution_options() _ExecuteOptions

获取将在执行期间生效的非 SQL 选项。

1.3 版本新增。

method sqlalchemy.engine.Connection.get_isolation_level() Literal['SERIALIZABLE', 'REPEATABLE READ', 'READ COMMITTED', 'READ UNCOMMITTED', 'AUTOCOMMIT']

返回此连接范围内数据库中存在的当前 实际 隔离级别。

此属性将对数据库执行实时 SQL 操作,以便获取当前隔离级别,因此返回的值是底层 DBAPI 连接上的实际级别,而与如何设置此状态无关。 这将是四种实际隔离模式之一:READ UNCOMMITTEDREAD COMMITTEDREPEATABLE READSERIALIZABLE。 它 包括 AUTOCOMMIT 隔离级别设置。 第三方方言也可能具有其他隔离级别设置。

注意

此方法 不会报告 AUTOCOMMIT 隔离级别,这是一个独立的 dbapi 设置,独立于 实际 隔离级别。 当使用 AUTOCOMMIT 时,数据库连接仍然具有有效的“传统”隔离模式,通常是 READ UNCOMMITTEDREAD COMMITTEDREPEATABLE READSERIALIZABLE 这四个值之一。

Connection.default_isolation_level 访问器进行比较,后者返回数据库在初始连接时存在的隔离级别。

另请参阅

Connection.default_isolation_level - 查看默认级别

create_engine.isolation_level - 设置每个 Engine 的隔离级别

Connection.execution_options.isolation_level - 设置每个 Connection 的隔离级别

method sqlalchemy.engine.Connection.get_nested_transaction() NestedTransaction | None

返回当前正在进行的嵌套事务(如果有)。

1.4 版本新增。

method sqlalchemy.engine.Connection.get_transaction() RootTransaction | None

返回当前正在进行的主事务(如果有)。

1.4 版本新增。

method sqlalchemy.engine.Connection.in_nested_transaction() bool

如果事务正在进行中,则返回 True。

method sqlalchemy.engine.Connection.in_transaction() bool

如果事务正在进行中,则返回 True。

attribute sqlalchemy.engine.Connection.info

与此 Connection 引用的底层 DBAPI 连接关联的信息字典,允许用户定义的数据与连接关联。

此处的数据将随 DBAPI 连接一起传递,包括在将其返回到连接池并在后续 Connection 实例中再次使用之后。

method sqlalchemy.engine.Connection.invalidate(exception: BaseException | None = None) None

使与此 Connection 关联的底层 DBAPI 连接失效。

将尝试立即关闭底层 DBAPI 连接;但是,如果此操作失败,则会记录错误,但不会引发错误。然后,无论 close() 是否成功,连接都会被丢弃。

在下次使用时(“使用”通常意味着使用 Connection.execute() 方法或类似方法),此 Connection 将尝试使用 Pool 的服务作为连接源(例如“重新连接”)来获取新的 DBAPI 连接。

如果在调用 Connection.invalidate() 方法时事务正在进行中(例如,已调用 Connection.begin() 方法),则在 DBAPI 级别,与此事务关联的所有状态都将丢失,因为 DBAPI 连接已关闭。在调用 Transaction.rollback() 方法结束 Transaction 对象之前,Connection 将不允许继续重新连接;在此之前,任何尝试继续使用 Connection 的操作都将引发 InvalidRequestError。这是为了防止应用程序在事务由于失效而丢失的情况下意外地继续进行中的事务操作。

与自动失效一样,Connection.invalidate() 方法将在连接池级别调用 PoolEvents.invalidate() 事件。

参数:

exception – 一个可选的 Exception 实例,它是失效的原因。 将传递给事件处理程序和日志记录函数。

attribute sqlalchemy.engine.Connection.invalidated

如果此连接已失效,则返回 True。

这不表示连接是否在池级别失效,但是

method sqlalchemy.engine.Connection.rollback() None

回滚当前正在进行的事务。

如果已启动事务,则此方法将回滚当前事务。 如果未启动事务,则该方法无效。 如果已启动事务且连接处于失效状态,则使用此方法清除事务。

当首次执行语句时,或当调用 Connection.begin() 方法时,事务会在 Connection 上自动开始。

注意

Connection.rollback() 方法仅作用于链接到 Connection 对象的主数据库事务。 它不操作从 Connection.begin_nested() 方法调用的 SAVEPOINT; 要控制 SAVEPOINT,请在 Connection.begin_nested() 方法本身返回的 NestedTransaction 上调用 NestedTransaction.rollback()

method sqlalchemy.engine.Connection.scalar(statement: Executable, parameters: _CoreSingleExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) Any

执行 SQL 语句构造并返回标量对象。

此方法是调用 Connection.execute() 方法后调用 Result.scalar() 方法的简写形式。 参数是等效的。

返回值:

表示返回的第一行第一列的标量 Python 值。

method sqlalchemy.engine.Connection.scalars(statement: Executable, parameters: _CoreAnyExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) ScalarResult[Any]

执行并返回标量结果集,它从每行的第一列产生标量值。

此方法等效于调用 Connection.execute() 以接收 Result 对象,然后调用 Result.scalars() 方法来生成 ScalarResult 实例。

返回值:

一个 ScalarResult

1.4.24 版本新增。

method sqlalchemy.engine.Connection.schema_for_object(obj: HasSchemaAttr) str | None

返回给定模式项的模式名称,同时考虑当前的模式转换映射。

class sqlalchemy.engine.CreateEnginePlugin

一组旨在增强基于 URL 中的入口点名称构造 Engine 对象的功能的钩子。

CreateEnginePlugin 的目的是允许第三方系统应用引擎、连接池和方言级别的事件监听器,而无需修改目标应用程序;相反,插件名称可以添加到数据库 URL 中。CreateEnginePlugin 的目标应用包括

  • 连接和 SQL 性能工具,例如,使用事件来跟踪检出次数和/或在语句上花费的时间

  • 连接性插件,例如代理

一个基本的 CreateEnginePlugin,它将记录器附加到 Engine 对象,可能如下所示

import logging

from sqlalchemy.engine import CreateEnginePlugin
from sqlalchemy import event


class LogCursorEventsPlugin(CreateEnginePlugin):
    def __init__(self, url, kwargs):
        # consume the parameter "log_cursor_logging_name" from the
        # URL query
        logging_name = url.query.get(
            "log_cursor_logging_name", "log_cursor"
        )

        self.log = logging.getLogger(logging_name)

    def update_url(self, url):
        "update the URL to one that no longer includes our parameters"
        return url.difference_update_query(["log_cursor_logging_name"])

    def engine_created(self, engine):
        "attach an event listener after the new Engine is constructed"
        event.listen(engine, "before_cursor_execute", self._log_event)

    def _log_event(
        self,
        conn,
        cursor,
        statement,
        parameters,
        context,
        executemany,
    ):

        self.log.info("Plugin logged cursor event: %s", statement)

插件通过入口点注册,方式类似于方言

entry_points = {
    "sqlalchemy.plugins": [
        "log_cursor_plugin = myapp.plugins:LogCursorEventsPlugin"
    ]
}

使用上述名称的插件将从数据库 URL 中调用,如下所示

from sqlalchemy import create_engine

engine = create_engine(
    "mysql+pymysql://scott:tiger@localhost/test?"
    "plugin=log_cursor_plugin&log_cursor_logging_name=mylogger"
)

plugin URL 参数支持多个实例,因此一个 URL 可以指定多个插件;它们按照 URL 中声明的顺序加载

engine = create_engine(
    "mysql+pymysql://scott:tiger@localhost/test?"
    "plugin=plugin_one&plugin=plugin_twp&plugin=plugin_three"
)

插件名称也可以直接传递给 create_engine(),使用 create_engine.plugins 参数

engine = create_engine(
    "mysql+pymysql://scott:tiger@localhost/test", plugins=["myplugin"]
)

1.2.3 版本新增: 插件名称也可以作为列表指定给 create_engine()

插件可以从 URL 对象以及 kwargs 字典(即传递给 create_engine() 调用的参数字典)中“消费”插件特定的参数。“消费”这些参数包括在插件初始化时必须移除它们,以便这些参数不会传递给 Dialect 构造函数,否则它们会引发 ArgumentError 异常,因为方言不识别这些参数。

从 SQLAlchemy 1.4 版本开始,参数应继续直接从 kwargs 字典中消费,通过使用诸如 dict.pop 之类的方法移除值。来自 URL 对象的参数应通过实现 CreateEnginePlugin.update_url() 方法来消费,该方法返回 URL 的新副本,其中插件特定的参数已被移除

class MyPlugin(CreateEnginePlugin):
    def __init__(self, url, kwargs):
        self.my_argument_one = url.query["my_argument_one"]
        self.my_argument_two = url.query["my_argument_two"]
        self.my_argument_three = kwargs.pop("my_argument_three", None)

    def update_url(self, url):
        return url.difference_update_query(
            ["my_argument_one", "my_argument_two"]
        )

如上所示的参数将从如下的 create_engine() 调用中消费

from sqlalchemy import create_engine

engine = create_engine(
    "mysql+pymysql://scott:tiger@localhost/test?"
    "plugin=myplugin&my_argument_one=foo&my_argument_two=bar",
    my_argument_three="bat",
)

在 1.4 版本中变更: URL 对象现在是不可变的;需要更改 URLCreateEnginePlugin 应实现新添加的 CreateEnginePlugin.update_url() 方法,该方法在插件构造后调用。

对于迁移,请按以下方式构造插件,检查 CreateEnginePlugin.update_url() 方法是否存在以检测正在运行的版本

class MyPlugin(CreateEnginePlugin):
    def __init__(self, url, kwargs):
        if hasattr(CreateEnginePlugin, "update_url"):
            # detect the 1.4 API
            self.my_argument_one = url.query["my_argument_one"]
            self.my_argument_two = url.query["my_argument_two"]
        else:
            # detect the 1.3 and earlier API - mutate the
            # URL directly
            self.my_argument_one = url.query.pop("my_argument_one")
            self.my_argument_two = url.query.pop("my_argument_two")

        self.my_argument_three = kwargs.pop("my_argument_three", None)

    def update_url(self, url):
        # this method is only called in the 1.4 version
        return url.difference_update_query(
            ["my_argument_one", "my_argument_two"]
        )

另请参阅

URL 对象现在是不可变的 - URL 更改的概述,其中还包括关于 CreateEnginePlugin 的说明。

当引擎创建过程完成并生成 Engine 对象时,它会再次通过 CreateEnginePlugin.engine_created() 钩子传递给插件。在这个钩子中,可以对引擎进行额外的更改,最常见的是设置事件(例如 核心事件 中定义的事件)。

method sqlalchemy.engine.CreateEnginePlugin.__init__(url: URL, kwargs: Dict[str, Any])

构造一个新的 CreateEnginePlugin

对于每次调用 create_engine(),都会单独实例化插件对象。单个 Engine 将传递给与此 URL 对应的 CreateEnginePlugin.engine_created() 方法。

参数:
method sqlalchemy.engine.CreateEnginePlugin.engine_created(engine: Engine) None

接收完全构造后的 Engine 对象。

插件可以对引擎进行额外的更改,例如注册引擎或连接池事件。

method sqlalchemy.engine.CreateEnginePlugin.handle_dialect_kwargs(dialect_cls: Type[Dialect], dialect_args: Dict[str, Any]) None

解析和修改方言 kwargs

method sqlalchemy.engine.CreateEnginePlugin.handle_pool_kwargs(pool_cls: Type[Pool], pool_args: Dict[str, Any]) None

解析和修改连接池 kwargs

method sqlalchemy.engine.CreateEnginePlugin.update_url(url: URL) URL

更新 URL

应返回一个新的 URL。此方法通常用于从 URL 中消费配置参数,这些参数必须被移除,因为它们不会被方言识别。URL.difference_update_query() 方法可用于移除这些参数。有关示例,请参见 CreateEnginePlugin 的文档字符串。

1.4 版本新增。

class sqlalchemy.engine.Engine

PoolDialect 连接在一起,以提供数据库连接和行为的来源。

Engine 对象是使用 create_engine() 函数公开实例化的。

类签名

class sqlalchemy.engine.Engine (sqlalchemy.engine.interfaces.ConnectionEventsTarget, sqlalchemy.log.Identified, sqlalchemy.inspection.Inspectable)

method sqlalchemy.engine.Engine.begin() Iterator[Connection]

返回一个上下文管理器,该管理器提供一个已建立 TransactionConnection

例如:

with engine.begin() as conn:
    conn.execute(text("insert into table (x, y, z) values (1, 2, 3)"))
    conn.execute(text("my_special_procedure(5)"))

在成功操作后,Transaction 将被提交。如果引发错误,Transaction 将被回滚。

另请参阅

Engine.connect() - 从 Engine 获取 Connection

Connection.begin() - 为特定的 Connection 启动 Transaction

method sqlalchemy.engine.Engine.clear_compiled_cache() None

清除与方言关联的已编译缓存。

适用于通过 create_engine.query_cache_size 参数建立的内置缓存。它不会影响通过 Connection.execution_options.compiled_cache 参数传递的任何字典缓存。

1.4 版本新增。

method sqlalchemy.engine.Engine.connect() Connection

返回一个新的 Connection 对象。

Connection 充当 Python 上下文管理器,因此此方法的典型用法如下所示

with engine.connect() as connection:
    connection.execute(text("insert into table values ('foo')"))
    connection.commit()

在上面代码中,在代码块完成后,连接将“关闭”,并且其底层的 DBAPI 资源将返回到连接池。 这也具有回滚任何显式开始的或通过自动开始开始的事务的效果,并且如果已启动且仍在进行中,则将发出 ConnectionEvents.rollback() 事件。

另请参阅

Engine.begin()

method sqlalchemy.engine.Engine.dispose(close: bool = True) None

释放此 Engine 使用的连接池。

在旧连接池被释放后,立即创建一个新的连接池。先前的连接池可以通过两种方式释放:主动释放,即关闭该池中所有当前检入的连接;或被动释放,即丢失对其的引用,但不关闭任何连接。后一种策略更适用于 fork 的 Python 进程中的初始化程序。

参数:

close

如果保留其默认值 True,则具有完全关闭所有当前检入的数据库连接的效果。仍然检出的连接将不会被关闭,但是它们将不再与此 Engine 关联,因此当它们被单独关闭时,最终它们关联的 Pool 将被垃圾回收,并且如果尚未在检入时关闭,它们将被完全关闭。

如果设置为 False,则先前的连接池将被取消引用,并且在其他方面不会以任何方式触及。

1.4.33 版本新增: 添加了 Engine.dispose.close 参数,以允许在子进程中替换连接池,而不会干扰父进程使用的连接。

attribute sqlalchemy.engine.Engine.driver

Engine 使用的 Dialect 的驱动名称。

attribute sqlalchemy.engine.Engine.engine

返回此 Engine

用于接受同一变量中的 Connection / Engine 对象的旧方案。

method sqlalchemy.engine.Engine.execution_options(**opt: Any) OptionEngine

返回一个新的 Engine,它将为 Connection 对象提供给定的执行选项。

返回的 Engine 与原始 Engine 保持关联,因为它共享相同的连接池和其他状态

  • Engine 使用的 Pool 是相同的实例。Engine.dispose() 方法将替换父引擎以及此引擎的连接池实例。

  • 事件监听器是“级联”的 - 意味着,新的 Engine 继承父级的事件,并且新事件可以单独与新的 Engine 关联。

  • 日志配置和 logging_name 从父 Engine 复制。

Engine.execution_options() 方法的目的是实现这样的方案:多个 Engine 对象引用相同的连接池,但通过影响每个引擎的某些执行级别行为的选项来区分。一个这样的例子是分解为单独的“读取器”和“写入器”Engine 实例,其中一个 Engine 具有较低的 隔离级别 设置,或者甚至使用“autocommit”禁用事务。此配置的示例位于 为单个 Engine 维护多个隔离级别

另一个示例是使用自定义选项 shard_id,该选项由事件使用,以更改数据库连接上的当前模式

from sqlalchemy import event
from sqlalchemy.engine import Engine

primary_engine = create_engine("mysql+mysqldb://")
shard1 = primary_engine.execution_options(shard_id="shard1")
shard2 = primary_engine.execution_options(shard_id="shard2")

shards = {"default": "base", "shard_1": "db1", "shard_2": "db2"}


@event.listens_for(Engine, "before_cursor_execute")
def _switch_shard(conn, cursor, stmt, params, context, executemany):
    shard_id = conn.get_execution_options().get("shard_id", "default")
    current_shard = conn.info.get("current_shard", None)

    if current_shard != shard_id:
        cursor.execute("use %s" % shards[shard_id])
        conn.info["current_shard"] = shard_id

上面的配方说明了两个 Engine 对象,它们都将充当 Connection 对象的工厂,这些对象具有预先建立的“shard_id”执行选项。ConnectionEvents.before_cursor_execute() 事件处理程序然后解释此执行选项,以发出 MySQL use 语句以在语句执行之前切换数据库,同时跟踪我们已使用 Connection.info 字典建立的数据库。

另请参阅

Connection.execution_options() - 更新 Connection 对象上的执行选项。

Engine.update_execution_options() - 就地更新给定 Engine 的执行选项。

Engine.get_execution_options()

method sqlalchemy.engine.Engine.get_execution_options() _ExecuteOptions

获取将在执行期间生效的非 SQL 选项。

attribute sqlalchemy.engine.Engine.name

Engine 使用的 Dialect 的字符串名称。

method sqlalchemy.engine.Engine.raw_connection() PoolProxiedConnection

从连接池返回“原始” DBAPI 连接。

返回的对象是正在使用的底层驱动程序使用的 DBAPI 连接对象的代理版本。该对象将具有与真实 DBAPI 连接相同的所有行为,除了其 close() 方法将导致连接返回到池,而不是真正关闭。

当不需要 Connection 提供的 API 时,此方法为特殊情况提供直接 DBAPI 连接访问。当 Connection 对象已经存在时,可以使用 Connection.connection 访问器访问 DBAPI 连接。

method sqlalchemy.engine.Engine.update_execution_options(**opt: Any) None

更新此 Engine 的默认 execution_options 字典。

**opt 中给定的键/值将添加到将用于所有连接的默认执行选项中。此字典的初始内容可以通过 execution_options 参数发送到 create_engine()

class sqlalchemy.engine.ExceptionContext

封装有关正在进行的错误条件的信息。

此对象的存在仅仅是为了传递给 DialectEvents.handle_error() 事件,支持可以在不向后兼容的情况下扩展的接口。

attribute sqlalchemy.engine.ExceptionContext.chained_exception: BaseException | None

异常链中先前处理程序返回的异常(如果有)。

如果存在,除非后续处理程序替换它,否则此异常将是 SQLAlchemy 最终引发的异常。

可能为 None。

attribute sqlalchemy.engine.ExceptionContext.connection: Connection | None

异常期间使用的 Connection

此成员存在,除非首次连接时发生故障。

attribute sqlalchemy.engine.ExceptionContext.cursor: DBAPICursor | None

DBAPI 游标对象。

可能为 None。

attribute sqlalchemy.engine.ExceptionContext.dialect: Dialect

正在使用的 Dialect

此成员存在于事件挂钩的所有调用中。

2.0 版本新增。

attribute sqlalchemy.engine.ExceptionContext.engine: Engine | None

异常期间使用的 Engine

此成员在所有情况下都存在,除非在连接池“pre-ping”过程中处理错误时。

attribute sqlalchemy.engine.ExceptionContext.execution_context: ExecutionContext | None

与正在进行的执行操作相对应的 ExecutionContext

这存在于语句执行操作,但不适用于事务开始/结束等操作。当在可以构造 ExecutionContext 之前引发异常时,它也不存在。

请注意,ExceptionContext.statementExceptionContext.parameters 成员可能表示与 ExecutionContext 不同的值,可能是在 ConnectionEvents.before_cursor_execute() 事件或类似事件修改要发送的语句/参数的情况下。

可能为 None。

attribute sqlalchemy.engine.ExceptionContext.invalidate_pool_on_disconnect: bool

表示当“断开连接”条件生效时,是否应使池中的所有连接失效。

DialectEvents.handle_error() 事件的范围内将此标志设置为 False 将产生这样的效果:在断开连接期间,池中的所有连接集合将不会失效;实际上只会使作为错误主题的当前连接失效。

此标志的目的是用于自定义断开连接处理方案,其中池中其他连接的失效将基于其他条件执行,甚至基于每个连接执行。

attribute sqlalchemy.engine.ExceptionContext.is_disconnect: bool

表示发生的异常是否表示“断开连接”条件。

此标志在 DialectEvents.handle_error() 处理程序的范围内始终为 True 或 False。

SQLAlchemy 将遵循此标志,以确定是否应随后使连接失效。也就是说,通过分配给此标志,可以通过更改此标志来调用或阻止“断开连接”事件,然后导致连接和池失效。

注意

使用 create_engine.pool_pre_ping 参数启用的池“pre_ping”处理程序在决定“ping”是否返回 false 之前,不会 咨询此事件,而不是接收到未处理的错误。对于此用例,可以使用 基于 engine_connect() 的旧配方。未来的 API 允许更全面地自定义所有功能中的“断开连接”检测机制。

attribute sqlalchemy.engine.ExceptionContext.is_pre_ping: bool

指示此错误是否发生在 create_engine.pool_pre_ping 设置为 True 时执行的“pre-ping”步骤中。在此模式下,ExceptionContext.engine 属性将为 None。正在使用的方言可以通过 ExceptionContext.dialect 属性访问。

2.0.5 版本新增功能。

attribute sqlalchemy.engine.ExceptionContext.original_exception: BaseException

捕获的异常对象。

此成员始终存在。

attribute sqlalchemy.engine.ExceptionContext.parameters: _DBAPIAnyExecuteParams | None

直接发送到 DBAPI 的参数集合。

可能为 None。

attribute sqlalchemy.engine.ExceptionContext.sqlalchemy_exception: StatementError | None

sqlalchemy.exc.StatementError,它包装了原始异常,如果事件没有规避异常处理,则将引发该异常。

可能为 None,因为并非所有异常类型都由 SQLAlchemy 包装。对于子类化 dbapi 的 Error 类的 DBAPI 级别异常,此字段将始终存在。

attribute sqlalchemy.engine.ExceptionContext.statement: str | None

直接发送到 DBAPI 的字符串 SQL 语句。

可能为 None。

class sqlalchemy.engine.NestedTransaction

表示 “嵌套” 或 SAVEPOINT 事务。

NestedTransaction 对象是通过调用 Connection.begin_nested() 方法创建的 Connection

当使用 NestedTransaction 时,“begin”/“commit”/“rollback”的语义如下

模仿保存点方面外部事务的语义的基本原理,以便代码可以以不可知的方式处理“保存点”事务和“外部”事务。

另请参阅

使用 SAVEPOINT - SAVEPOINT API 的 ORM 版本。

method sqlalchemy.engine.NestedTransaction.close() None

继承自 Transaction.close() 方法 Transaction

关闭此 Transaction

如果此事务是 begin/commit 嵌套中的基本事务,则事务将 rollback()。否则,该方法返回。

这用于取消事务,而不影响封闭事务的范围。

method sqlalchemy.engine.NestedTransaction.commit() None

提交此 Transaction

此实现可能因正在使用的事务类型而异

method sqlalchemy.engine.NestedTransaction.rollback() None

inherited from the Transaction.rollback() method of Transaction

回滚此 Transaction

此实现可能因正在使用的事务类型而异

  • 对于简单的数据库事务 (例如 RootTransaction),它对应于 ROLLBACK 操作。

  • 对于 NestedTransaction,它对应于 “ROLLBACK TO SAVEPOINT” 操作。

  • 对于 TwoPhaseTransaction,可以使用用于两阶段事务的 DBAPI 特定方法。

class sqlalchemy.engine.RootTransaction

表示 Connection 上的 “根” 事务。

这对应于当前 Connection 正在发生的 “BEGIN/COMMIT/ROLLBACK” 操作。RootTransaction 是通过调用 Connection.begin() 方法创建的,并在其活动期间始终与 Connection 关联。当前正在使用的 RootTransaction 可以通过 Connection.get_transaction 方法访问 Connection

2.0 风格 的用法中,Connection 也采用了 “autobegin” 行为,当非事务状态的连接用于在 DBAPI 连接上发出命令时,将创建一个新的 RootTransaction。在 2.0 风格的用法中,RootTransaction 的作用域可以使用 Connection.commit()Connection.rollback() 方法来控制。

method sqlalchemy.engine.RootTransaction.close() None

继承自 Transaction.close() 方法 Transaction

关闭此 Transaction

如果此事务是 begin/commit 嵌套中的基本事务,则事务将 rollback()。否则,该方法返回。

这用于取消事务,而不影响封闭事务的范围。

method sqlalchemy.engine.RootTransaction.commit() None

提交此 Transaction

此实现可能因正在使用的事务类型而异

method sqlalchemy.engine.RootTransaction.rollback() None

inherited from the Transaction.rollback() method of Transaction

回滚此 Transaction

此实现可能因正在使用的事务类型而异

  • 对于简单的数据库事务 (例如 RootTransaction),它对应于 ROLLBACK 操作。

  • 对于 NestedTransaction,它对应于 “ROLLBACK TO SAVEPOINT” 操作。

  • 对于 TwoPhaseTransaction,可以使用用于两阶段事务的 DBAPI 特定方法。

class sqlalchemy.engine.Transaction

表示正在进行的数据库事务。

Transaction 对象是通过调用 Connection.begin() 方法获得的 Connection

from sqlalchemy import create_engine

engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
connection = engine.connect()
trans = connection.begin()
connection.execute(text("insert into x (a, b) values (1, 2)"))
trans.commit()

该对象提供了 rollback()commit() 方法,以控制事务边界。它还实现了上下文管理器接口,以便 Python with 语句可以与 Connection.begin() 方法一起使用

with connection.begin():
    connection.execute(text("insert into x (a, b) values (1, 2)"))

Transaction 对象是非线程安全的。

类签名

class sqlalchemy.engine.Transaction (sqlalchemy.engine.util.TransactionalContext)

method sqlalchemy.engine.Transaction.close() None

关闭此 Transaction

如果此事务是 begin/commit 嵌套中的基本事务,则事务将 rollback()。否则,该方法返回。

这用于取消事务,而不影响封闭事务的范围。

method sqlalchemy.engine.Transaction.commit() None

提交此 Transaction

此实现可能因正在使用的事务类型而异

method sqlalchemy.engine.Transaction.rollback() None

回滚此 Transaction

此实现可能因正在使用的事务类型而异

  • 对于简单的数据库事务 (例如 RootTransaction),它对应于 ROLLBACK 操作。

  • 对于 NestedTransaction,它对应于 “ROLLBACK TO SAVEPOINT” 操作。

  • 对于 TwoPhaseTransaction,可以使用用于两阶段事务的 DBAPI 特定方法。

class sqlalchemy.engine.TwoPhaseTransaction

表示两阶段事务。

可以使用 Connection.begin_twophase() 方法获取新的 TwoPhaseTransaction 对象。

该接口与 Transaction 的接口相同,但增加了 prepare() 方法。

method sqlalchemy.engine.TwoPhaseTransaction.close() None

继承自 Transaction.close() 方法 Transaction

关闭此 Transaction

如果此事务是 begin/commit 嵌套中的基本事务,则事务将 rollback()。否则,该方法返回。

这用于取消事务,而不影响封闭事务的范围。

method sqlalchemy.engine.TwoPhaseTransaction.commit() None

提交此 Transaction

此实现可能因正在使用的事务类型而异

method sqlalchemy.engine.TwoPhaseTransaction.prepare() None

准备此 TwoPhaseTransaction

在 PREPARE 之后,事务可以被提交。

method sqlalchemy.engine.TwoPhaseTransaction.rollback() None

inherited from the Transaction.rollback() method of Transaction

回滚此 Transaction

此实现可能因正在使用的事务类型而异

  • 对于简单的数据库事务 (例如 RootTransaction),它对应于 ROLLBACK 操作。

  • 对于 NestedTransaction,它对应于 “ROLLBACK TO SAVEPOINT” 操作。

  • 对于 TwoPhaseTransaction,可以使用用于两阶段事务的 DBAPI 特定方法。

结果集 API

对象名称 描述

ChunkedIteratorResult

一个 IteratorResult,它从生成迭代器的可调用对象工作。

CursorResult

一个表示来自 DBAPI 游标状态的 Result。

FilterResult

一个 Result 的包装器,它返回除 Row 对象之外的对象,例如字典或标量对象。

FrozenResult

表示一个 “冻结” 状态的 Result 对象,适用于缓存。

IteratorResult

一个 Result,它从 Row 对象或类似的类行数据的 Python 迭代器中获取数据。

MappingResult

一个 Result 的包装器,它返回字典值而不是 Row 值。

MergedResult

一个从任意数量的 Result 对象合并而来的 Result

Result

表示一组数据库结果。

Row

表示单个结果行。

RowMapping

一个 Mapping,它将列名和对象映射到 Row 值。

ScalarResult

一个 Result 的包装器,它返回标量值而不是 Row 值。

TupleResult

一个 Result,其类型被指定为返回纯 Python 元组而不是行。

class sqlalchemy.engine.ChunkedIteratorResult

一个 IteratorResult,它从生成迭代器的可调用对象工作。

给定的 chunks 参数是一个函数,该函数被赋予每次块中要返回的行数,或者 None 表示所有行。然后,该函数应返回一个未消耗的列表迭代器,每个列表都是请求的大小。

可以随时再次调用该函数,在这种情况下,它应从相同的结果集继续,但调整给定的块大小。

1.4 版本新增。

成员

yield_per()

method sqlalchemy.engine.ChunkedIteratorResult.yield_per(num: int) Self

配置行提取策略,一次提取 num 行。

这会影响结果在迭代结果对象时或以其他方式使用诸如 Result.fetchone() 之类一次返回一行的方法的基础行为。来自底层游标或其他数据源的数据将被缓冲到内存中最多这么多行,然后缓冲的集合将一次生成一行或根据需要生成多行。每次缓冲区清除时,它将被刷新为这么多行或剩余的行数(如果较少)。

Result.yield_per() 方法通常与 Connection.execution_options.stream_results 执行选项结合使用,这将允许正在使用的数据库方言利用服务器端游标,如果 DBAPI 支持与默认操作模式分开的特定 “服务器端游标” 模式。

提示

考虑使用 Connection.execution_options.yield_per 执行选项,该选项将同时设置 Connection.execution_options.stream_results 以确保使用服务器端游标,并自动调用 Result.yield_per() 方法以一次建立固定的行缓冲区大小。

Connection.execution_options.yield_per 执行选项可用于 ORM 操作,有关 Session 面向的用法,请参阅 使用 Yield Per 获取大型结果集Connection 的 Core-only 版本是 SQLAlchemy 1.4.40 中的新功能。

1.4 版本新增。

参数:

num – 每次重新填充缓冲区时要提取的行数。如果设置为小于 1 的值,则提取下一个缓冲区的全部行。

class sqlalchemy.engine.CursorResult

一个表示来自 DBAPI 游标状态的 Result。

Changed in version 1.4: CursorResult` 类取代了之前的 ResultProxy 接口。此类基于 Result 调用 API,该 API 为 SQLAlchemy Core 和 SQLAlchemy ORM 提供了更新的用法模型和调用外观。

通过 Row 类返回数据库行,该类在 DBAPI 返回的原始数据之上提供了额外的 API 功能和行为。通过使用诸如 Result.scalars() 方法之类的过滤器,也可以返回其他类型的对象。

另请参阅

使用 SELECT 语句 - 访问 CursorResultRow 对象的入门材料。

method sqlalchemy.engine.CursorResult.all() Sequence[Row[_TP]]

inherited from the Result.all() method of Result

以序列形式返回所有行。

调用后关闭结果集。后续调用将返回一个空序列。

1.4 版本新增。

返回值:

一个 Row 对象的序列。

另请参阅

使用服务器端游标(又名流式结果) - 如何流式传输大型结果集而无需将其完全加载到 python 中。

method sqlalchemy.engine.CursorResult.close() Any

关闭此 CursorResult

这将关闭与语句执行相对应的底层 DBAPI 游标(如果仍然存在)。请注意,当 CursorResult 用尽所有可用行时,DBAPI 游标会自动释放。CursorResult.close() 通常是一个可选方法,除非在丢弃 CursorResult 时,该 CursorResult 仍有其他行等待获取。

调用此方法后,再调用获取方法将不再有效,后续使用将引发 ResourceClosedError

另请参阅

使用引擎和连接

method sqlalchemy.engine.CursorResult.columns(*col_expressions: _KeyIndexType) Self

inherited from the Result.columns() method of Result

建立每行应返回的列。

此方法可用于限制返回的列以及对它们进行重新排序。给定的表达式列表通常是一系列整数或字符串键名。它们也可能是适当的 ColumnElement 对象,它们对应于给定的语句构造。

Changed in version 2.0: 由于版本 1.4 中的一个错误,Result.columns() 方法存在不正确的行为,即当使用单个索引调用该方法时,会导致 Result 对象产生标量值,而不是 Row 对象。在版本 2.0 中,此行为已得到纠正,现在使用单个索引调用 Result.columns() 将生成一个 Result 对象,该对象继续产生 Row 对象,这些对象仅包含单个列。

例如:

statement = select(table.c.x, table.c.y, table.c.z)
result = connection.execute(statement)

for z, y in result.columns("z", "y"):
    ...

使用语句本身的列对象的示例

for z, y in result.columns(
    statement.selected_columns.c.z, statement.selected_columns.c.y
):
    ...

1.4 版本新增。

参数:

*col_expressions – 指示要返回的列。元素可以是整数行索引、字符串列名或与 select 构造相对应的适当的 ColumnElement 对象。

返回值:

Result 对象具有给定的修改。

method sqlalchemy.engine.CursorResult.fetchall() Sequence[Row[_TP]]

继承自 Result.fetchall() 方法,来自 Result

Result.all() 方法的同义词。

method sqlalchemy.engine.CursorResult.fetchmany(size: int | None = None) Sequence[Row[_TP]]

继承自 Result.fetchmany() 方法,来自 Result

提取多行。

当所有行都耗尽时,返回一个空序列。

此方法是为了向后兼容 SQLAlchemy 1.x.x 而提供的。

要分组提取行,请使用 Result.partitions() 方法。

返回值:

一个 Row 对象的序列。

另请参阅

Result.partitions()

method sqlalchemy.engine.CursorResult.fetchone() Row[_TP] | None

继承自 Result.fetchone() 方法,来自 Result

提取一行。

当所有行都耗尽时,返回 None。

此方法是为了向后兼容 SQLAlchemy 1.x.x 而提供的。

要仅提取结果的第一行,请使用 Result.first() 方法。要迭代所有行,请直接迭代 Result 对象。

返回值:

如果没有应用过滤器,则为 Row 对象;如果不再有行,则为 None

method sqlalchemy.engine.CursorResult.first() Row[_TP] | None

继承自 Result.first() 方法,来自 Result

提取第一行,如果没有行则返回 None

关闭结果集并丢弃剩余的行。

注意

默认情况下,此方法返回一个 row,例如元组。要准确返回一个标量值,即第一行的第一列,请使用 Result.scalar() 方法,或者组合使用 Result.scalars()Result.first()

此外,与旧版 ORM Query.first() 方法的行为相反,没有应用 limit 到调用以生成此 Result 的 SQL 查询;对于在产生行之前将结果缓冲在内存中的 DBAPI 驱动程序,所有行都将被发送到 Python 进程,除了第一行之外的所有行都将被丢弃。

返回值:

一个 Row 对象,如果没有剩余行,则为 None。

method sqlalchemy.engine.CursorResult.freeze() FrozenResult[_TP]

继承自 Result.freeze() 方法,来自 Result

返回一个可调用对象,该对象在调用时将生成此 Result 的副本。

返回的可调用对象是 FrozenResult 的一个实例。

这用于结果集缓存。该方法必须在结果未被消耗时在结果上调用,调用该方法将完全消耗结果。当从缓存中检索 FrozenResult 时,可以多次调用它,每次都会根据其存储的行集生成一个新的 Result 对象。

另请参阅

重新执行语句 - ORM 中实现结果集缓存的示例用法。

attribute sqlalchemy.engine.CursorResult.inserted_primary_key

返回刚刚插入的行的主键。

返回值是一个 Row 对象,表示主键值的命名元组,其顺序与源 Table 中配置的主键列的顺序相同。

Changed in version 1.4.8: - CursorResult.inserted_primary_key 值现在是通过 Row 类实现的命名元组,而不是普通元组。

此访问器仅适用于未显式指定 Insert.returning() 的单行 insert() 构造。对多行插入的支持(尽管大多数后端尚不可用)将使用 CursorResult.inserted_primary_key_rows 访问器进行访问。

请注意,指定了 server_default 子句或不符合“自动递增”列条件的(请参阅 Column 中的注释)并且使用数据库端默认值生成的主键列,除非后端支持“returning”并且插入语句执行时启用了“隐式 returning”,否则在此列表中将显示为 None

如果执行的语句不是编译的表达式构造或不是 insert() 构造,则引发 InvalidRequestError

attribute sqlalchemy.engine.CursorResult.inserted_primary_key_rows

CursorResult.inserted_primary_key 的值作为列表中包含的行返回;某些方言也可能支持多行形式。

注意

如下所示,在当前的 SQLAlchemy 版本中,当使用 psycopg2 方言时,此访问器仅在 CursorResult.inserted_primary_key 已经提供的内容之外才有用。未来的版本希望将此功能推广到更多方言。

添加此访问器是为了支持提供 Psycopg2 快速执行助手 功能的方言,目前仅限 psycopg2 方言,它允许一次 INSERT 多行,同时仍然保留能够返回服务器生成的主键值的行为。

  • 当使用 psycopg2 方言,或其他可能在即将发布的版本中支持“快速 executemany”样式插入的方言时:当调用 INSERT 语句,同时将行列表作为第二个参数传递给 Connection.execute() 时,此访问器将提供行列表,其中每行包含为 INSERT 的每一行的主键值。

  • 当使用所有其他尚不支持此功能的方言/后端时:此访问器仅对 单行 INSERT 语句 有用,并返回与 CursorResult.inserted_primary_key 相同的信息,但包含在单元素列表中。当 INSERT 语句与要 INSERT 的行列表一起执行时,该列表将包含每个插入语句中的行的行,但是对于任何服务器生成的值,它将包含 None

SQLAlchemy 的未来版本将进一步推广 psycopg2 的“快速执行助手”功能,以适应其他方言,从而使此访问器具有更广泛的用途。

1.4 版本新增。

attribute sqlalchemy.engine.CursorResult.is_insert

如果此 CursorResult 是执行表达式语言编译的 insert() 构造的结果,则为 True。

当为 True 时,这意味着 inserted_primary_key 属性是可访问的,前提是语句不包含用户定义的“returning”构造。

method sqlalchemy.engine.CursorResult.keys() RMKeyView

继承自 sqlalchemy.engine._WithKeys.keys 方法,来自 sqlalchemy.engine._WithKeys

返回一个可迭代的视图,该视图产生每个 Row 将表示的字符串键。

键可以表示核心语句返回的列的标签,也可以表示 orm 执行返回的 orm 类的名称。

该视图还可以使用 Python in 运算符测试键的包含性,这将测试视图中表示的字符串键,以及备用键(例如列对象)。

Changed in version 1.4: 返回键视图对象而不是普通列表。

method sqlalchemy.engine.CursorResult.last_inserted_params()

返回来自此执行的插入参数的集合。

如果执行的语句不是编译的表达式构造或不是 insert() 构造,则引发 InvalidRequestError

method sqlalchemy.engine.CursorResult.last_updated_params()

返回来自此执行的更新参数的集合。

如果执行的语句不是编译的表达式构造或不是 update() 构造,则引发 InvalidRequestError

method sqlalchemy.engine.CursorResult.lastrow_has_defaults()

从底层 ExecutionContext 返回 lastrow_has_defaults()

有关详细信息,请参阅 ExecutionContext

attribute sqlalchemy.engine.CursorResult.lastrowid

返回 DBAPI 游标上的 ‘lastrowid’ 访问器。

这是一个 DBAPI 特定的方法,仅适用于支持它的后端,对于适用的语句才有效。它在不同后端之间的行为不一致。

当使用 insert() 表达式构造时,通常不需要使用此方法;CursorResult.inserted_primary_key 属性为新插入的行提供主键值的元组,而与数据库后端无关。

method sqlalchemy.engine.CursorResult.mappings() MappingResult

继承自 Result.mappings() 方法,来自 Result

将 mappings 过滤器应用于返回的行,返回 MappingResult 的实例。

当应用此过滤器时,提取行将返回 RowMapping 对象,而不是 Row 对象。

1.4 版本新增。

返回值:

一个新的 MappingResult 过滤对象,引用此 Result 对象。

method sqlalchemy.engine.CursorResult.merge(*others: Result[Any]) MergedResult[Any]

将此 Result 与其他兼容的结果对象合并。

返回的对象是 MergedResult 的一个实例,它将由给定结果对象的迭代器组成。

新结果将使用此结果对象的元数据。后续结果对象必须针对相同的结果/游标元数据集,否则行为未定义。

method sqlalchemy.engine.CursorResult.one() Row[_TP]

继承自 Result.one() 方法,来自 Result

返回恰好一行,否则引发异常。

如果结果未返回任何行,则引发 NoResultFound;如果将返回多行,则引发 MultipleResultsFound

注意

默认情况下,此方法返回一个 row,例如元组。要准确返回一个标量值,即第一行的第一列,请使用 Result.scalar_one() 方法,或者组合使用 Result.scalars()Result.one()

1.4 版本新增。

返回值:

第一个 Row

Raises:

MultipleResultsFound, NoResultFound

方法 sqlalchemy.engine.CursorResult.one_or_none() Row[_TP] | None

继承自 Result.one_or_none() 方法,源自 Result

最多返回一个结果,否则引发异常。

如果结果没有行,则返回 None。如果返回多行,则引发 MultipleResultsFound 异常。

1.4 版本新增。

返回值:

第一个 Row,如果没有行可用则返回 None

Raises:

MultipleResultsFound

方法 sqlalchemy.engine.CursorResult.partitions(size: int | None = None) Iterator[Sequence[Row[_TP]]]

继承自 Result.partitions() 方法,源自 Result

迭代处理给定大小的行子列表。

每个列表的大小将为给定大小,除了要产生的最后一个列表,它可能包含少量行。不会产生空列表。

当迭代器完全消耗时,结果对象将自动关闭。

请注意,后端驱动程序通常会提前缓冲整个结果,除非使用了 Connection.execution_options.stream_results 执行选项,表明驱动程序应尽可能不预先缓冲结果。并非所有驱动程序都支持此选项,对于不支持的驱动程序,此选项将被静默忽略。

当使用 ORM 时,Result.partitions() 方法通常从内存角度来看更有效,当它与 yield_per 执行选项 结合使用时,该选项指示 DBAPI 驱动程序使用服务器端游标(如果可用),并指示 ORM 加载内部机制一次仅从结果构建一定数量的 ORM 对象,然后再将其产生出来。

1.4 版本新增。

参数:

size – 指示每个产生的列表中存在的最大行数。如果为 None,则使用 Result.yield_per() 方法(如果已调用)或 Connection.execution_options.yield_per 执行选项设置的值,在这方面它们是等效的。如果未设置 yield_per,则使用 Result.fetchmany() 默认值,这可能是后端特定的且定义不明确。

返回值:

列表的迭代器

方法 sqlalchemy.engine.CursorResult.postfetch_cols()

从底层 ExecutionContext 返回 postfetch_cols()

有关详细信息,请参阅 ExecutionContext

如果执行的语句不是编译后的表达式结构,或者不是 insert() 或 update() 结构,则引发 InvalidRequestError 异常。

方法 sqlalchemy.engine.CursorResult.prefetch_cols()

从底层 ExecutionContext 返回 prefetch_cols()

有关详细信息,请参阅 ExecutionContext

如果执行的语句不是编译后的表达式结构,或者不是 insert() 或 update() 结构,则引发 InvalidRequestError 异常。

属性 sqlalchemy.engine.CursorResult.returned_defaults

返回使用 ValuesBase.return_defaults() 功能获取的默认列的值。

该值是 Row 的实例,如果未使用 ValuesBase.return_defaults() 或者后端不支持 RETURNING,则为 None

另请参阅

ValuesBase.return_defaults()

属性 sqlalchemy.engine.CursorResult.returned_defaults_rows

返回一个行列表,其中每行包含使用 ValuesBase.return_defaults() 功能获取的默认列的值。

返回值是一个 Row 对象列表。

1.4 版本新增。

属性 sqlalchemy.engine.CursorResult.returns_rows

如果此 CursorResult 返回零行或多行,则为 True。

即,如果可以合法调用方法 CursorResult.fetchone()CursorResult.fetchmany()CursorResult.fetchall()

总的来说,CursorResult.returns_rows 的值应始终与 DBAPI 游标是否具有 .description 属性同义,指示结果列的存在,并注意到,如果发出了返回行的语句,则即使返回零行的游标仍然具有 .description

对于针对 SELECT 语句的所有结果以及使用 RETURNING 的 DML 语句 INSERT/UPDATE/DELETE,此属性应为 True。对于未使用 RETURNING 的 INSERT/UPDATE/DELETE 语句,该值通常为 False,但是也存在一些方言特定的例外情况,例如当使用 MSSQL / pyodbc 方言时,会内联发出 SELECT 以检索插入的主键值。

属性 sqlalchemy.engine.CursorResult.rowcount

返回此结果的 ‘rowcount’。

‘rowcount’ 的主要目的是报告由 UPDATE 或 DELETE 语句的 WHERE 条件匹配的行数(即,对于单个参数集执行一次),然后可以将其与预期更新或删除的行数进行比较,以此来断言数据完整性。

此属性从 DBAPI 的 cursor.rowcount 属性传输而来,在游标关闭之前,以支持在游标关闭后不提供此值的 DBAPI。某些 DBAPI 可能会为其他类型的语句提供有意义的值,例如 INSERT 和 SELECT 语句。为了检索这些语句的 cursor.rowcount,请将 Connection.execution_options.preserve_rowcount 执行选项设置为 True,这将导致 cursor.rowcount 值在返回任何结果或游标关闭之前被无条件地记忆化,而与语句类型无关。

对于 DBAPI 不支持特定类型语句和/或执行的行数的情况,返回的值将为 -1,该值直接从 DBAPI 传递,并且是 PEP 249 的一部分。但是,所有 DBAPI 都应支持单参数集 UPDATE 和 DELETE 语句的行数。

注意

关于 CursorResult.rowcount 的说明

  • 此属性返回匹配的行数,这不一定与实际修改的行数相同。例如,如果给定的 SET 值与行中已存在的值相同,则 UPDATE 语句可能对给定行没有净更改。这样的行将被匹配但不会被修改。在同时具有这两种样式的后端(例如 MySQL)上,rowcount 被配置为在所有情况下都返回匹配计数。

  • 在默认情况下,CursorResult.rowcount 在与 UPDATE 或 DELETE 语句结合使用时,并且仅在具有单个参数集时才有用。对于其他类型的语句,除非使用 Connection.execution_options.preserve_rowcount 执行选项,否则 SQLAlchemy 不会尝试预先记忆化该值。请注意,与 PEP 249 相反,许多 DBAPI 不支持非 UPDATE 或 DELETE 语句的行数值,尤其是在返回的行未完全预缓冲时。对于特定类型的语句不支持行数的 DBAPI,对于此类语句应返回 -1 值。

  • 当使用多个参数集执行单个语句时(即 executemany),CursorResult.rowcount 可能没有意义。大多数 DBAPI 不会对多个参数集中的 “rowcount” 值求和,并且在访问时将返回 -1

  • Connection.execution_options.preserve_rowcount 执行选项设置为 True 时,SQLAlchemy 的 INSERT 语句的 “插入多个值” 行为 功能确实支持正确填充 CursorResult.rowcount

  • 使用 RETURNING 的语句可能不支持行数,而是返回 -1 值。

方法 sqlalchemy.engine.CursorResult.scalar() Any

继承自 Result.scalar() 方法,源自 Result

获取第一行的第一列,并关闭结果集。

如果没有要获取的行,则返回 None

不执行验证来测试是否还有其他行。

调用此方法后,对象将完全关闭,例如,CursorResult.close() 方法将被调用。

返回值:

一个 Python 标量值,如果没有剩余行,则返回 None

方法 sqlalchemy.engine.CursorResult.scalar_one() Any

继承自 Result.scalar_one() 方法,源自 Result

返回恰好一个标量结果,否则引发异常。

这等效于调用 Result.scalars(),然后调用 ScalarResult.one()

方法 sqlalchemy.engine.CursorResult.scalar_one_or_none() Any | None

继承自 Result.scalar_one_or_none() 方法,源自 Result

返回恰好一个标量结果,或者 None

这等效于调用 Result.scalars(),然后调用 ScalarResult.one_or_none()

方法 sqlalchemy.engine.CursorResult.scalars(index: _KeyIndexType = 0) ScalarResult[Any]

继承自 Result.scalars() 方法,源自 Result

返回一个 ScalarResult 过滤对象,它将返回单个元素,而不是 Row 对象。

例如:

>>> result = conn.execute(text("select int_id from table"))
>>> result.scalars().all()
[1, 2, 3]

当从 ScalarResult 过滤对象中获取结果时,Result 将返回的单列行将作为列的值返回。

1.4 版本新增。

参数:

index – 整数或行键,指示要从每行中获取的列,默认为 0,表示第一列。

返回值:

一个新的 ScalarResult 过滤对象,指向此 Result 对象。

方法 sqlalchemy.engine.CursorResult.splice_horizontally(other)

返回一个新的 CursorResult,它将此 CursorResult 的行与另一个 CursorResult 的行“水平拼接”在一起。

提示

此方法是为了 SQLAlchemy ORM 的利益而设计的,不适用于一般用途。

“水平拼接” 意味着对于第一个和第二个结果集中的每一行,都会生成一个将两行连接在一起的新行,然后该新行将成为新的行。传入的 CursorResult 必须具有相同数量的行。通常期望这两个结果集也来自相同的排序顺序,因为结果行是基于它们在结果中的位置拼接在一起的。

此处的预期用例是,针对不同表的多个 INSERT..RETURNING 语句(肯定需要排序)可以生成一个看起来像这两个表 JOIN 的单个结果。

例如:

r1 = connection.execute(
    users.insert().returning(
        users.c.user_name, users.c.user_id, sort_by_parameter_order=True
    ),
    user_values,
)

r2 = connection.execute(
    addresses.insert().returning(
        addresses.c.address_id,
        addresses.c.address,
        addresses.c.user_id,
        sort_by_parameter_order=True,
    ),
    address_values,
)

rows = r1.splice_horizontally(r2).all()
assert rows == [
    ("john", 1, 1, "foo@bar.com", 1),
    ("jack", 2, 2, "bar@bat.com", 2),
]

2.0 版本新增。

方法 sqlalchemy.engine.CursorResult.splice_vertically(other)

返回一个新的 CursorResult,它将此 CursorResult 的行与另一个 CursorResult 的行“垂直拼接”,即“扩展”在一起。

提示

此方法是为了 SQLAlchemy ORM 的利益而设计的,不适用于一般用途。

“垂直拼接” 意味着给定结果的行将附加到此游标结果的行之后。传入的 CursorResult 必须具有表示与此 CursorResult 中相同的列列表(顺序相同)的行。

2.0 版本新增。

方法 sqlalchemy.engine.CursorResult.supports_sane_multi_rowcount()

从方言返回 supports_sane_multi_rowcount

有关背景信息,请参阅 CursorResult.rowcount

方法 sqlalchemy.engine.CursorResult.supports_sane_rowcount()

从方言返回 supports_sane_rowcount

有关背景信息,请参阅 CursorResult.rowcount

属性 sqlalchemy.engine.CursorResult.t

继承自 Result.t 属性,源自 Result

将 “类型化元组” 类型过滤器应用于返回的行。

Result.t 属性是调用 Result.tuples() 方法的同义词。

2.0 版本新增。

方法 sqlalchemy.engine.CursorResult.tuples() TupleResult[_TP]

继承自 Result.tuples() 方法,源自 Result

将 “类型化元组” 类型过滤器应用于返回的行。

此方法在运行时返回相同的 Result 对象,但在类型提示中注解为返回 TupleResult 对象,这将向 PEP 484 类型工具表明返回的是纯类型化 Tuple 实例,而不是行。这允许对 Row 对象进行元组解包和 __getitem__ 访问以进行类型化,适用于调用语句本身包含类型信息的情况。

2.0 版本新增。

返回值:

类型提示时的 TupleResult 类型。

另请参阅

Result.t - 更短的同义词

Row._t - Row 版本

方法 sqlalchemy.engine.CursorResult.unique(strategy: Callable[[Any], Any] | None = None) Self

继承自 Result.unique() 方法,源自 Result

对此 Result 返回的对象应用唯一性过滤。

当应用此过滤器且不带任何参数时,返回的行或对象将被过滤,以便唯一地返回每一行。用于确定此唯一性的算法默认是整个元组的 Python 哈希标识。在某些情况下,可以使用专门的按实体哈希方案,例如当使用 ORM 时,应用一种针对返回对象的主键标识的方案。

唯一过滤器在所有其他过滤器之后应用,这意味着如果已使用诸如 Result.columns()Result.scalars() 方法等方法优化了返回的列,则唯一化将应用于仅返回的列。 无论在 Result 对象上调用这些方法的顺序如何,都会发生这种情况。

唯一过滤器还会更改用于诸如 Result.fetchmany()Result.partitions() 等方法的计算规则。 当使用 Result.unique() 时,这些方法将在应用唯一化后继续产生请求的行数或对象数。 但是,这必然会影响底层游标或数据源的缓冲行为,因此可能需要多次调用底层的 cursor.fetchmany() 才能累积足够的对象,以便提供请求大小的唯一集合。

参数:

strategy – 一个可调用对象,将应用于正在迭代的行或对象,它应返回一个对象,该对象表示行的唯一值。 Python 的 set() 用于存储这些标识。 如果未传递,则使用默认的唯一性策略,该策略可能已由此 Result 对象的来源组装。

method sqlalchemy.engine.CursorResult.yield_per(num: int) Self

配置行提取策略,一次提取 num 行。

这会影响结果在迭代结果对象时或以其他方式使用诸如 Result.fetchone() 之类一次返回一行的方法的基础行为。来自底层游标或其他数据源的数据将被缓冲到内存中最多这么多行,然后缓冲的集合将一次生成一行或根据需要生成多行。每次缓冲区清除时,它将被刷新为这么多行或剩余的行数(如果较少)。

Result.yield_per() 方法通常与 Connection.execution_options.stream_results 执行选项结合使用,这将允许正在使用的数据库方言利用服务器端游标,如果 DBAPI 支持与默认操作模式分开的特定 “服务器端游标” 模式。

提示

考虑使用 Connection.execution_options.yield_per 执行选项,该选项将同时设置 Connection.execution_options.stream_results 以确保使用服务器端游标,并自动调用 Result.yield_per() 方法以一次建立固定的行缓冲区大小。

Connection.execution_options.yield_per 执行选项可用于 ORM 操作,有关 Session 面向的用法,请参阅 使用 Yield Per 获取大型结果集Connection 的 Core-only 版本是 SQLAlchemy 1.4.40 中的新功能。

1.4 版本新增。

参数:

num – 每次重新填充缓冲区时要获取的行数。 如果设置为小于 1 的值,则获取下一个缓冲区的所有行。

class sqlalchemy.engine.FilterResult

一个 Result 的包装器,它返回除 Row 对象之外的对象,例如字典或标量对象。

FilterResult 是附加结果 API(包括 MappingResultScalarResultAsyncResult)的通用基础。

类签名

class sqlalchemy.engine.FilterResult (sqlalchemy.engine.ResultInternal)

method sqlalchemy.engine.FilterResult.close() None

关闭此 FilterResult

1.4.43 版本新增。

attribute sqlalchemy.engine.FilterResult.closed

如果底层 Result 报告已关闭,则返回 True

1.4.43 版本新增。

method sqlalchemy.engine.FilterResult.yield_per(num: int) Self

配置行提取策略,一次提取 num 行。

FilterResult.yield_per() 方法是传递给 Result.yield_per() 方法的通道。 有关使用说明,请参阅该方法的文档。

1.4.40 版本新增: - 添加了 FilterResult.yield_per(),以便该方法在所有结果集实现中都可用

class sqlalchemy.engine.FrozenResult

表示一个 “冻结” 状态的 Result 对象,适用于缓存。

FrozenResult 对象从任何 Result 对象的 Result.freeze() 方法返回。

每次将 FrozenResult 作为可调用对象调用时,都会从固定的数据集生成一个新的可迭代 Result 对象

result = connection.execute(query)

frozen = result.freeze()

unfrozen_result_one = frozen()

for row in unfrozen_result_one:
    print(row)

unfrozen_result_two = frozen()
rows = unfrozen_result_two.all()

# ... etc

1.4 版本新增。

另请参阅

重新执行语句 - ORM 中实现结果集缓存的示例用法。

merge_frozen_result() - ORM 函数,用于将冻结结果合并回 Session

类签名

class sqlalchemy.engine.FrozenResult (typing.Generic)

class sqlalchemy.engine.IteratorResult

一个 Result,它从 Row 对象或类似的类行数据的 Python 迭代器中获取数据。

1.4 版本新增。

成员

closed

attribute sqlalchemy.engine.IteratorResult.closed

如果此 IteratorResult 已关闭,则返回 True

1.4.43 版本新增。

class sqlalchemy.engine.MergedResult

一个从任意数量的 Result 对象合并而来的 Result

Result.merge() 方法返回。

1.4 版本新增。

class sqlalchemy.engine.Result

表示一组数据库结果。

1.4 版本新增: Result 对象为 SQLAlchemy Core 和 SQLAlchemy ORM 提供了完全更新的使用模型和调用外观。 在 Core 中,它构成了 CursorResult 对象的基础,该对象取代了之前的 ResultProxy 接口。 使用 ORM 时,通常使用称为 ChunkedIteratorResult 的更高级别的对象。

注意

在 SQLAlchemy 1.4 及更高版本中,此对象用于 Session.execute() 返回的 ORM 结果,它可以单独或在类似元组的行中产生 ORM 映射对象的实例。 请注意,Result 对象不会像传统的 Query 对象那样自动去重实例或行。 对于 Python 内的实例或行去重,请使用 Result.unique() 修饰符方法。

另请参阅

获取行 - 在 SQLAlchemy 统一教程

类签名

class sqlalchemy.engine.Result (sqlalchemy.engine._WithKeys, sqlalchemy.engine.ResultInternal)

method sqlalchemy.engine.Result.all() Sequence[Row[_TP]]

以序列形式返回所有行。

调用后关闭结果集。后续调用将返回一个空序列。

1.4 版本新增。

返回值:

一个 Row 对象的序列。

另请参阅

使用服务器端游标(又名流式结果) - 如何流式传输大型结果集而无需将其完全加载到 python 中。

method sqlalchemy.engine.Result.close() None

关闭此 Result

此方法的行为是实现特定的,默认情况下未实现。 该方法通常应结束结果对象正在使用的资源,并导致任何后续迭代或行获取引发 ResourceClosedError

1.4.27 版本新增: - .close() 以前通常不适用于所有 Result 类,而仅在 Core 语句执行返回的 CursorResult 上可用。 由于大多数其他结果对象(即 ORM 使用的对象)在任何情况下都代理 CursorResult,因此,当 ORM 查询使用 yield_per 执行选项(其中它不会立即耗尽并自动关闭数据库游标)时,这允许从外部接口关闭底层游标结果。

attribute sqlalchemy.engine.Result.closed

如果此 Result 报告 .closed,则返回 True

1.4.43 版本新增。

method sqlalchemy.engine.Result.columns(*col_expressions: _KeyIndexType) Self

建立每行应返回的列。

此方法可用于限制返回的列以及对它们进行重新排序。给定的表达式列表通常是一系列整数或字符串键名。它们也可能是适当的 ColumnElement 对象,它们对应于给定的语句构造。

Changed in version 2.0: 由于版本 1.4 中的一个错误,Result.columns() 方法存在不正确的行为,即当使用单个索引调用该方法时,会导致 Result 对象产生标量值,而不是 Row 对象。在版本 2.0 中,此行为已得到纠正,现在使用单个索引调用 Result.columns() 将生成一个 Result 对象,该对象继续产生 Row 对象,这些对象仅包含单个列。

例如:

statement = select(table.c.x, table.c.y, table.c.z)
result = connection.execute(statement)

for z, y in result.columns("z", "y"):
    ...

使用语句本身的列对象的示例

for z, y in result.columns(
    statement.selected_columns.c.z, statement.selected_columns.c.y
):
    ...

1.4 版本新增。

参数:

*col_expressions – 指示要返回的列。 元素可以是整数行索引、字符串列名或与 select 构造相对应的适当的 ColumnElement 对象。

返回值:

Result 对象具有给定的修改。

method sqlalchemy.engine.Result.fetchall() Sequence[Row[_TP]]

Result.all() 方法的同义词。

method sqlalchemy.engine.Result.fetchmany(size: int | None = None) Sequence[Row[_TP]]

提取多行。

当所有行都耗尽时,返回一个空序列。

此方法是为了向后兼容 SQLAlchemy 1.x.x 而提供的。

要分组提取行,请使用 Result.partitions() 方法。

返回值:

一个 Row 对象的序列。

另请参阅

Result.partitions()

method sqlalchemy.engine.Result.fetchone() Row[_TP] | None

提取一行。

当所有行都耗尽时,返回 None。

此方法是为了向后兼容 SQLAlchemy 1.x.x 而提供的。

要仅提取结果的第一行,请使用 Result.first() 方法。要迭代所有行,请直接迭代 Result 对象。

返回值:

如果没有应用过滤器,则为 Row 对象;如果不再有行,则为 None

method sqlalchemy.engine.Result.first() Row[_TP] | None

提取第一行,如果没有行则返回 None

关闭结果集并丢弃剩余的行。

注意

默认情况下,此方法返回一个 row,例如元组。要准确返回一个标量值,即第一行的第一列,请使用 Result.scalar() 方法,或者组合使用 Result.scalars()Result.first()

此外,与旧版 ORM Query.first() 方法的行为相反,没有应用 limit 到调用以生成此 Result 的 SQL 查询;对于在产生行之前将结果缓冲在内存中的 DBAPI 驱动程序,所有行都将被发送到 Python 进程,除了第一行之外的所有行都将被丢弃。

返回值:

一个 Row 对象,如果没有剩余行,则为 None。

method sqlalchemy.engine.Result.freeze() FrozenResult[_TP]

返回一个可调用对象,该对象在调用时将生成此 Result 的副本。

返回的可调用对象是 FrozenResult 的一个实例。

这用于结果集缓存。该方法必须在结果未被消耗时在结果上调用,调用该方法将完全消耗结果。当从缓存中检索 FrozenResult 时,可以多次调用它,每次都会根据其存储的行集生成一个新的 Result 对象。

另请参阅

重新执行语句 - ORM 中实现结果集缓存的示例用法。

method sqlalchemy.engine.Result.keys() RMKeyView

继承自 sqlalchemy.engine._WithKeys.keys 方法,来自 sqlalchemy.engine._WithKeys

返回一个可迭代的视图,该视图产生每个 Row 将表示的字符串键。

键可以表示核心语句返回的列的标签,也可以表示 orm 执行返回的 orm 类的名称。

该视图还可以使用 Python in 运算符测试键的包含性,这将测试视图中表示的字符串键,以及备用键(例如列对象)。

Changed in version 1.4: 返回键视图对象而不是普通列表。

method sqlalchemy.engine.Result.mappings() MappingResult

将 mappings 过滤器应用于返回的行,返回 MappingResult 的实例。

当应用此过滤器时,提取行将返回 RowMapping 对象,而不是 Row 对象。

1.4 版本新增。

返回值:

一个新的 MappingResult 过滤对象,引用此 Result 对象。

method sqlalchemy.engine.Result.merge(*others: Result[Any]) MergedResult[_TP]

将此 Result 与其他兼容的结果对象合并。

返回的对象是 MergedResult 的一个实例,它将由给定结果对象的迭代器组成。

新结果将使用此结果对象的元数据。后续结果对象必须针对相同的结果/游标元数据集,否则行为未定义。

method sqlalchemy.engine.Result.one() Row[_TP]

返回恰好一行,否则引发异常。

如果结果未返回任何行,则引发 NoResultFound;如果将返回多行,则引发 MultipleResultsFound

注意

默认情况下,此方法返回一个 row,例如元组。要准确返回一个标量值,即第一行的第一列,请使用 Result.scalar_one() 方法,或者组合使用 Result.scalars()Result.one()

1.4 版本新增。

返回值:

第一个 Row

Raises:

MultipleResultsFound, NoResultFound

method sqlalchemy.engine.Result.one_or_none() Row[_TP] | None

最多返回一个结果,否则引发异常。

如果结果没有行,则返回 None。如果返回多行,则引发 MultipleResultsFound 异常。

1.4 版本新增。

返回值:

第一个 Row,如果没有行可用则返回 None

Raises:

MultipleResultsFound

method sqlalchemy.engine.Result.partitions(size: int | None = None) Iterator[Sequence[Row[_TP]]]

迭代处理给定大小的行子列表。

每个列表的大小将为给定大小,除了要产生的最后一个列表,它可能包含少量行。不会产生空列表。

当迭代器完全消耗时,结果对象将自动关闭。

请注意,后端驱动程序通常会提前缓冲整个结果,除非使用了 Connection.execution_options.stream_results 执行选项,表明驱动程序应尽可能不预先缓冲结果。并非所有驱动程序都支持此选项,对于不支持的驱动程序,此选项将被静默忽略。

当使用 ORM 时,Result.partitions() 方法通常从内存角度来看更有效,当它与 yield_per 执行选项 结合使用时,该选项指示 DBAPI 驱动程序使用服务器端游标(如果可用),并指示 ORM 加载内部机制一次仅从结果构建一定数量的 ORM 对象,然后再将其产生出来。

1.4 版本新增。

参数:

size – 指示每个产生的列表中存在的最大行数。 如果为 None,则使用 Result.yield_per() 方法(如果已调用)或 Connection.execution_options.yield_per 执行选项设置的值,在这方面它是等效的。 如果未设置 yield_per,则使用 Result.fetchmany() 默认值,该默认值可能是后端特定的且未明确定义的。

返回值:

列表的迭代器

method sqlalchemy.engine.Result.scalar() Any

获取第一行的第一列,并关闭结果集。

如果没有要获取的行,则返回 None

不执行验证来测试是否还有其他行。

调用此方法后,对象将完全关闭,例如,CursorResult.close() 方法将被调用。

返回值:

一个 Python 标量值,如果没有剩余行,则返回 None

method sqlalchemy.engine.Result.scalar_one() Any

返回恰好一个标量结果,否则引发异常。

这等效于调用 Result.scalars(),然后调用 ScalarResult.one()

method sqlalchemy.engine.Result.scalar_one_or_none() Any | None

返回恰好一个标量结果,或者 None

这等效于调用 Result.scalars(),然后调用 ScalarResult.one_or_none()

method sqlalchemy.engine.Result.scalars(index: _KeyIndexType = 0) ScalarResult[Any]

返回一个 ScalarResult 过滤对象,它将返回单个元素,而不是 Row 对象。

例如:

>>> result = conn.execute(text("select int_id from table"))
>>> result.scalars().all()
[1, 2, 3]

当从 ScalarResult 过滤对象中获取结果时,Result 将返回的单列行将作为列的值返回。

1.4 版本新增。

参数:

index – 整数或行键,指示要从每行获取的列,默认为 0,表示第一列。

返回值:

一个新的 ScalarResult 过滤对象,指向此 Result 对象。

attribute sqlalchemy.engine.Result.t

将 “类型化元组” 类型过滤器应用于返回的行。

Result.t 属性是调用 Result.tuples() 方法的同义词。

2.0 版本新增。

method sqlalchemy.engine.Result.tuples() TupleResult[_TP]

将 “类型化元组” 类型过滤器应用于返回的行。

此方法在运行时返回相同的 Result 对象,但注释为返回 TupleResult 对象,该对象将向 PEP 484 类型工具指示返回的是纯类型化的 Tuple 实例,而不是行。 这允许对 Row 对象进行元组解包和 __getitem__ 访问以进行类型化,对于那些调用的语句本身包含类型信息的情况。

2.0 版本新增。

返回值:

类型提示时的 TupleResult 类型。

另请参阅

Result.t - 更短的同义词

Row._t - Row 版本

方法 sqlalchemy.engine.Result.unique(strategy: Callable[[Any], Any] | None = None) Self

对此 Result 返回的对象应用唯一性过滤。

当应用此过滤器且不带任何参数时,返回的行或对象将被过滤,以便唯一地返回每一行。用于确定此唯一性的算法默认是整个元组的 Python 哈希标识。在某些情况下,可以使用专门的按实体哈希方案,例如当使用 ORM 时,应用一种针对返回对象的主键标识的方案。

唯一过滤器在所有其他过滤器之后应用,这意味着如果已使用诸如 Result.columns()Result.scalars() 方法等方法优化了返回的列,则唯一化将应用于仅返回的列。 无论在 Result 对象上调用这些方法的顺序如何,都会发生这种情况。

唯一过滤器还会更改用于诸如 Result.fetchmany()Result.partitions() 等方法的计算规则。 当使用 Result.unique() 时,这些方法将在应用唯一化后继续产生请求的行数或对象数。 但是,这必然会影响底层游标或数据源的缓冲行为,因此可能需要多次调用底层的 cursor.fetchmany() 才能累积足够的对象,以便提供请求大小的唯一集合。

参数:

strategy – 一个可调用对象,它将被应用于正在迭代的行或对象,并且应该返回一个表示行唯一值的对象。Python 的 set() 用于存储这些标识。如果未传递,则使用默认的唯一性策略,该策略可能由此 Result 对象的源组装而成。

方法 sqlalchemy.engine.Result.yield_per(num: int) Self

配置行提取策略,一次提取 num 行。

这会影响结果在迭代结果对象时或以其他方式使用诸如 Result.fetchone() 之类一次返回一行的方法的基础行为。来自底层游标或其他数据源的数据将被缓冲到内存中最多这么多行,然后缓冲的集合将一次生成一行或根据需要生成多行。每次缓冲区清除时,它将被刷新为这么多行或剩余的行数(如果较少)。

Result.yield_per() 方法通常与 Connection.execution_options.stream_results 执行选项结合使用,这将允许正在使用的数据库方言利用服务器端游标,如果 DBAPI 支持与默认操作模式分开的特定 “服务器端游标” 模式。

提示

考虑使用 Connection.execution_options.yield_per 执行选项,该选项将同时设置 Connection.execution_options.stream_results 以确保使用服务器端游标,并自动调用 Result.yield_per() 方法以一次建立固定的行缓冲区大小。

Connection.execution_options.yield_per 执行选项可用于 ORM 操作,有关 Session 面向的用法,请参阅 使用 Yield Per 获取大型结果集Connection 的 Core-only 版本是 SQLAlchemy 1.4.40 中的新功能。

1.4 版本新增。

参数:

num – 每次缓冲区重新填充时要获取的行数。如果设置为小于 1 的值,则获取下一个缓冲区的全部行。

sqlalchemy.engine.ScalarResult

一个 Result 的包装器,它返回标量值而不是 Row 值。

ScalarResult 对象通过调用 Result.scalars() 方法获取。

ScalarResult 的一个特殊限制是它没有 fetchone() 方法;由于 fetchone() 的语义是 None 值表示没有更多结果,这与 ScalarResult 不兼容,因为无法区分 None 作为行值与 None 作为指示符。使用 next(result) 逐个接收值。

方法 sqlalchemy.engine.ScalarResult.all() Sequence[_R]

返回序列中的所有标量值。

等效于 Result.all(),不同之处在于返回的是标量值,而不是 Row 对象。

方法 sqlalchemy.engine.ScalarResult.close() None

继承自 FilterResult.close() 方法,属于 FilterResult

关闭此 FilterResult

1.4.43 版本新增。

属性 sqlalchemy.engine.ScalarResult.closed

继承自 FilterResult.closed 属性,属于 FilterResult

如果底层 Result 报告已关闭,则返回 True

1.4.43 版本新增。

方法 sqlalchemy.engine.ScalarResult.fetchall() Sequence[_R]

ScalarResult.all() 方法的同义词。

方法 sqlalchemy.engine.ScalarResult.fetchmany(size: int | None = None) Sequence[_R]

获取多个对象。

等效于 Result.fetchmany(),不同之处在于返回的是标量值,而不是 Row 对象。

方法 sqlalchemy.engine.ScalarResult.first() _R | None

获取第一个对象,如果没有对象则返回 None

等效于 Result.first(),不同之处在于返回的是标量值,而不是 Row 对象。

方法 sqlalchemy.engine.ScalarResult.one() _R

返回恰好一个对象,否则引发异常。

等效于 Result.one(),不同之处在于返回的是标量值,而不是 Row 对象。

方法 sqlalchemy.engine.ScalarResult.one_or_none() _R | None

最多返回一个对象,否则引发异常。

等效于 Result.one_or_none(),不同之处在于返回的是标量值,而不是 Row 对象。

方法 sqlalchemy.engine.ScalarResult.partitions(size: int | None = None) Iterator[Sequence[_R]]

迭代给定大小的元素子列表。

等效于 Result.partitions(),不同之处在于返回的是标量值,而不是 Row 对象。

方法 sqlalchemy.engine.ScalarResult.unique(strategy: Callable[[Any], Any] | None = None) Self

对此 ScalarResult 返回的对象应用唯一性过滤。

有关用法详情,请参阅 Result.unique()

方法 sqlalchemy.engine.ScalarResult.yield_per(num: int) Self

继承自 FilterResult.yield_per() 方法,属于 FilterResult

配置行提取策略,一次提取 num 行。

FilterResult.yield_per() 方法是传递给 Result.yield_per() 方法的通道。 有关使用说明,请参阅该方法的文档。

1.4.40 版本新增: - 添加了 FilterResult.yield_per(),以便该方法在所有结果集实现中都可用

sqlalchemy.engine.MappingResult

一个 Result 的包装器,它返回字典值而不是 Row 值。

MappingResult 对象通过调用 Result.mappings() 方法获取。

类签名

class sqlalchemy.engine.MappingResult (sqlalchemy.engine._WithKeys, sqlalchemy.engine.FilterResult)

方法 sqlalchemy.engine.MappingResult.all() Sequence[RowMapping]

返回序列中的所有标量值。

等效于 Result.all(),不同之处在于返回的是 RowMapping 值,而不是 Row 对象。

方法 sqlalchemy.engine.MappingResult.close() None

继承自 FilterResult.close() 方法,属于 FilterResult

关闭此 FilterResult

1.4.43 版本新增。

属性 sqlalchemy.engine.MappingResult.closed

继承自 FilterResult.closed 属性,属于 FilterResult

如果底层 Result 报告已关闭,则返回 True

1.4.43 版本新增。

方法 sqlalchemy.engine.MappingResult.columns(*col_expressions: _KeyIndexType) Self

建立每行应返回的列。

方法 sqlalchemy.engine.MappingResult.fetchall() Sequence[RowMapping]

MappingResult.all() 方法的同义词。

方法 sqlalchemy.engine.MappingResult.fetchmany(size: int | None = None) Sequence[RowMapping]

获取多个对象。

等效于 Result.fetchmany(),不同之处在于返回的是 RowMapping 值,而不是 Row 对象。

方法 sqlalchemy.engine.MappingResult.fetchone() RowMapping | None

获取一个对象。

等效于 Result.fetchone(),不同之处在于返回的是 RowMapping 值,而不是 Row 对象。

方法 sqlalchemy.engine.MappingResult.first() RowMapping | None

获取第一个对象,如果没有对象则返回 None

等效于 Result.first(),不同之处在于返回的是 RowMapping 值,而不是 Row 对象。

方法 sqlalchemy.engine.MappingResult.keys() RMKeyView

继承自 sqlalchemy.engine._WithKeys.keys 方法,来自 sqlalchemy.engine._WithKeys

返回一个可迭代的视图,该视图产生每个 Row 将表示的字符串键。

键可以表示核心语句返回的列的标签,也可以表示 orm 执行返回的 orm 类的名称。

该视图还可以使用 Python in 运算符测试键的包含性,这将测试视图中表示的字符串键,以及备用键(例如列对象)。

Changed in version 1.4: 返回键视图对象而不是普通列表。

方法 sqlalchemy.engine.MappingResult.one() RowMapping

返回恰好一个对象,否则引发异常。

等效于 Result.one(),不同之处在于返回的是 RowMapping 值,而不是 Row 对象。

方法 sqlalchemy.engine.MappingResult.one_or_none() RowMapping | None

最多返回一个对象,否则引发异常。

等效于 Result.one_or_none(),不同之处在于返回的是 RowMapping 值,而不是 Row 对象。

方法 sqlalchemy.engine.MappingResult.partitions(size: int | None = None) Iterator[Sequence[RowMapping]]

迭代给定大小的元素子列表。

等效于 Result.partitions(),不同之处在于返回的是 RowMapping 值,而不是 Row 对象。

方法 sqlalchemy.engine.MappingResult.unique(strategy: Callable[[Any], Any] | None = None) Self

对此 MappingResult 返回的对象应用唯一性过滤。

有关用法详情,请参阅 Result.unique()

方法 sqlalchemy.engine.MappingResult.yield_per(num: int) Self

继承自 FilterResult.yield_per() 方法,属于 FilterResult

配置行提取策略,一次提取 num 行。

FilterResult.yield_per() 方法是传递给 Result.yield_per() 方法的通道。 有关使用说明,请参阅该方法的文档。

1.4.40 版本新增: - 添加了 FilterResult.yield_per(),以便该方法在所有结果集实现中都可用

sqlalchemy.engine.Row

表示单个结果行。

Row 对象表示数据库结果的一行。在 SQLAlchemy 1.x 系列中,它通常与 CursorResult 对象关联,但在 SQLAlchemy 1.4 中,ORM 也将其用于类似元组的结果。

Row 对象旨在尽可能地表现得像 Python 命名元组。 对于行的映射(即字典)行为,例如测试键的包含,请参阅 Row._mapping 属性。

另请参阅

使用 SELECT 语句 - 包括从 SELECT 语句中选择行的示例。

在版本 1.4 中更改:重命名 RowProxyRowRow 不再是 “代理” 对象,因为它包含最终形式的数据,现在主要表现得像一个命名元组。 类似映射的功能已移至 Row._mapping 属性。 有关此更改的背景,请参阅 RowProxy 不再是 “代理”; 现在称为 Row,并且行为类似于增强的命名元组

类签名

class sqlalchemy.engine.Row (sqlalchemy.engine._py_row.BaseRow, collections.abc.Sequence, typing.Generic)

方法 sqlalchemy.engine.Row. _asdict ( ) Dict[str, Any]

返回一个新的字典,该字典将字段名称映射到其对应的值。

此方法类似于 Python 命名元组 ._asdict() 方法,并通过将 dict() 构造函数应用于 Row._mapping 属性来工作。

1.4 版本新增。

另请参阅

Row._mapping

属性 sqlalchemy.engine.Row. _fields

返回此 Row 表示的字符串键的元组。

键可以表示核心语句返回的列的标签,也可以表示 orm 执行返回的 orm 类的名称。

此属性类似于 Python 命名元组 ._fields 属性。

1.4 版本新增。

另请参阅

Row._mapping

属性 sqlalchemy.engine.Row. _mapping

返回此 RowRowMapping

此对象为行中包含的数据提供一致的 Python 映射(即字典)接口。 Row 本身的行为类似于命名元组。

另请参阅

Row._fields

1.4 版本新增。

属性 sqlalchemy.engine.Row. _t

Row._tuple() 的同义词。

在版本 2.0.19 中新增:- Row._t 属性取代了之前的 Row.t 属性,后者现在带有下划线,以避免与列名发生名称冲突,就像 Row 上的其他命名元组方法一样。

另请参阅

Result.t

方法 sqlalchemy.engine.Row. _tuple ( ) _TP

返回此 Row 的 “元组” 形式。

在运行时,此方法返回 “self”; Row 对象已经是一个命名元组。 但是,在类型级别,如果此 Row 是类型化的,则 “元组” 返回类型将是一个 PEP 484 Tuple 数据类型,其中包含有关各个元素的类型信息,从而支持类型化的解包和属性访问。

在版本 2.0.19 中新增:- Row._tuple() 方法取代了之前的 Row.tuple() 方法,后者现在带有下划线,以避免与列名发生名称冲突,就像 Row 上的其他命名元组方法一样。

另请参阅

Row._t - 简写属性表示法

Result.tuples()

属性 sqlalchemy.engine.Row. count
属性 sqlalchemy.engine.Row. index
属性 sqlalchemy.engine.Row. t

Row._tuple() 的同义词。

自版本 2.0.19 起已弃用:Row.t 属性已被弃用,推荐使用 Row._t; 所有 Row 方法和库级属性都旨在以下划线开头,以避免名称冲突。 请使用 Row._t

2.0 版本新增。

方法 sqlalchemy.engine.Row. tuple ( ) _TP

返回此 Row 的 “元组” 形式。

自版本 2.0.19 起已弃用:Row.tuple() 方法已被弃用,推荐使用 Row._tuple(); 所有 Row 方法和库级属性都旨在以下划线开头,以避免名称冲突。 请使用 Row._tuple()

2.0 版本新增。

sqlalchemy.engine. RowMapping

一个 Mapping,它将列名和对象映射到 Row 值。

RowMapping 可从 Row 通过 Row._mapping 属性获得,以及从 MappingResult 对象提供的可迭代接口获得,该对象由 Result.mappings() 方法返回。

RowMapping 提供对行内容的 Python 映射(即字典)访问。 这包括对测试特定键(字符串列名或对象)的包含的支持,以及键、值和项的迭代

for row in result:
    if "a" in row._mapping:
        print("Column 'a': %s" % row._mapping["a"])

    print("Column b: %s" % row._mapping[table.c.b])

在版本 1.4 中新增:RowMapping 对象取代了以前由数据库结果行提供的类似映射的访问,该行现在旨在主要表现得像一个命名元组。

类签名

class sqlalchemy.engine.RowMapping (sqlalchemy.engine._py_row.BaseRow, collections.abc.Mapping, typing.Generic)

方法 sqlalchemy.engine.RowMapping. items ( ) ROMappingItemsView

返回底层 Row 中元素的键/值元组的视图。

方法 sqlalchemy.engine.RowMapping. keys ( ) RMKeyView

返回底层 Row 表示的字符串列名的 “键” 的视图。

方法 sqlalchemy.engine.RowMapping. values ( ) ROMappingKeysValuesView

返回底层 Row 中表示的值的视图。

sqlalchemy.engine. TupleResult

一个 Result,其类型被指定为返回纯 Python 元组而不是行。

由于 Row 在各方面都像元组一样工作,因此此类仅是类型提示类,运行时仍使用常规 Result

类签名

class sqlalchemy.engine.TupleResult (sqlalchemy.engine.FilterResult, sqlalchemy.util.langhelpers.TypingOnly)