异步 I/O (asyncio)

支持 Python asyncio。支持核心和 ORM 使用,使用与 asyncio 兼容的方言。

版本 1.4 中的新功能。

警告

请阅读 Asyncio 平台安装说明 (包括 Apple M1),了解许多平台(包括 **Apple M1 架构**)的重要平台安装说明。

另请参见

核心和 ORM 的异步 IO 支持 - 初始功能公告

Asyncio 集成 - 说明核心和 ORM 在 asyncio 扩展中的工作示例的示例脚本。

Asyncio 平台安装说明 (包括 Apple M1)

asyncio 扩展需要 Python 3。它还依赖于 greenlet 库。此依赖项在常见的机器平台上默认安装,包括

x86_64 aarch64 ppc64le amd64 win32

对于上述平台,greenlet 能够提供预编译的轮子文件。对于其他平台,**greenlet 默认不会安装**;greenlet 的当前文件列表可以在 Greenlet - 下载文件 中查看。请注意,**许多架构被省略,包括 Apple M1**。

要安装 SQLAlchemy 并确保无论使用什么平台 greenlet 依赖项都存在,可以安装 [asyncio] setuptools 扩展,如下所示,这将包含并指示 pip 安装 greenlet

pip install sqlalchemy[asyncio]

请注意,在没有预编译的轮子文件的平台上安装 greenlet 意味着 greenlet 将从源代码构建,这需要 Python 的开发库也存在。

概要 - 核心

对于 Core 使用,create_async_engine() 函数创建一个 AsyncEngine 实例,然后提供传统 Engine API 的异步版本。 AsyncEngine 通过其 AsyncEngine.connect()AsyncEngine.begin() 方法提供一个 AsyncConnection,这两个方法都提供异步上下文管理器。 然后,AsyncConnection 可以使用 AsyncConnection.execute() 方法来调用语句,以提供一个缓冲的 Result,或使用 AsyncConnection.stream() 方法来提供一个流式服务器端 AsyncResult

>>> import asyncio

>>> from sqlalchemy import Column
>>> from sqlalchemy import MetaData
>>> from sqlalchemy import select
>>> from sqlalchemy import String
>>> from sqlalchemy import Table
>>> from sqlalchemy.ext.asyncio import create_async_engine

>>> meta = MetaData()
>>> t1 = Table("t1", meta, Column("name", String(50), primary_key=True))


>>> async def async_main() -> None:
...     engine = create_async_engine("sqlite+aiosqlite://", echo=True)
...
...     async with engine.begin() as conn:
...         await conn.run_sync(meta.drop_all)
...         await conn.run_sync(meta.create_all)
...
...         await conn.execute(
...             t1.insert(), [{"name": "some name 1"}, {"name": "some name 2"}]
...         )
...
...     async with engine.connect() as conn:
...         # select a Result, which will be delivered with buffered
...         # results
...         result = await conn.execute(select(t1).where(t1.c.name == "some name 1"))
...
...         print(result.fetchall())
...
...     # for AsyncEngine created in function scope, close and
...     # clean-up pooled connections
...     await engine.dispose()


>>> asyncio.run(async_main())
BEGIN (implicit) ... CREATE TABLE t1 ( name VARCHAR(50) NOT NULL, PRIMARY KEY (name) ) ... INSERT INTO t1 (name) VALUES (?) [...] [('some name 1',), ('some name 2',)] COMMIT BEGIN (implicit) SELECT t1.name FROM t1 WHERE t1.name = ? [...] ('some name 1',) [('some name 1',)] ROLLBACK

上面,AsyncConnection.run_sync() 方法可用于调用特殊的 DDL 函数,例如 MetaData.create_all(),它们不包含可等待的钩子。

提示

建议使用 await 调用 AsyncEngine.dispose() 方法,使用 AsyncEngine 对象在一个将超出上下文并被垃圾回收的范围内,如上面示例中的 async_main 函数所示。 这确保了连接池持有的任何连接将在可等待的上下文中被正确地释放。 与使用阻塞 IO 不同,SQLAlchemy 无法在 __del__ 或弱引用终结器等方法中正确释放这些连接,因为没有机会调用 await。 当引擎超出范围时,如果未显式释放引擎,可能会导致类似于 RuntimeError: Event loop is closed 的形式的警告输出到标准输出,在垃圾回收中。

AsyncConnection 还通过 AsyncConnection.stream() 方法提供一个“流”API,该方法返回一个 AsyncResult 对象。 此结果对象使用服务器端游标并提供异步/等待 API,例如异步迭代器

async with engine.connect() as conn:
    async_result = await conn.stream(select(t1))

    async for row in async_result:
        print("row: %s" % (row,))

概要 - ORM

使用 2.0 风格 查询, AsyncSession 类提供完整的 ORM 功能。

在默认使用模式下,必须特别注意避免 延迟加载 或涉及 ORM 关系和列属性的其他过期属性访问;下一节 使用 AsyncSession 时避免隐式 IO 详细说明了这一点。

警告

单个 AsyncSession 实例 **在多个并发任务中使用是不安全的**。 请参阅 使用 AsyncSession 同时执行任务Session 是线程安全的吗?AsyncSession 在并发任务中共享是否安全? 了解背景。

以下示例说明了包括映射器和会话配置在内的完整示例

>>> from __future__ import annotations

>>> import asyncio
>>> import datetime
>>> from typing import List

>>> from sqlalchemy import ForeignKey
>>> from sqlalchemy import func
>>> from sqlalchemy import select
>>> from sqlalchemy.ext.asyncio import AsyncAttrs
>>> from sqlalchemy.ext.asyncio import async_sessionmaker
>>> from sqlalchemy.ext.asyncio import AsyncSession
>>> from sqlalchemy.ext.asyncio import create_async_engine
>>> from sqlalchemy.orm import DeclarativeBase
>>> from sqlalchemy.orm import Mapped
>>> from sqlalchemy.orm import mapped_column
>>> from sqlalchemy.orm import relationship
>>> from sqlalchemy.orm import selectinload


>>> class Base(AsyncAttrs, DeclarativeBase):
...     pass

>>> class B(Base):
...     __tablename__ = "b"
...
...     id: Mapped[int] = mapped_column(primary_key=True)
...     a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
...     data: Mapped[str]

>>> class A(Base):
...     __tablename__ = "a"
...
...     id: Mapped[int] = mapped_column(primary_key=True)
...     data: Mapped[str]
...     create_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
...     bs: Mapped[List[B]] = relationship()

>>> async def insert_objects(async_session: async_sessionmaker[AsyncSession]) -> None:
...     async with async_session() as session:
...         async with session.begin():
...             session.add_all(
...                 [
...                     A(bs=[B(data="b1"), B(data="b2")], data="a1"),
...                     A(bs=[], data="a2"),
...                     A(bs=[B(data="b3"), B(data="b4")], data="a3"),
...                 ]
...             )


>>> async def select_and_update_objects(
...     async_session: async_sessionmaker[AsyncSession],
... ) -> None:
...     async with async_session() as session:
...         stmt = select(A).order_by(A.id).options(selectinload(A.bs))
...
...         result = await session.execute(stmt)
...
...         for a in result.scalars():
...             print(a, a.data)
...             print(f"created at: {a.create_date}")
...             for b in a.bs:
...                 print(b, b.data)
...
...         result = await session.execute(select(A).order_by(A.id).limit(1))
...
...         a1 = result.scalars().one()
...
...         a1.data = "new data"
...
...         await session.commit()
...
...         # access attribute subsequent to commit; this is what
...         # expire_on_commit=False allows
...         print(a1.data)
...
...         # alternatively, AsyncAttrs may be used to access any attribute
...         # as an awaitable (new in 2.0.13)
...         for b1 in await a1.awaitable_attrs.bs:
...             print(b1, b1.data)


>>> async def async_main() -> None:
...     engine = create_async_engine("sqlite+aiosqlite://", echo=True)
...
...     # async_sessionmaker: a factory for new AsyncSession objects.
...     # expire_on_commit - don't expire objects after transaction commit
...     async_session = async_sessionmaker(engine, expire_on_commit=False)
...
...     async with engine.begin() as conn:
...         await conn.run_sync(Base.metadata.create_all)
...
...     await insert_objects(async_session)
...     await select_and_update_objects(async_session)
...
...     # for AsyncEngine created in function scope, close and
...     # clean-up pooled connections
...     await engine.dispose()


>>> asyncio.run(async_main())
BEGIN (implicit) ... CREATE TABLE a ( id INTEGER NOT NULL, data VARCHAR NOT NULL, create_date DATETIME DEFAULT (CURRENT_TIMESTAMP) NOT NULL, PRIMARY KEY (id) ) ... CREATE TABLE b ( id INTEGER NOT NULL, a_id INTEGER NOT NULL, data VARCHAR NOT NULL, PRIMARY KEY (id), FOREIGN KEY(a_id) REFERENCES a (id) ) ... COMMIT BEGIN (implicit) INSERT INTO a (data) VALUES (?) RETURNING id, create_date [...] ('a1',) ... INSERT INTO b (a_id, data) VALUES (?, ?) RETURNING id [...] (1, 'b2') ... COMMIT BEGIN (implicit) SELECT a.id, a.data, a.create_date FROM a ORDER BY a.id [...] () SELECT b.a_id AS b_a_id, b.id AS b_id, b.data AS b_data FROM b WHERE b.a_id IN (?, ?, ?) [...] (1, 2, 3) <A object at ...> a1 created at: ... <B object at ...> b1 <B object at ...> b2 <A object at ...> a2 created at: ... <A object at ...> a3 created at: ... <B object at ...> b3 <B object at ...> b4 SELECT a.id, a.data, a.create_date FROM a ORDER BY a.id LIMIT ? OFFSET ? [...] (1, 0) UPDATE a SET data=? WHERE a.id = ? [...] ('new data', 1) COMMIT new data <B object at ...> b1 <B object at ...> b2

在上面的示例中, AsyncSession 使用可选的 async_sessionmaker 助手实例化,该助手为带有固定参数集的新 AsyncSession 对象提供了一个工厂,这里包括将它与特定数据库 URL 的 AsyncEngine 关联。 然后它被传递给其他方法,这些方法可以在 Python 异步上下文管理器(即 async with: 语句)中使用,以便在块结束时自动关闭;这等效于调用 AsyncSession.close() 方法。

使用 AsyncSession 同时执行任务

AsyncSession 对象是一个 **可变的、有状态的对象**,它代表 **正在进行的单个、有状态的数据库事务**。 使用 asyncio 同时执行任务,例如使用 asyncio.gather() 等 API,应该为 **每个单独的任务** 使用 **单独的** AsyncSession

请参阅 Session 是线程安全的吗?AsyncSession 在并发任务中共享是否安全?,了解关于 SessionAsyncSession 如何在并发工作负载中使用的一般说明。

使用 AsyncSession 时避免隐式 IO

使用传统的 asyncio,应用程序需要避免任何可能发生 IO-on-attribute 访问的点。 可以使用以下技术来帮助实现这一点,其中许多技术在前面的示例中进行了说明。

  • 延迟加载关系、延迟列或表达式,或在过期情况下被访问的属性可以利用 AsyncAttrs 混合。 此混合,当添加到特定类或更普遍地添加到声明性 Base 超类时,提供一个访问器 AsyncAttrs.awaitable_attrs,它将任何属性作为可等待对象提供

    from __future__ import annotations
    
    from typing import List
    
    from sqlalchemy.ext.asyncio import AsyncAttrs
    from sqlalchemy.orm import DeclarativeBase
    from sqlalchemy.orm import Mapped
    from sqlalchemy.orm import relationship
    
    
    class Base(AsyncAttrs, DeclarativeBase):
        pass
    
    
    class A(Base):
        __tablename__ = "a"
    
        # ... rest of mapping ...
    
        bs: Mapped[List[B]] = relationship()
    
    
    class B(Base):
        __tablename__ = "b"
    
        # ... rest of mapping ...

    当未使用预加载时,在 A 的新加载实例上访问 A.bs 集合通常会使用 延迟加载,为了成功,通常会向数据库发出 IO,这将在 asyncio 下失败,因为不允许隐式 IO。 为了在 asyncio 下直接访问此属性,而无需任何先前的加载操作,可以通过指示 AsyncAttrs.awaitable_attrs 前缀将属性作为可等待对象访问

    a1 = (await session.scalars(select(A))).one()
    for b1 in await a1.awaitable_attrs.bs:
        print(b1)

    AsyncAttrs 混合类提供了一个简洁的外部接口,用于访问内部方法,该方法也被 AsyncSession.run_sync() 方法使用。

    新添加于版本 2.0.13。

    另请参见

    AsyncAttrs

  • 可以使用 SQLAlchemy 2.0 中的 只写关系 功能将集合替换为 **只写集合**,这些集合永远不会隐式发出 IO。使用此功能,集合永远不会读取,只能使用显式 SQL 调用进行查询。有关在 asyncio 中使用只写集合的示例,请参阅 Asyncio 集成 部分中的示例 async_orm_writeonly.py

    当使用只写集合时,程序的行为在集合方面是简单且易于预测的。然而,缺点是,没有内置系统用于一次加载许多这样的集合,这需要手动执行。因此,以下许多要点都针对在 asyncio 中使用传统延迟加载关系时的特定技术,这需要更多小心。

  • 如果不使用 AsyncAttrs,关系可以使用 lazy="raise" 声明,这样默认情况下它们不会尝试发出 SQL。为了加载集合,将使用 预加载

  • 最有用的预加载策略是 selectinload() 预加载器,它在前面的示例中使用,以便在 await session.execute() 调用范围内预加载 A.bs 集合。

    stmt = select(A).options(selectinload(A.bs))
  • 在构造新对象时,**集合始终被分配一个默认的空集合**,例如上面示例中的列表。

    A(bs=[], data="a2")

    这允许在上面 A 对象上的 .bs 集合在 A 对象被刷新时存在且可读;否则,当 A 被刷新时,.bs 将被卸载,并在访问时引发错误。

  • AsyncSession 使用 Session.expire_on_commit 设置为 False 进行配置,以便我们可以在调用 AsyncSession.commit() 后访问对象上的属性,例如最后一行中我们访问属性的代码。

    # create AsyncSession with expire_on_commit=False
    async_session = AsyncSession(engine, expire_on_commit=False)
    
    # sessionmaker version
    async_session = async_sessionmaker(engine, expire_on_commit=False)
    
    async with async_session() as session:
        result = await session.execute(select(A).order_by(A.id))
    
        a1 = result.scalars().first()
    
        # commit would normally expire all attributes
        await session.commit()
    
        # access attribute subsequent to commit; this is what
        # expire_on_commit=False allows
        print(a1.data)

