使用引擎和连接

本节详细介绍了 EngineConnection 和相关对象的直接使用方式。需要注意的是,在使用 SQLAlchemy ORM 时,通常不会访问这些对象;而是使用 Session 对象作为数据库的接口。但是,对于那些直接使用文本 SQL 语句和/或 SQL 表达式构造而无需 ORM 高级管理服务的应用程序,EngineConnection 是关键(和女王?) - 继续阅读。

基本用法

回顾 引擎配置Engine 是通过 create_engine() 调用创建的

engine = create_engine("mysql+mysqldb://scott:tiger@localhost/test")

create_engine() 的典型用法是对每个特定的数据库 URL 只调用一次,在单个应用程序进程的整个生命周期中全局保留。单个 Engine 代表进程管理许多单独的 DBAPI 连接,并且旨在以并发方式被调用。 Engine 等同于 DBAPI 的 connect() 函数,后者只代表一个连接资源 - Engine 在模块级别创建一次时效率最高,而不是在每个对象或每个函数调用时创建。

Engine 的最基本功能是提供对 Connection 的访问权限,然后可以使用它调用 SQL 语句。要向数据库发出文本语句,操作如下

from sqlalchemy import text

with engine.connect() as connection:
    result = connection.execute(text("select username from users"))
    for row in result:
        print("username:", row.username)

上面的 Engine.connect() 方法返回一个 Connection 对象,通过在 Python 上下文管理器(例如 with: 语句)中使用它,Connection.close() 方法会在块结束时自动调用。 Connection 是实际 DBAPI 连接的**代理**对象。DBAPI 连接是在创建 Connection 时从连接池中检索的。

返回的对象称为 CursorResult,它引用 DBAPI 游标,并提供类似于 DBAPI 游标的方法来获取行。当 CursorResult 的所有结果行(如果有)都耗尽时,DBAPI 游标将由 CursorResult 关闭。不返回任何行的 CursorResult,例如没有返回行的 UPDATE 语句,将在构造后立即释放游标资源。

with: 块结束时,Connection 关闭,引用的 DBAPI 连接将释放到连接池中。从数据库本身的角度来看,连接池实际上不会“关闭”连接,假设池有空间存储此连接以便下次使用。当连接返回到池中以供重复使用时,池机制会在 DBAPI 连接上发出 rollback() 调用,以删除任何事务状态或锁(这被称为 返回时重置),并且连接已准备好供下次使用。

我们上面的示例演示了文本 SQL 字符串的执行,应使用 text() 结构来调用,以指示我们想要使用文本 SQL。当然,Connection.execute() 方法可以容纳更多内容;请参阅 处理数据(位于 SQLAlchemy 统一教程 中)以获取教程。

使用事务

注意

本节介绍了在直接使用 EngineConnection 对象时如何使用事务。在使用 SQLAlchemy ORM 时,事务控制的公共 API 是通过 Session 对象,它在内部使用 Transaction 对象。有关更多信息,请参阅 管理事务

边走边提交

Connection 对象始终在事务块的上下文中发出 SQL 语句。第一次调用 Connection.execute() 方法以执行 SQL 语句时,此事务将使用称为**autobegin**的行为自动开始。事务将继续在 Connection 对象的作用域内持续存在,直到调用 Connection.commit()Connection.rollback() 方法。在事务结束之后,Connection 将等待再次调用 Connection.execute() 方法,此时它将再次自动开始。

此调用方式被称为**边走边提交**,并在下面的示例中进行了说明

with engine.connect() as connection:
    connection.execute(some_table.insert(), {"x": 7, "y": "this is some data"})
    connection.execute(
        some_other_table.insert(), {"q": 8, "p": "this is some more data"}
    )

    connection.commit()  # commit the transaction

在“边走边提交”的风格中,我们可以在使用 Connection.execute() 发出的其他语句的持续序列中随意调用 Connection.commit()Connection.rollback() 方法;每次事务结束时,以及发出新语句时,都会隐式地开始一个新事务

with engine.connect() as connection:
    connection.execute(text("<some statement>"))
    connection.commit()  # commits "some statement"

    # new transaction starts
    connection.execute(text("<some other statement>"))
    connection.rollback()  # rolls back "some other statement"

    # new transaction starts
    connection.execute(text("<a third statement>"))
    connection.commit()  # commits "a third statement"

2.0 版新增: “边走边提交”的风格是 SQLAlchemy 2.0 的一项新功能。它也适用于使用“future”风格引擎时,SQLAlchemy 1.4 的“transitional”模式。

只开始一次

Connection 对象提供了一种更明确的事务管理风格,称为**只开始一次**。与“边走边提交”相比,“只开始一次”允许显式地声明事务的起点,并允许将事务本身框定为上下文管理器块,以便事务的结束隐式地发生。要使用“只开始一次”,将使用 Connection.begin() 方法,它将返回一个 Transaction 对象,它代表 DBAPI 事务。此对象还支持通过其自身的 Transaction.commit()Transaction.rollback() 方法进行显式管理,但作为首选做法,它还支持上下文管理器接口,在该接口中,如果块正常结束,它将自行提交,并在抛出异常时发出回滚,然后将异常向外传播。下面说明了“只开始一次”块的形式

with engine.connect() as connection:
    with connection.begin():
        connection.execute(some_table.insert(), {"x": 7, "y": "this is some data"})
        connection.execute(
            some_other_table.insert(), {"q": 8, "p": "this is some more data"}
        )

    # transaction is committed

从引擎连接并只开始一次

上述“一次开始”块的便捷简写形式是在源 Engine 对象级别使用 Engine.begin() 方法,而不是执行 Engine.connect()Connection.begin() 两个独立的步骤;Engine.begin() 方法返回一个特殊的上下文管理器,它在内部维护 Connection 的上下文管理器以及通常由 Connection.begin() 方法返回的 Transaction 的上下文管理器。

with engine.begin() as connection:
    connection.execute(some_table.insert(), {"x": 7, "y": "this is some data"})
    connection.execute(
        some_other_table.insert(), {"q": 8, "p": "this is some more data"}
    )

# transaction is committed, and Connection is released to the connection
# pool

提示

Engine.begin() 块中,我们可以调用 Connection.commit()Connection.rollback() 方法,这将提前结束由块标记的交易。但是,如果我们这样做,在块结束之前,Connection 上将不再发出任何 SQL 操作。

>>> from sqlalchemy import create_engine
>>> e = create_engine("sqlite://", echo=True)
>>> with e.begin() as conn:
...     conn.commit()
...     conn.begin()
2021-11-08 09:49:07,517 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2021-11-08 09:49:07,517 INFO sqlalchemy.engine.Engine COMMIT
Traceback (most recent call last):
...
sqlalchemy.exc.InvalidRequestError: Can't operate on closed transaction inside
context manager.  Please complete the context manager before emitting
further commands.

混合样式

只要调用 Connection.begin() 不与“自动开始”行为冲突,就可以在一个 Engine.connect() 块中自由混合“即时提交”和“一次开始”样式。为此,Connection.begin() 只能在发出任何 SQL 语句之前调用,或者在之前调用 Connection.commit()Connection.rollback() 之后直接调用。

with engine.connect() as connection:
    with connection.begin():
        # run statements in a "begin once" block
        connection.execute(some_table.insert(), {"x": 7, "y": "this is some data"})

    # transaction is committed

    # run a new statement outside of a block. The connection
    # autobegins
    connection.execute(
        some_other_table.insert(), {"q": 8, "p": "this is some more data"}
    )

    # commit explicitly
    connection.commit()

    # can use a "begin once" block here
    with connection.begin():
        # run more statements
        connection.execute(...)

在开发使用“一次开始”的代码时,如果交易已经“自动开始”,库将引发 InvalidRequestError

设置交易隔离级别,包括 DBAPI 自动提交

大多数 DBAPI 支持可配置的交易 隔离 级别。传统上,这四个级别是“READ UNCOMMITTED”、“READ COMMITTED”、“REPEATABLE READ”和“SERIALIZABLE”。这些通常应用于 DBAPI 连接,在它开始新的交易之前,要注意大多数 DBAPI 会在首次发出 SQL 语句时隐式地开始此交易。

支持隔离级别的 DBAPI 通常也支持真正的“自动提交”,这意味着 DBAPI 连接本身将被置于非交易自动提交模式。这通常意味着 DBAPI 的典型行为(即自动向数据库发出“BEGIN”)不再发生,但也可能包括其他指令。SQLAlchemy 将“自动提交”的概念视为其他任何隔离级别;它是一个失去“读取已提交”的隔离级别,并且还失去了原子性。

提示

重要的是要注意,“自动提交”隔离级别与任何其他隔离级别一样,**不会**影响 Connection 对象的“事务”行为,该对象继续调用 DBAPI 的 .commit().rollback() 方法(它们在自动提交下没有效果),并且 .begin() 方法假设 DBAPI 会隐式地启动交易(这意味着 SQLAlchemy 的“开始”**不会改变自动提交模式**)。

SQLAlchemy 方言应该尽可能多地支持这些隔离级别以及自动提交。

为连接设置隔离级别或 DBAPI 自动提交

对于从 Engine.connect() 获取的单个 Connection 对象,可以使用 Connection.execution_options() 方法为该 Connection 对象的持续时间设置隔离级别。参数称为 Connection.execution_options.isolation_level,值是字符串,通常是以下名称的子集

# possible values for Connection.execution_options(isolation_level="<value>")

"AUTOCOMMIT"
"READ COMMITTED"
"READ UNCOMMITTED"
"REPEATABLE READ"
"SERIALIZABLE"

并非每个 DBAPI 都支持每个值;如果对某个后端使用不支持的值,则会引发错误。

例如,要强制特定连接上的 REPEATABLE READ,然后开始交易

with engine.connect().execution_options(
    isolation_level="REPEATABLE READ"
) as connection:
    with connection.begin():
        connection.execute(text("<statement>"))

提示

Connection.execution_options() 方法的返回值是调用该方法的相同 Connection 对象,这意味着它修改了 Connection 对象的状态。这是从 SQLAlchemy 2.0 开始的新行为。此行为不适用于 Engine.execution_options() 方法;该方法仍然返回 Engine 的副本,如下所述,它可用于构建具有不同执行选项的多个 Engine 对象,但这些对象共享相同的方言和连接池。

注意

Connection.execution_options.isolation_level 参数不适用于语句级别的选项,例如 Executable.execution_options() 的选项,如果在该级别设置,则会被拒绝。这是因为必须在每个事务的基础上针对 DBAPI 连接设置该选项。

为引擎设置隔离级别或 DBAPI 自动提交

Connection.execution_options.isolation_level 选项也可以在引擎级别设置,这通常更可取。可以通过将 create_engine.isolation_level 参数传递给 create_engine() 来实现。

from sqlalchemy import create_engine

eng = create_engine(
    "postgresql://scott:tiger@localhost/test", isolation_level="REPEATABLE READ"
)

使用上述设置,每个新的 DBAPI 连接在创建的那一刻都会被设置为对所有后续操作使用 "REPEATABLE READ" 隔离级别设置。

为单个引擎维护多个隔离级别

隔离级别也可以在每个引擎级别设置,并具有更高的灵活性,可以使用 create_engine.execution_options 参数传递给 create_engine() 或使用 Engine.execution_options() 方法,后者将创建一个共享原始引擎的方言和连接池的 Engine 副本,但它有自己的每个连接隔离级别设置。

from sqlalchemy import create_engine

eng = create_engine(
    "postgresql+psycopg2://scott:tiger@localhost/test",
    execution_options={"isolation_level": "REPEATABLE READ"},
)

使用上述设置,DBAPI 连接将被设置为对每个开始的新交易使用 "REPEATABLE READ" 隔离级别设置;但连接作为池化将被重置为连接首次发生时存在的原始隔离级别。在 create_engine() 级别,最终效果与使用 create_engine.isolation_level 参数没有区别。

然而,一个经常选择在不同隔离级别下运行操作的应用程序可能希望为一个主要的 Engine 创建多个“子引擎”,每个子引擎都将配置为不同的隔离级别。一个这样的用例是,应用程序的操作分为“事务性”和“只读”操作,一个单独的 Engine 利用 "AUTOCOMMIT" 可以从主引擎中分离出来。

from sqlalchemy import create_engine

eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")

autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")

上面,Engine.execution_options() 方法创建了原始 Engine 的浅拷贝。engautocommit_engine 共享相同的方言和连接池。但是,当从 autocommit_engine 获取连接时,将对连接设置“AUTOCOMMIT”模式。

无论隔离级别设置是什么,当连接返回到连接池时,都会无条件地恢复。

了解 DBAPI 级别的 Autocommit 隔离级别

在上一节中,我们介绍了 Connection.execution_options.isolation_level 参数的概念,以及如何使用它来设置数据库隔离级别,包括 DBAPI 级别的“autocommit”,它被 SQLAlchemy 视为另一个事务隔离级别。在本节中,我们将尝试阐明这种方法的影响。

如果我们要签出 Connection 对象并在“autocommit”模式下使用它,我们将按如下方式进行

with engine.connect() as connection:
    connection.execution_options(isolation_level="AUTOCOMMIT")
    connection.execute(text("<statement>"))
    connection.execute(text("<statement>"))

上面说明了“DBAPI autocommit”模式的正常用法。不需要使用诸如 Connection.begin()Connection.commit() 之类的方法,因为所有语句都会立即提交到数据库。当块结束时,Connection 对象将恢复“autocommit”隔离级别,并且 DBAPI 连接将被释放到连接池中,在那里 DBAPI connection.rollback() 方法通常会被调用,但是由于上面的语句已经被提交,所以此回滚对数据库的状态没有改变。

重要的是要注意,“autocommit”模式即使在调用 Connection.begin() 方法时也会持续存在;DBAPI 不会向数据库发出任何 BEGIN,也不会在调用 Connection.commit() 时发出 COMMIT。这种用法也不是错误情况,因为预计“autocommit”隔离级别可以应用于以事务性上下文为前提编写的代码;毕竟,“隔离级别”本身是事务本身的配置细节,就像任何其他隔离级别一样。

在下面的示例中,语句将保持自动提交,无论 SQLAlchemy 级别的事务块如何

with engine.connect() as connection:
    connection = connection.execution_options(isolation_level="AUTOCOMMIT")

    # this begin() does not affect the DBAPI connection, isolation stays at AUTOCOMMIT
    with connection.begin() as trans:
        connection.execute(text("<statement>"))
        connection.execute(text("<statement>"))

当我们像上面一样运行一个带有日志记录的块时,日志记录将尝试表明,虽然调用了 DBAPI 级别的 .commit(),但它可能由于 autocommit 模式而无效

INFO sqlalchemy.engine.Engine BEGIN (implicit)
...
INFO sqlalchemy.engine.Engine COMMIT using DBAPI connection.commit(), DBAPI should ignore due to autocommit mode

同时,即使我们使用“DBAPI autocommit”,SQLAlchemy 的事务语义(即,Connection.begin() 在 Python 中的行为以及“autobegin”的行为)仍然存在,即使这些不会影响 DBAPI 连接本身。为了说明这一点,下面的代码将引发错误,因为在 autobegin 已经发生之后调用了 Connection.begin()

with engine.connect() as connection:
    connection = connection.execution_options(isolation_level="AUTOCOMMIT")

    # "transaction" is autobegin (but has no effect due to autocommit)
    connection.execute(text("<statement>"))

    # this will raise; "transaction" is already begun
    with connection.begin() as trans:
        connection.execute(text("<statement>"))

上面的示例也说明了相同的主题,即“autocommit”隔离级别是底层数据库事务的配置细节,独立于 SQLAlchemy Connection 对象的 begin/commit 行为。“autocommit”模式不会以任何方式与 Connection.begin() 交互,并且 Connection 在执行其自身关于事务的状态更改时不会查询此状态(除了在引擎日志中建议这些块实际上没有提交之外)。这种设计的理由是,要保持与 Connection 完全一致的使用模式,其中 DBAPI-autocommit 模式可以独立地更改,而无需在其他地方指示任何代码更改。

在隔离级别之间切换

隔离级别设置(包括 autocommit 模式)在连接释放回连接池时会自动重置。因此,最好避免尝试在一个 Connection 对象上切换隔离级别,因为这会导致过度冗长。

为了说明如何在单个 Connection 签出范围内的临时模式下使用“autocommit”,Connection.execution_options.isolation_level 参数必须重新应用之前的隔离级别。上一节说明了尝试调用 Connection.begin() 以在 autocommit 发生时开始事务;我们可以通过首先在调用 Connection.begin() 之前恢复隔离级别来重写该示例

# if we wanted to flip autocommit on and off on a single connection/
# which... we usually don't.

with engine.connect() as connection:
    connection.execution_options(isolation_level="AUTOCOMMIT")

    # run statement(s) in autocommit mode
    connection.execute(text("<statement>"))

    # "commit" the autobegun "transaction"
    connection.commit()

    # switch to default isolation level
    connection.execution_options(isolation_level=connection.default_isolation_level)

    # use a begin block
    with connection.begin() as trans:
        connection.execute(text("<statement>"))

上面,为了手动恢复隔离级别,我们使用了 Connection.default_isolation_level 来恢复默认隔离级别(假设这就是我们在这里想要的)。但是,也许更好的方法是使用 Connection 的体系结构,该体系结构已经在签入时自动处理隔离级别的重置。首选方法是使用两个块

# use an autocommit block
with engine.connect().execution_options(isolation_level="AUTOCOMMIT") as connection:
    # run statement in autocommit mode
    connection.execute(text("<statement>"))

# use a regular block
with engine.begin() as connection:
    connection.execute(text("<statement>"))

总结一下

  1. “DBAPI 级别的 autocommit”隔离级别完全独立于 Connection 对象的“begin”和“commit”的概念

  2. 每个隔离级别使用单个 Connection 签出。避免尝试在一个连接签出中来回切换“autocommit”;让引擎来完成恢复默认隔离级别的工作

使用服务器端游标(又名流结果)

一些后端明确支持“服务器端游标”与“客户端游标”的概念。这里的客户端游标意味着数据库驱动程序在从语句执行返回之前将结果集中的所有行完全获取到内存中。诸如 PostgreSQL 和 MySQL/MariaDB 的驱动程序通常默认使用客户端游标。相反,服务器端游标表示结果行在客户端使用结果行时仍保留在数据库服务器的状态中。例如,Oracle 的驱动程序通常使用“服务器端”模型,而 SQLite 方言虽然没有使用真正的“客户端/服务器”架构,但仍然使用非缓冲的结果获取方法,该方法会在使用结果行之前将其保留在进程内存之外。

从这种基本架构可以得出,当获取非常大的结果集时,"服务器端游标"更节省内存,但同时可能会在客户端/服务器通信过程中引入更多复杂性,并且对于较小的结果集(通常小于 10000 行)效率更低。

对于那些对缓冲或非缓冲结果具有条件支持的方言,通常对使用“非缓冲”或服务器端游标模式有一些注意事项。例如,当使用 psycopg2 方言时,如果服务器端游标与任何类型的 DML 或 DDL 语句一起使用,则会引发错误。当使用带有服务器端游标的 MySQL 驱动程序时,DBAPI 连接处于更脆弱的状态,并且不会从错误条件中优雅地恢复,也不会允许回滚继续,直到游标完全关闭。

出于这个原因,SQLAlchemy 的方言将始终默认使用游标的错误更少的版本,这意味着对于 PostgreSQL 和 MySQL 方言,它默认使用缓冲的“客户端”游标,其中完整的结果集在从游标调用任何 fetch 方法之前被拉入内存。这种操作模式在**绝大多数**情况下都是合适的;非缓冲游标通常没有用,除了在应用程序以块的形式获取大量行这种罕见情况下,这些行的处理可以在获取更多行之前完成。

对于提供客户端和服务器端游标选项的数据库驱动程序,Connection.execution_options.stream_resultsConnection.execution_options.yield_per 执行选项提供了对每个Connection 或每个语句的“服务器端游标”的访问。在使用 ORM Session 时也存在类似的选项。

通过 yield_per 进行固定缓冲的流式传输

由于使用完全未缓冲的服务器端游标的单个行获取操作通常比一次获取多行批次更昂贵,因此 Connection.execution_options.yield_per 执行选项配置了一个 Connection 或语句以使用可用的服务器端游标,同时配置一个固定大小的行缓冲区,该缓冲区将按批次从服务器检索行,因为它们被消耗。此参数可以使用 Connection.execution_options() 方法在 Connection 上或使用 Executable.execution_options() 方法在语句上设置为正整数。

版本 1.4.40 中的新增功能: Connection.execution_options.yield_per 作为仅针对 Core 的选项,从 SQLAlchemy 1.4.40 开始新增;对于之前的 1.4 版本,请直接使用 Connection.execution_options.stream_resultsResult.yield_per() 结合使用。

使用此选项等同于手动设置 Connection.execution_options.stream_results 选项(在下一节中描述),然后在 Result 对象上调用 Result.yield_per() 方法,并使用给定的整数。在这两种情况下,这种组合产生的效果都包括

  • 如果可用并且不是该后端默认行为,则会为给定后端选择服务器端游标模式

  • 当结果行被获取时,它们将被按批次缓冲,其中除最后一批之外的每一批的大小将等于传递给 Connection.execution_options.yield_per 选项或 Result.yield_per() 方法的整数参数;最后一批的尺寸将根据剩余行少于此尺寸进行调整

  • 如果使用,Result.partitions() 方法使用的默认分区大小也将设置为此整数大小。

下面的示例说明了这三种行为

with engine.connect() as conn:
    with conn.execution_options(yield_per=100).execute(
        text("select * from table")
    ) as result:
        for partition in result.partitions():
            # partition is an iterable that will be at most 100 items
            for row in partition:
                print(f"{row}")

上面的示例说明了 yield_per=100 的组合以及使用 Result.partitions() 方法在与从服务器获取的大小匹配的批次中对行运行处理。使用 Result.partitions() 是可选的,如果直接迭代 Result,则将为每个获取的 100 行缓冲新一批行。调用诸如 Result.all() 之类的​​方法**不应**使用,因为这将一次性完全获取所有剩余行,并破坏使用 yield_per 的目的。

提示

Result 对象可以作为上下文管理器使用,如上所示。当使用服务器端游标进行迭代时,这是确保 Result 对象关闭的最佳方法,即使迭代过程中引发了异常。

