ORM 示例

SQLAlchemy 发行版包含各种代码示例,说明了一组精选的模式,其中一些是典型的,一些则不是那么典型。所有示例都可运行,并且可以在发行版的 /examples 目录中找到。所有示例的描述和源代码都可以在此处找到。

可在维基上找到其他 SQLAlchemy 示例,其中一些是用户贡献的:https://sqlalchemy.org.cn/trac/wiki/UsageRecipes.

映射配方

邻接表

使用邻接表模型映射字典的字典结构的示例。

例如:

node = TreeNode('rootnode')
node.append('node1')
node.append('node3')
session.add(node)
session.commit()

dump_tree(node)

文件列表

关联

说明“关联对象”模式用法的示例,其中一个中间类调解两个以多对多模式关联的类的关系。

文件列表

  • proxied_association.py - 与 basic_association 相同的示例,添加了 sqlalchemy.ext.associationproxy 的用法,使对 OrderItem 的显式引用成为可选的。

  • basic_association.py - 说明“订单”与一组“商品”对象之间的多对多关系,通过名为“OrderItem”的关联对象将每个商品的购买价格关联起来。

  • dict_of_sets_with_default.py - 一个高级关联代理示例,说明关联代理的嵌套以生成多级 Python 集合,在本例中,字典包含字符串键和整数集合作为值,这些集合隐藏了底层的映射类。

异步 I/O 集成

说明 SQLAlchemy 的异步引擎特性的示例。

文件列表

  • async_orm.py - 说明 sqlalchemy.ext.asyncio.AsyncSession 对象在异步 ORM 使用中的用法。

  • async_orm_writeonly.py - 说明使用只写关系简化 asyncio 下 ORM 集合的处理。

  • gather_orm_statements.py - 说明如何使用 asyncio.gather() 在多个异步数据库连接上并发运行多个语句,并将 ORM 结果合并到单个 AsyncSession 中。

  • basic.py - 说明异步引擎/连接接口。

  • greenlet_orm.py - 说明 sqlalchemy.ext.asyncio.AsyncSession 对象在异步 ORM 使用中的用法,包括可选的 run_sync() 方法。

有向图

有向图结构持久化的示例。该图存储为一组边,每条边都在节点表中引用一个“下级”节点和一个“上级”节点。说明了对下级和上级邻居的基本持久化和查询。

n2 = Node(2)
n5 = Node(5)
n2.add_neighbor(n5)
print(n2.higher_neighbors())

文件列表

作为字典的动态关系

说明如何在“动态”关系之上放置一个类似字典的界面,以便字典操作(假设简单的字符串键)可以在不加载整个集合的情况下对大型集合进行操作。

文件列表

泛型关联

说明将多种类型的父级与特定子级对象关联的各种方法。

所有示例都使用声明性扩展以及声明性 mixin。每个示例都在最后展示了相同的用例 - 两个类 CustomerSupplier,两者都继承自 HasAddresses mixin,该 mixin 确保父类提供一个包含 Address 对象的 addresses 集合。

discriminator_on_association.pygeneric_fk.py 脚本是 2007 年博客文章 使用 SQLAlchemy 的多态关联 中介绍的配方的现代化版本。

文件列表

  • table_per_association.py - 说明一个 mixin,该 mixin 通过为每个父类单独生成的关联表来提供泛型关联。关联对象本身保存在所有父类共享的单个表中。

  • table_per_related.py - 说明一个泛型关联,该关联将关联对象保存在单独的表中,每个表都生成以代表特定父类持久化这些对象。

  • discriminator_on_association.py - 说明一个 mixin,该 mixin 使用单个目标表和单个关联表来提供泛型关联,所有父表都引用该关联表。关联表包含一个“鉴别器”列,该列确定哪种类型的父对象与关联表中的每个特定行关联。

  • generic_fk.py - 说明所谓的“泛型外键”,类似于 Django、ROR 等流行框架中的外键。这种方法绕过了标准的参照完整性实践,因为“外键”列实际上并不受约束必须引用任何特定表;相反,应用程序内逻辑用于确定引用哪个表。

物化路径

说明使用 SQLAlchemy ORM 的层次结构数据的“物化路径”模式。

文件列表

嵌套集

说明使用 SQLAlchemy ORM 实现层次结构数据的“嵌套集”模式的基本方法。

文件列表

性能

