异步 I/O (asyncio)

支持 Python asyncio。 包括对 Core 和 ORM 用法的支持,使用 asyncio 兼容的方言。

版本 1.4 中的新功能。

警告

请阅读 Asyncio 平台安装注意事项 (包括 Apple M1),了解包括 Apple M1 架构在内的许多平台的重要平台安装注意事项。

另请参阅

Core 和 ORM 的异步 IO 支持 - 初始功能公告

Asyncio 集成 - 示例脚本,说明 Core 和 ORM 在 asyncio 扩展中使用的工作示例。

Asyncio 平台安装注意事项 (包括 Apple M1)

asyncio 扩展仅需要 Python 3。 它还依赖于 greenlet 库。 默认情况下,此依赖项安装在常见的机器平台上,包括

x86_64 aarch64 ppc64le amd64 win32

对于上述平台,已知 greenlet 提供预构建的 wheel 文件。 对于其他平台,默认情况下不安装 greenlet; greenlet 的当前文件列表可以在 Greenlet - 下载文件 中查看。 请注意,省略了许多架构,包括 Apple M1

要在安装 SQLAlchemy 的同时确保 greenlet 依赖项存在,无论使用哪个平台,都可以安装 [asyncio] setuptools extra,如下所示,这也将指示 pip 安装 greenlet

pip install sqlalchemy[asyncio]

请注意,在没有预构建 wheel 文件的平台上安装 greenlet 意味着 greenlet 将从源代码构建,这需要 Python 的开发库也存在。

概要 - Core

对于 Core 用法,create_async_engine() 函数创建一个 AsyncEngine 的实例,它提供传统 Engine API 的异步版本。AsyncEngine 通过其 AsyncEngine.connect()AsyncEngine.begin() 方法传递 AsyncConnection,这两种方法都提供异步上下文管理器。AsyncConnection 然后可以使用 AsyncConnection.execute() 方法调用语句以传递缓冲的 Result,或者使用 AsyncConnection.stream() 方法传递流式服务器端 AsyncResult

>>> import asyncio

>>> from sqlalchemy import Column
>>> from sqlalchemy import MetaData
>>> from sqlalchemy import select
>>> from sqlalchemy import String
>>> from sqlalchemy import Table
>>> from sqlalchemy.ext.asyncio import create_async_engine

>>> meta = MetaData()
>>> t1 = Table("t1", meta, Column("name", String(50), primary_key=True))


>>> async def async_main() -> None:
...     engine = create_async_engine("sqlite+aiosqlite://", echo=True)
...
...     async with engine.begin() as conn:
...         await conn.run_sync(meta.drop_all)
...         await conn.run_sync(meta.create_all)
...
...         await conn.execute(
...             t1.insert(), [{"name": "some name 1"}, {"name": "some name 2"}]
...         )
...
...     async with engine.connect() as conn:
...         # select a Result, which will be delivered with buffered
...         # results
...         result = await conn.execute(select(t1).where(t1.c.name == "some name 1"))
...
...         print(result.fetchall())
...
...     # for AsyncEngine created in function scope, close and
...     # clean-up pooled connections
...     await engine.dispose()


>>> asyncio.run(async_main())
BEGIN (implicit) ... CREATE TABLE t1 ( name VARCHAR(50) NOT NULL, PRIMARY KEY (name) ) ... INSERT INTO t1 (name) VALUES (?) [...] [('some name 1',), ('some name 2',)] COMMIT BEGIN (implicit) SELECT t1.name FROM t1 WHERE t1.name = ? [...] ('some name 1',) [('some name 1',)] ROLLBACK

在上面,AsyncConnection.run_sync() 方法可用于调用特殊的 DDL 函数,例如 MetaData.create_all(),它们不包含可等待的钩子。

提示

建议在使用 AsyncEngine 对象在超出上下文范围并被垃圾回收的范围内时,使用 await 调用 AsyncEngine.dispose() 方法,如上面示例中的 async_main 函数所示。 这确保连接池持有的任何连接都将在可等待的上下文中正确释放。 与使用阻塞 IO 不同,SQLAlchemy 无法在 __del__ 或 weakref finalizers 等方法中正确释放这些连接,因为没有机会调用 await。 如果在引擎超出范围时未能显式释放引擎,可能会导致标准输出中发出类似于 RuntimeError: Event loop is closed 形式的警告,在垃圾回收期间。

AsyncConnection 还通过 AsyncConnection.stream() 方法提供了一个“流式” API,该方法返回一个 AsyncResult 对象。 此结果对象使用服务器端游标并提供 async/await API,例如异步迭代器

async with engine.connect() as conn:
    async_result = await conn.stream(select(t1))

    async for row in async_result:
        print("row: %s" % (row,))

概要 - ORM

使用 2.0 风格 查询,AsyncSession 类提供完整的 ORM 功能。

在默认使用模式下,必须特别注意避免 惰性加载 或其他涉及 ORM 关系和列属性的过期属性访问;下一节 在使用 AsyncSession 时防止隐式 IO 详细介绍了这一点。

警告

AsyncSession 的单个实例不安全用于多个并发任务。 有关背景信息,请参阅 将 AsyncSession 与并发任务一起使用Session 是线程安全的吗? AsyncSession 可以安全地在并发任务中共享吗? 部分。

下面的示例说明了一个完整的示例,包括映射器和 session 配置

>>> from __future__ import annotations

>>> import asyncio
>>> import datetime
>>> from typing import List

>>> from sqlalchemy import ForeignKey
>>> from sqlalchemy import func
>>> from sqlalchemy import select
>>> from sqlalchemy.ext.asyncio import AsyncAttrs
>>> from sqlalchemy.ext.asyncio import async_sessionmaker
>>> from sqlalchemy.ext.asyncio import AsyncSession
>>> from sqlalchemy.ext.asyncio import create_async_engine
>>> from sqlalchemy.orm import DeclarativeBase
>>> from sqlalchemy.orm import Mapped
>>> from sqlalchemy.orm import mapped_column
>>> from sqlalchemy.orm import relationship
>>> from sqlalchemy.orm import selectinload


>>> class Base(AsyncAttrs, DeclarativeBase):
...     pass

>>> class B(Base):
...     __tablename__ = "b"
...
...     id: Mapped[int] = mapped_column(primary_key=True)
...     a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
...     data: Mapped[str]

>>> class A(Base):
...     __tablename__ = "a"
...
...     id: Mapped[int] = mapped_column(primary_key=True)
...     data: Mapped[str]
...     create_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
...     bs: Mapped[List[B]] = relationship()

>>> async def insert_objects(async_session: async_sessionmaker[AsyncSession]) -> None:
...     async with async_session() as session:
...         async with session.begin():
...             session.add_all(
...                 [
...                     A(bs=[B(data="b1"), B(data="b2")], data="a1"),
...                     A(bs=[], data="a2"),
...                     A(bs=[B(data="b3"), B(data="b4")], data="a3"),
...                 ]
...             )


>>> async def select_and_update_objects(
...     async_session: async_sessionmaker[AsyncSession],
... ) -> None:
...     async with async_session() as session:
...         stmt = select(A).order_by(A.id).options(selectinload(A.bs))
...
...         result = await session.execute(stmt)
...
...         for a in result.scalars():
...             print(a, a.data)
...             print(f"created at: {a.create_date}")
...             for b in a.bs:
...                 print(b, b.data)
...
...         result = await session.execute(select(A).order_by(A.id).limit(1))
...
...         a1 = result.scalars().one()
...
...         a1.data = "new data"
...
...         await session.commit()
...
...         # access attribute subsequent to commit; this is what
...         # expire_on_commit=False allows
...         print(a1.data)
...
...         # alternatively, AsyncAttrs may be used to access any attribute
...         # as an awaitable (new in 2.0.13)
...         for b1 in await a1.awaitable_attrs.bs:
...             print(b1, b1.data)


>>> async def async_main() -> None:
...     engine = create_async_engine("sqlite+aiosqlite://", echo=True)
...
...     # async_sessionmaker: a factory for new AsyncSession objects.
...     # expire_on_commit - don't expire objects after transaction commit
...     async_session = async_sessionmaker(engine, expire_on_commit=False)
...
...     async with engine.begin() as conn:
...         await conn.run_sync(Base.metadata.create_all)
...
...     await insert_objects(async_session)
...     await select_and_update_objects(async_session)
...
...     # for AsyncEngine created in function scope, close and
...     # clean-up pooled connections
...     await engine.dispose()


>>> asyncio.run(async_main())
BEGIN (implicit) ... CREATE TABLE a ( id INTEGER NOT NULL, data VARCHAR NOT NULL, create_date DATETIME DEFAULT (CURRENT_TIMESTAMP) NOT NULL, PRIMARY KEY (id) ) ... CREATE TABLE b ( id INTEGER NOT NULL, a_id INTEGER NOT NULL, data VARCHAR NOT NULL, PRIMARY KEY (id), FOREIGN KEY(a_id) REFERENCES a (id) ) ... COMMIT BEGIN (implicit) INSERT INTO a (data) VALUES (?) RETURNING id, create_date [...] ('a1',) ... INSERT INTO b (a_id, data) VALUES (?, ?) RETURNING id [...] (1, 'b2') ... COMMIT BEGIN (implicit) SELECT a.id, a.data, a.create_date FROM a ORDER BY a.id [...] () SELECT b.a_id AS b_a_id, b.id AS b_id, b.data AS b_data FROM b WHERE b.a_id IN (?, ?, ?) [...] (1, 2, 3) <A object at ...> a1 created at: ... <B object at ...> b1 <B object at ...> b2 <A object at ...> a2 created at: ... <A object at ...> a3 created at: ... <B object at ...> b3 <B object at ...> b4 SELECT a.id, a.data, a.create_date FROM a ORDER BY a.id LIMIT ? OFFSET ? [...] (1, 0) UPDATE a SET data=? WHERE a.id = ? [...] ('new data', 1) COMMIT new data <B object at ...> b1 <B object at ...> b2

在上面的示例中,AsyncSession 使用可选的 async_sessionmaker 助手实例化,该助手为新的 AsyncSession 对象提供了一个工厂,其中包含一组固定的参数,这里包括将其与 AsyncEngine 关联,针对特定的数据库 URL。 然后将其传递给其他方法,在这些方法中,它可以在 Python 异步上下文管理器中使用(即 async with: 语句),以便在块的末尾自动关闭; 这等效于调用 AsyncSession.close() 方法。

将 AsyncSession 与并发任务一起使用

AsyncSession 对象是一个可变的、有状态的对象,它表示正在进行的单个有状态数据库事务。 将并发任务与 asyncio 一起使用时,例如使用 asyncio.gather() 等 API,应为每个单独的任务使用单独的 AsyncSession

有关 SessionAsyncSession 关于如何在并发工作负载中使用它们的一般描述,请参阅 Session 是线程安全的吗? AsyncSession 可以安全地在并发任务中共享吗? 部分。

在使用 AsyncSession 时防止隐式 IO

使用传统的 asyncio,应用程序需要避免任何可能发生属性访问时 IO 的点。 可以使用以下技术来帮助实现这一点,其中许多技术在前面的示例中进行了说明。

  • 惰性加载关系、延迟列或表达式或在过期场景中访问的属性可以利用 AsyncAttrs mixin。 此 mixin 在添加到特定类或更一般地添加到声明性 Base 超类时,提供了一个访问器 AsyncAttrs.awaitable_attrs,它将任何属性作为可等待对象传递

    from __future__ import annotations
    
    from typing import List
    
    from sqlalchemy.ext.asyncio import AsyncAttrs
    from sqlalchemy.orm import DeclarativeBase
    from sqlalchemy.orm import Mapped
    from sqlalchemy.orm import relationship
    
    
    class Base(AsyncAttrs, DeclarativeBase):
        pass
    
    
    class A(Base):
        __tablename__ = "a"
    
        # ... rest of mapping ...
    
        bs: Mapped[List[B]] = relationship()
    
    
    class B(Base):
        __tablename__ = "b"
    
        # ... rest of mapping ...

    当不使用预先加载时,访问 A 的新加载实例上的 A.bs 集合通常会使用 惰性加载,为了成功,惰性加载通常会向数据库发出 IO,这在 asyncio 下会失败,因为不允许隐式 IO。 要在没有任何先前加载操作的情况下在 asyncio 下直接访问此属性,可以通过指示 AsyncAttrs.awaitable_attrs 前缀将该属性作为可等待对象访问

    a1 = (await session.scalars(select(A))).one()
    for b1 in await a1.awaitable_attrs.bs:
        print(b1)

    AsyncAttrs mixin 提供了一个简洁的外观,它覆盖了 AsyncSession.run_sync() 方法也使用的内部方法。

    版本 2.0.13 中的新功能。

    另请参阅

    AsyncAttrs

  • 可以通过使用 SQLAlchemy 2.0 中的 只写关系 功能,将集合替换为永远不会隐式发出 IO 的只写集合。 使用此功能,永远不会读取集合,仅使用显式 SQL 调用查询集合。 有关与 asyncio 一起使用的只写集合的示例,请参阅 Asyncio 集成 部分中的 async_orm_writeonly.py 示例。

    使用只写集合时,程序的行为很简单,并且关于集合的行为易于预测。 但是,缺点是没有用于一次加载许多这些集合的任何内置系统,而这需要手动执行。 因此,下面的许多要点都解决了在使用带有 asyncio 的传统惰性加载关系时的特定技术,这需要更加小心。

  • 如果不使用 AsyncAttrs,则可以使用 lazy="raise" 声明关系,以便默认情况下它们不会尝试发出 SQL。 为了加载集合,将改为使用 预先加载

  • 最有用的预先加载策略是 selectinload() 预先加载器,在前面的示例中使用了该加载器,以便在 await session.execute() 调用的范围内预先加载 A.bs 集合

    stmt = select(A).options(selectinload(A.bs))
  • 构造新对象时,集合始终被分配一个默认的空集合,例如上面示例中的列表

    A(bs=[], data="a2")

    这允许上面的 A 对象上的 .bs 集合在刷新 A 对象时存在且可读; 否则,当刷新 A 时,.bs 将被卸载,并且在访问时会引发错误。

  • AsyncSession 配置为使用 Session.expire_on_commit 设置为 False,以便我们可以在调用 AsyncSession.commit() 后访问对象上的属性,就像结尾处我们访问属性的行一样

    # create AsyncSession with expire_on_commit=False
    async_session = AsyncSession(engine, expire_on_commit=False)
    
    # sessionmaker version
    async_session = async_sessionmaker(engine, expire_on_commit=False)
    
    async with async_session() as session:
        result = await session.execute(select(A).order_by(A.id))
    
        a1 = result.scalars().first()
    
        # commit would normally expire all attributes
        await session.commit()
    
        # access attribute subsequent to commit; this is what
        # expire_on_commit=False allows
        print(a1.data)

