配置关系连接方式

relationship() 通常会通过检查两表之间的外键关系来创建两表之间的连接,以确定应比较哪些列。在许多情况下,这种行为需要自定义。

处理多重连接路径

最常见的需要处理的情况之一是当两表之间存在多个外键路径时。

考虑一个 Customer 类,它包含两个指向 Address 类的外键

from sqlalchemy import Integer, ForeignKey, String, Column
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import relationship


class Base(DeclarativeBase):
    pass


class Customer(Base):
    __tablename__ = "customer"
    id = mapped_column(Integer, primary_key=True)
    name = mapped_column(String)

    billing_address_id = mapped_column(Integer, ForeignKey("address.id"))
    shipping_address_id = mapped_column(Integer, ForeignKey("address.id"))

    billing_address = relationship("Address")
    shipping_address = relationship("Address")


class Address(Base):
    __tablename__ = "address"
    id = mapped_column(Integer, primary_key=True)
    street = mapped_column(String)
    city = mapped_column(String)
    state = mapped_column(String)
    zip = mapped_column(String)

当我们尝试使用上述映射时,会产生错误

sqlalchemy.exc.AmbiguousForeignKeysError: Could not determine join
condition between parent/child tables on relationship
Customer.billing_address - there are multiple foreign key
paths linking the tables.  Specify the 'foreign_keys' argument,
providing a list of those columns which should be
counted as containing a foreign key reference to the parent table.

上述消息很长。 relationship() 可以返回许多潜在的消息,这些消息经过精心设计,可以检测各种常见的配置问题;大多数消息会建议解决歧义或其他缺失信息所需的额外配置。

在这种情况下,消息希望我们通过指示每个关系应考虑哪个外键列来限定每个 relationship(),合适的格式如下

class Customer(Base):
    __tablename__ = "customer"
    id = mapped_column(Integer, primary_key=True)
    name = mapped_column(String)

    billing_address_id = mapped_column(Integer, ForeignKey("address.id"))
    shipping_address_id = mapped_column(Integer, ForeignKey("address.id"))

    billing_address = relationship("Address", foreign_keys=[billing_address_id])
    shipping_address = relationship("Address", foreign_keys=[shipping_address_id])

上面,我们指定了 foreign_keys 参数,它是一个 ColumnColumn 对象列表,这些对象指示要被视为“外键”的列,或者换句话说,包含引用父表的值的列。从 Customer 对象加载 Customer.billing_address 关系将使用 billing_address_id 中存在的值来标识要加载的 Address 中的行;类似地,shipping_address_id 用于 shipping_address 关系。这两列的链接在持久化期间也起作用;刚刚插入的 Address 对象的新生成的 primary key 将在刷新期间被复制到关联的 Customer 对象的适当外键列中。

当使用 Declarative 指定 foreign_keys 时,我们也可以使用字符串名称来指定,但是重要的是,如果使用列表,则列表是字符串的一部分

billing_address = relationship("Address", foreign_keys="[Customer.billing_address_id]")

在这个特定示例中,列表在任何情况下都不是必需的,因为我们只需要一个 Column

billing_address = relationship("Address", foreign_keys="Customer.billing_address_id")

警告

当作为 Python 可求值的字符串传递时,relationship.foreign_keys 参数使用 Python 的 eval() 函数解释。不要将不受信任的输入传递给此字符串。有关 relationship() 参数的声明式求值的详细信息,请参阅 关系参数的求值

指定备选连接条件

relationship() 在构造连接时的默认行为是将一侧的主键列的值等同于另一侧的外键引用列的值。我们可以使用 relationship.primaryjoin 参数以及在使用“二级”表时使用 relationship.secondaryjoin 参数将此标准更改为我们想要的任何内容。

在下面的示例中,使用 User 类以及存储街道地址的 Address 类,我们创建一个 boston_addresses 关系,该关系将仅加载那些指定城市为 “Boston” 的 Address 对象

from sqlalchemy import Integer, ForeignKey, String, Column
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import relationship


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "user"
    id = mapped_column(Integer, primary_key=True)
    name = mapped_column(String)
    boston_addresses = relationship(
        "Address",
        primaryjoin="and_(User.id==Address.user_id, Address.city=='Boston')",
    )


class Address(Base):
    __tablename__ = "address"
    id = mapped_column(Integer, primary_key=True)
    user_id = mapped_column(Integer, ForeignKey("user.id"))

    street = mapped_column(String)
    city = mapped_column(String)
    state = mapped_column(String)
    zip = mapped_column(String)

在此字符串 SQL 表达式中,我们使用了 and_() 连接构造来为连接条件建立两个不同的谓词 - 将 User.idAddress.user_id 列相互连接,并将 Address 中的行限制为仅 city='Boston'。当使用 Declarative 时,像 and_() 这样的基本 SQL 函数在字符串 relationship() 参数的求值命名空间中自动可用。