针对各种 SQLAlchemy 用例的性能分析套件。

每个套件都侧重于具有特定性能配置文件和相关影响的特定用例。

  • 批量插入

  • 单个插入,有或无事务

  • 获取大量行

  • 运行大量短查询

所有套件都包含各种使用模式,说明了 Core 和 ORM 的用法,并且通常按性能从最差到最好排序,与 SQLAlchemy 提供的功能量成反比,从最大到最小(这两件事通常完全对应)。

在包级别提供了一个命令行工具,允许运行单个套件。

$ python -m examples.performance --help
usage: python -m examples.performance [-h] [--test TEST] [--dburl DBURL]
                                      [--num NUM] [--profile] [--dump]
                                      [--echo]

                                      {bulk_inserts,large_resultsets,single_inserts}

positional arguments:
  {bulk_inserts,large_resultsets,single_inserts}
                        suite to run

optional arguments:
  -h, --help            show this help message and exit
  --test TEST           run specific test name
  --dburl DBURL         database URL, default sqlite:///profile.db
  --num NUM             Number of iterations/items/etc for tests;
                        default is module-specific
  --profile             run profiling and dump call counts
  --dump                dump full call profile (implies --profile)
  --echo                Echo SQL output

示例运行如下所示。

$ python -m examples.performance bulk_inserts

或使用选项

$ python -m examples.performance bulk_inserts \
    --dburl mysql+mysqldb://scott:tiger@localhost/test \
    --profile --num 1000

文件列表

文件列表

  • bulk_updates.py - 这一系列测试将说明以批量方式更新大量行的不同方法(正在建设中!目前只有一个测试)。

  • large_resultsets.py - 在这一系列测试中,我们关注加载大量非常小且简单的行的时间。

  • bulk_inserts.py - 这一系列测试说明了以批量方式插入大量行的不同方法。

  • short_selects.py - 此系列测试展示了通过主键 SELECT 单条记录的不同方法

  • single_inserts.py - 在此系列测试中,我们研究了一种在独立事务中插入行,并在插入完成后返回到“关闭”状态的方法。这类似于 API 调用,它启动数据库连接,插入行,提交并关闭连接。

  • __main__.py - 允许将 examples/performance 包作为脚本运行。

使用时间运行所有测试

这是运行的默认形式

$ python -m examples.performance single_inserts
Tests to run: test_orm_commit, test_bulk_save,
              test_bulk_insert_dictionaries, test_core,
              test_core_query_caching, test_dbapi_raw_w_connect,
              test_dbapi_raw_w_pool

test_orm_commit : Individual INSERT/COMMIT pairs via the
    ORM (10000 iterations); total time 13.690218 sec
test_bulk_save : Individual INSERT/COMMIT pairs using
    the "bulk" API  (10000 iterations); total time 11.290371 sec
test_bulk_insert_dictionaries : Individual INSERT/COMMIT pairs using
    the "bulk" API with dictionaries (10000 iterations);
    total time 10.814626 sec
test_core : Individual INSERT/COMMIT pairs using Core.
    (10000 iterations); total time 9.665620 sec
test_core_query_caching : Individual INSERT/COMMIT pairs using Core
    with query caching (10000 iterations); total time 9.209010 sec
test_dbapi_raw_w_connect : Individual INSERT/COMMIT pairs w/ DBAPI +
    connection each time (10000 iterations); total time 9.551103 sec
test_dbapi_raw_w_pool : Individual INSERT/COMMIT pairs w/ DBAPI +
    connection pool (10000 iterations); total time 8.001813 sec

转储单个测试的配置文件

可以为所有测试转储 Python 配置文件输出,或者更常见的是单个测试的输出

$ python -m examples.performance single_inserts --test test_core --num 1000 --dump
Tests to run: test_core
test_core : Individual INSERT/COMMIT pairs using Core. (1000 iterations); total fn calls 186109
         186109 function calls (186102 primitive calls) in 1.089 seconds

   Ordered by: internal time, call count

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1000    0.634    0.001    0.634    0.001 {method 'commit' of 'sqlite3.Connection' objects}
     1000    0.154    0.000    0.154    0.000 {method 'execute' of 'sqlite3.Cursor' objects}
     1000    0.021    0.000    0.074    0.000 /Users/classic/dev/sqlalchemy/lib/sqlalchemy/sql/compiler.py:1950(_get_colparams)
     1000    0.015    0.000    0.034    0.000 /Users/classic/dev/sqlalchemy/lib/sqlalchemy/engine/default.py:503(_init_compiled)
        1    0.012    0.012    1.091    1.091 examples/performance/single_inserts.py:79(test_core)

    ...