其他指南包括

  • 应该避免使用 AsyncSession.expire(),而应该使用 AsyncSession.refresh();**如果**绝对需要过期。过期通常**不**需要,因为当使用 asyncio 时,Session.expire_on_commit 通常应该设置为 False

  • 可以使用 AsyncSession.refresh() **显式加载** asyncio 下的延迟加载关系,**如果**将所需的属性名称显式传递给 Session.refresh.attribute_names,例如

    # assume a_obj is an A that has lazy loaded A.bs collection
    a_obj = await async_session.get(A, [1])
    
    # force the collection to load by naming it in attribute_names
    await async_session.refresh(a_obj, ["bs"])
    
    # collection is present
    print(f"bs collection: {a_obj.bs}")

    当然,最好事先使用预加载,以便在无需延迟加载的情况下设置集合。

    新添加于版本 2.0.4: 添加了对 AsyncSession.refresh() 和底层 Session.refresh() 方法的支持,以强制加载延迟加载的关系(如果它们在 Session.refresh.attribute_names 参数中被显式命名)。在以前的版本中,即使在参数中命名,关系也会被静默地跳过。

  • 避免使用在 级联 中记录的 all 级联选项,而应该显式列出所需的级联功能。 all 级联选项意味着除其他之外还意味着 refresh-expire 设置,这意味着 AsyncSession.refresh() 方法将使相关对象上的属性过期,但不一定刷新这些相关对象(假设在 relationship() 中未配置预加载),使它们处于过期状态。

  • 除了上面提到的 relationship() 结构之外,还应该为 deferred() 列(如果使用)采用适当的加载器选项。有关延迟加载列的背景信息,请参阅 使用列延迟加载限制加载的列

  • 动态关系加载器 中描述的“动态”关系加载器策略默认情况下与 asyncio 方法不兼容。它只能在 AsyncSession.run_sync() 方法(在 在 asyncio 下运行同步方法和函数 中描述)中调用,或使用其 .statement 属性获取一个普通的 select。

    user = await session.get(User, 42)
    addresses = (await session.scalars(user.addresses.statement)).all()
    stmt = user.addresses.statement.where(Address.email_address.startswith("patrick"))
    addresses_filter = (await session.scalars(stmt)).all()

    在 SQLAlchemy 2.0 版本中引入的 只写 技术与 asyncio 完全兼容,应该优先使用。

    另请参见

    “动态”关系加载器被“只写”取代 - 有关迁移到 2.0 风格的说明

  • 如果在不支持 RETURNING 的数据库(例如 MySQL 8)中使用 asyncio,除非使用 Mapper.eager_defaults 选项,否则新刷新的对象上将无法使用服务器默认值(例如生成的 timestamps)。在 SQLAlchemy 2.0 中,此行为会自动应用于像 PostgreSQL、SQLite 和 MariaDB 这样的后端,这些后端使用 RETURNING 在插入行时获取新值。

在 asyncio 下运行同步方法和函数

深层 Alchemy

这种方法本质上是在公开 SQLAlchemy 能够提供 asyncio 接口的机制。虽然在技术上没有问题,但总体来说,这种方法可能被认为是“有争议的”,因为它违背了 asyncio 编程模型的一些核心理念,即任何可能导致 IO 被调用的编程语句**必须**具有 await 调用,否则程序不会明确地表明 IO 可能会在每一行发生。这种方法并没有改变这个基本思想,只是它允许将一系列同步 IO 指令在函数调用的范围内免除此规则,本质上是将其捆绑成一个可等待的单个单元。

作为在 asyncio 事件循环中集成传统 SQLAlchemy“延迟加载”的另一种方式,提供了一个名为 AsyncSession.run_sync()可选方法,该方法将在 greenlet 中运行任何 Python 函数,其中传统的同步编程概念将被转换为在到达数据库驱动程序时使用 await。一个假设的方法是,面向 asyncio 的应用程序可以将与数据库相关的代码打包到函数中,这些函数使用 AsyncSession.run_sync() 调用。

修改上面的示例,如果我们没有为 A.bs 集合使用 selectinload(),我们可以在一个单独的函数中完成对这些属性访问的处理。

import asyncio

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine


def fetch_and_update_objects(session):
    """run traditional sync-style ORM code in a function that will be
    invoked within an awaitable.

    """

    # the session object here is a traditional ORM Session.
    # all features are available here including legacy Query use.

    stmt = select(A)

    result = session.execute(stmt)
    for a1 in result.scalars():
        print(a1)

        # lazy loads
        for b1 in a1.bs:
            print(b1)

    # legacy Query use
    a1 = session.query(A).order_by(A.id).first()

    a1.data = "new data"


async def async_main():
    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        echo=True,
    )
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

    async with AsyncSession(engine) as session:
        async with session.begin():
            session.add_all(
                [
                    A(bs=[B(), B()], data="a1"),
                    A(bs=[B()], data="a2"),
                    A(bs=[B(), B()], data="a3"),
                ]
            )

        await session.run_sync(fetch_and_update_objects)

        await session.commit()

    # for AsyncEngine created in function scope, close and
    # clean-up pooled connections
    await engine.dispose()


asyncio.run(async_main())

在“同步”运行器中运行某些函数的方法与在基于事件的编程库(如 gevent)之上运行 SQLAlchemy 应用程序的应用程序有一些相似之处。差异如下:

  1. 与使用 gevent 不同,我们可以继续使用标准 Python asyncio 事件循环,或任何自定义事件循环,而无需集成到 gevent 事件循环中。

  2. 没有任何“猴子补丁”。上面的示例使用了一个真正的 asyncio 驱动程序,底层的 SQLAlchemy 连接池也使用 Python 内置的 asyncio.Queue 来池化连接。

  3. 程序可以在异步/等待代码和使用同步代码的包含函数之间自由切换,几乎没有性能损失。没有“线程执行程序”或任何其他等待者或同步在使用中。

  4. 底层的网络驱动程序也使用纯 Python asyncio 概念,没有像 geventeventlet 提供的第三方网络库在使用中。

使用 asyncio 扩展的事件

SQLAlchemy 事件系统 未被 asyncio 扩展直接公开,这意味着还没有 SQLAlchemy 事件处理程序的“异步”版本。

但是,由于 asyncio 扩展围绕着通常的同步 SQLAlchemy API,因此常规的“同步”样式事件处理程序可以像未使用 asyncio 时一样自由使用。

如下所述,目前有两种针对面向 asyncio 的 API 注册事件的策略。

在 asyncio 上下文中运行的事件处理程序中,对象(如 Connection)将继续以它们通常的“同步”方式工作,无需使用 awaitasync;当消息最终由 asyncio 数据库适配器接收时,调用方式会透明地转换回 asyncio 调用方式。对于传递 DBAPI 级连接的事件,例如 PoolEvents.connect(),该对象是一个符合 pep-249 的“连接”对象,它将同步样式调用转换为 asyncio 驱动程序。

使用异步引擎/会话/会话制造商的事件侦听器示例

以下是一些与面向异步的 API 结构相关的同步样式事件处理程序的示例。

  • 异步引擎上的核心事件

    在此示例中,我们将访问 AsyncEngine.sync_engine 属性作为 ConnectionEventsPoolEvents 的目标。

    import asyncio
    
    from sqlalchemy import event
    from sqlalchemy import text
    from sqlalchemy.engine import Engine
    from sqlalchemy.ext.asyncio import create_async_engine
    
    engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test")
    
    
    # connect event on instance of Engine
    @event.listens_for(engine.sync_engine, "connect")
    def my_on_connect(dbapi_con, connection_record):
        print("New DBAPI connection:", dbapi_con)
        cursor = dbapi_con.cursor()
    
        # sync style API use for adapted DBAPI connection / cursor
        cursor.execute("select 'execute from event'")
        print(cursor.fetchone()[0])
    
    
    # before_execute event on all Engine instances
    @event.listens_for(Engine, "before_execute")
    def my_before_execute(
        conn,
        clauseelement,
        multiparams,
        params,
        execution_options,
    ):
        print("before execute!")
    
    
    async def go():
        async with engine.connect() as conn:
            await conn.execute(text("select 1"))
        await engine.dispose()
    
    
    asyncio.run(go())

    输出

    New DBAPI connection: <AdaptedConnection <asyncpg.connection.Connection object at 0x7f33f9b16960>>
    execute from event
    before execute!
  • 异步会话上的 ORM 事件

    在此示例中,我们将访问 AsyncSession.sync_session 作为 SessionEvents 的目标。

    import asyncio
    
    from sqlalchemy import event
    from sqlalchemy import text
    from sqlalchemy.ext.asyncio import AsyncSession
    from sqlalchemy.ext.asyncio import create_async_engine
    from sqlalchemy.orm import Session
    
    engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test")
    
    session = AsyncSession(engine)
    
    
    # before_commit event on instance of Session
    @event.listens_for(session.sync_session, "before_commit")
    def my_before_commit(session):
        print("before commit!")
    
        # sync style API use on Session
        connection = session.connection()
    
        # sync style API use on Connection
        result = connection.execute(text("select 'execute from event'"))
        print(result.first())
    
    
    # after_commit event on all Session instances
    @event.listens_for(Session, "after_commit")
    def my_after_commit(session):
        print("after commit!")
    
    
    async def go():
        await session.execute(text("select 1"))
        await session.commit()
    
        await session.close()
        await engine.dispose()
    
    
    asyncio.run(go())

    输出

    before commit!
    execute from event
    after commit!
  • 异步会话制造商上的 ORM 事件

    对于此用例,我们将 sessionmaker 作为事件目标,然后使用 async_sessionmaker.sync_session_class 参数将其分配给 async_sessionmaker

    import asyncio
    
    from sqlalchemy import event
    from sqlalchemy.ext.asyncio import async_sessionmaker
    from sqlalchemy.orm import sessionmaker
    
    sync_maker = sessionmaker()
    maker = async_sessionmaker(sync_session_class=sync_maker)
    
    
    @event.listens_for(sync_maker, "before_commit")
    def before_commit(session):
        print("before commit")
    
    
    async def main():
        async_session = maker()
    
        await async_session.commit()
    
    
    asyncio.run(main())

    输出

    before commit

在连接池和其他事件中使用仅可等待的驱动程序方法

如上一节所述,围绕 PoolEvents 事件处理程序的事件处理程序接收同步样式的“DBAPI”连接,这是 SQLAlchemy asyncio 方言提供的包装器对象,用于将底层的 asyncio“驱动程序”连接适配为 SQLAlchemy 内部可以使用的一个连接。当用户定义的事件处理程序实现需要直接使用最终的“驱动程序”连接时,就会出现一个特殊的用例,在该驱动程序连接上使用仅可等待的方法。一个这样的例子是 asyncpg 驱动程序提供的 .set_type_codec() 方法。

为了适应这种用例,SQLAlchemy 的 AdaptedConnection 类提供了一个方法 AdaptedConnection.run_async(),该方法允许在事件处理程序或其他 SQLAlchemy 内部的“同步”上下文中调用可等待的函数。此方法与允许同步样式方法在异步下运行的 AsyncConnection.run_sync() 方法完全类似。

AdaptedConnection.run_async() 应该传递一个函数,该函数接受最内层的“驱动程序”连接作为单个参数,并返回一个可等待对象,该对象将由 AdaptedConnection.run_async() 方法调用。给定函数本身不需要声明为 async;它可以是 Python lambda:,因为返回的可等待值将在返回后被调用。

from sqlalchemy import event
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(...)


@event.listens_for(engine.sync_engine, "connect")
def register_custom_types(dbapi_connection, *args):
    dbapi_connection.run_async(
        lambda connection: connection.set_type_codec(
            "MyCustomType",
            encoder,
            decoder,  # ...
        )
    )

在上面,传递给 register_custom_types 事件处理程序的对象是 AdaptedConnection 的实例,它提供了一个类似 DBAPI 的接口,用于访问底层的仅异步驱动程序级连接对象。 AdaptedConnection.run_async() 方法然后提供对可等待环境的访问,在该环境中,可以对底层的驱动程序级连接进行操作。

版本 1.4.30 中新增。

使用多个 asyncio 事件循环

在使用默认池实现时,使用多个事件循环(例如,在将 asyncio 与多线程结合使用的不常见情况下)的应用程序不应将相同的 AsyncEngine 与不同的事件循环共享。

如果将 AsyncEngine 从一个事件循环传递到另一个事件循环,则应在将其重新用于新的事件循环之前调用方法 AsyncEngine.dispose()。如果未这样做,可能会导致 RuntimeError,例如 Task <Task pending ...> got Future attached to a different loop

如果必须在不同的循环之间共享相同的引擎,则应将其配置为使用 NullPool 禁用池,以防止引擎多次使用任何连接。

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@host/dbname",
    poolclass=NullPool,
)

使用 asyncio 范围会话

在使用 scoped_session 对象的线程化 SQLAlchemy 中使用的“范围会话”模式也适用于 asyncio,使用一个称为 async_scoped_session 的调整版本。

提示

SQLAlchemy 通常不建议对新开发使用“范围”模式,因为它依赖于可变的全局状态,并且在线程或任务中的工作完成后,必须显式地拆除这些状态。尤其是在使用 asyncio 时,最好将 AsyncSession 直接传递给需要它的可等待函数。

在使用 async_scoped_session 时,由于 asyncio 上下文中没有“线程本地”的概念,因此必须将“scopefunc”参数提供给构造函数。下面的示例说明了为此目的使用 asyncio.current_task() 函数。

from asyncio import current_task

from sqlalchemy.ext.asyncio import (
    async_scoped_session,
    async_sessionmaker,
)

async_session_factory = async_sessionmaker(
    some_async_engine,
    expire_on_commit=False,
)
AsyncScopedSession = async_scoped_session(
    async_session_factory,
    scopefunc=current_task,
)
some_async_session = AsyncScopedSession()

警告

async_scoped_session 使用的“scopefunc”在任务中被调用**任意多次**,每次底层的 AsyncSession 被访问时都会被调用一次。因此,该函数应该**幂等**并且轻量级,并且不应尝试创建或修改任何状态,例如建立回调等。

警告

在范围内使用 current_task() 作为“键”要求从最外层的可等待函数中调用 async_scoped_session.remove() 方法,以确保在任务完成后从注册表中删除该键,否则任务句柄以及 AsyncSession 将保留在内存中,本质上会造成内存泄漏。请参见以下示例,其中说明了 async_scoped_session.remove() 的正确使用方法。

async_scoped_session 包含与 scoped_session 相似的**代理行为**,这意味着它可以直接作为 AsyncSession 来对待,请记住,通常的 await 关键字是必需的,包括 async_scoped_session.remove() 方法。

async def some_function(some_async_session, some_object):
    # use the AsyncSession directly
    some_async_session.add(some_object)

    # use the AsyncSession via the context-local proxy
    await AsyncScopedSession.commit()

    # "remove" the current proxied AsyncSession for the local context
    await AsyncScopedSession.remove()

版本 1.4.19 中新增。

使用 Inspector 检查模式对象

SQLAlchemy 尚未提供 Inspector 的 asyncio 版本(在 使用 Inspector 进行细粒度反射 中介绍),但是可以通过利用 AsyncConnection.run_sync() 方法在 asyncio 上下文中使用现有接口 AsyncConnection

import asyncio

from sqlalchemy import inspect
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost/test")


