ORM 示例

SQLAlchemy 发行版包含各种代码示例,说明了一组精选的模式,其中一些是典型的,而另一些则不那么典型。所有示例都是可运行的,可以在发行版的 /examples 目录中找到。所有示例的描述和源代码都可以在这里找到。

其他 SQLAlchemy 示例(其中一些是用户贡献的)可在 wiki 上找到:https://sqlalchemy.org.cn/trac/wiki/UsageRecipes

映射技巧

邻接表

使用邻接表模型映射的字典字典结构的示例。

例如。

node = TreeNode("rootnode")
node.append("node1")
node.append("node3")
session.add(node)
session.commit()

dump_tree(node)

文件列表

关联

示例说明了“关联对象”模式的用法,其中中间类调解了以多对多模式关联的两个类之间的关系。

文件列表

  • proxied_association.py - 与 basic_association 相同的示例,添加了 sqlalchemy.ext.associationproxy 的用法,使对 OrderItem 的显式引用成为可选。

  • basic_association.py - 说明了“Order”和“Item”对象集合之间的多对多关系,通过名为“OrderItem”的关联对象将购买价格与每个对象关联起来

  • dict_of_sets_with_default.py - 一个高级关联代理示例,说明了关联代理的嵌套以生成多级 Python 集合,在本例中是一个字典,其中字符串键和整数集作为值,这些值隐藏了底层的映射类。

Asyncio 集成

示例说明了 SQLAlchemy 的 asyncio 引擎功能。

文件列表

  • async_orm.py - 说明了 sqlalchemy.ext.asyncio.AsyncSession 对象用于异步 ORM 的用法。

  • async_orm_writeonly.py - 说明了如何使用只写关系来更简单地处理 asyncio 下的 ORM 集合。

  • gather_orm_statements.py - 说明了如何使用 asyncio.gather() 沿许多 asyncio 数据库连接并发运行许多语句,并将 ORM 结果合并到单个 AsyncSession 中。

  • basic.py - 说明了 asyncio 引擎/连接接口。

  • greenlet_orm.py - 说明了 sqlalchemy.ext.asyncio.AsyncSession 对象用于异步 ORM 的用法,包括可选的 run_sync() 方法。

有向图

有向图结构的持久性示例。该图存储为边的集合,每个边都引用节点表中的“下”节点和“上”节点。说明了基本持久性和对下邻居和上邻居的查询

n2 = Node(2)
n5 = Node(5)
n2.add_neighbor(n5)
print(n2.higher_neighbors())

文件列表

动态关系作为字典

说明了如何在“动态”关系之上放置类似字典的外观,以便字典操作(假设简单的字符串键)可以在大型集合上操作,而无需一次加载整个集合。

文件列表

通用关联

说明了将多种类型的父对象与特定子对象关联的各种方法。

这些示例都使用声明性扩展以及声明性 mixin。每个示例都在最后呈现了相同的用例 - 两个类,CustomerSupplier,都继承了 HasAddresses mixin,这确保了为父类提供了包含 Address 对象的 addresses 集合。

discriminator_on_association.pygeneric_fk.py 脚本是 2007 年博客文章 Polymorphic Associations with SQLAlchemy 中提出的技巧的现代化版本。

文件列表

  • table_per_association.py - 说明了一个 mixin,它通过为每个父类单独生成的关联表来提供通用关联。关联对象本身持久保存在所有父类共享的单个表中。

  • table_per_related.py - 说明了一种通用关联,它将关联对象持久保存在各个表中,每个表都是为了代表特定的父类持久化这些对象而生成的。

  • discriminator_on_association.py - 说明了一个 mixin,它使用单个目标表和单个关联表来提供通用关联,所有父表都引用该表。关联表包含一个“鉴别器”列,该列确定哪种类型的父对象与关联表中的每个特定行关联。

  • generic_fk.py - 说明了所谓的“通用外键”,其方式类似于 Django、ROR 等流行的框架。这种方法绕过了标准的引用完整性实践,因为“外键”列实际上并不约束为引用任何特定表;相反,应用程序内逻辑用于确定引用的表。

物化路径

说明了使用 SQLAlchemy ORM 进行分层数据的“物化路径”模式。

文件列表

嵌套集

说明了使用 SQLAlchemy ORM 实现分层数据的“嵌套集”模式的基本方法。

文件列表

性能