编写自己的测试套件

分析器套件系统是可扩展的,可以应用于你自己的测试集。这是一种有价值的技术,可以用于决定对一些性能关键的例程使用哪种方法。例如,如果我们要分析几种不同加载类型之间的差异,我们可以创建一个名为 test_loads.py 的文件,内容如下:

from examples.performance import Profiler
from sqlalchemy import Integer, Column, create_engine, ForeignKey
from sqlalchemy.orm import relationship, joinedload, subqueryload, Session
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()
engine = None
session = None


class Parent(Base):
    __tablename__ = 'parent'
    id = Column(Integer, primary_key=True)
    children = relationship("Child")


class Child(Base):
    __tablename__ = 'child'
    id = Column(Integer, primary_key=True)
    parent_id = Column(Integer, ForeignKey('parent.id'))


# Init with name of file, default number of items
Profiler.init("test_loads", 1000)


@Profiler.setup_once
def setup_once(dburl, echo, num):
    "setup once.  create an engine, insert fixture data"
    global engine
    engine = create_engine(dburl, echo=echo)
    Base.metadata.drop_all(engine)
    Base.metadata.create_all(engine)
    sess = Session(engine)
    sess.add_all([
        Parent(children=[Child() for j in range(100)])
        for i in range(num)
    ])
    sess.commit()


@Profiler.setup
def setup(dburl, echo, num):
    "setup per test.  create a new Session."
    global session
    session = Session(engine)
    # pre-connect so this part isn't profiled (if we choose)
    session.connection()


@Profiler.profile
def test_lazyload(n):
    "load everything, no eager loading."

    for parent in session.query(Parent):
        parent.children


@Profiler.profile
def test_joinedload(n):
    "load everything, joined eager loading."

    for parent in session.query(Parent).options(joinedload("children")):
        parent.children


@Profiler.profile
def test_subqueryload(n):
    "load everything, subquery eager loading."

    for parent in session.query(Parent).options(subqueryload("children")):
        parent.children

if __name__ == '__main__':
    Profiler.main()

我们可以直接运行新的脚本

$ python test_loads.py  --dburl postgresql+psycopg2://scott:tiger@localhost/test
Running setup once...
Tests to run: test_lazyload, test_joinedload, test_subqueryload
test_lazyload : load everything, no eager loading. (1000 iterations); total time 11.971159 sec
test_joinedload : load everything, joined eager loading. (1000 iterations); total time 2.754592 sec
test_subqueryload : load everything, subquery eager loading. (1000 iterations); total time 2.977696 sec

太空侵略者

使用 SQLite 作为状态机的太空侵略者游戏。

最初开发于 2012 年。已改编为适用于 Python 3。

在文本控制台中使用 ASCII 艺术运行。

../_images/space_invaders.jpg

要运行

python -m examples.space_invaders.space_invaders

在运行时,查看日志中的 SQL 输出

tail -f space_invaders.log

享受!

文件列表

版本化对象

使用历史表进行版本化

演示了一个扩展,该扩展为实体创建版本表,并为每次更改存储记录。给定的扩展会生成一个匿名“history”类,它表示目标对象的 historical 版本。

使用时间行进行版本化 例子进行比较,这些例子将更新写入同一个表中的新行,而不会使用单独的历史表。

用法通过一个单元测试模块 test_versioning.py 进行说明,该模块使用 SQLAlchemy 的内部 pytest 插件运行

pytest test/base/test_examples.py

示例用法的片段,使用声明式

from history_meta import Versioned, versioned_session

class Base(DeclarativeBase):
    pass

class SomeClass(Versioned, Base):
    __tablename__ = 'sometable'

    id = Column(Integer, primary_key=True)
    name = Column(String(50))

    def __eq__(self, other):
        assert type(other) is SomeClass and other.id == self.id

Session = sessionmaker(bind=engine)
versioned_session(Session)

sess = Session()
sc = SomeClass(name='sc1')
sess.add(sc)
sess.commit()

sc.name = 'sc1modified'
sess.commit()

assert sc.version == 2

SomeClassHistory = SomeClass.__history_mapper__.class_