def use_inspector(conn):
    inspector = inspect(conn)
    # use the inspector
    print(inspector.get_view_names())
    # return any value to the caller
    return inspector.get_table_names()


async def async_main():
    async with engine.connect() as conn:
        tables = await conn.run_sync(use_inspector)


asyncio.run(async_main())

引擎 API 文档

对象名称 描述

async_engine_from_config(configuration[, prefix], **kwargs)

使用配置字典创建一个新的 AsyncEngine 实例。

AsyncConnection

AsyncConnection

AsyncEngine

AsyncEngine

AsyncTransaction

AsyncTransaction

create_async_engine(url, **kw)

创建一个新的异步引擎实例。

create_async_pool_from_url(url, **kwargs)

创建一个新的异步引擎实例。

function sqlalchemy.ext.asyncio.create_async_engine(url: str | URL, **kw: Any) AsyncEngine

创建一个新的异步引擎实例。

传递给 create_async_engine() 的参数与传递给 create_engine() 函数的参数基本相同。指定的方言必须是与 asyncio 兼容的方言,例如 asyncpg

版本 1.4 中的新功能。

参数:

async_creator

一个异步可调用函数,它返回一个驱动程序级的 asyncio 连接。如果给定,该函数不应接受任何参数,并应从底层的 asyncio 数据库驱动程序返回一个新的 asyncio 连接;该连接将被包装在适当的结构中以与 AsyncEngine 一起使用。请注意,URL 中指定的参数不会在此处应用,并且创建者函数应使用它自己的连接参数。

此参数是 create_engine.creator 的 asyncio 等效参数,它是在 create_engine() 函数中使用的。

版本 2.0.16 中新增。

function sqlalchemy.ext.asyncio.async_engine_from_config(configuration: Dict[str, Any], prefix: str = 'sqlalchemy.', **kwargs: Any) AsyncEngine

使用配置字典创建一个新的 AsyncEngine 实例。

此函数类似于 SQLAlchemy Core 中的 engine_from_config() 函数,只是它要求的方言必须是与 asyncio 兼容的方言,比如 asyncpg。此函数的参数签名与 engine_from_config() 函数相同。

版本 1.4.29 中新增。

function sqlalchemy.ext.asyncio.create_async_pool_from_url(url: str | URL, **kwargs: Any) Pool

创建一个新的异步引擎实例。

传递给 create_async_pool_from_url() 的参数与传递给 create_pool_from_url() 函数的参数基本相同。指定的方言必须是与 asyncio 兼容的方言,比如 asyncpg

版本 2.0.10 中新增。

class sqlalchemy.ext.asyncio.AsyncEngine

AsyncEngine

AsyncEngine 是使用 create_async_engine() 函数获取的。

from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")

版本 1.4 中的新功能。

类签名

class sqlalchemy.ext.asyncio.AsyncEngine (sqlalchemy.ext.asyncio.base.ProxyComparable, sqlalchemy.ext.asyncio.AsyncConnectable)

method sqlalchemy.ext.asyncio.AsyncEngine.begin() AsyncIterator[AsyncConnection]

返回一个上下文管理器,当进入时将提供一个带有已建立的 AsyncTransactionAsyncConnection

例如:

async with async_engine.begin() as conn:
    await conn.execute(
        text("insert into table (x, y, z) values (1, 2, 3)")
    )
    await conn.execute(text("my_special_procedure(5)"))
method sqlalchemy.ext.asyncio.AsyncEngine.clear_compiled_cache() None

清除与方言相关的编译缓存。

为了 AsyncEngine 类代理 Engine 类。

这 **仅** 适用于通过 create_engine.query_cache_size 参数建立的内置缓存。它不会影响通过 Connection.execution_options.compiled_cache 参数传递的任何字典缓存。

版本 1.4 中的新功能。

method sqlalchemy.ext.asyncio.AsyncEngine.connect() AsyncConnection

返回一个 AsyncConnection 对象。

AsyncConnection 作为异步上下文管理器进入时,它将从底层连接池中获取数据库连接。

async with async_engine.connect() as conn:
    result = await conn.execute(select(user_table))

也可以通过调用 AsyncConnection.start() 方法,在上下文管理器之外启动 AsyncConnection

attribute sqlalchemy.ext.asyncio.AsyncEngine.dialect

用于 AsyncEngine 类代表 Engine.dialect 属性的代理。

method sqlalchemy.ext.asyncio.AsyncEngine.async dispose(close: bool = True) None

释放此 AsyncEngine 使用的连接池。

参数:

close

如果保留其默认值 True,则具有完全关闭所有 **当前已检入** 数据库连接的效果。仍然签出的连接 **不会** 被关闭,但是它们将不再与此 Engine 相关联,因此,当它们被单独关闭时,最终它们关联的 Pool 将被垃圾回收,并且它们将被完全关闭,如果在检入时尚未关闭。

如果设置为 False,则先前连接池将被取消引用,否则不会以任何方式触碰。

另请参见

Engine.dispose()

attribute sqlalchemy.ext.asyncio.AsyncEngine.driver

Engine 使用的 Dialect 的驱动程序名称。

为了 AsyncEngine 类代理 Engine 类。

attribute sqlalchemy.ext.asyncio.AsyncEngine.echo

True 时,启用此元素的日志输出。

为了 AsyncEngine 类代理 Engine 类。

这将影响为此元素的类的命名空间和对象引用设置 Python 日志级别。布尔值 True 表示将为记录器设置日志级别 logging.INFO,而字符串值 debug 将将日志级别设置为 logging.DEBUG

attribute sqlalchemy.ext.asyncio.AsyncEngine.engine

返回此 Engine

为了 AsyncEngine 类代理 Engine 类。

用于接受 Connection / Engine 对象的旧方案。

method sqlalchemy.ext.asyncio.AsyncEngine.execution_options(**opt: Any) AsyncEngine

返回一个新的 AsyncEngine,它将提供具有给定执行选项的 AsyncConnection 对象。

Engine.execution_options() 代理。有关详细信息,请参见该方法。

method sqlalchemy.ext.asyncio.AsyncEngine.get_execution_options() _ExecuteOptions

获取执行期间生效的非 SQL 选项。

为了 AsyncEngine 类代理 Engine 类。

attribute sqlalchemy.ext.asyncio.AsyncEngine.name

Engine 使用的 Dialect 的字符串名称。

为了 AsyncEngine 类代理 Engine 类。

attribute sqlalchemy.ext.asyncio.AsyncEngine.pool

用于 AsyncEngine 类代表 Engine.pool 属性的代理。

method sqlalchemy.ext.asyncio.AsyncEngine.async raw_connection() PoolProxiedConnection

从连接池返回“原始”DBAPI 连接。

attribute sqlalchemy.ext.asyncio.AsyncEngine.sync_engine: Engine

对该 AsyncEngine 代理请求的同步样式 Engine 的引用。

此实例可用作事件目标。

method sqlalchemy.ext.asyncio.AsyncEngine.update_execution_options(**opt: Any) None

更新此 Engine 的默认 execution_options 字典。

为了 AsyncEngine 类代理 Engine 类。

**opt 中给定的键/值将添加到将用于所有连接的默认执行选项中。此字典的初始内容可以通过 execution_options 参数发送到 create_engine()

attribute sqlalchemy.ext.asyncio.AsyncEngine.url

用于 AsyncEngine 类代表 Engine.url 属性的代理。

class sqlalchemy.ext.asyncio.AsyncConnection

AsyncConnection

AsyncConnection 是使用 AsyncEngine.connect() 方法从 AsyncEngine 获取的。

from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")

async with engine.connect() as conn:
    result = await conn.execute(select(table))

版本 1.4 中的新功能。

类签名

class sqlalchemy.ext.asyncio.AsyncConnection (sqlalchemy.ext.asyncio.base.ProxyComparable, sqlalchemy.ext.asyncio.base.StartableContext, sqlalchemy.ext.asyncio.AsyncConnectable)

method sqlalchemy.ext.asyncio.AsyncConnection.async aclose() None

AsyncConnection.close() 的别名。

AsyncConnection.aclose() 的名称是为了专门支持 Python 标准库的 @contextlib.aclosing 上下文管理器函数。

新版本 2.0.20 中添加。

method sqlalchemy.ext.asyncio.AsyncConnection.begin() AsyncTransaction

在自动开始之前开始事务。

method sqlalchemy.ext.asyncio.AsyncConnection.begin_nested() AsyncTransaction

开始嵌套事务并返回事务句柄。

method sqlalchemy.ext.asyncio.AsyncConnection.async close() None

关闭此 AsyncConnection

这将使事务回滚,前提是事务已处于活动状态。

attribute sqlalchemy.ext.asyncio.AsyncConnection.closed

如果此连接已关闭,则返回 True。

代表 AsyncConnection 类为 Connection 类代理。

method sqlalchemy.ext.asyncio.AsyncConnection.async commit() None

提交当前正在进行的事务。

如果已启动事务,此方法会提交当前事务。如果未启动任何事务,则该方法不执行任何操作,前提是连接处于非无效状态。

每当首次执行语句或调用 Connection.begin() 方法时,都会在 Connection 上自动开始事务。

attribute sqlalchemy.ext.asyncio.AsyncConnection.connection

异步未实现;调用 AsyncConnection.get_raw_connection()

attribute sqlalchemy.ext.asyncio.AsyncConnection.default_isolation_level

与使用的 Dialect 关联的初始连接时隔离级别。

代表 AsyncConnection 类为 Connection 类代理。

此值与 Connection.execution_options.isolation_levelEngine.execution_options.isolation_level 执行选项无关,并且由 Dialect 在创建第一个连接时确定,通过在发出任何其他命令之前对数据库执行 SQL 查询以获取当前隔离级别。

调用此访问器不会调用任何新的 SQL 查询。

另请参见

Connection.get_isolation_level() - 查看当前实际隔离级别

create_engine.isolation_level - 设置每个 Engine 的隔离级别

Connection.execution_options.isolation_level - 设置每个 Connection 的隔离级别

attribute sqlalchemy.ext.asyncio.AsyncConnection.dialect

代表 AsyncConnection 类为 Connection.dialect 属性代理。

method sqlalchemy.ext.asyncio.AsyncConnection.async exec_driver_sql(statement: str, parameters: _DBAPIAnyExecuteParams | None = None, execution_options: CoreExecuteOptionsParameter | None = None) CursorResult[Any]

执行驱动级别的 SQL 字符串并返回缓冲的 Result

method sqlalchemy.ext.asyncio.AsyncConnection.async execute(statement: Executable, parameters: _CoreAnyExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) CursorResult[Any]

执行 SQL 语句构造并返回缓冲的 Result

参数:
  • object

    要执行的语句。这始终是一个同时处于 ClauseElementExecutable 层次结构中的对象,包括

  • parameters – 将绑定到语句的参数。这可以是参数名称到值的字典,也可以是可变序列(例如列表)字典。当传递字典列表时,底层语句执行将使用 DBAPI cursor.executemany() 方法。当传递单个字典时,将使用 DBAPI cursor.execute() 方法。

  • execution_options – 可选的执行选项字典,它将与语句执行相关联。此字典可以提供 Connection.execution_options() 接受的选项的子集。

返回值:

一个 Result 对象。

method sqlalchemy.ext.asyncio.AsyncConnection.async execution_options(**opt: Any) AsyncConnection

为连接设置非 SQL 选项,这些选项在执行期间生效。

这将返回此 AsyncConnection 对象,其中添加了新选项。

有关此方法的完整详细信息,请参阅 Connection.execution_options()

method sqlalchemy.ext.asyncio.AsyncConnection.get_nested_transaction() AsyncTransaction | None

返回一个 AsyncTransaction,表示当前嵌套(保存点)事务(如果有)。

这使用底层同步连接的 Connection.get_nested_transaction() 方法获取当前 Transaction,然后在新的 AsyncTransaction 对象中进行代理。

版本 1.4.0b2 中的新增功能。

method sqlalchemy.ext.asyncio.AsyncConnection.async get_raw_connection() PoolProxiedConnection

返回此 AsyncConnection 使用的池化的 DBAPI 级连接。

这是一个 SQLAlchemy 连接池代理的连接,然后具有属性 _ConnectionFairy.driver_connection,该属性引用实际的驱动程序连接。它的 _ConnectionFairy.dbapi_connection 反而引用一个 AdaptedConnection 实例,该实例将驱动程序连接适配到 DBAPI 协议。

method sqlalchemy.ext.asyncio.AsyncConnection.get_transaction() AsyncTransaction | None

返回一个 AsyncTransaction 对象,代表当前的事务(如果有)。

此方法使用底层同步连接的 Connection.get_transaction() 方法获取当前的 Transaction 对象,然后将其代理到新的 AsyncTransaction 对象中。

版本 1.4.0b2 中的新增功能。

method sqlalchemy.ext.asyncio.AsyncConnection.in_nested_transaction() bool

如果事务正在进行,则返回 True。

版本 1.4.0b2 中的新增功能。

method sqlalchemy.ext.asyncio.AsyncConnection.in_transaction() bool

如果事务正在进行,则返回 True。

attribute sqlalchemy.ext.asyncio.AsyncConnection.info

返回底层 Connection 对象的 Connection.info 字典。

此字典是自由可写的,用户可以将自定义状态与数据库连接相关联。

此属性仅在 AsyncConnection 对象处于连接状态时可用。如果 AsyncConnection.closed 属性为 True,则访问此属性将引发 ResourceClosedError 错误。

版本 1.4.0b2 中的新增功能。

method sqlalchemy.ext.asyncio.AsyncConnection.async invalidate(exception: BaseException | None = None) None

使与该 Connection 对象关联的底层 DBAPI 连接失效。

有关此方法的详细信息,请参阅 Connection.invalidate() 方法。

attribute sqlalchemy.ext.asyncio.AsyncConnection.invalidated

如果该连接已失效,则返回 True。

代表 AsyncConnection 类为 Connection 类代理。

这并不表示该连接是否在池级别失效,但是

method sqlalchemy.ext.asyncio.AsyncConnection.async rollback() None

回滚当前正在进行的事务。

此方法回滚当前的事务(如果已经启动)。如果没有启动事务,则该方法没有效果。如果已经启动事务并且连接处于失效状态,则使用此方法清除事务。

每当首次执行语句或调用 Connection.begin() 方法时,都会在 Connection 上自动开始事务。

method sqlalchemy.ext.asyncio.AsyncConnection.async run_sync(fn: ~typing.Callable[[~typing.Concatenate[~sqlalchemy.engine.base.Connection, ~_P]], ~sqlalchemy.ext.asyncio.engine._T], *arg: ~typing.~_P, **kw: ~typing.~_P) _T

调用给定的同步(即非异步)可调用对象,并将同步风格的 Connection 对象作为第一个参数传递。

此方法允许传统的同步 SQLAlchemy 函数在 asyncio 应用程序的上下文中运行。

例如:

def do_something_with_core(conn: Connection, arg1: int, arg2: str) -> str:
    '''A synchronous function that does not require awaiting

    :param conn: a Core SQLAlchemy Connection, used synchronously

    :return: an optional return value is supported

    '''
    conn.execute(
        some_table.insert().values(int_col=arg1, str_col=arg2)
    )
    return "success"