各种 SQLAlchemy 用例的性能分析套件。

每个套件都侧重于具有特定性能配置文件和相关含义的特定用例

  • 批量插入

  • 单独插入,带或不带事务

  • 获取大量行

  • 运行大量短查询

所有套件都包含各种用例模式,说明了 Core 和 ORM 的用法,并且通常按性能从最差到最佳排序,反过来基于 SQLAlchemy 提供的功能量,从最大到最小(这两件事通常完全对应)。

包级别提供了一个命令行工具,允许运行单个套件

$ python -m examples.performance --help
usage: python -m examples.performance [-h] [--test TEST] [--dburl DBURL]
                                      [--num NUM] [--profile] [--dump]
                                      [--echo]

                                      {bulk_inserts,large_resultsets,single_inserts}

positional arguments:
  {bulk_inserts,large_resultsets,single_inserts}
                        suite to run

optional arguments:
  -h, --help            show this help message and exit
  --test TEST           run specific test name
  --dburl DBURL         database URL, default sqlite:///profile.db
  --num NUM             Number of iterations/items/etc for tests;
                        default is module-specific
  --profile             run profiling and dump call counts
  --dump                dump full call profile (implies --profile)
  --echo                Echo SQL output

一个示例运行如下所示

$ python -m examples.performance bulk_inserts

或带选项

$ python -m examples.performance bulk_inserts \
    --dburl mysql+mysqldb://scott:tiger@localhost/test \
    --profile --num 1000

文件列表

文件列表

  • bulk_updates.py - 这系列测试将说明批量 UPDATE 大量行的不同方法(正在建设中!目前只有一个测试)

  • large_resultsets.py - 在这系列测试中,我们正在查看加载大量非常小的简单行的时间。

  • bulk_inserts.py - 这系列测试说明了批量 INSERT 大量行的不同方法。

  • short_selects.py - 这系列测试说明了通过主键 SELECT 单个记录的不同方法

  • single_inserts.py - 在这系列测试中,我们正在查看一种在不同事务中插入行的方法,然后在之后返回到基本上“关闭”状态。这类似于启动数据库连接、插入行、提交并关闭的 API 调用。

  • __main__.py - 允许 examples/performance 包作为脚本运行。

定时运行所有测试

这是运行的默认形式

$ python -m examples.performance single_inserts
Tests to run: test_orm_commit, test_bulk_save,
              test_bulk_insert_dictionaries, test_core,
              test_core_query_caching, test_dbapi_raw_w_connect,
              test_dbapi_raw_w_pool

test_orm_commit : Individual INSERT/COMMIT pairs via the
    ORM (10000 iterations); total time 13.690218 sec
test_bulk_save : Individual INSERT/COMMIT pairs using
    the "bulk" API  (10000 iterations); total time 11.290371 sec
test_bulk_insert_dictionaries : Individual INSERT/COMMIT pairs using
    the "bulk" API with dictionaries (10000 iterations);
    total time 10.814626 sec
test_core : Individual INSERT/COMMIT pairs using Core.
    (10000 iterations); total time 9.665620 sec
test_core_query_caching : Individual INSERT/COMMIT pairs using Core
    with query caching (10000 iterations); total time 9.209010 sec
test_dbapi_raw_w_connect : Individual INSERT/COMMIT pairs w/ DBAPI +
    connection each time (10000 iterations); total time 9.551103 sec
test_dbapi_raw_w_pool : Individual INSERT/COMMIT pairs w/ DBAPI +
    connection pool (10000 iterations); total time 8.001813 sec

为单个测试转储配置文件

可以为所有测试或更常见的单个测试转储 Python 配置文件输出

$ python -m examples.performance single_inserts --test test_core --num 1000 --dump
Tests to run: test_core
test_core : Individual INSERT/COMMIT pairs using Core. (1000 iterations); total fn calls 186109
         186109 function calls (186102 primitive calls) in 1.089 seconds

   Ordered by: internal time, call count

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1000    0.634    0.001    0.634    0.001 {method 'commit' of 'sqlite3.Connection' objects}
     1000    0.154    0.000    0.154    0.000 {method 'execute' of 'sqlite3.Cursor' objects}
     1000    0.021    0.000    0.074    0.000 /Users/classic/dev/sqlalchemy/lib/sqlalchemy/sql/compiler.py:1950(_get_colparams)
     1000    0.015    0.000    0.034    0.000 /Users/classic/dev/sqlalchemy/lib/sqlalchemy/engine/default.py:503(_init_compiled)
        1    0.012    0.012    1.091    1.091 examples/performance/single_inserts.py:79(test_core)

    ...