assert sess.query(SomeClassHistory).\
            filter(SomeClassHistory.version == 1).\
            all() \
            == [SomeClassHistory(version=1, name='sc1')]

Versioned 混合类设计用于与声明式一起使用。要将扩展与经典映射器一起使用,可以应用 _history_mapper 函数

from history_meta import _history_mapper

m = mapper(SomeClass, sometable)
_history_mapper(m)

SomeHistoryClass = SomeClass.__history_mapper__.class_

版本化示例还与 ORM 乐观并发功能集成,该功能在 配置版本计数器 中有记录。要启用此功能,请将标志 Versioned.use_mapper_versioning 设置为 True

class SomeClass(Versioned, Base):
    __tablename__ = 'sometable'

    use_mapper_versioning = True

    id = Column(Integer, primary_key=True)
    name = Column(String(50))

    def __eq__(self, other):
        assert type(other) is SomeClass and other.id == self.id

在上面,如果两个具有相同版本标识符的 SomeClass 实例被同时更新并发送到数据库进行 UPDATE,如果数据库隔离级别允许两个 UPDATE 语句执行,其中一个会失败,因为它不再针对最后一个已知版本标识符。

文件列表

使用时间行进行版本化

一些例子说明了拦截更改的技术,这些更改最初会被解释为对行的 UPDATE,但实际上是将其转换为新的行的 INSERT,保持先前行完好无损作为历史版本。

使用历史表进行版本化 例子进行比较,该例子将历史行写入单独的历史表。

文件列表

  • versioned_rows.py - 说明了一种拦截对象更改的方法,将对单行的 UPDATE 语句转换为 INSERT 语句,以便插入一个包含新数据的行,同时保持旧行完好无损。

  • versioned_rows_w_versionid.py - 说明了一种拦截对象更改的方法,将对单行的 UPDATE 语句转换为 INSERT 语句,以便插入一个包含新数据的行,同时保持旧行完好无损。

  • versioned_map.py - 版本化行示例的变体,建立在“垂直表”结构的概念之上,就像 垂直属性映射 示例中说明的那样。

  • versioned_update_old_row.py - 说明了与 versioned_rows.py 相同的 UPDATE 到 INSERT 技术,但也会在 **旧** 行上发出 UPDATE,以影响时间戳的更改。还包含一个 SessionEvents.do_orm_execute() 钩子,以将查询限制为仅限最新版本。

垂直属性映射

说明“垂直表”映射。

“垂直表”指的是将对象的单个属性存储为表中不同行的一种技术。“垂直表”技术用于持久化可以具有各种属性集的对象,但以牺牲简单的查询控制和简洁性为代价。它通常在内容/文档管理系统中使用,以灵活地表示用户创建的结构。

给出了该方法的两种变体。在第二种变体中,每行都引用一个“数据类型”,其中包含有关属性中存储的信息类型的详细信息,例如整数、字符串或日期。

示例

shrew = Animal(u'shrew')
shrew[u'cuteness'] = 5
shrew[u'weasel-like'] = False
shrew[u'poisonous'] = True

session.add(shrew)
session.flush()

q = (session.query(Animal).
     filter(Animal.facts.any(
       and_(AnimalFact.key == u'weasel-like',
            AnimalFact.value == True))))
print('weasel-like animals', q.all())

文件列表

继承映射方案

基本继承映射

映射类继承层次结构 中描述的单表、连接表和具体表继承的工作示例。

文件列表

  • joined.py - 连接表(每子类一个表)继承示例。

  • concrete.py - 具体表(每个类一个表)继承示例。

  • single.py - 单表(每个层次结构一个表)继承示例。

特殊 API

属性检测

说明对 SQLAlchemy 属性管理系统的修改。

文件列表

水平分片

使用 SQLAlchemy 分片 API 的基本示例。分片指的是将数据水平扩展到多个数据库。

“分片”映射的基本组件是

  • 多个 Engine 实例,每个实例都分配了一个“分片 ID”。这些 Engine 实例可以引用不同的数据库,或同一个数据库内的不同模式/帐户,甚至可以通过在使用时导致它们访问不同模式或表的选项来区分。

  • 一个可以返回单个分片 ID 的函数,该函数根据要保存的实例提供;这称为“分片选择器”。

  • 一个可以返回适用于特定实例标识符的分片 ID 列表的函数;这称为“ID 选择器”。如果它返回所有分片 ID,则会搜索所有分片。

  • 一个可以返回要尝试的分片 ID 列表的函数,该函数根据特定查询提供 (“查询选择器”)。如果它返回所有分片 ID,则会查询所有分片并将结果连接在一起。