警告

当作为 Python 可求值的字符串传递时,relationship.primaryjoin 参数使用 Python 的 eval() 函数解释。不要将不受信任的输入传递给此字符串。有关 relationship() 参数的声明式求值的详细信息,请参阅 关系参数的求值

我们在 relationship.primaryjoin 中使用的自定义条件通常仅在 SQLAlchemy 渲染 SQL 以加载或表示此关系时才重要。也就是说,它用于发出的 SQL 语句中,以便执行每个属性的延迟加载,或者在查询时构造连接时,例如通过 Select.join(),或者通过 eager “joined” 或 “subquery” 加载样式。当内存中的对象被操作时,我们可以将任何我们想要的 Address 对象放入 boston_addresses 集合中,无论 .city 属性的值是什么。对象将保留在集合中,直到属性过期并从应用条件的数据库重新加载。当发生刷新时,boston_addresses 内部的对象将被无条件刷新,将主键 user.id 列的值分配到每个行的外键持有 address.user_id 列上。city 条件在这里不起作用,因为刷新过程只关心将主键值同步到引用的外键值中。

创建自定义外键条件

主连接条件的另一个要素是如何确定那些被认为是“外键”的列。通常,Column 对象的一些子集将指定 ForeignKey,或者成为与连接条件相关的 ForeignKeyConstraint 的一部分。relationship() 会查看此外键状态,因为它决定了它应该如何为此关系加载和持久化数据。但是,relationship.primaryjoin 参数可用于创建不涉及任何 “schema” 级别外键的连接条件。我们可以显式地组合 relationship.primaryjoin 以及 relationship.foreign_keysrelationship.remote_side,以便建立这样的连接。

下面,HostEntry 类连接到自身,将字符串 content 列等同于 ip_address 列,后者是 PostgreSQL 类型,称为 INET。我们需要使用 cast() 以将连接的一侧转换为另一侧的类型

from sqlalchemy import cast, String, Column, Integer
from sqlalchemy.orm import relationship
from sqlalchemy.dialects.postgresql import INET

from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    pass


class HostEntry(Base):
    __tablename__ = "host_entry"

    id = mapped_column(Integer, primary_key=True)
    ip_address = mapped_column(INET)
    content = mapped_column(String(50))

    # relationship() using explicit foreign_keys, remote_side
    parent_host = relationship(
        "HostEntry",
        primaryjoin=ip_address == cast(content, INET),
        foreign_keys=content,
        remote_side=ip_address,
    )

上述关系将产生如下连接

SELECT host_entry.id, host_entry.ip_address, host_entry.content
FROM host_entry JOIN host_entry AS host_entry_1
ON host_entry_1.ip_address = CAST(host_entry.content AS INET)

上述语法的替代方案是在 relationship.primaryjoin 表达式中内联使用 foreign()remote() 注释。此语法表示 relationship() 通常根据 relationship.foreign_keysrelationship.remote_side 参数自行应用于连接条件的注释。当存在显式连接条件时,这些函数可能更简洁,此外,无论该列是否多次声明或在复杂的 SQL 表达式中声明,它们都用于精确标记 “外键” 或 “远程” 列

from sqlalchemy.orm import foreign, remote


class HostEntry(Base):
    __tablename__ = "host_entry"

    id = mapped_column(Integer, primary_key=True)
    ip_address = mapped_column(INET)
    content = mapped_column(String(50))

    # relationship() using explicit foreign() and remote() annotations
    # in lieu of separate arguments
    parent_host = relationship(
        "HostEntry",
        primaryjoin=remote(ip_address) == cast(foreign(content), INET),
    )

在连接条件中使用自定义运算符

关系的另一个用例是使用自定义运算符,例如 PostgreSQL 的 “is contained within” << 运算符,当与 INETCIDR 等类型连接时。对于自定义布尔运算符,我们使用 Operators.bool_op() 函数

inet_column.bool_op("<<")(cidr_column)

像上面这样的比较可以直接与 relationship.primaryjoin 一起使用,当构造 relationship()

class IPA(Base):
    __tablename__ = "ip_address"

    id = mapped_column(Integer, primary_key=True)
    v4address = mapped_column(INET)

    network = relationship(
        "Network",
        primaryjoin="IPA.v4address.bool_op('<<')(foreign(Network.v4representation))",
        viewonly=True,
    )


class Network(Base):
    __tablename__ = "network"

    id = mapped_column(Integer, primary_key=True)
    v4representation = mapped_column(CIDR)

上面,像这样的查询

select(IPA).join(IPA.network)

将渲染为

SELECT ip_address.id AS ip_address_id, ip_address.v4address AS ip_address_v4address
FROM ip_address JOIN network ON ip_address.v4address << network.v4representation

基于 SQL 函数的自定义运算符