其他指南包括

  • 应避免使用 AsyncSession.expire() 等方法,而应使用 AsyncSession.refresh()如果绝对需要过期。 通常需要过期,因为在使用 asyncio 时,Session.expire_on_commit 通常应设置为 False

  • 可以使用 AsyncSession.refresh()asyncio 下显式加载惰性加载的关系,如果所需的属性名称显式传递给 Session.refresh.attribute_names,例如

    # assume a_obj is an A that has lazy loaded A.bs collection
    a_obj = await async_session.get(A, [1])
    
    # force the collection to load by naming it in attribute_names
    await async_session.refresh(a_obj, ["bs"])
    
    # collection is present
    print(f"bs collection: {a_obj.bs}")

    当然,最好预先使用预先加载,以便集合已经设置好,而无需惰性加载。

    版本 2.0.4 中的新功能:添加了对 AsyncSession.refresh() 和底层 Session.refresh() 方法的支持,以强制加载惰性加载的关系(如果在 Session.refresh.attribute_names 参数中显式命名)。 在以前的版本中,即使在参数中命名,关系也会被静默跳过。

  • 避免使用 all 级联选项,该选项在 级联 中有文档记录,而应明确列出所需的级联特性。all 级联选项隐含了 refresh-expire 设置等,这意味着 AsyncSession.refresh() 方法将使相关对象上的属性过期,但不一定会刷新那些相关对象(假设在 relationship() 中未配置急切加载),从而使它们处于过期状态。

  • 如果使用 deferred() 列,则应使用适当的加载器选项,此外,如上所述,还应使用 relationship() 构造。有关延迟列加载的背景信息,请参阅 限制哪些列通过列延迟加载

  • 动态关系加载器 中描述的 “dynamic” 关系加载器策略默认情况下与 asyncio 方法不兼容。它只能在 AsyncSession.run_sync() 方法(在 在 asyncio 下运行同步方法和函数 中描述)中调用时直接使用,或者通过使用其 .statement 属性来获取正常的 select。

    user = await session.get(User, 42)
    addresses = (await session.scalars(user.addresses.statement)).all()
    stmt = user.addresses.statement.where(Address.email_address.startswith("patrick"))
    addresses_filter = (await session.scalars(stmt)).all()

    SQLAlchemy 2.0 版本中引入的 只写 技术与 asyncio 完全兼容,应优先使用。

    另请参阅

    “动态”关系加载器被 “只写” 取代 - 关于迁移到 2.0 风格的说明

  • 如果将 asyncio 与不支持 RETURNING 的数据库(如 MySQL 8)一起使用,则除非使用 Mapper.eager_defaults 选项,否则在新刷新的对象上将无法使用服务器默认值(如生成的时间戳)。在 SQLAlchemy 2.0 中,此行为会自动应用于像 PostgreSQL、SQLite 和 MariaDB 这样的后端,这些后端使用 RETURNING 在插入行时获取新值。

在 asyncio 下运行同步方法和函数

深度炼金术

这种方法本质上是公开 SQLAlchemy 能够首先提供 asyncio 接口的机制。虽然这样做没有技术问题,但总体而言,这种方法可能被认为是 “有争议的”,因为它与 asyncio 编程模型的一些核心理念相悖,即任何可能导致 IO 调用的编程语句 **必须** 具有 await 调用,否则程序不会明确指出可能发生 IO 的每一行。 这种方法并没有改变这个总体思路,只是它允许在一系列同步 IO 指令在函数调用的范围内免除此规则,本质上是捆绑到一个可等待对象中。

作为在 asyncio 事件循环中集成传统 SQLAlchemy “延迟加载” 的另一种方法,提供了一个 **可选的** 方法,称为 AsyncSession.run_sync(),它将在 greenlet 中运行任何 Python 函数,其中传统的同步编程概念将被转换为在到达数据库驱动程序时使用 await。 这里的一种假设方法是,面向 asyncio 的应用程序可以将数据库相关的方法打包到使用 AsyncSession.run_sync() 调用的函数中。

如果我们没有为 A.bs 集合使用 selectinload(),那么在上面的示例中,我们可以将对这些属性的访问处理放在一个单独的函数中来完成

import asyncio

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine


def fetch_and_update_objects(session):
    """run traditional sync-style ORM code in a function that will be
    invoked within an awaitable.

    """

    # the session object here is a traditional ORM Session.
    # all features are available here including legacy Query use.

    stmt = select(A)

    result = session.execute(stmt)
    for a1 in result.scalars():
        print(a1)

        # lazy loads
        for b1 in a1.bs:
            print(b1)

    # legacy Query use
    a1 = session.query(A).order_by(A.id).first()

    a1.data = "new data"


async def async_main():
    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/test",
        echo=True,
    )
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

    async with AsyncSession(engine) as session:
        async with session.begin():
            session.add_all(
                [
                    A(bs=[B(), B()], data="a1"),
                    A(bs=[B()], data="a2"),
                    A(bs=[B(), B()], data="a3"),
                ]
            )

        await session.run_sync(fetch_and_update_objects)

        await session.commit()

    # for AsyncEngine created in function scope, close and
    # clean-up pooled connections
    await engine.dispose()


asyncio.run(async_main())

上述在 “sync” 运行器中运行某些函数的方法与在基于事件的编程库(如 gevent)之上运行 SQLAlchemy 应用程序的应用程序有一些相似之处。 区别如下:

  1. 与使用 gevent 不同,我们可以继续使用标准的 Python asyncio 事件循环,或任何自定义事件循环,而无需集成到 gevent 事件循环中。

  2. 没有任何 “猴子补丁”。上面的示例使用了真正的 asyncio 驱动程序,并且底层的 SQLAlchemy 连接池也使用了 Python 内置的 asyncio.Queue 来池化连接。

  3. 程序可以在 async/await 代码和包含 sync 代码的函数之间自由切换,几乎没有性能损失。没有使用 “线程执行器” 或任何额外的等待器或同步。

  4. 底层的网络驱动程序也使用了纯 Python asyncio 概念,没有使用像 geventeventlet 提供的第三方网络库。

将事件与 asyncio 扩展一起使用

SQLAlchemy 事件系统 没有被 asyncio 扩展直接公开,这意味着目前还没有 SQLAlchemy 事件处理程序的 “async” 版本。

但是,由于 asyncio 扩展包围了通常的同步 SQLAlchemy API,因此常规的 “同步” 风格事件处理程序可以像不使用 asyncio 时一样自由使用。

如下详述,给定面向 asyncio 的 API,目前有两种注册事件的策略:

当在 asyncio 上下文中的事件处理程序中工作时,像 Connection 这样的对象继续以其通常的 “同步” 方式工作,而无需 awaitasync 用法;当消息最终被 asyncio 数据库适配器接收时,调用风格会透明地适配回 asyncio 调用风格。 对于传递 DBAPI 级别连接的事件,例如 PoolEvents.connect(),该对象是一个 pep-249 兼容的 “连接” 对象,它将同步风格的调用适配到 asyncio 驱动程序中。

Async Engine / Session / Sessionmaker 的事件监听器示例

下面说明了一些与面向 async 的 API 构造关联的同步风格事件处理程序的示例

  • AsyncEngine 上的核心事件

    在此示例中,我们访问 AsyncEngine.sync_engine 属性,将其作为 ConnectionEventsPoolEvents 的目标

    import asyncio
    
    from sqlalchemy import event
    from sqlalchemy import text
    from sqlalchemy.engine import Engine
    from sqlalchemy.ext.asyncio import create_async_engine
    
    engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test")
    
    
    # connect event on instance of Engine
    @event.listens_for(engine.sync_engine, "connect")
    def my_on_connect(dbapi_con, connection_record):
        print("New DBAPI connection:", dbapi_con)
        cursor = dbapi_con.cursor()
    
        # sync style API use for adapted DBAPI connection / cursor
        cursor.execute("select 'execute from event'")
        print(cursor.fetchone()[0])
    
    
    # before_execute event on all Engine instances
    @event.listens_for(Engine, "before_execute")
    def my_before_execute(
        conn,
        clauseelement,
        multiparams,
        params,
        execution_options,
    ):
        print("before execute!")
    
    
    async def go():
        async with engine.connect() as conn:
            await conn.execute(text("select 1"))
        await engine.dispose()
    
    
    asyncio.run(go())

    输出

    New DBAPI connection: <AdaptedConnection <asyncpg.connection.Connection object at 0x7f33f9b16960>>
    execute from event
    before execute!
  • AsyncSession 上的 ORM 事件

    在此示例中,我们访问 AsyncSession.sync_session 作为 SessionEvents 的目标

    import asyncio
    
    from sqlalchemy import event
    from sqlalchemy import text
    from sqlalchemy.ext.asyncio import AsyncSession
    from sqlalchemy.ext.asyncio import create_async_engine
    from sqlalchemy.orm import Session
    
    engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test")
    
    session = AsyncSession(engine)
    
    
    # before_commit event on instance of Session
    @event.listens_for(session.sync_session, "before_commit")
    def my_before_commit(session):
        print("before commit!")
    
        # sync style API use on Session
        connection = session.connection()
    
        # sync style API use on Connection
        result = connection.execute(text("select 'execute from event'"))
        print(result.first())
    
    
    # after_commit event on all Session instances
    @event.listens_for(Session, "after_commit")
    def my_after_commit(session):
        print("after commit!")
    
    
    async def go():
        await session.execute(text("select 1"))
        await session.commit()
    
        await session.close()
        await engine.dispose()
    
    
    asyncio.run(go())

    输出

    before commit!
    execute from event
    after commit!
  • async_sessionmaker 上的 ORM 事件

    对于此用例,我们创建一个 sessionmaker 作为事件目标,然后使用 async_sessionmaker.sync_session_class 参数将其分配给 async_sessionmaker

    import asyncio
    
    from sqlalchemy import event
    from sqlalchemy.ext.asyncio import async_sessionmaker
    from sqlalchemy.orm import sessionmaker
    
    sync_maker = sessionmaker()
    maker = async_sessionmaker(sync_session_class=sync_maker)
    
    
    @event.listens_for(sync_maker, "before_commit")
    def before_commit(session):
        print("before commit")
    
    
    async def main():
        async_session = maker()
    
        await async_session.commit()
    
    
    asyncio.run(main())

    输出

    before commit

在连接池和其他事件中使用仅可等待的驱动程序方法

如上节所述,事件处理程序(例如围绕 PoolEvents 事件处理程序的那些)接收同步风格的 “DBAPI” 连接,这是一个由 SQLAlchemy asyncio 方言提供的包装器对象,用于将底层的 asyncio “驱动程序” 连接适配为 SQLAlchemy 内部可以使用的连接。 当用户定义的事件处理程序实现需要直接使用最终的 “驱动程序” 连接时,会出现一种特殊的用例,即在该驱动程序连接上使用仅可等待的方法。一个这样的示例是 asyncpg 驱动程序提供的 .set_type_codec() 方法。

为了适应这种用例,SQLAlchemy 的 AdaptedConnection 类提供了一个方法 AdaptedConnection.run_async(),允许在事件处理程序或其他 SQLAlchemy 内部的 “同步” 上下文中调用可等待函数。 此方法直接类似于 AsyncConnection.run_sync() 方法,后者允许同步风格的方法在 async 下运行。

AdaptedConnection.run_async() 应该传递一个函数,该函数将接受最内层的 “驱动程序” 连接作为单个参数,并返回一个可等待对象,该可等待对象将由 AdaptedConnection.run_async() 方法调用。 给定的函数本身不需要声明为 async;它可以是一个 Python lambda:,因为返回的可等待值将在返回后被调用

from sqlalchemy import event
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(...)


@event.listens_for(engine.sync_engine, "connect")
def register_custom_types(dbapi_connection, *args):
    dbapi_connection.run_async(
        lambda connection: connection.set_type_codec(
            "MyCustomType",
            encoder,
            decoder,  # ...
        )
    )

在上面,传递给 register_custom_types 事件处理程序的对象是 AdaptedConnection 的实例,它为底层的仅 async 驱动程序级连接对象提供了类似 DBAPI 的接口。 然后,AdaptedConnection.run_async() 方法提供了对可等待环境的访问,在该环境中,可以对底层的驱动程序级连接进行操作。

1.4.30 版本新增。

使用多个 asyncio 事件循环

使用多个事件循环的应用程序(例如,在将 asyncio 与多线程结合使用的不常见情况下)在使用默认池实现时,不应与不同的事件循环共享同一个 AsyncEngine

如果 AsyncEngine 要从一个事件循环传递到另一个事件循环,则应在新的事件循环上重新使用之前调用 AsyncEngine.dispose() 方法。 否则可能会导致类似于 Task <Task pending ...> got Future attached to a different loopRuntimeError

如果必须在不同循环之间共享同一个引擎,则应将其配置为使用 NullPool 禁用池化,从而阻止 Engine 多次使用任何连接。

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@host/dbname",
    poolclass=NullPool,
)

使用 asyncio 作用域会话

在线程 SQLAlchemy 中与 scoped_session 对象一起使用的 “作用域会话” 模式也适用于 asyncio,使用一个名为 async_scoped_session 的适配版本。

提示

SQLAlchemy 通常不建议在新的开发中使用 “作用域” 模式,因为它依赖于可变全局状态,并且在线程或任务中的工作完成时也必须显式地拆除该状态。 特别是在使用 asyncio 时,将 AsyncSession 直接传递给需要它的可等待函数可能是一个更好的主意。

当使用 async_scoped_session 时,由于 asyncio 上下文中没有 “线程本地” 的概念,因此必须为构造函数提供 “scopefunc” 参数。 下面的示例说明了为此目的使用 asyncio.current_task() 函数:

from asyncio import current_task

from sqlalchemy.ext.asyncio import (
    async_scoped_session,
    async_sessionmaker,
)

async_session_factory = async_sessionmaker(
    some_async_engine,
    expire_on_commit=False,
)
AsyncScopedSession = async_scoped_session(
    async_session_factory,
    scopefunc=current_task,
)
some_async_session = AsyncScopedSession()

警告

async_scoped_session 使用的 “scopefunc” 在任务中会被调用 **任意次数**,每次访问底层的 AsyncSession 时都会调用一次。 因此,该函数应该是 **幂等的** 和轻量级的,并且不应尝试创建或修改任何状态,例如建立回调等。

警告

在作用域中使用 current_task() 作为 “键” 需要从最外层的可等待对象中调用 async_scoped_session.remove() 方法,以确保在任务完成时从注册表中删除该键,否则任务句柄以及 AsyncSession 将保留在内存中,从而实际上造成内存泄漏。 请参阅以下示例,该示例说明了 async_scoped_session.remove() 的正确用法。

async_scoped_session 包含类似于 scoped_session 的 **代理行为**,这意味着它可以直接被视为 AsyncSession,请记住,通常的 await 关键字是必要的,包括对于 async_scoped_session.remove() 方法。

async def some_function(some_async_session, some_object):
    # use the AsyncSession directly
    some_async_session.add(some_object)

    # use the AsyncSession via the context-local proxy
    await AsyncScopedSession.commit()

    # "remove" the current proxied AsyncSession for the local context
    await AsyncScopedSession.remove()

1.4.19 版本新增。

使用 Inspector 检查模式对象

SQLAlchemy 尚未提供 Inspector 的 asyncio 版本(在 使用 Inspector 进行细粒度反射 中介绍),但是可以通过利用 AsyncConnection.run_sync() 方法的 AsyncConnection 来在 asyncio 上下文中使用现有接口。