编写您自己的套件

分析器套件系统是可扩展的,可以应用于您自己的一组测试。这是一种有价值的技术,可用于决定某些性能关键例程的正确方法。例如,如果我们想要分析几种加载方式之间的差异,我们可以创建一个文件 test_loads.py,内容如下

from examples.performance import Profiler
from sqlalchemy import Integer, Column, create_engine, ForeignKey
from sqlalchemy.orm import relationship, joinedload, subqueryload, Session
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()
engine = None
session = None


class Parent(Base):
    __tablename__ = "parent"
    id = Column(Integer, primary_key=True)
    children = relationship("Child")


class Child(Base):
    __tablename__ = "child"
    id = Column(Integer, primary_key=True)
    parent_id = Column(Integer, ForeignKey("parent.id"))


# Init with name of file, default number of items
Profiler.init("test_loads", 1000)


@Profiler.setup_once
def setup_once(dburl, echo, num):
    "setup once.  create an engine, insert fixture data"
    global engine
    engine = create_engine(dburl, echo=echo)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)
    sess = Session(engine)
    sess.add_all(
        [
            Parent(children=[Child() for j in range(100)])
            for i in range(num)
        ]
    )
    sess.commit()


@Profiler.setup
def setup(dburl, echo, num):
    "setup per test.  create a new Session."
    global session
    session = Session(engine)
    # pre-connect so this part isn't profiled (if we choose)
    session.connection()


@Profiler.profile
def test_lazyload(n):
    "load everything, no eager loading."

    for parent in session.query(Parent):
        parent.children


@Profiler.profile
def test_joinedload(n):
    "load everything, joined eager loading."

    for parent in session.query(Parent).options(joinedload("children")):
        parent.children


@Profiler.profile
def test_subqueryload(n):
    "load everything, subquery eager loading."

    for parent in session.query(Parent).options(subqueryload("children")):
        parent.children


if __name__ == "__main__":
    Profiler.main()

我们可以直接运行我们的新脚本

$ python test_loads.py  --dburl postgresql+psycopg2://scott:tiger@localhost/test
Running setup once...
Tests to run: test_lazyload, test_joinedload, test_subqueryload
test_lazyload : load everything, no eager loading. (1000 iterations); total time 11.971159 sec
test_joinedload : load everything, joined eager loading. (1000 iterations); total time 2.754592 sec
test_subqueryload : load everything, subquery eager loading. (1000 iterations); total time 2.977696 sec

太空侵略者

使用 SQLite 作为状态机的太空侵略者游戏。

最初于 2012 年开发。改编为在 Python 3 中工作。

在文本控制台中使用 ASCII 艺术运行。

../_images/space_invaders.jpg

要运行

$ python -m examples.space_invaders.space_invaders

当它运行时,观看日志中的 SQL 输出

$ tail -f space_invaders.log

尽情享受!

文件列表

对象版本控制

使用历史表进行版本控制

说明了一个扩展,该扩展为实体创建版本表,并存储每次更改的记录。给定的扩展生成一个匿名的“历史”类,该类表示目标对象的历史版本。

使用时间行进行版本控制 示例进行比较,后者将更新作为同一表中的新行写入,而不使用单独的历史表。

通过单元测试模块 test_versioning.py 说明了用法,该模块使用 SQLAlchemy 的内部 pytest 插件运行

$ pytest test/base/test_examples.py

使用声明性的示例用法片段

from history_meta import Versioned, versioned_session


class Base(DeclarativeBase):
    pass


class SomeClass(Versioned, Base):
    __tablename__ = "sometable"

    id = Column(Integer, primary_key=True)
    name = Column(String(50))

    def __eq__(self, other):
        assert type(other) is SomeClass and other.id == self.id


Session = sessionmaker(bind=engine)
versioned_session(Session)

sess = Session()
sc = SomeClass(name="sc1")
sess.add(sc)
sess.commit()

sc.name = "sc1modified"
sess.commit()

assert sc.version == 2

SomeClassHistory = SomeClass.__history_mapper__.class_