Operators.op.is_comparison 用例的一个变体是我们不使用运算符,而是使用 SQL 函数。此用例的典型示例是 PostgreSQL PostGIS 函数,但是任何数据库上解析为二进制条件的任何 SQL 函数都可能适用。为了适应此用例,FunctionElement.as_comparison() 方法可以修改任何 SQL 函数,例如从 func 命名空间调用的那些函数,以向 ORM 指示该函数产生两个表达式的比较。下面的示例使用 Geoalchemy2 库对此进行了说明

from geoalchemy2 import Geometry
from sqlalchemy import Column, Integer, func
from sqlalchemy.orm import relationship, foreign


class Polygon(Base):
    __tablename__ = "polygon"
    id = mapped_column(Integer, primary_key=True)
    geom = mapped_column(Geometry("POLYGON", srid=4326))
    points = relationship(
        "Point",
        primaryjoin="func.ST_Contains(foreign(Polygon.geom), Point.geom).as_comparison(1, 2)",
        viewonly=True,
    )


class Point(Base):
    __tablename__ = "point"
    id = mapped_column(Integer, primary_key=True)
    geom = mapped_column(Geometry("POINT", srid=4326))

上面,FunctionElement.as_comparison() 指示 func.ST_Contains() SQL 函数正在比较 Polygon.geomPoint.geom 表达式。foreign() 注释还指出哪个列在此特定关系中扮演 “外键” 角色。

1.3 版本新增: 添加了 FunctionElement.as_comparison()

重叠外键

当使用复合外键时,可能会出现一种罕见的情况,即单个列可能是通过外键约束引用的多个列的主题。

考虑一个(公认复杂的)映射,例如 Magazine 对象,Writer 对象和 Article 对象都使用复合主键方案引用它,该方案包括两者的 magazine_id;然后为了使 Article 也引用 WriterArticle.magazine_id 涉及两个独立的关系;Article.magazineArticle.writer

class Magazine(Base):
    __tablename__ = "magazine"

    id = mapped_column(Integer, primary_key=True)


class Article(Base):
    __tablename__ = "article"

    article_id = mapped_column(Integer)
    magazine_id = mapped_column(ForeignKey("magazine.id"))
    writer_id = mapped_column()

    magazine = relationship("Magazine")
    writer = relationship("Writer")

    __table_args__ = (
        PrimaryKeyConstraint("article_id", "magazine_id"),
        ForeignKeyConstraint(
            ["writer_id", "magazine_id"], ["writer.id", "writer.magazine_id"]
        ),
    )


class Writer(Base):
    __tablename__ = "writer"

    id = mapped_column(Integer, primary_key=True)
    magazine_id = mapped_column(ForeignKey("magazine.id"), primary_key=True)
    magazine = relationship("Magazine")

当配置上述映射时,我们将看到发出此警告

SAWarning: relationship 'Article.writer' will copy column
writer.magazine_id to column article.magazine_id,
which conflicts with relationship(s): 'Article.magazine'
(copies magazine.id to article.magazine_id). Consider applying
viewonly=True to read-only relationships, or provide a primaryjoin
condition marking writable columns with the foreign() annotation.

这指的是 Article.magazine_id 是两个不同外键约束的主题这一事实;它直接将 Magazine.id 作为源列引用,但在复合键到 Writer 的上下文中也将 Writer.magazine_id 作为源列引用。如果我们将 Article 与特定的 Magazine 关联,然后将 Article 与与不同 Magazine 关联的 Writer 关联,则 ORM 将非确定性地覆盖 Article.magazine_id,默默地更改我们引用的杂志;如果我们取消 WriterArticle 的关联,它也可能尝试将 NULL 放入此列。警告让我们知道情况就是这样。

为了解决这个问题,我们需要分解 Article 的行为,使其包括以下所有三个特性

  1. Article 首先仅基于 Article.magazine 关系中持久化的数据写入 Article.magazine_id,即从 Magazine.id 复制的值。

  2. Article 可以代表 Article.writer 关系中持久化的数据写入 Article.writer_id,但仅限于 Writer.id 列;Writer.magazine_id 列不应写入 Article.magazine_id,因为它最终来源于 Magazine.id

  3. Article 在加载 Article.writer 时会考虑 Article.magazine_id,即使它代表此关系写入它。

为了仅获得 #1 和 #2,我们可以仅将 Article.writer_id 指定为 Article.writer 的 “外键”

class Article(Base):
    # ...

    writer = relationship("Writer", foreign_keys="Article.writer_id")

但是,这会产生 Article.writer 在针对 Writer 查询时没有考虑 Article.magazine_id 的效果

SELECT article.article_id AS article_article_id,
    article.magazine_id AS article_magazine_id,
    article.writer_id AS article_writer_id
FROM article
JOIN writer ON writer.id = article.writer_id