Connection.execution_options.yield_per 选项也可以移植到 ORM,由 Session 用于获取 ORM 对象,它也会限制一次生成的 ORM 对象的数量。有关使用 Connection.execution_options.yield_per 与 ORM 的更多背景信息,请参见部分 使用 Yield Per 获取大型结果集 - 在 ORM 查询指南 中。

版本 1.4.40 中的新增功能: 添加了 Connection.execution_options.yield_per 作为核心级别的执行选项,以便以可移植到 ORM 的类似用例的方式方便地一次设置流式结果、缓冲区大小和分区大小。

使用 stream_results 进行动态增长缓冲区的流式传输

要启用服务器端游标而无需指定分区大小,可以使用 Connection.execution_options.stream_results 选项,它类似于 Connection.execution_options.yield_per,可以在 Connection 对象或语句对象上调用。

当直接迭代使用 Connection.execution_options.stream_results 选项传递的 Result 对象时,行将使用默认的缓冲方案在内部获取,该方案首先缓冲一小部分行,然后在每次获取时缓冲越来越多的缓冲区,直到预先配置的 1000 行限制。可以使用 Connection.execution_options.max_row_buffer 执行选项影响此缓冲区的最大大小

with engine.connect() as conn:
    with conn.execution_options(stream_results=True, max_row_buffer=100).execute(
        text("select * from table")
    ) as result:
        for row in result:
            print(f"{row}")

虽然 Connection.execution_options.stream_results 选项可以与 Result.partitions() 方法结合使用,但应向 Result.partitions() 传递特定的分区大小,以确保不会获取整个结果。通常,在设置使用 Result.partitions() 方法时,使用 Connection.execution_options.yield_per 选项更直接。

架构名称的转换

为了支持将通用表格集分布到多个架构中的多租户应用程序,可以使用 Connection.execution_options.schema_translate_map 执行选项,无需任何更改,即可将一组 Table 对象重新用作不同架构名称下的对象。

给定一个表格

user_table = Table(
    "user",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("name", String(50)),
)

Table 的“架构”由 Table.schema 属性定义,值为 NoneConnection.execution_options.schema_translate_map 可以指定,所有架构为 NoneTable 对象,其架构将呈现为 user_schema_one

connection = engine.connect().execution_options(
    schema_translate_map={None: "user_schema_one"}
)

result = connection.execute(user_table.select())

以上代码将调用数据库上的 SQL,形式如下

SELECT user_schema_one.user.id, user_schema_one.user.name FROM
user_schema_one.user

也就是说,架构名称将替换为我们的翻译名称。该映射可以指定任意数量的源->目标架构

connection = engine.connect().execution_options(
    schema_translate_map={
        None: "user_schema_one",  # no schema name -> "user_schema_one"
        "special": "special_schema",  # schema="special" becomes "special_schema"
        "public": None,  # Table objects with schema="public" will render with no schema
    }
)

Connection.execution_options.schema_translate_map 参数会影响从 SQL 表达式语言生成的(从 TableSequence 对象派生)所有 DDL 和 SQL 结构。它 **不** 影响通过 text() 结构或通过传递给 Connection.execute() 的纯字符串使用的文字字符串 SQL。

此功能 **仅** 在从 TableSequence 直接派生架构名称的情况下起作用;它不会影响直接传递字符串架构名称的方法。根据这种模式,它将在 MetaData.create_all()MetaData.drop_all() 等方法执行的“可以创建”/“可以删除”检查中生效,并且在使用给定 Table 对象进行表格反射时生效。但是,它 **不会** 影响 Inspector 对象上的操作,因为架构名称是显式传递给这些方法的。

提示

要将架构转换功能与 ORM Session 结合使用,请在 Engine 级别设置此选项,然后将该引擎传递给 SessionSession 为每个事务使用新的 Connection

schema_engine = engine.execution_options(schema_translate_map={...})

session = Session(schema_engine)

...

警告

在不使用扩展的情况下使用 ORM Session 时,架构转换功能仅支持 **每个 Session 一个架构转换映射**。如果在每个语句的基础上提供了不同的架构转换映射,则 **无法使用** 此功能,因为 ORM Session 不会考虑单个对象的当前架构转换值。

要使用具有多个 schema_translate_map 配置的单个 Session,可以使用 水平分片 扩展。请参阅 水平分片 中的示例。

SQL 编译缓存

新功能在版本 1.4 中添加: SQLAlchemy 现在拥有一个透明的查询缓存系统,该系统大大降低了将 SQL 语句结构转换为 SQL 字符串(跨核心和 ORM)所涉及的 Python 计算开销。请参阅 核心和 ORM 中所有 DQL、DML 语句添加了透明 SQL 编译缓存 中的介绍。

SQLAlchemy 包括一个用于 SQL 编译器及其 ORM 变体的综合缓存系统。该缓存系统在 Engine 中是透明的,并提供以下功能:对于给定的核心或 ORM SQL 语句,以及与该语句相关的用于组装结果提取机制的计算,SQL 编译过程只会在该语句对象以及所有结构相同的其他对象(对于该特定结构在引擎的“编译缓存”中存在的持续时间)上执行一次。通过“结构相同的语句对象”,这通常对应于在函数中构建的 SQL 语句,并且每次函数运行时都会构建该语句

def run_my_statement(connection, parameter):
    stmt = select(table)
    stmt = stmt.where(table.c.col == parameter)
    stmt = stmt.order_by(table.c.id)
    return connection.execute(stmt)

上述语句将生成类似于 SELECT id, col FROM table WHERE col = :col ORDER BY id 的 SQL,请注意,虽然 parameter 的值是一个简单的 Python 对象,例如字符串或整数,但语句的字符串 SQL 形式不包含此值,因为它使用绑定参数。随后调用上述 run_my_statement() 函数将使用 connection.execute() 调用的范围内缓存的编译结构,以提高性能。

注意

重要的是要注意,SQL 编译缓存正在缓存 **传递给数据库的 SQL 字符串**,**而不是查询返回的数据**。它绝不是数据缓存,不会影响特定 SQL 语句返回的结果,也不表示与提取结果行相关的任何内存使用。

虽然 SQLAlchemy 从 1.x 系列早期就开始使用基本的语句缓存,并且还为 ORM 提供了“Baked Query”扩展,但这两个系统都需要高度特殊的 API 使用才能使缓存生效。从 1.4 开始的新缓存是完全自动的,不需要更改编程风格即可生效。

缓存无需任何配置更改即可自动使用,无需执行任何特殊步骤来启用它。以下部分详细介绍了缓存的配置和高级用法模式。

配置

缓存本身是一个类似字典的对象,称为 LRUCache,它是一个内部的 SQLAlchemy 字典子类,用于跟踪特定键的使用情况,并具有周期性的“修剪”步骤,当缓存的大小达到某个阈值时,会删除最近最少使用的项。此缓存的大小默认为 500,可以使用 create_engine.query_cache_size 参数进行配置。

engine = create_engine(
    "postgresql+psycopg2://scott:tiger@localhost/test", query_cache_size=1200
)

缓存的大小可以增长到给定大小的 150%,然后将其修剪回目标大小。因此,大小为 1200 的缓存可以增长到 1800 个元素,然后将其修剪回 1200 个元素。

缓存的大小基于每个引擎对每个唯一的 SQL 语句生成的单个条目。从 Core 和 ORM 生成的 SQL 语句被同等对待。DDL 语句通常不会被缓存。为了确定缓存的行为,引擎日志将包含有关缓存行为的详细信息,下一节将对此进行描述。

使用日志估算缓存性能

上面提到的 1200 的缓存大小实际上相当大。对于小型应用程序,大小为 100 可能就足够了。为了估算缓存的最佳大小,假设目标主机上有足够的内存,则缓存的大小应基于目标引擎可能渲染的唯一 SQL 字符串的数量。最有效的方法是使用 SQL 回显,这可以通过使用 create_engine.echo 标志直接启用,或者使用 Python 日志记录;有关日志记录配置的背景信息,请参见 配置日志记录 一节。

例如,我们将检查以下程序产生的日志记录。

from sqlalchemy import Column
from sqlalchemy import create_engine
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session

Base = declarative_base()


class A(Base):
    __tablename__ = "a"

    id = Column(Integer, primary_key=True)
    data = Column(String)
    bs = relationship("B")


class B(Base):
    __tablename__ = "b"
    id = Column(Integer, primary_key=True)
    a_id = Column(ForeignKey("a.id"))
    data = Column(String)


e = create_engine("sqlite://", echo=True)
Base.metadata.create_all(e)

s = Session(e)

s.add_all([A(bs=[B(), B(), B()]), A(bs=[B(), B(), B()]), A(bs=[B(), B(), B()])])
s.commit()

for a_rec in s.scalars(select(A)):
    print(a_rec.bs)

运行时,每个记录的 SQL 语句将在传递的参数左侧包含一个方括号内的缓存统计信息徽章。我们可能看到的四种类型的消息总结如下。

  • [raw sql] - 驱动程序或最终用户使用 Connection.exec_driver_sql() 发出的原始 SQL - 缓存不适用。

  • [no key] - 语句对象是不会被缓存的 DDL 语句,或者语句对象包含不可缓存的元素,例如用户定义的构造或任意大的 VALUES 子句。

  • [generated in Xs] - 该语句是缓存未命中,必须进行编译,然后存储在缓存中。生成编译后的构造需要 X 秒。数字 X 将以小数秒表示。

  • [cached since Xs ago] - 该语句是缓存命中,不需要重新编译。该语句已在 X 秒前存储在缓存中。数字 X 将与应用程序运行的时间和语句在缓存中的时间成正比,例如,对于 24 小时,它将是 86400。

每个徽章将在下面进行详细描述。

我们为上面程序看到的第一个语句将是 SQLite 方言检查“a”和“b”表是否存在。

INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("a")
INFO sqlalchemy.engine.Engine [raw sql] ()
INFO sqlalchemy.engine.Engine PRAGMA main.table_info("b")
INFO sqlalchemy.engine.Engine [raw sql] ()

对于上面两个 SQLite PRAGMA 语句,徽章显示为 [raw sql],这表示驱动程序使用 Connection.exec_driver_sql() 将 Python 字符串直接发送到数据库。缓存不适用于此类语句,因为它们已经以字符串形式存在,并且由于 SQLAlchemy 不会提前解析 SQL 字符串,因此无法知道将返回哪种结果行。

接下来我们看到的语句是 CREATE TABLE 语句。

INFO sqlalchemy.engine.Engine
CREATE TABLE a (
  id INTEGER NOT NULL,
  data VARCHAR,
  PRIMARY KEY (id)
)

INFO sqlalchemy.engine.Engine [no key 0.00007s] ()
INFO sqlalchemy.engine.Engine
CREATE TABLE b (
  id INTEGER NOT NULL,
  a_id INTEGER,
  data VARCHAR,
  PRIMARY KEY (id),
  FOREIGN KEY(a_id) REFERENCES a (id)
)

INFO sqlalchemy.engine.Engine [no key 0.00006s] ()

对于这些语句中的每一个,徽章显示为 [no key 0.00006s]。这表示这两个特定语句没有进行缓存,因为面向 DDL 的 CreateTable 构造没有生成缓存键。DDL 构造通常不参与缓存,因为它们通常不会重复第二次,并且 DDL 也是数据库配置步骤,其中性能并不那么重要。

[no key] 徽章对于另一个原因很重要,因为它可以用于可以缓存的 SQL 语句,但存在一些当前不可缓存的特定子构造。这方面的例子包括没有定义缓存参数的自定义用户定义 SQL 元素,以及生成任意长且不可重现的 SQL 字符串的一些构造,主要示例是 Values 构造,以及使用“多值插入”与 Insert.values() 方法一起使用时。

到目前为止,我们的缓存仍然为空。但是,接下来的语句将被缓存,一个片段如下所示。

INFO sqlalchemy.engine.Engine INSERT INTO a (data) VALUES (?)
INFO sqlalchemy.engine.Engine [generated in 0.00011s] (None,)
INFO sqlalchemy.engine.Engine INSERT INTO a (data) VALUES (?)
INFO sqlalchemy.engine.Engine [cached since 0.0003533s ago] (None,)
INFO sqlalchemy.engine.Engine INSERT INTO a (data) VALUES (?)
INFO sqlalchemy.engine.Engine [cached since 0.0005326s ago] (None,)
INFO sqlalchemy.engine.Engine INSERT INTO b (a_id, data) VALUES (?, ?)
INFO sqlalchemy.engine.Engine [generated in 0.00010s] (1, None)
INFO sqlalchemy.engine.Engine INSERT INTO b (a_id, data) VALUES (?, ?)
INFO sqlalchemy.engine.Engine [cached since 0.0003232s ago] (1, None)
INFO sqlalchemy.engine.Engine INSERT INTO b (a_id, data) VALUES (?, ?)
INFO sqlalchemy.engine.Engine [cached since 0.0004887s ago] (1, None)

上面,我们看到实际上有两个独特的 SQL 字符串;"INSERT INTO a (data) VALUES (?)""INSERT INTO b (a_id, data) VALUES (?, ?)"。由于 SQLAlchemy 对所有文字值使用绑定参数,即使这些语句对于不同的对象重复很多次,因为参数是分开的,实际的 SQL 字符串保持不变。

注意

上面的两个语句是由 ORM 工作单元进程生成的,实际上会将这些语句缓存在每个映射器本地的一个单独缓存中。但是,机制和术语相同。下一节 禁用或使用备用字典来缓存某些(或所有)语句 将描述用户界面代码如何也能够在每个语句的基础上使用备用缓存容器。

我们为这两个语句的第一次出现看到的缓存徽章是 [generated in 0.00011s]。这表示该语句不在缓存中,已在 .00011 秒内编译成字符串,然后被缓存。当我们看到 [generated] 徽章时,我们知道这意味着存在缓存未命中。这对于特定语句的第一次出现是预期的。但是,如果对于长时间运行的应用程序(通常反复使用同一系列 SQL 语句)观察到许多新的 [generated] 徽章,这可能意味着 create_engine.query_cache_size 参数过小。当由于 LRU 缓存修剪使用较少的项而从缓存中逐出已缓存的语句时,它将在下次使用该语句时显示 [generated] 徽章。

然后我们为这两个语句的后续出现看到的缓存徽章看起来像 [cached since 0.0003533s ago]。这表示该语句已在缓存中找到,并且最初是在 .0003533 秒前放入缓存的。重要的是要注意,虽然 [generated][cached since] 徽章是指秒数,但它们的意思不同;在 [generated] 的情况下,该数字是编译语句所需时间的粗略计时,并且将是一个极小的数字。在 [cached since] 的情况下,这是语句在缓存中存在的时间。对于运行了 6 小时的应用程序,该数字可能显示为 [cached since 21600 seconds ago],这是一个好现象。看到“缓存自”的较高数字表明,这些语句很长时间没有出现缓存未命中。如果应用程序已经运行了很长时间,但语句的“缓存自”数字仍然很低,这可能表明这些语句经常出现缓存未命中,可能需要增加 create_engine.query_cache_size 的值。

我们的示例程序然后执行一些 SELECT 操作,我们可以在其中看到“生成”然后“缓存”的相同模式,用于“a”表的 SELECT 以及随后“b”表的延迟加载。

INFO sqlalchemy.engine.Engine SELECT a.id AS a_id, a.data AS a_data
FROM a
INFO sqlalchemy.engine.Engine [generated in 0.00009s] ()
INFO sqlalchemy.engine.Engine SELECT b.id AS b_id, b.a_id AS b_a_id, b.data AS b_data
FROM b
WHERE ? = b.a_id
INFO sqlalchemy.engine.Engine [generated in 0.00010s] (1,)
INFO sqlalchemy.engine.Engine SELECT b.id AS b_id, b.a_id AS b_a_id, b.data AS b_data
FROM b
WHERE ? = b.a_id
INFO sqlalchemy.engine.Engine [cached since 0.0005922s ago] (2,)
INFO sqlalchemy.engine.Engine SELECT b.id AS b_id, b.a_id AS b_a_id, b.data AS b_data
FROM b
WHERE ? = b.a_id

从上面的程序中,完整运行显示总共缓存了四个不同的 SQL 字符串。这表明缓存大小为 **4** 就足够了。这显然是一个非常小的尺寸,默认大小为 500,可以保留默认值。

缓存占用多少内存?

上一节详细介绍了一些检查 create_engine.query_cache_size 是否需要更大的技术。我们如何知道缓存是否太大?我们可能想要将 create_engine.query_cache_size 设置为不高于某个数字的原因是,我们的应用程序可能会使用非常大量的不同语句,例如从搜索 UX 动态构建查询的应用程序,并且我们不希望主机在内存不足的情况下运行,例如,过去 24 小时内运行了十万个不同的查询,并且它们都被缓存了。

测量 Python 数据结构占用多少内存非常困难,但是使用一个进程通过 top 来测量内存增长,随着 250 个新语句被添加到缓存中的连续序列,表明一个中等大小的 Core 语句大约占用 12K,而一个小的 ORM 语句大约占用 20K,包括结果获取结构,对于 ORM 而言,结果获取结构将更大。

禁用或使用备用字典缓存部分(或全部)语句

使用的内部缓存称为 LRUCache,但这基本上只是一个字典。通过使用 Connection.execution_options.compiled_cache 选项作为执行选项,任何字典都可以用作任何一系列语句的缓存。执行选项可以在语句、EngineConnection 上设置,以及在使用 ORM Session.execute() 方法进行 SQLAlchemy-2.0 风格的调用时设置。例如,要运行一系列 SQL 语句并将它们缓存到特定字典中

my_cache = {}
with engine.connect().execution_options(compiled_cache=my_cache) as conn:
    conn.execute(table.select())

SQLAlchemy ORM 使用上述技术在工作单元“flush”过程中保留每个映射器的缓存,这些缓存与在 Engine 上配置的默认缓存分开,以及用于一些关系加载器查询。

还可以通过发送 None 值来使用此参数禁用缓存。

# disable caching for this connection
with engine.connect().execution_options(compiled_cache=None) as conn:
    conn.execute(table.select())

第三方方言的缓存

缓存功能要求方言的编译器生成安全的 SQL 字符串,这些字符串可以在许多语句调用中重复使用,前提是给定一个与该 SQL 字符串匹配的特定缓存键。这意味着语句中的任何文字值,例如 SELECT 的 LIMIT/OFFSET 值,不能在方言的编译方案中硬编码,因为编译后的字符串将无法重复使用。SQLAlchemy 支持使用 BindParameter.render_literal_execute() 方法呈现绑定的参数,该方法可以应用于现有的 Select._limit_clauseSelect._offset_clause 属性,方法由自定义编译器应用,这些属性在本节后面进行了说明。

由于存在许多第三方方言,其中许多方言可能会从 SQL 语句中生成文字值,而没有使用较新的“文字执行”功能,因此 SQLAlchemy 从 1.4.5 版本开始在方言中添加了一个名为 Dialect.supports_statement_cache 的属性。在运行时,直接对特定方言类的存在进行检查,即使它已存在于超类中,因此即使是子类化现有可缓存 SQLAlchemy 方言(例如 sqlalchemy.dialects.postgresql.PGDialect)的第三方方言也必须显式包含此属性才能启用缓存。只有在对方言进行必要的更改并测试了具有不同参数的编译 SQL 语句的可重用性后,才能启用此属性。

对于不支持此属性的所有第三方方言,该方言的日志记录将显示 dialect does not support caching

当方言已经针对缓存进行了测试,特别是 SQL 编译器已经更新为不在 SQL 字符串中直接呈现任何文字 LIMIT / OFFSET 时,方言作者可以按如下方式应用该属性

from sqlalchemy.engine.default import DefaultDialect


class MyDialect(DefaultDialect):
    supports_statement_cache = True

该标志也需要应用于方言的所有子类。

class MyDBAPIForMyDialect(MyDialect):
    supports_statement_cache = True

版本 1.4.5 中的新增内容: 添加了 Dialect.supports_statement_cache 属性。

方言修改的典型情况如下。

示例:使用编译后参数呈现 LIMIT / OFFSET

例如,假设方言覆盖了 SQLCompiler.limit_clause() 方法,该方法会生成 SQL 语句的“LIMIT / OFFSET”子句,如下所示

# pre 1.4 style code
def limit_clause(self, select, **kw):
    text = ""
    if select._limit is not None:
        text += " \n LIMIT %d" % (select._limit,)
    if select._offset is not None:
        text += " \n OFFSET %d" % (select._offset,)
    return text

上面的例程将 Select._limitSelect._offset 整数值呈现为嵌入在 SQL 语句中的文字整数。这是不支持在 SELECT 语句的 LIMIT/OFFSET 子句中使用绑定参数的数据库的常见要求。但是,在初始编译阶段呈现整数会在很大程度上与缓存不兼容,因为 Select 对象的 limit 和 offset 整数值不是缓存键的一部分,因此许多具有不同 limit/offset 值的 Select 语句将无法以正确的值呈现。

上述代码的修正方法是将文字整数移入 SQLAlchemy 的 编译后 功能,该功能将在初始编译阶段之外呈现文字整数,而是在语句发送到 DBAPI 之前在执行时呈现。这是在编译阶段使用 BindParameter.render_literal_execute() 方法访问的,并结合使用 Select._limit_clauseSelect._offset_clause 属性,这些属性表示 LIMIT/OFFSET 作为完整的 SQL 表达式,如下所示

# 1.4 cache-compatible code
def limit_clause(self, select, **kw):
    text = ""

    limit_clause = select._limit_clause
    offset_clause = select._offset_clause

    if select._simple_int_clause(limit_clause):
        text += " \n LIMIT %s" % (
            self.process(limit_clause.render_literal_execute(), **kw)
        )
    elif limit_clause is not None:
        # assuming the DB doesn't support SQL expressions for LIMIT.
        # Otherwise render here normally
        raise exc.CompileError(
            "dialect 'mydialect' can only render simple integers for LIMIT"
        )
    if select._simple_int_clause(offset_clause):
        text += " \n OFFSET %s" % (
            self.process(offset_clause.render_literal_execute(), **kw)
        )
    elif offset_clause is not None:
        # assuming the DB doesn't support SQL expressions for OFFSET.
        # Otherwise render here normally
        raise exc.CompileError(
            "dialect 'mydialect' can only render simple integers for OFFSET"
        )

    return text

上述方法将生成一个编译后的 SELECT 语句,看起来像

SELECT x FROM y
LIMIT __[POSTCOMPILE_param_1]
OFFSET __[POSTCOMPILE_param_2]