async def do_something_async(async_engine: AsyncEngine) -> None:
    '''an async function that uses awaiting'''

    async with async_engine.begin() as async_conn:
        # run do_something_with_core() with a sync-style
        # Connection, proxied into an awaitable
        return_code = await async_conn.run_sync(do_something_with_core, 5, "strval")
        print(return_code)

此方法通过在专门的 instrument greenlet 中运行给定的可调用对象,将 asyncio 事件循环一直维护到数据库连接。

AsyncConnection.run_sync() 的最基本用法是调用像 MetaData.create_all() 这样的方法,给定一个 AsyncConnection 对象,该对象需要作为 Connection 对象提供给 MetaData.create_all() 方法。

# run metadata.create_all(conn) with a sync-style Connection,
# proxied into an awaitable
with async_engine.begin() as conn:
    await conn.run_sync(metadata.create_all)

注意

提供的可调用对象在 asyncio 事件循环中内联调用,并且将在传统的 IO 调用中阻塞。 此可调用对象中的 IO 应该只调用 SQLAlchemy 的 asyncio 数据库 API,这些 API 将被正确地适配到 greenlet 上下文中。

method sqlalchemy.ext.asyncio.AsyncConnection.async scalar(statement: Executable, parameters: _CoreSingleExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) Any

执行一个 SQL 语句构造,并返回一个标量对象。

此方法是调用 Result.scalar() 方法后的简写形式,并在调用 Connection.execute() 方法之前。参数相同。

返回值:

表示返回的第一行第一列的标量 Python 值。

method sqlalchemy.ext.asyncio.AsyncConnection.async scalars(statement: Executable, parameters: _CoreAnyExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) ScalarResult[Any]

执行 SQL 语句构造并返回标量对象。

此方法是调用 Result.scalars() 方法后的简写形式,并在调用 Connection.execute() 方法之前。参数相同。

返回值:

一个 ScalarResult 对象。

版本 1.4.24 中新增。

method sqlalchemy.ext.asyncio.AsyncConnection.async start(is_ctxmanager: bool = False) AsyncConnection

在使用 Python with: 块之外,启动此 AsyncConnection 对象的上下文。

method sqlalchemy.ext.asyncio.AsyncConnection.stream(statement: Executable, parameters: _CoreAnyExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) AsyncIterator[AsyncResult[Any]]

执行语句并返回一个可等待的,它会生成一个 AsyncResult 对象。

例如:

result = await conn.stream(stmt):
async for row in result:
    print(f"{row}")

The AsyncConnection.stream() 方法支持针对 AsyncResult 对象使用可选的上下文管理器,如

async with conn.stream(stmt) as result:
    async for row in result:
        print(f"{row}")

在上述模式中,即使迭代器因异常抛出而中断,AsyncResult.close() 方法也会无条件地调用。但是,上下文管理器使用仍然是可选的,该函数可以在 async with fn():await fn() 样式中调用。

版本 2.0.0b3 中新增: 添加了上下文管理器支持

返回值:

一个可等待的对象,它将生成一个 AsyncResult 对象。

method sqlalchemy.ext.asyncio.AsyncConnection.stream_scalars(statement: Executable, parameters: _CoreSingleExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) AsyncIterator[AsyncScalarResult[Any]]

执行语句并返回一个可等待的,它会生成一个 AsyncScalarResult 对象。

例如:

result = await conn.stream_scalars(stmt)
async for scalar in result:
    print(f"{scalar}")

此方法是调用 AsyncResult.scalars() 方法后的简写形式,并在调用 Connection.stream() 方法之前。参数相同。

The AsyncConnection.stream_scalars() 方法支持针对 AsyncScalarResult 对象使用可选的上下文管理器,如

async with conn.stream_scalars(stmt) as result:
    async for scalar in result:
        print(f"{scalar}")

在上述模式中,即使迭代器因异常抛出而中断,AsyncScalarResult.close() 方法也会无条件地调用。但是,上下文管理器使用仍然是可选的,该函数可以在 async with fn():await fn() 样式中调用。

版本 2.0.0b3 中新增: 添加了上下文管理器支持

返回值:

一个可等待的对象,它将生成一个 AsyncScalarResult 对象。

版本 1.4.24 中新增。

attribute sqlalchemy.ext.asyncio.AsyncConnection.sync_connection: Connection | None

AsyncConnection 代理请求所指向的同步风格 Connection 的引用。

此实例可用作事件目标。

attribute sqlalchemy.ext.asyncio.AsyncConnection.sync_engine: Engine

AsyncConnection 通过其底层 Connection 与之关联的同步风格 Engine 的引用。

此实例可用作事件目标。

class sqlalchemy.ext.asyncio.AsyncTransaction

AsyncTransaction

类签名

class sqlalchemy.ext.asyncio.AsyncTransaction (sqlalchemy.ext.asyncio.base.ProxyComparable, sqlalchemy.ext.asyncio.base.StartableContext)

method sqlalchemy.ext.asyncio.AsyncTransaction.async close() None

关闭此 AsyncTransaction

如果此事务是 begin/commit 嵌套中的基本事务,则事务将回滚()。否则,该方法将返回。

此用于取消事务,而不会影响封闭事务的范围。

method sqlalchemy.ext.asyncio.AsyncTransaction.async commit() None

提交此 AsyncTransaction

method sqlalchemy.ext.asyncio.AsyncTransaction.async rollback() None

回滚此 AsyncTransaction

method sqlalchemy.ext.asyncio.AsyncTransaction.async start(is_ctxmanager: bool = False) AsyncTransaction

在不使用 Python with: 块的情况下启动此 AsyncTransaction 对象的上下文。

结果集 API 文档

AsyncResult 对象是 Result 对象的异步自适应版本。仅在使用 AsyncConnection.stream()AsyncSession.stream() 方法时返回,这些方法返回一个位于活动数据库游标之上的结果对象。

对象名称 描述

AsyncMappingResult

用于 AsyncResult 的包装器,它返回字典值而不是 Row 值。

AsyncResult

Result 对象的 asyncio 包装器。

AsyncScalarResult

用于 AsyncResult 的包装器,它返回标量值而不是 Row 值。

AsyncTupleResult

一个 AsyncResult,其类型为返回普通 Python 元组而不是行。

class sqlalchemy.ext.asyncio.AsyncResult

Result 对象的 asyncio 包装器。

AsyncResult 仅适用于使用服务器端游标的语句执行。它仅从 AsyncConnection.stream()AsyncSession.stream() 方法返回。

注意

Result 一样,此对象用于 AsyncSession.execute() 返回的 ORM 结果,它可以单独或在元组状行中产生 ORM 映射对象的实例。请注意,这些结果对象不会像传统 Query 对象那样自动对实例或行进行重复数据删除。对于实例或行的 Python 内重复数据删除,请使用 AsyncResult.unique() 修饰符方法。

版本 1.4 中的新功能。

类签名

class sqlalchemy.ext.asyncio.AsyncResult (sqlalchemy.engine._WithKeys, sqlalchemy.ext.asyncio.AsyncCommon)

method sqlalchemy.ext.asyncio.AsyncResult.async all() Sequence[Row[_TP]]

返回所有行,以列表形式。

调用后关闭结果集。后续调用将返回空列表。

返回值:

一个 Row 对象列表。

method sqlalchemy.ext.asyncio.AsyncResult.async close() None

继承自 AsyncCommon.close() 方法 AsyncCommon

关闭此结果。

attribute sqlalchemy.ext.asyncio.AsyncResult.closed

继承自 AsyncCommon.closed 属性 AsyncCommon

代理底层结果对象的 .closed 属性,如果存在,否则引发 AttributeError

新版 2.0.0b3。

method sqlalchemy.ext.asyncio.AsyncResult.columns(*col_expressions: _KeyIndexType) Self

建立应在每行中返回的列。

请参考同步 SQLAlchemy API 中的 Result.columns() 以获取完整的行为描述。

method sqlalchemy.ext.asyncio.AsyncResult.async fetchall() Sequence[Row[_TP]]

AsyncResult.all() 方法的同义词。

新版 2.0。

method sqlalchemy.ext.asyncio.AsyncResult.async fetchmany(size: int | None = None) Sequence[Row[_TP]]

获取多行。

当所有行都已耗尽时,返回空列表。

此方法为了向后兼容 SQLAlchemy 1.x.x 版本提供。

要按组获取行,请使用 AsyncResult.partitions() 方法。

返回值:

一个 Row 对象列表。

method sqlalchemy.ext.asyncio.AsyncResult.async fetchone() Row[_TP] | None

获取一行。

当所有行都已耗尽时,返回 None。

此方法为了向后兼容 SQLAlchemy 1.x.x 版本提供。

要仅获取结果的第一行,请使用 AsyncResult.first() 方法。要遍历所有行,请直接迭代 AsyncResult 对象。

返回值:

一个 Row 对象(如果未应用任何过滤器),或者 None(如果无剩余行)。

method sqlalchemy.ext.asyncio.AsyncResult.async first() Row[_TP] | None

获取第一行,或者如果不存在行,则返回 None

关闭结果集并丢弃剩余行。

注意

此方法默认返回一行,例如元组。要返回一个单一的标量值,即第一行的第一列,请使用 AsyncResult.scalar() 方法,或者将 AsyncResult.scalars()AsyncResult.first() 结合使用。

此外,与传统 ORM Query.first() 方法的行为不同,不会对用于生成此 AsyncResult 的 SQL 查询应用限制;对于在生成行之前将结果缓冲在内存中的 DBAPI 驱动程序,所有行都将发送到 Python 进程,除了第一行以外的所有行都将被丢弃。

返回值:

一个 Row 对象,或者如果无剩余行,则为 None。

method sqlalchemy.ext.asyncio.AsyncResult.async freeze() FrozenResult[_TP]

返回一个可调用对象,该对象在被调用时会生成此 AsyncResult 的副本。

返回的可调用对象是 FrozenResult 的实例。

用于结果集缓存。当结果未被使用时,必须在结果上调用此方法,调用此方法将完全使用结果。当从缓存中检索到 FrozenResult 时,可以对其进行多次调用,每次调用都会产生一个新的 Result 对象,该对象针对其存储的行集。

另请参见

重新执行语句 - 在 ORM 中实现结果集缓存的示例用法。

method sqlalchemy.ext.asyncio.AsyncResult.keys() RMKeyView

继承自 sqlalchemy.engine._WithKeys.keys 方法的 sqlalchemy.engine._WithKeys

返回一个可迭代视图,该视图生成由每个 Row 表示的字符串键。

这些键可以表示核心语句返回的列的标签,或者表示 ORM 执行返回的 ORM 类的名称。

该视图还可以使用 Python in 运算符测试键包含,这将同时测试视图中表示的字符串键,以及列对象等备用键。

1.4 版中的变化: 返回一个键视图对象,而不是一个普通的列表。

method sqlalchemy.ext.asyncio.AsyncResult.mappings() AsyncMappingResult

将映射过滤器应用于返回的行,返回 AsyncMappingResult 的实例。

应用此过滤器后,获取的行将返回 RowMapping 对象,而不是 Row 对象。

返回值:

一个新的 AsyncMappingResult 过滤对象,引用底层的 Result 对象。

method sqlalchemy.ext.asyncio.AsyncResult.async one() Row[_TP]

返回正好一行,否则引发异常。

如果结果没有返回行,则引发 NoResultFound,如果返回多行,则引发 MultipleResultsFound

注意

此方法默认返回一个,例如元组。要返回正好一个标量值,即第一行的第一列,请使用 AsyncResult.scalar_one() 方法,或者结合 AsyncResult.scalars()AsyncResult.one()

版本 1.4 中的新功能。

返回值:

第一个 Row

引发:

MultipleResultsFound, NoResultFound

method sqlalchemy.ext.asyncio.AsyncResult.async one_or_none() Row[_TP] | None

返回最多一个结果,否则引发异常。

如果结果没有行,则返回 None。如果返回多行,则引发 MultipleResultsFound

版本 1.4 中的新功能。

返回值:

第一个 RowNone(如果没有行可用)。

引发:

MultipleResultsFound

method sqlalchemy.ext.asyncio.AsyncResult.async partitions(size: int | None = None) AsyncIterator[Sequence[Row[_TP]]]

遍历给定大小的行子列表。

返回异步迭代器

async def scroll_results(connection):
    result = await connection.stream(select(users_table))

    async for partition in result.partitions(100):
        print("list of rows: %s" % partition)

有关完整行为描述,请参考同步 SQLAlchemy API 中的 Result.partitions()

method sqlalchemy.ext.asyncio.AsyncResult.async scalar() Any

获取第一行的第一列,并关闭结果集。

如果要获取的行不存在,则返回 None

不执行任何验证以测试是否还有其他行。

调用此方法后,对象将完全关闭,例如,CursorResult.close() 方法将被调用。

返回值:

一个 Python 标量值,或 None(如果没有剩余的行)。

method sqlalchemy.ext.asyncio.AsyncResult.async scalar_one() Any

返回正好一个标量结果,否则引发异常。

这等效于调用 AsyncResult.scalars() 然后调用 AsyncScalarResult.one()

method sqlalchemy.ext.asyncio.AsyncResult.async scalar_one_or_none() Any | None

返回正好一个标量结果,或 None

这等同于调用 AsyncResult.scalars(),然后调用 AsyncScalarResult.one_or_none()

方法 sqlalchemy.ext.asyncio.AsyncResult.scalars(index: _KeyIndexType = 0) AsyncScalarResult[Any]

返回一个 AsyncScalarResult 过滤对象,它将返回单个元素而不是 Row 对象。

有关完整行为描述,请参阅同步 SQLAlchemy API 中的 Result.scalars()

参数:

index – 表示要从每行中获取的列的整数或行键,默认为 0,表示第一列。

返回值:

一个新的 AsyncScalarResult 过滤对象,它引用此 AsyncResult 对象。

属性 sqlalchemy.ext.asyncio.AsyncResult.t

对返回的行应用“类型化元组”类型过滤器。

AsyncResult.t 属性是调用 AsyncResult.tuples() 方法的同义词。

新版 2.0。

方法 sqlalchemy.ext.asyncio.AsyncResult.tuples() AsyncTupleResult[_TP]

对返回的行应用“类型化元组”类型过滤器。

在运行时,此方法返回与 AsyncResult 对象相同的对象,但将其注释为返回一个 AsyncTupleResult 对象,该对象将向 PEP 484 类型工具表明返回的是普通类型化的 Tuple 实例,而不是行。这允许类型化元组解包和 Row 对象的 __getitem__ 访问,适用于调用语句本身包含类型信息的情况。

新版 2.0。

返回值:

类型时 AsyncTupleResult 类型。

另请参见

AsyncResult.t - 简短的同义词

Row.t - Row 版本

方法 sqlalchemy.ext.asyncio.AsyncResult.unique(strategy: _UniqueFilterType | None = None) Self

对此 AsyncResult 返回的对象应用唯一过滤。