因此,为了获得 #1、#2 和 #3 的所有内容,我们通过完全组合 relationship.primaryjoin 以及 relationship.foreign_keys 参数,或者更简洁地使用 foreign() 注释来表达连接条件以及要写入的列

class Article(Base):
    # ...

    writer = relationship(
        "Writer",
        primaryjoin="and_(Writer.id == foreign(Article.writer_id), "
        "Writer.magazine_id == Article.magazine_id)",
    )

非关系比较 / 物化路径

警告

本节详细介绍了一项实验性功能。

使用自定义表达式意味着我们可以生成不遵循通常的主键/外键模型的非正统连接条件。一个这样的例子是物化路径模式,我们在其中比较字符串以查找重叠的路径令牌,以便生成树结构。

通过仔细使用 foreign()remote(),我们可以构建一个有效地生成基本物化路径系统的关系。本质上,当 foreign()remote() 位于比较表达式的同一侧时,该关系被认为是 “一对多”;当它们位于不同侧时,该关系被认为是 “多对一”。对于我们在此处使用的比较,我们将处理集合,因此我们将配置保持为 “一对多”

class Element(Base):
    __tablename__ = "element"

    path = mapped_column(String, primary_key=True)

    descendants = relationship(
        "Element",
        primaryjoin=remote(foreign(path)).like(path.concat("/%")),
        viewonly=True,
        order_by=path,
    )

上面,如果给定一个路径属性为 "/foo/bar2"Element 对象,我们寻求加载 Element.descendants 看起来像

SELECT element.path AS element_path
FROM element
WHERE element.path LIKE ('/foo/bar2' || '/%') ORDER BY element.path

自引用多对多关系

另请参阅

本节记录了 “邻接列表” 模式的两表变体,该模式在 邻接列表关系 中进行了文档化。请务必查看子节 自引用查询策略配置自引用预加载 中的自引用查询模式,这些模式同样适用于此处讨论的映射模式。

多对多关系可以通过 relationship.primaryjoinrelationship.secondaryjoin 中的一个或两个进行自定义 - 后者对于使用 relationship.secondary 参数指定多对多引用的关系非常重要。涉及使用 relationship.primaryjoinrelationship.secondaryjoin 的常见情况是从一个类自身建立多对多关系,如下所示

from typing import List

from sqlalchemy import Integer, ForeignKey, Column, Table
from sqlalchemy.orm import DeclarativeBase, Mapped
from sqlalchemy.orm import mapped_column, relationship


class Base(DeclarativeBase):
    pass


node_to_node = Table(
    "node_to_node",
    Base.metadata,
    Column("left_node_id", Integer, ForeignKey("node.id"), primary_key=True),
    Column("right_node_id", Integer, ForeignKey("node.id"), primary_key=True),
)


class Node(Base):
    __tablename__ = "node"
    id: Mapped[int] = mapped_column(primary_key=True)
    label: Mapped[str]
    right_nodes: Mapped[List["Node"]] = relationship(
        "Node",
        secondary=node_to_node,
        primaryjoin=id == node_to_node.c.left_node_id,
        secondaryjoin=id == node_to_node.c.right_node_id,
        back_populates="left_nodes",
    )
    left_nodes: Mapped[List["Node"]] = relationship(
        "Node",
        secondary=node_to_node,
        primaryjoin=id == node_to_node.c.right_node_id,
        secondaryjoin=id == node_to_node.c.left_node_id,
        back_populates="right_nodes",
    )

在上面,SQLAlchemy 无法自动知道 right_nodesleft_nodes 关系中哪些列应该连接到哪些列。relationship.primaryjoinrelationship.secondaryjoin 参数建立了我们希望如何连接到关联表。在上面的 Declarative 形式中,当我们在与 Node 类对应的 Python 块中声明这些条件时,id 变量可以直接用作我们希望连接的 Column 对象。

或者,我们可以使用字符串定义 relationship.primaryjoinrelationship.secondaryjoin 参数,这适用于我们的配置尚不具有 Node.id 列对象可用或 node_to_node 表可能尚不可用的情况。当在声明性字符串中引用普通的 Table 对象时,我们使用表中存在的表字符串名称,因为它存在于 MetaData

class Node(Base):
    __tablename__ = "node"
    id = mapped_column(Integer, primary_key=True)
    label = mapped_column(String)
    right_nodes = relationship(
        "Node",
        secondary="node_to_node",
        primaryjoin="Node.id==node_to_node.c.left_node_id",
        secondaryjoin="Node.id==node_to_node.c.right_node_id",
        backref="left_nodes",
    )

警告

当作为 Python 可求值的字符串传递时,relationship.primaryjoinrelationship.secondaryjoin 参数使用 Python 的 eval() 函数解释。不要将不受信任的输入传递给这些字符串。有关 relationship() 参数的声明式求值的详细信息,请参阅 关系参数的求值

此处的经典映射情况类似,其中 node_to_node 可以连接到 node.c.id