import asyncio

from sqlalchemy import inspect
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost/test")


def use_inspector(conn):
    inspector = inspect(conn)
    # use the inspector
    print(inspector.get_view_names())
    # return any value to the caller
    return inspector.get_table_names()


async def async_main():
    async with engine.connect() as conn:
        tables = await conn.run_sync(use_inspector)


asyncio.run(async_main())

Engine API 文档

对象名称 描述

async_engine_from_config(configuration[, prefix], **kwargs)

使用配置字典创建一个新的 AsyncEngine 实例。

AsyncConnection

用于 Connection 的 asyncio 代理。

AsyncEngine

用于 Engine 的 asyncio 代理。

AsyncTransaction

用于 Transaction 的 asyncio 代理。

create_async_engine(url, **kw)

创建一个新的 async engine 实例。

create_async_pool_from_url(url, **kwargs)

创建一个新的 async engine 实例。

function sqlalchemy.ext.asyncio.create_async_engine(url: str | URL, **kw: Any) AsyncEngine

创建一个新的 async engine 实例。

传递给 create_async_engine() 的参数与传递给 create_engine() 函数的参数基本相同。 指定的方言必须是与 asyncio 兼容的方言,例如 asyncpg

版本 1.4 中的新功能。

参数:

async_creator

一个异步可调用对象,它返回驱动级别的 asyncio 连接。如果给定,该函数应该不接受任何参数,并从底层的 asyncio 数据库驱动返回一个新的 asyncio 连接;该连接将被包装在适当的结构中,以便与 AsyncEngine 一起使用。请注意,URL 中指定的参数不在此处应用,创建器函数应使用其自身的连接参数。

此参数等效于 create_engine.creator 参数,后者是 create_engine() 函数的参数的 asyncio 等效项。

2.0.16 版本新增。

function sqlalchemy.ext.asyncio.async_engine_from_config(configuration: Dict[str, Any], prefix: str = 'sqlalchemy.', **kwargs: Any) AsyncEngine

使用配置字典创建一个新的 AsyncEngine 实例。

此函数类似于 SQLAlchemy Core 中的 engine_from_config() 函数,不同之处在于请求的 dialect 必须是 asyncio 兼容的 dialect,例如 asyncpg。此函数的参数签名与 engine_from_config() 的参数签名相同。

1.4.29 版本新增。

function sqlalchemy.ext.asyncio.create_async_pool_from_url(url: str | URL, **kwargs: Any) Pool

创建一个新的 async engine 实例。

传递给 create_async_pool_from_url() 的参数与传递给 create_pool_from_url() 函数的参数基本相同。指定的 dialect 必须是 asyncio 兼容的 dialect,例如 asyncpg

2.0.10 版本新增。

class sqlalchemy.ext.asyncio.AsyncEngine

用于 Engine 的 asyncio 代理。

AsyncEngine 使用 create_async_engine() 函数获取

from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")

版本 1.4 中的新功能。

类签名

class sqlalchemy.ext.asyncio.AsyncEngine (sqlalchemy.ext.asyncio.base.ProxyComparable, sqlalchemy.ext.asyncio.AsyncConnectable)

method sqlalchemy.ext.asyncio.AsyncEngine.begin() AsyncIterator[AsyncConnection]

返回一个上下文管理器,当进入该管理器时,将传递一个已建立 AsyncTransactionAsyncConnection

例如:

async with async_engine.begin() as conn:
    await conn.execute(
        text("insert into table (x, y, z) values (1, 2, 3)")
    )
    await conn.execute(text("my_special_procedure(5)"))
method sqlalchemy.ext.asyncio.AsyncEngine.clear_compiled_cache() None

清除与 dialect 关联的已编译缓存。

代表 AsyncEngine 类代理 Engine 类。

适用于通过 create_engine.query_cache_size 参数建立的内置缓存。它不会影响通过 Connection.execution_options.compiled_cache 参数传递的任何字典缓存。

版本 1.4 中的新功能。

method sqlalchemy.ext.asyncio.AsyncEngine.connect() AsyncConnection

返回一个 AsyncConnection 对象。

AsyncConnection 作为异步上下文管理器进入时,它将从底层连接池中获取数据库连接

async with async_engine.connect() as conn:
    result = await conn.execute(select(user_table))

AsyncConnection 也可以通过调用其 AsyncConnection.start() 方法在上下文管理器外部启动。

attribute sqlalchemy.ext.asyncio.AsyncEngine.dialect

代表 AsyncEngine 类代理 Engine.dialect 属性。

method sqlalchemy.ext.asyncio.AsyncEngine.async dispose(close: bool = True) None

释放此 AsyncEngine 使用的连接池。

参数:

close

如果保留默认值 True,则效果是完全关闭所有当前检入的数据库连接。 仍然检出的连接将不会关闭,但是它们将不再与此 Engine 关联,因此当它们单独关闭时,最终与它们关联的 Pool 将被垃圾回收,并且如果尚未在检入时关闭,它们将被完全关闭。

如果设置为 False,则取消引用先前的连接池,并且在其他方面不会以任何方式触及它。

另请参阅

Engine.dispose()

attribute sqlalchemy.ext.asyncio.AsyncEngine.driver

Engine 使用的 Dialect 的驱动程序名称。

代表 AsyncEngine 类代理 Engine 类。

attribute sqlalchemy.ext.asyncio.AsyncEngine.echo

True 时,为此元素启用日志输出。

代表 AsyncEngine 类代理 Engine 类。

这具有为该元素类和对象引用的命名空间设置 Python 日志级别的效果。布尔值 True 表示将为记录器设置日志级别 logging.INFO,而字符串值 debug 将日志级别设置为 logging.DEBUG

attribute sqlalchemy.ext.asyncio.AsyncEngine.engine

返回此 Engine

代表 AsyncEngine 类代理 Engine 类。

用于接受同一变量中 Connection / Engine 对象的旧方案。

method sqlalchemy.ext.asyncio.AsyncEngine.execution_options(**opt: Any) AsyncEngine

返回一个新的 AsyncEngine,它将提供具有给定执行选项的 AsyncConnection 对象。

Engine.execution_options() 代理。 有关详细信息,请参见该方法。

method sqlalchemy.ext.asyncio.AsyncEngine.get_execution_options() _ExecuteOptions

获取将在执行期间生效的非 SQL 选项。

代表 AsyncEngine 类代理 Engine 类。

attribute sqlalchemy.ext.asyncio.AsyncEngine.name

Engine 使用的 Dialect 的字符串名称。

代表 AsyncEngine 类代理 Engine 类。

attribute sqlalchemy.ext.asyncio.AsyncEngine.pool

代表 AsyncEngine 类代理 Engine.pool 属性。

method sqlalchemy.ext.asyncio.AsyncEngine.async raw_connection() PoolProxiedConnection

从连接池返回“原始” DBAPI 连接。

attribute sqlalchemy.ext.asyncio.AsyncEngine.sync_engine: Engine

对此 AsyncEngine 代理请求的同步样式 Engine 的引用。

此实例可以用作事件目标。

method sqlalchemy.ext.asyncio.AsyncEngine.update_execution_options(**opt: Any) None

更新此 Engine 的默认 execution_options 字典。

代表 AsyncEngine 类代理 Engine 类。

给定的 **opt 中的键/值对将添加到将用于所有连接的默认执行选项中。此字典的初始内容可以通过 execution_options 参数发送到 create_engine()

attribute sqlalchemy.ext.asyncio.AsyncEngine.url

代表 AsyncEngine 类代理 Engine.url 属性。

class sqlalchemy.ext.asyncio.AsyncConnection

用于 Connection 的 asyncio 代理。

AsyncConnection 使用 AsyncEngine.connect() 方法从 AsyncEngine 获取

from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")

async with engine.connect() as conn:
    result = await conn.execute(select(table))

版本 1.4 中的新功能。

类签名

class sqlalchemy.ext.asyncio.AsyncConnection (sqlalchemy.ext.asyncio.base.ProxyComparable, sqlalchemy.ext.asyncio.base.StartableContext, sqlalchemy.ext.asyncio.AsyncConnectable)

method sqlalchemy.ext.asyncio.AsyncConnection.async aclose() None

AsyncConnection.close() 同义。

AsyncConnection.aclose() 名称专门用于支持 Python 标准库 @contextlib.aclosing 上下文管理器函数。

2.0.20 版本新增。

method sqlalchemy.ext.asyncio.AsyncConnection.begin() AsyncTransaction

在自动开始之前开始事务。

method sqlalchemy.ext.asyncio.AsyncConnection.begin_nested() AsyncTransaction

开始嵌套事务并返回事务句柄。

method sqlalchemy.ext.asyncio.AsyncConnection.async close() None

关闭此 AsyncConnection

如果存在事务,这也会导致回滚事务。

attribute sqlalchemy.ext.asyncio.AsyncConnection.closed

如果此连接已关闭,则返回 True。

代表 AsyncConnection 类代理 Connection 类。

method sqlalchemy.ext.asyncio.AsyncConnection.async commit() None

提交当前正在进行的事务。

如果已启动事务,则此方法将提交当前事务。 如果未启动事务,则该方法不起作用,前提是连接处于非无效状态。

每当首次执行语句时,或者当调用 Connection.begin() 方法时,都会在 Connection 上自动开始事务。

attribute sqlalchemy.ext.asyncio.AsyncConnection.connection

未为异步实现;请调用 AsyncConnection.get_raw_connection()

attribute sqlalchemy.ext.asyncio.AsyncConnection.default_isolation_level

与正在使用的 Dialect 关联的初始连接时隔离级别。

代表 AsyncConnection 类代理 Connection 类。

此值独立于 Connection.execution_options.isolation_levelEngine.execution_options.isolation_level 执行选项,并且由 Dialect 在创建第一个连接时确定,方法是在发出任何其他命令之前对数据库执行 SQL 查询以获取当前隔离级别。

调用此访问器不会调用任何新的 SQL 查询。

另请参阅

Connection.get_isolation_level() - 查看当前实际隔离级别

create_engine.isolation_level - 设置每个 Engine 的隔离级别

Connection.execution_options.isolation_level - 设置每个 Connection 的隔离级别

attribute sqlalchemy.ext.asyncio.AsyncConnection.dialect

代表 AsyncConnection 类代理 Connection.dialect 属性。

method sqlalchemy.ext.asyncio.AsyncConnection.async exec_driver_sql(statement: str, parameters: _DBAPIAnyExecuteParams | None = None, execution_options: CoreExecuteOptionsParameter | None = None) CursorResult[Any]

执行驱动程序级别的 SQL 字符串并返回缓冲的 Result

method sqlalchemy.ext.asyncio.AsyncConnection.async execute(statement: Executable, parameters: _CoreAnyExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) CursorResult[Any]

执行 SQL 语句构造并返回缓冲的 Result

参数:
  • object

    要执行的语句。 这始终是一个同时位于 ClauseElementExecutable 层级结构中的对象,包括

  • parameters – 将绑定到语句中的参数。 这可以是参数名称到值的字典,也可以是字典的可变序列(例如,列表)。 当传递字典列表时,底层语句执行将使用 DBAPI cursor.executemany() 方法。 当传递单个字典时,将使用 DBAPI cursor.execute() 方法。

  • execution_options – 可选的执行选项字典,它将与语句执行关联。 此字典可以提供 Connection.execution_options() 接受的选项的子集。

返回:

一个 Result 对象。

method sqlalchemy.ext.asyncio.AsyncConnection.async execution_options(**opt: Any) AsyncConnection

为连接设置非 SQL 选项,这些选项在执行期间生效。

这将返回添加了新选项的 AsyncConnection 对象。

有关此方法的完整详细信息,请参阅 Connection.execution_options()

method sqlalchemy.ext.asyncio.AsyncConnection.get_nested_transaction() AsyncTransaction | None

返回一个 AsyncTransaction,表示当前的嵌套(保存点)事务(如果有)。

这利用底层同步连接的 Connection.get_nested_transaction() 方法来获取当前的 Transaction,然后在新的 AsyncTransaction 对象中代理它。

1.4.0b2 版本新增。

method sqlalchemy.ext.asyncio.AsyncConnection.async get_raw_connection() PoolProxiedConnection

返回此 AsyncConnection 使用的池化 DBAPI 级别的连接。

这是一个 SQLAlchemy 连接池代理的连接,它具有属性 _ConnectionFairy.driver_connection,该属性指向实际的驱动程序连接。 它的 _ConnectionFairy.dbapi_connection 指的是一个 AdaptedConnection 实例,该实例使驱动程序连接适应 DBAPI 协议。

method sqlalchemy.ext.asyncio.AsyncConnection.get_transaction() AsyncTransaction | None

返回一个 AsyncTransaction,表示当前的事务(如果有)。

这利用底层同步连接的 Connection.get_transaction() 方法来获取当前的 Transaction,然后在新的 AsyncTransaction 对象中代理它。

1.4.0b2 版本新增。

method sqlalchemy.ext.asyncio.AsyncConnection.in_nested_transaction() bool

如果事务正在进行中,则返回 True。

1.4.0b2 版本新增。

method sqlalchemy.ext.asyncio.AsyncConnection.in_transaction() bool

如果事务正在进行中,则返回 True。

attribute sqlalchemy.ext.asyncio.AsyncConnection.info

返回底层 ConnectionConnection.info 字典。

此字典是可自由写入的,用于将用户定义的状态与数据库连接关联。

仅当 AsyncConnection 当前已连接时,此属性才可用。 如果 AsyncConnection.closed 属性为 True,则访问此属性将引发 ResourceClosedError

1.4.0b2 版本新增。

method sqlalchemy.ext.asyncio.AsyncConnection.async invalidate(exception: BaseException | None = None) None

使与此 Connection 关联的底层 DBAPI 连接无效。

有关此方法的完整详细信息,请参阅方法 Connection.invalidate()

attribute sqlalchemy.ext.asyncio.AsyncConnection.invalidated

如果此连接已失效,则返回 True。

代表 AsyncConnection 类代理 Connection 类。

但是,这并不表示连接是否在池级别失效。

method sqlalchemy.ext.asyncio.AsyncConnection.async rollback() None

回滚当前正在进行的事务。

如果已启动事务,则此方法会回滚当前事务。 如果未启动事务,则此方法无效。 如果已启动事务且连接处于失效状态,则使用此方法清除事务。

每当首次执行语句时,或者当调用 Connection.begin() 方法时,都会在 Connection 上自动开始事务。

method sqlalchemy.ext.asyncio.AsyncConnection.async run_sync(fn: ~typing.Callable[[~typing.Concatenate[~sqlalchemy.engine.base.Connection, ~_P]], ~sqlalchemy.ext.asyncio.engine._T], *arg: ~typing.~_P, **kw: ~typing.~_P) _T

调用给定的同步(即非异步)可调用对象,并将同步样式的 Connection 作为第一个参数传递。

此方法允许传统的同步 SQLAlchemy 函数在 asyncio 应用程序的上下文中运行。

例如:

def do_something_with_core(conn: Connection, arg1: int, arg2: str) -> str:
    """A synchronous function that does not require awaiting

    :param conn: a Core SQLAlchemy Connection, used synchronously

    :return: an optional return value is supported

    """
    conn.execute(some_table.insert().values(int_col=arg1, str_col=arg2))
    return "success"


async def do_something_async(async_engine: AsyncEngine) -> None:
    """an async function that uses awaiting"""

    async with async_engine.begin() as async_conn:
        # run do_something_with_core() with a sync-style
        # Connection, proxied into an awaitable
        return_code = await async_conn.run_sync(
            do_something_with_core, 5, "strval"
        )
        print(return_code)

此方法通过在专门检测的 greenlet 中运行给定的可调用对象,将 asyncio 事件循环一直维护到数据库连接。

`AsyncConnection.run_sync()` 最基本的使用是调用诸如 MetaData.create_all() 之类的方法,给定一个需要作为 Connection 对象提供给 MetaData.create_all()AsyncConnection

# run metadata.create_all(conn) with a sync-style Connection,
# proxied into an awaitable
with async_engine.begin() as conn:
    await conn.run_sync(metadata.create_all)

注意

提供的可调用对象在 asyncio 事件循环中内联调用,并将阻塞传统的 IO 调用。 此可调用对象中的 IO 应该仅调用 SQLAlchemy 的 asyncio 数据库 API,这些 API 将正确地适应 greenlet 上下文。

method sqlalchemy.ext.asyncio.AsyncConnection.async scalar(statement: Executable, parameters: _CoreSingleExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) Any

执行 SQL 语句构造并返回标量对象。

此方法是在调用 Connection.execute() 方法后调用 Result.scalar() 方法的简写形式。 参数是等效的。

返回:

表示返回的第一行第一列的标量 Python 值。

method sqlalchemy.ext.asyncio.AsyncConnection.async scalars(statement: Executable, parameters: _CoreAnyExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) ScalarResult[Any]

执行 SQL 语句构造并返回标量对象。

此方法是在调用 Connection.execute() 方法后调用 Result.scalars() 方法的简写形式。 参数是等效的。

返回:

一个 ScalarResult 对象。

1.4.24 版本新增。

method sqlalchemy.ext.asyncio.AsyncConnection.async start(is_ctxmanager: bool = False) AsyncConnection

在不使用 Python with: 块的情况下启动此 AsyncConnection 对象的上下文。

method sqlalchemy.ext.asyncio.AsyncConnection.stream(statement: Executable, parameters: _CoreAnyExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) AsyncIterator[AsyncResult[Any]]

执行语句并返回一个可等待对象,该对象产生一个 AsyncResult 对象。

例如:

result = await conn.stream(stmt)
async for row in result:
    print(f"{row}")

AsyncConnection.stream() 方法支持针对 AsyncResult 对象的可选上下文管理器使用,如

async with conn.stream(stmt) as result:
    async for row in result:
        print(f"{row}")

在上述模式中,即使迭代器被异常抛出中断,也会无条件调用 AsyncResult.close() 方法。 但是,上下文管理器的使用仍然是可选的,并且该函数可以以 async with fn():await fn() 样式调用。

2.0.0b3 版本新增: 添加了上下文管理器支持

返回:

一个可等待对象,它将产生一个 AsyncResult 对象。

method sqlalchemy.ext.asyncio.AsyncConnection.stream_scalars(statement: Executable, parameters: _CoreSingleExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) AsyncIterator[AsyncScalarResult[Any]]

执行语句并返回一个可等待对象,该对象产生一个 AsyncScalarResult 对象。

例如:

result = await conn.stream_scalars(stmt)
async for scalar in result:
    print(f"{scalar}")

此方法是调用 Connection.stream() 方法后调用 AsyncResult.scalars() 方法的简写形式。参数是等效的。

AsyncConnection.stream_scalars() 方法支持对 AsyncScalarResult 对象的可选上下文管理器使用,如下所示

async with conn.stream_scalars(stmt) as result:
    async for scalar in result:
        print(f"{scalar}")

在上述模式中,即使迭代器被异常抛出中断,AsyncScalarResult.close() 方法也会被无条件调用。但是,上下文管理器的使用仍然是可选的,并且该函数可以在 async with fn():await fn() 样式中调用。

2.0.0b3 版本新增: 添加了上下文管理器支持

返回:

一个可等待对象,它将产生一个 AsyncScalarResult 对象。

1.4.24 版本新增。

attribute sqlalchemy.ext.asyncio.AsyncConnection.sync_connection: Connection | None

对同步样式 Connection 的引用,此 AsyncConnection 将请求代理到该连接。

此实例可以用作事件目标。

attribute sqlalchemy.ext.asyncio.AsyncConnection.sync_engine: Engine

对同步样式 Engine 的引用,此 AsyncConnection 通过其底层 Connection 与之关联。

此实例可以用作事件目标。

class sqlalchemy.ext.asyncio.AsyncTransaction

用于 Transaction 的 asyncio 代理。

类签名

class sqlalchemy.ext.asyncio.AsyncTransaction (sqlalchemy.ext.asyncio.base.ProxyComparable, sqlalchemy.ext.asyncio.base.StartableContext)

method sqlalchemy.ext.asyncio.AsyncTransaction.async close() None

关闭此 AsyncTransaction

如果此事务是 begin/commit 嵌套中的基本事务,则该事务将 rollback()。否则,该方法返回。

这用于取消事务,而不影响封闭事务的范围。

method sqlalchemy.ext.asyncio.AsyncTransaction.async commit() None

提交此 AsyncTransaction

method sqlalchemy.ext.asyncio.AsyncTransaction.async rollback() None

回滚此 AsyncTransaction

method sqlalchemy.ext.asyncio.AsyncTransaction.async start(is_ctxmanager: bool = False) AsyncTransaction

在此 AsyncTransaction 对象之外启动上下文,无需使用 Python with: 块。

结果集 API 文档

AsyncResult 对象是 Result 对象的异步适配版本。它仅在使用 AsyncConnection.stream()AsyncSession.stream() 方法时返回,这些方法返回一个位于活动数据库游标之上的结果对象。

对象名称 描述

AsyncMappingResult

一个 AsyncResult 的包装器,它返回字典值而不是 Row 值。

AsyncResult

围绕 Result 对象的 asyncio 包装器。

AsyncScalarResult

一个 AsyncResult 的包装器,它返回标量值而不是 Row 值。

AsyncTupleResult

一个 AsyncResult,其类型被指定为返回纯 Python 元组而不是行。

class sqlalchemy.ext.asyncio.AsyncResult

围绕 Result 对象的 asyncio 包装器。

AsyncResult 仅适用于使用服务器端游标的语句执行。它仅从 AsyncConnection.stream()AsyncSession.stream() 方法返回。

注意

Result 的情况一样,此对象用于 AsyncSession.execute() 返回的 ORM 结果,它可以单独或在类似元组的行中生成 ORM 映射对象的实例。请注意,这些结果对象不会像传统的 Query 对象那样自动去重实例或行。对于实例或行的 Python 内去重,请使用 AsyncResult.unique() 修饰符方法。

版本 1.4 中的新功能。

类签名

class sqlalchemy.ext.asyncio.AsyncResult (sqlalchemy.engine._WithKeys, sqlalchemy.ext.asyncio.AsyncCommon)

method sqlalchemy.ext.asyncio.AsyncResult.async all() Sequence[Row[_TP]]

返回列表中所有行。

调用后关闭结果集。后续调用将返回一个空列表。

返回:

一个 Row 对象的列表。

method sqlalchemy.ext.asyncio.AsyncResult.async close() None

继承自 AsyncCommon.close() 方法,来自 AsyncCommon

关闭此结果。

attribute sqlalchemy.ext.asyncio.AsyncResult.closed

继承自 AsyncCommon.closed 属性,来自 AsyncCommon

代理底层结果对象的 .closed 属性(如果有),否则引发 AttributeError

2.0.0b3 版本中的新功能。

method sqlalchemy.ext.asyncio.AsyncResult.columns(*col_expressions: _KeyIndexType) Self

建立应在每行中返回的列。

请参阅同步 SQLAlchemy API 中的 Result.columns() 以获得完整的行为描述。

method sqlalchemy.ext.asyncio.AsyncResult.async fetchall() Sequence[Row[_TP]]

AsyncResult.all() 方法同义。

2.0 版本中的新功能。

method sqlalchemy.ext.asyncio.AsyncResult.async fetchmany(size: int | None = None) Sequence[Row[_TP]]

获取多行。

当所有行都耗尽时,返回一个空列表。

此方法是为了向后兼容 SQLAlchemy 1.x.x 而提供的。

要分组获取行,请使用 AsyncResult.partitions() 方法。

返回:

一个 Row 对象的列表。

method sqlalchemy.ext.asyncio.AsyncResult.async fetchone() Row[_TP] | None

获取一行。

当所有行都耗尽时,返回 None。

此方法是为了向后兼容 SQLAlchemy 1.x.x 而提供的。

要仅获取结果的第一行,请使用 AsyncResult.first() 方法。要迭代所有行,请直接迭代 AsyncResult 对象。

返回:

如果没有应用过滤器,则返回 Row 对象;如果不再有行,则返回 None

method sqlalchemy.ext.asyncio.AsyncResult.async first() Row[_TP] | None

获取第一行,如果没有行则返回 None

关闭结果集并丢弃剩余行。

注意

默认情况下,此方法返回一个,例如元组。要返回完全一个标量值,即第一行的第一列,请使用 AsyncResult.scalar() 方法,或结合使用 AsyncResult.scalars()AsyncResult.first()

此外,与传统的 ORM Query.first() 方法的行为相反,没有对调用以生成此 AsyncResult 的 SQL 查询应用限制;对于在生成行之前在内存中缓冲结果的 DBAPI 驱动程序,所有行都将发送到 Python 进程,并且除第一行之外的所有行都将被丢弃。

返回:

一个 Row 对象,如果没有行剩余,则为 None。

method sqlalchemy.ext.asyncio.AsyncResult.async freeze() FrozenResult[_TP]

返回一个可调用对象,该对象将在调用时生成此 AsyncResult 的副本。

返回的可调用对象是 FrozenResult 的实例。

这用于结果集缓存。该方法必须在结果未被消耗时调用,并且调用该方法将完全消耗结果。当从缓存中检索 FrozenResult 时,可以多次调用它,每次都会针对其存储的行集生成一个新的 Result 对象。

另请参阅

重新执行语句 - 在 ORM 中实现结果集缓存的示例用法。

method sqlalchemy.ext.asyncio.AsyncResult.keys() RMKeyView

继承自 sqlalchemy.engine._WithKeys.keys 方法,来自 sqlalchemy.engine._WithKeys

返回一个可迭代的视图,该视图生成每个 Row 将表示的字符串键。

键可以表示核心语句返回的列的标签,也可以表示 orm 执行返回的 orm 类的名称。

该视图还可以使用 Python in 运算符测试键是否包含,这将测试视图中表示的字符串键以及备用键(例如列对象)。

在 1.4 版本中变更: 返回键视图对象而不是普通列表。

method sqlalchemy.ext.asyncio.AsyncResult.mappings() AsyncMappingResult

将映射过滤器应用于返回的行,返回 AsyncMappingResult 的实例。

当应用此过滤器时,获取行将返回 RowMapping 对象而不是 Row 对象。

返回:

一个新的 AsyncMappingResult 过滤对象,引用底层 Result 对象。

method sqlalchemy.ext.asyncio.AsyncResult.async one() Row[_TP]

返回恰好一行,否则引发异常。

如果结果不返回任何行,则引发 NoResultFound;如果返回多行,则引发 MultipleResultsFound

注意

默认情况下,此方法返回一个,例如元组。要返回完全一个标量值,即第一行的第一列,请使用 AsyncResult.scalar_one() 方法,或结合使用 AsyncResult.scalars()AsyncResult.one()

版本 1.4 中的新功能。

返回:

第一个 Row

引发:

MultipleResultsFound, NoResultFound

method sqlalchemy.ext.asyncio.AsyncResult.async one_or_none() Row[_TP] | None

返回最多一个结果,否则引发异常。

如果结果没有行,则返回 None。如果返回多行,则引发 MultipleResultsFound 异常。

版本 1.4 中的新功能。

返回:

返回第一个 RowNone(如果没有行可用)。

引发:

MultipleResultsFound(返回多个结果异常)

method sqlalchemy.ext.asyncio.AsyncResult.async partitions(size: int | None = None) AsyncIterator[Sequence[Row[_TP]]]

迭代遍历给定大小的行的子列表。

返回一个异步迭代器

async def scroll_results(connection):
    result = await connection.stream(select(users_table))

    async for partition in result.partitions(100):
        print("list of rows: %s" % partition)

有关完整的行为描述,请参阅同步 SQLAlchemy API 中的 Result.partitions()

method sqlalchemy.ext.asyncio.AsyncResult.async scalar() Any

获取第一行的第一列,并关闭结果集。

如果没有行可获取,则返回 None

不执行验证来测试是否还有其他行。

调用此方法后,对象将完全关闭,例如,CursorResult.close() 方法将被调用。

返回:

一个 Python 标量值,如果没有剩余行,则返回 None

method sqlalchemy.ext.asyncio.AsyncResult.async scalar_one() Any

返回恰好一个标量结果,否则引发异常。

这等效于调用 AsyncResult.scalars(),然后调用 AsyncScalarResult.one()

method sqlalchemy.ext.asyncio.AsyncResult.async scalar_one_or_none() Any | None

返回恰好一个标量结果或 None

这等效于调用 AsyncResult.scalars(),然后调用 AsyncScalarResult.one_or_none()

method sqlalchemy.ext.asyncio.AsyncResult.scalars(index: _KeyIndexType = 0) AsyncScalarResult[Any]

返回一个 AsyncScalarResult 过滤对象,它将返回单个元素,而不是 Row 对象。

有关完整的行为描述,请参阅同步 SQLAlchemy API 中的 Result.scalars()

参数:

index – 整数或行键,指示要从每行获取的列,默认为 0,表示第一列。

返回:

一个新的 AsyncScalarResult 过滤对象,引用此 AsyncResult 对象。

attribute sqlalchemy.ext.asyncio.AsyncResult.t

将“类型化元组”类型过滤器应用于返回的行。

AsyncResult.t 属性是调用 AsyncResult.tuples() 方法的同义词。

2.0 版本中的新功能。

method sqlalchemy.ext.asyncio.AsyncResult.tuples() AsyncTupleResult[_TP]

将“类型化元组”类型过滤器应用于返回的行。

此方法在运行时返回相同的 AsyncResult 对象,但在类型提示中注释为返回一个 AsyncTupleResult 对象,这将向 PEP 484 类型工具表明返回的是普通的类型化 Tuple 实例,而不是行。这允许对 Row 对象进行元组解包和 __getitem__ 访问进行类型化,适用于语句调用本身包含类型信息的情况。

2.0 版本中的新功能。

返回:

类型提示时的 AsyncTupleResult 类型。

另请参阅

AsyncResult.t - 更短的同义词

Row.t - Row 版本