assert sess.query(SomeClassHistory).filter(
    SomeClassHistory.version == 1
).all() == [SomeClassHistory(version=1, name="sc1")]

Versioned mixin 旨在与声明性一起使用。要将扩展与经典映射器一起使用,可以应用 _history_mapper 函数

from history_meta import _history_mapper

m = mapper(SomeClass, sometable)
_history_mapper(m)

SomeHistoryClass = SomeClass.__history_mapper__.class_

版本控制示例还与 配置版本计数器 中记录的 ORM 乐观并发功能集成。要启用此功能,请将标志 Versioned.use_mapper_versioning 设置为 True

class SomeClass(Versioned, Base):
    __tablename__ = "sometable"

    use_mapper_versioning = True

    id = Column(Integer, primary_key=True)
    name = Column(String(50))

    def __eq__(self, other):
        assert type(other) is SomeClass and other.id == self.id

上面,如果两个具有相同版本标识符的 SomeClass 实例被并发更新并发送到数据库以进行 UPDATE,如果数据库隔离级别允许两个 UPDATE 语句继续进行,则其中一个将失败,因为它不再针对上次已知的版本标识符。

文件列表

使用时间行进行版本控制

几个示例说明了拦截更改的技术,这些更改最初将被解释为对行的 UPDATE,而是将其转换为插入新行,使先前的行保持不变作为历史版本。

使用历史表进行版本控制 示例进行比较,后者将历史行写入单独的历史表。

文件列表

  • versioned_rows.py - 说明了一种拦截对象更改的方法,将对单行的 UPDATE 语句转换为 INSERT 语句,以便插入包含新数据的新行,同时保持旧行不变。

  • versioned_rows_w_versionid.py - 说明了一种拦截对象更改的方法,将对单行的 UPDATE 语句转换为 INSERT 语句,以便插入包含新数据的新行,同时保持旧行不变。

  • versioned_map.py - versioned_rows 示例的变体,围绕“垂直表”结构的概念构建,类似于 垂直属性映射 示例中说明的结构。

  • versioned_update_old_row.py - 说明了与 versioned_rows.py 相同的 UPDATE 到 INSERT 技术,但也对行发出 UPDATE 以影响时间戳的更改。还包括一个 SessionEvents.do_orm_execute() 钩子,用于将查询限制为仅最新版本。

垂直属性映射

说明了“垂直表”映射。

“垂直表”是指一种技术,其中对象的各个属性存储为表中的不同行。“垂直表”技术用于持久化可能具有各种属性的对象,但代价是简单的查询控制和简洁性。它通常在内容/文档管理系统中找到,以便灵活地表示用户创建的结构。

给出了该方法的两种变体。在第二种变体中,每行都引用一个“数据类型”,其中包含有关属性中存储的信息类型的信息,例如整数、字符串或日期。

示例

shrew = Animal("shrew")
shrew["cuteness"] = 5
shrew["weasel-like"] = False
shrew["poisonous"] = True

session.add(shrew)
session.flush()

q = session.query(Animal).filter(
    Animal.facts.any(
        and_(AnimalFact.key == "weasel-like", AnimalFact.value == True)
    )
)
print("weasel-like animals", q.all())

文件列表

继承映射技巧

基本继承映射

单表、连接表和具体表继承的工作示例,如 映射类继承层次结构 中所述。

文件列表

  • joined.py - 连接表(每子类表)继承示例。

  • concrete.py - 具体表(每类表)继承示例。

  • single.py - 单表(每层次结构表)继承示例。

特殊 API

属性检测

示例说明了对 SQLAlchemy 属性管理系统的修改。

文件列表

水平分片

使用 SQLAlchemy 分片 API 的基本示例。分片是指跨多个数据库水平扩展数据。

“分片”映射的基本组件是

  • 多个 Engine 实例,每个实例都分配了一个“分片 ID”。这些 Engine 实例可以引用不同的数据库,或同一数据库中的不同架构/帐户,或者它们甚至可以仅通过选项来区分,这些选项将导致它们在使用时访问不同的架构或表。

  • 一个函数,可以返回单个分片 ID,给定要保存的实例;这称为“shard_chooser”

  • 一个函数,可以返回适用于特定实例标识符的分片 ID 列表;这称为“id_chooser”。如果它返回所有分片 ID,则将搜索所有分片。

  • 一个函数,可以返回要尝试的分片 ID 列表,给定特定的 Query(“query_chooser”)。如果它返回所有分片 ID,则将查询所有分片,并将结果连接在一起。