在这些示例中,不同类型的分片用于同一个基本示例,该示例适用于基于大陆的天气数据。我们提供了示例分片选择器、ID 选择器和查询选择器函数。查询选择器说明了对 SQL 表达式元素的检查,以便尝试确定正在请求的单个分片。

构建通用分片例程是解决在多个数据库中组织实例问题的雄心勃勃的方法。对于更直白的替代方法,“独立实体”方法是一种将对象显式分配到不同表(以及潜在的数据库节点)的简单方法,在维基百科上进行了解释,网址为 EntityName

文件列表

  • separate_databases.py - 说明使用独立 SQLite 数据库进行分片。

  • separate_tables.py - 说明使用单个 SQLite 数据库进行分片,该数据库将使用命名约定拥有多个表。

  • separate_schema_translates.py - 说明使用单个数据库和多个模式进行分片,每个分片可以使用不同的“schema_translates_map”。

  • asyncio.py - 说明与 asyncio 一起使用的分片 API。

扩展 ORM

ORM 查询事件

本节将展示如何增强 ORM SELECT 行为,如 Session.execute()2.0 风格select() 使用,以及 1.x 风格Query 对象。

示例包括 with_loader_criteria() 选项和 SessionEvents.do_orm_execute() 钩子的演示。

从 SQLAlchemy 1.4 开始,Query 结构与 Select 结构统一,因此这两个对象基本相同。

文件列表

Dogpile 缓存

展示如何将 dogpile.cache 功能与 ORM 查询结合,实现完整缓存控制以及从长期缓存中提取“延迟加载”属性的能力。

本演示展示了以下技术

  • 使用 SessionEvents.do_orm_execute() 事件钩子

  • 绕过 Session.execute() 从自定义缓存源而不是数据库中提取数据的基本技术。

  • 使用 dogpile.cache 进行基本缓存,使用“区域”来全局控制一组固定的配置。

  • 使用自定义 UserDefinedOption 对象在语句对象中配置选项。

另请参阅

重新执行语句 - 包含此处介绍的技术的通用示例。

例如:

# query for Person objects, specifying cache
stmt = select(Person).options(FromCache("default"))

# specify that each Person's "addresses" collection comes from
# cache too
stmt = stmt.options(RelationshipCache(Person.addresses, "default"))

# execute and results
result = session.execute(stmt)

print(result.scalars().all())

要运行,必须安装 SQLAlchemy 和 dogpile.cache,或者它们必须位于当前 PYTHONPATH 中。演示将创建一个本地目录用于数据文件,插入初始数据,然后运行。第二次运行演示将使用已存在的数据文件,并且将发出针对两个表的 SQL 语句 - 但显示的结果将使用数十个延迟加载,这些加载都来自缓存。

演示脚本本身,按复杂程度排序,作为 Python 模块运行,以便相对导入起作用

python -m examples.dogpile_caching.helloworld

python -m examples.dogpile_caching.relationship_caching

python -m examples.dogpile_caching.advanced

python -m examples.dogpile_caching.local_session_caching

文件列表

  • environment.py - 建立数据/缓存文件路径和配置,必要时引导固定数据。

  • caching_query.py - 代表允许将 Dogpile 缓存与 SQLAlchemy 结合使用的函数和类。引入了名为 FromCache 的查询选项。

  • model.py - 数据模型,表示具有多个 Address 对象的 Person,每个 Address 对象都有 PostalCode、City 和 Country。

  • fixture_data.py - 安装一些示例数据。这里我们有几个美国/加拿大城市的邮政编码。然后,安装了 100 个 Person 记录,每个记录都随机选择了一个邮政编码。

  • helloworld.py - 展示如何加载一些数据并将结果缓存。

  • relationship_caching.py - 展示如何在关系端点上添加缓存选项,以便延迟加载从缓存中加载。

  • advanced.py - 展示 Query 与 FromCache 选项结合使用的示例,包括前端加载、缓存失效和集合缓存。

  • local_session_caching.py - 此示例创建一个新的 dogpile.cache 后端,它将数据持久化到当前会话的字典中。删除会话,缓存也将消失。