在上面,__[POSTCOMPILE_param_1]__[POSTCOMPILE_param_2] 指示符将在语句执行时使用它们相应的整数值填充,在从缓存中检索到 SQL 字符串之后。

在对上述内容进行了必要的更改之后,应将 Dialect.supports_statement_cache 标志设置为 True。强烈建议第三方方言使用 方言第三方测试套件,该套件将断言具有 LIMIT/OFFSET 的 SELECT 操作已正确呈现和缓存。

使用 Lambda 为语句生成添加显著的提速

深度炼金术

此技术通常只有在性能非常密集的情况下才需要,并且适用于经验丰富的 Python 程序员。虽然相当简单,但它涉及到对于 Python 初学者来说不适合的元编程概念。Lambda 方法可以在稍后时间轻松地应用于现有代码。

Python 函数,通常用 lambda 表达式表示,可用于生成 SQL 表达式,这些表达式根据 lambda 函数本身的 Python 代码位置以及 lambda 中的闭包变量进行缓存。其原理是允许不仅缓存 SQL 表达式的 SQL 字符串编译形式(这是 SQLAlchemy 在不使用 lambda 系统时正常的行为),而且还缓存 Python 中的 SQL 表达式构建本身,这在一定程度上也有 Python 开销。

lambda SQL 表达式功能作为一项性能增强功能提供,并且还可选地在 with_loader_criteria() ORM 选项中使用,以提供通用的 SQL 片段。

概要

Lambda 语句使用 lambda_stmt() 函数构建,该函数返回 StatementLambdaElement 的实例,它本身是一个可执行的语句构建。使用 Python 加号运算符 +StatementLambdaElement.add_criteria() 方法(允许更多选项)将其他修饰符和条件添加到对象中。

假设 lambda_stmt() 构建是在一个包含函数或方法中调用的,该函数或方法预期在应用程序中多次使用,以便后续执行(除第一次执行外)可以利用已编译的 SQL 被缓存。当 lambda 在 Python 中的包含函数内构建时,它也会受闭包变量的影响,这些变量对整个方法都很重要。

from sqlalchemy import lambda_stmt


def run_my_statement(connection, parameter):
    stmt = lambda_stmt(lambda: select(table))
    stmt += lambda s: s.where(table.c.col == parameter)
    stmt += lambda s: s.order_by(table.c.id)

    return connection.execute(stmt)


with engine.connect() as conn:
    result = run_my_statement(some_connection, "some parameter")

在上面,用于定义 SELECT 语句结构的三个 lambda 可调用对象仅调用一次,并且生成的 SQL 字符串在引擎的编译缓存中缓存。从那时起,run_my_statement() 函数可以被调用任意次数,其中的 lambda 可调用对象不会被调用,只用作缓存键来检索已编译的 SQL。

注意

重要的是要注意,当不使用 lambda 系统时,已经存在 SQL 缓存。lambda 系统只是在每个由 SQL 语句调用的 SQL 语句上增加一层工作量减少,方法是缓存 SQL 构建本身,并使用更简单的缓存键。

lambda 的快速指南

最重要的是,lambda SQL 系统中的重点是确保 lambda 生成的缓存键与其生成的 SQL 字符串之间永远不会出现不匹配。 LambdaElement 及其相关对象将运行并分析给定的 lambda 以计算它应该在每次运行时如何缓存,并尝试检测任何潜在问题。基本准则包括

  • 支持任何类型的语句 - 虽然预计 select() 构建是 lambda_stmt() 的主要用例,但 DML 语句(如 insert()update())同样可用

    def upd(id_, newname):
        stmt = lambda_stmt(lambda: users.update())
        stmt += lambda s: s.values(name=newname)
        stmt += lambda s: s.where(users.c.id == id_)
        return stmt
    
    
    with engine.begin() as conn:
        conn.execute(upd(7, "foo"))
  • 直接支持 ORM 用例 - lambda_stmt() 可以完全适应 ORM 功能,并直接与 Session.execute() 一起使用

    def select_user(session, name):
        stmt = lambda_stmt(lambda: select(User))
        stmt += lambda s: s.where(User.name == name)
    
        row = session.execute(stmt).first()
        return row
  • 自动适应绑定参数 - 与 SQLAlchemy 之前的“烘焙查询”系统相比,lambda SQL 系统适应了 Python 字面量值,这些值会自动成为 SQL 绑定参数。这意味着即使给定的 lambda 仅运行一次,在每次运行时都会从 lambda 的闭包中提取成为绑定参数的值。

    >>> def my_stmt(x, y):
    ...     stmt = lambda_stmt(lambda: select(func.max(x, y)))
    ...     return stmt
    >>> engine = create_engine("sqlite://", echo=True)
    >>> with engine.connect() as conn:
    ...     print(conn.scalar(my_stmt(5, 10)))
    ...     print(conn.scalar(my_stmt(12, 8)))
    
    SELECT max(?, ?) AS max_1 [generated in 0.00057s] (5, 10)
    10
    SELECT max(?, ?) AS max_1 [cached since 0.002059s ago] (12, 8)
    12

    在上面,StatementLambdaElement 从每次调用 my_stmt() 时生成的 lambda 的闭包中提取了 xy 的值;这些值被替换为缓存的 SQL 构建中作为参数的值。

  • lambda 应该在所有情况下都理想地生成相同的 SQL 结构 - 避免在 lambda 中使用条件或自定义可调用对象,这些对象可能使其根据输入生成不同的 SQL;如果函数可能根据条件使用两个不同的 SQL 片段,请使用两个单独的 lambda。

    # **Don't** do this:
    
    
    def my_stmt(parameter, thing=False):
        stmt = lambda_stmt(lambda: select(table))
        stmt += lambda s: (
            s.where(table.c.x > parameter) if thing else s.where(table.c.y == parameter)
        )
        return stmt
    
    
    # **Do** do this:
    
    
    def my_stmt(parameter, thing=False):
        stmt = lambda_stmt(lambda: select(table))
        if thing:
            stmt += lambda s: s.where(table.c.x > parameter)
        else:
            stmt += lambda s: s.where(table.c.y == parameter)
        return stmt

    如果 lambda 无法生成一致的 SQL 构建,可能会出现各种错误,其中一些错误目前无法轻易检测到。

  • 不要在 lambda 中使用函数来生成绑定值 - 绑定值跟踪方法要求 SQL 语句中使用的实际值在 lambda 的闭包中本地存在。如果值是从其他函数生成的,则无法做到这一点,并且 LambdaElement 通常会在尝试这样做时引发错误。

    >>> def my_stmt(x, y):
    ...     def get_x():
    ...         return x
    ...
    ...     def get_y():
    ...         return y
    ...
    ...     stmt = lambda_stmt(lambda: select(func.max(get_x(), get_y())))
    ...     return stmt
    >>> with engine.connect() as conn:
    ...     print(conn.scalar(my_stmt(5, 10)))
    Traceback (most recent call last):
      # ...
    sqlalchemy.exc.InvalidRequestError: Can't invoke Python callable get_x()
    inside of lambda expression argument at
    <code object <lambda> at 0x7fed15f350e0, file "<stdin>", line 6>;
    lambda SQL constructs should not invoke functions from closure variables
    to produce literal values since the lambda SQL system normally extracts
    bound values without actually invoking the lambda or any functions within it.

    在上面,get_x()get_y() 的使用(如果需要)应该发生在lambda 之外并分配给一个本地闭包变量。

    >>> def my_stmt(x, y):
    ...     def get_x():
    ...         return x
    ...
    ...     def get_y():
    ...         return y
    ...
    ...     x_param, y_param = get_x(), get_y()
    ...     stmt = lambda_stmt(lambda: select(func.max(x_param, y_param)))
    ...     return stmt
  • 避免在 lambda 中引用非 SQL 构建,因为它们默认情况下不可缓存 - 此问题指的是 LambdaElement 如何从语句中的其他闭包变量创建缓存键。为了提供最准确的缓存键保证,lambda 闭包中的所有对象都被认为很重要,默认情况下不会假设任何对象适合作为缓存键。因此,以下示例也会引发一条非常详细的错误消息。

    >>> class Foo:
    ...     def __init__(self, x, y):
    ...         self.x = x
    ...         self.y = y
    >>> def my_stmt(foo):
    ...     stmt = lambda_stmt(lambda: select(func.max(foo.x, foo.y)))
    ...     return stmt
    >>> with engine.connect() as conn:
    ...     print(conn.scalar(my_stmt(Foo(5, 10))))
    Traceback (most recent call last):
      # ...
    sqlalchemy.exc.InvalidRequestError: Closure variable named 'foo' inside of
    lambda callable <code object <lambda> at 0x7fed15f35450, file
    "<stdin>", line 2> does not refer to a cacheable SQL element, and also
    does not appear to be serving as a SQL literal bound value based on the
    default SQL expression returned by the function.  This variable needs to
    remain outside the scope of a SQL-generating lambda so that a proper cache
    key may be generated from the lambda's state.  Evaluate this variable
    outside of the lambda, set track_on=[<elements>] to explicitly select
    closure elements to track, or set track_closure_variables=False to exclude
    closure variables from being part of the cache key.

    上面的错误表明 LambdaElement 不会假设传入的 Foo 对象在所有情况下都会保持一致的行为。它也不会假设它可以默认使用 Foo 作为缓存键的一部分;如果它使用 Foo 对象作为缓存键的一部分,如果存在许多不同的 Foo 对象,这将用重复信息填满缓存,还会对所有这些对象保持持久引用。

    解决上述情况的最佳方法是在 lambda 中不引用 foo,而是在外部引用它。

    >>> def my_stmt(foo):
    ...     x_param, y_param = foo.x, foo.y
    ...     stmt = lambda_stmt(lambda: select(func.max(x_param, y_param)))
    ...     return stmt

    在某些情况下,如果 lambda 的 SQL 结构保证不会根据输入而改变,则传递 track_closure_variables=False,这将禁用对除绑定参数之外的所有闭包变量的任何跟踪。

    >>> def my_stmt(foo):
    ...     stmt = lambda_stmt(
    ...         lambda: select(func.max(foo.x, foo.y)), track_closure_variables=False
    ...     )
    ...     return stmt

    还可以使用 track_on 参数将对象添加到元素中以明确地形成缓存键的一部分;使用此参数允许特定值作为缓存键,还会阻止其他闭包变量被考虑。这对于 SQL 构建部分源自某种上下文对象的用例很有用,而该上下文对象可能具有许多不同的值。在下面的示例中,SELECT 语句的第一段将禁用对 foo 变量的跟踪,而第二段将明确地跟踪 self 作为缓存键的一部分。

    >>> def my_stmt(self, foo):
    ...     stmt = lambda_stmt(
    ...         lambda: select(*self.column_expressions), track_closure_variables=False
    ...     )
    ...     stmt = stmt.add_criteria(lambda: self.where_criteria, track_on=[self])
    ...     return stmt

    使用 track_on 意味着给定的对象将在 lambda 的内部缓存中长期存储,并且只要缓存没有清除那些对象(默认情况下使用 1000 个条目的 LRU 方案)就会具有强引用。

缓存键生成

为了理解 lambda SQL 构建中出现的一些选项和行为,了解缓存系统将很有帮助。

SQLAlchemy 的缓存系统通常通过生成一个表示构建中所有状态的结构来为给定的 SQL 表达式构建生成缓存键。

>>> from sqlalchemy import select, column
>>> stmt = select(column("q"))
>>> cache_key = stmt._generate_cache_key()
>>> print(cache_key)  # somewhat paraphrased
CacheKey(key=(
  '0',
  <class 'sqlalchemy.sql.selectable.Select'>,
  '_raw_columns',
  (
    (
      '1',
      <class 'sqlalchemy.sql.elements.ColumnClause'>,
      'name',
      'q',
      'type',
      (
        <class 'sqlalchemy.sql.sqltypes.NullType'>,
      ),
    ),
  ),
  # a few more elements are here, and many more for a more
  # complicated SELECT statement
),)

上面的键存储在缓存中,本质上是一个字典,而值是一个构建,它除了其他东西之外还存储了 SQL 语句的字符串形式,在本例中是短语“SELECT q”。我们可以观察到,即使对于一个非常短的查询,缓存键也是相当冗长的,因为它必须表示可能导致渲染和潜在执行发生变化的所有内容。

相比之下,lambda 构建系统创建了一种不同类型的缓存键。

>>> from sqlalchemy import lambda_stmt
>>> stmt = lambda_stmt(lambda: select(column("q")))
>>> cache_key = stmt._generate_cache_key()
>>> print(cache_key)
CacheKey(key=(
  <code object <lambda> at 0x7fed1617c710, file "<stdin>", line 1>,
  <class 'sqlalchemy.sql.lambdas.StatementLambdaElement'>,
),)

上面,我们看到一个缓存键,它比非 lambda 语句的缓存键短很多,而且额外的 select(column("q")) 结构的生成本身甚至没有必要;Python lambda 本身包含一个名为 __code__ 的属性,它引用一个 Python 代码对象,该对象在应用程序运行时是不可变且永久的。

当 lambda 也包含闭包变量时,在这些变量通常引用 SQL 结构(如列对象)的情况下,它们会成为缓存键的一部分,或者如果它们引用将作为绑定参数的文字值,则它们会被放置在缓存键的单独元素中。

>>> def my_stmt(parameter):
...     col = column("q")
...     stmt = lambda_stmt(lambda: select(col))
...     stmt += lambda s: s.where(col == parameter)
...     return stmt

上面的 StatementLambdaElement 包含两个 lambda,它们都引用了 col 闭包变量,因此缓存键将表示这两个段以及 column() 对象。

>>> stmt = my_stmt(5)
>>> key = stmt._generate_cache_key()
>>> print(key)
CacheKey(key=(
  <code object <lambda> at 0x7f07323c50e0, file "<stdin>", line 3>,
  (
    '0',
    <class 'sqlalchemy.sql.elements.ColumnClause'>,
    'name',
    'q',
    'type',
    (
      <class 'sqlalchemy.sql.sqltypes.NullType'>,
    ),
  ),
  <code object <lambda> at 0x7f07323c5190, file "<stdin>", line 4>,
  <class 'sqlalchemy.sql.lambdas.LinkedLambdaElement'>,
  (
    '0',
    <class 'sqlalchemy.sql.elements.ColumnClause'>,
    'name',
    'q',
    'type',
    (
      <class 'sqlalchemy.sql.sqltypes.NullType'>,
    ),
  ),
  (
    '0',
    <class 'sqlalchemy.sql.elements.ColumnClause'>,
    'name',
    'q',
    'type',
    (
      <class 'sqlalchemy.sql.sqltypes.NullType'>,
    ),
  ),
),)

缓存键的第二部分检索了在调用语句时将使用的绑定参数。

>>> key.bindparams
[BindParameter('%(139668884281280 parameter)s', 5, type_=Integer())]

有关“lambda”缓存的性能比较示例系列,请参阅 性能 性能示例中的“short_selects”测试套件。

INSERT 语句的“插入多个值”行为

版本 2.0 中新增: 有关更改的背景信息,包括性能测试示例,请参阅 为除 MySQL 外的所有后端实现优化的 ORM 批量插入

提示

insertmanyvalues 功能是一个**透明可用的**性能功能,它不需要最终用户干预即可根据需要进行。本节描述了该功能的架构以及如何衡量其性能并调整其行为以优化批量 INSERT 语句的速度,特别是 ORM 使用的语句。

随着越来越多的数据库添加了对 INSERT..RETURNING 的支持,SQLAlchemy 在处理需要获取服务器生成的数值的 INSERT 语句方面经历了重大变化,最重要的是服务器生成的 主键值,这些值允许在后续操作中引用新行。特别是,这种情况长期以来一直是 ORM 中一个重大的性能问题,ORM 依赖于能够检索服务器生成的 主键值以正确填充 身份映射

随着最近对 SQLite 和 MariaDB 添加了对 RETURNING 的支持,SQLAlchemy 不再需要依赖于大多数后端提供的仅单行 cursor.lastrowid 属性;现在可以使用 RETURNING 来处理所有 SQLAlchemy 包含的 后端,除了 MySQL。剩余的性能限制是,cursor.executemany() DBAPI 方法不允许获取行,对于大多数后端,通过放弃使用 executemany() 以及改用 cursor.execute() 调用单个 INSERT 语句来重新构建单个 INSERT 语句,以在单个语句中容纳大量行来解决此问题。这种方法源于 psycopg2 快速执行帮助程序 psycopg2 DBAPI 功能,SQLAlchemy 在最近的发布系列中逐步增加了对该功能的支持。

当前支持

该功能适用于包含在 SQLAlchemy 中的所有支持 RETURNING 的后端,除了 Oracle,因为 cx_Oracle 和 OracleDB 驱动程序都提供了自己的等效功能。该功能通常发生在使用 Insert.returning() 方法时 Insert 结构,与 executemany 执行结合使用,这发生在将字典列表传递给 Connection.execute.parameters 参数 Connection.execute()Session.execute() 方法时(以及 asyncio 下的等效方法和简写方法,例如 Session.scalars())。它还在 ORM 工作单元 过程中发生,当使用 Session.add()Session.add_all() 等方法添加行时。

对于 SQLAlchemy 包含的方言,当前支持或等效支持如下

  • SQLite - 支持 SQLite 3.35 及更高版本

  • PostgreSQL - 所有支持的 Postgresql 版本(9 及更高版本)

  • SQL Server - 所有支持的 SQL Server 版本 [1]

  • MariaDB - 支持 MariaDB 10.5 及更高版本

  • MySQL - 不支持,没有 RETURNING 功能

  • Oracle - 使用本机 cx_Oracle / OracleDB API 支持使用 executemany 的 RETURNING,适用于所有支持的 Oracle 9 及更高版本,使用多行 OUT 参数。这与“executemanyvalues”的实现不同,但具有相同的用法模式和等效的性能优势。

版本 2.0.10 中的更改

禁用该功能

要为给定后端禁用 Engine 的整体“insertmanyvalues”功能,请将 create_engine.use_insertmanyvalues 参数传递给 create_engine() 作为 False

engine = create_engine(
    "mariadb+mariadbconnector://scott:tiger@host/db", use_insertmanyvalues=False
)

也可以通过将 Table.implicit_returning 参数传递给 False 来禁用该功能被隐式地用于特定 Table 对象。

t = Table(
    "t",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("x", Integer),
    implicit_returning=False,
)

可能想要为特定表禁用 RETURNING 的原因是解决后端特定的限制。

批处理模式操作

该功能有两种操作模式,这些模式在每个方言、每个 Table 基础上透明地选择。一种是**批处理模式**,它通过重写以下形式的 INSERT 语句来减少数据库往返次数

INSERT INTO a (data, x, y) VALUES (%(data)s, %(x)s, %(y)s) RETURNING a.id

转换为“批处理”形式,例如

INSERT INTO a (data, x, y) VALUES
    (%(data_0)s, %(x_0)s, %(y_0)s),
    (%(data_1)s, %(x_1)s, %(y_1)s),
    (%(data_2)s, %(x_2)s, %(y_2)s),
    ...
    (%(data_78)s, %(x_78)s, %(y_78)s)
RETURNING a.id

上面,语句针对输入数据的子集(“批次”)进行组织,该子集的大小由数据库后端以及每个批次的 参数数量决定,以对应于语句大小/ 参数数量的已知限制。然后,该功能对输入数据的每个批次执行一次 INSERT 语句,直到消耗所有记录,并将每个批次的 RETURNING 结果连接到一个单独的大型行集中,该行集可从单个 Result 对象中获得。

这种“批处理”形式允许使用更少的数据库往返次数插入大量行,并且已被证明可以为大多数支持它的后端带来显著的性能提升。

将 RETURNING 行与参数集关联

版本 2.0.10 中新增。

在上一节中说明的“批处理”模式查询不能保证返回的记录顺序与输入数据的顺序一致。当 SQLAlchemy ORM 工作单元 处理以及用于将返回的服务器生成的与输入数据相关联的应用程序使用时,Insert.returning()UpdateBase.return_defaults() 方法包含一个选项 Insert.returning.sort_by_parameter_order,它指示“insertmanyvalues”模式应保证此对应关系。这与数据库后端实际插入记录的顺序**无关**,在任何情况下都不假设此顺序;只是返回的记录应在接收时进行组织,以对应于传递原始输入数据的顺序。

Insert.returning.sort_by_parameter_order 参数存在时,对于使用服务器生成的整数主键值(如 IDENTITY、PostgreSQL SERIAL、MariaDB AUTO_INCREMENT 或 SQLite 的 ROWID 方案)的表,“批处理”模式可能会选择使用更复杂的 INSERT..RETURNING 形式,结合根据返回的值对行进行执行后排序,或者如果此形式不可用,“insertmanyvalues”功能可能会优雅地降级为“非批处理”模式,该模式为每个参数集运行单个 INSERT 语句。

例如,在 SQL Server 上,当使用自动递增的 IDENTITY 列作为主键时,将使用以下 SQL 形式

INSERT INTO a (data, x, y)
OUTPUT inserted.id, inserted.id AS id__1
SELECT p0, p1, p2 FROM (VALUES
    (?, ?, ?, 0), (?, ?, ?, 1), (?, ?, ?, 2),
    ...
    (?, ?, ?, 77)
) AS imp_sen(p0, p1, p2, sen_counter) ORDER BY sen_counter

当主键列使用 SERIAL 或 IDENTITY 时,PostgreSQL 也使用类似的形式。上述形式**不能**保证插入行的顺序。但是,它确实保证 IDENTITY 或 SERIAL 值将按每个参数集的顺序创建[2]。“insertmanyvalues”功能然后按递增的整数标识对上述 INSERT 语句返回的行进行排序。

对于 SQLite 数据库,没有合适的 INSERT 形式可以将新 ROWID 值的生成与参数集传递的顺序相关联。因此,当使用服务器生成的主键值时,SQLite 后端将在请求排序的 RETURNING 时降级为“非批处理”模式。对于 MariaDB,insertmanyvalues 使用的默认 INSERT 形式就足够了,因为当使用 InnoDB 时,此数据库后端将 AUTO_INCREMENT 的顺序与输入数据的顺序对齐[3]

对于客户端生成的键,例如当使用 Python uuid.uuid4() 函数为 Uuid 列生成新值时,“insertmanyvalues”功能会透明地在 RETURNING 记录中包含此列并将它的值与给定输入记录的值相关联,从而在输入记录和结果行之间保持对应关系。由此可以得出,所有后端都允许使用客户端生成的键进行批处理、参数相关的 RETURNING 顺序。