from sqlalchemy import Integer, ForeignKey, String, Column, Table, MetaData
from sqlalchemy.orm import relationship, registry

metadata_obj = MetaData()
mapper_registry = registry()

node_to_node = Table(
    "node_to_node",
    metadata_obj,
    Column("left_node_id", Integer, ForeignKey("node.id"), primary_key=True),
    Column("right_node_id", Integer, ForeignKey("node.id"), primary_key=True),
)

node = Table(
    "node",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("label", String),
)


class Node:
    pass


mapper_registry.map_imperatively(
    Node,
    node,
    properties={
        "right_nodes": relationship(
            Node,
            secondary=node_to_node,
            primaryjoin=node.c.id == node_to_node.c.left_node_id,
            secondaryjoin=node.c.id == node_to_node.c.right_node_id,
            backref="left_nodes",
        )
    },
)

请注意,在这两个示例中,relationship.backref 关键字指定了 left_nodes 反向引用 - 当 relationship() 在相反方向创建第二个关系时,它足够智能以反转 relationship.primaryjoinrelationship.secondaryjoin 参数。

另请参阅

复合 “二级” 连接

注意

本节介绍了一些 SQLAlchemy 勉强支持的极端情况,但建议尽可能使用更简单的方法来解决此类问题,例如使用合理的关系布局和/或 Python 属性

有时,当人们试图在两个表之间建立 relationship() 时,可能需要两个或三个以上的表才能将它们连接起来。这是 relationship() 的一个领域,人们试图突破其可能性的边界,而且通常许多这些奇特用例的最终解决方案需要在 SQLAlchemy 邮件列表中讨论确定。

在较新版本的 SQLAlchemy 中,relationship.secondary 参数可以在某些情况下使用,以便提供由多个表组成的复合目标。以下是此类连接条件的一个示例(至少需要 0.9.2 版本才能按原样运行)

class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)
    b_id = mapped_column(ForeignKey("b.id"))

    d = relationship(
        "D",
        secondary="join(B, D, B.d_id == D.id).join(C, C.d_id == D.id)",
        primaryjoin="and_(A.b_id == B.id, A.id == C.a_id)",
        secondaryjoin="D.id == B.d_id",
        uselist=False,
        viewonly=True,
    )


class B(Base):
    __tablename__ = "b"

    id = mapped_column(Integer, primary_key=True)
    d_id = mapped_column(ForeignKey("d.id"))


class C(Base):
    __tablename__ = "c"

    id = mapped_column(Integer, primary_key=True)
    a_id = mapped_column(ForeignKey("a.id"))
    d_id = mapped_column(ForeignKey("d.id"))


class D(Base):
    __tablename__ = "d"

    id = mapped_column(Integer, primary_key=True)

在上面的示例中,我们在声明式样式中提供了 relationship.secondaryrelationship.primaryjoinrelationship.secondaryjoin 这三个参数,直接引用了名为 abcd 的表。从 AD 的查询看起来像

sess.scalars(select(A).join(A.d)).all()

SELECT a.id AS a_id, a.b_id AS a_b_id FROM a JOIN ( b AS b_1 JOIN d AS d_1 ON b_1.d_id = d_1.id JOIN c AS c_1 ON c_1.d_id = d_1.id) ON a.b_id = b_1.id AND a.id = c_1.a_id JOIN d ON d.id = b_1.d_id

在上面的示例中,我们利用了能够将多个表塞进“secondary”容器的能力,以便我们可以在多个表之间进行连接,同时仍然保持 relationship() 的“简单性”,因为“左”侧和“右”侧都只有一个表;复杂性保留在中间。

警告

像上面这样的关系通常标记为 viewonly=True,使用 relationship.viewonly,并且应被视为只读。虽然有时有方法使上述关系可写,但这通常很复杂且容易出错。

与别名类的关系

在前一节中,我们说明了一种技术,我们使用 relationship.secondary 将额外的表放置在连接条件中。有一种复杂的连接情况,即使这种技术也不够充分;当我们试图从 A 连接到 B 时,使用任意数量的 CD 等中间表,但是 AB 之间也存在直接连接条件。在这种情况下,从 AB 的连接可能很难仅用复杂的 relationship.primaryjoin 条件来表示,因为中间表可能需要特殊处理,并且它也不能用 relationship.secondary 对象来表示,因为 A->secondary->B 模式不支持 AB 之间的任何直接引用。当出现这种极其高级的情况时,我们可以求助于创建第二个映射作为关系的 targets。这就是我们使用 AliasedClass 来映射到一个类,该类包含此连接所需的所有附加表的地方。为了生成此映射器作为类的“替代”映射,我们使用 aliased() 函数来生成新的构造,然后像对待普通的映射类一样,对该对象使用 relationship()

下面说明了一个从 AB 的简单连接的 relationship(),但是 primaryjoin 条件用两个附加实体 CD 进行了增强,这两个实体也必须具有与 AB 中的行同时对齐的行

