SQLAlchemy 2.0 文档
SQLAlchemy ORM
- ORM 快速入门
- ORM 映射类配置
- 关系配置
- ORM 查询指南
- 使用 Session
- 事件和内部机制
- ORM 扩展
- 异步 I/O (asyncio)¶
- Asyncio 平台安装说明 (包括 Apple M1)
- 概要 - 核心
- 概要 - ORM
- 在 asyncio 扩展中使用事件
- 使用多个 asyncio 事件循环
- 使用 asyncio 范围会话
- 使用 Inspector 检查架构对象
- 引擎 API 文档
create_async_engine()
async_engine_from_config()
create_async_pool_from_url()
AsyncEngine
AsyncEngine.begin()
AsyncEngine.clear_compiled_cache()
AsyncEngine.connect()
AsyncEngine.dialect
AsyncEngine.dispose()
AsyncEngine.driver
AsyncEngine.echo
AsyncEngine.engine
AsyncEngine.execution_options()
AsyncEngine.get_execution_options()
AsyncEngine.name
AsyncEngine.pool
AsyncEngine.raw_connection()
AsyncEngine.sync_engine
AsyncEngine.update_execution_options()
AsyncEngine.url
AsyncConnection
AsyncConnection.aclose()
AsyncConnection.begin()
AsyncConnection.begin_nested()
AsyncConnection.close()
AsyncConnection.closed
AsyncConnection.commit()
AsyncConnection.connection
AsyncConnection.default_isolation_level
AsyncConnection.dialect
AsyncConnection.exec_driver_sql()
AsyncConnection.execute()
AsyncConnection.execution_options()
AsyncConnection.get_nested_transaction()
AsyncConnection.get_raw_connection()
AsyncConnection.get_transaction()
AsyncConnection.in_nested_transaction()
AsyncConnection.in_transaction()
AsyncConnection.info
AsyncConnection.invalidate()
AsyncConnection.invalidated
AsyncConnection.rollback()
AsyncConnection.run_sync()
AsyncConnection.scalar()
AsyncConnection.scalars()
AsyncConnection.start()
AsyncConnection.stream()
AsyncConnection.stream_scalars()
AsyncConnection.sync_connection
AsyncConnection.sync_engine
AsyncTransaction
- 结果集 API 文档
AsyncResult
AsyncResult.all()
AsyncResult.close()
AsyncResult.closed
AsyncResult.columns()
AsyncResult.fetchall()
AsyncResult.fetchmany()
AsyncResult.fetchone()
AsyncResult.first()
AsyncResult.freeze()
AsyncResult.keys()
AsyncResult.mappings()
AsyncResult.one()
AsyncResult.one_or_none()
AsyncResult.partitions()
AsyncResult.scalar()
AsyncResult.scalar_one()
AsyncResult.scalar_one_or_none()
AsyncResult.scalars()
AsyncResult.t
AsyncResult.tuples()
AsyncResult.unique()
AsyncResult.yield_per()
AsyncScalarResult
AsyncScalarResult.all()
AsyncScalarResult.close()
AsyncScalarResult.closed
AsyncScalarResult.fetchall()
AsyncScalarResult.fetchmany()
AsyncScalarResult.first()
AsyncScalarResult.one()
AsyncScalarResult.one_or_none()
AsyncScalarResult.partitions()
AsyncScalarResult.unique()
AsyncScalarResult.yield_per()
AsyncMappingResult
AsyncMappingResult.all()
AsyncMappingResult.close()
AsyncMappingResult.closed
AsyncMappingResult.columns()
AsyncMappingResult.fetchall()
AsyncMappingResult.fetchmany()
AsyncMappingResult.fetchone()
AsyncMappingResult.first()
AsyncMappingResult.keys()
AsyncMappingResult.one()
AsyncMappingResult.one_or_none()
AsyncMappingResult.partitions()
AsyncMappingResult.unique()
AsyncMappingResult.yield_per()
AsyncTupleResult
- ORM 会话 API 文档
async_object_session()
async_session()
close_all_sessions()
async_sessionmaker
async_scoped_session
async_scoped_session.__call__()
async_scoped_session.__init__()
async_scoped_session.aclose()
async_scoped_session.add()
async_scoped_session.add_all()
async_scoped_session.autoflush
async_scoped_session.begin()
async_scoped_session.begin_nested()
async_scoped_session.bind
async_scoped_session.close()
async_scoped_session.close_all()
async_scoped_session.commit()
async_scoped_session.configure()
async_scoped_session.connection()
async_scoped_session.delete()
async_scoped_session.deleted
async_scoped_session.dirty
async_scoped_session.execute()
async_scoped_session.expire()
async_scoped_session.expire_all()
async_scoped_session.expunge()
async_scoped_session.expunge_all()
async_scoped_session.flush()
async_scoped_session.get()
async_scoped_session.get_bind()
async_scoped_session.get_one()
async_scoped_session.identity_key()
async_scoped_session.identity_map
async_scoped_session.info
async_scoped_session.invalidate()
async_scoped_session.is_active
async_scoped_session.is_modified()
async_scoped_session.merge()
async_scoped_session.new
async_scoped_session.no_autoflush
async_scoped_session.object_session()
async_scoped_session.refresh()
async_scoped_session.remove()
async_scoped_session.reset()
async_scoped_session.rollback()
async_scoped_session.scalar()
async_scoped_session.scalars()
async_scoped_session.session_factory
async_scoped_session.stream()
async_scoped_session.stream_scalars()
AsyncAttrs
AsyncSession
AsyncSession.sync_session_class
AsyncSession.__init__()
AsyncSession.aclose()
AsyncSession.add()
AsyncSession.add_all()
AsyncSession.autoflush
AsyncSession.begin()
AsyncSession.begin_nested()
AsyncSession.close()
AsyncSession.close_all()
AsyncSession.commit()
AsyncSession.connection()
AsyncSession.delete()
AsyncSession.deleted
AsyncSession.dirty
AsyncSession.execute()
AsyncSession.expire()
AsyncSession.expire_all()
AsyncSession.expunge()
AsyncSession.expunge_all()
AsyncSession.flush()
AsyncSession.get()
AsyncSession.get_bind()
AsyncSession.get_nested_transaction()
AsyncSession.get_one()
AsyncSession.get_transaction()
AsyncSession.identity_key()
AsyncSession.identity_map
AsyncSession.in_nested_transaction()
AsyncSession.in_transaction()
AsyncSession.info
AsyncSession.invalidate()
AsyncSession.is_active
AsyncSession.is_modified()
AsyncSession.merge()
AsyncSession.new
AsyncSession.no_autoflush
AsyncSession.object_session()
AsyncSession.refresh()
AsyncSession.reset()
AsyncSession.rollback()
AsyncSession.run_sync()
AsyncSession.scalar()
AsyncSession.scalars()
AsyncSession.stream()
AsyncSession.stream_scalars()
AsyncSession.sync_session
AsyncSessionTransaction
- 关联代理
- 自动映射
- 烘焙查询
- 声明式扩展
- ORM 映射的 Mypy/Pep-484 支持
- 变更跟踪
- 排序列表
- 水平分片
- 混合属性
- 可索引
- 备用类工具
- 异步 I/O (asyncio)¶
- ORM 示例
项目版本
- 上一节: ORM 扩展
- 下一节: 关联代理
- 上一级: 主页
- 本页内容
- 异步 I/O (asyncio)
- Asyncio 平台安装说明 (包括 Apple M1)
- 概要 - 核心
- 概要 - ORM
- 在 asyncio 扩展中使用事件
- 使用多个 asyncio 事件循环
- 使用 asyncio 范围会话
- 使用 Inspector 检查架构对象
- 引擎 API 文档
create_async_engine()
async_engine_from_config()
create_async_pool_from_url()
AsyncEngine
AsyncEngine.begin()
AsyncEngine.clear_compiled_cache()
AsyncEngine.connect()
AsyncEngine.dialect
AsyncEngine.dispose()
AsyncEngine.driver
AsyncEngine.echo
AsyncEngine.engine
AsyncEngine.execution_options()
AsyncEngine.get_execution_options()
AsyncEngine.name
AsyncEngine.pool
AsyncEngine.raw_connection()
AsyncEngine.sync_engine
AsyncEngine.update_execution_options()
AsyncEngine.url
AsyncConnection
AsyncConnection.aclose()
AsyncConnection.begin()
AsyncConnection.begin_nested()
AsyncConnection.close()
AsyncConnection.closed
AsyncConnection.commit()
AsyncConnection.connection
AsyncConnection.default_isolation_level
AsyncConnection.dialect
AsyncConnection.exec_driver_sql()
AsyncConnection.execute()
AsyncConnection.execution_options()
AsyncConnection.get_nested_transaction()
AsyncConnection.get_raw_connection()
AsyncConnection.get_transaction()
AsyncConnection.in_nested_transaction()
AsyncConnection.in_transaction()
AsyncConnection.info
AsyncConnection.invalidate()
AsyncConnection.invalidated
AsyncConnection.rollback()
AsyncConnection.run_sync()
AsyncConnection.scalar()
AsyncConnection.scalars()
AsyncConnection.start()
AsyncConnection.stream()
AsyncConnection.stream_scalars()
AsyncConnection.sync_connection
AsyncConnection.sync_engine
AsyncTransaction
- 结果集 API 文档
AsyncResult
AsyncResult.all()
AsyncResult.close()
AsyncResult.closed
AsyncResult.columns()
AsyncResult.fetchall()
AsyncResult.fetchmany()
AsyncResult.fetchone()
AsyncResult.first()
AsyncResult.freeze()
AsyncResult.keys()
AsyncResult.mappings()
AsyncResult.one()
AsyncResult.one_or_none()
AsyncResult.partitions()
AsyncResult.scalar()
AsyncResult.scalar_one()
AsyncResult.scalar_one_or_none()
AsyncResult.scalars()
AsyncResult.t
AsyncResult.tuples()
AsyncResult.unique()
AsyncResult.yield_per()
AsyncScalarResult
AsyncScalarResult.all()
AsyncScalarResult.close()
AsyncScalarResult.closed
AsyncScalarResult.fetchall()
AsyncScalarResult.fetchmany()
AsyncScalarResult.first()
AsyncScalarResult.one()
AsyncScalarResult.one_or_none()
AsyncScalarResult.partitions()
AsyncScalarResult.unique()
AsyncScalarResult.yield_per()
AsyncMappingResult
AsyncMappingResult.all()
AsyncMappingResult.close()
AsyncMappingResult.closed
AsyncMappingResult.columns()
AsyncMappingResult.fetchall()
AsyncMappingResult.fetchmany()
AsyncMappingResult.fetchone()
AsyncMappingResult.first()
AsyncMappingResult.keys()
AsyncMappingResult.one()
AsyncMappingResult.one_or_none()
AsyncMappingResult.partitions()
AsyncMappingResult.unique()
AsyncMappingResult.yield_per()
AsyncTupleResult
- ORM 会话 API 文档
async_object_session()
async_session()
close_all_sessions()
async_sessionmaker
async_scoped_session
async_scoped_session.__call__()
async_scoped_session.__init__()
async_scoped_session.aclose()
async_scoped_session.add()
async_scoped_session.add_all()
async_scoped_session.autoflush
async_scoped_session.begin()
async_scoped_session.begin_nested()
async_scoped_session.bind
async_scoped_session.close()
async_scoped_session.close_all()
async_scoped_session.commit()
async_scoped_session.configure()
async_scoped_session.connection()
async_scoped_session.delete()
async_scoped_session.deleted
async_scoped_session.dirty
async_scoped_session.execute()
async_scoped_session.expire()
async_scoped_session.expire_all()
async_scoped_session.expunge()
async_scoped_session.expunge_all()
async_scoped_session.flush()
async_scoped_session.get()
async_scoped_session.get_bind()
async_scoped_session.get_one()
async_scoped_session.identity_key()
async_scoped_session.identity_map
async_scoped_session.info
async_scoped_session.invalidate()
async_scoped_session.is_active
async_scoped_session.is_modified()
async_scoped_session.merge()
async_scoped_session.new
async_scoped_session.no_autoflush
async_scoped_session.object_session()
async_scoped_session.refresh()
async_scoped_session.remove()
async_scoped_session.reset()
async_scoped_session.rollback()
async_scoped_session.scalar()
async_scoped_session.scalars()
async_scoped_session.session_factory
async_scoped_session.stream()
async_scoped_session.stream_scalars()
AsyncAttrs
AsyncSession
AsyncSession.sync_session_class
AsyncSession.__init__()
AsyncSession.aclose()
AsyncSession.add()
AsyncSession.add_all()
AsyncSession.autoflush
AsyncSession.begin()
AsyncSession.begin_nested()
AsyncSession.close()
AsyncSession.close_all()
AsyncSession.commit()
AsyncSession.connection()
AsyncSession.delete()
AsyncSession.deleted
AsyncSession.dirty
AsyncSession.execute()
AsyncSession.expire()
AsyncSession.expire_all()
AsyncSession.expunge()
AsyncSession.expunge_all()
AsyncSession.flush()
AsyncSession.get()
AsyncSession.get_bind()
AsyncSession.get_nested_transaction()
AsyncSession.get_one()
AsyncSession.get_transaction()
AsyncSession.identity_key()
AsyncSession.identity_map
AsyncSession.in_nested_transaction()
AsyncSession.in_transaction()
AsyncSession.info
AsyncSession.invalidate()
AsyncSession.is_active
AsyncSession.is_modified()
AsyncSession.merge()
AsyncSession.new
AsyncSession.no_autoflush
AsyncSession.object_session()
AsyncSession.refresh()
AsyncSession.reset()
AsyncSession.rollback()
AsyncSession.run_sync()
AsyncSession.scalar()
AsyncSession.scalars()
AsyncSession.stream()
AsyncSession.stream_scalars()
AsyncSession.sync_session
AsyncSessionTransaction
异步 I/O (asyncio)¶
支持 Python asyncio。支持核心和 ORM 使用,使用与 asyncio 兼容的方言。
版本 1.4 中的新功能。
警告
请阅读 Asyncio 平台安装说明 (包括 Apple M1),了解许多平台(包括 **Apple M1 架构**)的重要平台安装说明。
Asyncio 平台安装说明 (包括 Apple M1)¶
asyncio 扩展需要 Python 3。它还依赖于 greenlet 库。此依赖项在常见的机器平台上默认安装,包括
x86_64 aarch64 ppc64le amd64 win32
对于上述平台,greenlet
能够提供预编译的轮子文件。对于其他平台,**greenlet 默认不会安装**;greenlet 的当前文件列表可以在 Greenlet - 下载文件 中查看。请注意,**许多架构被省略,包括 Apple M1**。
要安装 SQLAlchemy 并确保无论使用什么平台 greenlet
依赖项都存在,可以安装 [asyncio]
setuptools 扩展,如下所示,这将包含并指示 pip
安装 greenlet
pip install sqlalchemy[asyncio]
请注意,在没有预编译的轮子文件的平台上安装 greenlet
意味着 greenlet
将从源代码构建,这需要 Python 的开发库也存在。
概要 - 核心¶
对于 Core 使用,create_async_engine()
函数创建一个 AsyncEngine
实例,然后提供传统 Engine
API 的异步版本。 AsyncEngine
通过其 AsyncEngine.connect()
和 AsyncEngine.begin()
方法提供一个 AsyncConnection
,这两个方法都提供异步上下文管理器。 然后,AsyncConnection
可以使用 AsyncConnection.execute()
方法来调用语句,以提供一个缓冲的 Result
,或使用 AsyncConnection.stream()
方法来提供一个流式服务器端 AsyncResult
>>> import asyncio
>>> from sqlalchemy import Column
>>> from sqlalchemy import MetaData
>>> from sqlalchemy import select
>>> from sqlalchemy import String
>>> from sqlalchemy import Table
>>> from sqlalchemy.ext.asyncio import create_async_engine
>>> meta = MetaData()
>>> t1 = Table("t1", meta, Column("name", String(50), primary_key=True))
>>> async def async_main() -> None:
... engine = create_async_engine("sqlite+aiosqlite://", echo=True)
...
... async with engine.begin() as conn:
... await conn.run_sync(meta.drop_all)
... await conn.run_sync(meta.create_all)
...
... await conn.execute(
... t1.insert(), [{"name": "some name 1"}, {"name": "some name 2"}]
... )
...
... async with engine.connect() as conn:
... # select a Result, which will be delivered with buffered
... # results
... result = await conn.execute(select(t1).where(t1.c.name == "some name 1"))
...
... print(result.fetchall())
...
... # for AsyncEngine created in function scope, close and
... # clean-up pooled connections
... await engine.dispose()
>>> asyncio.run(async_main())
BEGIN (implicit)
...
CREATE TABLE t1 (
name VARCHAR(50) NOT NULL,
PRIMARY KEY (name)
)
...
INSERT INTO t1 (name) VALUES (?)
[...] [('some name 1',), ('some name 2',)]
COMMIT
BEGIN (implicit)
SELECT t1.name
FROM t1
WHERE t1.name = ?
[...] ('some name 1',)
[('some name 1',)]
ROLLBACK
上面,AsyncConnection.run_sync()
方法可用于调用特殊的 DDL 函数,例如 MetaData.create_all()
,它们不包含可等待的钩子。
提示
建议使用 await
调用 AsyncEngine.dispose()
方法,使用 AsyncEngine
对象在一个将超出上下文并被垃圾回收的范围内,如上面示例中的 async_main
函数所示。 这确保了连接池持有的任何连接将在可等待的上下文中被正确地释放。 与使用阻塞 IO 不同,SQLAlchemy 无法在 __del__
或弱引用终结器等方法中正确释放这些连接,因为没有机会调用 await
。 当引擎超出范围时,如果未显式释放引擎,可能会导致类似于 RuntimeError: Event loop is closed
的形式的警告输出到标准输出,在垃圾回收中。
AsyncConnection
还通过 AsyncConnection.stream()
方法提供一个“流”API,该方法返回一个 AsyncResult
对象。 此结果对象使用服务器端游标并提供异步/等待 API,例如异步迭代器
async with engine.connect() as conn:
async_result = await conn.stream(select(t1))
async for row in async_result:
print("row: %s" % (row,))
概要 - ORM¶
使用 2.0 风格 查询, AsyncSession
类提供完整的 ORM 功能。
在默认使用模式下,必须特别注意避免 延迟加载 或涉及 ORM 关系和列属性的其他过期属性访问;下一节 使用 AsyncSession 时避免隐式 IO 详细说明了这一点。
警告
单个 AsyncSession
实例 **在多个并发任务中使用是不安全的**。 请参阅 使用 AsyncSession 同时执行任务 和 Session 是线程安全的吗?AsyncSession 在并发任务中共享是否安全? 了解背景。
以下示例说明了包括映射器和会话配置在内的完整示例
>>> from __future__ import annotations
>>> import asyncio
>>> import datetime
>>> from typing import List
>>> from sqlalchemy import ForeignKey
>>> from sqlalchemy import func
>>> from sqlalchemy import select
>>> from sqlalchemy.ext.asyncio import AsyncAttrs
>>> from sqlalchemy.ext.asyncio import async_sessionmaker
>>> from sqlalchemy.ext.asyncio import AsyncSession
>>> from sqlalchemy.ext.asyncio import create_async_engine
>>> from sqlalchemy.orm import DeclarativeBase
>>> from sqlalchemy.orm import Mapped
>>> from sqlalchemy.orm import mapped_column
>>> from sqlalchemy.orm import relationship
>>> from sqlalchemy.orm import selectinload
>>> class Base(AsyncAttrs, DeclarativeBase):
... pass
>>> class B(Base):
... __tablename__ = "b"
...
... id: Mapped[int] = mapped_column(primary_key=True)
... a_id: Mapped[int] = mapped_column(ForeignKey("a.id"))
... data: Mapped[str]
>>> class A(Base):
... __tablename__ = "a"
...
... id: Mapped[int] = mapped_column(primary_key=True)
... data: Mapped[str]
... create_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
... bs: Mapped[List[B]] = relationship()
>>> async def insert_objects(async_session: async_sessionmaker[AsyncSession]) -> None:
... async with async_session() as session:
... async with session.begin():
... session.add_all(
... [
... A(bs=[B(data="b1"), B(data="b2")], data="a1"),
... A(bs=[], data="a2"),
... A(bs=[B(data="b3"), B(data="b4")], data="a3"),
... ]
... )
>>> async def select_and_update_objects(
... async_session: async_sessionmaker[AsyncSession],
... ) -> None:
... async with async_session() as session:
... stmt = select(A).order_by(A.id).options(selectinload(A.bs))
...
... result = await session.execute(stmt)
...
... for a in result.scalars():
... print(a, a.data)
... print(f"created at: {a.create_date}")
... for b in a.bs:
... print(b, b.data)
...
... result = await session.execute(select(A).order_by(A.id).limit(1))
...
... a1 = result.scalars().one()
...
... a1.data = "new data"
...
... await session.commit()
...
... # access attribute subsequent to commit; this is what
... # expire_on_commit=False allows
... print(a1.data)
...
... # alternatively, AsyncAttrs may be used to access any attribute
... # as an awaitable (new in 2.0.13)
... for b1 in await a1.awaitable_attrs.bs:
... print(b1, b1.data)
>>> async def async_main() -> None:
... engine = create_async_engine("sqlite+aiosqlite://", echo=True)
...
... # async_sessionmaker: a factory for new AsyncSession objects.
... # expire_on_commit - don't expire objects after transaction commit
... async_session = async_sessionmaker(engine, expire_on_commit=False)
...
... async with engine.begin() as conn:
... await conn.run_sync(Base.metadata.create_all)
...
... await insert_objects(async_session)
... await select_and_update_objects(async_session)
...
... # for AsyncEngine created in function scope, close and
... # clean-up pooled connections
... await engine.dispose()
>>> asyncio.run(async_main())
BEGIN (implicit)
...
CREATE TABLE a (
id INTEGER NOT NULL,
data VARCHAR NOT NULL,
create_date DATETIME DEFAULT (CURRENT_TIMESTAMP) NOT NULL,
PRIMARY KEY (id)
)
...
CREATE TABLE b (
id INTEGER NOT NULL,
a_id INTEGER NOT NULL,
data VARCHAR NOT NULL,
PRIMARY KEY (id),
FOREIGN KEY(a_id) REFERENCES a (id)
)
...
COMMIT
BEGIN (implicit)
INSERT INTO a (data) VALUES (?) RETURNING id, create_date
[...] ('a1',)
...
INSERT INTO b (a_id, data) VALUES (?, ?) RETURNING id
[...] (1, 'b2')
...
COMMIT
BEGIN (implicit)
SELECT a.id, a.data, a.create_date
FROM a ORDER BY a.id
[...] ()
SELECT b.a_id AS b_a_id, b.id AS b_id, b.data AS b_data
FROM b
WHERE b.a_id IN (?, ?, ?)
[...] (1, 2, 3)
<A object at ...> a1
created at: ...
<B object at ...> b1
<B object at ...> b2
<A object at ...> a2
created at: ...
<A object at ...> a3
created at: ...
<B object at ...> b3
<B object at ...> b4
SELECT a.id, a.data, a.create_date
FROM a ORDER BY a.id
LIMIT ? OFFSET ?
[...] (1, 0)
UPDATE a SET data=? WHERE a.id = ?
[...] ('new data', 1)
COMMIT
new data
<B object at ...> b1
<B object at ...> b2
在上面的示例中, AsyncSession
使用可选的 async_sessionmaker
助手实例化,该助手为带有固定参数集的新 AsyncSession
对象提供了一个工厂,这里包括将它与特定数据库 URL 的 AsyncEngine
关联。 然后它被传递给其他方法,这些方法可以在 Python 异步上下文管理器(即 async with:
语句)中使用,以便在块结束时自动关闭;这等效于调用 AsyncSession.close()
方法。
使用 AsyncSession 同时执行任务¶
AsyncSession
对象是一个 **可变的、有状态的对象**,它代表 **正在进行的单个、有状态的数据库事务**。 使用 asyncio 同时执行任务,例如使用 asyncio.gather()
等 API,应该为 **每个单独的任务** 使用 **单独的** AsyncSession
。
请参阅 Session 是线程安全的吗?AsyncSession 在并发任务中共享是否安全?,了解关于 Session
和 AsyncSession
如何在并发工作负载中使用的一般说明。
使用 AsyncSession 时避免隐式 IO¶
使用传统的 asyncio,应用程序需要避免任何可能发生 IO-on-attribute 访问的点。 可以使用以下技术来帮助实现这一点,其中许多技术在前面的示例中进行了说明。
延迟加载关系、延迟列或表达式,或在过期情况下被访问的属性可以利用
AsyncAttrs
混合。 此混合,当添加到特定类或更普遍地添加到声明性Base
超类时,提供一个访问器AsyncAttrs.awaitable_attrs
,它将任何属性作为可等待对象提供from __future__ import annotations from typing import List from sqlalchemy.ext.asyncio import AsyncAttrs from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import Mapped from sqlalchemy.orm import relationship class Base(AsyncAttrs, DeclarativeBase): pass class A(Base): __tablename__ = "a" # ... rest of mapping ... bs: Mapped[List[B]] = relationship() class B(Base): __tablename__ = "b" # ... rest of mapping ...
当未使用预加载时,在
A
的新加载实例上访问A.bs
集合通常会使用 延迟加载,为了成功,通常会向数据库发出 IO,这将在 asyncio 下失败,因为不允许隐式 IO。 为了在 asyncio 下直接访问此属性,而无需任何先前的加载操作,可以通过指示AsyncAttrs.awaitable_attrs
前缀将属性作为可等待对象访问a1 = (await session.scalars(select(A))).one() for b1 in await a1.awaitable_attrs.bs: print(b1)
AsyncAttrs
混合类提供了一个简洁的外部接口,用于访问内部方法,该方法也被AsyncSession.run_sync()
方法使用。新添加于版本 2.0.13。
另请参见
可以使用 SQLAlchemy 2.0 中的 只写关系 功能将集合替换为 **只写集合**,这些集合永远不会隐式发出 IO。使用此功能,集合永远不会读取,只能使用显式 SQL 调用进行查询。有关在 asyncio 中使用只写集合的示例,请参阅 Asyncio 集成 部分中的示例
async_orm_writeonly.py
。当使用只写集合时,程序的行为在集合方面是简单且易于预测的。然而,缺点是,没有内置系统用于一次加载许多这样的集合,这需要手动执行。因此,以下许多要点都针对在 asyncio 中使用传统延迟加载关系时的特定技术,这需要更多小心。
如果不使用
AsyncAttrs
,关系可以使用lazy="raise"
声明,这样默认情况下它们不会尝试发出 SQL。为了加载集合,将使用 预加载。最有用的预加载策略是
selectinload()
预加载器,它在前面的示例中使用,以便在await session.execute()
调用范围内预加载A.bs
集合。stmt = select(A).options(selectinload(A.bs))
在构造新对象时,**集合始终被分配一个默认的空集合**,例如上面示例中的列表。
A(bs=[], data="a2")
这允许在上面
A
对象上的.bs
集合在A
对象被刷新时存在且可读;否则,当A
被刷新时,.bs
将被卸载,并在访问时引发错误。AsyncSession
使用Session.expire_on_commit
设置为 False 进行配置,以便我们可以在调用AsyncSession.commit()
后访问对象上的属性,例如最后一行中我们访问属性的代码。# create AsyncSession with expire_on_commit=False async_session = AsyncSession(engine, expire_on_commit=False) # sessionmaker version async_session = async_sessionmaker(engine, expire_on_commit=False) async with async_session() as session: result = await session.execute(select(A).order_by(A.id)) a1 = result.scalars().first() # commit would normally expire all attributes await session.commit() # access attribute subsequent to commit; this is what # expire_on_commit=False allows print(a1.data)
其他指南包括
应该避免使用
AsyncSession.expire()
,而应该使用AsyncSession.refresh()
;**如果**绝对需要过期。过期通常**不**需要,因为当使用 asyncio 时,Session.expire_on_commit
通常应该设置为False
。可以使用
AsyncSession.refresh()
**显式加载** asyncio 下的延迟加载关系,**如果**将所需的属性名称显式传递给Session.refresh.attribute_names
,例如# assume a_obj is an A that has lazy loaded A.bs collection a_obj = await async_session.get(A, [1]) # force the collection to load by naming it in attribute_names await async_session.refresh(a_obj, ["bs"]) # collection is present print(f"bs collection: {a_obj.bs}")
当然,最好事先使用预加载,以便在无需延迟加载的情况下设置集合。
新添加于版本 2.0.4: 添加了对
AsyncSession.refresh()
和底层Session.refresh()
方法的支持,以强制加载延迟加载的关系(如果它们在Session.refresh.attribute_names
参数中被显式命名)。在以前的版本中,即使在参数中命名,关系也会被静默地跳过。避免使用在 级联 中记录的
all
级联选项,而应该显式列出所需的级联功能。all
级联选项意味着除其他之外还意味着 refresh-expire 设置,这意味着AsyncSession.refresh()
方法将使相关对象上的属性过期,但不一定刷新这些相关对象(假设在relationship()
中未配置预加载),使它们处于过期状态。除了上面提到的
relationship()
结构之外,还应该为deferred()
列(如果使用)采用适当的加载器选项。有关延迟加载列的背景信息,请参阅 使用列延迟加载限制加载的列。
在 动态关系加载器 中描述的“动态”关系加载器策略默认情况下与 asyncio 方法不兼容。它只能在
AsyncSession.run_sync()
方法(在 在 asyncio 下运行同步方法和函数 中描述)中调用,或使用其.statement
属性获取一个普通的 select。user = await session.get(User, 42) addresses = (await session.scalars(user.addresses.statement)).all() stmt = user.addresses.statement.where(Address.email_address.startswith("patrick")) addresses_filter = (await session.scalars(stmt)).all()
在 SQLAlchemy 2.0 版本中引入的 只写 技术与 asyncio 完全兼容,应该优先使用。
另请参见
“动态”关系加载器被“只写”取代 - 有关迁移到 2.0 风格的说明
如果在不支持 RETURNING 的数据库(例如 MySQL 8)中使用 asyncio,除非使用
Mapper.eager_defaults
选项,否则新刷新的对象上将无法使用服务器默认值(例如生成的 timestamps)。在 SQLAlchemy 2.0 中,此行为会自动应用于像 PostgreSQL、SQLite 和 MariaDB 这样的后端,这些后端使用 RETURNING 在插入行时获取新值。
在 asyncio 下运行同步方法和函数¶
深层 Alchemy
这种方法本质上是在公开 SQLAlchemy 能够提供 asyncio 接口的机制。虽然在技术上没有问题,但总体来说,这种方法可能被认为是“有争议的”,因为它违背了 asyncio 编程模型的一些核心理念,即任何可能导致 IO 被调用的编程语句**必须**具有 await
调用,否则程序不会明确地表明 IO 可能会在每一行发生。这种方法并没有改变这个基本思想,只是它允许将一系列同步 IO 指令在函数调用的范围内免除此规则,本质上是将其捆绑成一个可等待的单个单元。
作为在 asyncio 事件循环中集成传统 SQLAlchemy“延迟加载”的另一种方式,提供了一个名为 AsyncSession.run_sync()
的可选方法,该方法将在 greenlet 中运行任何 Python 函数,其中传统的同步编程概念将被转换为在到达数据库驱动程序时使用 await
。一个假设的方法是,面向 asyncio 的应用程序可以将与数据库相关的代码打包到函数中,这些函数使用 AsyncSession.run_sync()
调用。
修改上面的示例,如果我们没有为 A.bs
集合使用 selectinload()
,我们可以在一个单独的函数中完成对这些属性访问的处理。
import asyncio
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
def fetch_and_update_objects(session):
"""run traditional sync-style ORM code in a function that will be
invoked within an awaitable.
"""
# the session object here is a traditional ORM Session.
# all features are available here including legacy Query use.
stmt = select(A)
result = session.execute(stmt)
for a1 in result.scalars():
print(a1)
# lazy loads
for b1 in a1.bs:
print(b1)
# legacy Query use
a1 = session.query(A).order_by(A.id).first()
a1.data = "new data"
async def async_main():
engine = create_async_engine(
"postgresql+asyncpg://scott:tiger@localhost/test",
echo=True,
)
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
await conn.run_sync(Base.metadata.create_all)
async with AsyncSession(engine) as session:
async with session.begin():
session.add_all(
[
A(bs=[B(), B()], data="a1"),
A(bs=[B()], data="a2"),
A(bs=[B(), B()], data="a3"),
]
)
await session.run_sync(fetch_and_update_objects)
await session.commit()
# for AsyncEngine created in function scope, close and
# clean-up pooled connections
await engine.dispose()
asyncio.run(async_main())
在“同步”运行器中运行某些函数的方法与在基于事件的编程库(如 gevent
)之上运行 SQLAlchemy 应用程序的应用程序有一些相似之处。差异如下:
与使用
gevent
不同,我们可以继续使用标准 Python asyncio 事件循环,或任何自定义事件循环,而无需集成到gevent
事件循环中。没有任何“猴子补丁”。上面的示例使用了一个真正的 asyncio 驱动程序,底层的 SQLAlchemy 连接池也使用 Python 内置的
asyncio.Queue
来池化连接。程序可以在异步/等待代码和使用同步代码的包含函数之间自由切换,几乎没有性能损失。没有“线程执行程序”或任何其他等待者或同步在使用中。
底层的网络驱动程序也使用纯 Python asyncio 概念,没有像
gevent
和eventlet
提供的第三方网络库在使用中。
使用 asyncio 扩展的事件¶
SQLAlchemy 事件系统 未被 asyncio 扩展直接公开,这意味着还没有 SQLAlchemy 事件处理程序的“异步”版本。
但是,由于 asyncio 扩展围绕着通常的同步 SQLAlchemy API,因此常规的“同步”样式事件处理程序可以像未使用 asyncio 时一样自由使用。
如下所述,目前有两种针对面向 asyncio 的 API 注册事件的策略。
事件可以在实例级别(例如,特定
AsyncEngine
实例)注册,方法是将事件与引用代理对象的sync
属性相关联。例如,要针对AsyncEngine
实例注册PoolEvents.connect()
事件,请使用其AsyncEngine.sync_engine
属性作为目标。目标包括:要以类级别注册事件,针对相同类型的所有实例(例如,所有
AsyncSession
实例),请使用相应的同步样式类。例如,要针对AsyncSession
类注册SessionEvents.before_commit()
事件,请使用Session
类作为目标。要在
sessionmaker
级别注册,请将显式sessionmaker
与async_sessionmaker
相结合,使用async_sessionmaker.sync_session_class
,并将事件与sessionmaker
关联。
在 asyncio 上下文中运行的事件处理程序中,对象(如 Connection
)将继续以它们通常的“同步”方式工作,无需使用 await
或 async
;当消息最终由 asyncio 数据库适配器接收时,调用方式会透明地转换回 asyncio 调用方式。对于传递 DBAPI 级连接的事件,例如 PoolEvents.connect()
,该对象是一个符合 pep-249 的“连接”对象,它将同步样式调用转换为 asyncio 驱动程序。
使用异步引擎/会话/会话制造商的事件侦听器示例¶
以下是一些与面向异步的 API 结构相关的同步样式事件处理程序的示例。
异步引擎上的核心事件
在此示例中,我们将访问
AsyncEngine.sync_engine
属性作为ConnectionEvents
和PoolEvents
的目标。import asyncio from sqlalchemy import event from sqlalchemy import text from sqlalchemy.engine import Engine from sqlalchemy.ext.asyncio import create_async_engine engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test") # connect event on instance of Engine @event.listens_for(engine.sync_engine, "connect") def my_on_connect(dbapi_con, connection_record): print("New DBAPI connection:", dbapi_con) cursor = dbapi_con.cursor() # sync style API use for adapted DBAPI connection / cursor cursor.execute("select 'execute from event'") print(cursor.fetchone()[0]) # before_execute event on all Engine instances @event.listens_for(Engine, "before_execute") def my_before_execute( conn, clauseelement, multiparams, params, execution_options, ): print("before execute!") async def go(): async with engine.connect() as conn: await conn.execute(text("select 1")) await engine.dispose() asyncio.run(go())
输出
New DBAPI connection: <AdaptedConnection <asyncpg.connection.Connection object at 0x7f33f9b16960>> execute from event before execute!
异步会话上的 ORM 事件
在此示例中,我们将访问
AsyncSession.sync_session
作为SessionEvents
的目标。import asyncio from sqlalchemy import event from sqlalchemy import text from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.orm import Session engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost:5432/test") session = AsyncSession(engine) # before_commit event on instance of Session @event.listens_for(session.sync_session, "before_commit") def my_before_commit(session): print("before commit!") # sync style API use on Session connection = session.connection() # sync style API use on Connection result = connection.execute(text("select 'execute from event'")) print(result.first()) # after_commit event on all Session instances @event.listens_for(Session, "after_commit") def my_after_commit(session): print("after commit!") async def go(): await session.execute(text("select 1")) await session.commit() await session.close() await engine.dispose() asyncio.run(go())
输出
before commit! execute from event after commit!
异步会话制造商上的 ORM 事件
对于此用例,我们将
sessionmaker
作为事件目标,然后使用async_sessionmaker.sync_session_class
参数将其分配给async_sessionmaker
。import asyncio from sqlalchemy import event from sqlalchemy.ext.asyncio import async_sessionmaker from sqlalchemy.orm import sessionmaker sync_maker = sessionmaker() maker = async_sessionmaker(sync_session_class=sync_maker) @event.listens_for(sync_maker, "before_commit") def before_commit(session): print("before commit") async def main(): async_session = maker() await async_session.commit() asyncio.run(main())
输出
before commit
在连接池和其他事件中使用仅可等待的驱动程序方法¶
如上一节所述,围绕 PoolEvents
事件处理程序的事件处理程序接收同步样式的“DBAPI”连接,这是 SQLAlchemy asyncio 方言提供的包装器对象,用于将底层的 asyncio“驱动程序”连接适配为 SQLAlchemy 内部可以使用的一个连接。当用户定义的事件处理程序实现需要直接使用最终的“驱动程序”连接时,就会出现一个特殊的用例,在该驱动程序连接上使用仅可等待的方法。一个这样的例子是 asyncpg 驱动程序提供的 .set_type_codec()
方法。
为了适应这种用例,SQLAlchemy 的 AdaptedConnection
类提供了一个方法 AdaptedConnection.run_async()
,该方法允许在事件处理程序或其他 SQLAlchemy 内部的“同步”上下文中调用可等待的函数。此方法与允许同步样式方法在异步下运行的 AsyncConnection.run_sync()
方法完全类似。
AdaptedConnection.run_async()
应该传递一个函数,该函数接受最内层的“驱动程序”连接作为单个参数,并返回一个可等待对象,该对象将由 AdaptedConnection.run_async()
方法调用。给定函数本身不需要声明为 async
;它可以是 Python lambda:
,因为返回的可等待值将在返回后被调用。
from sqlalchemy import event
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(...)
@event.listens_for(engine.sync_engine, "connect")
def register_custom_types(dbapi_connection, *args):
dbapi_connection.run_async(
lambda connection: connection.set_type_codec(
"MyCustomType",
encoder,
decoder, # ...
)
)
在上面,传递给 register_custom_types
事件处理程序的对象是 AdaptedConnection
的实例,它提供了一个类似 DBAPI 的接口,用于访问底层的仅异步驱动程序级连接对象。 AdaptedConnection.run_async()
方法然后提供对可等待环境的访问,在该环境中,可以对底层的驱动程序级连接进行操作。
版本 1.4.30 中新增。
使用多个 asyncio 事件循环¶
在使用默认池实现时,使用多个事件循环(例如,在将 asyncio 与多线程结合使用的不常见情况下)的应用程序不应将相同的 AsyncEngine
与不同的事件循环共享。
如果将 AsyncEngine
从一个事件循环传递到另一个事件循环,则应在将其重新用于新的事件循环之前调用方法 AsyncEngine.dispose()
。如果未这样做,可能会导致 RuntimeError
,例如 Task <Task pending ...> got Future attached to a different loop
如果必须在不同的循环之间共享相同的引擎,则应将其配置为使用 NullPool
禁用池,以防止引擎多次使用任何连接。
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.pool import NullPool
engine = create_async_engine(
"postgresql+asyncpg://user:pass@host/dbname",
poolclass=NullPool,
)
使用 asyncio 范围会话¶
在使用 scoped_session
对象的线程化 SQLAlchemy 中使用的“范围会话”模式也适用于 asyncio,使用一个称为 async_scoped_session
的调整版本。
提示
SQLAlchemy 通常不建议对新开发使用“范围”模式,因为它依赖于可变的全局状态,并且在线程或任务中的工作完成后,必须显式地拆除这些状态。尤其是在使用 asyncio 时,最好将 AsyncSession
直接传递给需要它的可等待函数。
在使用 async_scoped_session
时,由于 asyncio 上下文中没有“线程本地”的概念,因此必须将“scopefunc”参数提供给构造函数。下面的示例说明了为此目的使用 asyncio.current_task()
函数。
from asyncio import current_task
from sqlalchemy.ext.asyncio import (
async_scoped_session,
async_sessionmaker,
)
async_session_factory = async_sessionmaker(
some_async_engine,
expire_on_commit=False,
)
AsyncScopedSession = async_scoped_session(
async_session_factory,
scopefunc=current_task,
)
some_async_session = AsyncScopedSession()
警告
async_scoped_session
使用的“scopefunc”在任务中被调用**任意多次**,每次底层的 AsyncSession
被访问时都会被调用一次。因此,该函数应该**幂等**并且轻量级,并且不应尝试创建或修改任何状态,例如建立回调等。
警告
在范围内使用 current_task()
作为“键”要求从最外层的可等待函数中调用 async_scoped_session.remove()
方法,以确保在任务完成后从注册表中删除该键,否则任务句柄以及 AsyncSession
将保留在内存中,本质上会造成内存泄漏。请参见以下示例,其中说明了 async_scoped_session.remove()
的正确使用方法。
async_scoped_session
包含与 scoped_session
相似的**代理行为**,这意味着它可以直接作为 AsyncSession
来对待,请记住,通常的 await
关键字是必需的,包括 async_scoped_session.remove()
方法。
async def some_function(some_async_session, some_object):
# use the AsyncSession directly
some_async_session.add(some_object)
# use the AsyncSession via the context-local proxy
await AsyncScopedSession.commit()
# "remove" the current proxied AsyncSession for the local context
await AsyncScopedSession.remove()
版本 1.4.19 中新增。
使用 Inspector 检查模式对象¶
SQLAlchemy 尚未提供 Inspector
的 asyncio 版本(在 使用 Inspector 进行细粒度反射 中介绍),但是可以通过利用 AsyncConnection.run_sync()
方法在 asyncio 上下文中使用现有接口 AsyncConnection
。
import asyncio
from sqlalchemy import inspect
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine("postgresql+asyncpg://scott:tiger@localhost/test")
def use_inspector(conn):
inspector = inspect(conn)
# use the inspector
print(inspector.get_view_names())
# return any value to the caller
return inspector.get_table_names()
async def async_main():
async with engine.connect() as conn:
tables = await conn.run_sync(use_inspector)
asyncio.run(async_main())
引擎 API 文档¶
对象名称 | 描述 |
---|---|
async_engine_from_config(configuration[, prefix], **kwargs) |
使用配置字典创建一个新的 AsyncEngine 实例。 |
create_async_engine(url, **kw) |
创建一个新的异步引擎实例。 |
create_async_pool_from_url(url, **kwargs) |
创建一个新的异步引擎实例。 |
- function sqlalchemy.ext.asyncio.create_async_engine(url: str | URL, **kw: Any) → AsyncEngine¶
创建一个新的异步引擎实例。
传递给
create_async_engine()
的参数与传递给create_engine()
函数的参数基本相同。指定的方言必须是与 asyncio 兼容的方言,例如 asyncpg。版本 1.4 中的新功能。
- 参数:
async_creator¶ –
一个异步可调用函数,它返回一个驱动程序级的 asyncio 连接。如果给定,该函数不应接受任何参数,并应从底层的 asyncio 数据库驱动程序返回一个新的 asyncio 连接;该连接将被包装在适当的结构中以与
AsyncEngine
一起使用。请注意,URL 中指定的参数不会在此处应用,并且创建者函数应使用它自己的连接参数。此参数是
create_engine.creator
的 asyncio 等效参数,它是在create_engine()
函数中使用的。版本 2.0.16 中新增。
- function sqlalchemy.ext.asyncio.async_engine_from_config(configuration: Dict[str, Any], prefix: str = 'sqlalchemy.', **kwargs: Any) → AsyncEngine¶
使用配置字典创建一个新的 AsyncEngine 实例。
此函数类似于 SQLAlchemy Core 中的
engine_from_config()
函数,只是它要求的方言必须是与 asyncio 兼容的方言,比如 asyncpg。此函数的参数签名与engine_from_config()
函数相同。版本 1.4.29 中新增。
- function sqlalchemy.ext.asyncio.create_async_pool_from_url(url: str | URL, **kwargs: Any) → Pool¶
创建一个新的异步引擎实例。
传递给
create_async_pool_from_url()
的参数与传递给create_pool_from_url()
函数的参数基本相同。指定的方言必须是与 asyncio 兼容的方言,比如 asyncpg。版本 2.0.10 中新增。
- class sqlalchemy.ext.asyncio.AsyncEngine¶
-
AsyncEngine
是使用create_async_engine()
函数获取的。from sqlalchemy.ext.asyncio import create_async_engine engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname")
版本 1.4 中的新功能。
成员
begin(), clear_compiled_cache(), connect(), dialect, dispose(), driver, echo, engine, execution_options(), get_execution_options(), name, pool, raw_connection(), sync_engine, update_execution_options(), url
类签名
class
sqlalchemy.ext.asyncio.AsyncEngine
(sqlalchemy.ext.asyncio.base.ProxyComparable
,sqlalchemy.ext.asyncio.AsyncConnectable
)-
method
sqlalchemy.ext.asyncio.AsyncEngine.
begin() → AsyncIterator[AsyncConnection]¶ 返回一个上下文管理器,当进入时将提供一个带有已建立的
AsyncTransaction
的AsyncConnection
。例如:
async with async_engine.begin() as conn: await conn.execute( text("insert into table (x, y, z) values (1, 2, 3)") ) await conn.execute(text("my_special_procedure(5)"))
-
method
sqlalchemy.ext.asyncio.AsyncEngine.
clear_compiled_cache() → None¶ 清除与方言相关的编译缓存。
为了
AsyncEngine
类代理Engine
类。这 **仅** 适用于通过
create_engine.query_cache_size
参数建立的内置缓存。它不会影响通过Connection.execution_options.compiled_cache
参数传递的任何字典缓存。版本 1.4 中的新功能。
-
method
sqlalchemy.ext.asyncio.AsyncEngine.
connect() → AsyncConnection¶ 返回一个
AsyncConnection
对象。当
AsyncConnection
作为异步上下文管理器进入时,它将从底层连接池中获取数据库连接。async with async_engine.connect() as conn: result = await conn.execute(select(user_table))
也可以通过调用
AsyncConnection.start()
方法,在上下文管理器之外启动AsyncConnection
。
-
attribute
sqlalchemy.ext.asyncio.AsyncEngine.
dialect¶ 用于
AsyncEngine
类代表Engine.dialect
属性的代理。
-
method
sqlalchemy.ext.asyncio.AsyncEngine.
async dispose(close: bool = True) → None¶ 释放此
AsyncEngine
使用的连接池。- 参数:
close¶ –
如果保留其默认值
True
,则具有完全关闭所有 **当前已检入** 数据库连接的效果。仍然签出的连接 **不会** 被关闭,但是它们将不再与此Engine
相关联,因此,当它们被单独关闭时,最终它们关联的Pool
将被垃圾回收,并且它们将被完全关闭,如果在检入时尚未关闭。如果设置为
False
,则先前连接池将被取消引用,否则不会以任何方式触碰。
另请参见
-
attribute
sqlalchemy.ext.asyncio.AsyncEngine.
driver¶ 此
Engine
使用的Dialect
的驱动程序名称。为了
AsyncEngine
类代理Engine
类。
-
attribute
sqlalchemy.ext.asyncio.AsyncEngine.
echo¶ 当
True
时,启用此元素的日志输出。为了
AsyncEngine
类代理Engine
类。这将影响为此元素的类的命名空间和对象引用设置 Python 日志级别。布尔值
True
表示将为记录器设置日志级别logging.INFO
,而字符串值debug
将将日志级别设置为logging.DEBUG
。
-
attribute
sqlalchemy.ext.asyncio.AsyncEngine.
engine¶ 返回此
Engine
。为了
AsyncEngine
类代理Engine
类。用于接受
Connection
/Engine
对象的旧方案。
-
method
sqlalchemy.ext.asyncio.AsyncEngine.
execution_options(**opt: Any) → AsyncEngine¶ 返回一个新的
AsyncEngine
,它将提供具有给定执行选项的AsyncConnection
对象。从
Engine.execution_options()
代理。有关详细信息,请参见该方法。
-
method
sqlalchemy.ext.asyncio.AsyncEngine.
get_execution_options() → _ExecuteOptions¶ 获取执行期间生效的非 SQL 选项。
为了
AsyncEngine
类代理Engine
类。
-
attribute
sqlalchemy.ext.asyncio.AsyncEngine.
name¶ 此
Engine
使用的Dialect
的字符串名称。为了
AsyncEngine
类代理Engine
类。
-
attribute
sqlalchemy.ext.asyncio.AsyncEngine.
pool¶ 用于
AsyncEngine
类代表Engine.pool
属性的代理。
-
method
sqlalchemy.ext.asyncio.AsyncEngine.
async raw_connection() → PoolProxiedConnection¶ 从连接池返回“原始”DBAPI 连接。
-
attribute
sqlalchemy.ext.asyncio.AsyncEngine.
sync_engine: Engine¶ 对该
AsyncEngine
代理请求的同步样式Engine
的引用。此实例可用作事件目标。
另请参见
-
method
sqlalchemy.ext.asyncio.AsyncEngine.
update_execution_options(**opt: Any) → None¶ 更新此
Engine
的默认 execution_options 字典。为了
AsyncEngine
类代理Engine
类。**opt 中给定的键/值将添加到将用于所有连接的默认执行选项中。此字典的初始内容可以通过
execution_options
参数发送到create_engine()
。
-
attribute
sqlalchemy.ext.asyncio.AsyncEngine.
url¶ 用于
AsyncEngine
类代表Engine.url
属性的代理。
-
method
- class sqlalchemy.ext.asyncio.AsyncConnection¶
-
AsyncConnection
是使用AsyncEngine.connect()
方法从AsyncEngine
获取的。from sqlalchemy.ext.asyncio import create_async_engine engine = create_async_engine("postgresql+asyncpg://user:pass@host/dbname") async with engine.connect() as conn: result = await conn.execute(select(table))
版本 1.4 中的新功能。
成员
aclose()、begin()、begin_nested()、close()、closed、commit()、connection、default_isolation_level、dialect、exec_driver_sql()、execute()、execution_options()、get_nested_transaction()、get_raw_connection()、get_transaction()、in_nested_transaction()、in_transaction()、info、invalidate()、invalidated、rollback()、run_sync()、scalar()、scalars()、start()、stream()、stream_scalars()、sync_connection、sync_engine
类签名
class
sqlalchemy.ext.asyncio.AsyncConnection
(sqlalchemy.ext.asyncio.base.ProxyComparable
,sqlalchemy.ext.asyncio.base.StartableContext
,sqlalchemy.ext.asyncio.AsyncConnectable
)-
method
sqlalchemy.ext.asyncio.AsyncConnection.
async aclose() → None¶ -
AsyncConnection.aclose()
的名称是为了专门支持 Python 标准库的@contextlib.aclosing
上下文管理器函数。新版本 2.0.20 中添加。
-
method
sqlalchemy.ext.asyncio.AsyncConnection.
begin() → AsyncTransaction¶ 在自动开始之前开始事务。
-
method
sqlalchemy.ext.asyncio.AsyncConnection.
begin_nested() → AsyncTransaction¶ 开始嵌套事务并返回事务句柄。
-
method
sqlalchemy.ext.asyncio.AsyncConnection.
async close() → None¶ 关闭此
AsyncConnection
。这将使事务回滚,前提是事务已处于活动状态。
-
attribute
sqlalchemy.ext.asyncio.AsyncConnection.
closed¶ 如果此连接已关闭,则返回 True。
代表
AsyncConnection
类为Connection
类代理。
-
method
sqlalchemy.ext.asyncio.AsyncConnection.
async commit() → None¶ 提交当前正在进行的事务。
如果已启动事务,此方法会提交当前事务。如果未启动任何事务,则该方法不执行任何操作,前提是连接处于非无效状态。
每当首次执行语句或调用
Connection.begin()
方法时,都会在Connection
上自动开始事务。
-
attribute
sqlalchemy.ext.asyncio.AsyncConnection.
connection¶ 异步未实现;调用
AsyncConnection.get_raw_connection()
。
-
attribute
sqlalchemy.ext.asyncio.AsyncConnection.
default_isolation_level¶ 与使用的
Dialect
关联的初始连接时隔离级别。代表
AsyncConnection
类为Connection
类代理。此值与
Connection.execution_options.isolation_level
和Engine.execution_options.isolation_level
执行选项无关,并且由Dialect
在创建第一个连接时确定,通过在发出任何其他命令之前对数据库执行 SQL 查询以获取当前隔离级别。调用此访问器不会调用任何新的 SQL 查询。
另请参见
Connection.get_isolation_level()
- 查看当前实际隔离级别create_engine.isolation_level
- 设置每个Engine
的隔离级别Connection.execution_options.isolation_level
- 设置每个Connection
的隔离级别
-
attribute
sqlalchemy.ext.asyncio.AsyncConnection.
dialect¶ 代表
AsyncConnection
类为Connection.dialect
属性代理。
-
method
sqlalchemy.ext.asyncio.AsyncConnection.
async exec_driver_sql(statement: str, parameters: _DBAPIAnyExecuteParams | None = None, execution_options: CoreExecuteOptionsParameter | None = None) → CursorResult[Any]¶ 执行驱动级别的 SQL 字符串并返回缓冲的
Result
。
-
method
sqlalchemy.ext.asyncio.AsyncConnection.
async execute(statement: Executable, parameters: _CoreAnyExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) → CursorResult[Any]¶ 执行 SQL 语句构造并返回缓冲的
Result
。- 参数:
object¶ –
要执行的语句。这始终是一个同时处于
ClauseElement
和Executable
层次结构中的对象,包括DDL
和从ExecutableDDLElement
继承的对象
parameters¶ – 将绑定到语句的参数。这可以是参数名称到值的字典,也可以是可变序列(例如列表)字典。当传递字典列表时,底层语句执行将使用 DBAPI
cursor.executemany()
方法。当传递单个字典时,将使用 DBAPIcursor.execute()
方法。execution_options¶ – 可选的执行选项字典,它将与语句执行相关联。此字典可以提供
Connection.execution_options()
接受的选项的子集。
- 返回值:
一个
Result
对象。
-
method
sqlalchemy.ext.asyncio.AsyncConnection.
async execution_options(**opt: Any) → AsyncConnection¶ 为连接设置非 SQL 选项,这些选项在执行期间生效。
这将返回此
AsyncConnection
对象,其中添加了新选项。有关此方法的完整详细信息,请参阅
Connection.execution_options()
。
-
method
sqlalchemy.ext.asyncio.AsyncConnection.
get_nested_transaction() → AsyncTransaction | None¶ 返回一个
AsyncTransaction
,表示当前嵌套(保存点)事务(如果有)。这使用底层同步连接的
Connection.get_nested_transaction()
方法获取当前Transaction
,然后在新的AsyncTransaction
对象中进行代理。版本 1.4.0b2 中的新增功能。
-
method
sqlalchemy.ext.asyncio.AsyncConnection.
async get_raw_connection() → PoolProxiedConnection¶ 返回此
AsyncConnection
使用的池化的 DBAPI 级连接。这是一个 SQLAlchemy 连接池代理的连接,然后具有属性
_ConnectionFairy.driver_connection
,该属性引用实际的驱动程序连接。它的_ConnectionFairy.dbapi_connection
反而引用一个AdaptedConnection
实例,该实例将驱动程序连接适配到 DBAPI 协议。
-
method
sqlalchemy.ext.asyncio.AsyncConnection.
get_transaction() → AsyncTransaction | None¶ 返回一个
AsyncTransaction
对象,代表当前的事务(如果有)。此方法使用底层同步连接的
Connection.get_transaction()
方法获取当前的Transaction
对象,然后将其代理到新的AsyncTransaction
对象中。版本 1.4.0b2 中的新增功能。
-
method
sqlalchemy.ext.asyncio.AsyncConnection.
in_nested_transaction() → bool¶ 如果事务正在进行,则返回 True。
版本 1.4.0b2 中的新增功能。
-
method
sqlalchemy.ext.asyncio.AsyncConnection.
in_transaction() → bool¶ 如果事务正在进行,则返回 True。
-
attribute
sqlalchemy.ext.asyncio.AsyncConnection.
info¶ 返回底层
Connection
对象的Connection.info
字典。此字典是自由可写的,用户可以将自定义状态与数据库连接相关联。
此属性仅在
AsyncConnection
对象处于连接状态时可用。如果AsyncConnection.closed
属性为True
,则访问此属性将引发ResourceClosedError
错误。版本 1.4.0b2 中的新增功能。
-
method
sqlalchemy.ext.asyncio.AsyncConnection.
async invalidate(exception: BaseException | None = None) → None¶ 使与该
Connection
对象关联的底层 DBAPI 连接失效。有关此方法的详细信息,请参阅
Connection.invalidate()
方法。
-
attribute
sqlalchemy.ext.asyncio.AsyncConnection.
invalidated¶ 如果该连接已失效,则返回 True。
代表
AsyncConnection
类为Connection
类代理。这并不表示该连接是否在池级别失效,但是
-
method
sqlalchemy.ext.asyncio.AsyncConnection.
async rollback() → None¶ 回滚当前正在进行的事务。
此方法回滚当前的事务(如果已经启动)。如果没有启动事务,则该方法没有效果。如果已经启动事务并且连接处于失效状态,则使用此方法清除事务。
每当首次执行语句或调用
Connection.begin()
方法时,都会在Connection
上自动开始事务。
-
method
sqlalchemy.ext.asyncio.AsyncConnection.
async run_sync(fn: ~typing.Callable[[~typing.Concatenate[~sqlalchemy.engine.base.Connection, ~_P]], ~sqlalchemy.ext.asyncio.engine._T], *arg: ~typing.~_P, **kw: ~typing.~_P) → _T¶ 调用给定的同步(即非异步)可调用对象,并将同步风格的
Connection
对象作为第一个参数传递。此方法允许传统的同步 SQLAlchemy 函数在 asyncio 应用程序的上下文中运行。
例如:
def do_something_with_core(conn: Connection, arg1: int, arg2: str) -> str: '''A synchronous function that does not require awaiting :param conn: a Core SQLAlchemy Connection, used synchronously :return: an optional return value is supported ''' conn.execute( some_table.insert().values(int_col=arg1, str_col=arg2) ) return "success" async def do_something_async(async_engine: AsyncEngine) -> None: '''an async function that uses awaiting''' async with async_engine.begin() as async_conn: # run do_something_with_core() with a sync-style # Connection, proxied into an awaitable return_code = await async_conn.run_sync(do_something_with_core, 5, "strval") print(return_code)
此方法通过在专门的 instrument greenlet 中运行给定的可调用对象,将 asyncio 事件循环一直维护到数据库连接。
AsyncConnection.run_sync()
的最基本用法是调用像MetaData.create_all()
这样的方法,给定一个AsyncConnection
对象,该对象需要作为Connection
对象提供给MetaData.create_all()
方法。# run metadata.create_all(conn) with a sync-style Connection, # proxied into an awaitable with async_engine.begin() as conn: await conn.run_sync(metadata.create_all)
注意
提供的可调用对象在 asyncio 事件循环中内联调用,并且将在传统的 IO 调用中阻塞。 此可调用对象中的 IO 应该只调用 SQLAlchemy 的 asyncio 数据库 API,这些 API 将被正确地适配到 greenlet 上下文中。
-
method
sqlalchemy.ext.asyncio.AsyncConnection.
async scalar(statement: Executable, parameters: _CoreSingleExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) → Any¶ 执行一个 SQL 语句构造,并返回一个标量对象。
此方法是调用
Result.scalar()
方法后的简写形式,并在调用Connection.execute()
方法之前。参数相同。- 返回值:
表示返回的第一行第一列的标量 Python 值。
-
method
sqlalchemy.ext.asyncio.AsyncConnection.
async scalars(statement: Executable, parameters: _CoreAnyExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) → ScalarResult[Any]¶ 执行 SQL 语句构造并返回标量对象。
此方法是调用
Result.scalars()
方法后的简写形式,并在调用Connection.execute()
方法之前。参数相同。- 返回值:
一个
ScalarResult
对象。
版本 1.4.24 中新增。
-
method
sqlalchemy.ext.asyncio.AsyncConnection.
async start(is_ctxmanager: bool = False) → AsyncConnection¶ 在使用 Python
with:
块之外,启动此AsyncConnection
对象的上下文。
-
method
sqlalchemy.ext.asyncio.AsyncConnection.
stream(statement: Executable, parameters: _CoreAnyExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) → AsyncIterator[AsyncResult[Any]]¶ 执行语句并返回一个可等待的,它会生成一个
AsyncResult
对象。例如:
result = await conn.stream(stmt): async for row in result: print(f"{row}")
The
AsyncConnection.stream()
方法支持针对AsyncResult
对象使用可选的上下文管理器,如async with conn.stream(stmt) as result: async for row in result: print(f"{row}")
在上述模式中,即使迭代器因异常抛出而中断,
AsyncResult.close()
方法也会无条件地调用。但是,上下文管理器使用仍然是可选的,该函数可以在async with fn():
或await fn()
样式中调用。版本 2.0.0b3 中新增: 添加了上下文管理器支持
- 返回值:
一个可等待的对象,它将生成一个
AsyncResult
对象。
-
method
sqlalchemy.ext.asyncio.AsyncConnection.
stream_scalars(statement: Executable, parameters: _CoreSingleExecuteParams | None = None, *, execution_options: CoreExecuteOptionsParameter | None = None) → AsyncIterator[AsyncScalarResult[Any]]¶ 执行语句并返回一个可等待的,它会生成一个
AsyncScalarResult
对象。例如:
result = await conn.stream_scalars(stmt) async for scalar in result: print(f"{scalar}")
此方法是调用
AsyncResult.scalars()
方法后的简写形式,并在调用Connection.stream()
方法之前。参数相同。The
AsyncConnection.stream_scalars()
方法支持针对AsyncScalarResult
对象使用可选的上下文管理器,如async with conn.stream_scalars(stmt) as result: async for scalar in result: print(f"{scalar}")
在上述模式中,即使迭代器因异常抛出而中断,
AsyncScalarResult.close()
方法也会无条件地调用。但是,上下文管理器使用仍然是可选的,该函数可以在async with fn():
或await fn()
样式中调用。版本 2.0.0b3 中新增: 添加了上下文管理器支持
- 返回值:
一个可等待的对象,它将生成一个
AsyncScalarResult
对象。
版本 1.4.24 中新增。
-
attribute
sqlalchemy.ext.asyncio.AsyncConnection.
sync_connection: Connection | None¶ 此
AsyncConnection
代理请求所指向的同步风格Connection
的引用。此实例可用作事件目标。
另请参见
-
attribute
sqlalchemy.ext.asyncio.AsyncConnection.
sync_engine: Engine¶ 此
AsyncConnection
通过其底层Connection
与之关联的同步风格Engine
的引用。此实例可用作事件目标。
另请参见
-
method
- class sqlalchemy.ext.asyncio.AsyncTransaction¶
-
成员
类签名
class
sqlalchemy.ext.asyncio.AsyncTransaction
(sqlalchemy.ext.asyncio.base.ProxyComparable
,sqlalchemy.ext.asyncio.base.StartableContext
)-
method
sqlalchemy.ext.asyncio.AsyncTransaction.
async close() → None¶ 关闭此
AsyncTransaction
。如果此事务是 begin/commit 嵌套中的基本事务,则事务将回滚()。否则,该方法将返回。
此用于取消事务,而不会影响封闭事务的范围。
-
method
sqlalchemy.ext.asyncio.AsyncTransaction.
async commit() → None¶ 提交此
AsyncTransaction
。
-
method
sqlalchemy.ext.asyncio.AsyncTransaction.
async rollback() → None¶ 回滚此
AsyncTransaction
。
-
method
sqlalchemy.ext.asyncio.AsyncTransaction.
async start(is_ctxmanager: bool = False) → AsyncTransaction¶ 在不使用 Python
with:
块的情况下启动此AsyncTransaction
对象的上下文。
-
method
结果集 API 文档¶
该 AsyncResult
对象是 Result
对象的异步自适应版本。仅在使用 AsyncConnection.stream()
或 AsyncSession.stream()
方法时返回,这些方法返回一个位于活动数据库游标之上的结果对象。
对象名称 | 描述 |
---|---|
用于 |
|
|
|
用于 |
|
一个 |
- class sqlalchemy.ext.asyncio.AsyncResult¶
Result
对象的 asyncio 包装器。该
AsyncResult
仅适用于使用服务器端游标的语句执行。它仅从AsyncConnection.stream()
和AsyncSession.stream()
方法返回。注意
与
Result
一样,此对象用于AsyncSession.execute()
返回的 ORM 结果,它可以单独或在元组状行中产生 ORM 映射对象的实例。请注意,这些结果对象不会像传统Query
对象那样自动对实例或行进行重复数据删除。对于实例或行的 Python 内重复数据删除,请使用AsyncResult.unique()
修饰符方法。版本 1.4 中的新功能。
成员
all(), close(), closed, columns(), fetchall(), fetchmany(), fetchone(), first(), freeze(), keys(), mappings(), one(), one_or_none(), partitions(), scalar(), scalar_one(), scalar_one_or_none(), scalars(), t, tuples(), unique(), yield_per()
类签名
class
sqlalchemy.ext.asyncio.AsyncResult
(sqlalchemy.engine._WithKeys
,sqlalchemy.ext.asyncio.AsyncCommon
)-
method
sqlalchemy.ext.asyncio.AsyncResult.
async all() → Sequence[Row[_TP]]¶ 返回所有行,以列表形式。
调用后关闭结果集。后续调用将返回空列表。
- 返回值:
一个
Row
对象列表。
-
method
sqlalchemy.ext.asyncio.AsyncResult.
async close() → None¶ 继承自
AsyncCommon.close()
方法AsyncCommon
关闭此结果。
-
attribute
sqlalchemy.ext.asyncio.AsyncResult.
closed¶ 继承自
AsyncCommon.closed
属性AsyncCommon
代理底层结果对象的 .closed 属性,如果存在,否则引发
AttributeError
。新版 2.0.0b3。
-
method
sqlalchemy.ext.asyncio.AsyncResult.
columns(*col_expressions: _KeyIndexType) → Self¶ 建立应在每行中返回的列。
请参考同步 SQLAlchemy API 中的
Result.columns()
以获取完整的行为描述。
-
method
sqlalchemy.ext.asyncio.AsyncResult.
async fetchall() → Sequence[Row[_TP]]¶ 是
AsyncResult.all()
方法的同义词。新版 2.0。
-
method
sqlalchemy.ext.asyncio.AsyncResult.
async fetchmany(size: int | None = None) → Sequence[Row[_TP]]¶ 获取多行。
当所有行都已耗尽时,返回空列表。
此方法为了向后兼容 SQLAlchemy 1.x.x 版本提供。
要按组获取行,请使用
AsyncResult.partitions()
方法。- 返回值:
一个
Row
对象列表。
-
method
sqlalchemy.ext.asyncio.AsyncResult.
async fetchone() → Row[_TP] | None¶ 获取一行。
当所有行都已耗尽时,返回 None。
此方法为了向后兼容 SQLAlchemy 1.x.x 版本提供。
要仅获取结果的第一行,请使用
AsyncResult.first()
方法。要遍历所有行,请直接迭代AsyncResult
对象。- 返回值:
一个
Row
对象(如果未应用任何过滤器),或者None
(如果无剩余行)。
-
method
sqlalchemy.ext.asyncio.AsyncResult.
async first() → Row[_TP] | None¶ 获取第一行,或者如果不存在行,则返回
None
。关闭结果集并丢弃剩余行。
注意
此方法默认返回一行,例如元组。要返回一个单一的标量值,即第一行的第一列,请使用
AsyncResult.scalar()
方法,或者将AsyncResult.scalars()
和AsyncResult.first()
结合使用。此外,与传统 ORM
Query.first()
方法的行为不同,不会对用于生成此AsyncResult
的 SQL 查询应用限制;对于在生成行之前将结果缓冲在内存中的 DBAPI 驱动程序,所有行都将发送到 Python 进程,除了第一行以外的所有行都将被丢弃。另请参见
- 返回值:
一个
Row
对象,或者如果无剩余行,则为 None。
-
method
sqlalchemy.ext.asyncio.AsyncResult.
async freeze() → FrozenResult[_TP]¶ 返回一个可调用对象,该对象在被调用时会生成此
AsyncResult
的副本。返回的可调用对象是
FrozenResult
的实例。用于结果集缓存。当结果未被使用时,必须在结果上调用此方法,调用此方法将完全使用结果。当从缓存中检索到
FrozenResult
时,可以对其进行多次调用,每次调用都会产生一个新的Result
对象,该对象针对其存储的行集。另请参见
重新执行语句 - 在 ORM 中实现结果集缓存的示例用法。
-
method
sqlalchemy.ext.asyncio.AsyncResult.
keys() → RMKeyView¶ 继承自
sqlalchemy.engine._WithKeys.keys
方法的sqlalchemy.engine._WithKeys
返回一个可迭代视图,该视图生成由每个
Row
表示的字符串键。这些键可以表示核心语句返回的列的标签,或者表示 ORM 执行返回的 ORM 类的名称。
该视图还可以使用 Python
in
运算符测试键包含,这将同时测试视图中表示的字符串键,以及列对象等备用键。1.4 版中的变化: 返回一个键视图对象,而不是一个普通的列表。
-
method
sqlalchemy.ext.asyncio.AsyncResult.
mappings() → AsyncMappingResult¶ 将映射过滤器应用于返回的行,返回
AsyncMappingResult
的实例。应用此过滤器后,获取的行将返回
RowMapping
对象,而不是Row
对象。- 返回值:
一个新的
AsyncMappingResult
过滤对象,引用底层的Result
对象。
-
method
sqlalchemy.ext.asyncio.AsyncResult.
async one() → Row[_TP]¶ 返回正好一行,否则引发异常。
如果结果没有返回行,则引发
NoResultFound
,如果返回多行,则引发MultipleResultsFound
。注意
此方法默认返回一个行,例如元组。要返回正好一个标量值,即第一行的第一列,请使用
AsyncResult.scalar_one()
方法,或者结合AsyncResult.scalars()
和AsyncResult.one()
。版本 1.4 中的新功能。
- 返回值:
第一个
Row
。- 引发:
-
method
sqlalchemy.ext.asyncio.AsyncResult.
async one_or_none() → Row[_TP] | None¶ 返回最多一个结果,否则引发异常。
如果结果没有行,则返回
None
。如果返回多行,则引发MultipleResultsFound
。版本 1.4 中的新功能。
- 返回值:
第一个
Row
或None
(如果没有行可用)。- 引发:
-
method
sqlalchemy.ext.asyncio.AsyncResult.
async partitions(size: int | None = None) → AsyncIterator[Sequence[Row[_TP]]]¶ 遍历给定大小的行子列表。
返回异步迭代器
async def scroll_results(connection): result = await connection.stream(select(users_table)) async for partition in result.partitions(100): print("list of rows: %s" % partition)
有关完整行为描述,请参考同步 SQLAlchemy API 中的
Result.partitions()
。
-
method
sqlalchemy.ext.asyncio.AsyncResult.
async scalar() → Any¶ 获取第一行的第一列,并关闭结果集。
如果要获取的行不存在,则返回
None
。不执行任何验证以测试是否还有其他行。
调用此方法后,对象将完全关闭,例如,
CursorResult.close()
方法将被调用。- 返回值:
一个 Python 标量值,或
None
(如果没有剩余的行)。
-
method
sqlalchemy.ext.asyncio.AsyncResult.
async scalar_one() → Any¶ 返回正好一个标量结果,否则引发异常。
这等效于调用
AsyncResult.scalars()
然后调用AsyncScalarResult.one()
。
-
method
sqlalchemy.ext.asyncio.AsyncResult.
async scalar_one_or_none() → Any | None¶ 返回正好一个标量结果,或
None
。这等同于调用
AsyncResult.scalars()
,然后调用AsyncScalarResult.one_or_none()
。
-
方法
sqlalchemy.ext.asyncio.AsyncResult.
scalars(index: _KeyIndexType = 0) → AsyncScalarResult[Any]¶ 返回一个
AsyncScalarResult
过滤对象,它将返回单个元素而不是Row
对象。有关完整行为描述,请参阅同步 SQLAlchemy API 中的
Result.scalars()
。- 参数:
index¶ – 表示要从每行中获取的列的整数或行键,默认为
0
,表示第一列。- 返回值:
一个新的
AsyncScalarResult
过滤对象,它引用此AsyncResult
对象。
-
属性
sqlalchemy.ext.asyncio.AsyncResult.
t¶ 对返回的行应用“类型化元组”类型过滤器。
AsyncResult.t
属性是调用AsyncResult.tuples()
方法的同义词。新版 2.0。
-
方法
sqlalchemy.ext.asyncio.AsyncResult.
tuples() → AsyncTupleResult[_TP]¶ 对返回的行应用“类型化元组”类型过滤器。
在运行时,此方法返回与
AsyncResult
对象相同的对象,但将其注释为返回一个AsyncTupleResult
对象,该对象将向 PEP 484 类型工具表明返回的是普通类型化的Tuple
实例,而不是行。这允许类型化元组解包和Row
对象的__getitem__
访问,适用于调用语句本身包含类型信息的情况。新版 2.0。
- 返回值:
类型时
AsyncTupleResult
类型。
-
方法
sqlalchemy.ext.asyncio.AsyncResult.
unique(strategy: _UniqueFilterType | None = None) → Self¶ 对此
AsyncResult
返回的对象应用唯一过滤。有关完整行为描述,请参阅同步 SQLAlchemy API 中的
Result.unique()
。
-
方法
sqlalchemy.ext.asyncio.AsyncResult.
yield_per(num: int) → Self¶ -
将行获取策略配置为一次获取
num
行。FilterResult.yield_per()
方法是Result.yield_per()
方法的直接调用。有关用法说明,请参阅该方法的文档。版本 1.4.40 新版: - 添加了
FilterResult.yield_per()
,以便该方法在所有结果集实现中都可用
-
method
- 类 sqlalchemy.ext.asyncio.AsyncScalarResult¶
用于
AsyncResult
的包装器,它返回标量值而不是Row
值。AsyncScalarResult
对象是通过调用AsyncResult.scalars()
方法获取的。有关完整行为描述,请参阅同步 SQLAlchemy API 中的
ScalarResult
对象。版本 1.4 中的新功能。
成员
all(), close(), closed, fetchall(), fetchmany(), first(), one(), one_or_none(), partitions(), unique(), yield_per()
类签名
类
sqlalchemy.ext.asyncio.AsyncScalarResult
(sqlalchemy.ext.asyncio.AsyncCommon
)-
方法
sqlalchemy.ext.asyncio.AsyncScalarResult.
async all() → Sequence[_R]¶ 将所有标量值作为列表返回。
等同于
AsyncResult.all()
,不同之处在于返回的是标量值,而不是Row
对象。
-
method
sqlalchemy.ext.asyncio.AsyncScalarResult.
async close() → None¶ 继承自
AsyncCommon.close()
方法AsyncCommon
关闭此结果。
-
attribute
sqlalchemy.ext.asyncio.AsyncScalarResult.
closed¶ 继承自
AsyncCommon.closed
属性AsyncCommon
代理底层结果对象的 .closed 属性,如果存在,否则引发
AttributeError
。新版 2.0.0b3。
-
method
sqlalchemy.ext.asyncio.AsyncScalarResult.
async fetchall() → Sequence[_R]¶ 它是
AsyncScalarResult.all()
方法的同义词。
-
method
sqlalchemy.ext.asyncio.AsyncScalarResult.
async fetchmany(size: int | None = None) → Sequence[_R]¶ 获取多个对象。
等同于
AsyncResult.fetchmany()
,但返回标量值,而不是Row
对象。
-
method
sqlalchemy.ext.asyncio.AsyncScalarResult.
async first() → _R | None¶ 获取第一个对象,如果不存在任何对象,则返回
None
。等同于
AsyncResult.first()
,但返回标量值,而不是Row
对象。
-
method
sqlalchemy.ext.asyncio.AsyncScalarResult.
async one() → _R¶ 返回唯一的一个对象,否则抛出异常。
等同于
AsyncResult.one()
,但返回标量值,而不是Row
对象。
-
method
sqlalchemy.ext.asyncio.AsyncScalarResult.
async one_or_none() → _R | None¶ 返回最多一个对象,否则抛出异常。
等同于
AsyncResult.one_or_none()
,但返回标量值,而不是Row
对象。
-
method
sqlalchemy.ext.asyncio.AsyncScalarResult.
async partitions(size: int | None = None) → AsyncIterator[Sequence[_R]]¶ 迭代给定大小的元素子列表。
等同于
AsyncResult.partitions()
,但返回标量值,而不是Row
对象。
-
method
sqlalchemy.ext.asyncio.AsyncScalarResult.
unique(strategy: _UniqueFilterType | None = None) → Self¶ 对由此
AsyncScalarResult
返回的对象应用唯一筛选。有关使用方法的详细信息,请参阅
AsyncResult.unique()
。
-
method
sqlalchemy.ext.asyncio.AsyncScalarResult.
yield_per(num: int) → Self¶ -
将行获取策略配置为一次获取
num
行。FilterResult.yield_per()
方法是Result.yield_per()
方法的直接调用。有关用法说明,请参阅该方法的文档。版本 1.4.40 新版: - 添加了
FilterResult.yield_per()
,以便该方法在所有结果集实现中都可用
-
方法
- class sqlalchemy.ext.asyncio.AsyncMappingResult¶
用于
AsyncResult
的包装器,它返回字典值而不是Row
值。调用
AsyncResult.mappings()
方法会获取AsyncMappingResult
对象。有关完整行为描述,请参阅同步 SQLAlchemy API 中的
MappingResult
对象。版本 1.4 中的新功能。
成员
all(),close(),closed,columns(),fetchall(),fetchmany(),fetchone(),first(),keys(),one(),one_or_none(),partitions(),unique(),yield_per()
类签名
class
sqlalchemy.ext.asyncio.AsyncMappingResult
(sqlalchemy.engine._WithKeys
,sqlalchemy.ext.asyncio.AsyncCommon
)-
method
sqlalchemy.ext.asyncio.AsyncMappingResult.
async all() → Sequence[RowMapping]¶ 返回所有行,以列表形式。
等同于
AsyncResult.all()
,除了返回的是RowMapping
值,而不是Row
对象。
-
method
sqlalchemy.ext.asyncio.AsyncMappingResult.
async close() → None¶ 继承自
AsyncCommon.close()
方法AsyncCommon
关闭此结果。
-
attribute
sqlalchemy.ext.asyncio.AsyncMappingResult.
closed¶ 继承自
AsyncCommon.closed
属性AsyncCommon
代理底层结果对象的 .closed 属性,如果存在,否则引发
AttributeError
。新版 2.0.0b3。
-
method
sqlalchemy.ext.asyncio.AsyncMappingResult.
columns(*col_expressions: _KeyIndexType) → Self¶ 建立应在每行中返回的列。
-
method
sqlalchemy.ext.asyncio.AsyncMappingResult.
async fetchall() → Sequence[RowMapping]¶ 是
AsyncMappingResult.all()
方法的同义词。
-
method
sqlalchemy.ext.asyncio.AsyncMappingResult.
async fetchmany(size: int | None = None) → Sequence[RowMapping]¶ 获取多行。
等同于
AsyncResult.fetchmany()
,除了返回的是RowMapping
值,而不是Row
对象。
-
method
sqlalchemy.ext.asyncio.AsyncMappingResult.
async fetchone() → RowMapping | None¶ 获取一个对象。
等同于
AsyncResult.fetchone()
,除了返回的是RowMapping
值,而不是Row
对象。
-
method
sqlalchemy.ext.asyncio.AsyncMappingResult.
async first() → RowMapping | None¶ 获取第一个对象,如果不存在任何对象,则返回
None
。等同于
AsyncResult.first()
,除了返回的是RowMapping
值,而不是Row
对象。
-
method
sqlalchemy.ext.asyncio.AsyncMappingResult.
keys() → RMKeyView¶ 继承自
sqlalchemy.engine._WithKeys.keys
方法的sqlalchemy.engine._WithKeys
返回一个可迭代视图,该视图生成由每个
Row
表示的字符串键。这些键可以表示核心语句返回的列的标签,或者表示 ORM 执行返回的 ORM 类的名称。
该视图还可以使用 Python
in
运算符测试键包含,这将同时测试视图中表示的字符串键,以及列对象等备用键。1.4 版中的变化: 返回一个键视图对象,而不是一个普通的列表。
-
method
sqlalchemy.ext.asyncio.AsyncMappingResult.
async one() → RowMapping¶ 返回唯一的一个对象,否则抛出异常。
等同于
AsyncResult.one()
,除了返回的是RowMapping
值,而不是Row
对象。
-
method
sqlalchemy.ext.asyncio.AsyncMappingResult.
async one_or_none() → RowMapping | None¶ 返回最多一个对象,否则抛出异常。
等效于
AsyncResult.one_or_none()
,除了返回的是RowMapping
值而不是Row
对象。
-
method
sqlalchemy.ext.asyncio.AsyncMappingResult.
async partitions(size: int | None = None) → AsyncIterator[Sequence[RowMapping]]¶ 迭代给定大小的元素子列表。
等效于
AsyncResult.partitions()
,除了返回的是RowMapping
值而不是Row
对象。
-
method
sqlalchemy.ext.asyncio.AsyncMappingResult.
unique(strategy: _UniqueFilterType | None = None) → Self¶ 对这个
AsyncMappingResult
返回的对象应用唯一过滤。有关使用方法的详细信息,请参阅
AsyncResult.unique()
。
-
method
sqlalchemy.ext.asyncio.AsyncMappingResult.
yield_per(num: int) → Self¶ -
将行获取策略配置为一次获取
num
行。FilterResult.yield_per()
方法是Result.yield_per()
方法的直接调用。有关用法说明,请参阅该方法的文档。版本 1.4.40 新版: - 添加了
FilterResult.yield_per()
,以便该方法在所有结果集实现中都可用
-
method
- class sqlalchemy.ext.asyncio.AsyncTupleResult¶
一个
AsyncResult
,其类型为返回普通 Python 元组而不是行。由于
Row
本身已经像元组一样,所以这个类只是一个类型类,运行时仍然使用普通的AsyncResult
。类签名
class
sqlalchemy.ext.asyncio.AsyncTupleResult
(sqlalchemy.ext.asyncio.AsyncCommon
,sqlalchemy.util.langhelpers.TypingOnly
)
ORM 会话 API 文档¶
对象名称 | 描述 |
---|---|
async_object_session(instance) |
返回给定实例所属的 |
提供 |
|
async_session(session) |
返回代理给定 |
一个可配置的 |
|
混合类,为所有属性提供一个可等待的访问器。 |
|
|
|
ORM |
|
关闭所有 |
- function sqlalchemy.ext.asyncio.async_object_session(instance: object) → AsyncSession | None¶
返回给定实例所属的
AsyncSession
。此函数使用同步 API 函数
object_session
来检索引用给定实例的Session
,并从中将其链接到原始的AsyncSession
。如果
AsyncSession
被垃圾回收,则返回值为None
。此功能也适用于
InstanceState.async_session
访问器。- 参数:
instance¶ – 一个 ORM 映射实例
- 返回值:
一个
AsyncSession
对象,或者None
。
版本 1.4.18 中的新增内容。
- function sqlalchemy.ext.asyncio.async_session(session: Session) → AsyncSession | None¶
返回代理给定
Session
对象的AsyncSession
,如果有的话。- 参数:
- 返回值:
一个
AsyncSession
实例,或者None
。
版本 1.4.18 中的新增内容。
- function async sqlalchemy.ext.asyncio.close_all_sessions() → None¶
关闭所有
AsyncSession
会话。版本 2.0.23 中新增。
另请参见
close_all_sessions()
- class sqlalchemy.ext.asyncio.async_sessionmaker¶
一个可配置的
AsyncSession
工厂。async_sessionmaker
工厂以与sessionmaker
工厂相同的方式工作,在调用时生成新的AsyncSession
对象,根据此处建立的配置参数创建它们。例如:
from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import async_sessionmaker async def run_some_sql(async_session: async_sessionmaker[AsyncSession]) -> None: async with async_session() as session: session.add(SomeObject(data="object")) session.add(SomeOtherObject(name="other object")) await session.commit() async def main() -> None: # an AsyncEngine, which the AsyncSession will use for connection # resources engine = create_async_engine('postgresql+asyncpg://scott:tiger@localhost/') # create a reusable factory for new AsyncSession instances async_session = async_sessionmaker(engine) await run_some_sql(async_session) await engine.dispose()
async_sessionmaker
非常有用,因此程序的不同部分可以使用预先建立的固定配置创建新的AsyncSession
对象。请注意,如果不使用async_sessionmaker
,也可以直接实例化AsyncSession
对象。版本 2.0 中新增:
async_sessionmaker
提供了一个sessionmaker
类,专门用于AsyncSession
对象,包括 pep-484 类型支持。另请参见
概要 - ORM - 展示了示例用法
sessionmaker
-sessionmaker
架构的总体概述sessionmaker
架构
打开和关闭会话 - 使用
sessionmaker
创建会话的介绍性文本。类签名
class
sqlalchemy.ext.asyncio.async_sessionmaker
(typing.Generic
)-
method
sqlalchemy.ext.asyncio.async_sessionmaker.
__call__(**local_kw: Any) → _AS¶ 使用此
async_sessionmaker
中建立的配置生成一个新的AsyncSession
对象。在 Python 中,当一个对象以与函数相同的方式被“调用”时,会调用它的
__call__
方法。AsyncSession = async_sessionmaker(async_engine, expire_on_commit=False) session = AsyncSession() # invokes sessionmaker.__call__()
-
method
sqlalchemy.ext.asyncio.async_sessionmaker.
__init__(bind: Optional[_AsyncSessionBind] = None, *, class_: Type[_AS] = <class 'sqlalchemy.ext.asyncio.session.AsyncSession'>, autoflush: bool = True, expire_on_commit: bool = True, info: Optional[_InfoType] = None, **kw: Any)¶ 构造一个新的
async_sessionmaker
。除了
class_
之外,这里的所有参数都对应于Session
直接接受的参数。有关参数的更多详细信息,请参阅AsyncSession.__init__()
文档字符串。
-
method
sqlalchemy.ext.asyncio.async_sessionmaker.
begin() → _AsyncSessionContextManager[_AS]¶ 生成一个上下文管理器,它既提供一个新的
AsyncSession
,也提供一个提交事务。例如:
async def main(): Session = async_sessionmaker(some_engine) async with Session.begin() as session: session.add(some_object) # commits transaction, closes session
-
method
sqlalchemy.ext.asyncio.async_sessionmaker.
configure(**new_kw: Any) → None¶ (重新)配置此 async_sessionmaker 的参数。
例如:
AsyncSession = async_sessionmaker(some_engine) AsyncSession.configure(bind=create_async_engine('sqlite+aiosqlite://'))
- class sqlalchemy.ext.asyncio.async_scoped_session¶
提供
AsyncSession
对象的范围管理。有关用法详细信息,请参阅 使用 asyncio 范围会话 部分。
版本 1.4.19 中新增。
成员
__call__(), __init__(), aclose(), add(), add_all(), autoflush, begin(), begin_nested(), bind, close(), close_all(), commit(), configure(), connection(), delete(), deleted, dirty, execute(), expire(), expire_all(), expunge(), expunge_all(), flush(), get(), get_bind(), get_one(), identity_key(), identity_map, info, invalidate(), is_active, is_modified(), merge(), new, no_autoflush, object_session(), refresh(), remove(), reset(), rollback(), scalar(), scalars(), session_factory, stream(), stream_scalars()
类签名
class
sqlalchemy.ext.asyncio.async_scoped_session
(typing.Generic
)-
method
sqlalchemy.ext.asyncio.async_scoped_session.
__call__(**kw: Any) → _AS¶ 返回当前的
AsyncSession
,如果不存在则使用scoped_session.session_factory
创建。- 参数:
**kw¶ – 关键字参数将传递给
scoped_session.session_factory
可调用对象,如果不存在现有的AsyncSession
。如果AsyncSession
存在并且传递了关键字参数,则会引发InvalidRequestError
。
-
method
sqlalchemy.ext.asyncio.async_scoped_session.
__init__(session_factory: async_sessionmaker[_AS], scopefunc: Callable[[], Any])¶ 构造一个新的
async_scoped_session
。- 参数:
session_factory¶ – 用于创建新的
AsyncSession
实例的工厂。这通常是async_sessionmaker
的实例,但并非必须。scopefunc¶ – 定义当前作用域的函数。像
asyncio.current_task
这样的函数在这里可能很有用。
-
method
sqlalchemy.ext.asyncio.async_scoped_session.
async aclose() → None¶ AsyncSession.close()
的别名。代表
async_scoped_session
类,为AsyncSession
类代理。AsyncSession.aclose()
的名称专门用于支持 Python 标准库@contextlib.aclosing
上下文管理器函数。新版本 2.0.20 中添加。
-
method
sqlalchemy.ext.asyncio.async_scoped_session.
add(instance: object, _warn: bool = True) → None¶ 将一个对象放入此
Session
中。代表
async_scoped_session
类,为AsyncSession
类代理。代表
Session
类,作为AsyncSession
类的代理。当对象处于 瞬态 状态并传递给
Session.add()
方法时,它们会转入 待处理 状态,直到下次刷新,此时它们会转入 持久化 状态。当对象处于 分离 状态并传递给
Session.add()
方法时,它们会直接转入 持久化 状态。如果
Session
使用的事务回滚,则传递给Session.add()
方法时处于瞬态状态的对象将被移回 瞬态 状态,并且将不再存在于此Session
中。
-
方法
sqlalchemy.ext.asyncio.async_scoped_session.
add_all(instances: Iterable[object]) → None¶ 将给定的实例集合添加到此
Session
中。代表
async_scoped_session
类,为AsyncSession
类代理。代表
Session
类,作为AsyncSession
类的代理。有关一般行为描述,请参阅
Session.add()
的文档。
-
属性
sqlalchemy.ext.asyncio.async_scoped_session.
autoflush¶ 代表
AsyncSession
类的Session.autoflush
属性的代理。代表
async_scoped_session
类,为AsyncSession
类代理。
-
方法
sqlalchemy.ext.asyncio.async_scoped_session.
begin() → AsyncSessionTransaction¶ 返回一个
AsyncSessionTransaction
对象。代表
async_scoped_session
类,为AsyncSession
类代理。当进入
AsyncSessionTransaction
对象时,底层Session
会执行“begin”操作async with async_session.begin(): # .. ORM transaction is begun
请注意,当会话级事务开始时,通常不会发生数据库 IO,因为数据库事务是在按需基础上开始的。但是,begin 块是异步的,以适应可能执行 IO 的
SessionEvents.after_transaction_create()
事件钩子。有关 ORM begin 的一般描述,请参阅
Session.begin()
。
-
方法
sqlalchemy.ext.asyncio.async_scoped_session.
begin_nested() → AsyncSessionTransaction¶ 返回一个
AsyncSessionTransaction
对象,该对象将开始一个“嵌套”事务,例如 SAVEPOINT。代表
async_scoped_session
类,为AsyncSession
类代理。行为与
AsyncSession.begin()
相同。有关 ORM begin nested 的一般描述,请参阅
Session.begin_nested()
。另请参见
可序列化隔离/保存点/事务性 DDL (asyncio 版本) - SQLite asyncio 驱动程序需要特殊的解决方法才能使 SAVEPOINT 正确工作。
-
属性
sqlalchemy.ext.asyncio.async_scoped_session.
bind¶ 代表
async_scoped_session
类的AsyncSession.bind
属性的代理。
-
方法
sqlalchemy.ext.asyncio.async_scoped_session.
async close() → None¶ 关闭此
AsyncSession
使用的事务资源和 ORM 对象。代表
async_scoped_session
类,为AsyncSession
类代理。
-
async classmethod
sqlalchemy.ext.asyncio.async_scoped_session.
close_all() → None¶ 关闭所有
AsyncSession
会话。代表
async_scoped_session
类,为AsyncSession
类代理。自版本 2.0 起弃用:
AsyncSession.close_all()
方法已弃用,将在未来版本中删除。请参阅close_all_sessions()
。
-
方法
sqlalchemy.ext.asyncio.async_scoped_session.
async commit() → None¶ 提交当前进行的事务。
代表
async_scoped_session
类,为AsyncSession
类代理。另请参见
Session.commit()
- “commit” 的主要文档
-
方法
sqlalchemy.ext.asyncio.async_scoped_session.
configure(**kwargs: Any) → None¶ 重新配置此
scoped_session
使用的sessionmaker
。
-
方法
sqlalchemy.ext.asyncio.async_scoped_session.
async connection(bind_arguments: _BindArguments | None = None, execution_options: CoreExecuteOptionsParameter | None = None, **kw: Any) → AsyncConnection¶ 返回对应于此
Session
对象的事务状态的AsyncConnection
对象。代表
async_scoped_session
类,为AsyncSession
类代理。此方法也可用于为当前事务使用的数据库连接建立执行选项。
版本 1.4.24 中的新内容: 添加了 **kw 参数,这些参数会传递给底层的
Session.connection()
方法。另请参见
Session.connection()
- “连接” 的主要文档
-
方法
sqlalchemy.ext.asyncio.async_scoped_session.
async delete(instance: object) → None¶ 将实例标记为已删除。
代表
async_scoped_session
类,为AsyncSession
类代理。数据库删除操作在
flush()
时发生。由于此操作可能需要沿着未加载的关系级联,因此它是可等待的,以允许这些查询发生。
另请参见
Session.delete()
- 删除的主要文档
-
属性
sqlalchemy.ext.asyncio.async_scoped_session.
deleted¶ 此
Session
中标记为“已删除”的所有实例的集合。代表
async_scoped_session
类,为AsyncSession
类代理。代表
Session
类,作为AsyncSession
类的代理。
-
属性
sqlalchemy.ext.asyncio.async_scoped_session.
dirty¶ 所有被认为是脏的持久实例的集合。
代表
async_scoped_session
类,为AsyncSession
类代理。代表
Session
类,作为AsyncSession
类的代理。例如:
some_mapped_object in session.dirty
当实例被修改但未被删除时,它们被认为是脏的。
请注意,这种“脏”计算是“乐观”的;大多数属性设置或集合修改操作会将实例标记为“脏”并将其放入此集合中,即使属性的值没有净变化。在冲洗时,每个属性的值与其之前保存的值进行比较,如果不存在净变化,则不会发生 SQL 操作(这是一个更昂贵的操作,因此只在冲洗时执行)。
要检查实例是否对其属性具有可操作的净变化,请使用
Session.is_modified()
方法。
-
方法
sqlalchemy.ext.asyncio.async_scoped_session.
async execute(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) → Result[Any]¶ 执行语句并返回一个缓冲的
Result
对象。代表
async_scoped_session
类,为AsyncSession
类代理。另请参见
Session.execute()
- execute 的主要文档
-
方法
sqlalchemy.ext.asyncio.async_scoped_session.
expire(instance: object, attribute_names: Iterable[str] | None = None) → None¶ 使实例上的属性失效。
代表
async_scoped_session
类,为AsyncSession
类代理。代表
Session
类,作为AsyncSession
类的代理。将实例的属性标记为已过期。下次访问过期的属性时,将向
Session
对象的当前事务上下文发出查询,以加载给定实例的所有过期属性。请注意,高度隔离的事务将返回与在同一事务中先前读取的值相同的值,而不管该事务外部的数据库状态是否发生变化。要使
Session
中的所有对象同时失效,请使用Session.expire_all()
。Session
对象的默认行为是在调用Session.rollback()
或Session.commit()
方法时使所有状态失效,以便为新事务加载新状态。 因此,调用Session.expire()
仅在当前事务中发出非 ORM SQL 语句的特定情况下才有意义。
-
method
sqlalchemy.ext.asyncio.async_scoped_session.
expire_all() → None¶ 使此 Session 中的所有持久实例失效。
代表
async_scoped_session
类,为AsyncSession
类代理。代表
Session
类,作为AsyncSession
类的代理。当持久实例上的任何属性在下一次访问时,将使用
Session
对象的当前事务上下文发出查询,以便加载给定实例的所有失效属性。 请注意,高度隔离的事务将返回与在该事务中先前读取的值相同的值,无论该事务之外的数据库状态发生任何变化。要使失效单个对象和这些对象上的单个属性,请使用
Session.expire()
。Session
对象的默认行为是在调用Session.rollback()
或Session.commit()
方法时使所有状态失效,以便为新事务加载新状态。 因此,通常不需要调用Session.expire_all()
,假设事务是隔离的。
-
method
sqlalchemy.ext.asyncio.async_scoped_session.
expunge(instance: object) → None¶ 从此
Session
中删除 instance。代表
async_scoped_session
类,为AsyncSession
类代理。代表
Session
类,作为AsyncSession
类的代理。这将释放对该实例的所有内部引用。 级联将根据expunge级联规则应用。
-
method
sqlalchemy.ext.asyncio.async_scoped_session.
expunge_all() → None¶ 从此
Session
中删除所有对象实例。代表
async_scoped_session
类,为AsyncSession
类代理。代表
Session
类,作为AsyncSession
类的代理。这等效于对此
Session
中的所有对象调用expunge(obj)
。
-
method
sqlalchemy.ext.asyncio.async_scoped_session.
async flush(objects: Sequence[Any] | None = None) → None¶ 将所有对象更改刷新到数据库中。
代表
async_scoped_session
类,为AsyncSession
类代理。另请参见
Session.flush()
- 刷新主文档
-
method
sqlalchemy.ext.asyncio.async_scoped_session.
async get(entity: _EntityBindKey[_O], ident: _PKIdentityArgument, *, options: Sequence[ORMOption] | None = None, populate_existing: bool = False, with_for_update: ForUpdateParameter = None, identity_token: Any | None = None, execution_options: OrmExecuteOptionsParameter = {}) → _O | None¶ 返回基于给定主键标识符的实例,如果未找到,则返回
None
。代表
async_scoped_session
类,为AsyncSession
类代理。另请参见
Session.get()
- 获取主文档
-
method
sqlalchemy.ext.asyncio.async_scoped_session.
get_bind(mapper: _EntityBindKey[_O] | None = None, clause: ClauseElement | None = None, bind: _SessionBind | None = None, **kw: Any) → Engine | Connection¶ 返回同步代理的
Session
绑定的“bind”。代表
async_scoped_session
类,为AsyncSession
类代理。与
Session.get_bind()
方法不同,此方法目前不用于此AsyncSession
以任何方式来解析请求的引擎。注意
此方法直接代理到
Session.get_bind()
方法,但是目前不作为覆盖目标有用,与Session.get_bind()
方法不同。以下示例说明了如何实现与AsyncSession
和AsyncEngine
一起使用的自定义Session.get_bind()
方案。在 自定义垂直分区 中介绍的模式说明了如何将自定义绑定查找方案应用于
Session
,前提是给定一组Engine
对象。要应用相应的Session.get_bind()
实现以用于AsyncSession
和AsyncEngine
对象,继续子类化Session
并将其应用于AsyncSession
使用AsyncSession.sync_session_class
。内部方法必须继续返回Engine
实例,这些实例可以从AsyncEngine
中使用AsyncEngine.sync_engine
属性获取。# using example from "Custom Vertical Partitioning" import random from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.ext.asyncio import async_sessionmaker from sqlalchemy.orm import Session # construct async engines w/ async drivers engines = { 'leader':create_async_engine("sqlite+aiosqlite:///leader.db"), 'other':create_async_engine("sqlite+aiosqlite:///other.db"), 'follower1':create_async_engine("sqlite+aiosqlite:///follower1.db"), 'follower2':create_async_engine("sqlite+aiosqlite:///follower2.db"), } class RoutingSession(Session): def get_bind(self, mapper=None, clause=None, **kw): # within get_bind(), return sync engines if mapper and issubclass(mapper.class_, MyOtherClass): return engines['other'].sync_engine elif self._flushing or isinstance(clause, (Update, Delete)): return engines['leader'].sync_engine else: return engines[ random.choice(['follower1','follower2']) ].sync_engine # apply to AsyncSession using sync_session_class AsyncSessionMaker = async_sessionmaker( sync_session_class=RoutingSession )
Session.get_bind()
方法在与 ORM 事件钩子和通过AsyncSession.run_sync()
调用的函数相同的非异步、隐式非阻塞上下文中调用,因此希望在Session.get_bind()
中运行 SQL 命令的例程可以继续使用阻塞式代码,这将在调用数据库驱动程序上的 IO 时被转换为隐式异步调用。
-
method
sqlalchemy.ext.asyncio.async_scoped_session.
async get_one(entity: _EntityBindKey[_O], ident: _PKIdentityArgument, *, options: Sequence[ORMOption] | None = None, populate_existing: bool = False, with_for_update: ForUpdateParameter = None, identity_token: Any | None = None, execution_options: OrmExecuteOptionsParameter = {}) → _O¶ 根据给定的主键标识符返回一个实例,如果未找到,则引发异常。
代表
async_scoped_session
类,为AsyncSession
类代理。如果查询未选择任何行,则引发
sqlalchemy.orm.exc.NoResultFound
。..versionadded: 2.0.22
另请参见
Session.get_one()
- get_one 的主要文档
-
classmethod
sqlalchemy.ext.asyncio.async_scoped_session.
identity_key(class_: Type[Any] | None = None, ident: Any | Tuple[Any, ...] = None, *, instance: Any | None = None, row: Row[Any] | RowMapping | None = None, identity_token: Any | None = None) → _IdentityKeyType[Any]¶ 返回一个身份键。
代表
async_scoped_session
类,为AsyncSession
类代理。代表
Session
类,作为AsyncSession
类的代理。这是
identity_key()
的别名。
-
attribute
sqlalchemy.ext.asyncio.async_scoped_session.
identity_map¶ 代表
AsyncSession
类代理Session.identity_map
属性。代表
async_scoped_session
类,为AsyncSession
类代理。
-
attribute
sqlalchemy.ext.asyncio.async_scoped_session.
info¶ 一个用户可修改的字典。
代表
async_scoped_session
类,为AsyncSession
类代理。代表
Session
类,作为AsyncSession
类的代理。可以使用
Session
构造函数或sessionmaker
构造函数或工厂方法的info
参数填充此字典的初始值。这里的字典始终是此Session
本地的,可以独立于所有其他Session
对象进行修改。
-
method
sqlalchemy.ext.asyncio.async_scoped_session.
async invalidate() → None¶ 使用连接失效关闭此 Session。
代表
async_scoped_session
类,为AsyncSession
类代理。有关完整说明,请参见
Session.invalidate()
。
-
attribute
sqlalchemy.ext.asyncio.async_scoped_session.
is_active¶ 如果此
Session
不处于“部分回滚”状态,则为 True。代表
async_scoped_session
类,为AsyncSession
类代理。代表
Session
类,作为AsyncSession
类的代理。“部分回滚”状态通常表示该
Session
的刷新过程已失败,并且必须发出该Session.rollback()
方法才能完全回滚事务。如果此
Session
根本不处于事务中,该Session
将在首次使用时自动开始,因此在这种情况下Session.is_active
将返回 True。否则,如果此
Session
处于事务中,并且该事务未在内部回滚,则Session.is_active
也将返回 True。
-
method
sqlalchemy.ext.asyncio.async_scoped_session.
is_modified(instance: object, include_collections: bool = True) → bool¶ 如果给定实例具有本地修改的属性,则返回
True
。代表
async_scoped_session
类,为AsyncSession
类代理。代表
Session
类,作为AsyncSession
类的代理。此方法检索实例上每个被检测属性的历史记录,并将其当前值与其之前刷新或提交的值(如果有)进行比较。
它实际上是检查给定实例在
Session.dirty
集合中的更昂贵且更准确的版本;将对每个属性的净“脏”状态执行完整测试。例如:
return session.is_modified(someobject)
此方法有一些注意事项
存在于
Session.dirty
集合中的实例在使用此方法测试时可能报告False
。这是因为该对象可能通过属性变异收到了更改事件,从而将其置于Session.dirty
中,但最终该状态与从数据库加载的状态相同,导致此处没有净变化。如果在收到新值时未加载标量属性或已过期,则标量属性可能未记录先前设置的值。在这些情况下,假设该属性发生了更改,即使最终其数据库值没有净变化。在大多数情况下,SQLAlchemy 在设置事件发生时不需要“旧”值,因此如果旧值不存在,它会跳过昂贵的 SQL 调用,假设通常需要对标量值进行 UPDATE,并且在极少数情况下不需要时,比发出防御性 SELECT 更便宜。
仅当属性容器的
active_history
标志设置为True
时,才会在设置时无条件地获取“旧”值。此标志通常针对主键属性和不是简单一对多的标量对象引用设置。要为任何任意映射列设置此标志,请使用active_history
参数与column_property()
一同使用。
-
method
sqlalchemy.ext.asyncio.async_scoped_session.
async merge(instance: _O, *, load: bool = True, options: Sequence[ORMOption] | None = None) → _O¶ 将给定实例的状态复制到此
AsyncSession
中的相应实例。代表
async_scoped_session
类,为AsyncSession
类代理。另请参见
Session.merge()
- 合并的主要文档
-
attribute
sqlalchemy.ext.asyncio.async_scoped_session.
new¶ 此
Session
中标记为“新建”的所有实例的集合。代表
async_scoped_session
类,为AsyncSession
类代理。代表
Session
类,作为AsyncSession
类的代理。
-
attribute
sqlalchemy.ext.asyncio.async_scoped_session.
no_autoflush¶ 返回禁用自动刷新的上下文管理器。
代表
async_scoped_session
类,为AsyncSession
类代理。代表
Session
类,作为AsyncSession
类的代理。例如:
with session.no_autoflush: some_object = SomeClass() session.add(some_object) # won't autoflush some_object.related_thing = session.query(SomeRelated).first()
在
with:
块中进行的操作将不会受到查询访问时发生的刷新的影响。这在初始化涉及现有数据库查询的一系列对象时很有用,其中未完成的对象不应立即刷新。
-
classmethod
sqlalchemy.ext.asyncio.async_scoped_session.
object_session(instance: object) → Session | None¶ 返回对象所属的
Session
。代表
async_scoped_session
类,为AsyncSession
类代理。代表
Session
类,作为AsyncSession
类的代理。这是
object_session()
的别名。
-
method
sqlalchemy.ext.asyncio.async_scoped_session.
async refresh(instance: object, attribute_names: Iterable[str] | None = None, with_for_update: ForUpdateParameter = None) → None¶ 使给定实例的属性失效并刷新。
代表
async_scoped_session
类,为AsyncSession
类代理。将向数据库发出查询,所有属性将使用它们当前的数据库值进行刷新。
这是
Session.refresh()
方法的异步版本。有关所有选项的完整说明,请参阅该方法。另请参见
Session.refresh()
- refresh 的主要文档
-
method
sqlalchemy.ext.asyncio.async_scoped_session.
async remove() → None¶ 如果存在,则处理当前的
AsyncSession
。与 scoped_session 的 remove 方法不同,此方法将使用 await 等待 AsyncSession 的 close 方法。
-
method
sqlalchemy.ext.asyncio.async_scoped_session.
async reset() → None¶ 关闭此
Session
使用的事务资源和 ORM 对象,将其重置为初始状态。代表
async_scoped_session
类,为AsyncSession
类代理。版本 2.0.22 中的新增功能。
-
method
sqlalchemy.ext.asyncio.async_scoped_session.
async rollback() → None¶ 回滚当前正在进行的事务。
代表
async_scoped_session
类,为AsyncSession
类代理。另请参见
Session.rollback()
- “rollback” 的主要文档
-
method
sqlalchemy.ext.asyncio.async_scoped_session.
async scalar(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) → Any¶ 执行语句并返回标量结果。
代表
async_scoped_session
类,为AsyncSession
类代理。另请参见
Session.scalar()
- scalar 的主要文档
-
method
sqlalchemy.ext.asyncio.async_scoped_session.
async scalars(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) → ScalarResult[Any]¶ 执行语句并返回标量结果。
代表
async_scoped_session
类,为AsyncSession
类代理。- 返回值:
一个
ScalarResult
对象
版本 1.4.24 中的新增功能: 添加了
AsyncSession.scalars()
版本 1.4.26 中的新增功能: 添加了
async_scoped_session.scalars()
-
attribute
sqlalchemy.ext.asyncio.async_scoped_session.
session_factory: async_sessionmaker[_AS]¶ 提供给 __init__ 的 session_factory 存储在此属性中,可以在以后访问。当需要新的非范围
AsyncSession
时,这很有用。
-
method
sqlalchemy.ext.asyncio.async_scoped_session.
async stream(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) → AsyncResult[Any]¶ 执行语句并返回一个流式
AsyncResult
对象。代表
async_scoped_session
类,为AsyncSession
类代理。
-
method
sqlalchemy.ext.asyncio.async_scoped_session.
async stream_scalars(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) → AsyncScalarResult[Any]¶ 执行语句并返回一个标量结果流。
代表
async_scoped_session
类,为AsyncSession
类代理。- 返回值:
一个
AsyncScalarResult
对象。
版本 1.4.24 中新增。
-
method
- class sqlalchemy.ext.asyncio.AsyncAttrs¶
混合类,为所有属性提供一个可等待的访问器。
例如:
from __future__ import annotations from typing import List from sqlalchemy import ForeignKey from sqlalchemy import func from sqlalchemy.ext.asyncio import AsyncAttrs from sqlalchemy.orm import DeclarativeBase from sqlalchemy.orm import Mapped from sqlalchemy.orm import mapped_column from sqlalchemy.orm import relationship class Base(AsyncAttrs, DeclarativeBase): pass class A(Base): __tablename__ = "a" id: Mapped[int] = mapped_column(primary_key=True) data: Mapped[str] bs: Mapped[List[B]] = relationship() class B(Base): __tablename__ = "b" id: Mapped[int] = mapped_column(primary_key=True) a_id: Mapped[int] = mapped_column(ForeignKey("a.id")) data: Mapped[str]
在上面的例子中,
AsyncAttrs
mixin 应用于声明式Base
类,它对所有子类都有效。这个 mixin 为所有类添加了一个新属性AsyncAttrs.awaitable_attrs
,它将以可等待的方式生成任何属性的值。这允许访问可能需要延迟加载或延迟/失效加载的属性,以便仍然可以发出 IO。a1 = (await async_session.scalars(select(A).where(A.id == 5))).one() # use the lazy loader on ``a1.bs`` via the ``.awaitable_attrs`` # interface, so that it may be awaited for b1 in await a1.awaitable_attrs.bs: print(b1)
AsyncAttrs.awaitable_attrs
对属性执行一个调用,这与使用AsyncSession.run_sync()
方法大致相同,例如:for b1 in await async_session.run_sync(lambda sess: a1.bs): print(b1)
新添加于版本 2.0.13。
-
attribute
sqlalchemy.ext.asyncio.AsyncAttrs.
awaitable_attrs¶ 提供此对象上所有属性作为可等待对象的命名空间。
例如:
a1 = (await async_session.scalars(select(A).where(A.id == 5))).one() some_attribute = await a1.awaitable_attrs.some_deferred_attribute some_collection = await a1.awaitable_attrs.some_collection
-
attribute
- class sqlalchemy.ext.asyncio.AsyncSession¶
Session
的异步版本。AsyncSession
是传统Session
实例的代理。AsyncSession
**不适用于并发任务。**。有关背景信息,请参阅 Session 是否线程安全?AsyncSession 在并发任务中是否安全共享?。版本 1.4 中的新功能。
要将
AsyncSession
与自定义Session
实现一起使用,请参阅AsyncSession.sync_session_class
参数。成员
sync_session_class,__init__(),aclose(),add(),add_all(),autoflush,begin(),begin_nested(),close(),close_all(),commit(),connection(),delete(),deleted,dirty,execute(),expire(),expire_all(),expunge(),expunge_all(),flush(),get(),get_bind(),get_nested_transaction(),get_one(),get_transaction(),identity_key(),identity_map,in_nested_transaction(),in_transaction(),info,invalidate(),is_active,is_modified(),merge(),new,no_autoflush,object_session(),refresh(),reset(),rollback(),run_sync(),scalar(),scalars(),stream(),stream_scalars(),sync_session
类签名
class
sqlalchemy.ext.asyncio.AsyncSession
(sqlalchemy.ext.asyncio.base.ReversibleProxy
)-
attribute
sqlalchemy.ext.asyncio.AsyncSession.
sync_session_class: Type[Session] = <class 'sqlalchemy.orm.session.Session'>¶ 用于特定
AsyncSession
的底层Session
实例的类或可调用对象。在类级别,此属性是
AsyncSession.sync_session_class
参数的默认值。AsyncSession
的自定义子类可以覆盖它。在实例级别,此属性表示当前用于提供此
AsyncSession
实例的Session
实例的类或可调用对象。版本 1.4.24 中新增。
-
method
sqlalchemy.ext.asyncio.AsyncSession.
__init__(bind: _AsyncSessionBind | None = None, *, binds: Dict[_SessionBindKey, _AsyncSessionBind] | None = None, sync_session_class: Type[Session] | None = None, **kw: Any)¶ 构造一个新的
AsyncSession
。除了
sync_session_class
之外的所有参数都直接传递给sync_session_class
可调用对象以实例化一个新的Session
。有关参数文档,请参阅Session.__init__()
。- 参数:
sync_session_class¶ –
将用于构造将被代理的
Session
的Session
子类或其他可调用对象。此参数可用于提供自定义Session
子类。默认为AsyncSession.sync_session_class
类级别属性。版本 1.4.24 中新增。
-
method
sqlalchemy.ext.asyncio.AsyncSession.
async aclose() → None¶ AsyncSession.close()
的别名。AsyncSession.aclose()
的名称专门用于支持 Python 标准库@contextlib.aclosing
上下文管理器函数。新版本 2.0.20 中添加。
-
method
sqlalchemy.ext.asyncio.AsyncSession.
add(instance: object, _warn: bool = True) → None¶ 将一个对象放入此
Session
中。代表
Session
类,作为AsyncSession
类的代理。当对象处于 瞬态 状态并传递给
Session.add()
方法时,它们会转入 待处理 状态,直到下次刷新,此时它们会转入 持久化 状态。当对象处于 分离 状态并传递给
Session.add()
方法时,它们会直接转入 持久化 状态。如果
Session
使用的事务回滚,则传递给Session.add()
方法时处于瞬态状态的对象将被移回 瞬态 状态,并且将不再存在于此Session
中。
-
method
sqlalchemy.ext.asyncio.AsyncSession.
add_all(instances: Iterable[object]) → None¶ 将给定的实例集合添加到此
Session
中。代表
Session
类,作为AsyncSession
类的代理。有关一般行为描述,请参阅
Session.add()
的文档。
-
属性
sqlalchemy.ext.asyncio.AsyncSession.
autoflush¶ 代表
AsyncSession
类的Session.autoflush
属性的代理。
-
方法
sqlalchemy.ext.asyncio.AsyncSession.
begin() → AsyncSessionTransaction¶ 返回一个
AsyncSessionTransaction
对象。当进入
AsyncSessionTransaction
对象时,底层Session
会执行“begin”操作async with async_session.begin(): # .. ORM transaction is begun
请注意,当会话级事务开始时,通常不会发生数据库 IO,因为数据库事务是在按需基础上开始的。但是,begin 块是异步的,以适应可能执行 IO 的
SessionEvents.after_transaction_create()
事件钩子。有关 ORM begin 的一般描述,请参阅
Session.begin()
。
-
方法
sqlalchemy.ext.asyncio.AsyncSession.
begin_nested() → AsyncSessionTransaction¶ 返回一个
AsyncSessionTransaction
对象,该对象将开始一个“嵌套”事务,例如 SAVEPOINT。行为与
AsyncSession.begin()
相同。有关 ORM begin nested 的一般描述,请参阅
Session.begin_nested()
。另请参见
可序列化隔离/保存点/事务性 DDL (asyncio 版本) - SQLite asyncio 驱动程序需要特殊的解决方法才能使 SAVEPOINT 正确工作。
-
方法
sqlalchemy.ext.asyncio.AsyncSession.
异步 close() → None¶ 关闭此
AsyncSession
使用的事务资源和 ORM 对象。
-
异步 类方法
sqlalchemy.ext.asyncio.AsyncSession.
close_all() → None¶ 关闭所有
AsyncSession
会话。自版本 2.0 起弃用:
AsyncSession.close_all()
方法已弃用,将在未来版本中删除。请参阅close_all_sessions()
。
-
方法
sqlalchemy.ext.asyncio.AsyncSession.
异步 commit() → None¶ 提交当前进行的事务。
另请参见
Session.commit()
- “commit” 的主要文档
-
方法
sqlalchemy.ext.asyncio.AsyncSession.
异步 connection(bind_arguments: _BindArguments | None = None, execution_options: CoreExecuteOptionsParameter | None = None, **kw: Any) → AsyncConnection¶ 返回对应于此
Session
对象的事务状态的AsyncConnection
对象。此方法也可用于为当前事务使用的数据库连接建立执行选项。
版本 1.4.24 中的新内容: 添加了 **kw 参数,这些参数会传递给底层的
Session.connection()
方法。另请参见
Session.connection()
- “连接” 的主要文档
-
方法
sqlalchemy.ext.asyncio.AsyncSession.
异步 delete(instance: object) → None¶ 将实例标记为已删除。
数据库删除操作在
flush()
时发生。由于此操作可能需要沿着未加载的关系级联,因此它是可等待的,以允许这些查询发生。
另请参见
Session.delete()
- 删除的主要文档
-
属性
sqlalchemy.ext.asyncio.AsyncSession.
deleted¶ 此
Session
中标记为“已删除”的所有实例的集合。代表
Session
类,作为AsyncSession
类的代理。
-
属性
sqlalchemy.ext.asyncio.AsyncSession.
dirty¶ 所有被认为是脏的持久实例的集合。
代表
Session
类,作为AsyncSession
类的代理。例如:
some_mapped_object in session.dirty
当实例被修改但未被删除时,它们被认为是脏的。
请注意,这种“脏”计算是“乐观”的;大多数属性设置或集合修改操作会将实例标记为“脏”并将其放入此集合中,即使属性的值没有净变化。在冲洗时,每个属性的值与其之前保存的值进行比较,如果不存在净变化,则不会发生 SQL 操作(这是一个更昂贵的操作,因此只在冲洗时执行)。
要检查实例是否对其属性具有可操作的净变化,请使用
Session.is_modified()
方法。
-
方法
sqlalchemy.ext.asyncio.AsyncSession.
异步 execute(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) → Result[Any]¶ 执行语句并返回一个缓冲的
Result
对象。另请参见
Session.execute()
- execute 的主要文档
-
方法
sqlalchemy.ext.asyncio.AsyncSession.
expire(instance: object, attribute_names: Iterable[str] | None = None) → None¶ 使实例上的属性失效。
代表
Session
类,作为AsyncSession
类的代理。将实例的属性标记为已过期。下次访问过期的属性时,将向
Session
对象的当前事务上下文发出查询,以加载给定实例的所有过期属性。请注意,高度隔离的事务将返回与在同一事务中先前读取的值相同的值,而不管该事务外部的数据库状态是否发生变化。要使
Session
中的所有对象同时失效,请使用Session.expire_all()
。Session
对象的默认行为是在调用Session.rollback()
或Session.commit()
方法时使所有状态失效,以便为新事务加载新状态。 因此,调用Session.expire()
仅在当前事务中发出非 ORM SQL 语句的特定情况下才有意义。
-
方法
sqlalchemy.ext.asyncio.AsyncSession.
expire_all() → None¶ 使此 Session 中的所有持久实例失效。
代表
Session
类,作为AsyncSession
类的代理。当持久实例上的任何属性在下一次访问时,将使用
Session
对象的当前事务上下文发出查询,以便加载给定实例的所有失效属性。 请注意,高度隔离的事务将返回与在该事务中先前读取的值相同的值,无论该事务之外的数据库状态发生任何变化。要使失效单个对象和这些对象上的单个属性,请使用
Session.expire()
。Session
对象的默认行为是在调用Session.rollback()
或Session.commit()
方法时使所有状态失效,以便为新事务加载新状态。 因此,通常不需要调用Session.expire_all()
,假设事务是隔离的。
-
方法
sqlalchemy.ext.asyncio.AsyncSession.
expunge(instance: object) → None¶ 从此
Session
中删除 instance。代表
Session
类,作为AsyncSession
类的代理。这将释放对该实例的所有内部引用。 级联将根据expunge级联规则应用。
-
方法
sqlalchemy.ext.asyncio.AsyncSession.
expunge_all() → None¶ 从此
Session
中删除所有对象实例。代表
Session
类,作为AsyncSession
类的代理。这等效于对此
Session
中的所有对象调用expunge(obj)
。
-
方法
sqlalchemy.ext.asyncio.AsyncSession.
异步 flush(objects: Sequence[Any] | None = None) → None¶ 将所有对象更改刷新到数据库中。
另请参见
Session.flush()
- 刷新主文档
-
方法
sqlalchemy.ext.asyncio.AsyncSession.
异步 get(entity: _EntityBindKey[_O], ident: _PKIdentityArgument, *, options: Sequence[ORMOption] | None = None, populate_existing: bool = False, with_for_update: ForUpdateParameter = None, identity_token: Any | None = None, execution_options: OrmExecuteOptionsParameter = {}) → _O | None¶ 返回基于给定主键标识符的实例,如果未找到,则返回
None
。另请参见
Session.get()
- 获取主文档
-
方法
sqlalchemy.ext.asyncio.AsyncSession.
get_bind(mapper: _EntityBindKey[_O] | None = None, clause: ClauseElement | None = None, bind: _SessionBind | None = None, **kw: Any) → Engine | Connection¶ 返回同步代理的
Session
绑定的“bind”。与
Session.get_bind()
方法不同,此方法目前不用于此AsyncSession
以任何方式来解析请求的引擎。注意
此方法直接代理到
Session.get_bind()
方法,但是目前不作为覆盖目标有用,与Session.get_bind()
方法不同。以下示例说明了如何实现与AsyncSession
和AsyncEngine
一起使用的自定义Session.get_bind()
方案。在 自定义垂直分区 中介绍的模式说明了如何将自定义绑定查找方案应用于
Session
,前提是给定一组Engine
对象。要应用相应的Session.get_bind()
实现以用于AsyncSession
和AsyncEngine
对象,继续子类化Session
并将其应用于AsyncSession
使用AsyncSession.sync_session_class
。内部方法必须继续返回Engine
实例,这些实例可以从AsyncEngine
中使用AsyncEngine.sync_engine
属性获取。# using example from "Custom Vertical Partitioning" import random from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import create_async_engine from sqlalchemy.ext.asyncio import async_sessionmaker from sqlalchemy.orm import Session # construct async engines w/ async drivers engines = { 'leader':create_async_engine("sqlite+aiosqlite:///leader.db"), 'other':create_async_engine("sqlite+aiosqlite:///other.db"), 'follower1':create_async_engine("sqlite+aiosqlite:///follower1.db"), 'follower2':create_async_engine("sqlite+aiosqlite:///follower2.db"), } class RoutingSession(Session): def get_bind(self, mapper=None, clause=None, **kw): # within get_bind(), return sync engines if mapper and issubclass(mapper.class_, MyOtherClass): return engines['other'].sync_engine elif self._flushing or isinstance(clause, (Update, Delete)): return engines['leader'].sync_engine else: return engines[ random.choice(['follower1','follower2']) ].sync_engine # apply to AsyncSession using sync_session_class AsyncSessionMaker = async_sessionmaker( sync_session_class=RoutingSession )
Session.get_bind()
方法在与 ORM 事件钩子和通过AsyncSession.run_sync()
调用的函数相同的非异步、隐式非阻塞上下文中调用,因此希望在Session.get_bind()
中运行 SQL 命令的例程可以继续使用阻塞式代码,这将在调用数据库驱动程序上的 IO 时被转换为隐式异步调用。
-
方法
sqlalchemy.ext.asyncio.AsyncSession.
get_nested_transaction() → AsyncSessionTransaction | None¶ 返回当前正在进行的嵌套事务(如果有)。
- 返回值:
一个
AsyncSessionTransaction
对象,或None
。
版本 1.4.18 中的新增内容。
-
方法
sqlalchemy.ext.asyncio.AsyncSession.
异步 get_one(entity: _EntityBindKey[_O], ident: _PKIdentityArgument, *, options: Sequence[ORMOption] | None = None, populate_existing: bool = False, with_for_update: ForUpdateParameter = None, identity_token: Any | None = None, execution_options: OrmExecuteOptionsParameter = {}) → _O¶ 根据给定的主键标识符返回一个实例,如果未找到,则引发异常。
如果查询未选择任何行,则引发
sqlalchemy.orm.exc.NoResultFound
。..versionadded: 2.0.22
另请参见
Session.get_one()
- get_one 的主要文档
-
方法
sqlalchemy.ext.asyncio.AsyncSession.
get_transaction() → AsyncSessionTransaction | None¶ 返回当前正在进行的根事务(如果有)。
- 返回值:
一个
AsyncSessionTransaction
对象,或None
。
版本 1.4.18 中的新增内容。
-
classmethod
sqlalchemy.ext.asyncio.AsyncSession.
identity_key(class_: Type[Any] | None = None, ident: Any | Tuple[Any, ...] = None, *, instance: Any | None = None, row: Row[Any] | RowMapping | None = None, identity_token: Any | None = None) → _IdentityKeyType[Any]¶ 返回一个身份键。
代表
Session
类,作为AsyncSession
类的代理。这是
identity_key()
的别名。
-
attribute
sqlalchemy.ext.asyncio.AsyncSession.
identity_map¶ 代表
AsyncSession
类代理Session.identity_map
属性。
-
method
sqlalchemy.ext.asyncio.AsyncSession.
in_nested_transaction() → bool¶ 如果此
Session
已开始嵌套事务(例如 SAVEPOINT),则返回 True。代表
Session
类,作为AsyncSession
类的代理。版本 1.4 中的新功能。
-
method
sqlalchemy.ext.asyncio.AsyncSession.
in_transaction() → bool¶ 如果此
Session
已开始事务,则返回 True。代表
Session
类,作为AsyncSession
类的代理。版本 1.4 中的新功能。
另请参见
-
attribute
sqlalchemy.ext.asyncio.AsyncSession.
info¶ 一个用户可修改的字典。
代表
Session
类,作为AsyncSession
类的代理。可以使用
Session
构造函数或sessionmaker
构造函数或工厂方法的info
参数填充此字典的初始值。这里的字典始终是此Session
本地的,可以独立于所有其他Session
对象进行修改。
-
method
sqlalchemy.ext.asyncio.AsyncSession.
async invalidate() → None¶ 使用连接失效关闭此 Session。
有关完整说明,请参见
Session.invalidate()
。
-
attribute
sqlalchemy.ext.asyncio.AsyncSession.
is_active¶ 如果此
Session
不处于“部分回滚”状态,则为 True。代表
Session
类,作为AsyncSession
类的代理。“部分回滚”状态通常表示该
Session
的刷新过程已失败,并且必须发出该Session.rollback()
方法才能完全回滚事务。如果此
Session
根本不处于事务中,该Session
将在首次使用时自动开始,因此在这种情况下Session.is_active
将返回 True。否则,如果此
Session
处于事务中,并且该事务未在内部回滚,则Session.is_active
也将返回 True。
-
method
sqlalchemy.ext.asyncio.AsyncSession.
is_modified(instance: object, include_collections: bool = True) → bool¶ 如果给定实例具有本地修改的属性,则返回
True
。代表
Session
类,作为AsyncSession
类的代理。此方法检索实例上每个被检测属性的历史记录,并将其当前值与其之前刷新或提交的值(如果有)进行比较。
它实际上是检查给定实例在
Session.dirty
集合中的更昂贵且更准确的版本;将对每个属性的净“脏”状态执行完整测试。例如:
return session.is_modified(someobject)
此方法有一些注意事项
存在于
Session.dirty
集合中的实例在使用此方法测试时可能报告False
。这是因为该对象可能通过属性变异收到了更改事件,从而将其置于Session.dirty
中,但最终该状态与从数据库加载的状态相同,导致此处没有净变化。如果在收到新值时未加载标量属性或已过期,则标量属性可能未记录先前设置的值。在这些情况下,假设该属性发生了更改,即使最终其数据库值没有净变化。在大多数情况下,SQLAlchemy 在设置事件发生时不需要“旧”值,因此如果旧值不存在,它会跳过昂贵的 SQL 调用,假设通常需要对标量值进行 UPDATE,并且在极少数情况下不需要时,比发出防御性 SELECT 更便宜。
仅当属性容器的
active_history
标志设置为True
时,才会在设置时无条件地获取“旧”值。此标志通常针对主键属性和不是简单一对多的标量对象引用设置。要为任何任意映射列设置此标志,请使用active_history
参数与column_property()
一同使用。
-
method
sqlalchemy.ext.asyncio.AsyncSession.
async merge(instance: _O, *, load: bool = True, options: Sequence[ORMOption] | None = None) → _O¶ 将给定实例的状态复制到此
AsyncSession
中的相应实例。另请参见
Session.merge()
- 合并的主要文档
-
attribute
sqlalchemy.ext.asyncio.AsyncSession.
new¶ 此
Session
中标记为“新建”的所有实例的集合。代表
Session
类,作为AsyncSession
类的代理。
-
attribute
sqlalchemy.ext.asyncio.AsyncSession.
no_autoflush¶ 返回禁用自动刷新的上下文管理器。
代表
Session
类,作为AsyncSession
类的代理。例如:
with session.no_autoflush: some_object = SomeClass() session.add(some_object) # won't autoflush some_object.related_thing = session.query(SomeRelated).first()
在
with:
块中进行的操作将不会受到查询访问时发生的刷新的影响。这在初始化涉及现有数据库查询的一系列对象时很有用,其中未完成的对象不应立即刷新。
-
classmethod
sqlalchemy.ext.asyncio.AsyncSession.
object_session(instance: object) → Session | None¶ 返回对象所属的
Session
。代表
Session
类,作为AsyncSession
类的代理。这是
object_session()
的别名。
-
method
sqlalchemy.ext.asyncio.AsyncSession.
async refresh(instance: object, attribute_names: Iterable[str] | None = None, with_for_update: ForUpdateParameter = None) → None¶ 使给定实例的属性失效并刷新。
将向数据库发出查询,所有属性将使用它们当前的数据库值进行刷新。
这是
Session.refresh()
方法的异步版本。有关所有选项的完整说明,请参阅该方法。另请参见
Session.refresh()
- refresh 的主要文档
-
method
sqlalchemy.ext.asyncio.AsyncSession.
async reset() → None¶ 关闭此
Session
使用的事务资源和 ORM 对象,将其重置为初始状态。版本 2.0.22 中的新增功能。
-
方法
sqlalchemy.ext.asyncio.AsyncSession.
异步 rollback() → 无¶ 回滚当前正在进行的事务。
另请参见
Session.rollback()
- “rollback” 的主要文档
-
方法
sqlalchemy.ext.asyncio.AsyncSession.
异步 run_sync(fn: ~typing.Callable[[~typing.Concatenate[~sqlalchemy.orm.session.Session, ~_P]], ~sqlalchemy.ext.asyncio.session._T], *arg: ~typing.~_P, **kw: ~typing.~_P) → _T¶ 调用给定的同步(即非异步)可调用对象,并将同步样式的
Session
作为第一个参数传递。此方法允许传统的同步 SQLAlchemy 函数在 asyncio 应用程序的上下文中运行。
例如:
def some_business_method(session: Session, param: str) -> str: '''A synchronous function that does not require awaiting :param session: a SQLAlchemy Session, used synchronously :return: an optional return value is supported ''' session.add(MyObject(param=param)) session.flush() return "success" async def do_something_async(async_engine: AsyncEngine) -> None: '''an async function that uses awaiting''' with AsyncSession(async_engine) as async_session: # run some_business_method() with a sync-style # Session, proxied into an awaitable return_code = await async_session.run_sync(some_business_method, param="param1") print(return_code)
此方法通过在专门的 instrument greenlet 中运行给定的可调用对象,将 asyncio 事件循环一直维护到数据库连接。
提示
提供的可调用对象在 asyncio 事件循环中内联调用,并且将在传统的 IO 调用中阻塞。 此可调用对象中的 IO 应该只调用 SQLAlchemy 的 asyncio 数据库 API,这些 API 将被正确地适配到 greenlet 上下文中。
另请参见
AsyncAttrs
- 用于 ORM 映射类的混合类,它以更简洁的方式在每个属性的基础上提供类似的功能
-
方法
sqlalchemy.ext.asyncio.AsyncSession.
异步 scalar(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) → Any¶ 执行语句并返回标量结果。
另请参见
Session.scalar()
- scalar 的主要文档
-
方法
sqlalchemy.ext.asyncio.AsyncSession.
异步 scalars(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) → ScalarResult[Any]¶ 执行语句并返回标量结果。
- 返回值:
一个
ScalarResult
对象
版本 1.4.24 中的新增功能: 添加了
AsyncSession.scalars()
版本 1.4.26 中的新增功能: 添加了
async_scoped_session.scalars()
-
方法
sqlalchemy.ext.asyncio.AsyncSession.
异步 stream(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) → AsyncResult[Any]¶ 执行语句并返回一个流式
AsyncResult
对象。
-
方法
sqlalchemy.ext.asyncio.AsyncSession.
异步 stream_scalars(statement: Executable, params: _CoreAnyExecuteParams | None = None, *, execution_options: OrmExecuteOptionsParameter = {}, bind_arguments: _BindArguments | None = None, **kw: Any) → AsyncScalarResult[Any]¶ 执行语句并返回一个标量结果流。
- 返回值:
一个
AsyncScalarResult
对象。
版本 1.4.24 中新增。
-
属性
sqlalchemy.ext.asyncio.AsyncSession.
sync_session: Session¶ 对底层
Session
的引用,此AsyncSession
将请求代理到该对象。此实例可用作事件目标。
另请参见
-
attribute
- 类 sqlalchemy.ext.asyncio.AsyncSessionTransaction¶
ORM
SessionTransaction
对象的包装器。提供此对象是为了允许为
AsyncSession.begin()
返回一个持有事务的对象。此对象支持对
AsyncSessionTransaction.commit()
和AsyncSessionTransaction.rollback()
的显式调用,以及用作异步上下文管理器。版本 1.4 中的新功能。
类签名
class
sqlalchemy.ext.asyncio.AsyncSessionTransaction
(sqlalchemy.ext.asyncio.base.ReversibleProxy
,sqlalchemy.ext.asyncio.base.StartableContext
)-
method
sqlalchemy.ext.asyncio.AsyncSessionTransaction.
async commit() → None¶ 提交此
AsyncTransaction
。
-
method
sqlalchemy.ext.asyncio.AsyncSessionTransaction.
async rollback() → None¶ 回滚此
AsyncTransaction
。
-
method
flambé! 龙和炼金术士 图像设计由 Rotem Yaari 创建并慷慨捐赠。
使用 Sphinx 7.2.6 创建。文档最后生成时间:2024 年 11 月 8 日星期五上午 08:41:19 EST