在这些示例中,针对相同的基本示例使用了不同种类的分片,该示例适应了按大陆划分的天气数据。我们提供了示例 shard_chooser、id_chooser 和 query_chooser 函数。query_chooser 说明了对 SQL 表达式元素的检查,以便尝试确定正在请求的单个分片。

构建通用分片例程是解决在多个数据库之间组织实例问题的雄心勃勃的方法。对于更直白的替代方案,“distinct entity”方法是一种简单的方法,可以显式地将对象分配给不同的表(以及可能的数据库节点)- 在 wiki 上描述:EntityName

文件列表

  • separate_databases.py - 说明了使用不同的 SQLite 数据库进行分片。

  • separate_tables.py - 说明了使用单个 SQLite 数据库进行分片,但这将具有使用命名约定的多个表。

  • separate_schema_translates.py - 说明了使用具有多个架构的单个数据库进行分片,其中可以为每个分片使用不同的“schema_translates_map”。

  • asyncio.py - 说明了与 asyncio 一起使用的分片 API。

扩展 ORM

ORM 查询事件

技巧说明了增强 ORM SELECT 行为,如 Session.execute()2.0 风格 使用 select() 以及 1.x 风格 Query 对象。

示例包括 with_loader_criteria() 选项以及 SessionEvents.do_orm_execute() 钩子的演示。

从 SQLAlchemy 1.4 开始,Query 构造与 Select 构造统一,因此这两个对象基本上是相同的。

文件列表

Dogpile 缓存

说明了如何将 dogpile.cache 功能嵌入 ORM 查询中,从而允许完全缓存控制以及从长期缓存中提取“延迟加载”属性的能力。

在此演示中,说明了以下技术

  • 使用 SessionEvents.do_orm_execute() 事件钩子

  • 绕过 Session.execute() 的基本技术,以从自定义缓存源而不是数据库中提取数据。

  • 使用 dogpile.cache 进行基本的缓存,使用“区域”来全局控制一组固定的配置。

  • 使用自定义 UserDefinedOption 对象在语句对象中配置选项。

另请参阅

重新执行语句 - 包括此处介绍的技术的通用示例。

例如。

# query for Person objects, specifying cache
stmt = select(Person).options(FromCache("default"))

# specify that each Person's "addresses" collection comes from
# cache too
stmt = stmt.options(RelationshipCache(Person.addresses, "default"))

# execute and results
result = session.execute(stmt)

print(result.scalars().all())

要运行,必须安装 SQLAlchemy 和 dogpile.cache 或在当前的 PYTHONPATH 中。演示将为数据文件创建一个本地目录,插入初始数据并运行。第二次运行演示将利用已存在的缓存文件,并且将发出针对两个表的正好一个 SQL 语句 - 但是,显示的結果将利用从缓存中提取的所有数十个延迟加载。

演示脚本本身,按复杂程度排序,作为 Python 模块运行,以便相对导入有效

$ python -m examples.dogpile_caching.helloworld

$ python -m examples.dogpile_caching.relationship_caching

$ python -m examples.dogpile_caching.advanced

$ python -m examples.dogpile_caching.local_session_caching

文件列表

  • environment.py - 建立数据/缓存文件路径和配置,并在必要时引导固定装置数据。

  • caching_query.py - 表示允许 Dogpile 缓存与 SQLAlchemy 一起使用的函数和类。引入一个名为 FromCache 的查询选项。

  • model.py - 数据模型,表示具有多个 Address 对象的人,每个对象都具有 PostalCode、City、Country。

  • fixture_data.py - 安装一些示例数据。这里我们有一些美国/加拿大城市的邮政编码。然后,安装了 100 个 Person 记录,每个记录都有一个随机选择的邮政编码。

  • helloworld.py - 演示如何加载一些数据,以及缓存结果。

  • relationship_caching.py - 演示如何在关系端点上添加缓存选项,以便延迟加载从缓存加载。

  • advanced.py - 演示 Query 与 FromCache 选项结合使用的用法,包括前端加载、缓存失效和集合缓存。

  • local_session_caching.py - 此示例创建一个新的 dogpile.cache 后端,该后端将数据持久化到当前会话本地的字典中。remove() 会话后,缓存将消失。