class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)
    b_id = mapped_column(ForeignKey("b.id"))


class B(Base):
    __tablename__ = "b"

    id = mapped_column(Integer, primary_key=True)


class C(Base):
    __tablename__ = "c"

    id = mapped_column(Integer, primary_key=True)
    a_id = mapped_column(ForeignKey("a.id"))

    some_c_value = mapped_column(String)


class D(Base):
    __tablename__ = "d"

    id = mapped_column(Integer, primary_key=True)
    c_id = mapped_column(ForeignKey("c.id"))
    b_id = mapped_column(ForeignKey("b.id"))

    some_d_value = mapped_column(String)


# 1. set up the join() as a variable, so we can refer
# to it in the mapping multiple times.
j = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)

# 2. Create an AliasedClass to B
B_viacd = aliased(B, j, flat=True)

A.b = relationship(B_viacd, primaryjoin=A.b_id == j.c.b_id)

使用上面的映射,一个简单的连接看起来像

sess.scalars(select(A).join(A.b)).all()

SELECT a.id AS a_id, a.b_id AS a_b_id FROM a JOIN (b JOIN d ON d.b_id = b.id JOIN c ON c.id = d.c_id) ON a.b_id = b.id

将别名类映射与类型集成并避免早期映射器配置

对映射类创建 aliased() 构造会强制执行 configure_mappers() 步骤,这将解析所有当前的类及其关系。如果当前映射需要的无关映射类尚未声明,或者关系本身的配置需要访问尚未声明的类,则这可能会有问题。此外,当预先声明关系时,SQLAlchemy 的声明式模式与 Python 类型配合使用效果最佳。

为了组织关系的构建以解决这些问题,可以使用配置级别的事件钩子,例如 MapperEvents.before_mapper_configured(),它只会在所有映射都准备好配置时调用配置代码

from sqlalchemy import event


class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)
    b_id = mapped_column(ForeignKey("b.id"))


@event.listens_for(A, "before_mapper_configured")
def _configure_ab_relationship(mapper, cls):
    # do the above configuration in a configuration hook

    j = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)
    B_viacd = aliased(B, j, flat=True)
    A.b = relationship(B_viacd, primaryjoin=A.b_id == j.c.b_id)

上面,函数 _configure_ab_relationship() 将仅在请求完全配置版本的 A 时调用,此时类 BDC 将可用。

对于与内联类型集成的 approach,可以使用类似的技术来有效地为别名类生成“单例”创建模式,其中别名类作为全局变量延迟初始化,然后可以在关系内联中使用

from typing import Any

B_viacd: Any = None
b_viacd_join: Any = None


class A(Base):
    __tablename__ = "a"

    id: Mapped[int] = mapped_column(primary_key=True)
    b_id: Mapped[int] = mapped_column(ForeignKey("b.id"))

    # 1. the relationship can be declared using lambdas, allowing it to resolve
    #    to targets that are late-configured
    b: Mapped[B] = relationship(
        lambda: B_viacd, primaryjoin=lambda: A.b_id == b_viacd_join.c.b_id
    )


# 2. configure the targets of the relationship using a before_mapper_configured
#    hook.
@event.listens_for(A, "before_mapper_configured")
def _configure_ab_relationship(mapper, cls):
    # 3. set up the join() and AliasedClass as globals from within
    #    the configuration hook.

    global B_viacd, b_viacd_join

    b_viacd_join = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)
    B_viacd = aliased(B, b_viacd_join, flat=True)

在查询中使用别名类目标

在前面的示例中,A.b 关系引用 B_viacd 实体作为目标,而不是直接引用 B 类。要添加涉及 A.b 关系的附加条件,通常需要直接引用 B_viacd 而不是使用 B,尤其是在 A.b 的目标实体将被转换为别名或子查询的情况下。下面说明了使用子查询而不是连接的相同关系

subq = select(B).join(D, D.b_id == B.id).join(C, C.id == D.c_id).subquery()

B_viacd_subquery = aliased(B, subq)

A.b = relationship(B_viacd_subquery, primaryjoin=A.b_id == subq.c.id)

使用上述 A.b 关系的查询将呈现一个子查询

sess.scalars(select(A).join(A.b)).all()

SELECT a.id AS a_id, a.b_id AS a_b_id FROM a JOIN (SELECT b.id AS id, b.some_b_column AS some_b_column FROM b JOIN d ON d.b_id = b.id JOIN c ON c.id = d.c_id) AS anon_1 ON a.b_id = anon_1.id

如果我们想根据 A.b 连接添加额外的条件,我们必须以 B_viacd_subquery 而不是直接以 B 的形式进行

sess.scalars(
    select(A)
    .join(A.b)
    .where(B_viacd_subquery.some_b_column == "some b")
    .order_by(B_viacd_subquery.id)
).all()