有关完整行为描述,请参阅同步 SQLAlchemy API 中的 Result.unique()

方法 sqlalchemy.ext.asyncio.AsyncResult.yield_per(num: int) Self

将行获取策略配置为一次获取 num 行。

FilterResult.yield_per() 方法是 Result.yield_per() 方法的直接调用。有关用法说明,请参阅该方法的文档。

版本 1.4.40 新版: - 添加了 FilterResult.yield_per(),以便该方法在所有结果集实现中都可用

sqlalchemy.ext.asyncio.AsyncScalarResult

用于 AsyncResult 的包装器,它返回标量值而不是 Row 值。

AsyncScalarResult 对象是通过调用 AsyncResult.scalars() 方法获取的。

有关完整行为描述,请参阅同步 SQLAlchemy API 中的 ScalarResult 对象。

版本 1.4 中的新功能。

类签名

sqlalchemy.ext.asyncio.AsyncScalarResult (sqlalchemy.ext.asyncio.AsyncCommon)

方法 sqlalchemy.ext.asyncio.AsyncScalarResult.async all() Sequence[_R]

将所有标量值作为列表返回。

等同于 AsyncResult.all(),不同之处在于返回的是标量值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncScalarResult.async close() None

继承自 AsyncCommon.close() 方法 AsyncCommon

关闭此结果。

attribute sqlalchemy.ext.asyncio.AsyncScalarResult.closed

继承自 AsyncCommon.closed 属性 AsyncCommon

代理底层结果对象的 .closed 属性,如果存在,否则引发 AttributeError

新版 2.0.0b3。

method sqlalchemy.ext.asyncio.AsyncScalarResult.async fetchall() Sequence[_R]

它是 AsyncScalarResult.all() 方法的同义词。

method sqlalchemy.ext.asyncio.AsyncScalarResult.async fetchmany(size: int | None = None) Sequence[_R]

获取多个对象。

等同于 AsyncResult.fetchmany(),但返回标量值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncScalarResult.async first() _R | None

获取第一个对象,如果不存在任何对象,则返回 None

等同于 AsyncResult.first(),但返回标量值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncScalarResult.async one() _R

返回唯一的一个对象,否则抛出异常。

等同于 AsyncResult.one(),但返回标量值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncScalarResult.async one_or_none() _R | None

返回最多一个对象,否则抛出异常。

等同于 AsyncResult.one_or_none(),但返回标量值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncScalarResult.async partitions(size: int | None = None) AsyncIterator[Sequence[_R]]

迭代给定大小的元素子列表。

等同于 AsyncResult.partitions(),但返回标量值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncScalarResult.unique(strategy: _UniqueFilterType | None = None) Self

对由此 AsyncScalarResult 返回的对象应用唯一筛选。

有关使用方法的详细信息,请参阅 AsyncResult.unique()

method sqlalchemy.ext.asyncio.AsyncScalarResult.yield_per(num: int) Self

将行获取策略配置为一次获取 num 行。

FilterResult.yield_per() 方法是 Result.yield_per() 方法的直接调用。有关用法说明,请参阅该方法的文档。

版本 1.4.40 新版: - 添加了 FilterResult.yield_per(),以便该方法在所有结果集实现中都可用

class sqlalchemy.ext.asyncio.AsyncMappingResult

用于 AsyncResult 的包装器,它返回字典值而不是 Row 值。

调用 AsyncResult.mappings() 方法会获取 AsyncMappingResult 对象。

有关完整行为描述,请参阅同步 SQLAlchemy API 中的 MappingResult 对象。

版本 1.4 中的新功能。

类签名

class sqlalchemy.ext.asyncio.AsyncMappingResult (sqlalchemy.engine._WithKeys, sqlalchemy.ext.asyncio.AsyncCommon)

method sqlalchemy.ext.asyncio.AsyncMappingResult.async all() Sequence[RowMapping]

返回所有行,以列表形式。

等同于 AsyncResult.all(),除了返回的是 RowMapping 值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncMappingResult.async close() None

继承自 AsyncCommon.close() 方法 AsyncCommon

关闭此结果。

attribute sqlalchemy.ext.asyncio.AsyncMappingResult.closed

继承自 AsyncCommon.closed 属性 AsyncCommon

代理底层结果对象的 .closed 属性,如果存在,否则引发 AttributeError

新版 2.0.0b3。

method sqlalchemy.ext.asyncio.AsyncMappingResult.columns(*col_expressions: _KeyIndexType) Self

建立应在每行中返回的列。

method sqlalchemy.ext.asyncio.AsyncMappingResult.async fetchall() Sequence[RowMapping]

AsyncMappingResult.all() 方法的同义词。

method sqlalchemy.ext.asyncio.AsyncMappingResult.async fetchmany(size: int | None = None) Sequence[RowMapping]

获取多行。

等同于 AsyncResult.fetchmany(),除了返回的是 RowMapping 值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncMappingResult.async fetchone() RowMapping | None

获取一个对象。

等同于 AsyncResult.fetchone(),除了返回的是 RowMapping 值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncMappingResult.async first() RowMapping | None

获取第一个对象,如果不存在任何对象,则返回 None

等同于 AsyncResult.first(),除了返回的是 RowMapping 值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncMappingResult.keys() RMKeyView

继承自 sqlalchemy.engine._WithKeys.keys 方法的 sqlalchemy.engine._WithKeys

返回一个可迭代视图,该视图生成由每个 Row 表示的字符串键。

这些键可以表示核心语句返回的列的标签,或者表示 ORM 执行返回的 ORM 类的名称。

该视图还可以使用 Python in 运算符测试键包含,这将同时测试视图中表示的字符串键,以及列对象等备用键。

1.4 版中的变化: 返回一个键视图对象,而不是一个普通的列表。

method sqlalchemy.ext.asyncio.AsyncMappingResult.async one() RowMapping

返回唯一的一个对象,否则抛出异常。

等同于 AsyncResult.one(),除了返回的是 RowMapping 值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncMappingResult.async one_or_none() RowMapping | None

返回最多一个对象,否则抛出异常。

等效于 AsyncResult.one_or_none(),除了返回的是 RowMapping 值而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncMappingResult.async partitions(size: int | None = None) AsyncIterator[Sequence[RowMapping]]

迭代给定大小的元素子列表。

等效于 AsyncResult.partitions(),除了返回的是 RowMapping 值而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncMappingResult.unique(strategy: _UniqueFilterType | None = None) Self

对这个 AsyncMappingResult 返回的对象应用唯一过滤。

有关使用方法的详细信息,请参阅 AsyncResult.unique()

method sqlalchemy.ext.asyncio.AsyncMappingResult.yield_per(num: int) Self

将行获取策略配置为一次获取 num 行。

FilterResult.yield_per() 方法是 Result.yield_per() 方法的直接调用。有关用法说明,请参阅该方法的文档。

版本 1.4.40 新版: - 添加了 FilterResult.yield_per(),以便该方法在所有结果集实现中都可用

class sqlalchemy.ext.asyncio.AsyncTupleResult

一个 AsyncResult,其类型为返回普通 Python 元组而不是行。

由于 Row 本身已经像元组一样,所以这个类只是一个类型类,运行时仍然使用普通的 AsyncResult

类签名

class sqlalchemy.ext.asyncio.AsyncTupleResult (sqlalchemy.ext.asyncio.AsyncCommon, sqlalchemy.util.langhelpers.TypingOnly)

ORM 会话 API 文档

对象名称 描述

async_object_session(instance)

返回给定实例所属的 AsyncSession

async_scoped_session

提供 AsyncSession 对象的范围管理。

async_session(session)

返回代理给定 Session 对象的 AsyncSession,如果有的话。

async_sessionmaker

一个可配置的 AsyncSession 工厂。

AsyncAttrs

混合类,为所有属性提供一个可等待的访问器。

AsyncSession

Session 的异步版本。

AsyncSessionTransaction

ORM SessionTransaction 对象的包装器。

close_all_sessions()

关闭所有 AsyncSession 会话。

function sqlalchemy.ext.asyncio.async_object_session(instance: object) AsyncSession | None

返回给定实例所属的 AsyncSession

此函数使用同步 API 函数 object_session 来检索引用给定实例的 Session,并从中将其链接到原始的 AsyncSession

如果 AsyncSession 被垃圾回收,则返回值为 None

此功能也适用于 InstanceState.async_session 访问器。

参数:

instance – 一个 ORM 映射实例

返回值:

一个 AsyncSession 对象,或者 None

版本 1.4.18 中的新增内容。

function sqlalchemy.ext.asyncio.async_session(session: Session) AsyncSession | None

返回代理给定 Session 对象的 AsyncSession,如果有的话。

参数:

session – 一个 Session 实例。

返回值:

一个 AsyncSession 实例,或者 None

版本 1.4.18 中的新增内容。

function async sqlalchemy.ext.asyncio.close_all_sessions() None

关闭所有 AsyncSession 会话。

版本 2.0.23 中新增。

另请参见

close_all_sessions()

class sqlalchemy.ext.asyncio.async_sessionmaker

一个可配置的 AsyncSession 工厂。

async_sessionmaker 工厂以与 sessionmaker 工厂相同的方式工作,在调用时生成新的 AsyncSession 对象,根据此处建立的配置参数创建它们。

例如:

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker

async def run_some_sql(async_session: async_sessionmaker[AsyncSession]) -> None:
    async with async_session() as session:
        session.add(SomeObject(data="object"))
        session.add(SomeOtherObject(name="other object"))
        await session.commit()

async def main() -> None:
    # an AsyncEngine, which the AsyncSession will use for connection
    # resources
    engine = create_async_engine('postgresql+asyncpg://scott:tiger@localhost/')

    # create a reusable factory for new AsyncSession instances
    async_session = async_sessionmaker(engine)

    await run_some_sql(async_session)

    await engine.dispose()

async_sessionmaker 非常有用,因此程序的不同部分可以使用预先建立的固定配置创建新的 AsyncSession 对象。请注意,如果不使用 async_sessionmaker,也可以直接实例化 AsyncSession 对象。

版本 2.0 中新增: async_sessionmaker 提供了一个 sessionmaker 类,专门用于 AsyncSession 对象,包括 pep-484 类型支持。

另请参见

概要 - ORM - 展示了示例用法

sessionmaker - sessionmaker 架构的总体概述

sessionmaker 架构

打开和关闭会话 - 使用 sessionmaker 创建会话的介绍性文本。

类签名

class sqlalchemy.ext.asyncio.async_sessionmaker (typing.Generic)

method sqlalchemy.ext.asyncio.async_sessionmaker.__call__(**local_kw: Any) _AS

使用此 async_sessionmaker 中建立的配置生成一个新的 AsyncSession 对象。

在 Python 中,当一个对象以与函数相同的方式被“调用”时,会调用它的 __call__ 方法。

AsyncSession = async_sessionmaker(async_engine, expire_on_commit=False)
session = AsyncSession()  # invokes sessionmaker.__call__()
method sqlalchemy.ext.asyncio.async_sessionmaker.__init__(bind: Optional[_AsyncSessionBind] = None, *, class_: Type[_AS] = <class 'sqlalchemy.ext.asyncio.session.AsyncSession'>, autoflush: bool = True, expire_on_commit: bool = True, info: Optional[_InfoType] = None, **kw: Any)

构造一个新的 async_sessionmaker

除了 class_ 之外,这里的所有参数都对应于 Session 直接接受的参数。有关参数的更多详细信息,请参阅 AsyncSession.__init__() 文档字符串。

method sqlalchemy.ext.asyncio.async_sessionmaker.begin() _AsyncSessionContextManager[_AS]

生成一个上下文管理器,它既提供一个新的 AsyncSession,也提供一个提交事务。

例如:

async def main():
    Session = async_sessionmaker(some_engine)

    async with Session.begin() as session:
        session.add(some_object)

    # commits transaction, closes session
method sqlalchemy.ext.asyncio.async_sessionmaker.configure(**new_kw: Any) None

(重新)配置此 async_sessionmaker 的参数。

例如:

AsyncSession = async_sessionmaker(some_engine)

AsyncSession.configure(bind=create_async_engine('sqlite+aiosqlite://'))
class sqlalchemy.ext.asyncio.async_scoped_session

提供 AsyncSession 对象的范围管理。

有关用法详细信息,请参阅 使用 asyncio 范围会话 部分。

版本 1.4.19 中新增。

类签名

class sqlalchemy.ext.asyncio.async_scoped_session (typing.Generic)

method sqlalchemy.ext.asyncio.async_scoped_session.__call__(**kw: Any) _AS

返回当前的 AsyncSession,如果不存在则使用 scoped_session.session_factory 创建。

参数:

**kw – 关键字参数将传递给 scoped_session.session_factory 可调用对象,如果不存在现有的 AsyncSession。如果 AsyncSession 存在并且传递了关键字参数,则会引发 InvalidRequestError

method sqlalchemy.ext.asyncio.async_scoped_session.__init__(session_factory: async_sessionmaker[_AS], scopefunc: Callable[[], Any])

构造一个新的 async_scoped_session

参数:
  • session_factory – 用于创建新的 AsyncSession 实例的工厂。这通常是 async_sessionmaker 的实例,但并非必须。

  • scopefunc – 定义当前作用域的函数。像 asyncio.current_task 这样的函数在这里可能很有用。

method sqlalchemy.ext.asyncio.async_scoped_session.async aclose() None

AsyncSession.close() 的别名。

代表 async_scoped_session 类,为 AsyncSession 类代理。

AsyncSession.aclose() 的名称专门用于支持 Python 标准库 @contextlib.aclosing 上下文管理器函数。

新版本 2.0.20 中添加。

method sqlalchemy.ext.asyncio.async_scoped_session.add(instance: object, _warn: bool = True) None

将一个对象放入此 Session 中。

代表 async_scoped_session 类,为 AsyncSession 类代理。

代表 Session 类,作为 AsyncSession 类的代理。

当对象处于 瞬态 状态并传递给 Session.add() 方法时,它们会转入 待处理 状态,直到下次刷新,此时它们会转入 持久化 状态。

当对象处于 分离 状态并传递给 Session.add() 方法时,它们会直接转入 持久化 状态。

如果 Session 使用的事务回滚,则传递给 Session.add() 方法时处于瞬态状态的对象将被移回 瞬态 状态,并且将不再存在于此 Session 中。

方法 sqlalchemy.ext.asyncio.async_scoped_session.add_all(instances: Iterable[object]) None

将给定的实例集合添加到此 Session 中。

代表 async_scoped_session 类,为 AsyncSession 类代理。

代表 Session 类,作为 AsyncSession 类的代理。

有关一般行为描述,请参阅 Session.add() 的文档。

属性 sqlalchemy.ext.asyncio.async_scoped_session.autoflush

代表 AsyncSession 类的 Session.autoflush 属性的代理。

代表 async_scoped_session 类,为 AsyncSession 类代理。

方法 sqlalchemy.ext.asyncio.async_scoped_session.begin() AsyncSessionTransaction

返回一个 AsyncSessionTransaction 对象。