“insertmanyvalues”的“批处理”模式如何确定用作输入参数和 RETURNING 行之间的对应点的一列或多列,称为 插入哨兵,它是用于跟踪此类值的特定列或多列。“插入哨兵”通常是自动选择的,但也可以在极少数情况下进行用户配置;配置哨兵列 部分对此进行了描述。

对于不能提供合适的 INSERT 形式来提供与输入值确定性对齐的服务器生成的值的数据库后端,或者对于 Table 配置,这些配置具有其他类型的服务器生成的键值,“insertmanyvalues”模式将在请求保证 RETURNING 顺序时使用**非批处理**模式。

另见

非批处理模式操作

对于 Table 配置,这些配置没有客户端生成的键,并且提供服务器生成的键(或没有键),该数据库无法以确定性或可排序的方式相对于多个参数集来调用,当“insertmanyvalues”功能在满足 Insert.returning.sort_by_parameter_orderInsert 语句的要求时,可能会选择使用**非批处理模式**。

在此模式下,将保留 INSERT 的原始 SQL 形式,而“insertmanyvalues”功能将改为为每个参数集单独运行给定的语句,并将返回的行组织到一个完整的 结果集。与以前版本的 SQLAlchemy 不同,它是在一个紧密的循环中执行的,从而最大限度地减少了 Python 的开销。在某些情况下,例如在 SQLite 上,“非批处理”模式的性能与“批处理”模式完全相同。

语句执行模型