SELECT a.id AS a_id, a.b_id AS a_b_id FROM a JOIN (SELECT b.id AS id, b.some_b_column AS some_b_column FROM b JOIN d ON d.b_id = b.id JOIN c ON c.id = d.c_id) AS anon_1 ON a.b_id = anon_1.id WHERE anon_1.some_b_column = ? ORDER BY anon_1.id

使用窗口函数的行限制关系

AliasedClass 对象的关系的另一个有趣的用例是关系需要连接到任何形式的专门 SELECT 的情况。一种情况是希望使用窗口函数,例如限制关系应返回的行数。下面的示例说明了一个非主映射器关系,它将为每个集合加载前十个项目

class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)


class B(Base):
    __tablename__ = "b"
    id = mapped_column(Integer, primary_key=True)
    a_id = mapped_column(ForeignKey("a.id"))


partition = select(
    B, func.row_number().over(order_by=B.id, partition_by=B.a_id).label("index")
).alias()

partitioned_b = aliased(B, partition)

A.partitioned_bs = relationship(
    partitioned_b, primaryjoin=and_(partitioned_b.a_id == A.id, partition.c.index < 10)
)

我们可以将上述 partitioned_bs 关系与大多数加载器策略一起使用,例如 selectinload()

for a1 in session.scalars(select(A).options(selectinload(A.partitioned_bs))):
    print(a1.partitioned_bs)  # <-- will be no more than ten objects

其中,上面的“selectinload”查询看起来像

SELECT
    a_1.id AS a_1_id, anon_1.id AS anon_1_id, anon_1.a_id AS anon_1_a_id,
    anon_1.data AS anon_1_data, anon_1.index AS anon_1_index
FROM a AS a_1
JOIN (
    SELECT b.id AS id, b.a_id AS a_id, b.data AS data,
    row_number() OVER (PARTITION BY b.a_id ORDER BY b.id) AS index
    FROM b) AS anon_1
ON anon_1.a_id = a_1.id AND anon_1.index < %(index_1)s
WHERE a_1.id IN ( ... primary key collection ...)
ORDER BY a_1.id

上面,对于 “a” 中的每个匹配主键,我们将获得前十个 “bs”,并按 “b.id” 排序。通过在 “a_id” 上进行分区,我们确保每个 “行号” 都位于父 “a_id” 的本地。

这样的映射通常还会包括从 “A” 到 “B” 的“普通”关系,用于持久化操作以及需要每个 “A” 的完整 “B” 对象集时。

构建启用查询的属性

非常雄心勃勃的自定义连接条件可能无法直接持久化,并且在某些情况下甚至可能无法正确加载。要删除持久化部分,请在 relationship() 上使用标志 relationship.viewonly,这将其建立为只读属性(写入集合的数据将在刷新时被忽略)。但是,在极端情况下,请考虑结合 Query 使用常规 Python 属性,如下所示

class User(Base):
    __tablename__ = "user"
    id = mapped_column(Integer, primary_key=True)

    @property
    def addresses(self):
        return object_session(self).query(Address).with_parent(self).filter(...).all()

在其他情况下,可以构建描述符以利用现有的 Python 数据。有关特殊 Python 属性的更一般性讨论,请参阅关于 使用描述符和混合 的章节。

关于使用 viewonly 关系参数的注意事项

应用于 relationship() 构造的 relationship.viewonly 参数指示此 relationship() 不会参与任何 ORM 工作单元 操作,此外,该属性不希望参与其表示的集合的 Python 内部突变。这意味着,虽然 viewonly 关系可能引用可变的 Python 集合(如列表或集合),但对映射实例上存在的列表或集合进行更改将对 ORM 刷新过程没有影响

要探索这种情况,请考虑以下映射

from __future__ import annotations

import datetime

from sqlalchemy import and_
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "user_account"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str | None]

    all_tasks: Mapped[list[Task]] = relationship()

    current_week_tasks: Mapped[list[Task]] = relationship(
        primaryjoin=lambda: and_(
            User.id == Task.user_account_id,
            # this expression works on PostgreSQL but may not be supported
            # by other database engines
            Task.task_date >= func.now() - datetime.timedelta(days=7),
        ),
        viewonly=True,
    )


class Task(Base):
    __tablename__ = "task"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_account_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
    description: Mapped[str | None]
    task_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())

    user: Mapped[User] = relationship(back_populates="current_week_tasks")

以下各节将介绍此配置的不同方面。

Python 内部突变(包括反向引用)不适用于 viewonly=True

上面的映射将 User.current_week_tasks viewonly 关系作为 Task.user 属性的 反向引用 目标。SQLAlchemy 的 ORM 配置过程目前未标记此项,但这是一个配置错误。更改 Task 上的 .user 属性不会影响 .current_week_tasks 属性

>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[]