代表 async_scoped_session 类,为 AsyncSession 类代理。

当进入 AsyncSessionTransaction 对象时,底层 Session 会执行“begin”操作

async with async_session.begin():
    # .. ORM transaction is begun

请注意,当会话级事务开始时,通常不会发生数据库 IO,因为数据库事务是在按需基础上开始的。但是,begin 块是异步的,以适应可能执行 IO 的 SessionEvents.after_transaction_create() 事件钩子。

有关 ORM begin 的一般描述,请参阅 Session.begin()

方法 sqlalchemy.ext.asyncio.async_scoped_session.begin_nested() AsyncSessionTransaction

返回一个 AsyncSessionTransaction 对象,该对象将开始一个“嵌套”事务,例如 SAVEPOINT。

代表 async_scoped_session 类,为 AsyncSession 类代理。

行为与 AsyncSession.begin() 相同。

有关 ORM begin nested 的一般描述,请参阅 Session.begin_nested()

另请参见

可序列化隔离/保存点/事务性 DDL (asyncio 版本) - SQLite asyncio 驱动程序需要特殊的解决方法才能使 SAVEPOINT 正确工作。

属性 sqlalchemy.ext.asyncio.async_scoped_session.bind

代表 async_scoped_session 类的 AsyncSession.bind 属性的代理。

方法 sqlalchemy.ext.asyncio.async_scoped_session.async close() None

关闭此 AsyncSession 使用的事务资源和 ORM 对象。

代表 async_scoped_session 类,为 AsyncSession 类代理。

另请参见

Session.close() - “close” 的主要文档

关闭 - 有关 AsyncSession.close()AsyncSession.reset() 语义的详细信息。

async classmethod sqlalchemy.ext.asyncio.async_scoped_session.close_all() None

关闭所有 AsyncSession 会话。

代表 async_scoped_session 类,为 AsyncSession 类代理。

自版本 2.0 起弃用: AsyncSession.close_all() 方法已弃用,将在未来版本中删除。请参阅 close_all_sessions()

方法 sqlalchemy.ext.asyncio.async_scoped_session.async commit() None

提交当前进行的事务。

代表 async_scoped_session 类,为 AsyncSession 类代理。

另请参见

Session.commit() - “commit” 的主要文档

方法 sqlalchemy.ext.asyncio.async_scoped_session.configure(**kwargs: Any) None

重新配置此 scoped_session 使用的 sessionmaker

参见 sessionmaker.configure()

方法 sqlalchemy.ext.asyncio.async_scoped_session.async connection(bind_arguments: _BindArguments | None = None, execution_options: CoreExecuteOptionsParameter | None = None, **kw: Any) AsyncConnection

返回对应于此 Session 对象的事务状态的 AsyncConnection 对象。

代表 async_scoped_session 类,为 AsyncSession 类代理。

此方法也可用于为当前事务使用的数据库连接建立执行选项。

版本 1.4.24 中的新内容: 添加了 **kw 参数,这些参数会传递给底层的 Session.connection() 方法。

另请参见

Session.connection() - “连接” 的主要文档

方法 sqlalchemy.ext.asyncio.async_scoped_session.async delete(instance: object) None

将实例标记为已删除。

代表 async_scoped_session 类,为 AsyncSession 类代理。

数据库删除操作在 flush() 时发生。

由于此操作可能需要沿着未加载的关系级联,因此它是可等待的,以允许这些查询发生。

另请参见

Session.delete() - 删除的主要文档

属性 sqlalchemy.ext.asyncio.async_scoped_session.deleted

Session 中标记为“已删除”的所有实例的集合。

代表 async_scoped_session 类,为 AsyncSession 类代理。

代表 Session 类,作为 AsyncSession 类的代理。

属性 sqlalchemy.ext.asyncio.async_scoped_session.dirty

所有被认为是脏的持久实例的集合。

代表 async_scoped_session 类,为 AsyncSession 类代理。

代表 Session 类,作为 AsyncSession 类的代理。

例如:

some_mapped_object in session.dirty

当实例被修改但未被删除时,它们被认为是脏的。

请注意,这种“脏”计算是“乐观”的;大多数属性设置或集合修改操作会将实例标记为“脏”并将其放入此集合中,即使属性的值没有净变化。在冲洗时,每个属性的值与其之前保存的值进行比较,如果不存在净变化,则不会发生 SQL 操作(这是一个更昂贵的操作,因此只在冲洗时执行)。

要检查实例是否对其属性具有可操作的净变化,请使用 Session.is_modified() 方法。

方法 sqlalchemy.ext.asyncio.async_scoped_session.async execute(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) Result[Any]

执行语句并返回一个缓冲的 Result 对象。

代表 async_scoped_session 类,为 AsyncSession 类代理。

另请参见

Session.execute() - execute 的主要文档

方法 sqlalchemy.ext.asyncio.async_scoped_session.expire(instance: object, attribute_names: Iterable[str] | None = None) None

使实例上的属性失效。

代表 async_scoped_session 类,为 AsyncSession 类代理。

代表 Session 类,作为 AsyncSession 类的代理。

将实例的属性标记为已过期。下次访问过期的属性时,将向 Session 对象的当前事务上下文发出查询,以加载给定实例的所有过期属性。请注意,高度隔离的事务将返回与在同一事务中先前读取的值相同的值,而不管该事务外部的数据库状态是否发生变化。

要使 Session 中的所有对象同时失效,请使用 Session.expire_all()

Session 对象的默认行为是在调用 Session.rollback()Session.commit() 方法时使所有状态失效,以便为新事务加载新状态。 因此,调用 Session.expire() 仅在当前事务中发出非 ORM SQL 语句的特定情况下才有意义。

参数:
  • instance – 要刷新的实例。

  • attribute_names – 可选的字符串属性名称列表,指示要使失效的属性子集。

method sqlalchemy.ext.asyncio.async_scoped_session.expire_all() None

使此 Session 中的所有持久实例失效。

代表 async_scoped_session 类,为 AsyncSession 类代理。

代表 Session 类,作为 AsyncSession 类的代理。

当持久实例上的任何属性在下一次访问时,将使用 Session 对象的当前事务上下文发出查询,以便加载给定实例的所有失效属性。 请注意,高度隔离的事务将返回与在该事务中先前读取的值相同的值,无论该事务之外的数据库状态发生任何变化。

要使失效单个对象和这些对象上的单个属性,请使用 Session.expire()

Session 对象的默认行为是在调用 Session.rollback()Session.commit() 方法时使所有状态失效,以便为新事务加载新状态。 因此,通常不需要调用 Session.expire_all(),假设事务是隔离的。

method sqlalchemy.ext.asyncio.async_scoped_session.expunge(instance: object) None

从此 Session 中删除 instance

代表 async_scoped_session 类,为 AsyncSession 类代理。

代表 Session 类,作为 AsyncSession 类的代理。

这将释放对该实例的所有内部引用。 级联将根据expunge级联规则应用。

method sqlalchemy.ext.asyncio.async_scoped_session.expunge_all() None

从此 Session 中删除所有对象实例。

代表 async_scoped_session 类,为 AsyncSession 类代理。

代表 Session 类,作为 AsyncSession 类的代理。

这等效于对此 Session 中的所有对象调用 expunge(obj)

method sqlalchemy.ext.asyncio.async_scoped_session.async flush(objects: Sequence[Any] | None = None) None

将所有对象更改刷新到数据库中。

代表 async_scoped_session 类,为 AsyncSession 类代理。

另请参见

Session.flush() - 刷新主文档

method sqlalchemy.ext.asyncio.async_scoped_session.async get(entity: _EntityBindKey[_O], ident: _PKIdentityArgument, *, options: Sequence[ORMOption] | None = None, populate_existing: bool = False, with_for_update: ForUpdateParameter = None, identity_token: Any | None = None, execution_options: OrmExecuteOptionsParameter = {}) _O | None

返回基于给定主键标识符的实例,如果未找到,则返回 None

代表 async_scoped_session 类,为 AsyncSession 类代理。

另请参见

Session.get() - 获取主文档

method sqlalchemy.ext.asyncio.async_scoped_session.get_bind(mapper: _EntityBindKey[_O] | None = None, clause: ClauseElement | None = None, bind: _SessionBind | None = None, **kw: Any) Engine | Connection

返回同步代理的 Session 绑定的“bind”。

代表 async_scoped_session 类,为 AsyncSession 类代理。

Session.get_bind() 方法不同,此方法目前用于此 AsyncSession 以任何方式来解析请求的引擎。

注意

此方法直接代理到 Session.get_bind() 方法,但是目前作为覆盖目标有用,与 Session.get_bind() 方法不同。以下示例说明了如何实现与 AsyncSessionAsyncEngine 一起使用的自定义 Session.get_bind() 方案。

自定义垂直分区 中介绍的模式说明了如何将自定义绑定查找方案应用于 Session,前提是给定一组 Engine 对象。要应用相应的 Session.get_bind() 实现以用于 AsyncSessionAsyncEngine 对象,继续子类化 Session 并将其应用于 AsyncSession 使用 AsyncSession.sync_session_class。内部方法必须继续返回 Engine 实例,这些实例可以从 AsyncEngine 中使用 AsyncEngine.sync_engine 属性获取。

# using example from "Custom Vertical Partitioning"


import random

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import Session

# construct async engines w/ async drivers
engines = {
    'leader':create_async_engine("sqlite+aiosqlite:///leader.db"),
    'other':create_async_engine("sqlite+aiosqlite:///other.db"),
    'follower1':create_async_engine("sqlite+aiosqlite:///follower1.db"),
    'follower2':create_async_engine("sqlite+aiosqlite:///follower2.db"),
}

class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, **kw):
        # within get_bind(), return sync engines
        if mapper and issubclass(mapper.class_, MyOtherClass):
            return engines['other'].sync_engine
        elif self._flushing or isinstance(clause, (Update, Delete)):
            return engines['leader'].sync_engine
        else:
            return engines[
                random.choice(['follower1','follower2'])
            ].sync_engine

# apply to AsyncSession using sync_session_class
AsyncSessionMaker = async_sessionmaker(
    sync_session_class=RoutingSession
)

Session.get_bind() 方法在与 ORM 事件钩子和通过 AsyncSession.run_sync() 调用的函数相同的非异步、隐式非阻塞上下文中调用,因此希望在 Session.get_bind() 中运行 SQL 命令的例程可以继续使用阻塞式代码,这将在调用数据库驱动程序上的 IO 时被转换为隐式异步调用。

method sqlalchemy.ext.asyncio.async_scoped_session.async get_one(entity: _EntityBindKey[_O], ident: _PKIdentityArgument, *, options: Sequence[ORMOption] | None = None, populate_existing: bool = False, with_for_update: ForUpdateParameter = None, identity_token: Any | None = None, execution_options: OrmExecuteOptionsParameter = {}) _O

根据给定的主键标识符返回一个实例,如果未找到,则引发异常。

代表 async_scoped_session 类,为 AsyncSession 类代理。

如果查询未选择任何行,则引发 sqlalchemy.orm.exc.NoResultFound

..versionadded: 2.0.22

另请参见

Session.get_one() - get_one 的主要文档

classmethod sqlalchemy.ext.asyncio.async_scoped_session.identity_key(class_: Type[Any] | None = None, ident: Any | Tuple[Any, ...] = None, *, instance: Any | None = None, row: Row[Any] | RowMapping | None = None, identity_token: Any | None = None) _IdentityKeyType[Any]

返回一个身份键。

代表 async_scoped_session 类,为 AsyncSession 类代理。

代表 Session 类,作为 AsyncSession 类的代理。

这是 identity_key() 的别名。

attribute sqlalchemy.ext.asyncio.async_scoped_session.identity_map

代表 AsyncSession 类代理 Session.identity_map 属性。

代表 async_scoped_session 类,为 AsyncSession 类代理。

attribute sqlalchemy.ext.asyncio.async_scoped_session.info

一个用户可修改的字典。

代表 async_scoped_session 类,为 AsyncSession 类代理。

代表 Session 类,作为 AsyncSession 类的代理。

可以使用 Session 构造函数或 sessionmaker 构造函数或工厂方法的 info 参数填充此字典的初始值。这里的字典始终是此 Session 本地的,可以独立于所有其他 Session 对象进行修改。

method sqlalchemy.ext.asyncio.async_scoped_session.async invalidate() None

使用连接失效关闭此 Session。

代表 async_scoped_session 类,为 AsyncSession 类代理。

有关完整说明,请参见 Session.invalidate()

attribute sqlalchemy.ext.asyncio.async_scoped_session.is_active

如果此 Session 不处于“部分回滚”状态,则为 True。

代表 async_scoped_session 类,为 AsyncSession 类代理。

代表 Session 类,作为 AsyncSession 类的代理。

在版本 1.4 中更改: Session 不会立即开始新的事务,因此当该 Session 首次实例化时,此属性将为 False。

“部分回滚”状态通常表示该 Session 的刷新过程已失败,并且必须发出该 Session.rollback() 方法才能完全回滚事务。

如果此 Session 根本不处于事务中,该 Session 将在首次使用时自动开始,因此在这种情况下 Session.is_active 将返回 True。

否则,如果此 Session 处于事务中,并且该事务未在内部回滚,则 Session.is_active 也将返回 True。

method sqlalchemy.ext.asyncio.async_scoped_session.is_modified(instance: object, include_collections: bool = True) bool

如果给定实例具有本地修改的属性,则返回 True

代表 async_scoped_session 类,为 AsyncSession 类代理。

代表 Session 类,作为 AsyncSession 类的代理。

此方法检索实例上每个被检测属性的历史记录,并将其当前值与其之前刷新或提交的值(如果有)进行比较。

它实际上是检查给定实例在 Session.dirty 集合中的更昂贵且更准确的版本;将对每个属性的净“脏”状态执行完整测试。

例如:

return session.is_modified(someobject)

此方法有一些注意事项

  • 存在于 Session.dirty 集合中的实例在使用此方法测试时可能报告 False。这是因为该对象可能通过属性变异收到了更改事件,从而将其置于 Session.dirty 中,但最终该状态与从数据库加载的状态相同,导致此处没有净变化。

  • 如果在收到新值时未加载标量属性或已过期,则标量属性可能未记录先前设置的值。在这些情况下,假设该属性发生了更改,即使最终其数据库值没有净变化。在大多数情况下,SQLAlchemy 在设置事件发生时不需要“旧”值,因此如果旧值不存在,它会跳过昂贵的 SQL 调用,假设通常需要对标量值进行 UPDATE,并且在极少数情况下不需要时,比发出防御性 SELECT 更便宜。

    仅当属性容器的 active_history 标志设置为 True 时,才会在设置时无条件地获取“旧”值。此标志通常针对主键属性和不是简单一对多的标量对象引用设置。要为任何任意映射列设置此标志,请使用 active_history 参数与 column_property() 一同使用。

参数:
  • instance – 要测试其挂起更改的映射实例。

  • include_collections – 指示是否应将多值集合包含在操作中。将其设置为 False 是一种方法,可以仅检测在刷新时会导致对该实例进行 UPDATE 的基于本地列的属性(即标量列或多对一外键)。