method sqlalchemy.ext.asyncio.AsyncResult.unique(strategy: _UniqueFilterType | None = None) Self

将唯一性过滤应用于此 AsyncResult 返回的对象。

有关完整的行为描述,请参阅同步 SQLAlchemy API 中的 Result.unique()

method sqlalchemy.ext.asyncio.AsyncResult.yield_per(num: int) Self

继承自 FilterResult.yield_per() 方法,属于 FilterResult

配置每次获取 num 行的行获取策略。

FilterResult.yield_per() 方法是对 Result.yield_per() 方法的传递。有关用法说明,请参阅该方法的文档。

1.4.40 版本新增: - 添加了 FilterResult.yield_per(),以便该方法在所有结果集实现中都可用

class sqlalchemy.ext.asyncio.AsyncScalarResult

一个 AsyncResult 的包装器,它返回标量值而不是 Row 值。

AsyncScalarResult 对象通过调用 AsyncResult.scalars() 方法获取。

有关完整的行为描述,请参阅同步 SQLAlchemy API 中的 ScalarResult 对象。

版本 1.4 中的新功能。

类签名

class sqlalchemy.ext.asyncio.AsyncScalarResult (sqlalchemy.ext.asyncio.AsyncCommon>)

method sqlalchemy.ext.asyncio.AsyncScalarResult.async all() Sequence[_R]

以列表形式返回所有标量值。

等效于 AsyncResult.all(),不同之处在于返回的是标量值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncScalarResult.async close() None

继承自 AsyncCommon.close() 方法,来自 AsyncCommon

关闭此结果。

attribute sqlalchemy.ext.asyncio.AsyncScalarResult.closed

继承自 AsyncCommon.closed 属性,来自 AsyncCommon

代理底层结果对象的 .closed 属性(如果有),否则引发 AttributeError

2.0.0b3 版本中的新功能。

method sqlalchemy.ext.asyncio.AsyncScalarResult.async fetchall() Sequence[_R]

AsyncScalarResult.all() 方法同义。

method sqlalchemy.ext.asyncio.AsyncScalarResult.async fetchmany(size: int | None = None) Sequence[_R]

获取多个对象。

等效于 AsyncResult.fetchmany(),不同之处在于返回的是标量值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncScalarResult.async first() _R | None

获取第一个对象,如果不存在对象,则返回 None

等效于 AsyncResult.first(),不同之处在于返回的是标量值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncScalarResult.async one() _R

返回恰好一个对象,否则引发异常。

等效于 AsyncResult.one(),不同之处在于返回的是标量值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncScalarResult.async one_or_none() _R | None

返回最多一个对象,否则引发异常。

等效于 AsyncResult.one_or_none(),不同之处在于返回的是标量值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncScalarResult.async partitions(size: int | None = None) AsyncIterator[Sequence[_R]]

迭代遍历给定大小的元素的子列表。

等效于 AsyncResult.partitions(),不同之处在于返回的是标量值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncScalarResult.unique(strategy: _UniqueFilterType | None = None) Self

将唯一性过滤应用于此 AsyncScalarResult 返回的对象。

有关用法详情,请参阅 AsyncResult.unique()

method sqlalchemy.ext.asyncio.AsyncScalarResult.yield_per(num: int) Self

继承自 FilterResult.yield_per() 方法,属于 FilterResult

配置每次获取 num 行的行获取策略。

FilterResult.yield_per() 方法是对 Result.yield_per() 方法的传递。有关用法说明,请参阅该方法的文档。

1.4.40 版本新增: - 添加了 FilterResult.yield_per(),以便该方法在所有结果集实现中都可用

class sqlalchemy.ext.asyncio.AsyncMappingResult

一个 AsyncResult 的包装器,它返回字典值而不是 Row 值。

AsyncMappingResult 对象通过调用 AsyncResult.mappings() 方法获取。

有关完整的行为描述,请参阅同步 SQLAlchemy API 中的 MappingResult 对象。

版本 1.4 中的新功能。

类签名

class sqlalchemy.ext.asyncio.AsyncMappingResult (sqlalchemy.engine._WithKeys, sqlalchemy.ext.asyncio.AsyncCommon)

method sqlalchemy.ext.asyncio.AsyncMappingResult.async all() Sequence[RowMapping]

返回列表中所有行。

等同于 AsyncResult.all(),除了返回的是 RowMapping 值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncMappingResult.async close() None

继承自 AsyncCommon.close() 方法,来自 AsyncCommon

关闭此结果。

attribute sqlalchemy.ext.asyncio.AsyncMappingResult.closed

继承自 AsyncCommon.closed 属性,来自 AsyncCommon

代理底层结果对象的 .closed 属性(如果有),否则引发 AttributeError

2.0.0b3 版本中的新功能。

method sqlalchemy.ext.asyncio.AsyncMappingResult.columns(*col_expressions: _KeyIndexType) Self

建立应在每行中返回的列。

method sqlalchemy.ext.asyncio.AsyncMappingResult.async fetchall() Sequence[RowMapping]

AsyncMappingResult.all() 方法是同义词。

method sqlalchemy.ext.asyncio.AsyncMappingResult.async fetchmany(size: int | None = None) Sequence[RowMapping]

获取多行。

等同于 AsyncResult.fetchmany(),除了返回的是 RowMapping 值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncMappingResult.async fetchone() RowMapping | None

获取一个对象。

等同于 AsyncResult.fetchone(),除了返回的是 RowMapping 值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncMappingResult.async first() RowMapping | None

获取第一个对象,如果不存在对象,则返回 None

等同于 AsyncResult.first(),除了返回的是 RowMapping 值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncMappingResult.keys() RMKeyView

继承自 sqlalchemy.engine._WithKeys.keys 方法,来自 sqlalchemy.engine._WithKeys

返回一个可迭代的视图,该视图生成每个 Row 将表示的字符串键。

键可以表示核心语句返回的列的标签,也可以表示 orm 执行返回的 orm 类的名称。

该视图还可以使用 Python in 运算符测试键是否包含,这将测试视图中表示的字符串键以及备用键(例如列对象)。

在 1.4 版本中变更: 返回键视图对象而不是普通列表。

method sqlalchemy.ext.asyncio.AsyncMappingResult.async one() RowMapping

返回恰好一个对象,否则引发异常。

等同于 AsyncResult.one(),除了返回的是 RowMapping 值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncMappingResult.async one_or_none() RowMapping | None

返回最多一个对象,否则引发异常。

等同于 AsyncResult.one_or_none(),除了返回的是 RowMapping 值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncMappingResult.async partitions(size: int | None = None) AsyncIterator[Sequence[RowMapping]]

迭代遍历给定大小的元素的子列表。

等同于 AsyncResult.partitions(),除了返回的是 RowMapping 值,而不是 Row 对象。

method sqlalchemy.ext.asyncio.AsyncMappingResult.unique(strategy: _UniqueFilterType | None = None) Self

对此 AsyncMappingResult 返回的对象应用唯一性过滤。

有关用法详情,请参阅 AsyncResult.unique()

method sqlalchemy.ext.asyncio.AsyncMappingResult.yield_per(num: int) Self

继承自 FilterResult.yield_per() 方法,属于 FilterResult

配置每次获取 num 行的行获取策略。

FilterResult.yield_per() 方法是对 Result.yield_per() 方法的传递。有关用法说明,请参阅该方法的文档。

1.4.40 版本新增: - 添加了 FilterResult.yield_per(),以便该方法在所有结果集实现中都可用

class sqlalchemy.ext.asyncio.AsyncTupleResult

一个 AsyncResult,其类型被指定为返回纯 Python 元组而不是行。

由于 Row 在各个方面都已像一个元组,因此此类仅为类型提示类,运行时仍使用常规的 AsyncResult

类签名

class sqlalchemy.ext.asyncio.AsyncTupleResult (sqlalchemy.ext.asyncio.AsyncCommon, sqlalchemy.util.langhelpers.TypingOnly)

ORM Session API 文档

对象名称 描述

async_object_session(instance)

返回给定实例所属的 AsyncSession

async_scoped_session

提供 AsyncSession 对象的范围管理。

async_session(session)

返回代理给定 Session 对象的 AsyncSession(如果有)。

async_sessionmaker

可配置的 AsyncSession 工厂。

AsyncAttrs

混入类,为所有属性提供可等待的访问器。

AsyncSession

Session 的 Asyncio 版本。

AsyncSessionTransaction

ORM SessionTransaction 对象的包装器。

close_all_sessions()

关闭所有 AsyncSession 会话。

function sqlalchemy.ext.asyncio.async_object_session(instance: object) AsyncSession | None

返回给定实例所属的 AsyncSession

此函数使用同步 API 函数 object_session 来检索引用给定实例的 Session,并从那里将其链接到原始的 AsyncSession

如果 AsyncSession 已被垃圾回收,则返回值为 None

此功能也可以从 InstanceState.async_session 访问器获得。

参数:

instance – 一个 ORM 映射的实例

返回:

一个 AsyncSession 对象,或 None

2.0 版本的新功能。

function sqlalchemy.ext.asyncio.async_session(session: Session) AsyncSession | None

返回代理给定 Session 对象的 AsyncSession(如果有)。

参数:

session – 一个 Session 实例。

返回:

一个 AsyncSession 实例,或 None

2.0 版本的新功能。

function async sqlalchemy.ext.asyncio.close_all_sessions() None

关闭所有 AsyncSession 会话。

2.0.23 版本的新功能。

另请参阅

close_all_sessions()

class sqlalchemy.ext.asyncio.async_sessionmaker

可配置的 AsyncSession 工厂。

async_sessionmaker 工厂的工作方式与 sessionmaker 工厂相同,用于在调用时生成新的 AsyncSession 对象,并根据此处建立的配置参数创建它们。

例如:

from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker


async def run_some_sql(
    async_session: async_sessionmaker[AsyncSession],
) -> None:
    async with async_session() as session:
        session.add(SomeObject(data="object"))
        session.add(SomeOtherObject(name="other object"))
        await session.commit()


async def main() -> None:
    # an AsyncEngine, which the AsyncSession will use for connection
    # resources
    engine = create_async_engine(
        "postgresql+asyncpg://scott:tiger@localhost/"
    )

    # create a reusable factory for new AsyncSession instances
    async_session = async_sessionmaker(engine)

    await run_some_sql(async_session)

    await engine.dispose()

async_sessionmaker 非常有用,这样程序的不同部分可以使用预先建立的固定配置来创建新的 AsyncSession 对象。 请注意,在不使用 async_sessionmaker 时,也可以直接实例化 AsyncSession 对象。

2.0 版本新增: async_sessionmaker 提供了一个专用于 AsyncSession 对象的 sessionmaker 类,包括 pep-484 类型提示支持。

另请参阅

概要 - ORM - 显示使用示例

sessionmaker - sessionmaker 的通用概述

sessionmaker 架构

打开和关闭会话 - 关于使用 sessionmaker 创建会话的介绍性文本。

类签名

class sqlalchemy.ext.asyncio.async_sessionmaker (typing.Generic)

method sqlalchemy.ext.asyncio.async_sessionmaker.__call__(**local_kw: Any) _AS

使用此 async_sessionmaker 中建立的配置生成一个新的 AsyncSession 对象。

在 Python 中,当对象像函数一样被“调用”时,将调用 __call__ 方法。

AsyncSession = async_sessionmaker(async_engine, expire_on_commit=False)
session = AsyncSession()  # invokes sessionmaker.__call__()
method sqlalchemy.ext.asyncio.async_sessionmaker.__init__(bind: Optional[_AsyncSessionBind] = None, *, class_: Type[_AS] = <class 'sqlalchemy.ext.asyncio.session.AsyncSession'>, autoflush: bool = True, expire_on_commit: bool = True, info: Optional[_InfoType] = None, **kw: Any)

构造一个新的 async_sessionmaker

除了 class_ 之外,此处的所有参数都对应于 Session 直接接受的参数。 有关参数的更多详细信息,请参阅 AsyncSession.__init__() 文档字符串。

method sqlalchemy.ext.asyncio.async_sessionmaker.begin() _AsyncSessionContextManager[_AS]

生成一个上下文管理器,它既提供一个新的 AsyncSession,又提供一个提交的事务。

例如:

async def main():
    Session = async_sessionmaker(some_engine)

    async with Session.begin() as session:
        session.add(some_object)

    # commits transaction, closes session
方法 sqlalchemy.ext.asyncio.async_sessionmaker.configure(**new_kw: Any) None

(重新)配置此 async_sessionmaker 的参数。

例如:

AsyncSession = async_sessionmaker(some_engine)

AsyncSession.configure(bind=create_async_engine("sqlite+aiosqlite://"))
sqlalchemy.ext.asyncio.async_scoped_session

提供 AsyncSession 对象的范围管理。

请参阅 使用 asyncio 作用域会话 部分,了解使用详情。

1.4.19 版本新增。

类签名

sqlalchemy.ext.asyncio.async_scoped_session (typing.Generic)

方法 sqlalchemy.ext.asyncio.async_scoped_session.__call__(**kw: Any) _AS

返回当前的 AsyncSession,如果不存在,则使用 scoped_session.session_factory 创建它。

参数:

**kw – 如果不存在现有的 AsyncSession,则关键字参数将传递给 scoped_session.session_factory 可调用对象。如果 AsyncSession 存在并且已传递关键字参数,则会引发 InvalidRequestError

方法 sqlalchemy.ext.asyncio.async_scoped_session.__init__(session_factory: async_sessionmaker[_AS], scopefunc: Callable[[], Any])

构造一个新的 async_scoped_session

参数:
  • session_factory – 用于创建新的 AsyncSession 实例的工厂。这通常(但不一定)是 async_sessionmaker 的实例。

  • scopefunc – 定义当前作用域的函数。诸如 asyncio.current_task 之类的函数在这里可能很有用。

方法 sqlalchemy.ext.asyncio.async_scoped_session.async aclose() None

AsyncSession.close() 的同义词。

代表 async_scoped_session 类,代理给 AsyncSession 类。

AsyncSession.aclose() 名称专门用于支持 Python 标准库 @contextlib.aclosing 上下文管理器函数。

2.0.20 版本新增。

方法 sqlalchemy.ext.asyncio.async_scoped_session.add(instance: object, _warn: bool = True) None

将对象放入此 Session 中。

代表 async_scoped_session 类,代理给 AsyncSession 类。

代表 AsyncSession 类,代理给 Session 类。

当传递给 Session.add() 方法时,处于瞬态状态的对象将移动到挂起状态,直到下一次刷新,届时它们将移动到持久状态。

当传递给 Session.add() 方法时,处于分离状态的对象将直接移动到持久状态。

如果 Session 使用的事务回滚,则在传递给 Session.add() 时处于瞬态的对象将移回瞬态状态,并且将不再存在于此 Session 中。

方法 sqlalchemy.ext.asyncio.async_scoped_session.add_all(instances: Iterable[object]) None

将给定的实例集合添加到此 Session

代表 async_scoped_session 类,代理给 AsyncSession 类。

代表 AsyncSession 类,代理给 Session 类。

有关一般的行为描述,请参阅 Session.add() 的文档。