还有另一个名为 relationship.sync_backrefs 的参数可以在此处打开以允许在这种情况下突变 .current_week_tasks,但这不被认为是 viewonly 关系的*最佳实践*,而 viewonly 关系不应依赖于 Python 内部突变。

在此映射中,可以在 User.all_tasksTask.user 之间配置反向引用,因为它们都不是 viewonly 的,并且会正常同步。

除了 viewonly 关系的反向引用突变被禁用之外,Python 中对 User.all_tasks 集合的简单更改也不会反映在 User.current_week_tasks 集合中,直到更改已刷新到数据库。

总的来说,对于自定义集合应立即响应 Python 内部突变的情况,viewonly 关系通常不适用。更好的方法是使用 SQLAlchemy 的 混合属性 功能,或者对于仅实例的情况,使用 Python @property,其中可以实现根据当前 Python 实例生成的自定义集合。为了使我们的示例以这种方式工作,我们修复了 Task.user 上的 relationship.back_populates 参数以引用 User.all_tasks,然后说明一个简单的 @property,它将根据立即的 User.all_tasks 集合传递结果

class User(Base):
    __tablename__ = "user_account"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str | None]

    all_tasks: Mapped[list[Task]] = relationship(back_populates="user")

    @property
    def current_week_tasks(self) -> list[Task]:
        past_seven_days = datetime.datetime.now() - datetime.timedelta(days=7)
        return [t for t in self.all_tasks if t.task_date >= past_seven_days]


class Task(Base):
    __tablename__ = "task"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_account_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
    description: Mapped[str | None]
    task_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())

    user: Mapped[User] = relationship(back_populates="all_tasks")

使用每次动态计算的 Python 内部集合,我们可以保证始终获得正确的答案,而根本无需使用数据库

>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[<__main__.Task object at 0x7f3d699523c0>]

viewonly=True 集合/属性在过期之前不会重新查询

继续使用原始的 viewonly 属性,如果我们确实对 持久化 对象上的 User.all_tasks 集合进行了更改,则 viewonly 集合只能在两件事情发生后显示此更改的最终结果。首先是对 User.all_tasks 的更改被 刷新,以便新数据在数据库中可用,至少在本地事务的范围内。第二个是 User.current_week_tasks 属性 过期 并通过新的 SQL 查询重新加载到数据库。

为了支持此要求,最简单的使用流程是viewonly 关系仅在主要为只读的操作中使用。如下所示,如果我们从数据库中新鲜检索 User,则集合将是最新的

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b906b0>]

当我们修改 u1.all_tasks 时,如果我们想在 u1.current_week_tasks viewonly 关系中看到这些更改,则需要刷新这些更改,并且 u1.current_week_tasks 属性需要过期,以便它在下次访问时 延迟加载。最简单的方法是使用 Session.commit(),并将 Session.expire_on_commit 参数设置为默认值 True

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.commit()
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b90ec0>, <__main__.Task object at 0x7f8711b90a10>]

上面,调用 Session.commit() 将对 u1.all_tasks 的更改刷新到数据库,然后使所有对象过期,以便当我们访问 u1.current_week_tasks 时,发生 :term:`延迟加载`,从而从数据库中新鲜获取此属性的内容。

要在不实际提交事务的情况下拦截操作,首先需要显式地使属性 过期。一种简单的方法是直接调用它。在下面的示例中,Session.flush() 将挂起的更改发送到数据库,然后使用 Session.expire() 使 u1.current_week_tasks 集合过期,以便它在下次访问时重新获取

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.flush()
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]

事实上,我们可以跳过对 Session.flush() 的调用,假设 SessionSession.autoflush 保留在其默认值 True,因为过期的 current_week_tasks 属性将在过期后访问时触发自动刷新

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)  # triggers autoflush before querying
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]

继续使用上述方法进行更详细的说明,我们可以在相关的 User.all_tasks 集合更改时以编程方式应用过期,使用 事件钩子。这是一个高级技术,应首先检查更简单的架构,例如 @property 或坚持只读用例。在我们的简单示例中,这将配置为

from sqlalchemy import event, inspect


@event.listens_for(User.all_tasks, "append")
@event.listens_for(User.all_tasks, "remove")
@event.listens_for(User.all_tasks, "bulk_replace")
def _expire_User_current_week_tasks(target, value, initiator):
    inspect(target).session.expire(target, ["current_week_tasks"])

使用上面的钩子,突变操作被拦截,并导致 User.current_week_tasks 集合自动过期

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f66d093ccb0>, <__main__.Task object at 0x7f66d093cce0>]

上面使用的 AttributeEvents 事件钩子也由反向引用突变触发,因此使用上面的钩子,对 Task.user 的更改也会被拦截

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     t1 = Task(task_date=datetime.datetime.now())
...     t1.user = u1
...     sess.add(t1)
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f3b0c070d10>, <__main__.Task object at 0x7f3b0c057d10>]