SQLAlchemy 2.0 文档
配置关系连接方式¶
relationship()
通常会通过检查两表之间的外键关系来创建两表之间的连接,以确定应比较哪些列。在许多情况下,这种行为需要自定义。
处理多重连接路径¶
最常见的需要处理的情况之一是当两表之间存在多个外键路径时。
考虑一个 Customer
类,它包含两个指向 Address
类的外键
from sqlalchemy import Integer, ForeignKey, String, Column
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import relationship
class Base(DeclarativeBase):
pass
class Customer(Base):
__tablename__ = "customer"
id = mapped_column(Integer, primary_key=True)
name = mapped_column(String)
billing_address_id = mapped_column(Integer, ForeignKey("address.id"))
shipping_address_id = mapped_column(Integer, ForeignKey("address.id"))
billing_address = relationship("Address")
shipping_address = relationship("Address")
class Address(Base):
__tablename__ = "address"
id = mapped_column(Integer, primary_key=True)
street = mapped_column(String)
city = mapped_column(String)
state = mapped_column(String)
zip = mapped_column(String)
当我们尝试使用上述映射时,会产生错误
sqlalchemy.exc.AmbiguousForeignKeysError: Could not determine join
condition between parent/child tables on relationship
Customer.billing_address - there are multiple foreign key
paths linking the tables. Specify the 'foreign_keys' argument,
providing a list of those columns which should be
counted as containing a foreign key reference to the parent table.
上述消息很长。 relationship()
可以返回许多潜在的消息,这些消息经过精心设计,可以检测各种常见的配置问题;大多数消息会建议解决歧义或其他缺失信息所需的额外配置。
在这种情况下,消息希望我们通过指示每个关系应考虑哪个外键列来限定每个 relationship()
,合适的格式如下
class Customer(Base):
__tablename__ = "customer"
id = mapped_column(Integer, primary_key=True)
name = mapped_column(String)
billing_address_id = mapped_column(Integer, ForeignKey("address.id"))
shipping_address_id = mapped_column(Integer, ForeignKey("address.id"))
billing_address = relationship("Address", foreign_keys=[billing_address_id])
shipping_address = relationship("Address", foreign_keys=[shipping_address_id])
上面,我们指定了 foreign_keys
参数,它是一个 Column
或 Column
对象列表,这些对象指示要被视为“外键”的列,或者换句话说,包含引用父表的值的列。从 Customer
对象加载 Customer.billing_address
关系将使用 billing_address_id
中存在的值来标识要加载的 Address
中的行;类似地,shipping_address_id
用于 shipping_address
关系。这两列的链接在持久化期间也起作用;刚刚插入的 Address
对象的新生成的 primary key 将在刷新期间被复制到关联的 Customer
对象的适当外键列中。
当使用 Declarative 指定 foreign_keys
时,我们也可以使用字符串名称来指定,但是重要的是,如果使用列表,则列表是字符串的一部分
billing_address = relationship("Address", foreign_keys="[Customer.billing_address_id]")
在这个特定示例中,列表在任何情况下都不是必需的,因为我们只需要一个 Column
billing_address = relationship("Address", foreign_keys="Customer.billing_address_id")
警告
当作为 Python 可求值的字符串传递时,relationship.foreign_keys
参数使用 Python 的 eval()
函数解释。不要将不受信任的输入传递给此字符串。有关 relationship()
参数的声明式求值的详细信息,请参阅 关系参数的求值。
指定备选连接条件¶
relationship()
在构造连接时的默认行为是将一侧的主键列的值等同于另一侧的外键引用列的值。我们可以使用 relationship.primaryjoin
参数以及在使用“二级”表时使用 relationship.secondaryjoin
参数将此标准更改为我们想要的任何内容。
在下面的示例中,使用 User
类以及存储街道地址的 Address
类,我们创建一个 boston_addresses
关系,该关系将仅加载那些指定城市为 “Boston” 的 Address
对象
from sqlalchemy import Integer, ForeignKey, String, Column
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import relationship
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "user"
id = mapped_column(Integer, primary_key=True)
name = mapped_column(String)
boston_addresses = relationship(
"Address",
primaryjoin="and_(User.id==Address.user_id, Address.city=='Boston')",
)
class Address(Base):
__tablename__ = "address"
id = mapped_column(Integer, primary_key=True)
user_id = mapped_column(Integer, ForeignKey("user.id"))
street = mapped_column(String)
city = mapped_column(String)
state = mapped_column(String)
zip = mapped_column(String)
在此字符串 SQL 表达式中,我们使用了 and_()
连接构造来为连接条件建立两个不同的谓词 - 将 User.id
和 Address.user_id
列相互连接,并将 Address
中的行限制为仅 city='Boston'
。当使用 Declarative 时,像 and_()
这样的基本 SQL 函数在字符串 relationship()
参数的求值命名空间中自动可用。
警告
当作为 Python 可求值的字符串传递时,relationship.primaryjoin
参数使用 Python 的 eval()
函数解释。不要将不受信任的输入传递给此字符串。有关 relationship()
参数的声明式求值的详细信息,请参阅 关系参数的求值。
我们在 relationship.primaryjoin
中使用的自定义条件通常仅在 SQLAlchemy 渲染 SQL 以加载或表示此关系时才重要。也就是说,它用于发出的 SQL 语句中,以便执行每个属性的延迟加载,或者在查询时构造连接时,例如通过 Select.join()
,或者通过 eager “joined” 或 “subquery” 加载样式。当内存中的对象被操作时,我们可以将任何我们想要的 Address
对象放入 boston_addresses
集合中,无论 .city
属性的值是什么。对象将保留在集合中,直到属性过期并从应用条件的数据库重新加载。当发生刷新时,boston_addresses
内部的对象将被无条件刷新,将主键 user.id
列的值分配到每个行的外键持有 address.user_id
列上。city
条件在这里不起作用,因为刷新过程只关心将主键值同步到引用的外键值中。
创建自定义外键条件¶
主连接条件的另一个要素是如何确定那些被认为是“外键”的列。通常,Column
对象的一些子集将指定 ForeignKey
,或者成为与连接条件相关的 ForeignKeyConstraint
的一部分。relationship()
会查看此外键状态,因为它决定了它应该如何为此关系加载和持久化数据。但是,relationship.primaryjoin
参数可用于创建不涉及任何 “schema” 级别外键的连接条件。我们可以显式地组合 relationship.primaryjoin
以及 relationship.foreign_keys
和 relationship.remote_side
,以便建立这样的连接。
下面,HostEntry
类连接到自身,将字符串 content
列等同于 ip_address
列,后者是 PostgreSQL 类型,称为 INET
。我们需要使用 cast()
以将连接的一侧转换为另一侧的类型
from sqlalchemy import cast, String, Column, Integer
from sqlalchemy.orm import relationship
from sqlalchemy.dialects.postgresql import INET
from sqlalchemy.orm import DeclarativeBase
class Base(DeclarativeBase):
pass
class HostEntry(Base):
__tablename__ = "host_entry"
id = mapped_column(Integer, primary_key=True)
ip_address = mapped_column(INET)
content = mapped_column(String(50))
# relationship() using explicit foreign_keys, remote_side
parent_host = relationship(
"HostEntry",
primaryjoin=ip_address == cast(content, INET),
foreign_keys=content,
remote_side=ip_address,
)
上述关系将产生如下连接
SELECT host_entry.id, host_entry.ip_address, host_entry.content
FROM host_entry JOIN host_entry AS host_entry_1
ON host_entry_1.ip_address = CAST(host_entry.content AS INET)
上述语法的替代方案是在 relationship.primaryjoin
表达式中内联使用 foreign()
和 remote()
注释。此语法表示 relationship()
通常根据 relationship.foreign_keys
和 relationship.remote_side
参数自行应用于连接条件的注释。当存在显式连接条件时,这些函数可能更简洁,此外,无论该列是否多次声明或在复杂的 SQL 表达式中声明,它们都用于精确标记 “外键” 或 “远程” 列
from sqlalchemy.orm import foreign, remote
class HostEntry(Base):
__tablename__ = "host_entry"
id = mapped_column(Integer, primary_key=True)
ip_address = mapped_column(INET)
content = mapped_column(String(50))
# relationship() using explicit foreign() and remote() annotations
# in lieu of separate arguments
parent_host = relationship(
"HostEntry",
primaryjoin=remote(ip_address) == cast(foreign(content), INET),
)
在连接条件中使用自定义运算符¶
关系的另一个用例是使用自定义运算符,例如 PostgreSQL 的 “is contained within” <<
运算符,当与 INET
和 CIDR
等类型连接时。对于自定义布尔运算符,我们使用 Operators.bool_op()
函数
inet_column.bool_op("<<")(cidr_column)
像上面这样的比较可以直接与 relationship.primaryjoin
一起使用,当构造 relationship()
时
class IPA(Base):
__tablename__ = "ip_address"
id = mapped_column(Integer, primary_key=True)
v4address = mapped_column(INET)
network = relationship(
"Network",
primaryjoin="IPA.v4address.bool_op('<<')(foreign(Network.v4representation))",
viewonly=True,
)
class Network(Base):
__tablename__ = "network"
id = mapped_column(Integer, primary_key=True)
v4representation = mapped_column(CIDR)
上面,像这样的查询
select(IPA).join(IPA.network)
将渲染为
SELECT ip_address.id AS ip_address_id, ip_address.v4address AS ip_address_v4address
FROM ip_address JOIN network ON ip_address.v4address << network.v4representation
基于 SQL 函数的自定义运算符¶
Operators.op.is_comparison
用例的一个变体是我们不使用运算符,而是使用 SQL 函数。此用例的典型示例是 PostgreSQL PostGIS 函数,但是任何数据库上解析为二进制条件的任何 SQL 函数都可能适用。为了适应此用例,FunctionElement.as_comparison()
方法可以修改任何 SQL 函数,例如从 func
命名空间调用的那些函数,以向 ORM 指示该函数产生两个表达式的比较。下面的示例使用 Geoalchemy2 库对此进行了说明
from geoalchemy2 import Geometry
from sqlalchemy import Column, Integer, func
from sqlalchemy.orm import relationship, foreign
class Polygon(Base):
__tablename__ = "polygon"
id = mapped_column(Integer, primary_key=True)
geom = mapped_column(Geometry("POLYGON", srid=4326))
points = relationship(
"Point",
primaryjoin="func.ST_Contains(foreign(Polygon.geom), Point.geom).as_comparison(1, 2)",
viewonly=True,
)
class Point(Base):
__tablename__ = "point"
id = mapped_column(Integer, primary_key=True)
geom = mapped_column(Geometry("POINT", srid=4326))
上面,FunctionElement.as_comparison()
指示 func.ST_Contains()
SQL 函数正在比较 Polygon.geom
和 Point.geom
表达式。foreign()
注释还指出哪个列在此特定关系中扮演 “外键” 角色。
1.3 版本新增: 添加了 FunctionElement.as_comparison()
。
重叠外键¶
当使用复合外键时,可能会出现一种罕见的情况,即单个列可能是通过外键约束引用的多个列的主题。
考虑一个(公认复杂的)映射,例如 Magazine
对象,Writer
对象和 Article
对象都使用复合主键方案引用它,该方案包括两者的 magazine_id
;然后为了使 Article
也引用 Writer
,Article.magazine_id
涉及两个独立的关系;Article.magazine
和 Article.writer
class Magazine(Base):
__tablename__ = "magazine"
id = mapped_column(Integer, primary_key=True)
class Article(Base):
__tablename__ = "article"
article_id = mapped_column(Integer)
magazine_id = mapped_column(ForeignKey("magazine.id"))
writer_id = mapped_column()
magazine = relationship("Magazine")
writer = relationship("Writer")
__table_args__ = (
PrimaryKeyConstraint("article_id", "magazine_id"),
ForeignKeyConstraint(
["writer_id", "magazine_id"], ["writer.id", "writer.magazine_id"]
),
)
class Writer(Base):
__tablename__ = "writer"
id = mapped_column(Integer, primary_key=True)
magazine_id = mapped_column(ForeignKey("magazine.id"), primary_key=True)
magazine = relationship("Magazine")
当配置上述映射时,我们将看到发出此警告
SAWarning: relationship 'Article.writer' will copy column
writer.magazine_id to column article.magazine_id,
which conflicts with relationship(s): 'Article.magazine'
(copies magazine.id to article.magazine_id). Consider applying
viewonly=True to read-only relationships, or provide a primaryjoin
condition marking writable columns with the foreign() annotation.
这指的是 Article.magazine_id
是两个不同外键约束的主题这一事实;它直接将 Magazine.id
作为源列引用,但在复合键到 Writer
的上下文中也将 Writer.magazine_id
作为源列引用。如果我们将 Article
与特定的 Magazine
关联,然后将 Article
与与不同 Magazine
关联的 Writer
关联,则 ORM 将非确定性地覆盖 Article.magazine_id
,默默地更改我们引用的杂志;如果我们取消 Writer
与 Article
的关联,它也可能尝试将 NULL 放入此列。警告让我们知道情况就是这样。
为了解决这个问题,我们需要分解 Article
的行为,使其包括以下所有三个特性
Article
首先仅基于Article.magazine
关系中持久化的数据写入Article.magazine_id
,即从Magazine.id
复制的值。Article
可以代表Article.writer
关系中持久化的数据写入Article.writer_id
,但仅限于Writer.id
列;Writer.magazine_id
列不应写入Article.magazine_id
,因为它最终来源于Magazine.id
。Article
在加载Article.writer
时会考虑Article.magazine_id
,即使它不代表此关系写入它。
为了仅获得 #1 和 #2,我们可以仅将 Article.writer_id
指定为 Article.writer
的 “外键”
class Article(Base):
# ...
writer = relationship("Writer", foreign_keys="Article.writer_id")
但是,这会产生 Article.writer
在针对 Writer
查询时没有考虑 Article.magazine_id
的效果
SELECT article.article_id AS article_article_id,
article.magazine_id AS article_magazine_id,
article.writer_id AS article_writer_id
FROM article
JOIN writer ON writer.id = article.writer_id
因此,为了获得 #1、#2 和 #3 的所有内容,我们通过完全组合 relationship.primaryjoin
以及 relationship.foreign_keys
参数,或者更简洁地使用 foreign()
注释来表达连接条件以及要写入的列
class Article(Base):
# ...
writer = relationship(
"Writer",
primaryjoin="and_(Writer.id == foreign(Article.writer_id), "
"Writer.magazine_id == Article.magazine_id)",
)
非关系比较 / 物化路径¶
警告
本节详细介绍了一项实验性功能。
使用自定义表达式意味着我们可以生成不遵循通常的主键/外键模型的非正统连接条件。一个这样的例子是物化路径模式,我们在其中比较字符串以查找重叠的路径令牌,以便生成树结构。
通过仔细使用 foreign()
和 remote()
,我们可以构建一个有效地生成基本物化路径系统的关系。本质上,当 foreign()
和 remote()
位于比较表达式的同一侧时,该关系被认为是 “一对多”;当它们位于不同侧时,该关系被认为是 “多对一”。对于我们在此处使用的比较,我们将处理集合,因此我们将配置保持为 “一对多”
class Element(Base):
__tablename__ = "element"
path = mapped_column(String, primary_key=True)
descendants = relationship(
"Element",
primaryjoin=remote(foreign(path)).like(path.concat("/%")),
viewonly=True,
order_by=path,
)
上面,如果给定一个路径属性为 "/foo/bar2"
的 Element
对象,我们寻求加载 Element.descendants
看起来像
SELECT element.path AS element_path
FROM element
WHERE element.path LIKE ('/foo/bar2' || '/%') ORDER BY element.path
自引用多对多关系¶
另请参阅
本节记录了 “邻接列表” 模式的两表变体,该模式在 邻接列表关系 中进行了文档化。请务必查看子节 自引用查询策略 和 配置自引用预加载 中的自引用查询模式,这些模式同样适用于此处讨论的映射模式。
多对多关系可以通过 relationship.primaryjoin
和 relationship.secondaryjoin
中的一个或两个进行自定义 - 后者对于使用 relationship.secondary
参数指定多对多引用的关系非常重要。涉及使用 relationship.primaryjoin
和 relationship.secondaryjoin
的常见情况是从一个类自身建立多对多关系,如下所示
from typing import List
from sqlalchemy import Integer, ForeignKey, Column, Table
from sqlalchemy.orm import DeclarativeBase, Mapped
from sqlalchemy.orm import mapped_column, relationship
class Base(DeclarativeBase):
pass
node_to_node = Table(
"node_to_node",
Base.metadata,
Column("left_node_id", Integer, ForeignKey("node.id"), primary_key=True),
Column("right_node_id", Integer, ForeignKey("node.id"), primary_key=True),
)
class Node(Base):
__tablename__ = "node"
id: Mapped[int] = mapped_column(primary_key=True)
label: Mapped[str]
right_nodes: Mapped[List["Node"]] = relationship(
"Node",
secondary=node_to_node,
primaryjoin=id == node_to_node.c.left_node_id,
secondaryjoin=id == node_to_node.c.right_node_id,
back_populates="left_nodes",
)
left_nodes: Mapped[List["Node"]] = relationship(
"Node",
secondary=node_to_node,
primaryjoin=id == node_to_node.c.right_node_id,
secondaryjoin=id == node_to_node.c.left_node_id,
back_populates="right_nodes",
)
在上面,SQLAlchemy 无法自动知道 right_nodes
和 left_nodes
关系中哪些列应该连接到哪些列。relationship.primaryjoin
和 relationship.secondaryjoin
参数建立了我们希望如何连接到关联表。在上面的 Declarative 形式中,当我们在与 Node
类对应的 Python 块中声明这些条件时,id
变量可以直接用作我们希望连接的 Column
对象。
或者,我们可以使用字符串定义 relationship.primaryjoin
和 relationship.secondaryjoin
参数,这适用于我们的配置尚不具有 Node.id
列对象可用或 node_to_node
表可能尚不可用的情况。当在声明性字符串中引用普通的 Table
对象时,我们使用表中存在的表字符串名称,因为它存在于 MetaData
中
class Node(Base):
__tablename__ = "node"
id = mapped_column(Integer, primary_key=True)
label = mapped_column(String)
right_nodes = relationship(
"Node",
secondary="node_to_node",
primaryjoin="Node.id==node_to_node.c.left_node_id",
secondaryjoin="Node.id==node_to_node.c.right_node_id",
backref="left_nodes",
)
警告
当作为 Python 可求值的字符串传递时,relationship.primaryjoin
和 relationship.secondaryjoin
参数使用 Python 的 eval()
函数解释。不要将不受信任的输入传递给这些字符串。有关 relationship()
参数的声明式求值的详细信息,请参阅 关系参数的求值。
此处的经典映射情况类似,其中 node_to_node
可以连接到 node.c.id
from sqlalchemy import Integer, ForeignKey, String, Column, Table, MetaData
from sqlalchemy.orm import relationship, registry
metadata_obj = MetaData()
mapper_registry = registry()
node_to_node = Table(
"node_to_node",
metadata_obj,
Column("left_node_id", Integer, ForeignKey("node.id"), primary_key=True),
Column("right_node_id", Integer, ForeignKey("node.id"), primary_key=True),
)
node = Table(
"node",
metadata_obj,
Column("id", Integer, primary_key=True),
Column("label", String),
)
class Node:
pass
mapper_registry.map_imperatively(
Node,
node,
properties={
"right_nodes": relationship(
Node,
secondary=node_to_node,
primaryjoin=node.c.id == node_to_node.c.left_node_id,
secondaryjoin=node.c.id == node_to_node.c.right_node_id,
backref="left_nodes",
)
},
)
请注意,在这两个示例中,relationship.backref
关键字指定了 left_nodes
反向引用 - 当 relationship()
在相反方向创建第二个关系时,它足够智能以反转 relationship.primaryjoin
和 relationship.secondaryjoin
参数。
复合 “二级” 连接¶
注意
本节介绍了一些 SQLAlchemy 勉强支持的极端情况,但建议尽可能使用更简单的方法来解决此类问题,例如使用合理的关系布局和/或 Python 属性。
有时,当人们试图在两个表之间建立 relationship()
时,可能需要两个或三个以上的表才能将它们连接起来。这是 relationship()
的一个领域,人们试图突破其可能性的边界,而且通常许多这些奇特用例的最终解决方案需要在 SQLAlchemy 邮件列表中讨论确定。
在较新版本的 SQLAlchemy 中,relationship.secondary
参数可以在某些情况下使用,以便提供由多个表组成的复合目标。以下是此类连接条件的一个示例(至少需要 0.9.2 版本才能按原样运行)
class A(Base):
__tablename__ = "a"
id = mapped_column(Integer, primary_key=True)
b_id = mapped_column(ForeignKey("b.id"))
d = relationship(
"D",
secondary="join(B, D, B.d_id == D.id).join(C, C.d_id == D.id)",
primaryjoin="and_(A.b_id == B.id, A.id == C.a_id)",
secondaryjoin="D.id == B.d_id",
uselist=False,
viewonly=True,
)
class B(Base):
__tablename__ = "b"
id = mapped_column(Integer, primary_key=True)
d_id = mapped_column(ForeignKey("d.id"))
class C(Base):
__tablename__ = "c"
id = mapped_column(Integer, primary_key=True)
a_id = mapped_column(ForeignKey("a.id"))
d_id = mapped_column(ForeignKey("d.id"))
class D(Base):
__tablename__ = "d"
id = mapped_column(Integer, primary_key=True)
在上面的示例中,我们在声明式样式中提供了 relationship.secondary
、relationship.primaryjoin
和 relationship.secondaryjoin
这三个参数,直接引用了名为 a
、b
、c
、d
的表。从 A
到 D
的查询看起来像
sess.scalars(select(A).join(A.d)).all()
SELECT a.id AS a_id, a.b_id AS a_b_id
FROM a JOIN (
b AS b_1 JOIN d AS d_1 ON b_1.d_id = d_1.id
JOIN c AS c_1 ON c_1.d_id = d_1.id)
ON a.b_id = b_1.id AND a.id = c_1.a_id JOIN d ON d.id = b_1.d_id
在上面的示例中,我们利用了能够将多个表塞进“secondary”容器的能力,以便我们可以在多个表之间进行连接,同时仍然保持 relationship()
的“简单性”,因为“左”侧和“右”侧都只有一个表;复杂性保留在中间。
警告
像上面这样的关系通常标记为 viewonly=True
,使用 relationship.viewonly
,并且应被视为只读。虽然有时有方法使上述关系可写,但这通常很复杂且容易出错。
与别名类的关系¶
在前一节中,我们说明了一种技术,我们使用 relationship.secondary
将额外的表放置在连接条件中。有一种复杂的连接情况,即使这种技术也不够充分;当我们试图从 A
连接到 B
时,使用任意数量的 C
、D
等中间表,但是 A
和 B
之间也存在直接连接条件。在这种情况下,从 A
到 B
的连接可能很难仅用复杂的 relationship.primaryjoin
条件来表示,因为中间表可能需要特殊处理,并且它也不能用 relationship.secondary
对象来表示,因为 A->secondary->B
模式不支持 A
和 B
之间的任何直接引用。当出现这种极其高级的情况时,我们可以求助于创建第二个映射作为关系的 targets。这就是我们使用 AliasedClass
来映射到一个类,该类包含此连接所需的所有附加表的地方。为了生成此映射器作为类的“替代”映射,我们使用 aliased()
函数来生成新的构造,然后像对待普通的映射类一样,对该对象使用 relationship()
。
下面说明了一个从 A
到 B
的简单连接的 relationship()
,但是 primaryjoin 条件用两个附加实体 C
和 D
进行了增强,这两个实体也必须具有与 A
和 B
中的行同时对齐的行
class A(Base):
__tablename__ = "a"
id = mapped_column(Integer, primary_key=True)
b_id = mapped_column(ForeignKey("b.id"))
class B(Base):
__tablename__ = "b"
id = mapped_column(Integer, primary_key=True)
class C(Base):
__tablename__ = "c"
id = mapped_column(Integer, primary_key=True)
a_id = mapped_column(ForeignKey("a.id"))
some_c_value = mapped_column(String)
class D(Base):
__tablename__ = "d"
id = mapped_column(Integer, primary_key=True)
c_id = mapped_column(ForeignKey("c.id"))
b_id = mapped_column(ForeignKey("b.id"))
some_d_value = mapped_column(String)
# 1. set up the join() as a variable, so we can refer
# to it in the mapping multiple times.
j = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)
# 2. Create an AliasedClass to B
B_viacd = aliased(B, j, flat=True)
A.b = relationship(B_viacd, primaryjoin=A.b_id == j.c.b_id)
使用上面的映射,一个简单的连接看起来像
sess.scalars(select(A).join(A.b)).all()
SELECT a.id AS a_id, a.b_id AS a_b_id
FROM a JOIN (b JOIN d ON d.b_id = b.id JOIN c ON c.id = d.c_id) ON a.b_id = b.id
将别名类映射与类型集成并避免早期映射器配置¶
对映射类创建 aliased()
构造会强制执行 configure_mappers()
步骤,这将解析所有当前的类及其关系。如果当前映射需要的无关映射类尚未声明,或者关系本身的配置需要访问尚未声明的类,则这可能会有问题。此外,当预先声明关系时,SQLAlchemy 的声明式模式与 Python 类型配合使用效果最佳。
为了组织关系的构建以解决这些问题,可以使用配置级别的事件钩子,例如 MapperEvents.before_mapper_configured()
,它只会在所有映射都准备好配置时调用配置代码
from sqlalchemy import event
class A(Base):
__tablename__ = "a"
id = mapped_column(Integer, primary_key=True)
b_id = mapped_column(ForeignKey("b.id"))
@event.listens_for(A, "before_mapper_configured")
def _configure_ab_relationship(mapper, cls):
# do the above configuration in a configuration hook
j = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)
B_viacd = aliased(B, j, flat=True)
A.b = relationship(B_viacd, primaryjoin=A.b_id == j.c.b_id)
上面,函数 _configure_ab_relationship()
将仅在请求完全配置版本的 A
时调用,此时类 B
、D
和 C
将可用。
对于与内联类型集成的 approach,可以使用类似的技术来有效地为别名类生成“单例”创建模式,其中别名类作为全局变量延迟初始化,然后可以在关系内联中使用
from typing import Any
B_viacd: Any = None
b_viacd_join: Any = None
class A(Base):
__tablename__ = "a"
id: Mapped[int] = mapped_column(primary_key=True)
b_id: Mapped[int] = mapped_column(ForeignKey("b.id"))
# 1. the relationship can be declared using lambdas, allowing it to resolve
# to targets that are late-configured
b: Mapped[B] = relationship(
lambda: B_viacd, primaryjoin=lambda: A.b_id == b_viacd_join.c.b_id
)
# 2. configure the targets of the relationship using a before_mapper_configured
# hook.
@event.listens_for(A, "before_mapper_configured")
def _configure_ab_relationship(mapper, cls):
# 3. set up the join() and AliasedClass as globals from within
# the configuration hook.
global B_viacd, b_viacd_join
b_viacd_join = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)
B_viacd = aliased(B, b_viacd_join, flat=True)
在查询中使用别名类目标¶
在前面的示例中,A.b
关系引用 B_viacd
实体作为目标,而不是直接引用 B
类。要添加涉及 A.b
关系的附加条件,通常需要直接引用 B_viacd
而不是使用 B
,尤其是在 A.b
的目标实体将被转换为别名或子查询的情况下。下面说明了使用子查询而不是连接的相同关系
subq = select(B).join(D, D.b_id == B.id).join(C, C.id == D.c_id).subquery()
B_viacd_subquery = aliased(B, subq)
A.b = relationship(B_viacd_subquery, primaryjoin=A.b_id == subq.c.id)
使用上述 A.b
关系的查询将呈现一个子查询
sess.scalars(select(A).join(A.b)).all()
SELECT a.id AS a_id, a.b_id AS a_b_id
FROM a JOIN (SELECT b.id AS id, b.some_b_column AS some_b_column
FROM b JOIN d ON d.b_id = b.id JOIN c ON c.id = d.c_id) AS anon_1 ON a.b_id = anon_1.id
如果我们想根据 A.b
连接添加额外的条件,我们必须以 B_viacd_subquery
而不是直接以 B
的形式进行
sess.scalars(
select(A)
.join(A.b)
.where(B_viacd_subquery.some_b_column == "some b")
.order_by(B_viacd_subquery.id)
).all()
SELECT a.id AS a_id, a.b_id AS a_b_id
FROM a JOIN (SELECT b.id AS id, b.some_b_column AS some_b_column
FROM b JOIN d ON d.b_id = b.id JOIN c ON c.id = d.c_id) AS anon_1 ON a.b_id = anon_1.id
WHERE anon_1.some_b_column = ? ORDER BY anon_1.id
使用窗口函数的行限制关系¶
与 AliasedClass
对象的关系的另一个有趣的用例是关系需要连接到任何形式的专门 SELECT 的情况。一种情况是希望使用窗口函数,例如限制关系应返回的行数。下面的示例说明了一个非主映射器关系,它将为每个集合加载前十个项目
class A(Base):
__tablename__ = "a"
id = mapped_column(Integer, primary_key=True)
class B(Base):
__tablename__ = "b"
id = mapped_column(Integer, primary_key=True)
a_id = mapped_column(ForeignKey("a.id"))
partition = select(
B, func.row_number().over(order_by=B.id, partition_by=B.a_id).label("index")
).alias()
partitioned_b = aliased(B, partition)
A.partitioned_bs = relationship(
partitioned_b, primaryjoin=and_(partitioned_b.a_id == A.id, partition.c.index < 10)
)
我们可以将上述 partitioned_bs
关系与大多数加载器策略一起使用,例如 selectinload()
for a1 in session.scalars(select(A).options(selectinload(A.partitioned_bs))):
print(a1.partitioned_bs) # <-- will be no more than ten objects
其中,上面的“selectinload”查询看起来像
SELECT
a_1.id AS a_1_id, anon_1.id AS anon_1_id, anon_1.a_id AS anon_1_a_id,
anon_1.data AS anon_1_data, anon_1.index AS anon_1_index
FROM a AS a_1
JOIN (
SELECT b.id AS id, b.a_id AS a_id, b.data AS data,
row_number() OVER (PARTITION BY b.a_id ORDER BY b.id) AS index
FROM b) AS anon_1
ON anon_1.a_id = a_1.id AND anon_1.index < %(index_1)s
WHERE a_1.id IN ( ... primary key collection ...)
ORDER BY a_1.id
上面,对于 “a” 中的每个匹配主键,我们将获得前十个 “bs”,并按 “b.id” 排序。通过在 “a_id” 上进行分区,我们确保每个 “行号” 都位于父 “a_id” 的本地。
这样的映射通常还会包括从 “A” 到 “B” 的“普通”关系,用于持久化操作以及需要每个 “A” 的完整 “B” 对象集时。
构建启用查询的属性¶
非常雄心勃勃的自定义连接条件可能无法直接持久化,并且在某些情况下甚至可能无法正确加载。要删除持久化部分,请在 relationship()
上使用标志 relationship.viewonly
,这将其建立为只读属性(写入集合的数据将在刷新时被忽略)。但是,在极端情况下,请考虑结合 Query
使用常规 Python 属性,如下所示
class User(Base):
__tablename__ = "user"
id = mapped_column(Integer, primary_key=True)
@property
def addresses(self):
return object_session(self).query(Address).with_parent(self).filter(...).all()
在其他情况下,可以构建描述符以利用现有的 Python 数据。有关特殊 Python 属性的更一般性讨论,请参阅关于 使用描述符和混合 的章节。
另请参阅
关于使用 viewonly 关系参数的注意事项¶
应用于 relationship()
构造的 relationship.viewonly
参数指示此 relationship()
不会参与任何 ORM 工作单元 操作,此外,该属性不希望参与其表示的集合的 Python 内部突变。这意味着,虽然 viewonly 关系可能引用可变的 Python 集合(如列表或集合),但对映射实例上存在的列表或集合进行更改将对 ORM 刷新过程没有影响。
要探索这种情况,请考虑以下映射
from __future__ import annotations
import datetime
from sqlalchemy import and_
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "user_account"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str | None]
all_tasks: Mapped[list[Task]] = relationship()
current_week_tasks: Mapped[list[Task]] = relationship(
primaryjoin=lambda: and_(
User.id == Task.user_account_id,
# this expression works on PostgreSQL but may not be supported
# by other database engines
Task.task_date >= func.now() - datetime.timedelta(days=7),
),
viewonly=True,
)
class Task(Base):
__tablename__ = "task"
id: Mapped[int] = mapped_column(primary_key=True)
user_account_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
description: Mapped[str | None]
task_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
user: Mapped[User] = relationship(back_populates="current_week_tasks")
以下各节将介绍此配置的不同方面。
Python 内部突变(包括反向引用)不适用于 viewonly=True¶
上面的映射将 User.current_week_tasks
viewonly 关系作为 Task.user
属性的 反向引用 目标。SQLAlchemy 的 ORM 配置过程目前未标记此项,但这是一个配置错误。更改 Task
上的 .user
属性不会影响 .current_week_tasks
属性
>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[]
还有另一个名为 relationship.sync_backrefs
的参数可以在此处打开以允许在这种情况下突变 .current_week_tasks
,但这不被认为是 viewonly 关系的*最佳实践*,而 viewonly 关系不应依赖于 Python 内部突变。
在此映射中,可以在 User.all_tasks
和 Task.user
之间配置反向引用,因为它们都不是 viewonly 的,并且会正常同步。
除了 viewonly 关系的反向引用突变被禁用之外,Python 中对 User.all_tasks
集合的简单更改也不会反映在 User.current_week_tasks
集合中,直到更改已刷新到数据库。
总的来说,对于自定义集合应立即响应 Python 内部突变的情况,viewonly 关系通常不适用。更好的方法是使用 SQLAlchemy 的 混合属性 功能,或者对于仅实例的情况,使用 Python @property
,其中可以实现根据当前 Python 实例生成的自定义集合。为了使我们的示例以这种方式工作,我们修复了 Task.user
上的 relationship.back_populates
参数以引用 User.all_tasks
,然后说明一个简单的 @property
,它将根据立即的 User.all_tasks
集合传递结果
class User(Base):
__tablename__ = "user_account"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str | None]
all_tasks: Mapped[list[Task]] = relationship(back_populates="user")
@property
def current_week_tasks(self) -> list[Task]:
past_seven_days = datetime.datetime.now() - datetime.timedelta(days=7)
return [t for t in self.all_tasks if t.task_date >= past_seven_days]
class Task(Base):
__tablename__ = "task"
id: Mapped[int] = mapped_column(primary_key=True)
user_account_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
description: Mapped[str | None]
task_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())
user: Mapped[User] = relationship(back_populates="all_tasks")
使用每次动态计算的 Python 内部集合,我们可以保证始终获得正确的答案,而根本无需使用数据库
>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[<__main__.Task object at 0x7f3d699523c0>]
viewonly=True 集合/属性在过期之前不会重新查询¶
继续使用原始的 viewonly 属性,如果我们确实对 持久化 对象上的 User.all_tasks
集合进行了更改,则 viewonly 集合只能在两件事情发生后显示此更改的最终结果。首先是对 User.all_tasks
的更改被 刷新,以便新数据在数据库中可用,至少在本地事务的范围内。第二个是 User.current_week_tasks
属性 过期 并通过新的 SQL 查询重新加载到数据库。
为了支持此要求,最简单的使用流程是viewonly 关系仅在主要为只读的操作中使用。如下所示,如果我们从数据库中新鲜检索 User
,则集合将是最新的
>>> with Session(e) as sess:
... u1 = sess.scalar(select(User).where(User.id == 1))
... print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b906b0>]
当我们修改 u1.all_tasks
时,如果我们想在 u1.current_week_tasks
viewonly 关系中看到这些更改,则需要刷新这些更改,并且 u1.current_week_tasks
属性需要过期,以便它在下次访问时 延迟加载。最简单的方法是使用 Session.commit()
,并将 Session.expire_on_commit
参数设置为默认值 True
>>> with Session(e) as sess:
... u1 = sess.scalar(select(User).where(User.id == 1))
... u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
... sess.commit()
... print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b90ec0>, <__main__.Task object at 0x7f8711b90a10>]
上面,调用 Session.commit()
将对 u1.all_tasks
的更改刷新到数据库,然后使所有对象过期,以便当我们访问 u1.current_week_tasks
时,发生 :term:`延迟加载`,从而从数据库中新鲜获取此属性的内容。
要在不实际提交事务的情况下拦截操作,首先需要显式地使属性 过期。一种简单的方法是直接调用它。在下面的示例中,Session.flush()
将挂起的更改发送到数据库,然后使用 Session.expire()
使 u1.current_week_tasks
集合过期,以便它在下次访问时重新获取
>>> with Session(e) as sess:
... u1 = sess.scalar(select(User).where(User.id == 1))
... u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
... sess.flush()
... sess.expire(u1, ["current_week_tasks"])
... print(u1.current_week_tasks)
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]
事实上,我们可以跳过对 Session.flush()
的调用,假设 Session
将 Session.autoflush
保留在其默认值 True
,因为过期的 current_week_tasks
属性将在过期后访问时触发自动刷新
>>> with Session(e) as sess:
... u1 = sess.scalar(select(User).where(User.id == 1))
... u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
... sess.expire(u1, ["current_week_tasks"])
... print(u1.current_week_tasks) # triggers autoflush before querying
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]
继续使用上述方法进行更详细的说明,我们可以在相关的 User.all_tasks
集合更改时以编程方式应用过期,使用 事件钩子。这是一个高级技术,应首先检查更简单的架构,例如 @property
或坚持只读用例。在我们的简单示例中,这将配置为
from sqlalchemy import event, inspect
@event.listens_for(User.all_tasks, "append")
@event.listens_for(User.all_tasks, "remove")
@event.listens_for(User.all_tasks, "bulk_replace")
def _expire_User_current_week_tasks(target, value, initiator):
inspect(target).session.expire(target, ["current_week_tasks"])
使用上面的钩子,突变操作被拦截,并导致 User.current_week_tasks
集合自动过期
>>> with Session(e) as sess:
... u1 = sess.scalar(select(User).where(User.id == 1))
... u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
... print(u1.current_week_tasks)
[<__main__.Task object at 0x7f66d093ccb0>, <__main__.Task object at 0x7f66d093cce0>]
上面使用的 AttributeEvents
事件钩子也由反向引用突变触发,因此使用上面的钩子,对 Task.user
的更改也会被拦截
>>> with Session(e) as sess:
... u1 = sess.scalar(select(User).where(User.id == 1))
... t1 = Task(task_date=datetime.datetime.now())
... t1.user = u1
... sess.add(t1)
... print(u1.current_week_tasks)
[<__main__.Task object at 0x7f3b0c070d10>, <__main__.Task object at 0x7f3b0c057d10>]
flambé! 龙和 The Alchemist 图像设计由 Rotem Yaari 创作并慷慨捐赠。
使用 Sphinx 7.2.6 创建。文档最后生成时间:Tue 11 Mar 2025 02:40:17 PM EDT