method sqlalchemy.ext.asyncio.async_scoped_session.async merge(instance: _O, *, load: bool = True, options: Sequence[ORMOption] | None = None) _O

将给定实例的状态复制到此 AsyncSession 中的相应实例。

代表 async_scoped_session 类,为 AsyncSession 类代理。

另请参见

Session.merge() - 合并的主要文档

attribute sqlalchemy.ext.asyncio.async_scoped_session.new

Session 中标记为“新建”的所有实例的集合。

代表 async_scoped_session 类,为 AsyncSession 类代理。

代表 Session 类,作为 AsyncSession 类的代理。

attribute sqlalchemy.ext.asyncio.async_scoped_session.no_autoflush

返回禁用自动刷新的上下文管理器。

代表 async_scoped_session 类,为 AsyncSession 类代理。

代表 Session 类,作为 AsyncSession 类的代理。

例如:

with session.no_autoflush:

    some_object = SomeClass()
    session.add(some_object)
    # won't autoflush
    some_object.related_thing = session.query(SomeRelated).first()

with: 块中进行的操作将不会受到查询访问时发生的刷新的影响。这在初始化涉及现有数据库查询的一系列对象时很有用,其中未完成的对象不应立即刷新。

classmethod sqlalchemy.ext.asyncio.async_scoped_session.object_session(instance: object) Session | None

返回对象所属的 Session

代表 async_scoped_session 类,为 AsyncSession 类代理。

代表 Session 类,作为 AsyncSession 类的代理。

这是 object_session() 的别名。

method sqlalchemy.ext.asyncio.async_scoped_session.async refresh(instance: object, attribute_names: Iterable[str] | None = None, with_for_update: ForUpdateParameter = None) None

使给定实例的属性失效并刷新。

代表 async_scoped_session 类,为 AsyncSession 类代理。

将向数据库发出查询,所有属性将使用它们当前的数据库值进行刷新。

这是 Session.refresh() 方法的异步版本。有关所有选项的完整说明,请参阅该方法。

另请参见

Session.refresh() - refresh 的主要文档

method sqlalchemy.ext.asyncio.async_scoped_session.async remove() None

如果存在,则处理当前的 AsyncSession

与 scoped_session 的 remove 方法不同,此方法将使用 await 等待 AsyncSession 的 close 方法。

method sqlalchemy.ext.asyncio.async_scoped_session.async reset() None

关闭此 Session 使用的事务资源和 ORM 对象,将其重置为初始状态。

代表 async_scoped_session 类,为 AsyncSession 类代理。

版本 2.0.22 中的新增功能。

另请参见

Session.reset() - “reset” 的主要文档

关闭 - 有关 AsyncSession.close()AsyncSession.reset() 语义的详细信息。

method sqlalchemy.ext.asyncio.async_scoped_session.async rollback() None

回滚当前正在进行的事务。

代表 async_scoped_session 类,为 AsyncSession 类代理。

另请参见

Session.rollback() - “rollback” 的主要文档

method sqlalchemy.ext.asyncio.async_scoped_session.async scalar(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) Any

执行语句并返回标量结果。

代表 async_scoped_session 类,为 AsyncSession 类代理。

另请参见

Session.scalar() - scalar 的主要文档

method sqlalchemy.ext.asyncio.async_scoped_session.async scalars(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) ScalarResult[Any]

执行语句并返回标量结果。

代表 async_scoped_session 类,为 AsyncSession 类代理。

返回值:

一个 ScalarResult 对象

版本 1.4.24 中的新增功能: 添加了 AsyncSession.scalars()

版本 1.4.26 中的新增功能: 添加了 async_scoped_session.scalars()

另请参见

Session.scalars() - scalars 的主要文档

AsyncSession.stream_scalars() - 流式版本

attribute sqlalchemy.ext.asyncio.async_scoped_session.session_factory: async_sessionmaker[_AS]

提供给 __init__session_factory 存储在此属性中,可以在以后访问。当需要新的非范围 AsyncSession 时,这很有用。

method sqlalchemy.ext.asyncio.async_scoped_session.async stream(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) AsyncResult[Any]

执行语句并返回一个流式 AsyncResult 对象。

代表 async_scoped_session 类,为 AsyncSession 类代理。

method sqlalchemy.ext.asyncio.async_scoped_session.async stream_scalars(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) AsyncScalarResult[Any]

执行语句并返回一个标量结果流。

代表 async_scoped_session 类,为 AsyncSession 类代理。

返回值:

一个 AsyncScalarResult 对象。

版本 1.4.24 中新增。

另请参见

Session.scalars() - scalars 的主要文档

AsyncSession.scalars() - 非流式版本。

class sqlalchemy.ext.asyncio.AsyncAttrs

混合类,为所有属性提供一个可等待的访问器。

例如:

from __future__ import annotations

from typing import List

from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship


class Base(AsyncAttrs, DeclarativeBase):
    pass


class A(Base):
    __tablename__ = "a"

    id: Mapped[int] = mapped_column(primary_key=True)
    data: Mapped[str]
    bs: Mapped[List[B]] = relationship()


class B(Base):
    __tablename__ = "b"
    id: Mapped[int] = mapped_column(primary_key=True)
    a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
    data: Mapped[str]

在上面的例子中,AsyncAttrs mixin 应用于声明式 Base 类,它对所有子类都有效。这个 mixin 为所有类添加了一个新属性 AsyncAttrs.awaitable_attrs,它将以可等待的方式生成任何属性的值。这允许访问可能需要延迟加载或延迟/失效加载的属性,以便仍然可以发出 IO。

a1 = (await async_session.scalars(select(A).where(A.id == 5))).one()

# use the lazy loader on ``a1.bs`` via the ``.awaitable_attrs``
# interface, so that it may be awaited
for b1 in await a1.awaitable_attrs.bs:
    print(b1)

AsyncAttrs.awaitable_attrs 对属性执行一个调用,这与使用 AsyncSession.run_sync() 方法大致相同,例如:

for b1 in await async_session.run_sync(lambda sess: a1.bs):
    print(b1)

新添加于版本 2.0.13。

attribute sqlalchemy.ext.asyncio.AsyncAttrs.awaitable_attrs

提供此对象上所有属性作为可等待对象的命名空间。

例如:

a1 = (await async_session.scalars(select(A).where(A.id == 5))).one()

some_attribute = await a1.awaitable_attrs.some_deferred_attribute
some_collection = await a1.awaitable_attrs.some_collection
class sqlalchemy.ext.asyncio.AsyncSession

Session 的异步版本。

AsyncSession 是传统 Session 实例的代理。

AsyncSession **不适用于并发任务。**。有关背景信息,请参阅 Session 是否线程安全?AsyncSession 在并发任务中是否安全共享?

版本 1.4 中的新功能。

要将 AsyncSession 与自定义 Session 实现一起使用,请参阅 AsyncSession.sync_session_class 参数。

类签名

class sqlalchemy.ext.asyncio.AsyncSession (sqlalchemy.ext.asyncio.base.ReversibleProxy)

attribute sqlalchemy.ext.asyncio.AsyncSession.sync_session_class: Type[Session] = <class 'sqlalchemy.orm.session.Session'>

用于特定 AsyncSession 的底层 Session 实例的类或可调用对象。

在类级别,此属性是 AsyncSession.sync_session_class 参数的默认值。 AsyncSession 的自定义子类可以覆盖它。

在实例级别,此属性表示当前用于提供此 AsyncSession 实例的 Session 实例的类或可调用对象。

版本 1.4.24 中新增。

method sqlalchemy.ext.asyncio.AsyncSession.__init__(bind: _AsyncSessionBind | None = None, *, binds: Dict[_SessionBindKey, _AsyncSessionBind] | None = None, sync_session_class: Type[Session] | None = None, **kw: Any)

构造一个新的 AsyncSession

除了 sync_session_class 之外的所有参数都直接传递给 sync_session_class 可调用对象以实例化一个新的 Session。有关参数文档,请参阅 Session.__init__()

参数:

sync_session_class

将用于构造将被代理的 SessionSession 子类或其他可调用对象。此参数可用于提供自定义 Session 子类。默认为 AsyncSession.sync_session_class 类级别属性。

版本 1.4.24 中新增。

method sqlalchemy.ext.asyncio.AsyncSession.async aclose() None

AsyncSession.close() 的别名。

AsyncSession.aclose() 的名称专门用于支持 Python 标准库 @contextlib.aclosing 上下文管理器函数。

新版本 2.0.20 中添加。

method sqlalchemy.ext.asyncio.AsyncSession.add(instance: object, _warn: bool = True) None

将一个对象放入此 Session 中。

代表 Session 类,作为 AsyncSession 类的代理。

当对象处于 瞬态 状态并传递给 Session.add() 方法时,它们会转入 待处理 状态,直到下次刷新,此时它们会转入 持久化 状态。

当对象处于 分离 状态并传递给 Session.add() 方法时,它们会直接转入 持久化 状态。

如果 Session 使用的事务回滚,则传递给 Session.add() 方法时处于瞬态状态的对象将被移回 瞬态 状态,并且将不再存在于此 Session 中。

method sqlalchemy.ext.asyncio.AsyncSession.add_all(instances: Iterable[object]) None

将给定的实例集合添加到此 Session 中。

代表 Session 类,作为 AsyncSession 类的代理。

有关一般行为描述,请参阅 Session.add() 的文档。

属性 sqlalchemy.ext.asyncio.AsyncSession.autoflush

代表 AsyncSession 类的 Session.autoflush 属性的代理。

方法 sqlalchemy.ext.asyncio.AsyncSession.begin() AsyncSessionTransaction

返回一个 AsyncSessionTransaction 对象。

当进入 AsyncSessionTransaction 对象时,底层 Session 会执行“begin”操作

async with async_session.begin():
    # .. ORM transaction is begun

请注意,当会话级事务开始时,通常不会发生数据库 IO,因为数据库事务是在按需基础上开始的。但是,begin 块是异步的,以适应可能执行 IO 的 SessionEvents.after_transaction_create() 事件钩子。

有关 ORM begin 的一般描述,请参阅 Session.begin()

方法 sqlalchemy.ext.asyncio.AsyncSession.begin_nested() AsyncSessionTransaction

返回一个 AsyncSessionTransaction 对象,该对象将开始一个“嵌套”事务,例如 SAVEPOINT。

行为与 AsyncSession.begin() 相同。

有关 ORM begin nested 的一般描述,请参阅 Session.begin_nested()

另请参见

可序列化隔离/保存点/事务性 DDL (asyncio 版本) - SQLite asyncio 驱动程序需要特殊的解决方法才能使 SAVEPOINT 正确工作。

方法 sqlalchemy.ext.asyncio.AsyncSession.异步 close() None

关闭此 AsyncSession 使用的事务资源和 ORM 对象。

另请参见

Session.close() - “close” 的主要文档

关闭 - 有关 AsyncSession.close()AsyncSession.reset() 语义的详细信息。

异步 类方法 sqlalchemy.ext.asyncio.AsyncSession.close_all() None

关闭所有 AsyncSession 会话。

自版本 2.0 起弃用: AsyncSession.close_all() 方法已弃用,将在未来版本中删除。请参阅 close_all_sessions()

方法 sqlalchemy.ext.asyncio.AsyncSession.异步 commit() None

提交当前进行的事务。

另请参见

Session.commit() - “commit” 的主要文档

方法 sqlalchemy.ext.asyncio.AsyncSession.异步 connection(bind_arguments: _BindArguments | None = None, execution_options: CoreExecuteOptionsParameter | None = None, **kw: Any) AsyncConnection

返回对应于此 Session 对象的事务状态的 AsyncConnection 对象。

此方法也可用于为当前事务使用的数据库连接建立执行选项。

版本 1.4.24 中的新内容: 添加了 **kw 参数,这些参数会传递给底层的 Session.connection() 方法。

另请参见

Session.connection() - “连接” 的主要文档

方法 sqlalchemy.ext.asyncio.AsyncSession.异步 delete(instance: object) None

将实例标记为已删除。

数据库删除操作在 flush() 时发生。

由于此操作可能需要沿着未加载的关系级联,因此它是可等待的,以允许这些查询发生。

另请参见

Session.delete() - 删除的主要文档

属性 sqlalchemy.ext.asyncio.AsyncSession.deleted

Session 中标记为“已删除”的所有实例的集合。

代表 Session 类,作为 AsyncSession 类的代理。

属性 sqlalchemy.ext.asyncio.AsyncSession.dirty

所有被认为是脏的持久实例的集合。

代表 Session 类,作为 AsyncSession 类的代理。

例如:

some_mapped_object in session.dirty

当实例被修改但未被删除时,它们被认为是脏的。

请注意,这种“脏”计算是“乐观”的;大多数属性设置或集合修改操作会将实例标记为“脏”并将其放入此集合中,即使属性的值没有净变化。在冲洗时,每个属性的值与其之前保存的值进行比较,如果不存在净变化,则不会发生 SQL 操作(这是一个更昂贵的操作,因此只在冲洗时执行)。

要检查实例是否对其属性具有可操作的净变化,请使用 Session.is_modified() 方法。

方法 sqlalchemy.ext.asyncio.AsyncSession.异步 execute(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) Result[Any]

执行语句并返回一个缓冲的 Result 对象。

另请参见

Session.execute() - execute 的主要文档

方法 sqlalchemy.ext.asyncio.AsyncSession.expire(instance: object, attribute_names: Iterable[str] | None = None) None

使实例上的属性失效。

代表 Session 类,作为 AsyncSession 类的代理。

将实例的属性标记为已过期。下次访问过期的属性时,将向 Session 对象的当前事务上下文发出查询,以加载给定实例的所有过期属性。请注意,高度隔离的事务将返回与在同一事务中先前读取的值相同的值,而不管该事务外部的数据库状态是否发生变化。

要使 Session 中的所有对象同时失效,请使用 Session.expire_all()

Session 对象的默认行为是在调用 Session.rollback()Session.commit() 方法时使所有状态失效,以便为新事务加载新状态。 因此,调用 Session.expire() 仅在当前事务中发出非 ORM SQL 语句的特定情况下才有意义。

参数:
  • instance – 要刷新的实例。

  • attribute_names – 可选的字符串属性名称列表,指示要过期的属性子集。

方法 sqlalchemy.ext.asyncio.AsyncSession.expire_all() None

使此 Session 中的所有持久实例失效。

代表 Session 类,作为 AsyncSession 类的代理。

当持久实例上的任何属性在下一次访问时,将使用 Session 对象的当前事务上下文发出查询,以便加载给定实例的所有失效属性。 请注意,高度隔离的事务将返回与在该事务中先前读取的值相同的值,无论该事务之外的数据库状态发生任何变化。

要使失效单个对象和这些对象上的单个属性,请使用 Session.expire()

Session 对象的默认行为是在调用 Session.rollback()Session.commit() 方法时使所有状态失效,以便为新事务加载新状态。 因此,通常不需要调用 Session.expire_all(),假设事务是隔离的。