对于“批处理”和“非批处理”模式,该功能都必须使用 DBAPI cursor.execute() 方法在单个对 Core 级别 Connection.execute() 方法的调用范围内调用多个 INSERT 语句,每个语句最多包含一个固定数量的参数集。此限制是可配置的,如下文 控制批处理大小 中所述。对 cursor.execute() 的单独调用将被单独记录,并单独传递给事件侦听器(如 ConnectionEvents.before_cursor_execute()(请参阅下面的 日志记录和事件)。

配置哨兵列

在典型情况下,为了使用 INSERT..RETURNING 提供确定性行顺序,“insertmanyvalues”功能会自动从给定表的键中确定一个哨兵列,如果无法识别一个哨兵列,它将优雅地降级为“一次一行”模式。作为一个完全**可选**的功能,为了获得对具有服务器生成的键的表的全面的“insertmanyvalues”批量性能,这些表的默认生成器函数与“哨兵”用例不兼容,其他非键列可以被标记为“哨兵”列,前提是它们满足某些要求。一个典型的例子是非键 Uuid 列,它具有客户端的默认值,如 Python uuid.uuid4() 函数。还有一种结构可以创建具有面向“insertmanyvalues”用例的客户端整数计数器的简单整数列。

可以通过将 Column.insert_sentinel 添加到符合条件的列来指示哨兵列。最基本的“符合条件”列是非空、唯一的列,它具有客户端的默认值,例如 UUID 列,如下所示

import uuid

from sqlalchemy import Column
from sqlalchemy import FetchedValue
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import Uuid

my_table = Table(
    "some_table",
    metadata,
    # assume some arbitrary server-side function generates
    # primary key values, so cannot be tracked by a bulk insert
    Column("id", String(50), server_default=FetchedValue(), primary_key=True),
    Column("data", String(50)),
    Column(
        "uniqueid",
        Uuid(),
        default=uuid.uuid4,
        nullable=False,
        unique=True,
        insert_sentinel=True,
    ),
)

当使用 ORM Declarative 模型时,可以使用 mapped_column 结构来使用相同的形式

import uuid

from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column


class Base(DeclarativeBase):
    pass


class MyClass(Base):
    __tablename__ = "my_table"

    id: Mapped[str] = mapped_column(primary_key=True, server_default=FetchedValue())
    data: Mapped[str] = mapped_column(String(50))
    uniqueid: Mapped[uuid.UUID] = mapped_column(
        default=uuid.uuid4, unique=True, insert_sentinel=True
    )

虽然默认生成器生成的**必须**是唯一的,但上述“哨兵”列上的实际 UNIQUE 约束(由 unique=True 参数指示)本身是可选的,如果不需要,可以省略。

还有一种特殊的“插入哨兵”形式,它是一个专用的可空整数列,使用特殊的默认整数计数器,该计数器仅在“insertmanyvalues”操作期间使用;作为附加行为,该列将从 SQL 语句和结果集中省略自身,并以一种几乎透明的方式运行。但是,它需要在实际的数据库表中物理存在。这种样式的 Column 可以使用函数 insert_sentinel() 来构建。

from sqlalchemy import Column
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import Table
from sqlalchemy import Uuid
from sqlalchemy import insert_sentinel

Table(
    "some_table",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("data", String(50)),
    insert_sentinel("sentinel"),
)

使用 ORM Declarative 时,可以使用一个称为 orm_insert_sentinel()insert_sentinel() 的 Declarative 友好版本,它可以用于 Base 类或 mixin;如果使用 declared_attr() 进行打包,该列将应用于所有与表绑定的子类,包括联接继承层次结构中的子类。

from sqlalchemy.orm import declared_attr
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import orm_insert_sentinel


class Base(DeclarativeBase):
    @declared_attr
    def _sentinel(cls) -> Mapped[int]:
        return orm_insert_sentinel()


class MyClass(Base):
    __tablename__ = "my_table"

    id: Mapped[str] = mapped_column(primary_key=True, server_default=FetchedValue())
    data: Mapped[str] = mapped_column(String(50))


class MySubClass(MyClass):
    __tablename__ = "sub_table"

    id: Mapped[str] = mapped_column(ForeignKey("my_table.id"), primary_key=True)


class MySingleInhClass(MyClass):
    pass

在上面的示例中,“my_table” 和“sub_table” 将具有一个名为“_sentinel”的额外整数列,该列可供“insertmanyvalues”功能使用,以帮助优化 ORM 使用的批量插入操作。

控制批次大小

“insertmanyvalues” 的一个关键特征是 INSERT 语句的大小受到固定最大“values”子句数量以及单个 INSERT 语句中一次可以表示的方言特定固定总绑定参数数量的限制。当给定的参数字典数量超过固定限制,或者单个 INSERT 语句中要呈现的绑定参数总数超过固定限制时(这两个固定限制是分开的),将在单个 Connection.execute() 调用范围内调用多个 INSERT 语句,每个语句都包含参数字典的一部分,称为“批次”。然后,每个“批次”中表示的参数字典数量称为“批次大小”。例如,批次大小为 500 表示每个发出的 INSERT 语句最多会插入 500 行。

能够调整批次大小可能很重要,因为对于值集本身相对较小的 INSERT 操作,较大的批次大小可能性能更高,而对于使用非常大的值集的 INSERT 操作,较小的批次大小可能更合适,其中呈现的 SQL 大小以及在单个语句中传递的总数据大小都可以从基于后端行为和内存限制的特定大小限制中受益。为此,可以在每个 Engine 以及每个语句的基础上配置批次大小。另一方面,参数限制是根据所用数据库的已知特征固定的。

大多数后端默认批次大小为 1000,还有一个每个方言的“最大参数数量”限制因素,可以在每个语句的基础上进一步减少批次大小。最大参数数量因方言和服务器版本而异;最大值为 32700(选择了一个比 PostgreSQL 的 32767 限制和 SQLite 的现代 32766 限制更远的健康距离,同时为语句中的其他参数以及 DBAPI 异常提供了空间)。较旧版本的 SQLite(早于 3.32.0)会将此值设置为 999。MariaDB 没有既定的限制,但 32700 仍然是 SQL 消息大小的限制因素。

可以通过 Engine 范围内的 create_engine.insertmanyvalues_page_size 参数来影响“批次大小”的值。例如,要影响 INSERT 语句以在每个语句中包含最多 100 个参数集

e = create_engine("sqlite://", insertmanyvalues_page_size=100)

还可以使用 Connection.execution_options.insertmanyvalues_page_size 执行选项在每个语句的基础上影响批次大小,例如每个执行

with e.begin() as conn:
    result = conn.execute(
        table.insert().returning(table.c.id),
        parameterlist,
        execution_options={"insertmanyvalues_page_size": 100},
    )

或在语句本身配置

stmt = (
    table.insert()
    .returning(table.c.id)
    .execution_options(insertmanyvalues_page_size=100)
)
with e.begin() as conn:
    result = conn.execute(stmt, parameterlist)

日志记录和事件

“insertmanyvalues” 功能与 SQLAlchemy 的 语句日志记录 以及游标事件(例如 ConnectionEvents.before_cursor_execute())完全集成。当参数列表被分解为单独的批次时,每个 INSERT 语句都会单独记录并传递给事件处理程序。与 SQLAlchemy 1.x 系列中 psycopg2 专属功能的运作方式相比,这是一个重大变化,在该功能中,多个 INSERT 语句的生成隐藏在日志记录和事件之外。日志记录显示将截断较长的参数列表以提高可读性,并将指示每个语句的具体批次。以下示例说明了此日志记录的摘录

INSERT INTO a (data, x, y) VALUES (?, ?, ?), ... 795 characters truncated ...  (?, ?, ?), (?, ?, ?) RETURNING id
[generated in 0.00177s (insertmanyvalues) 1/10 (unordered)] ('d0', 0, 0, 'd1',  ...
INSERT INTO a (data, x, y) VALUES (?, ?, ?), ... 795 characters truncated ...  (?, ?, ?), (?, ?, ?) RETURNING id
[insertmanyvalues 2/10 (unordered)] ('d100', 100, 1000, 'd101', ...

...

INSERT INTO a (data, x, y) VALUES (?, ?, ?), ... 795 characters truncated ...  (?, ?, ?), (?, ?, ?) RETURNING id
[insertmanyvalues 10/10 (unordered)] ('d900', 900, 9000, 'd901', ...

非批次模式 发生时,日志记录将指示此模式以及 insertmanyvalues 消息

...

INSERT INTO a (data, x, y) VALUES (?, ?, ?) RETURNING id
[insertmanyvalues 67/78 (ordered; batch not supported)] ('d66', 66, 66)
INSERT INTO a (data, x, y) VALUES (?, ?, ?) RETURNING id
[insertmanyvalues 68/78 (ordered; batch not supported)] ('d67', 67, 67)
INSERT INTO a (data, x, y) VALUES (?, ?, ?) RETURNING id
[insertmanyvalues 69/78 (ordered; batch not supported)] ('d68', 68, 68)
INSERT INTO a (data, x, y) VALUES (?, ?, ?) RETURNING id
[insertmanyvalues 70/78 (ordered; batch not supported)] ('d69', 69, 69)

...

Upsert 支持

PostgreSQL、SQLite 和 MariaDB 方言提供后端特定的“upsert”结构 insert()insert()insert(),它们都是 Insert 结构,它们具有额外的例如 on_conflict_do_update()` or ``on_duplicate_key() 的方法。当这些结构与 RETURNING 一起使用时,它们也支持“insertmanyvalues”行为,允许有效地进行具有 RETURNING 的 upsert。

引擎处置

The Engine 指的是连接池,这意味着在正常情况下,只要 Engine 对象驻留在内存中,就会存在打开的数据库连接。当 Engine 被垃圾回收时,其连接池将不再被该 Engine 引用,并且假设其没有连接被签出,池及其连接也将被垃圾回收,这将关闭实际的数据库连接。但在其他情况下,Engine 将保留打开的数据库连接,假设它使用通常默认的池实现 QueuePool

The Engine 通常旨在作为预先建立的永久固定装置,并在应用程序的生命周期中一直维护。它不是旨在按每个连接创建和处置;相反,它是一个注册表,维护着连接池以及所用数据库和 DBAPI 的配置信息,以及一定程度的针对数据库的资源的内部缓存。

但是,在许多情况下,希望由 Engine 引用的所有连接资源都完全关闭。通常不建议依赖 Python 垃圾回收来处理这种情况;相反,可以使用 Engine.dispose() 方法显式处置 Engine。这会处置引擎的底层连接池,并将其替换为一个新的空池。假设在此处放弃 Engine 并且不再使用,它引用的所有已签入连接也将完全关闭。

调用 Engine.dispose() 的有效用例包括

  • 当程序想要释放连接池持有的任何剩余已签入连接,并且预计在任何将来的操作中都不会再连接到该数据库时。

  • 当程序使用多处理或 fork() 并且 Engine 对象被复制到子进程时,应调用 Engine.dispose(),以便引擎创建特定于该 fork 的全新数据库连接。数据库连接通常不会跨越进程边界。在这种情况下,将 Engine.dispose.close 参数设置为 False。有关此用例的更多背景信息,请参阅部分 使用连接池进行多处理或 os.fork()

  • 在测试套件或多租户场景中,可能创建和处置许多临时短命的 Engine 对象。

当引擎被释放或垃圾回收时,签出的连接不会被丢弃,因为这些连接仍然被应用程序的其他地方强引用。但是,在调用Engine.dispose()之后,这些连接不再与该Engine关联;当它们关闭时,它们将被返回到其现在已成为孤儿连接池,一旦所有引用它的连接不再被任何地方引用,该连接池最终将被垃圾回收。由于此过程不易控制,强烈建议仅在所有签出连接被签入或以其他方式与池分离后才调用Engine.dispose()

对于受Engine对象使用连接池影响的应用程序,一个替代方案是完全禁用池。这通常只会对使用新连接造成适度的性能影响,这意味着当连接签入时,它将完全关闭并且不会保留在内存中。有关如何禁用池的指南,请参见切换池实现

使用驱动程序 SQL 和原始 DBAPI 连接

有关使用Connection.execute()的介绍使用了text()构造来说明如何调用文本 SQL 语句。在使用 SQLAlchemy 时,文本 SQL 实际上更多的是例外而不是规范,因为核心表达式语言和 ORM 都抽象了 SQL 的文本表示。但是,text()构造本身也对文本 SQL 提供了一些抽象,因为它规范了绑定参数的传递方式,以及它支持参数和结果集行的类型行为。

直接向驱动程序调用 SQL 字符串

对于希望直接调用传递给底层驱动程序(称为DBAPI)的文本 SQL,而不使用text()构造的用例,可以使用Connection.exec_driver_sql()方法。

with engine.connect() as conn:
    conn.exec_driver_sql("SET param='bar'")

版本 1.4 中的新功能:添加了 Connection.exec_driver_sql() 方法。

直接使用 DBAPI 游标

在某些情况下,SQLAlchemy 无法提供一种通用的方式来访问某些DBAPI函数,例如调用存储过程以及处理多个结果集。在这些情况下,直接处理原始 DBAPI 连接同样有效。

访问原始 DBAPI 连接最常见的方法是从已存在的Connection对象直接获取它。它使用Connection.connection属性存在。

connection = engine.connect()
dbapi_conn = connection.connection

此处的 DBAPI 连接实际上是相对于原始连接池而言的“代理”,但是这是一个实现细节,在大多数情况下可以忽略。由于此 DBAPI 连接仍然包含在拥有Connection对象的范围内,因此最好使用Connection对象来执行大多数功能,例如事务控制以及调用Connection.close()方法;如果直接在 DBAPI 连接上执行这些操作,则拥有Connection将不会知道这些状态变化。

为了克服由拥有Connection维护的 DBAPI 连接施加的限制,也可以在无需先获取Connection的情况下使用Engine.raw_connection()方法获得 DBAPI 连接Engine

dbapi_conn = engine.raw_connection()

此 DBAPI 连接再次是与以前一样形式的“代理”。这种代理的目的现在很明显,因为当我们调用此连接的.close()方法时,DBAPI 连接通常不会真正关闭,而是释放回引擎的连接池。

dbapi_conn.close()

虽然 SQLAlchemy 可能会在将来为更多 DBAPI 用例添加内置模式,但这些用例的收益正在减少,因为这些用例往往很少需要,并且它们也高度依赖于所使用的 DBAPI 类型,因此无论如何,直接 DBAPI 调用模式始终存在于需要这些用例的情况下。

另见

使用引擎时如何获得原始 DBAPI 连接? - 包括有关如何访问 DBAPI 连接以及使用 asyncio 驱动程序时的“驱动程序”连接的更多详细信息。

以下是 DBAPI 连接使用的一些示例。

调用存储过程和用户定义函数

SQLAlchemy 支持通过多种方式调用存储过程和用户定义函数。请注意,所有 DBAPI 都有不同的做法,因此您必须参考底层 DBAPI 的文档以了解与您的特定用法相关的详细信息。以下示例是假设性的,可能不适用于您的底层 DBAPI。

对于具有特殊语法或参数问题的存储过程或函数,可以使用 DBAPI 级别的callproc与您的 DBAPI 一起使用。此模式的一个示例是

connection = engine.raw_connection()
try:
    cursor_obj = connection.cursor()
    cursor_obj.callproc("my_procedure", ["x", "y", "z"])
    results = list(cursor_obj.fetchall())
    cursor_obj.close()
    connection.commit()
finally:
    connection.close()

注意

并非所有 DBAPI 都使用callproc,并且总体使用细节会有所不同。以上示例仅说明使用特定 DBAPI 函数的可能方式。

您的 DBAPI 可能没有callproc要求可能要求以另一种模式调用存储过程或用户定义函数,例如正常 SQLAlchemy 连接使用。此使用模式的一个示例是在撰写本文档时,在 PostgreSQL 数据库中使用 psycopg2 DBAPI 执行存储过程,这应该使用正常的连接使用进行调用

connection.execute("CALL my_procedure();")

以上示例是假设性的。底层数据库不保证在这些情况下支持“CALL”或“SELECT”,并且关键字可能会因函数是存储过程还是用户定义函数而异。在这些情况下,您应该参考底层 DBAPI 和数据库文档以确定要使用的正确语法和模式。

多个结果集

多个结果集支持可通过使用nextset方法从原始 DBAPI 游标获得。

connection = engine.raw_connection()
try:
    cursor_obj = connection.cursor()
    cursor_obj.execute("select * from table1; select * from table2")
    results_one = cursor_obj.fetchall()
    cursor_obj.nextset()
    results_two = cursor_obj.fetchall()
    cursor_obj.close()
finally:
    connection.close()

注册新的方言

create_engine()函数调用使用 setuptools 入口点查找给定的方言。这些入口点可以在第三方方言的 setup.py 脚本中建立。例如,要创建新的方言“foodialect://”,步骤如下

  1. 创建一个名为foodialect的包。

  2. 该包应该有一个包含方言类的模块,该类通常是sqlalchemy.engine.default.DefaultDialect的子类。在这个例子中,假设它被称为FooDialect,它的模块可以通过foodialect.dialect访问。

  3. 可以如下在setup.cfg中建立入口点

    [options.entry_points]
    sqlalchemy.dialects =
        foodialect = foodialect.dialect:FooDialect

如果方言是在现有 SQLAlchemy 支持的数据库之上为特定 DBAPI 提供支持,则可以给出包含数据库限定符的名称。例如,如果FooDialect实际上是 MySQL 方言,则可以像这样建立入口点

[options.entry_points]
sqlalchemy.dialects
    mysql.foodialect = foodialect.dialect:FooDialect

上面的入口点将通过create_engine("mysql+foodialect://")访问。

在进程内注册方言

SQLAlchemy 还允许在当前进程内注册方言,从而绕过单独安装的需要。使用register()函数如下

from sqlalchemy.dialects import registry


registry.register("mysql.foodialect", "myapp.dialect", "MyMySQLDialect")

以上将响应create_engine("mysql+foodialect://"),并从myapp.dialect模块加载MyMySQLDialect类。

连接 / 引擎 API

对象名称 描述

连接

为包装的 DB-API 连接提供高级功能。

CreateEnginePlugin

一组挂钩,旨在根据 URL 中的入口点名称来增强 Engine 对象的构建。

引擎

PoolDialect 结合起来,以提供数据库连接和行为的来源。

异常上下文

封装有关正在进行的错误条件的信息。

嵌套事务

表示一个“嵌套”或 SAVEPOINT 事务。

根事务

表示 Connection 上的“根”事务。

事务

表示正在进行的数据库事务。

两阶段事务

表示一个两阶段事务。

class sqlalchemy.engine.Connection

为包装的 DB-API 连接提供高级功能。

可以通过调用 Engine.connect() 方法获取 Connection 对象,它提供用于执行 SQL 语句以及事务控制的服务。

Connection 对象不是线程安全的。虽然可以使用适当同步的访问在多个线程之间共享 Connection,但仍然有可能底层的 DBAPI 连接不支持线程之间的共享访问。有关详细信息,请查看 DBAPI 文档。

Connection 对象表示从连接池中检出的单个 DBAPI 连接。在此状态下,连接池对连接没有任何影响,包括其过期或超时状态。为了使连接池能够正确管理连接,当连接不再使用时,应将其返回到连接池(即 connection.close())。

类签名

class sqlalchemy.engine.Connection (sqlalchemy.engine.interfaces.ConnectionEventsTarget, sqlalchemy.inspection.Inspectable)

method sqlalchemy.engine.Connection.__init__(engine: Engine, connection: PoolProxiedConnection | None = None, _has_events: bool | None = None, _allow_revalidate: bool = True, _allow_autobegin: bool = True)

构造一个新的 Connection。

method sqlalchemy.engine.Connection.begin() RootTransaction

在自动开始发生之前开始一个事务。

例如

with engine.connect() as conn:
    with conn.begin() as trans:
        conn.execute(table.insert(), {"username": "sandy"})

返回的对象是 RootTransaction 的实例。此对象表示事务的“范围”,它在调用 Transaction.rollback()Transaction.commit() 方法时完成;该对象还像上面示例中那样用作上下文管理器。

The Connection.begin() 方法开始一个事务,该事务通常在连接首次用于执行语句时开始。使用此方法的原因可能是要在特定时间调用 ConnectionEvents.begin() 事件,或者根据上下文管理块来组织连接签出范围内的代码,例如

with engine.connect() as conn:
    with conn.begin():
        conn.execute(...)
        conn.execute(...)

    with conn.begin():
        conn.execute(...)
        conn.execute(...)

以上代码与其行为与以下不使用 Connection.begin() 的代码在本质上没有区别;以下风格被称为“边走边提交”风格

with engine.connect() as conn:
    conn.execute(...)
    conn.execute(...)
    conn.commit()

    conn.execute(...)
    conn.execute(...)
    conn.commit()

从数据库的角度来看,Connection.begin() 方法不会发出任何 SQL 也不会以任何方式更改底层 DBAPI 连接的状态;Python DBAPI 没有显式事务开始的概念。

另见

使用事务和 DBAPI - 在 SQLAlchemy 统一教程

Connection.begin_nested() - 使用 SAVEPOINT

Connection.begin_twophase() - 使用两阶段 /XID 事务

Engine.begin() - 来自 Engine 的上下文管理器

method sqlalchemy.engine.Connection.begin_nested() NestedTransaction

开始一个嵌套事务(即 SAVEPOINT),并返回一个事务句柄,该句柄控制 SAVEPOINT 的范围。

例如

with engine.begin() as connection:
    with connection.begin_nested():
        connection.execute(table.insert(), {"username": "sandy"})

返回的对象是一个NestedTransaction 的实例,它包含事务方法 NestedTransaction.commit()NestedTransaction.rollback();对于嵌套事务,这些方法对应于操作“RELEASE SAVEPOINT <name>”和“ROLLBACK TO SAVEPOINT <name>”。保存点的名称在NestedTransaction 对象中是本地的,并且是自动生成的。与任何其他Transaction 一样,NestedTransaction 可以用作上下文管理器,如上所示,它将根据块中的操作是否成功或引发异常而“释放”或“回滚”。

嵌套事务需要底层数据库中的 SAVEPOINT 支持,否则行为将是未定义的。SAVEPOINT 通常用于在事务中运行可能失败的操作,同时继续外部事务。例如

from sqlalchemy import exc

with engine.begin() as connection:
    trans = connection.begin_nested()
    try:
        connection.execute(table.insert(), {"username": "sandy"})
        trans.commit()
    except exc.IntegrityError:  # catch for duplicate username
        trans.rollback()  # rollback to savepoint

    # outer transaction continues
    connection.execute( ... )

如果Connection.begin_nested() 在未首先调用Connection.begin()Engine.begin() 的情况下被调用,Connection 对象将首先“自动开始”外部事务。可以使用“边走边提交”的方式提交此外部事务,例如

with engine.connect() as connection:  # begin() wasn't called

    with connection.begin_nested(): will auto-"begin()" first
        connection.execute( ... )
    # savepoint is released

    connection.execute( ... )

    # explicitly commit outer transaction
    connection.commit()

    # can continue working with connection here

在 2.0 版中变更: Connection.begin_nested() 现在将参与连接“自动开始”行为,该行为是 2.0 中的新行为 / 1.4 中的“未来”样式连接。

另见

Connection.begin()

使用 SAVEPOINT - ORM 对 SAVEPOINT 的支持

method sqlalchemy.engine.Connection.begin_twophase(xid: Any | None = None) TwoPhaseTransaction

开始两阶段或 XA 事务并返回事务句柄。

返回的对象是一个TwoPhaseTransaction 的实例,除了Transaction 提供的方法外,还提供了一个TwoPhaseTransaction.prepare() 方法。

参数:

xid – 两阶段事务 ID。如果没有提供,将生成一个随机 ID。

method sqlalchemy.engine.Connection.close() None

关闭此Connection

这将导致释放底层数据库资源,即内部引用的 DBAPI 连接。DBAPI 连接通常会恢复到由产生此ConnectionPool 引用的连接持有Engine。DBAPI 连接上存在的任何事务状态也会通过 DBAPI 连接的rollback() 方法无条件地释放,而不管与此Connection 可能挂起任何Transaction 对象。

这将有调用Connection.rollback() 的效果,如果存在任何事务。

在调用Connection.close() 之后,Connection 将永久处于关闭状态,并且不允许任何进一步的操作。

attribute sqlalchemy.engine.Connection.closed

如果此连接已关闭,则返回 True。

method sqlalchemy.engine.Connection.commit() None

提交当前正在进行的事务。

此方法会提交当前的事务(如果已启动)。如果没有启动事务,则该方法没有任何效果,假设连接处于非失效状态。

只要首次执行语句或调用Connection.begin() 方法,就会自动在Connection 上开始事务。

注意

Connection.commit() 方法仅对与Connection 对象关联的主数据库事务进行操作。它不会对从Connection.begin_nested() 方法调用的 SAVEPOINT 进行操作;要控制 SAVEPOINT,请在由Connection.begin_nested() 方法本身返回的NestedTransaction 上调用NestedTransaction.commit()

attribute sqlalchemy.engine.Connection.connection

此连接管理的底层 DB-API 连接。

这是一个 SQLAlchemy 连接池代理连接,然后具有属性_ConnectionFairy.dbapi_connection,它引用实际的驱动程序连接。

attribute sqlalchemy.engine.Connection.default_isolation_level

与使用的Dialect 关联的初始连接时间隔离级别。

此值独立于Connection.execution_options.isolation_levelEngine.execution_options.isolation_level 执行选项,并且由Dialect 在创建第一个连接时确定,通过在发出任何其他命令之前对数据库执行 SQL 查询以获取当前隔离级别来确定。

调用此访问器不会调用任何新的 SQL 查询。

另见

Connection.get_isolation_level() - 查看当前实际隔离级别

create_engine.isolation_level - 设置每个 Engine 的隔离级别

Connection.execution_options.isolation_level - 设置每个 Connection 的隔离级别

method sqlalchemy.engine.Connection.detach() None

将底层 DB-API 连接与其连接池分离。

例如

with engine.connect() as conn:
    conn.detach()
    conn.execute(text("SET search_path TO schema1, schema2"))

    # work with connection

# connection is fully closed (since we used "with:", can
# also call .close())

Connection 实例将保持可用。当关闭(或从上面的上下文管理器上下文中退出)时,DB-API 连接将被实际关闭,并且不会返回到其原始池。

此方法可用于将应用程序的其余部分与连接上的修改状态(例如事务隔离级别或类似状态)隔离开来。

method sqlalchemy.engine.Connection.exec_driver_sql(statement: str, parameters: _DBAPIAnyExecuteParams | None = None, execution_options: CoreExecuteOptionsParameter | None = None) CursorResult[Any]

直接在 DBAPI 游标上执行字符串 SQL 语句,没有任何 SQL 编译步骤。

这可用于将任何字符串直接传递到正在使用的 DBAPI 的 cursor.execute() 方法。

参数:
  • statement – 要执行的语句 str。绑定参数必须使用底层 DBAPI 的 paramstyle,例如“qmark”、“pyformat”、“format”等。

  • parameters – 表示要在执行中使用的绑定参数值。格式之一是:命名参数的字典、位置参数的元组或包含多个字典或元组的列表,用于多重执行支持。

返回值:

a CursorResult.

例如多个字典

conn.exec_driver_sql(
    "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
    [{"id":1, "value":"v1"}, {"id":2, "value":"v2"}]
)

单个字典

conn.exec_driver_sql(
    "INSERT INTO table (id, value) VALUES (%(id)s, %(value)s)",
    dict(id=1, value="v1")
)

单个元组

conn.exec_driver_sql(
    "INSERT INTO table (id, value) VALUES (?, ?)",
    (1, 'v1')
)

另见

PEP 249

method sqlalchemy.engine.Connection.execute(statement: Executable, parameters: _CoreAnyExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) CursorResult[Any]

执行 SQL 语句构造并返回一个 CursorResult.

参数:
  • statement

    要执行的语句。这始终是一个既在 ClauseElement 又在 Executable 层次结构中的对象,包括

  • parameters – 将绑定到语句的参数。这可以是参数名称到值的字典,也可以是可变序列(例如列表)字典。当传递字典列表时,底层语句执行将使用 DBAPI cursor.executemany() 方法。当传递单个字典时,将使用 DBAPI cursor.execute() 方法。

  • execution_options – 执行选项的可选字典,这些选项将与语句执行相关联。此字典可以提供 Connection.execution_options() 接受的选项的子集。

返回值:

a Result object.

method sqlalchemy.engine.Connection.execution_options(**opt: Any) Connection

设置连接的非 SQL 选项,这些选项在执行期间生效。

此方法会**就地修改**此 Connection;返回值是调用该方法的同一个 Connection 对象。请注意,这与其他对象(如 Engine.execution_options()Executable.execution_options())上的 execution_options 方法的行为形成对比。其原因是,许多此类执行选项必然会修改基础 DBAPI 连接的状态,因此没有可行的办法将这种选项的影响局限于“子”连接。

版本 2.0 中的变更: 与具有此方法的其他对象相比,Connection.execution_options() 方法会就地修改连接,而不会创建它的副本。

如其他地方所述,Connection.execution_options() 方法接受任何任意参数,包括用户定义的名称。所有给定的参数都可以通过多种方式使用,包括使用 Connection.get_execution_options() 方法。请参阅 Executable.execution_options()Engine.execution_options() 中的示例。

SQLAlchemy 本身当前识别的关键字包括 Executable.execution_options() 下列出的所有关键字,以及特定于 Connection 的其他关键字。

参数:
method sqlalchemy.engine.Connection.get_execution_options() _ExecuteOptions

获取将在执行期间生效的非 SQL 选项。

版本 1.3 中新增。

method sqlalchemy.engine.Connection.get_isolation_level() Literal['SERIALIZABLE', 'REPEATABLE READ', 'READ COMMITTED', 'READ UNCOMMITTED', 'AUTOCOMMIT']

返回此连接范围内数据库中存在的当前 **实际** 隔离级别。

此属性将对数据库执行实时 SQL 操作以获取当前隔离级别,因此返回的值是底层 DBAPI 连接上的实际级别,无论此状态是如何设置的。这将是四种实际隔离模式之一:READ UNCOMMITTEDREAD COMMITTEDREPEATABLE READSERIALIZABLE。它**不会**包含AUTOCOMMIT隔离级别设置。第三方方言也可能具有额外的隔离级别设置。

注意

此方法**不会报告**AUTOCOMMIT隔离级别,它是一个独立于**实际**隔离级别的单独dbapi设置。当AUTOCOMMIT在使用时,数据库连接仍然具有生效的“传统”隔离模式,通常是四种值之一:READ UNCOMMITTEDREAD COMMITTEDREPEATABLE READSERIALIZABLE

Connection.default_isolation_level访问器进行比较,该访问器返回初始连接时数据库上存在的隔离级别。

另见

Connection.default_isolation_level - 查看默认级别

create_engine.isolation_level - 设置每个 Engine 的隔离级别

Connection.execution_options.isolation_level - 设置每个 Connection 的隔离级别

method sqlalchemy.engine.Connection.get_nested_transaction() NestedTransaction | None

返回当前正在进行的嵌套事务(如果有)。

版本 1.4 中新增。

method sqlalchemy.engine.Connection.get_transaction() RootTransaction | None

返回当前正在进行的根事务(如果有)。

版本 1.4 中新增。

method sqlalchemy.engine.Connection.in_nested_transaction() bool

如果事务正在进行,则返回 True。

method sqlalchemy.engine.Connection.in_transaction() bool

如果事务正在进行,则返回 True。

attribute sqlalchemy.engine.Connection.info

与此Connection引用的底层 DBAPI 连接关联的信息字典,允许将用户定义的数据与连接相关联。

此处的 data 将与 DBAPI 连接一起,包括在它被返回到连接池并在Connection的后续实例中再次使用后。

method sqlalchemy.engine.Connection.invalidate(exception: BaseException | None = None) None

使与此Connection关联的底层 DBAPI 连接无效。

将尝试立即关闭底层 DBAPI 连接;但是,如果此操作失败,则会记录错误,但不会引发错误。然后,无论 close() 是否成功,连接都会被丢弃。

在下次使用时(其中“使用”通常意味着使用Connection.execute()方法或类似方法),此Connection将尝试使用Pool的服务作为连接源(例如“重新连接”)来获取新的 DBAPI 连接。

如果在调用Connection.invalidate()方法时事务正在进行(例如,已调用Connection.begin()方法),则在 DBAPI 级别,与该事务关联的所有状态都会丢失,因为 DBAPI 连接已关闭。直到调用Transaction.rollback()方法结束Transaction对象之前,Connection将不允许重新连接继续;在此之前,任何继续使用Connection的尝试都将引发InvalidRequestError。这是为了防止应用程序在事务因失效而丢失的情况下意外地继续进行正在进行的事务操作。

与自动失效一样,Connection.invalidate()方法将在连接池级别调用PoolEvents.invalidate()事件。

参数:

exception – 一个可选的Exception实例,它是失效的原因。传递给事件处理程序和日志记录函数。

attribute sqlalchemy.engine.Connection.invalidated

如果此连接已失效,则返回 True。

这并不表示连接是否在池级别失效,但是

method sqlalchemy.engine.Connection.rollback() None

回滚当前正在进行的事务。

此方法如果已启动事务,则回滚当前事务。如果未启动事务,则此方法无效。如果已启动事务,并且连接处于无效状态,则使用此方法清除事务。

只要首次执行语句或调用Connection.begin() 方法,就会自动在Connection 上开始事务。

注意

The Connection.rollback() 方法仅作用于与 Connection 对象关联的主数据库事务。它不作用于通过 Connection.begin_nested() 方法调用的 SAVEPOINT;要控制 SAVEPOINT,请在通过 Connection.begin_nested() 方法本身返回的 NestedTransaction 上调用 NestedTransaction.rollback()

method sqlalchemy.engine.Connection.scalar(statement: Executable, parameters: _CoreSingleExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) Any

执行 SQL 语句构造并返回标量对象。

此方法是调用 Result.scalar() 方法(在调用 Connection.execute() 方法之后)的简写。参数等效。

返回值:

表示返回的第一行第一列的标量 Python 值。

method sqlalchemy.engine.Connection.scalars(statement: Executable, parameters: _CoreAnyExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) ScalarResult[Any]

执行并返回一个标量结果集,该结果集从每行的第一列生成标量值。

此方法等效于调用 Connection.execute() 以接收 Result 对象,然后调用 Result.scalars() 方法来生成 ScalarResult 实例。

返回值:

一个 ScalarResult

1.4.24 版本新增。

method sqlalchemy.engine.Connection.schema_for_object(obj: HasSchemaAttr) str | None

返回给定架构项的架构名称,考虑到当前架构转换映射。

class sqlalchemy.engine.CreateEnginePlugin

一组挂钩,旨在根据 URL 中的入口点名称来增强 Engine 对象的构建。

CreateEnginePlugin 的目的是允许第三方系统应用引擎、池和方言级别的事件监听器,而无需修改目标应用程序;相反,插件名称可以添加到数据库 URL 中。 CreateEnginePlugin 的目标应用程序包括

  • 连接和 SQL 性能工具,例如使用事件来跟踪签出次数和/或语句花费的时间

  • 代理等连接插件

将记录器附加到 Engine 对象的基本 CreateEnginePlugin 可能如下所示

import logging

from sqlalchemy.engine import CreateEnginePlugin
from sqlalchemy import event

class LogCursorEventsPlugin(CreateEnginePlugin):
    def __init__(self, url, kwargs):
        # consume the parameter "log_cursor_logging_name" from the
        # URL query
        logging_name = url.query.get("log_cursor_logging_name", "log_cursor")

        self.log = logging.getLogger(logging_name)

    def update_url(self, url):
        "update the URL to one that no longer includes our parameters"
        return url.difference_update_query(["log_cursor_logging_name"])

    def engine_created(self, engine):
        "attach an event listener after the new Engine is constructed"
        event.listen(engine, "before_cursor_execute", self._log_event)


    def _log_event(
        self,
        conn,
        cursor,
        statement,
        parameters,
        context,
        executemany):

        self.log.info("Plugin logged cursor event: %s", statement)

插件使用类似于方言的入口点注册

entry_points={
    'sqlalchemy.plugins': [
        'log_cursor_plugin = myapp.plugins:LogCursorEventsPlugin'
    ]

使用上述名称的插件将从数据库 URL 中调用,如下所示

from sqlalchemy import create_engine

engine = create_engine(
    "mysql+pymysql://scott:tiger@localhost/test?"
    "plugin=log_cursor_plugin&log_cursor_logging_name=mylogger"
)

plugin URL 参数支持多个实例,因此 URL 可以指定多个插件;它们按 URL 中指定的顺序加载

engine = create_engine(
  "mysql+pymysql://scott:tiger@localhost/test?"
  "plugin=plugin_one&plugin=plugin_twp&plugin=plugin_three")

插件名称也可以直接传递给 create_engine(),使用 create_engine.plugins 参数

engine = create_engine(
  "mysql+pymysql://scott:tiger@localhost/test",
  plugins=["myplugin"])

1.2.3 版本新增: 插件名称也可以作为列表指定给 create_engine()

插件可以从 URL 对象以及 kwargs 字典中使用插件特定的参数,该字典是传递给 create_engine() 调用的参数字典。“使用”这些参数包括在插件初始化时必须删除它们,以便这些参数不会传递给 Dialect 构造函数,在那里它们将引发 ArgumentError,因为它们不为方言所知。

从 SQLAlchemy 1.4 版本开始,参数应该继续直接从 kwargs 字典中使用,方法是使用诸如 dict.pop 之类的方法删除值。从 URL 对象中使用的参数应通过实现 CreateEnginePlugin.update_url() 方法来使用,该方法返回一个新的 URL 副本,其中删除了插件特定的参数

class MyPlugin(CreateEnginePlugin):
    def __init__(self, url, kwargs):
        self.my_argument_one = url.query['my_argument_one']
        self.my_argument_two = url.query['my_argument_two']
        self.my_argument_three = kwargs.pop('my_argument_three', None)

    def update_url(self, url):
        return url.difference_update_query(
            ["my_argument_one", "my_argument_two"]
        )

诸如上面所示的参数将从诸如 create_engine() 之类的调用中使用

from sqlalchemy import create_engine

engine = create_engine(
  "mysql+pymysql://scott:tiger@localhost/test?"
  "plugin=myplugin&my_argument_one=foo&my_argument_two=bar",
  my_argument_three='bat'
)

1.4 版本变更: URL 对象现在是不可变的;需要更改 URLCreateEnginePlugin 应该实现新添加的 CreateEnginePlugin.update_url() 方法,该方法在插件构造后调用。

为了迁移,以以下方式构造插件,检查 CreateEnginePlugin.update_url() 方法是否存在以检测正在运行的版本

class MyPlugin(CreateEnginePlugin):
    def __init__(self, url, kwargs):
        if hasattr(CreateEnginePlugin, "update_url"):
            # detect the 1.4 API
            self.my_argument_one = url.query['my_argument_one']
            self.my_argument_two = url.query['my_argument_two']
        else:
            # detect the 1.3 and earlier API - mutate the
            # URL directly
            self.my_argument_one = url.query.pop('my_argument_one')
            self.my_argument_two = url.query.pop('my_argument_two')

        self.my_argument_three = kwargs.pop('my_argument_three', None)

    def update_url(self, url):
        # this method is only called in the 1.4 version
        return url.difference_update_query(
            ["my_argument_one", "my_argument_two"]
        )

另见

URL 对象现在是不可变的 - URL 的变化概述,其中也包含有关 CreateEnginePlugin 的注释。

当引擎创建过程完成并生成 Engine 对象时,它会再次通过 CreateEnginePlugin.engine_created() 钩子传递给插件。在这个钩子中,可以对引擎进行额外的更改,最常见的是设置事件(例如在 核心事件 中定义的事件)。

method sqlalchemy.engine.CreateEnginePlugin.__init__(url: URL, kwargs: Dict[str, Any])

构造一个新的 CreateEnginePlugin.

对于每次调用 create_engine(),都会单独实例化插件对象。单个 Engine 将传递给与该 URL 对应的 CreateEnginePlugin.engine_created() 方法。

参数:
method sqlalchemy.engine.CreateEnginePlugin.engine_created(engine: Engine) None

在完全构造后接收 Engine 对象。

插件可以对引擎进行额外的更改,例如注册引擎或连接池事件。

method sqlalchemy.engine.CreateEnginePlugin.handle_dialect_kwargs(dialect_cls: Type[Dialect], dialect_args: Dict[str, Any]) None

解析和修改方言 kwargs

method sqlalchemy.engine.CreateEnginePlugin.handle_pool_kwargs(pool_cls: Type[Pool], pool_args: Dict[str, Any]) None

解析和修改池 kwargs

method sqlalchemy.engine.CreateEnginePlugin.update_url(url: URL) URL

更新 URL.

应该返回一个新的 URL。此方法通常用于使用 URL 中的配置参数,这些参数必须被删除,因为方言无法识别它们。 URL.difference_update_query() 方法可用于删除这些参数。有关示例,请参见 CreateEnginePlugin 的文档字符串。

版本 1.4 中新增。

class sqlalchemy.engine.Engine

PoolDialect 结合起来,以提供数据库连接和行为的来源。

使用 create_engine() 函数公开实例化一个 Engine 对象。

类签名

class sqlalchemy.engine.Engine (sqlalchemy.engine.interfaces.ConnectionEventsTarget, sqlalchemy.log.Identified, sqlalchemy.inspection.Inspectable)

method sqlalchemy.engine.Engine.begin() Iterator[Connection]

返回一个上下文管理器,它提供一个带建立的TransactionConnection

例如

with engine.begin() as conn:
    conn.execute(
        text("insert into table (x, y, z) values (1, 2, 3)")
    )
    conn.execute(text("my_special_procedure(5)"))

在成功操作后,Transaction 会被提交。如果引发错误,Transaction 会被回滚。

另见

Engine.connect() - 从一个 Engine 中获取一个 Connection

Connection.begin() - 为特定 Connection 启动一个Transaction

method sqlalchemy.engine.Engine.clear_compiled_cache() None

清除与方言关联的已编译缓存。

这仅适用于通过 create_engine.query_cache_size 参数建立的内置缓存。它不会影响通过 Connection.execution_options.compiled_cache 参数传递的任何字典缓存。

版本 1.4 中新增。

method sqlalchemy.engine.Engine.connect() Connection

返回一个新的 Connection 对象。

Connection 充当 Python 上下文管理器,因此该方法的典型用法如下所示

with engine.connect() as connection:
    connection.execute(text("insert into table values ('foo')"))
    connection.commit()

在上面的代码中,块完成后,连接将被“关闭”,其底层 DBAPI 资源将返回到连接池。这还具有回滚任何显式启动或通过自动启动启动的事务的效果,并且如果启动了事务并且仍在进行中,将发出 ConnectionEvents.rollback() 事件。

method sqlalchemy.engine.Engine.dispose(close: bool = True) None

处理此 Engine 使用的连接池。

在处理完旧连接池后,会立即创建一个新的连接池。以前连接池的处理方式要么是主动的,通过关闭池中所有当前签入的连接,要么是被动的,通过失去对其的引用,但除此之外不关闭任何连接。后者策略更适合于分叉 Python 进程中的初始化程序。

参数:

close

如果保留其默认值 True,其作用是完全关闭所有当前签入的数据库连接。仍签出的连接不会关闭,但它们不再与此 Engine 关联,因此当它们单独关闭时,最终它们所关联的 Pool 会被垃圾回收,如果还没有在签入时关闭,它们会完全关闭。

如果设置为 False,则以前连接池将被解除引用,并且除此之外不会以任何方式触碰。

New in version 1.4.33: 添加了 Engine.dispose.close 参数以允许在子进程中替换连接池,而不会干扰父进程使用的连接。

attribute sqlalchemy.engine.Engine.driver

Engine 使用的 Dialect 的驱动程序名称。

attribute sqlalchemy.engine.Engine.engine

返回此 Engine

用于接受同一个变量内的 Connection / Engine 对象的旧方案。

method sqlalchemy.engine.Engine.execution_options(**opt: Any) OptionEngine

返回一个新的 Engine,它将提供具有给定执行选项的 Connection 对象。

返回的 Engine 与原始 Engine 相关联,因为它共享相同的连接池和其他状态

  • 新创建的 Pool 与父 Engine 使用的是同一个实例。 Engine.dispose() 方法会同时替换父引擎和当前引擎的连接池实例。

  • 事件监听器是“级联”的,这意味着新的 Engine 会继承父引擎的事件,并且可以单独与新的 Engine 关联新的事件。

  • 日志配置和 logging_name 会从父 Engine 复制。

Engine.execution_options() 方法的目的是实现这样的方案:多个 Engine 对象引用同一个连接池,但通过影响每个引擎的一些执行级别行为的选项来进行区分。一个这样的例子是将 Engine 实例分成独立的“读取器”和“写入器”,其中一个 Engine隔离级别 设置较低,甚至使用“autocommit”禁用事务。这种配置的示例见 为单个引擎维护多个隔离级别.

另一个例子是使用自定义选项 shard_id,该选项由事件处理,以便在数据库连接上更改当前模式。

from sqlalchemy import event
from sqlalchemy.engine import Engine

primary_engine = create_engine("mysql+mysqldb://")
shard1 = primary_engine.execution_options(shard_id="shard1")
shard2 = primary_engine.execution_options(shard_id="shard2")

shards = {"default": "base", "shard_1": "db1", "shard_2": "db2"}

@event.listens_for(Engine, "before_cursor_execute")
def _switch_shard(conn, cursor, stmt,
        params, context, executemany):
    shard_id = conn.get_execution_options().get('shard_id', "default")
    current_shard = conn.info.get("current_shard", None)

    if current_shard != shard_id:
        cursor.execute("use %s" % shards[shard_id])
        conn.info["current_shard"] = shard_id

上面的示例展示了两个 Engine 对象,它们各自充当 Connection 对象的工厂,这些对象已经预先建立了“shard_id”执行选项。然后,ConnectionEvents.before_cursor_execute() 事件处理程序解释此执行选项,以发出 MySQL use 语句,在语句执行之前切换数据库,同时使用 Connection.info 字典跟踪我们已建立的数据库。

另见

Connection.execution_options() - 更新 Connection 对象上的执行选项。

Engine.update_execution_options() - 更新给定 Engine 的执行选项。

Engine.get_execution_options()

method sqlalchemy.engine.Engine.get_execution_options() _ExecuteOptions

获取将在执行期间生效的非 SQL 选项。

attribute sqlalchemy.engine.Engine.name

Engine 使用的 Dialect 的名称。

method sqlalchemy.engine.Engine.raw_connection() PoolProxiedConnection

从连接池返回一个“原始”的 DBAPI 连接。

返回的对象是底层驱动程序使用的 DBAPI 连接对象的代理版本。该对象将具有与真实 DBAPI 连接相同的行为,只是它的 close() 方法会导致连接返回到池,而不是真正关闭。

Connection 对象已存在时,可以使用 Connection.connection 访问器获取 DBAPI 连接。

method sqlalchemy.engine.Engine.update_execution_options(**opt: Any) None

更新此 Engine 的默认 execution_options 字典。

在 **opt 中给定的键值对将添加到将用于所有连接的默认执行选项中。此字典的初始内容可以通过 create_engine()execution_options 参数发送。

class sqlalchemy.engine.ExceptionContext

封装有关正在进行的错误条件的信息。

此对象仅存在于传递给 DialectEvents.handle_error() 事件,支持可以扩展而不会出现向后不兼容性的接口。

attribute sqlalchemy.engine.ExceptionContext.chained_exception: BaseException | None

异常链中前一个处理程序返回的异常,如果有的话。

如果存在,此异常将是 SQLAlchemy 最终抛出的异常,除非后续处理程序将其替换。

可能是 None。

属性 sqlalchemy.engine.ExceptionContext.connection: Connection | None

异常发生期间使用的 Connection

除了首次连接失败的情况外,此成员始终存在。

属性 sqlalchemy.engine.ExceptionContext.cursor: DBAPICursor | None

DBAPI 游标对象。

可能是 None。

属性 sqlalchemy.engine.ExceptionContext.dialect: Dialect

正在使用的 Dialect

此成员在所有事件钩子调用中都存在。

版本 2.0 中新增。

属性 sqlalchemy.engine.ExceptionContext.engine: Engine | None

异常发生期间使用的 Engine

除了处理连接池 “pre-ping” 过程中的错误之外,此成员在所有情况下都存在。

属性 sqlalchemy.engine.ExceptionContext.execution_context: ExecutionContext | None

与正在进行的执行操作相对应的 ExecutionContext

这对于语句执行操作存在,但不适用于事务开始/结束等操作。当异常在 ExecutionContext 构建之前引发时,它也不存在。

请注意,ExceptionContext.statementExceptionContext.parameters 成员可能表示与 ExecutionContext 不同的值,这可能是由于 ConnectionEvents.before_cursor_execute() 事件或类似事件修改了要发送的语句/参数造成的。

可能是 None。

属性 sqlalchemy.engine.ExceptionContext.invalidate_pool_on_disconnect: bool

表示当 “断开连接” 条件生效时,池中的所有连接是否应该失效。

DialectEvents.handle_error() 事件范围内将此标志设置为 False 将使池中的所有连接在断开连接期间不会失效;实际上,只有发生错误的当前连接才会失效。

此标志的目的是用于自定义断开连接处理方案,在这些方案中,池中其他连接的失效将基于其他条件执行,甚至可以基于每个连接执行。

属性 sqlalchemy.engine.ExceptionContext.is_disconnect: bool

表示发生的异常是否代表 “断开连接” 条件。

DialectEvents.handle_error() 处理程序范围内,此标志始终为 True 或 False。

SQLAlchemy 将依赖此标志来确定连接是否应随后失效。也就是说,通过分配给此标志,“断开连接” 事件(随后导致连接和池失效)可以通过更改此标志来调用或阻止。

注意

使用 create_engine.pool_pre_ping 参数启用的池 “pre_ping” 处理程序在决定 “ping” 是否返回 false 之前,**不会** 参考此事件,而是会接收未处理的错误。对于这种情况,可以使用 基于 engine_connect() 的传统方案 。未来的 API 将允许更全面地自定义所有函数中的 “断开连接” 检测机制。

属性 sqlalchemy.engine.ExceptionContext.is_pre_ping: bool

指示此错误是否发生在 create_engine.pool_pre_ping 设置为 True 时的 “pre-ping” 步骤中。在此模式下,ExceptionContext.engine 属性将为 None 。可以使用 ExceptionContext.dialect 属性访问正在使用的方言。

2.0.5 版新增。

属性 sqlalchemy.engine.ExceptionContext.original_exception: BaseException

捕获的异常对象。

此成员始终存在。

属性 sqlalchemy.engine.ExceptionContext.parameters: _DBAPIAnyExecuteParams | None

直接发送到 DBAPI 的参数集合。

可能是 None。

attribute sqlalchemy.engine.ExceptionContext.sqlalchemy_exception: StatementError | None

包裹原始异常的 sqlalchemy.exc.StatementError,如果异常处理没有被事件规避,它将被抛出。

可能为 None,因为并非所有异常类型都被 SQLAlchemy 包裹。对于子类化 dbapi’s Error 类的 DBAPI 级异常,此字段始终存在。

attribute sqlalchemy.engine.ExceptionContext.statement: str | None

直接发送到 DBAPI 的字符串 SQL 语句。

可能是 None。

class sqlalchemy.engine.NestedTransaction

表示一个“嵌套”或 SAVEPOINT 事务。

通过调用 Connection.begin_nested() 方法创建 NestedTransaction 对象 Connection

当使用 NestedTransaction 时,“begin” / “commit” / “rollback” 的语义如下

模仿外部事务的语义,以保存点的形式进行处理,以便代码可以以不可知的方式处理“保存点”事务和“外部”事务。

另见

使用 SAVEPOINT - SAVEPOINT API 的 ORM 版本。

method sqlalchemy.engine.NestedTransaction.close() None

继承自 Transaction.close() 方法 Transaction

关闭此 Transaction

如果此事务是 begin/commit 嵌套中的基础事务,则事务将回滚。否则,该方法将返回。

这用于取消事务,而不会影响外层事务的范围。

method sqlalchemy.engine.NestedTransaction.commit() None

提交此 Transaction

此实现可能因所用事务类型而异

method sqlalchemy.engine.NestedTransaction.rollback() None

回滚此 Transaction

此实现可能因所用事务类型而异

class sqlalchemy.engine.RootTransaction

表示 Connection 上的“根”事务。

这对应于 Connection 正在进行的当前 “BEGIN/COMMIT/ROLLBACK”。The RootTransaction 是通过调用 Connection.begin() 方法创建的,并在其整个活动跨度内与 Connection 相关联。当前正在使用的 RootTransaction 可通过 Connection.get_transaction 方法访问 Connection

2.0 风格 使用中,The Connection 还采用 “autobegin” 行为,该行为将在非事务状态下的连接用于在 DBAPI 连接上发出命令时创建新的 RootTransaction。在 2.0 风格使用中,The RootTransaction 的范围可以使用 Connection.commit()Connection.rollback() 方法控制。

方法 sqlalchemy.engine.RootTransaction.close() None

继承自 Transaction.close() 方法 Transaction

关闭此 Transaction

如果此事务是 begin/commit 嵌套中的基础事务,则事务将回滚。否则,该方法将返回。

这用于取消事务,而不会影响外层事务的范围。

方法 sqlalchemy.engine.RootTransaction.commit() None

提交此 Transaction

此实现可能因所用事务类型而异

方法 sqlalchemy.engine.RootTransaction.rollback() None

回滚此 Transaction

此实现可能因所用事务类型而异

sqlalchemy.engine.Transaction

表示正在进行的数据库事务。

通过调用 Connection.begin() 方法 Connection 获取 Transaction 对象。

from sqlalchemy import create_engine
engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
connection = engine.connect()
trans = connection.begin()
connection.execute(text("insert into x (a, b) values (1, 2)"))
trans.commit()

该对象提供 rollback()commit() 方法来控制事务边界。它还实现了上下文管理器接口,以便可以使用 Python with 语句与 Connection.begin() 方法一起使用。

with connection.begin():
    connection.execute(text("insert into x (a, b) values (1, 2)"))

Transaction 对象不是线程安全的。

类签名

sqlalchemy.engine.Transaction (sqlalchemy.engine.util.TransactionalContext)

方法 sqlalchemy.engine.Transaction.close() None

关闭此 Transaction

如果此事务是 begin/commit 嵌套中的基础事务,则事务将回滚。否则,该方法将返回。

这用于取消事务,而不会影响外层事务的范围。

方法 sqlalchemy.engine.Transaction.commit() None

提交此 Transaction

此实现可能因所用事务类型而异

方法 sqlalchemy.engine.Transaction.rollback() None

回滚此 Transaction

此实现可能因所用事务类型而异

sqlalchemy.engine.TwoPhaseTransaction

表示一个两阶段事务。

可以使用 Connection.begin_twophase() 方法获取新的 TwoPhaseTransaction 对象。

该接口与 Transaction 相同,增加了 prepare() 方法。

方法 sqlalchemy.engine.TwoPhaseTransaction.close() None

继承自 Transaction.close() 方法 Transaction

关闭此 Transaction

如果此事务是 begin/commit 嵌套中的基础事务,则事务将回滚。否则,该方法将返回。

这用于取消事务,而不会影响外层事务的范围。

方法 sqlalchemy.engine.TwoPhaseTransaction.commit() None

提交此 Transaction

此实现可能因所用事务类型而异

方法 sqlalchemy.engine.TwoPhaseTransaction.prepare() None

准备此 TwoPhaseTransaction

在 PREPARE 之后,可以提交事务。

方法 sqlalchemy.engine.TwoPhaseTransaction.rollback() None

回滚此 Transaction

此实现可能因所用事务类型而异

结果集 API

对象名称 描述

ChunkedIteratorResult

从生成迭代器的可调用对象工作的 IteratorResult

游标结果

表示来自 DBAPI 游标的状态的结果。

FilterResult

Result 的包装,返回除 Row 对象之外的对象,例如字典或标量对象。

FrozenResult

表示 Result 对象的“冻结”状态,适用于缓存。

IteratorResult

一个 Result,从 Python 迭代器获取 Row 对象或类似行数据。

MappingResult

Result 的包装,返回字典值而不是 Row 值。

MergedResult

一个 Result,从任意数量的 Result 对象合并。

结果

表示一组数据库结果。

表示单个结果行。

RowMapping

一个 Mapping,将列名和对象映射到 Row 值。

ScalarResult

Result 的包装,返回标量值而不是 Row 值。

TupleResult

一个 Result,其类型为返回普通 Python 元组而不是行。

sqlalchemy.engine.ChunkedIteratorResult

从生成迭代器的可调用对象工作的 IteratorResult

给定的 chunks 参数是一个函数,该函数接受每个块中要返回的行数,或 None 表示所有行。然后,该函数应返回一个未消耗的列表迭代器,每个列表都是请求的大小。

该函数可以随时再次被调用,在这种情况下,它应该从同一个结果集继续,但根据所给定的调整块大小。

版本 1.4 中新增。

成员

yield_per()

方法 sqlalchemy.engine.ChunkedIteratorResult.yield_per(num: int) Self

配置行提取策略,每次提取 num 行。

这会影响结果对象迭代时的底层行为,或者其他方法,例如 Result.fetchone() 每次返回一行。来自底层游标或其他数据源的数据将被缓冲到内存中的这些行,然后缓冲的集合将被逐行或按请求的行数生成。每次缓冲区清除后,它将被刷新到这些行,或者如果剩余的行更少,则刷新到剩余的行。

方法 Result.yield_per() 通常与 Connection.execution_options.stream_results 执行选项一起使用,这将允许正在使用的数据库方言使用服务器端游标,如果 DBAPI 支持其默认操作模式之外的特定“服务器端游标”模式。

提示

考虑使用 Connection.execution_options.yield_per 执行选项,它将同时设置 Connection.execution_options.stream_results 以确保使用服务器端游标,并自动调用 Result.yield_per() 方法一次建立固定行缓冲区大小。

执行选项 Connection.execution_options.yield_per 可用于 ORM 操作,Session 导向的使用在 使用 Yield Per 提取大型结果集 中描述。仅核心版本与 Connection 一起使用,从 SQLAlchemy 1.4.40 开始。

版本 1.4 中新增。

参数:

num – 每次缓冲区重新填充时要提取的行数。如果设置为小于 1 的值,则会提取下一个缓冲区的全部行。

sqlalchemy.engine.CursorResult

表示来自 DBAPI 游标的状态的结果。

在版本 1.4 中更改: CursorResult` 替换了之前的 ResultProxy 接口。这些类基于 Result 调用 API,它为 SQLAlchemy Core 和 SQLAlchemy ORM 提供了更新的使用模型和调用外观。

通过类 Row 返回数据库行,该类在 DBAPI 返回的原始数据之上提供了额外的 API 功能和行为。通过使用过滤器,例如方法 Result.scalars(),也可以返回其他类型的对象。

另见

使用 SELECT 语句 - 访问 CursorResultRow 对象的入门资料。

方法 sqlalchemy.engine.CursorResult.all() Sequence[Row[_TP]]

继承自 Result.all() Result 的方法

以序列形式返回所有行。

调用后会关闭结果集。后续调用将返回空序列。

版本 1.4 中新增。

返回值:

一个 Row 对象的序列。

另见

使用服务器端游标(也称为流结果) - 如何在不完全加载到 Python 中的情况下流式传输大型结果集。

方法 sqlalchemy.engine.CursorResult.close() Any

关闭此 CursorResult

如果存在,这将关闭与语句执行相对应的底层 DBAPI 游标。请注意,当CursorResult耗尽所有可用行时,DBAPI 游标会自动释放。CursorResult.close()通常是可选方法,除非在丢弃CursorResult且仍有额外的行等待获取时。

在此方法被调用后,再次调用获取方法将不再有效,这将在后续使用中引发ResourceClosedError

method sqlalchemy.engine.CursorResult.columns(*col_expressions: _KeyIndexType) Self

继承自 Result.columns() 方法 Result

确定每行中应返回的列。

此方法可用于限制返回的列,以及重新排序它们。给定的表达式列表通常是一系列整数或字符串键名。它们也可能是适当的ColumnElement对象,对应于给定的语句结构。

在 2.0 版本中更改: 由于 1.4 中的错误,Result.columns()方法的行为不正确,使用单个索引调用该方法会导致Result对象产生标量值,而不是Row对象。在 2.0 版本中,此行为已得到修正,以便使用单个索引调用Result.columns()将产生一个Result对象,该对象继续产生Row对象,其中只包含一列。

例如

statement = select(table.c.x, table.c.y, table.c.z)
result = connection.execute(statement)

for z, y in result.columns('z', 'y'):
    # ...

使用语句本身中的列对象的示例

for z, y in result.columns(
        statement.selected_columns.c.z,
        statement.selected_columns.c.y
):
    # ...

版本 1.4 中新增。

参数:

*col_expressions – 指示要返回的列。元素可以是整数行索引、字符串列名,或者适当的ColumnElement对象,对应于选择结构。

返回值:

Result对象,并带有给定的修改。

method sqlalchemy.engine.CursorResult.fetchall() Sequence[Row[_TP]]

继承自 Result.fetchall() 方法 Result

它是Result.all()方法的同义词。

method sqlalchemy.engine.CursorResult.fetchmany(size: int | None = None) Sequence[Row[_TP]]

继承自 Result.fetchmany() 方法 Result

获取多行。

当所有行都被耗尽时,将返回一个空序列。

此方法是为了与 SQLAlchemy 1.x.x 向后兼容而提供的。

要按组获取行,请使用Result.partitions()方法。

返回值:

一个 Row 对象的序列。

method sqlalchemy.engine.CursorResult.fetchone() Row[_TP] | None

继承自 Result.fetchone() 方法 Result

获取一行。

当所有行都被耗尽时,将返回 None。

此方法是为了与 SQLAlchemy 1.x.x 向后兼容而提供的。

要仅获取结果的第一行,请使用Result.first()方法。要遍历所有行,请直接遍历Result对象。

返回值:

如果未应用任何过滤器,则为Row对象;如果没有任何剩余行,则为None

method sqlalchemy.engine.CursorResult.first() Row[_TP] | None

继承自 Result.first() 方法 Result

获取第一行,如果不存在行,则返回None

关闭结果集并丢弃剩余行。

注意

默认情况下,此方法返回一行,例如元组。要返回一个唯一的标量值,即第一行的第一列,请使用 Result.scalar() 方法,或者将 Result.scalars()Result.first() 结合使用。

此外,与旧版 ORM Query.first() 方法的行为相反,没有对 SQL 查询应用任何限制,该查询被调用以生成此 Result;对于在生成行之前将结果缓冲到内存中的 DBAPI 驱动程序,所有行都将被发送到 Python 进程,除了第一行以外的所有行都将被丢弃。

返回值:

一个 Row 对象,如果没有任何行,则为 None。

method sqlalchemy.engine.CursorResult.freeze() FrozenResult[_TP]

继承自 Result.freeze() 方法 Result

返回一个可调用对象,该对象将在调用时生成此 Result 的副本。

返回的可调用对象是 FrozenResult 的实例。

这用于结果集缓存。此方法必须在结果未被使用时在结果上调用,并且调用此方法将完全使用结果。当从缓存中检索 FrozenResult 时,可以对其调用任意次数,每次它都会针对其存储的行集生成一个新的 Result 对象。

另见

重新执行语句 - 在 ORM 中实现结果集缓存的示例用法。

attribute sqlalchemy.engine.CursorResult.inserted_primary_key

返回刚刚插入的行的主键。

返回值是一个 Row 对象,它表示主键值的命名元组,主键列的顺序与源 Table 中的配置顺序一致。

版本 1.4.8 中的变更: - CursorResult.inserted_primary_key 值现在是通过 Row 类表示的命名元组,而不是普通元组。

此访问器仅适用于没有显式指定 Insert.returning() 的单行 insert() 结构。对多行插入的支持(虽然大多数后端尚不可用)将使用 CursorResult.inserted_primary_key_rows 访问器访问。

请注意,指定 server_default 子句的主键列,或者不符合“自动递增”列条件(请参阅 Column 中的说明),并使用数据库端默认值生成,将在此列表中显示为 None,除非后端支持“returning”并且插入语句使用“隐式 returning”启用执行。

如果执行的语句不是编译的表达式结构或不是 insert() 结构,则会引发 InvalidRequestError

attribute sqlalchemy.engine.CursorResult.inserted_primary_key_rows

以列表中包含的行形式返回 CursorResult.inserted_primary_key 的值;一些方言也可能支持多行形式。

注意

如下所示,在当前 SQLAlchemy 版本中,仅当使用 psycopg2 方言时,此访问器才比 CursorResult.inserted_primary_key 提供更多信息。未来版本希望将此功能推广到更多方言。

添加此访问器是为了支持那些提供当前由 Psycopg2 快速执行帮助器 功能实现的功能的方言,目前只有 psycopg2 方言,它支持一次插入多行,同时仍然保留能够返回服务器生成的“主键”值的行为。

  • 当使用 psycopg2 方言或其他可能在即将发布的版本中支持“快速 executemany”风格插入的方言时:当调用 INSERT 语句时,将行列表作为第二个参数传递给 Connection.execute(),此访问器将提供一个行列表,其中每一行包含为每个插入的行返回的“主键”值。

  • 当使用所有其他尚未支持此功能的方言/后端时:此访问器仅适用于单行 INSERT 语句,并返回与 CursorResult.inserted_primary_key 中的信息相同的信息,该信息包含在单元素列表中。当与要插入的行列表一起执行 INSERT 语句时,列表将包含每个在语句中插入的行的单行,但是它将包含 None,用于任何服务器生成的“主键”值。

SQLAlchemy 的未来版本将进一步推广 psycopg2 的“快速执行帮助器”功能,以适应其他方言,从而使此访问器可以更广泛地使用。

版本 1.4 中新增。

attribute sqlalchemy.engine.CursorResult.is_insert

如果此 CursorResult 是执行表达式语言编译的 insert() 结构的结果,则为 True。

当为 True 时,这意味着 inserted_primary_key 属性是可访问的,假设语句没有包含用户定义的“returning”结构。

method sqlalchemy.engine.CursorResult.keys() RMKeyView

继承自 sqlalchemy.engine._WithKeys.keys 方法 sqlalchemy.engine._WithKeys

返回一个可迭代视图,它生成每个 Row 所表示的字符串键。

这些键可以表示核心语句返回的列的标签,或者 ORM 执行返回的 ORM 类的名称。

视图还可以使用 Python in 运算符测试键包含性,它将测试视图中表示的字符串键,以及列对象等备用键。

版本 1.4 中的变更: 返回一个键视图对象,而不是一个普通列表。

method sqlalchemy.engine.CursorResult.last_inserted_params()

返回此执行的插入参数集合。

如果执行的语句不是编译的表达式结构或不是 insert() 结构,则会引发 InvalidRequestError

method sqlalchemy.engine.CursorResult.last_updated_params()

返回此执行的更新参数集合。

如果执行的语句不是编译的表达式结构或不是 update() 结构,则引发 InvalidRequestError

method sqlalchemy.engine.CursorResult.lastrow_has_defaults()

从底层 ExecutionContext 返回 lastrow_has_defaults()

有关详细信息,请参阅 ExecutionContext

attribute sqlalchemy.engine.CursorResult.lastrowid

返回 DBAPI 游标上的“lastrowid”访问器。

这是一种 DBAPI 特定方法,仅适用于支持它的那些后端,适用于适当的语句。它的行为在不同的后端并不一致。

在使用 insert() 表达式结构时,通常不需要使用此方法;CursorResult.inserted_primary_key 属性提供新插入行的主键值的元组,而与数据库后端无关。

method sqlalchemy.engine.CursorResult.mappings() MappingResult

继承自 Result.mappings() 方法 Result

对返回的行应用映射过滤器,返回 MappingResult 的实例。

应用此过滤器时,获取行将返回 RowMapping 对象而不是 Row 对象。

版本 1.4 中新增。

返回值:

一个新的 MappingResult 过滤对象,引用此 Result 对象。

method sqlalchemy.engine.CursorResult.merge(*others: Result[Any]) MergedResult[Any]

将此 Result 与其他兼容的结果对象合并。

返回的对象是 MergedResult 的实例,它将由给定结果对象的迭代器组成。

新结果将使用此结果对象的元数据。后续的结果对象必须针对相同的结果/游标元数据集,否则行为将是未定义的。

method sqlalchemy.engine.CursorResult.one() Row[_TP]

继承自 Result.one() 方法 Result

返回正好一行,否则引发异常。

如果结果返回零行,则引发 NoResultFound;如果将返回多行,则引发 MultipleResultsFound

注意

此方法默认返回一行,例如元组。要返回正好一个标量值,即第一行的第一列,请使用 Result.scalar_one() 方法,或者将 Result.scalars()Result.one() 结合使用。

版本 1.4 中新增。

返回值:

第一个 Row

引发:

MultipleResultsFoundNoResultFound

method sqlalchemy.engine.CursorResult.one_or_none() Row[_TP] | None

继承自 Result.one_or_none() 方法 Result

返回最多一个结果,否则引发异常。

如果结果没有行,则返回 None。如果返回多行,则引发 MultipleResultsFound

版本 1.4 中新增。

返回值:

第一个 RowNone(如果没有行可用)。

引发:

MultipleResultsFound

method sqlalchemy.engine.CursorResult.partitions(size: int | None = None) Iterator[Sequence[Row[_TP]]]

继承自 Result.partitions() 方法 Result

遍历给定大小的行子列表。

每个列表都将具有给定的大小,不包括要生成的最后一个列表,最后一个列表可能只有少量行。不会生成空列表。

当迭代器完全消耗时,结果对象将自动关闭。

请注意,除非使用 Connection.execution_options.stream_results 执行选项指示驱动程序不应预缓冲结果(如果可能),否则后端驱动程序通常会预先缓冲整个结果。并非所有驱动程序都支持此选项,并且对于不支持的驱动程序,此选项会被静默忽略。

在使用 ORM 时,Result.partitions() 方法与 yield_per 执行选项 结合使用通常在内存方面更有效。这会指示 DBAPI 驱动程序(如果可用)使用服务器端游标,并指示 ORM 加载内部机制一次只构建一定数量的 ORM 对象,然后将其生成出来。

版本 1.4 中新增。

参数:

size – 指示每个生成的列表中最多存在的行数。如果为 None,则使用由 Result.yield_per() 方法设置的值(如果已调用),或 Connection.execution_options.yield_per 执行选项,在这方面等效。如果未设置 yield_per,则使用 Result.fetchmany() 的默认值,该值可能是后端特定的,并且定义不明确。

返回值:

列表迭代器

method sqlalchemy.engine.CursorResult.postfetch_cols()

从底层的 ExecutionContext 返回 postfetch_cols()

有关详细信息,请参阅 ExecutionContext

如果执行的语句不是编译的表达式构造,或不是 insert() 或 update() 构造,则引发 InvalidRequestError

method sqlalchemy.engine.CursorResult.prefetch_cols()

从底层的 ExecutionContext 返回 prefetch_cols()

有关详细信息,请参阅 ExecutionContext

如果执行的语句不是编译的表达式构造,或不是 insert() 或 update() 构造,则引发 InvalidRequestError

attribute sqlalchemy.engine.CursorResult.returned_defaults

返回使用 ValuesBase.return_defaults() 功能获取的默认列的值。

该值是一个 Row 实例,或者如果未使用 ValuesBase.return_defaults() 或后端不支持 RETURNING,则为 None

另见

ValuesBase.return_defaults()

attribute sqlalchemy.engine.CursorResult.returned_defaults_rows

返回一个包含使用 ValuesBase.return_defaults() 功能获取的默认列值的行的列表。

返回值是 Row 对象的列表。

版本 1.4 中新增。

attribute sqlalchemy.engine.CursorResult.returns_rows

如果此 CursorResult 返回零行或更多行,则为 True。

即,如果可以调用方法 CursorResult.fetchone()CursorResult.fetchmany() CursorResult.fetchall()

总的来说,CursorResult.returns_rows 的值应该始终与 DBAPI 游标是否具有 .description 属性(指示结果列的存在)相一致,注意,如果发出的是返回行的语句,则返回零行的游标仍然具有 .description

对于针对 SELECT 语句的所有结果,以及使用 RETURNING 的 DML 语句 INSERT/UPDATE/DELETE,此属性应为 True。对于未使用 RETURNING 的 INSERT/UPDATE/DELETE 语句,该值通常为 False,但是有一些特定于方言的例外情况,例如,当使用 MSSQL/pyodbc 方言时,会内联发出一个 SELECT 以检索插入的主键值。

attribute sqlalchemy.engine.CursorResult.rowcount

返回此结果的“rowcount”。

“rowcount” 的主要目的是报告一次执行(即对于单个参数集)的 UPDATE 或 DELETE 语句的 WHERE 条件匹配的行数,然后可以将该行数与预期更新或删除的行数进行比较,以此作为断言数据完整性的一种手段。

此属性在游标关闭之前从 DBAPI 的 cursor.rowcount 属性中传递,以支持在游标关闭后不提供此值的 DBAPI。某些 DBAPI 可能会为其他类型的语句(例如 INSERT 和 SELECT 语句)提供有意义的值。为了检索这些语句的 cursor.rowcount,将 Connection.execution_options.preserve_rowcount 执行选项设置为 True,这将导致在返回任何结果或关闭游标之前,无论语句类型如何,都会无条件地记忆 cursor.rowcount 值。

对于 DBAPI 不支持特定类型语句的 rowcount 情况,或者/或执行时,返回的值将为 -1,该值直接来自 DBAPI,并且是 PEP 249 的一部分。但是,所有 DBAPI 都应支持针对单个参数集 UPDATE 和 DELETE 语句的 rowcount。

注意

关于 CursorResult.rowcount 的说明

  • 此属性返回匹配的行数,这并不一定与实际修改的行数相同。例如,如果给定的 SET 值与行中已存在的值相同,则 UPDATE 语句可能对给定行没有净变化。这样的行将被匹配,但不会被修改。在支持这两种样式的后端(例如 MySQL)上,rowcount 配置为在所有情况下返回匹配计数。

  • CursorResult.rowcount 在默认情况下与 UPDATE 或 DELETE 语句一起使用,并且仅与单个参数集一起使用。对于其他类型的语句,除非使用 Connection.execution_options.preserve_rowcount 执行选项,否则 SQLAlchemy 不会尝试预先记忆该值。注意,与 PEP 249 相反,许多 DBAPI 不支持针对并非 UPDATE 或 DELETE 的语句的 rowcount 值,尤其是在返回未完全预先缓冲的行时。对于不支持特定类型语句的 rowcount 的 DBAPI,应针对此类语句返回 -1 值。

  • CursorResult.rowcount 在使用多个参数集执行单个语句时(即 executemany)可能没有意义。大多数 DBAPI 不会跨多个参数集累加“rowcount”值,并在访问时返回 -1

  • SQLAlchemy 的 “针对 INSERT 语句的插入多个值”行为 功能在 Connection.execution_options.preserve_rowcount 执行选项设置为 True 时,确实支持对 CursorResult.rowcount 进行正确填充。

  • 使用 RETURNING 的语句可能不支持 rowcount,而是返回 -1 值。

方法 sqlalchemy.engine.CursorResult.scalar() Any

继承自 Result.scalar() 方法 of Result

获取第一行第一列的值,并关闭结果集。

如果不存在要获取的行,则返回 None

不执行任何验证来测试是否还有其他行。

调用此方法后,对象将完全关闭,例如 CursorResult.close() 方法将被调用。

返回值:

一个 Python 标量值,或者如果不存在其他行,则为 None

方法 sqlalchemy.engine.CursorResult.scalar_one() Any

继承自 Result.scalar_one() 方法 of Result

返回一个标量结果,或者抛出异常。

这等效于调用 Result.scalars() 然后调用 ScalarResult.one().

方法 sqlalchemy.engine.CursorResult.scalar_one_or_none() Any | None

继承自 Result.scalar_one_or_none() 方法 of Result

返回一个标量结果,或者 None

这等效于调用 Result.scalars() 然后调用 ScalarResult.one_or_none().

方法 sqlalchemy.engine.CursorResult.scalars(index: _KeyIndexType = 0) ScalarResult[Any]

继承自 Result.scalars() 方法 of Result

返回一个 ScalarResult 过滤对象,该对象将返回单个元素而不是 Row 对象。

例如

>>> result = conn.execute(text("select int_id from table"))
>>> result.scalars().all()
[1, 2, 3]

当从 ScalarResult 过滤对象中获取结果时,Result 将返回的单个列行将作为列的值返回。

版本 1.4 中新增。

参数:

index – 整数或行键,指示要从每行获取的列,默认为 0,表示第一列。

返回值:

一个新的 ScalarResult 过滤对象,引用此 Result 对象。

方法 sqlalchemy.engine.CursorResult.splice_horizontally(other)

返回一个新的 CursorResult,该对象将此 CursorResult 的行与另一个 CursorResult 的行“水平拼接”在一起。

提示

此方法是为了 SQLAlchemy ORM 的利益,不适合一般用途。

“水平拼接”意味着对于第一个和第二个结果集中的每一行,都会产生一个新的行,该行将两个行连接在一起,然后成为新的行。传入的 CursorResult 必须具有相同数量的行。通常预期两个结果集也来自相同的排序顺序,因为结果行是根据它们在结果中的位置拼接在一起的。

这里预期的用例是,多个针对不同表的 INSERT..RETURNING 语句(肯定需要排序)可以生成一个看起来像这两个表连接的单个结果。

例如

r1 = connection.execute(
    users.insert().returning(
        users.c.user_name,
        users.c.user_id,
        sort_by_parameter_order=True
    ),
    user_values
)

r2 = connection.execute(
    addresses.insert().returning(
        addresses.c.address_id,
        addresses.c.address,
        addresses.c.user_id,
        sort_by_parameter_order=True
    ),
    address_values
)

rows = r1.splice_horizontally(r2).all()
assert (
    rows ==
    [
        ("john", 1, 1, "[email protected]", 1),
        ("jack", 2, 2, "[email protected]", 2),
    ]
)

版本 2.0 中新增。

方法 sqlalchemy.engine.CursorResult.splice_vertically(other)

返回一个新的 CursorResult,该对象将此 CursorResult 的行与另一个 CursorResult 的行“垂直拼接”,即“扩展”。

提示

此方法是为了 SQLAlchemy ORM 的利益,不适合一般用途。

“垂直拼接”意味着给定结果的行将附加到此游标结果的行。传入的 CursorResult 必须具有代表相同列列表的行,这些行与它们在该 CursorResult 中的顺序相同。

版本 2.0 中新增。

方法 sqlalchemy.engine.CursorResult.supports_sane_multi_rowcount()

从方言返回 supports_sane_multi_rowcount

有关背景信息,请参阅 CursorResult.rowcount

方法 sqlalchemy.engine.CursorResult.supports_sane_rowcount()

从方言返回 supports_sane_rowcount

有关背景信息,请参阅 CursorResult.rowcount

属性 sqlalchemy.engine.CursorResult.t

继承自 Result.t 属性 of Result

将“类型化元组”类型过滤器应用于返回的行。

Result.t 属性是调用 Result.tuples() 方法的同义词。

版本 2.0 中新增。

method sqlalchemy.engine.CursorResult.tuples() TupleResult[_TP]

继承自 Result.tuples() 方法 Result

将“类型化元组”类型过滤器应用于返回的行。

此方法在运行时返回相同的 Result 对象,但标注为返回一个 TupleResult 对象,该对象将向 PEP 484 类型工具指示返回的是普通类型 Tuple 实例,而不是行。这允许对 Row 对象进行元组解包和 __getitem__ 访问,并对其进行类型化,适用于那些调用语句本身包含类型信息的用例。

版本 2.0 中新增。

返回值:

在类型化时 TupleResult 类型。

另见

Result.t - 更短的同义词

Row._t - Row 版本

method sqlalchemy.engine.CursorResult.unique(strategy: Callable[[Any], Any] | None = None) Self

继承自 Result.unique() 方法 Result

对由此 Result 返回的对象应用唯一过滤。

当应用此过滤器且没有参数时,返回的行或对象将被过滤,以便仅返回每行一次。用于确定此唯一性的算法默认情况下是整个元组的 Python 散列标识。在某些情况下,可能会使用专门的每个实体散列方案,例如在使用 ORM 时,会应用一个针对返回对象的 主键标识 的方案。

唯一过滤器在所有其他过滤器之后应用,这意味着如果使用诸如 Result.columns()Result.scalars() 方法的方法精炼了返回的列,则唯一化将仅应用于返回的列或列。无论在 Result 对象上调用这些方法的顺序如何,都会发生这种情况。

唯一过滤器还会改变诸如 Result.fetchmany()Result.partitions() 的方法使用的计算。在使用 Result.unique() 时,这些方法将在应用唯一化后继续生成请求的行数或对象数。但是,这必然会影响底层游标或数据源的缓冲行为,因此可能需要多次调用底层 cursor.fetchmany() 才能累积足够的对象,以提供请求大小的唯一集合。

参数:

strategy – 一个可调用对象,它将应用于正在迭代的行或对象,该对象应该返回一个表示行唯一值的 对象。一个 Python set() 用于存储这些标识。如果没有传递,将使用默认的唯一性策略,该策略可能是由此 Result 对象的源组装的。

method sqlalchemy.engine.CursorResult.yield_per(num: int) Self

配置行提取策略,每次提取 num 行。

这会影响结果对象迭代时的底层行为,或者其他方法,例如 Result.fetchone() 每次返回一行。来自底层游标或其他数据源的数据将被缓冲到内存中的这些行,然后缓冲的集合将被逐行或按请求的行数生成。每次缓冲区清除后,它将被刷新到这些行,或者如果剩余的行更少,则刷新到剩余的行。

方法 Result.yield_per() 通常与 Connection.execution_options.stream_results 执行选项一起使用,这将允许正在使用的数据库方言使用服务器端游标,如果 DBAPI 支持其默认操作模式之外的特定“服务器端游标”模式。

提示

考虑使用 Connection.execution_options.yield_per 执行选项,它将同时设置 Connection.execution_options.stream_results 以确保使用服务器端游标,并自动调用 Result.yield_per() 方法一次建立固定行缓冲区大小。

执行选项 Connection.execution_options.yield_per 可用于 ORM 操作,Session 导向的使用在 使用 Yield Per 提取大型结果集 中描述。仅核心版本与 Connection 一起使用,从 SQLAlchemy 1.4.40 开始。

版本 1.4 中新增。

参数:

num – 每次缓冲区重新填充时要获取的行数。如果设置为小于 1 的值,则会获取下一个缓冲区的所有行。

class sqlalchemy.engine.FilterResult

Result 的包装,返回除 Row 对象之外的对象,例如字典或标量对象。

FilterResult 是其他结果 API 的公共基础,包括 MappingResultScalarResultAsyncResult

类签名

class sqlalchemy.engine.FilterResult (sqlalchemy.engine.ResultInternal)

method sqlalchemy.engine.FilterResult.close() None

关闭此 FilterResult

1.4.43 版本新增。

attribute sqlalchemy.engine.FilterResult.closed

如果底层的 Result 报告已关闭,则返回 True

1.4.43 版本新增。

method sqlalchemy.engine.FilterResult.yield_per(num: int) Self

配置行提取策略,每次提取 num 行。

FilterResult.yield_per() 方法是 Result.yield_per() 方法的直接调用。请参考该方法的文档了解使用说明。

版本 1.4.40 新增: - 添加了 FilterResult.yield_per(),使该方法在所有结果集实现中可用

class sqlalchemy.engine.FrozenResult

表示 Result 对象的“冻结”状态,适用于缓存。

FrozenResult 对象是由任何 Result 对象的 Result.freeze() 方法返回。

每次将 FrozenResult 作为可调用对象调用时,都会从一组固定数据中生成一个新的可迭代 Result 对象

result = connection.execute(query)

frozen = result.freeze()

unfrozen_result_one = frozen()

for row in unfrozen_result_one:
    print(row)

unfrozen_result_two = frozen()
rows = unfrozen_result_two.all()

# ... etc

版本 1.4 中新增。

另见

重新执行语句 - 在 ORM 中实现结果集缓存的示例用法。

merge_frozen_result() - ORM 函数,用于将冻结结果合并回 Session

类签名

class sqlalchemy.engine.FrozenResult (typing.Generic)

class sqlalchemy.engine.IteratorResult

一个 Result,从 Python 迭代器获取 Row 对象或类似行数据。

版本 1.4 中新增。

成员

closed

attribute sqlalchemy.engine.IteratorResult.closed

如果此 IteratorResult 已关闭,则返回 True

1.4.43 版本新增。

class sqlalchemy.engine.MergedResult

一个 Result,从任意数量的 Result 对象合并。

Result.merge() 方法返回。

版本 1.4 中新增。

class sqlalchemy.engine.Result

表示一组数据库结果。

版本 1.4 新增: Result 对象为 SQLAlchemy Core 和 SQLAlchemy ORM 提供了完全更新的使用模型和调用外观。在 Core 中,它构成了 CursorResult 对象的基础,该对象取代了之前的 ResultProxy 接口。使用 ORM 时,通常会使用名为 ChunkedIteratorResult 的更高级对象。

注意

在 SQLAlchemy 1.4 及更高版本中,此对象用于由 Session.execute() 返回的 ORM 结果,它可以单独或在元组状行中生成 ORM 映射对象的实例。请注意,Result 对象不会像传统的 Query 对象那样自动对实例或行进行去重。要对实例或行进行 Python 内部去重,请使用 Result.unique() 修改器方法。

类签名

class sqlalchemy.engine.Result (sqlalchemy.engine._WithKeys, sqlalchemy.engine.ResultInternal)

method sqlalchemy.engine.Result.all() Sequence[Row[_TP]]

以序列形式返回所有行。

调用后会关闭结果集。后续调用将返回空序列。

版本 1.4 中新增。

返回值:

一个 Row 对象的序列。

另见

使用服务器端游标(也称为流结果) - 如何在不完全加载到 Python 中的情况下流式传输大型结果集。

method sqlalchemy.engine.Result.close() None

关闭此 Result

此方法的行为取决于具体实现,默认情况下未实现。该方法通常应结束结果对象正在使用的资源,并使后续迭代或行获取抛出 ResourceClosedError

版本 1.4.27 中新增: - .close() 以前并非所有 Result 类都普遍可用,而只在用于 Core 语句执行返回的 CursorResult 上可用。 由于大多数其他结果对象(即 ORM 使用的对象)在任何情况下都代理了 CursorResult,这使得可以从外部外观关闭底层游标结果,这种情况发生在 ORM 查询使用 yield_per 执行选项时,该选项不会立即耗尽并自动关闭数据库游标。

属性 sqlalchemy.engine.Result.closed

如果此 Result 报告 .closed,则返回 True

1.4.43 版本新增。

方法 sqlalchemy.engine.Result.columns(*col_expressions: _KeyIndexType) Self

确定每行中应返回的列。

此方法可用于限制返回的列,以及重新排序它们。给定的表达式列表通常是一系列整数或字符串键名。它们也可能是适当的ColumnElement对象,对应于给定的语句结构。

在 2.0 版本中更改: 由于 1.4 中的错误,Result.columns()方法的行为不正确,使用单个索引调用该方法会导致Result对象产生标量值,而不是Row对象。在 2.0 版本中,此行为已得到修正,以便使用单个索引调用Result.columns()将产生一个Result对象,该对象继续产生Row对象,其中只包含一列。

例如

statement = select(table.c.x, table.c.y, table.c.z)
result = connection.execute(statement)

for z, y in result.columns('z', 'y'):
    # ...

使用语句本身中的列对象的示例

for z, y in result.columns(
        statement.selected_columns.c.z,
        statement.selected_columns.c.y
):
    # ...

版本 1.4 中新增。

参数:

*col_expressions – 指示要返回的列。 元素可以是整型行索引、字符串列名或对应于 select 结构的适当 ColumnElement 对象。

返回值:

Result对象,并带有给定的修改。

方法 sqlalchemy.engine.Result.fetchall() Sequence[Row[_TP]]

它是Result.all()方法的同义词。

方法 sqlalchemy.engine.Result.fetchmany(size: int | None = None) Sequence[Row[_TP]]

获取多行。

当所有行都被耗尽时,将返回一个空序列。

此方法是为了与 SQLAlchemy 1.x.x 向后兼容而提供的。

要按组获取行,请使用Result.partitions()方法。

返回值:

一个 Row 对象的序列。

方法 sqlalchemy.engine.Result.fetchone() Row[_TP] | None

获取一行。

当所有行都被耗尽时,将返回 None。

此方法是为了与 SQLAlchemy 1.x.x 向后兼容而提供的。

要仅获取结果的第一行,请使用Result.first()方法。要遍历所有行,请直接遍历Result对象。

返回值:

如果未应用任何过滤器,则为Row对象;如果没有任何剩余行,则为None

方法 sqlalchemy.engine.Result.first() Row[_TP] | None

获取第一行,如果不存在行,则返回None

关闭结果集并丢弃剩余行。

注意

默认情况下,此方法返回一行,例如元组。要返回一个唯一的标量值,即第一行的第一列,请使用 Result.scalar() 方法,或者将 Result.scalars()Result.first() 结合使用。

此外,与旧版 ORM Query.first() 方法的行为相反,没有对 SQL 查询应用任何限制,该查询被调用以生成此 Result;对于在生成行之前将结果缓冲到内存中的 DBAPI 驱动程序,所有行都将被发送到 Python 进程,除了第一行以外的所有行都将被丢弃。

返回值:

一个 Row 对象,如果没有任何行,则为 None。

方法 sqlalchemy.engine.Result.freeze() FrozenResult[_TP]

返回一个可调用对象,该对象将在调用时生成此 Result 的副本。

返回的可调用对象是 FrozenResult 的实例。

这用于结果集缓存。此方法必须在结果未被使用时在结果上调用,并且调用此方法将完全使用结果。当从缓存中检索 FrozenResult 时,可以对其调用任意次数,每次它都会针对其存储的行集生成一个新的 Result 对象。

另见

重新执行语句 - 在 ORM 中实现结果集缓存的示例用法。

方法 sqlalchemy.engine.Result.keys() RMKeyView

继承自 sqlalchemy.engine._WithKeys.keys 方法 sqlalchemy.engine._WithKeys

返回一个可迭代视图,它生成每个 Row 所表示的字符串键。

这些键可以表示核心语句返回的列的标签,或者 ORM 执行返回的 ORM 类的名称。

视图还可以使用 Python in 运算符测试键包含性,它将测试视图中表示的字符串键,以及列对象等备用键。

版本 1.4 中的变更: 返回一个键视图对象,而不是一个普通列表。

方法 sqlalchemy.engine.Result.mappings() MappingResult

对返回的行应用映射过滤器,返回 MappingResult 的实例。

应用此过滤器时,获取行将返回 RowMapping 对象而不是 Row 对象。

版本 1.4 中新增。

返回值:

一个新的 MappingResult 过滤对象,引用此 Result 对象。

方法 sqlalchemy.engine.Result.merge(*others: Result[Any]) MergedResult[_TP]

将此 Result 与其他兼容的结果对象合并。

返回的对象是 MergedResult 的实例,它将由给定结果对象的迭代器组成。

新结果将使用此结果对象的元数据。后续的结果对象必须针对相同的结果/游标元数据集,否则行为将是未定义的。

方法 sqlalchemy.engine.Result.one() Row[_TP]

返回正好一行,否则引发异常。

如果结果返回零行,则引发 NoResultFound;如果将返回多行,则引发 MultipleResultsFound

注意

此方法默认返回一行,例如元组。要返回正好一个标量值,即第一行的第一列,请使用 Result.scalar_one() 方法,或者将 Result.scalars()Result.one() 结合使用。

版本 1.4 中新增。

返回值:

第一个 Row

引发:

MultipleResultsFoundNoResultFound

方法 sqlalchemy.engine.Result.one_or_none() Row[_TP] | None

返回最多一个结果,否则引发异常。

如果结果没有行,则返回 None。如果返回多行,则引发 MultipleResultsFound

版本 1.4 中新增。

返回值:

第一个 RowNone(如果没有行可用)。

引发:

MultipleResultsFound

方法 sqlalchemy.engine.Result.partitions(size: int | None = None) Iterator[Sequence[Row[_TP]]]

遍历给定大小的行子列表。

每个列表都将具有给定的大小,不包括要生成的最后一个列表,最后一个列表可能只有少量行。不会生成空列表。

当迭代器完全消耗时,结果对象将自动关闭。

请注意,除非使用 Connection.execution_options.stream_results 执行选项指示驱动程序不应预缓冲结果(如果可能),否则后端驱动程序通常会预先缓冲整个结果。并非所有驱动程序都支持此选项,并且对于不支持的驱动程序,此选项会被静默忽略。

在使用 ORM 时,Result.partitions() 方法与 yield_per 执行选项 结合使用通常在内存方面更有效。这会指示 DBAPI 驱动程序(如果可用)使用服务器端游标,并指示 ORM 加载内部机制一次只构建一定数量的 ORM 对象,然后将其生成出来。

版本 1.4 中新增。

参数:

size – 指示每个列表中最多可包含的行数。 如果为 None,则使用由 Result.yield_per() 方法(如果已调用)设置的值或 Connection.execution_options.yield_per 执行选项,这在本质上是等效的。 如果未设置 yield_per,则使用 Result.fetchmany() 默认值,该值可能是特定于后端的,并且没有明确定义。

返回值:

列表迭代器

方法 sqlalchemy.engine.Result.scalar() Any

获取第一行第一列的值,并关闭结果集。

如果不存在要获取的行,则返回 None

不执行任何验证来测试是否还有其他行。

调用此方法后,对象将完全关闭,例如 CursorResult.close() 方法将被调用。

返回值:

一个 Python 标量值,或者如果不存在其他行,则为 None

方法 sqlalchemy.engine.Result.scalar_one() Any

返回一个标量结果,或者抛出异常。

这等效于调用 Result.scalars() 然后调用 ScalarResult.one().

方法 sqlalchemy.engine.Result.scalar_one_or_none() Any | None

返回一个标量结果,或者 None

这等效于调用 Result.scalars() 然后调用 ScalarResult.one_or_none().

方法 sqlalchemy.engine.Result.scalars(index: _KeyIndexType = 0) ScalarResult[Any]

返回一个 ScalarResult 过滤对象,该对象将返回单个元素而不是 Row 对象。

例如

>>> result = conn.execute(text("select int_id from table"))
>>> result.scalars().all()
[1, 2, 3]

当从 ScalarResult 过滤对象中获取结果时,Result 将返回的单个列行将作为列的值返回。

版本 1.4 中新增。

参数:

index – 指示从每行中获取的列的整数或行键,默认为 0,表示第一列。

返回值:

一个新的 ScalarResult 过滤对象,引用此 Result 对象。

属性 sqlalchemy.engine.Result.t

将“类型化元组”类型过滤器应用于返回的行。

Result.t 属性是调用 Result.tuples() 方法的同义词。

版本 2.0 中新增。

方法 sqlalchemy.engine.Result.tuples() TupleResult[_TP]

将“类型化元组”类型过滤器应用于返回的行。

此方法在运行时返回相同的 Result 对象,但注释为返回 TupleResult 对象,该对象将指示 PEP 484 类型提示工具返回的是普通类型的 Tuple 实例,而不是行。这允许对 Row 对象的元组解包和 __getitem__ 访问进行类型化,适用于调用语句本身包含类型信息的情况。

版本 2.0 中新增。

返回值:

在类型化时 TupleResult 类型。

另见

Result.t - 更短的同义词

Row._t - Row 版本

方法 sqlalchemy.engine.Result.unique(strategy: Callable[[Any], Any] | None = None) Self

对由此 Result 返回的对象应用唯一过滤。

当应用此过滤器且没有参数时,返回的行或对象将被过滤,以便仅返回每行一次。用于确定此唯一性的算法默认情况下是整个元组的 Python 散列标识。在某些情况下,可能会使用专门的每个实体散列方案,例如在使用 ORM 时,会应用一个针对返回对象的 主键标识 的方案。

唯一过滤器在所有其他过滤器之后应用,这意味着如果使用诸如 Result.columns()Result.scalars() 方法的方法精炼了返回的列,则唯一化将仅应用于返回的列或列。无论在 Result 对象上调用这些方法的顺序如何,都会发生这种情况。

唯一过滤器还会改变诸如 Result.fetchmany()Result.partitions() 的方法使用的计算。在使用 Result.unique() 时,这些方法将在应用唯一化后继续生成请求的行数或对象数。但是,这必然会影响底层游标或数据源的缓冲行为,因此可能需要多次调用底层 cursor.fetchmany() 才能累积足够的对象,以提供请求大小的唯一集合。

参数:

strategy – 一个可调用对象,它将应用于正在迭代的行或对象,应该返回一个表示行唯一值的的对象。Python set() 用于存储这些标识。如果没有传递,则使用默认的唯一性策略,该策略可能是由此 Result 对象的来源组装的。

方法 sqlalchemy.engine.Result.yield_per(num: int) Self

配置行提取策略,每次提取 num 行。

这会影响结果对象迭代时的底层行为,或者其他方法,例如 Result.fetchone() 每次返回一行。来自底层游标或其他数据源的数据将被缓冲到内存中的这些行,然后缓冲的集合将被逐行或按请求的行数生成。每次缓冲区清除后,它将被刷新到这些行,或者如果剩余的行更少,则刷新到剩余的行。

方法 Result.yield_per() 通常与 Connection.execution_options.stream_results 执行选项一起使用,这将允许正在使用的数据库方言使用服务器端游标,如果 DBAPI 支持其默认操作模式之外的特定“服务器端游标”模式。

提示

考虑使用 Connection.execution_options.yield_per 执行选项,它将同时设置 Connection.execution_options.stream_results 以确保使用服务器端游标,并自动调用 Result.yield_per() 方法一次建立固定行缓冲区大小。

执行选项 Connection.execution_options.yield_per 可用于 ORM 操作,Session 导向的使用在 使用 Yield Per 提取大型结果集 中描述。仅核心版本与 Connection 一起使用,从 SQLAlchemy 1.4.40 开始。

版本 1.4 中新增。

参数:

num – 每次缓冲区被重新填充时要获取的行数。如果设置为小于 1 的值,则获取下一个缓冲区的所有行。

sqlalchemy.engine.ScalarResult

Result 的包装,返回标量值而不是 Row 值。

通过调用 Result.scalars() 方法获取 ScalarResult 对象。

ScalarResult 的一个特殊限制是它没有 fetchone() 方法;由于 fetchone() 的语义是 None 值表示没有更多结果,这与 ScalarResult 不兼容,因为无法区分 None 作为行值还是 None 作为指示器。使用 next(result) 来单独接收值。

方法 sqlalchemy.engine.ScalarResult.all() Sequence[_R]

将所有标量值返回到一个序列中。

等效于 Result.all(),除了返回的是标量值,而不是 Row 对象。

方法 sqlalchemy.engine.ScalarResult.close() None

关闭此 FilterResult

1.4.43 版本新增。

属性 sqlalchemy.engine.ScalarResult.closed

继承自 FilterResult.closed 属性的 FilterResult

如果底层的 Result 报告已关闭,则返回 True

1.4.43 版本新增。

方法 sqlalchemy.engine.ScalarResult.fetchall() Sequence[_R]

ScalarResult.all() 方法的同义词。

方法 sqlalchemy.engine.ScalarResult.fetchmany(size: int | None = None) Sequence[_R]

获取多个对象。

等同于 Result.fetchmany(),只是返回标量值,而不是 Row 对象。

方法 sqlalchemy.engine.ScalarResult.first() _R | None

获取第一个对象,如果没有对象则返回 None

等同于 Result.first(),只是返回标量值,而不是 Row 对象。

方法 sqlalchemy.engine.ScalarResult.one() _R

返回一个且仅一个对象,否则抛出异常。

等同于 Result.one(),只是返回标量值,而不是 Row 对象。

方法 sqlalchemy.engine.ScalarResult.one_or_none() _R | None

返回最多一个对象,否则抛出异常。

等同于 Result.one_or_none(),只是返回标量值,而不是 Row 对象。

方法 sqlalchemy.engine.ScalarResult.partitions(size: int | None = None) Iterator[Sequence[_R]]

遍历给定大小的元素子列表。

等同于 Result.partitions(),只是返回标量值,而不是 Row 对象。

方法 sqlalchemy.engine.ScalarResult.unique(strategy: Callable[[Any], Any] | None = None) Self

对该 ScalarResult 返回的对象应用唯一过滤。

请参阅 Result.unique() 获取用法详细信息。

方法 sqlalchemy.engine.ScalarResult.yield_per(num: int) Self

配置行提取策略,每次提取 num 行。

FilterResult.yield_per() 方法是 Result.yield_per() 方法的直接调用。请参考该方法的文档了解使用说明。

版本 1.4.40 新增: - 添加了 FilterResult.yield_per(),使该方法在所有结果集实现中可用

sqlalchemy.engine.MappingResult

Result 的包装,返回字典值而不是 Row 值。

MappingResult 对象是通过调用 Result.mappings() 方法获得的。

类签名

class sqlalchemy.engine.MappingResult (sqlalchemy.engine._WithKeys, sqlalchemy.engine.FilterResult)

方法 sqlalchemy.engine.MappingResult.all() Sequence[RowMapping]

将所有标量值返回到一个序列中。

等同于 Result.all(),只是返回 RowMapping 值,而不是 Row 对象。

方法 sqlalchemy.engine.MappingResult.close() None

关闭此 FilterResult

1.4.43 版本新增。

属性 sqlalchemy.engine.MappingResult.closed

继承自 FilterResult.closed 属性的 FilterResult

如果底层的 Result 报告已关闭,则返回 True

1.4.43 版本新增。

方法 sqlalchemy.engine.MappingResult.columns(*col_expressions: _KeyIndexType) Self

确定每行中应返回的列。

方法 sqlalchemy.engine.MappingResult.fetchall() Sequence[RowMapping]

MappingResult.all() 方法同义。

方法 sqlalchemy.engine.MappingResult.fetchmany(size: int | None = None) Sequence[RowMapping]

获取多个对象。

等同于 Result.fetchmany() ,只是返回 RowMapping 值,而不是 Row 对象。

方法 sqlalchemy.engine.MappingResult.fetchone() RowMapping | None

获取一个对象。

等同于 Result.fetchone() ,只是返回 RowMapping 值,而不是 Row 对象。

方法 sqlalchemy.engine.MappingResult.first() RowMapping | None

获取第一个对象,如果没有对象则返回 None

等同于 Result.first() ,只是返回 RowMapping 值,而不是 Row 对象。

方法 sqlalchemy.engine.MappingResult.keys() RMKeyView

继承自 sqlalchemy.engine._WithKeys.keys 方法 sqlalchemy.engine._WithKeys

返回一个可迭代视图,它生成每个 Row 所表示的字符串键。

这些键可以表示核心语句返回的列的标签,或者 ORM 执行返回的 ORM 类的名称。

视图还可以使用 Python in 运算符测试键包含性,它将测试视图中表示的字符串键,以及列对象等备用键。

版本 1.4 中的变更: 返回一个键视图对象,而不是一个普通列表。

方法 sqlalchemy.engine.MappingResult.one() RowMapping

返回一个且仅一个对象,否则抛出异常。

等同于 Result.one() ,只是返回 RowMapping 值,而不是 Row 对象。

方法 sqlalchemy.engine.MappingResult.one_or_none() RowMapping | None

返回最多一个对象,否则抛出异常。

等同于 Result.one_or_none() ,只是返回 RowMapping 值,而不是 Row 对象。

方法 sqlalchemy.engine.MappingResult.partitions(size: int | None = None) Iterator[Sequence[RowMapping]]

遍历给定大小的元素子列表。

等同于 Result.partitions() ,只是返回 RowMapping 值,而不是 Row 对象。

方法 sqlalchemy.engine.MappingResult.unique(strategy: Callable[[Any], Any] | None = None) Self

MappingResult 返回的对象应用唯一过滤。

请参阅 Result.unique() 获取用法详细信息。

方法 sqlalchemy.engine.MappingResult.yield_per(num: int) Self

配置行提取策略,每次提取 num 行。

FilterResult.yield_per() 方法是 Result.yield_per() 方法的直接调用。请参考该方法的文档了解使用说明。

版本 1.4.40 新增: - 添加了 FilterResult.yield_per(),使该方法在所有结果集实现中可用

sqlalchemy.engine.Row

表示单个结果行。

Row 对象表示数据库结果集中的行。在 SQLAlchemy 的 1.x 系列中,它通常与 CursorResult 对象相关联,但在 SQLAlchemy 1.4 中,ORM 也将它用于元组形式的结果。

The Row object strives to behave as much like a Python named tuple as possible. For mapping (i.e., dictionary) behavior on a row, such as testing for containment of keys, refer to the Row._mapping attribute.

另见

Using SELECT Statements - includes examples of selecting rows from SELECT statements.

Changed in version 1.4: Renamed RowProxy to Row. Row is no longer a “proxy” object in that it contains the final form of data within it, and now acts mostly like a named tuple. Mapping-like functionality is moved to the Row._mapping attribute. See RowProxy is no longer a “proxy”; is now called Row and behaves like an enhanced named tuple for background on this change.

类签名

class sqlalchemy.engine.Row (sqlalchemy.engine._py_row.BaseRow, collections.abc.Sequence, typing.Generic)

method sqlalchemy.engine.Row._asdict() Dict[str, Any]

返回一个新的字典,将字段名映射到其对应值。

此方法类似于 Python 具名元组的 ._asdict() 方法,它通过对 Row._mapping 属性应用 dict() 构造函数来工作。

版本 1.4 中新增。

另见

Row._mapping

attribute sqlalchemy.engine.Row._fields

返回由此 Row 表示的字符串键元组。

这些键可以表示核心语句返回的列的标签,或者 ORM 执行返回的 ORM 类的名称。

此属性类似于 Python 具名元组的 ._fields 属性。

版本 1.4 中新增。

另见

Row._mapping

attribute sqlalchemy.engine.Row._mapping

返回此 RowRowMapping

此对象为行中包含的数据提供一致的 Python 映射(即字典)接口。 Row 本身就像一个具名元组。

另见

Row._fields

版本 1.4 中新增。

attribute sqlalchemy.engine.Row._t

Row._tuple() 的同义词。

New in version 2.0.19: - The Row._t attribute supersedes the previous Row.t attribute, which is now underscored to avoid name conflicts with column names in the same way as other named-tuple methods on Row.

另见

Result.t

method sqlalchemy.engine.Row._tuple() _TP

返回此 Row 的“元组”形式。

在运行时,此方法返回“self”; Row 对象本身就是一个具名元组。但是,在类型级别,如果此 Row 被类型化,则“元组”返回类型将是一个 PEP 484 Tuple 数据类型,其中包含有关各个元素的类型信息,支持类型化解包和属性访问。

New in version 2.0.19: - The Row._tuple() method supersedes the previous Row.tuple() method, which is now underscored to avoid name conflicts with column names in the same way as other named-tuple methods on Row.

另见

Row._t - 简写属性表示法

Result.tuples()

attribute sqlalchemy.engine.Row.count
attribute sqlalchemy.engine.Row.index
attribute sqlalchemy.engine.Row.t

Row._tuple() 的同义词。

Deprecated since version 2.0.19: The Row.t attribute is deprecated in favor of Row._t; all Row methods and library-level attributes are intended to be underscored to avoid name conflicts. Please use Row._t.

版本 2.0 中新增。

method sqlalchemy.engine.Row.tuple() _TP

返回此 Row 的“元组”形式。

Deprecated since version 2.0.19: The Row.tuple() method is deprecated in favor of Row._tuple(); all Row methods and library-level attributes are intended to be underscored to avoid name conflicts. Please use Row._tuple().

版本 2.0 中新增。

class sqlalchemy.engine.RowMapping

一个 Mapping,将列名和对象映射到 Row 值。

The RowMapping is available from a Row via the Row._mapping attribute, as well as from the iterable interface provided by the MappingResult object returned by the Result.mappings() method.

RowMapping 为行内容提供 Python 映射(即字典)访问。这包括对特定键(字符串列名或对象)的包含测试的支持,以及键、值和项目的迭代。

for row in result:
    if 'a' in row._mapping:
        print("Column 'a': %s" % row._mapping['a'])

    print("Column b: %s" % row._mapping[table.c.b])

版本 1.4 中新增: The RowMapping 对象替换了以前由数据库结果行提供的类似映射的访问,现在它试图表现得更像一个命名元组。

类签名

class sqlalchemy.engine.RowMapping (sqlalchemy.engine._py_row.BaseRow, collections.abc.Mapping, typing.Generic)

method sqlalchemy.engine.RowMapping.items() ROMappingItemsView

返回底层 Row 中元素的键/值元组视图。

method sqlalchemy.engine.RowMapping.keys() RMKeyView

返回底层 Row 表示的字符串列名的“键”视图。

method sqlalchemy.engine.RowMapping.values() ROMappingKeysValuesView

返回底层 Row 中表示的值的视图。

class sqlalchemy.engine.TupleResult

一个 Result,其类型为返回普通 Python 元组而不是行。

由于 Row 已经以所有方式像元组一样工作,所以此类是一个仅类型类,在运行时仍然使用常规的 Result

类签名

class sqlalchemy.engine.TupleResult (sqlalchemy.engine.FilterResult, sqlalchemy.util.langhelpers.TypingOnly)