属性 sqlalchemy.ext.asyncio.async_scoped_session.autoflush

代表 AsyncSession 类,代理给 Session.autoflush 属性。

代表 async_scoped_session 类,代理给 AsyncSession 类。

方法 sqlalchemy.ext.asyncio.async_scoped_session.begin() AsyncSessionTransaction

返回 AsyncSessionTransaction 对象。

代表 async_scoped_session 类,代理给 AsyncSession 类。

当进入 AsyncSessionTransaction 对象时,底层 Session 将执行 “begin” 操作

async with async_session.begin():
    ...  # ORM transaction is begun

请注意,当会话级事务开始时,通常不会发生数据库 IO,因为数据库事务是按需开始的。但是,begin 块是异步的,以适应 SessionEvents.after_transaction_create() 事件钩子,该钩子可能会执行 IO。

有关 ORM begin 的一般说明,请参阅 Session.begin()

方法 sqlalchemy.ext.asyncio.async_scoped_session.begin_nested() AsyncSessionTransaction

返回 AsyncSessionTransaction 对象,该对象将启动“嵌套”事务,例如 SAVEPOINT。

代表 async_scoped_session 类,代理给 AsyncSession 类。

行为与 AsyncSession.begin() 的行为相同。

有关 ORM begin nested 的一般说明,请参阅 Session.begin_nested()

另请参阅

可序列化隔离/保存点/事务性 DDL(asyncio 版本) - 为了使 SAVEPOINT 正常工作,SQLite asyncio 驱动程序需要特殊的解决方法。

属性 sqlalchemy.ext.asyncio.async_scoped_session.bind

代表 async_scoped_session 类,代理给 AsyncSession.bind 属性。

方法 sqlalchemy.ext.asyncio.async_scoped_session.async close() None

关闭此 AsyncSession 使用的事务资源和 ORM 对象。

代表 async_scoped_session 类,代理给 AsyncSession 类。

另请参阅

Session.close() - “close” 的主要文档

关闭 - 有关 AsyncSession.close()AsyncSession.reset() 的语义的详细信息。

async classmethod sqlalchemy.ext.asyncio.async_scoped_session.close_all() None

关闭所有 AsyncSession 会话。

代表 async_scoped_session 类,代理给 AsyncSession 类。

Deprecated since version 2.0: AsyncSession.close_all() 方法已弃用,并将在未来的版本中删除。请参考 close_all_sessions()

方法 sqlalchemy.ext.asyncio.async_scoped_session.async commit() None

提交当前正在进行的事务。

代表 async_scoped_session 类,代理给 AsyncSession 类。

另请参阅

Session.commit() - “commit” 的主要文档

方法 sqlalchemy.ext.asyncio.async_scoped_session.configure(**kwargs: Any) None

重新配置此 scoped_session 使用的 sessionmaker

请参阅 sessionmaker.configure()

方法 sqlalchemy.ext.asyncio.async_scoped_session.async connection(bind_arguments: _BindArguments | None = None, execution_options: CoreExecuteOptionsParameter | None = None, **kw: Any) AsyncConnection

返回与此 Session 对象的事务状态相对应的 AsyncConnection 对象。

代表 async_scoped_session 类,代理给 AsyncSession 类。

此方法还可以用于为当前事务使用的数据库连接建立执行选项。

New in version 1.4.24: 添加了 **kw 参数,这些参数将传递给底层的 Session.connection() 方法。

另请参阅

Session.connection() - “connection” 的主要文档

方法 sqlalchemy.ext.asyncio.async_scoped_session.async delete(instance: object) None

将实例标记为已删除。

代表 async_scoped_session 类,代理给 AsyncSession 类。

数据库删除操作在 flush() 时发生。

由于此操作可能需要沿着未加载的关系进行级联,因此它是可等待的,以允许进行这些查询。

另请参阅

Session.delete() - delete 的主要文档

attribute sqlalchemy.ext.asyncio.async_scoped_session.deleted

Session 中所有标记为 “已删除” 的实例的集合

代表 async_scoped_session 类,代理给 AsyncSession 类。

代表 AsyncSession 类,代理给 Session 类。

attribute sqlalchemy.ext.asyncio.async_scoped_session.dirty

所有被认为 “脏” 的持久实例的集合。

代表 async_scoped_session 类,代理给 AsyncSession 类。

代表 AsyncSession 类,代理给 Session 类。

例如:

some_mapped_object in session.dirty

当实例被修改但未被删除时,它们被认为是 “脏” 的。

请注意,这种 “脏” 计算是 “乐观的”;大多数属性设置或集合修改操作都会将实例标记为 “脏” 并将其放入此集合中,即使属性的值没有实际更改。在刷新时,每个属性的值都会与其先前保存的值进行比较,如果没有实际更改,则不会发生 SQL 操作(这是一个更昂贵的操作,因此仅在刷新时执行)。

要检查实例的属性是否具有可操作的实际更改,请使用 Session.is_modified() 方法。

method sqlalchemy.ext.asyncio.async_scoped_session.async execute(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) Result[Any]

执行语句并返回缓冲的 Result 对象。

代表 async_scoped_session 类,代理给 AsyncSession 类。

另请参阅

Session.execute() - execute 的主要文档

method sqlalchemy.ext.asyncio.async_scoped_session.expire(instance: object, attribute_names: Iterable[str] | None = None) None

使实例上的属性过期。

代表 async_scoped_session 类,代理给 AsyncSession 类。

代表 AsyncSession 类,代理给 Session 类。

将实例的属性标记为过时。当下次访问过期的属性时,将向 Session 对象的当前事务上下文发出查询,以便加载给定实例的所有过期属性。请注意,高度隔离的事务将返回与同一事务中先前读取的值相同的值,而与该事务之外的数据库状态更改无关。

要同时使 Session 中的所有对象过期,请使用 Session.expire_all()

Session 对象的默认行为是在调用 Session.rollback()Session.commit() 方法时使所有状态过期,以便可以为新事务加载新状态。因此,仅当当前事务中发出了非 ORM SQL 语句时,调用 Session.expire() 才有意义。

参数:
  • instance – 要刷新的实例。

  • attribute_names – 可选的字符串属性名称列表,指示要过期的属性子集。

method sqlalchemy.ext.asyncio.async_scoped_session.expire_all() None

使此 Session 中的所有持久实例过期。

代表 async_scoped_session 类,代理给 AsyncSession 类。

代表 AsyncSession 类,代理给 Session 类。

当下次访问持久实例上的任何属性时,将使用 Session 对象的当前事务上下文发出查询,以便加载给定实例的所有过期属性。请注意,高度隔离的事务将返回与同一事务中先前读取的值相同的值,而与该事务之外的数据库状态更改无关。

要使单个对象和这些对象上的单个属性过期,请使用 Session.expire()

Session 对象的默认行为是在调用 Session.rollback()Session.commit() 方法时使所有状态过期,以便可以为新事务加载新状态。因此,假设事务是隔离的,通常不需要调用 Session.expire_all()

method sqlalchemy.ext.asyncio.async_scoped_session.expunge(instance: object) None

从此 Session 中移除 instance

代表 async_scoped_session 类,代理给 AsyncSession 类。

代表 AsyncSession 类,代理给 Session 类。

这将释放对实例的所有内部引用。级联将根据 expunge 级联规则应用。

method sqlalchemy.ext.asyncio.async_scoped_session.expunge_all() None

从此 Session 中移除所有对象实例。

代表 async_scoped_session 类,代理给 AsyncSession 类。

代表 AsyncSession 类,代理给 Session 类。

这等效于对该 Session 中的所有对象调用 expunge(obj)

method sqlalchemy.ext.asyncio.async_scoped_session.async flush(objects: Sequence[Any] | None = None) None

将所有对象更改刷新到数据库。

代表 async_scoped_session 类,代理给 AsyncSession 类。

另请参阅

Session.flush() - flush 的主要文档

method sqlalchemy.ext.asyncio.async_scoped_session.async get(entity: _EntityBindKey[_O], ident: _PKIdentityArgument, *, options: Sequence[ORMOption] | None = None, populate_existing: bool = False, with_for_update: ForUpdateParameter = None, identity_token: Any | None = None, execution_options: OrmExecuteOptionsParameter = {}) _O | None

根据给定的主键标识符返回一个实例,如果未找到,则返回 None

代表 async_scoped_session 类,代理给 AsyncSession 类。

另请参阅

Session.get() - get 的主要文档

method sqlalchemy.ext.asyncio.async_scoped_session.get_bind(mapper: _EntityBindKey[_O] | None = None, clause: ClauseElement | None = None, bind: _SessionBind | None = None, **kw: Any) Engine | Connection

返回同步代理的 Session 绑定的 “bind”。

代表 async_scoped_session 类,代理给 AsyncSession 类。

Session.get_bind() 方法不同,此方法当前被此 AsyncSession 以任何方式使用,以便解析请求的引擎。

注意

此方法直接代理到 Session.get_bind() 方法,但是当前用作覆盖目标,这与 Session.get_bind() 方法形成对比。下面的示例说明了如何实现与 AsyncSessionAsyncEngine 一起使用的自定义 Session.get_bind() 方案。

自定义垂直分区 中介绍的模式说明了如何在给定一组 Engine 对象的情况下,将自定义绑定查找方案应用于 Session。要应用相应的 Session.get_bind() 实现以与 AsyncSessionAsyncEngine 对象一起使用,请继续子类化 Session 并使用 AsyncSession.sync_session_class 将其应用于 AsyncSession。内部方法必须继续返回 Engine 实例,这些实例可以使用 AsyncEngine.sync_engine 属性从 AsyncEngine 获取

# using example from "Custom Vertical Partitioning"


import random

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import Session

# construct async engines w/ async drivers
engines = {
    "leader": create_async_engine("sqlite+aiosqlite:///leader.db"),
    "other": create_async_engine("sqlite+aiosqlite:///other.db"),
    "follower1": create_async_engine("sqlite+aiosqlite:///follower1.db"),
    "follower2": create_async_engine("sqlite+aiosqlite:///follower2.db"),
}


class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, **kw):
        # within get_bind(), return sync engines
        if mapper and issubclass(mapper.class_, MyOtherClass):
            return engines["other"].sync_engine
        elif self._flushing or isinstance(clause, (Update, Delete)):
            return engines["leader"].sync_engine
        else:
            return engines[
                random.choice(["follower1", "follower2"])
            ].sync_engine


# apply to AsyncSession using sync_session_class
AsyncSessionMaker = async_sessionmaker(sync_session_class=RoutingSession)

Session.get_bind() 方法在非 asyncio、隐式非阻塞上下文中调用,方式与 ORM 事件挂钩和通过 AsyncSession.run_sync() 调用的函数相同,因此希望在 Session.get_bind() 内部运行 SQL 命令的例程可以继续使用阻塞风格的代码,这将在调用数据库驱动程序上的 IO 时转换为隐式异步调用。

method sqlalchemy.ext.asyncio.async_scoped_session.async get_one(entity: _EntityBindKey[_O], ident: _PKIdentityArgument, *, options: Sequence[ORMOption] | None = None, populate_existing: bool = False, with_for_update: ForUpdateParameter = None, identity_token: Any | None = None, execution_options: OrmExecuteOptionsParameter = {}) _O

根据给定的主键标识符返回一个实例,如果未找到,则引发异常。

代表 async_scoped_session 类,代理给 AsyncSession 类。

如果查询未选择任何行,则引发 sqlalchemy.orm.exc.NoResultFound

..versionadded: 2.0.22

另请参阅

Session.get_one() - get_one 的主要文档

classmethod sqlalchemy.ext.asyncio.async_scoped_session.identity_key(class_: Type[Any] | None = None, ident: Any | Tuple[Any, ...] = None, *, instance: Any | None = None, row: Row[Any] | RowMapping | None = None, identity_token: Any | None = None) _IdentityKeyType[Any]

返回标识键。

代表 async_scoped_session 类,代理给 AsyncSession 类。

代表 AsyncSession 类,代理给 Session 类。

这是 identity_key() 的别名。

attribute sqlalchemy.ext.asyncio.async_scoped_session.identity_map

代表 AsyncSession 类代理 Session.identity_map 属性。

代表 async_scoped_session 类,代理给 AsyncSession 类。

attribute sqlalchemy.ext.asyncio.async_scoped_session.info

用户可修改的字典。

代表 async_scoped_session 类,代理给 AsyncSession 类。

代表 AsyncSession 类,代理给 Session 类。

可以使用 info 参数来填充此字典的初始值,该参数用于 Session 构造函数或 sessionmaker 构造函数或工厂方法。此处的字典始终是此 Session 本地的,并且可以独立于所有其他 Session 对象进行修改。

method sqlalchemy.ext.asyncio.async_scoped_session.async invalidate() None

关闭此 Session,使用连接失效。

代表 async_scoped_session 类,代理给 AsyncSession 类。

有关完整说明,请参阅 Session.invalidate()

attribute sqlalchemy.ext.asyncio.async_scoped_session.is_active

如果此 Session 不处于 “部分回滚” 状态,则为 True。

代表 async_scoped_session 类,代理给 AsyncSession 类。

代表 AsyncSession 类,代理给 Session 类。

Changed in version 1.4: Session 不再立即开始新事务,因此当首次实例化 Session 时,此属性将为 False。

“partial rollback” 状态通常表示 Session 的 flush 过程失败,并且必须调用 Session.rollback() 方法才能完全回滚事务。

如果此 Session 完全不在事务中,则 Session 在首次使用时将自动开始事务,因此在这种情况下 Session.is_active 将返回 True。

否则,如果此 Session 位于事务中,并且该事务尚未在内部回滚,则 Session.is_active 也将返回 True。

method sqlalchemy.ext.asyncio.async_scoped_session.is_modified(instance: object, include_collections: bool = True) bool

如果给定实例具有本地修改的属性,则返回 True

代表 async_scoped_session 类,代理给 AsyncSession 类。

代表 AsyncSession 类,代理给 Session 类。

此方法检索实例上每个已检测属性的历史记录,并将当前值与其先前刷新或提交的值(如果有)进行比较。

实际上,这是一种更昂贵但更准确的方法,用于检查给定实例是否在 Session.dirty 集合中;它会对每个属性的净 “dirty” 状态执行完整测试。

例如:

return session.is_modified(someobject)

此方法有一些注意事项。

  • 当使用此方法测试时,Session.dirty 集合中存在的实例可能会报告 False。这是因为对象可能通过属性突变接收到更改事件,从而将其放入 Session.dirty 中,但最终状态与从数据库加载的状态相同,因此此处没有净更改。

  • 当应用新值时,如果标量属性未加载或已过期,则标量属性可能未记录先前设置的值 - 在这些情况下,即使最终与数据库值相比没有净更改,也假定该属性已更改。在大多数情况下,当设置事件发生时,SQLAlchemy 不需要 “旧” 值,因此如果旧值不存在,它会跳过 SQL 调用的开销,这是基于标量值通常需要 UPDATE 的假设,并且在少数不需要 UPDATE 的情况下,平均而言,执行防御性 SELECT 的成本更低。

    仅当属性容器的 active_history 标志设置为 True 时,才会在设置时无条件获取 “旧” 值。此标志通常为非简单多对一的主键属性和标量对象引用设置。要为任何任意映射列设置此标志,请将 active_history 参数与 column_property() 一起使用。