方法 sqlalchemy.ext.asyncio.AsyncSession.expunge(instance: object) None

从此 Session 中删除 instance

代表 Session 类,作为 AsyncSession 类的代理。

这将释放对该实例的所有内部引用。 级联将根据expunge级联规则应用。

方法 sqlalchemy.ext.asyncio.AsyncSession.expunge_all() None

从此 Session 中删除所有对象实例。

代表 Session 类,作为 AsyncSession 类的代理。

这等效于对此 Session 中的所有对象调用 expunge(obj)

方法 sqlalchemy.ext.asyncio.AsyncSession.异步 flush(objects: Sequence[Any] | None = None) None

将所有对象更改刷新到数据库中。

另请参见

Session.flush() - 刷新主文档

方法 sqlalchemy.ext.asyncio.AsyncSession.异步 get(entity: _EntityBindKey[_O], ident: _PKIdentityArgument, *, options: Sequence[ORMOption] | None = None, populate_existing: bool = False, with_for_update: ForUpdateParameter = None, identity_token: Any | None = None, execution_options: OrmExecuteOptionsParameter = {}) _O | None

返回基于给定主键标识符的实例,如果未找到,则返回 None

另请参见

Session.get() - 获取主文档

方法 sqlalchemy.ext.asyncio.AsyncSession.get_bind(mapper: _EntityBindKey[_O] | None = None, clause: ClauseElement | None = None, bind: _SessionBind | None = None, **kw: Any) Engine | Connection

返回同步代理的 Session 绑定的“bind”。

Session.get_bind() 方法不同,此方法目前用于此 AsyncSession 以任何方式来解析请求的引擎。

注意

此方法直接代理到 Session.get_bind() 方法,但是目前作为覆盖目标有用,与 Session.get_bind() 方法不同。以下示例说明了如何实现与 AsyncSessionAsyncEngine 一起使用的自定义 Session.get_bind() 方案。

自定义垂直分区 中介绍的模式说明了如何将自定义绑定查找方案应用于 Session,前提是给定一组 Engine 对象。要应用相应的 Session.get_bind() 实现以用于 AsyncSessionAsyncEngine 对象,继续子类化 Session 并将其应用于 AsyncSession 使用 AsyncSession.sync_session_class。内部方法必须继续返回 Engine 实例,这些实例可以从 AsyncEngine 中使用 AsyncEngine.sync_engine 属性获取。

# using example from "Custom Vertical Partitioning"


import random

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import Session

# construct async engines w/ async drivers
engines = {
    'leader':create_async_engine("sqlite+aiosqlite:///leader.db"),
    'other':create_async_engine("sqlite+aiosqlite:///other.db"),
    'follower1':create_async_engine("sqlite+aiosqlite:///follower1.db"),
    'follower2':create_async_engine("sqlite+aiosqlite:///follower2.db"),
}

class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, **kw):
        # within get_bind(), return sync engines
        if mapper and issubclass(mapper.class_, MyOtherClass):
            return engines['other'].sync_engine
        elif self._flushing or isinstance(clause, (Update, Delete)):
            return engines['leader'].sync_engine
        else:
            return engines[
                random.choice(['follower1','follower2'])
            ].sync_engine

# apply to AsyncSession using sync_session_class
AsyncSessionMaker = async_sessionmaker(
    sync_session_class=RoutingSession
)

Session.get_bind() 方法在与 ORM 事件钩子和通过 AsyncSession.run_sync() 调用的函数相同的非异步、隐式非阻塞上下文中调用,因此希望在 Session.get_bind() 中运行 SQL 命令的例程可以继续使用阻塞式代码,这将在调用数据库驱动程序上的 IO 时被转换为隐式异步调用。

方法 sqlalchemy.ext.asyncio.AsyncSession.get_nested_transaction() AsyncSessionTransaction | None

返回当前正在进行的嵌套事务(如果有)。

返回值:

一个 AsyncSessionTransaction 对象,或 None

版本 1.4.18 中的新增内容。

方法 sqlalchemy.ext.asyncio.AsyncSession.异步 get_one(entity: _EntityBindKey[_O], ident: _PKIdentityArgument, *, options: Sequence[ORMOption] | None = None, populate_existing: bool = False, with_for_update: ForUpdateParameter = None, identity_token: Any | None = None, execution_options: OrmExecuteOptionsParameter = {}) _O

根据给定的主键标识符返回一个实例,如果未找到,则引发异常。

如果查询未选择任何行,则引发 sqlalchemy.orm.exc.NoResultFound

..versionadded: 2.0.22

另请参见

Session.get_one() - get_one 的主要文档

方法 sqlalchemy.ext.asyncio.AsyncSession.get_transaction() AsyncSessionTransaction | None

返回当前正在进行的根事务(如果有)。

返回值:

一个 AsyncSessionTransaction 对象,或 None

版本 1.4.18 中的新增内容。

classmethod sqlalchemy.ext.asyncio.AsyncSession.identity_key(class_: Type[Any] | None = None, ident: Any | Tuple[Any, ...] = None, *, instance: Any | None = None, row: Row[Any] | RowMapping | None = None, identity_token: Any | None = None) _IdentityKeyType[Any]

返回一个身份键。

代表 Session 类,作为 AsyncSession 类的代理。

这是 identity_key() 的别名。

attribute sqlalchemy.ext.asyncio.AsyncSession.identity_map

代表 AsyncSession 类代理 Session.identity_map 属性。

method sqlalchemy.ext.asyncio.AsyncSession.in_nested_transaction() bool

如果此 Session 已开始嵌套事务(例如 SAVEPOINT),则返回 True。

代表 Session 类,作为 AsyncSession 类的代理。

版本 1.4 中的新功能。

method sqlalchemy.ext.asyncio.AsyncSession.in_transaction() bool

如果此 Session 已开始事务,则返回 True。

代表 Session 类,作为 AsyncSession 类的代理。

版本 1.4 中的新功能。

另请参见

Session.is_active

attribute sqlalchemy.ext.asyncio.AsyncSession.info

一个用户可修改的字典。

代表 Session 类,作为 AsyncSession 类的代理。

可以使用 Session 构造函数或 sessionmaker 构造函数或工厂方法的 info 参数填充此字典的初始值。这里的字典始终是此 Session 本地的,可以独立于所有其他 Session 对象进行修改。

method sqlalchemy.ext.asyncio.AsyncSession.async invalidate() None

使用连接失效关闭此 Session。

有关完整说明,请参见 Session.invalidate()

attribute sqlalchemy.ext.asyncio.AsyncSession.is_active

如果此 Session 不处于“部分回滚”状态,则为 True。

代表 Session 类,作为 AsyncSession 类的代理。

在版本 1.4 中更改: Session 不会立即开始新的事务,因此当该 Session 首次实例化时,此属性将为 False。

“部分回滚”状态通常表示该 Session 的刷新过程已失败,并且必须发出该 Session.rollback() 方法才能完全回滚事务。

如果此 Session 根本不处于事务中,该 Session 将在首次使用时自动开始,因此在这种情况下 Session.is_active 将返回 True。

否则,如果此 Session 处于事务中,并且该事务未在内部回滚,则 Session.is_active 也将返回 True。

method sqlalchemy.ext.asyncio.AsyncSession.is_modified(instance: object, include_collections: bool = True) bool

如果给定实例具有本地修改的属性,则返回 True

代表 Session 类,作为 AsyncSession 类的代理。

此方法检索实例上每个被检测属性的历史记录,并将其当前值与其之前刷新或提交的值(如果有)进行比较。

它实际上是检查给定实例在 Session.dirty 集合中的更昂贵且更准确的版本;将对每个属性的净“脏”状态执行完整测试。

例如:

return session.is_modified(someobject)

此方法有一些注意事项

  • 存在于 Session.dirty 集合中的实例在使用此方法测试时可能报告 False。这是因为该对象可能通过属性变异收到了更改事件,从而将其置于 Session.dirty 中,但最终该状态与从数据库加载的状态相同,导致此处没有净变化。

  • 如果在收到新值时未加载标量属性或已过期,则标量属性可能未记录先前设置的值。在这些情况下,假设该属性发生了更改,即使最终其数据库值没有净变化。在大多数情况下,SQLAlchemy 在设置事件发生时不需要“旧”值,因此如果旧值不存在,它会跳过昂贵的 SQL 调用,假设通常需要对标量值进行 UPDATE,并且在极少数情况下不需要时,比发出防御性 SELECT 更便宜。

    仅当属性容器的 active_history 标志设置为 True 时,才会在设置时无条件地获取“旧”值。此标志通常针对主键属性和不是简单一对多的标量对象引用设置。要为任何任意映射列设置此标志,请使用 active_history 参数与 column_property() 一同使用。

参数:
  • instance – 要测试是否有待处理更改的映射实例。

  • include_collections – 指示是否应将多值集合包含在操作中。将其设置为 False 是一种方法,可以仅检测会导致在刷新时对该实例执行 UPDATE 的基于本地列的属性(即标量列或多对一外键)。

method sqlalchemy.ext.asyncio.AsyncSession.async merge(instance: _O, *, load: bool = True, options: Sequence[ORMOption] | None = None) _O

将给定实例的状态复制到此 AsyncSession 中的相应实例。

另请参见

Session.merge() - 合并的主要文档

attribute sqlalchemy.ext.asyncio.AsyncSession.new

Session 中标记为“新建”的所有实例的集合。

代表 Session 类,作为 AsyncSession 类的代理。

attribute sqlalchemy.ext.asyncio.AsyncSession.no_autoflush

返回禁用自动刷新的上下文管理器。

代表 Session 类,作为 AsyncSession 类的代理。

例如:

with session.no_autoflush:

    some_object = SomeClass()
    session.add(some_object)
    # won't autoflush
    some_object.related_thing = session.query(SomeRelated).first()

with: 块中进行的操作将不会受到查询访问时发生的刷新的影响。这在初始化涉及现有数据库查询的一系列对象时很有用,其中未完成的对象不应立即刷新。

classmethod sqlalchemy.ext.asyncio.AsyncSession.object_session(instance: object) Session | None

返回对象所属的 Session

代表 Session 类,作为 AsyncSession 类的代理。

这是 object_session() 的别名。

method sqlalchemy.ext.asyncio.AsyncSession.async refresh(instance: object, attribute_names: Iterable[str] | None = None, with_for_update: ForUpdateParameter = None) None

使给定实例的属性失效并刷新。

将向数据库发出查询,所有属性将使用它们当前的数据库值进行刷新。

这是 Session.refresh() 方法的异步版本。有关所有选项的完整说明,请参阅该方法。

另请参见

Session.refresh() - refresh 的主要文档

method sqlalchemy.ext.asyncio.AsyncSession.async reset() None

关闭此 Session 使用的事务资源和 ORM 对象,将其重置为初始状态。

版本 2.0.22 中的新增功能。

另请参见

Session.reset() - “reset” 的主要文档

关闭 - 有关 AsyncSession.close()AsyncSession.reset() 语义的详细信息。

方法 sqlalchemy.ext.asyncio.AsyncSession.异步 rollback()

回滚当前正在进行的事务。

另请参见

Session.rollback() - “rollback” 的主要文档

方法 sqlalchemy.ext.asyncio.AsyncSession.异步 run_sync(fn: ~typing.Callable[[~typing.Concatenate[~sqlalchemy.orm.session.Session, ~_P]], ~sqlalchemy.ext.asyncio.session._T], *arg: ~typing.~_P, **kw: ~typing.~_P) _T

调用给定的同步(即非异步)可调用对象,并将同步样式的 Session 作为第一个参数传递。

此方法允许传统的同步 SQLAlchemy 函数在 asyncio 应用程序的上下文中运行。

例如:

def some_business_method(session: Session, param: str) -> str:
    '''A synchronous function that does not require awaiting

    :param session: a SQLAlchemy Session, used synchronously

    :return: an optional return value is supported

    '''
    session.add(MyObject(param=param))
    session.flush()
    return "success"


async def do_something_async(async_engine: AsyncEngine) -> None:
    '''an async function that uses awaiting'''

    with AsyncSession(async_engine) as async_session:
        # run some_business_method() with a sync-style
        # Session, proxied into an awaitable
        return_code = await async_session.run_sync(some_business_method, param="param1")
        print(return_code)

此方法通过在专门的 instrument greenlet 中运行给定的可调用对象,将 asyncio 事件循环一直维护到数据库连接。

提示

提供的可调用对象在 asyncio 事件循环中内联调用,并且将在传统的 IO 调用中阻塞。 此可调用对象中的 IO 应该只调用 SQLAlchemy 的 asyncio 数据库 API,这些 API 将被正确地适配到 greenlet 上下文中。

另请参见

AsyncAttrs - 用于 ORM 映射类的混合类,它以更简洁的方式在每个属性的基础上提供类似的功能

AsyncConnection.run_sync()

在 asyncio 下运行同步方法和函数

方法 sqlalchemy.ext.asyncio.AsyncSession.异步 scalar(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) Any

执行语句并返回标量结果。

另请参见

Session.scalar() - scalar 的主要文档

方法 sqlalchemy.ext.asyncio.AsyncSession.异步 scalars(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) ScalarResult[Any]

执行语句并返回标量结果。

返回值:

一个 ScalarResult 对象

版本 1.4.24 中的新增功能: 添加了 AsyncSession.scalars()

版本 1.4.26 中的新增功能: 添加了 async_scoped_session.scalars()

另请参见

Session.scalars() - scalars 的主要文档

AsyncSession.stream_scalars() - 流式版本

方法 sqlalchemy.ext.asyncio.AsyncSession.异步 stream(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) AsyncResult[Any]

执行语句并返回一个流式 AsyncResult 对象。

方法 sqlalchemy.ext.asyncio.AsyncSession.异步 stream_scalars(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) AsyncScalarResult[Any]

执行语句并返回一个标量结果流。

返回值:

一个 AsyncScalarResult 对象。

版本 1.4.24 中新增。

另请参见

Session.scalars() - scalars 的主要文档

AsyncSession.scalars() - 非流式版本。

属性 sqlalchemy.ext.asyncio.AsyncSession.sync_session: Session

对底层 Session 的引用,此 AsyncSession 将请求代理到该对象。

此实例可用作事件目标。

sqlalchemy.ext.asyncio.AsyncSessionTransaction

ORM SessionTransaction 对象的包装器。

提供此对象是为了允许为 AsyncSession.begin() 返回一个持有事务的对象。

此对象支持对 AsyncSessionTransaction.commit()AsyncSessionTransaction.rollback() 的显式调用,以及用作异步上下文管理器。

版本 1.4 中的新功能。

成员

commit()rollback()

类签名

class sqlalchemy.ext.asyncio.AsyncSessionTransaction (sqlalchemy.ext.asyncio.base.ReversibleProxy, sqlalchemy.ext.asyncio.base.StartableContext)

method sqlalchemy.ext.asyncio.AsyncSessionTransaction.async commit() None

提交此 AsyncTransaction

method sqlalchemy.ext.asyncio.AsyncSessionTransaction.async rollback() None

回滚此 AsyncTransaction