参数:
  • instance – 要测试待处理更改的映射实例。

  • include_collections – 指示是否应将多值集合包含在操作中。 将此项设置为 False 是一种仅检测基于本地列的属性(即标量列或多对一外键)的方法,这些属性将在 flush 时导致此实例的 UPDATE。

method sqlalchemy.ext.asyncio.async_scoped_session.async merge(instance: _O, *, load: bool = True, options: Sequence[ORMOption] | None = None) _O

将给定实例的状态复制到此 AsyncSession 中的相应实例。

代表 async_scoped_session 类,代理给 AsyncSession 类。

另请参阅

Session.merge() - merge 的主要文档

attribute sqlalchemy.ext.asyncio.async_scoped_session.new

Session 中所有标记为 “new” 的实例的集合。

代表 async_scoped_session 类,代理给 AsyncSession 类。

代表 AsyncSession 类,代理给 Session 类。

attribute sqlalchemy.ext.asyncio.async_scoped_session.no_autoflush

返回一个禁用 autoflush 的上下文管理器。

代表 async_scoped_session 类,代理给 AsyncSession 类。

代表 AsyncSession 类,代理给 Session 类。

例如:

with session.no_autoflush:

    some_object = SomeClass()
    session.add(some_object)
    # won't autoflush
    some_object.related_thing = session.query(SomeRelated).first()

with: 块中进行的操作将不受查询访问时发生的 flush 的影响。这在初始化一系列涉及现有数据库查询的对象时非常有用,此时未完成的对象尚不应进行 flush。

classmethod sqlalchemy.ext.asyncio.async_scoped_session.object_session(instance: object) Session | None

返回对象所属的 Session

代表 async_scoped_session 类,代理给 AsyncSession 类。

代表 AsyncSession 类,代理给 Session 类。

这是 object_session() 的别名。

method sqlalchemy.ext.asyncio.async_scoped_session.async refresh(instance: object, attribute_names: Iterable[str] | None = None, with_for_update: ForUpdateParameter = None) None

使给定实例上的属性过期并刷新。

代表 async_scoped_session 类,代理给 AsyncSession 类。

将向数据库发出查询,并且所有属性将使用其当前数据库值刷新。

这是 Session.refresh() 方法的异步版本。 有关所有选项的完整描述,请参阅该方法。

另请参阅

Session.refresh() - refresh 的主要文档

method sqlalchemy.ext.asyncio.async_scoped_session.async remove() None

释放当前的 AsyncSession(如果存在)。

与 scoped_session 的 remove 方法不同,此方法将使用 await 等待 AsyncSession 的 close 方法。

method sqlalchemy.ext.asyncio.async_scoped_session.async reset() None

关闭此 Session 使用的事务资源和 ORM 对象,并将会话重置为其初始状态。

代表 async_scoped_session 类,代理给 AsyncSession 类。

2.0.22 版本新增。

另请参阅

Session.reset() - “reset” 的主要文档

关闭 - 有关 AsyncSession.close()AsyncSession.reset() 的语义的详细信息。

method sqlalchemy.ext.asyncio.async_scoped_session.async rollback() None

回滚当前正在进行的事务。

代表 async_scoped_session 类,代理给 AsyncSession 类。

另请参阅

Session.rollback() - “rollback” 的主要文档

method sqlalchemy.ext.asyncio.async_scoped_session.async scalar(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) Any

执行语句并返回标量结果。

代表 async_scoped_session 类,代理给 AsyncSession 类。

另请参阅

Session.scalar() - scalar 的主要文档

method sqlalchemy.ext.asyncio.async_scoped_session.async scalars(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) ScalarResult[Any]

执行语句并返回标量结果。

代表 async_scoped_session 类,代理给 AsyncSession 类。

返回:

一个 ScalarResult 对象

New in version 1.4.24: Added AsyncSession.scalars()

New in version 1.4.26: Added async_scoped_session.scalars()

另请参阅

Session.scalars() - scalars 的主要文档

AsyncSession.stream_scalars() - 流式版本

attribute sqlalchemy.ext.asyncio.async_scoped_session.session_factory: async_sessionmaker[_AS]

提供给 __init__session_factory 存储在此属性中,并且可以在稍后访问。 当需要新的非作用域 AsyncSession 时,这可能很有用。

method sqlalchemy.ext.asyncio.async_scoped_session.async stream(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) AsyncResult[Any]

执行语句并返回流式 AsyncResult 对象。

代表 async_scoped_session 类,代理给 AsyncSession 类。

method sqlalchemy.ext.asyncio.async_scoped_session.async stream_scalars(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) AsyncScalarResult[Any]

执行语句并返回标量结果流。

代表 async_scoped_session 类,代理给 AsyncSession 类。

返回:

一个 AsyncScalarResult 对象

1.4.24 版本新增。

另请参阅

Session.scalars() - scalars 的主要文档

AsyncSession.scalars() - 非流式版本

class sqlalchemy.ext.asyncio.AsyncAttrs

混入类,为所有属性提供可等待的访问器。

例如:

from __future__ import annotations

from typing import List

from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship


class Base(AsyncAttrs, DeclarativeBase):
    pass


class A(Base):
    __tablename__ = "a"

    id: Mapped[int] = mapped_column(primary_key=True)
    data: Mapped[str]
    bs: Mapped[List[B]] = relationship()


class B(Base):
    __tablename__ = "b"
    id: Mapped[int] = mapped_column(primary_key=True)
    a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
    data: Mapped[str]

在上面的示例中,AsyncAttrs mixin 应用于声明性 Base 类,它对所有子类生效。 此 mixin 向所有类添加了一个新的属性 AsyncAttrs.awaitable_attrs,它将把任何属性的值作为 awaitable 产生。 这允许访问可能受延迟加载或延迟/未过期加载影响的属性,以便仍然可以发出 IO。

a1 = (await async_session.scalars(select(A).where(A.id == 5))).one()

# use the lazy loader on ``a1.bs`` via the ``.awaitable_attrs``
# interface, so that it may be awaited
for b1 in await a1.awaitable_attrs.bs:
    print(b1)

AsyncAttrs.awaitable_attrs 对属性执行调用,该调用大致等效于使用 AsyncSession.run_sync() 方法,例如:

for b1 in await async_session.run_sync(lambda sess: a1.bs):
    print(b1)

版本 2.0.13 中的新功能。

attribute sqlalchemy.ext.asyncio.AsyncAttrs.awaitable_attrs

提供此对象上所有属性的命名空间,这些属性被包装为 awaitables。

例如:

a1 = (await async_session.scalars(select(A).where(A.id == 5))).one()

some_attribute = await a1.awaitable_attrs.some_deferred_attribute
some_collection = await a1.awaitable_attrs.some_collection
class sqlalchemy.ext.asyncio.AsyncSession

Session 的 Asyncio 版本。

AsyncSession 是传统 Session 实例的代理。

AsyncSession **不适合在并发任务中使用。** 有关背景信息,请参阅 Session 是线程安全的吗?AsyncSession 可以安全地在并发任务中共享吗?

版本 1.4 中的新功能。

要将 AsyncSession 与自定义 Session 实现一起使用,请参阅 AsyncSession.sync_session_class 参数。

类签名

class sqlalchemy.ext.asyncio.AsyncSession (sqlalchemy.ext.asyncio.base.ReversibleProxy)

attribute sqlalchemy.ext.asyncio.AsyncSession.sync_session_class: Type[Session] = <class 'sqlalchemy.orm.session.Session'>

提供特定 AsyncSession 的底层 Session 实例的类或可调用对象。

在类级别,此属性是 AsyncSession.sync_session_class 参数的默认值。AsyncSession 的自定义子类可以覆盖它。

在实例级别,此属性指示用于为此 AsyncSession 实例提供 Session 实例的当前类或可调用对象。

1.4.24 版本新增。

method sqlalchemy.ext.asyncio.AsyncSession.__init__(bind: _AsyncSessionBind | None = None, *, binds: Dict[_SessionBindKey, _AsyncSessionBind] | None = None, sync_session_class: Type[Session] | None = None, **kw: Any)

构造一个新的 AsyncSession

除了 sync_session_class 之外的所有参数都直接传递给 sync_session_class 可调用对象,以实例化一个新的 Session。有关参数文档,请参阅 Session.__init__()

参数:

sync_session_class

Session 子类或其他可调用对象,将用于构造将被代理的 Session。此参数可用于提供自定义的 Session 子类。默认为 AsyncSession.sync_session_class 类级别属性。

1.4.24 版本新增。

method sqlalchemy.ext.asyncio.AsyncSession.async aclose() None

AsyncSession.close() 的同义词。

AsyncSession.aclose() 名称专门用于支持 Python 标准库 @contextlib.aclosing 上下文管理器函数。

2.0.20 版本新增。

method sqlalchemy.ext.asyncio.AsyncSession.add(instance: object, _warn: bool = True) None

将对象放入此 Session 中。

代表 AsyncSession 类,代理给 Session 类。

当传递给 Session.add() 方法时,处于瞬态状态的对象将移动到挂起状态,直到下一次刷新,届时它们将移动到持久状态。

当传递给 Session.add() 方法时,处于分离状态的对象将直接移动到持久状态。

如果 Session 使用的事务回滚,则在传递给 Session.add() 时处于瞬态的对象将移回瞬态状态,并且将不再存在于此 Session 中。

method sqlalchemy.ext.asyncio.AsyncSession.add_all(instances: Iterable[object]) None

将给定的实例集合添加到此 Session

代表 AsyncSession 类,代理给 Session 类。

有关一般的行为描述,请参阅 Session.add() 的文档。

attribute sqlalchemy.ext.asyncio.AsyncSession.autoflush

代表 AsyncSession 类,代理给 Session.autoflush 属性。

method sqlalchemy.ext.asyncio.AsyncSession.begin() AsyncSessionTransaction

返回 AsyncSessionTransaction 对象。

当进入 AsyncSessionTransaction 对象时,底层 Session 将执行 “begin” 操作

async with async_session.begin():
    ...  # ORM transaction is begun

请注意,当会话级事务开始时,通常不会发生数据库 IO,因为数据库事务是按需开始的。但是,begin 块是异步的,以适应 SessionEvents.after_transaction_create() 事件钩子,该钩子可能会执行 IO。

有关 ORM begin 的一般说明,请参阅 Session.begin()

method sqlalchemy.ext.asyncio.AsyncSession.begin_nested() AsyncSessionTransaction

返回 AsyncSessionTransaction 对象,该对象将启动“嵌套”事务,例如 SAVEPOINT。

行为与 AsyncSession.begin() 的行为相同。

有关 ORM begin nested 的一般说明,请参阅 Session.begin_nested()

另请参阅

可序列化隔离/保存点/事务性 DDL(asyncio 版本) - 为了使 SAVEPOINT 正常工作,SQLite asyncio 驱动程序需要特殊的解决方法。

method sqlalchemy.ext.asyncio.AsyncSession.async close() None

关闭此 AsyncSession 使用的事务资源和 ORM 对象。

另请参阅

Session.close() - “close” 的主要文档

关闭 - 有关 AsyncSession.close()AsyncSession.reset() 的语义的详细信息。

async classmethod sqlalchemy.ext.asyncio.AsyncSession.close_all() None

关闭所有 AsyncSession 会话。

Deprecated since version 2.0: AsyncSession.close_all() 方法已弃用,并将在未来的版本中删除。请参考 close_all_sessions()

method sqlalchemy.ext.asyncio.AsyncSession.async commit() None

提交当前正在进行的事务。

另请参阅

Session.commit() - “commit” 的主要文档

method sqlalchemy.ext.asyncio.AsyncSession.async connection(bind_arguments: _BindArguments | None = None, execution_options: CoreExecuteOptionsParameter | None = None, **kw: Any) AsyncConnection

返回与此 Session 对象的事务状态相对应的 AsyncConnection 对象。

此方法还可以用于为当前事务使用的数据库连接建立执行选项。

New in version 1.4.24: 添加了 **kw 参数,这些参数将传递给底层的 Session.connection() 方法。

另请参阅

Session.connection() - “connection” 的主要文档

method sqlalchemy.ext.asyncio.AsyncSession.async delete(instance: object) None

将实例标记为已删除。

数据库删除操作在 flush() 时发生。

由于此操作可能需要沿着未加载的关系进行级联,因此它是可等待的,以允许进行这些查询。

另请参阅

Session.delete() - delete 的主要文档

attribute sqlalchemy.ext.asyncio.AsyncSession.deleted

Session 中所有标记为 “已删除” 的实例的集合

代表 AsyncSession 类,代理给 Session 类。

attribute sqlalchemy.ext.asyncio.AsyncSession.dirty

所有被认为 “脏” 的持久实例的集合。

代表 AsyncSession 类,代理给 Session 类。

例如:

some_mapped_object in session.dirty

当实例被修改但未被删除时,它们被认为是 “脏” 的。

请注意,这种 “脏” 计算是 “乐观的”;大多数属性设置或集合修改操作都会将实例标记为 “脏” 并将其放入此集合中,即使属性的值没有实际更改。在刷新时,每个属性的值都会与其先前保存的值进行比较,如果没有实际更改,则不会发生 SQL 操作(这是一个更昂贵的操作,因此仅在刷新时执行)。

要检查实例的属性是否具有可操作的实际更改,请使用 Session.is_modified() 方法。

method sqlalchemy.ext.asyncio.AsyncSession.async execute(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) Result[Any]

执行语句并返回缓冲的 Result 对象。

另请参阅

Session.execute() - execute 的主要文档

method sqlalchemy.ext.asyncio.AsyncSession.expire(instance: object, attribute_names: Iterable[str] | None = None) None

使实例上的属性过期。

代表 AsyncSession 类,代理给 Session 类。

将实例的属性标记为过时。当下次访问过期的属性时,将向 Session 对象的当前事务上下文发出查询,以便加载给定实例的所有过期属性。请注意,高度隔离的事务将返回与同一事务中先前读取的值相同的值,而与该事务之外的数据库状态更改无关。

要同时使 Session 中的所有对象过期,请使用 Session.expire_all()

Session 对象的默认行为是在调用 Session.rollback()Session.commit() 方法时使所有状态过期,以便可以为新事务加载新状态。因此,仅当当前事务中发出了非 ORM SQL 语句时,调用 Session.expire() 才有意义。

参数:
  • instance – 要刷新的实例。

  • attribute_names – 可选的字符串属性名称列表,指示要过期的属性子集。

method sqlalchemy.ext.asyncio.AsyncSession.expire_all() None

使此 Session 中的所有持久实例过期。

代表 AsyncSession 类,代理给 Session 类。

当下次访问持久实例上的任何属性时,将使用 Session 对象的当前事务上下文发出查询,以便加载给定实例的所有过期属性。请注意,高度隔离的事务将返回与同一事务中先前读取的值相同的值,而与该事务之外的数据库状态更改无关。

要使单个对象和这些对象上的单个属性过期,请使用 Session.expire()

Session 对象的默认行为是在调用 Session.rollback()Session.commit() 方法时使所有状态过期,以便可以为新事务加载新状态。因此,假设事务是隔离的,通常不需要调用 Session.expire_all()

method sqlalchemy.ext.asyncio.AsyncSession.expunge(instance: object) None

从此 Session 中移除 instance

代表 AsyncSession 类,代理给 Session 类。

这将释放对实例的所有内部引用。级联将根据 expunge 级联规则应用。

method sqlalchemy.ext.asyncio.AsyncSession.expunge_all() None

从此 Session 中移除所有对象实例。

代表 AsyncSession 类,代理给 Session 类。

这等效于对该 Session 中的所有对象调用 expunge(obj)

method sqlalchemy.ext.asyncio.AsyncSession.async flush(objects: Sequence[Any] | None = None) None

将所有对象更改刷新到数据库。

另请参阅

Session.flush() - flush 的主要文档

method sqlalchemy.ext.asyncio.AsyncSession.async get(entity: _EntityBindKey[_O], ident: _PKIdentityArgument, *, options: Sequence[ORMOption] | None = None, populate_existing: bool = False, with_for_update: ForUpdateParameter = None, identity_token: Any | None = None, execution_options: OrmExecuteOptionsParameter = {}) _O | None

根据给定的主键标识符返回一个实例,如果未找到,则返回 None

另请参阅

Session.get() - get 的主要文档

method sqlalchemy.ext.asyncio.AsyncSession.get_bind(mapper: _EntityBindKey[_O] | None = None, clause: ClauseElement | None = None, bind: _SessionBind | None = None, **kw: Any) Engine | Connection

返回同步代理的 Session 绑定的 “bind”。

Session.get_bind() 方法不同,此方法当前被此 AsyncSession 以任何方式使用,以便解析请求的引擎。

注意

此方法直接代理到 Session.get_bind() 方法,但是当前用作覆盖目标,这与 Session.get_bind() 方法形成对比。下面的示例说明了如何实现与 AsyncSessionAsyncEngine 一起使用的自定义 Session.get_bind() 方案。

自定义垂直分区 中介绍的模式说明了如何在给定一组 Engine 对象的情况下,将自定义绑定查找方案应用于 Session。要应用相应的 Session.get_bind() 实现以与 AsyncSessionAsyncEngine 对象一起使用,请继续子类化 Session 并使用 AsyncSession.sync_session_class 将其应用于 AsyncSession。内部方法必须继续返回 Engine 实例,这些实例可以使用 AsyncEngine.sync_engine 属性从 AsyncEngine 获取

# using example from "Custom Vertical Partitioning"


import random

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy.orm import Session

# construct async engines w/ async drivers
engines = {
    "leader": create_async_engine("sqlite+aiosqlite:///leader.db"),
    "other": create_async_engine("sqlite+aiosqlite:///other.db"),
    "follower1": create_async_engine("sqlite+aiosqlite:///follower1.db"),
    "follower2": create_async_engine("sqlite+aiosqlite:///follower2.db"),
}


class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, **kw):
        # within get_bind(), return sync engines
        if mapper and issubclass(mapper.class_, MyOtherClass):
            return engines["other"].sync_engine
        elif self._flushing or isinstance(clause, (Update, Delete)):
            return engines["leader"].sync_engine
        else:
            return engines[
                random.choice(["follower1", "follower2"])
            ].sync_engine


# apply to AsyncSession using sync_session_class
AsyncSessionMaker = async_sessionmaker(sync_session_class=RoutingSession)

Session.get_bind() 方法在非 asyncio、隐式非阻塞上下文中调用,方式与 ORM 事件挂钩和通过 AsyncSession.run_sync() 调用的函数相同,因此希望在 Session.get_bind() 内部运行 SQL 命令的例程可以继续使用阻塞风格的代码,这将在调用数据库驱动程序上的 IO 时转换为隐式异步调用。

method sqlalchemy.ext.asyncio.AsyncSession.get_nested_transaction() AsyncSessionTransaction | None

返回当前正在进行的嵌套事务(如果有)。

返回:

一个 AsyncSessionTransaction 对象,或者 None

2.0 版本的新功能。

method sqlalchemy.ext.asyncio.AsyncSession.async get_one(entity: _EntityBindKey[_O], ident: _PKIdentityArgument, *, options: Sequence[ORMOption] | None = None, populate_existing: bool = False, with_for_update: ForUpdateParameter = None, identity_token: Any | None = None, execution_options: OrmExecuteOptionsParameter = {}) _O

根据给定的主键标识符返回一个实例,如果未找到,则引发异常。

如果查询未选择任何行,则引发 sqlalchemy.orm.exc.NoResultFound

..versionadded: 2.0.22

另请参阅

Session.get_one() - get_one 的主要文档

method sqlalchemy.ext.asyncio.AsyncSession.get_transaction() AsyncSessionTransaction | None

返回当前正在进行的根事务(如果有)。

返回:

一个 AsyncSessionTransaction 对象,或者 None

2.0 版本的新功能。

classmethod sqlalchemy.ext.asyncio.AsyncSession.identity_key(class_: Type[Any] | None = None, ident: Any | Tuple[Any, ...] = None, *, instance: Any | None = None, row: Row[Any] | RowMapping | None = None, identity_token: Any | None = None) _IdentityKeyType[Any]

返回标识键。

代表 AsyncSession 类,代理给 Session 类。

这是 identity_key() 的别名。

attribute sqlalchemy.ext.asyncio.AsyncSession.identity_map

代表 AsyncSession 类代理 Session.identity_map 属性。

method sqlalchemy.ext.asyncio.AsyncSession.in_nested_transaction() bool

如果此 Session 已开始嵌套事务(例如,SAVEPOINT),则返回 True。

代表 AsyncSession 类,代理给 Session 类。

版本 1.4 中的新功能。

method sqlalchemy.ext.asyncio.AsyncSession.in_transaction() bool

如果此 Session 已开始事务,则返回 True。

代表 AsyncSession 类,代理给 Session 类。

版本 1.4 中的新功能。

另请参阅

Session.is_active

attribute sqlalchemy.ext.asyncio.AsyncSession.info

用户可修改的字典。

代表 AsyncSession 类,代理给 Session 类。

可以使用 info 参数来填充此字典的初始值,该参数用于 Session 构造函数或 sessionmaker 构造函数或工厂方法。此处的字典始终是此 Session 本地的,并且可以独立于所有其他 Session 对象进行修改。

method sqlalchemy.ext.asyncio.AsyncSession.async invalidate() None

关闭此 Session,使用连接失效。

有关完整说明,请参阅 Session.invalidate()

attribute sqlalchemy.ext.asyncio.AsyncSession.is_active

如果此 Session 不处于 “部分回滚” 状态,则为 True。

代表 AsyncSession 类,代理给 Session 类。

Changed in version 1.4: Session 不再立即开始新事务,因此当首次实例化 Session 时,此属性将为 False。

“partial rollback” 状态通常表示 Session 的 flush 过程失败,并且必须调用 Session.rollback() 方法才能完全回滚事务。

如果此 Session 完全不在事务中,则 Session 在首次使用时将自动开始事务,因此在这种情况下 Session.is_active 将返回 True。

否则,如果此 Session 位于事务中,并且该事务尚未在内部回滚,则 Session.is_active 也将返回 True。

method sqlalchemy.ext.asyncio.AsyncSession.is_modified(instance: object, include_collections: bool = True) bool

如果给定实例具有本地修改的属性,则返回 True

代表 AsyncSession 类,代理给 Session 类。

此方法检索实例上每个已检测属性的历史记录,并将当前值与其先前刷新或提交的值(如果有)进行比较。

实际上,这是一种更昂贵但更准确的方法,用于检查给定实例是否在 Session.dirty 集合中;它会对每个属性的净 “dirty” 状态执行完整测试。

例如:

return session.is_modified(someobject)

此方法有一些注意事项。

  • 当使用此方法测试时,Session.dirty 集合中存在的实例可能会报告 False。这是因为对象可能通过属性突变接收到更改事件,从而将其放入 Session.dirty 中,但最终状态与从数据库加载的状态相同,因此此处没有净更改。

  • 当应用新值时,如果标量属性未加载或已过期,则标量属性可能未记录先前设置的值 - 在这些情况下,即使最终与数据库值相比没有净更改,也假定该属性已更改。在大多数情况下,当设置事件发生时,SQLAlchemy 不需要 “旧” 值,因此如果旧值不存在,它会跳过 SQL 调用的开销,这是基于标量值通常需要 UPDATE 的假设,并且在少数不需要 UPDATE 的情况下,平均而言,执行防御性 SELECT 的成本更低。

    仅当属性容器的 active_history 标志设置为 True 时,才会在设置时无条件获取 “旧” 值。此标志通常为非简单多对一的主键属性和标量对象引用设置。要为任何任意映射列设置此标志,请将 active_history 参数与 column_property() 一起使用。

参数:
  • instance – 要测试待处理更改的映射实例。

  • include_collections – 指示是否应将多值集合包含在操作中。 将此项设置为 False 是一种仅检测本地列属性(即标量列或多对一外键)的方法,这些属性将导致此实例在刷新时进行 UPDATE。

method sqlalchemy.ext.asyncio.AsyncSession.async merge(instance: _O, *, load: bool = True, options: Sequence[ORMOption] | None = None) _O

将给定实例的状态复制到此 AsyncSession 中的相应实例。

另请参阅

Session.merge() - merge 的主要文档

attribute sqlalchemy.ext.asyncio.AsyncSession.new

Session 中所有标记为 “new” 的实例的集合。

代表 AsyncSession 类,代理给 Session 类。

attribute sqlalchemy.ext.asyncio.AsyncSession.no_autoflush

返回一个禁用 autoflush 的上下文管理器。

代表 AsyncSession 类,代理给 Session 类。

例如:

with session.no_autoflush:

    some_object = SomeClass()
    session.add(some_object)
    # won't autoflush
    some_object.related_thing = session.query(SomeRelated).first()

with: 块中进行的操作将不受查询访问时发生的 flush 的影响。这在初始化一系列涉及现有数据库查询的对象时非常有用,此时未完成的对象尚不应进行 flush。

classmethod sqlalchemy.ext.asyncio.AsyncSession.object_session(instance: object) Session | None

返回对象所属的 Session

代表 AsyncSession 类,代理给 Session 类。

这是 object_session() 的别名。

method sqlalchemy.ext.asyncio.AsyncSession.async refresh(instance: object, attribute_names: Iterable[str] | None = None, with_for_update: ForUpdateParameter = None) None

使给定实例上的属性过期并刷新。

将向数据库发出查询,并且所有属性将使用其当前数据库值刷新。

这是 Session.refresh() 方法的异步版本。 有关所有选项的完整描述,请参阅该方法。

另请参阅

Session.refresh() - refresh 的主要文档

method sqlalchemy.ext.asyncio.AsyncSession.async reset() None

关闭此 Session 使用的事务资源和 ORM 对象,并将会话重置为其初始状态。

2.0.22 版本新增。

另请参阅

Session.reset() - “reset” 的主要文档

关闭 - 有关 AsyncSession.close()AsyncSession.reset() 的语义的详细信息。

method sqlalchemy.ext.asyncio.AsyncSession.async rollback() None

回滚当前正在进行的事务。

另请参阅

Session.rollback() - “rollback” 的主要文档

method sqlalchemy.ext.asyncio.AsyncSession.async run_sync(fn: ~typing.Callable[[~typing.Concatenate[~sqlalchemy.orm.session.Session, ~_P]], ~sqlalchemy.ext.asyncio.session._T], *arg: ~typing.~_P, **kw: ~typing.~_P) _T

调用给定的同步(即非异步)可调用对象,并将同步样式的 Session 作为第一个参数传递。

此方法允许传统的同步 SQLAlchemy 函数在 asyncio 应用程序的上下文中运行。

例如:

def some_business_method(session: Session, param: str) -> str:
    """A synchronous function that does not require awaiting

    :param session: a SQLAlchemy Session, used synchronously

    :return: an optional return value is supported

    """
    session.add(MyObject(param=param))
    session.flush()
    return "success"


async def do_something_async(async_engine: AsyncEngine) -> None:
    """an async function that uses awaiting"""

    with AsyncSession(async_engine) as async_session:
        # run some_business_method() with a sync-style
        # Session, proxied into an awaitable
        return_code = await async_session.run_sync(
            some_business_method, param="param1"
        )
        print(return_code)

此方法通过在专门检测的 greenlet 中运行给定的可调用对象,将 asyncio 事件循环一直维护到数据库连接。

提示

提供的可调用对象在 asyncio 事件循环中内联调用,并将阻塞传统的 IO 调用。 此可调用对象中的 IO 应该仅调用 SQLAlchemy 的 asyncio 数据库 API,这些 API 将正确地适应 greenlet 上下文。

另请参阅

AsyncAttrs - 用于 ORM 映射类的混入,可在每个属性的基础上更简洁地提供类似的功能

AsyncConnection.run_sync()

在 asyncio 下运行同步方法和函数

method sqlalchemy.ext.asyncio.AsyncSession.async scalar(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) Any

执行语句并返回标量结果。

另请参阅

Session.scalar() - scalar 的主要文档

method sqlalchemy.ext.asyncio.AsyncSession.async scalars(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) ScalarResult[Any]

执行语句并返回标量结果。

返回:

一个 ScalarResult 对象

New in version 1.4.24: Added AsyncSession.scalars()

New in version 1.4.26: Added async_scoped_session.scalars()

另请参阅

Session.scalars() - scalars 的主要文档

AsyncSession.stream_scalars() - 流式版本

method sqlalchemy.ext.asyncio.AsyncSession.async stream(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) AsyncResult[Any]

执行语句并返回流式 AsyncResult 对象。

method sqlalchemy.ext.asyncio.AsyncSession.async stream_scalars(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) AsyncScalarResult[Any]

执行语句并返回标量结果流。

返回:

一个 AsyncScalarResult 对象

1.4.24 版本新增。

另请参阅

Session.scalars() - scalars 的主要文档

AsyncSession.scalars() - 非流式版本

attribute sqlalchemy.ext.asyncio.AsyncSession.sync_session: Session

指向底层 Session 的引用,此 AsyncSession 将请求代理到该 Session

此实例可以用作事件目标。

class sqlalchemy.ext.asyncio.AsyncSessionTransaction

ORM SessionTransaction 对象的包装器。

提供此对象是为了可以返回 AsyncSession.begin() 的事务保持对象。

该对象支持显式调用 AsyncSessionTransaction.commit()AsyncSessionTransaction.rollback(),以及用作异步上下文管理器。

版本 1.4 中的新功能。

类签名

class sqlalchemy.ext.asyncio.AsyncSessionTransaction (sqlalchemy.ext.asyncio.base.ReversibleProxy, sqlalchemy.ext.asyncio.base.StartableContext)

method sqlalchemy.ext.asyncio.AsyncSessionTransaction.async commit() None

提交此 AsyncTransaction

method sqlalchemy.ext.asyncio.AsyncSessionTransaction.async rollback() None

回滚此 AsyncTransaction