配置关系联接方式

relationship() 通常会通过检查两个表之间的外键关系来创建两个表之间的联接,以确定应比较哪些列。在某些情况下,需要自定义此行为。

处理多个联接路径

最常见的情况之一是,两个表之间存在多个外键路径。

考虑一个包含两个指向 Address 类的外键的 Customer

from sqlalchemy import Integer, ForeignKey, String, Column
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import relationship


class Base(DeclarativeBase):
    pass


class Customer(Base):
    __tablename__ = "customer"
    id = mapped_column(Integer, primary_key=True)
    name = mapped_column(String)

    billing_address_id = mapped_column(Integer, ForeignKey("address.id"))
    shipping_address_id = mapped_column(Integer, ForeignKey("address.id"))

    billing_address = relationship("Address")
    shipping_address = relationship("Address")


class Address(Base):
    __tablename__ = "address"
    id = mapped_column(Integer, primary_key=True)
    street = mapped_column(String)
    city = mapped_column(String)
    state = mapped_column(String)
    zip = mapped_column(String)

上面的映射在尝试使用时会产生错误

sqlalchemy.exc.AmbiguousForeignKeysError: Could not determine join
condition between parent/child tables on relationship
Customer.billing_address - there are multiple foreign key
paths linking the tables.  Specify the 'foreign_keys' argument,
providing a list of those columns which should be
counted as containing a foreign key reference to the parent table.

上面的消息很长。relationship() 可能返回许多潜在的消息,这些消息经过精心设计,可以检测各种常见的配置问题;大多数消息会建议需要进行的额外配置来解决歧义或其他缺少的信息。

在本例中,该消息希望我们通过为每个 relationship() 指示应考虑哪个外键列来限定每个 relationship(),合适的形式如下

class Customer(Base):
    __tablename__ = "customer"
    id = mapped_column(Integer, primary_key=True)
    name = mapped_column(String)

    billing_address_id = mapped_column(Integer, ForeignKey("address.id"))
    shipping_address_id = mapped_column(Integer, ForeignKey("address.id"))

    billing_address = relationship("Address", foreign_keys=[billing_address_id])
    shipping_address = relationship("Address", foreign_keys=[shipping_address_id])

在上面,我们指定了 foreign_keys 参数,它是一个 ColumnColumn 对象列表,这些对象指示应将哪些列视为“外键”,换句话说,这些列包含引用父表的的值。从 Customer 对象加载 Customer.billing_address 关系将使用 billing_address_id 中的值来识别要加载的 Address 中的行;类似地,shipping_address_id 用于 shipping_address 关系。两个列的链接在持久化过程中也发挥作用;在刷新期间,刚插入的 Address 对象新生成的 主键将被复制到相关联 Customer 对象的相应外键列中。

在使用 Declarative 时,指定 foreign_keys 时,我们也可以使用字符串名称来指定,但重要的是,如果使用列表,列表是字符串的一部分

billing_address = relationship("Address", foreign_keys="[Customer.billing_address_id]")

在本例中,列表不是必需的,因为我们只需要一个 Column

billing_address = relationship("Address", foreign_keys="Customer.billing_address_id")

警告

当作为可评估的 Python 字符串传递时,relationship.foreign_keys 参数使用 Python 的 eval() 函数进行解释。请勿将不可信的输入传递给此字符串。有关 relationship() 参数的声明式评估详细信息,请参见 关系参数的评估

指定替代联接条件

在构建联接时,relationship() 的默认行为是将一侧上的主键列的值与另一侧上的外键引用列的值进行比较。我们可以使用 relationship.primaryjoin 参数(以及在使用“二级”表时使用 relationship.secondaryjoin 参数)将此标准更改为我们想要的任何内容。

在下面的示例中,使用 User 类以及存储街道地址的 Address 类,我们创建一个名为 boston_addresses 的关系,该关系只加载指定城市为“Boston”的那些 Address 对象

from sqlalchemy import Integer, ForeignKey, String, Column
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import relationship


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "user"
    id = mapped_column(Integer, primary_key=True)
    name = mapped_column(String)
    boston_addresses = relationship(
        "Address",
        primaryjoin="and_(User.id==Address.user_id, Address.city=='Boston')",
    )


class Address(Base):
    __tablename__ = "address"
    id = mapped_column(Integer, primary_key=True)
    user_id = mapped_column(Integer, ForeignKey("user.id"))

    street = mapped_column(String)
    city = mapped_column(String)
    state = mapped_column(String)
    zip = mapped_column(String)

在此字符串 SQL 表达式中,我们使用了 and_() 连接构造来建立联接条件的两个不同的谓词 - 将 User.idAddress.user_id 列相互联接,并将 Address 中的行限制为仅 city='Boston'。在使用 Declarative 时,像 and_() 这样的基本 SQL 函数会自动在字符串 relationship() 参数的评估命名空间中可用。

警告

当作为可评估的 Python 字符串传递时,relationship.primaryjoin 参数使用 Python 的 eval() 函数进行解释。请勿将不可信的输入传递给此字符串。有关 relationship() 参数的声明式评估详细信息,请参见 关系参数的评估

我们在 relationship.primaryjoin 中使用的自定义标准通常只在 SQLAlchemy 渲染 SQL 以加载或表示此关系时才很重要。也就是说,它用于发出 SQL 语句以执行按属性的延迟加载,或者在查询时构造联接,例如通过 Select.join(),或者通过“joined”或“subquery”的 eager 加载样式。当在内存中操作对象时,我们可以将任何 Address 对象放在 boston_addresses 集合中,而不管 .city 属性的值是什么。这些对象将保留在集合中,直到属性过期并从数据库中重新加载,并在其中应用标准。当发生刷新时,boston_addresses 内部的对象将无条件地刷新,将主键 user.id 列的值分配到每行的外键持有列 address.user_id 上。city 标准在这里不起作用,因为刷新过程只关心将主键值同步到引用的外键值。

创建自定义外键条件

主连接条件的另一个要素是确定哪些列被认为是“外键”。通常,Column 对象的子集将指定 ForeignKey,或者作为与连接条件相关的 ForeignKeyConstraint 的一部分。relationship() 会查看此外键状态,因为它决定了如何为这种关系加载和持久化数据。但是,relationship.primaryjoin 参数可以用来创建不涉及任何“模式”级别外键的连接条件。我们可以将 relationship.primaryjoinrelationship.foreign_keysrelationship.remote_side 显式地结合起来,以建立这样的连接。

下面,类 HostEntry 连接到自身,将字符串 content 列等同于 ip_address 列,它是一个名为 INET 的 PostgreSQL 类型。我们需要使用 cast() 将连接的一侧转换为另一侧的类型。

from sqlalchemy import cast, String, Column, Integer
from sqlalchemy.orm import relationship
from sqlalchemy.dialects.postgresql import INET

from sqlalchemy.orm import DeclarativeBase


class Base(DeclarativeBase):
    pass


class HostEntry(Base):
    __tablename__ = "host_entry"

    id = mapped_column(Integer, primary_key=True)
    ip_address = mapped_column(INET)
    content = mapped_column(String(50))

    # relationship() using explicit foreign_keys, remote_side
    parent_host = relationship(
        "HostEntry",
        primaryjoin=ip_address == cast(content, INET),
        foreign_keys=content,
        remote_side=ip_address,
    )

上面的关系将产生一个类似于下面的连接:

SELECT host_entry.id, host_entry.ip_address, host_entry.content
FROM host_entry JOIN host_entry AS host_entry_1
ON host_entry_1.ip_address = CAST(host_entry.content AS INET)

上面语法的一种替代方法是使用 foreign()remote() annotations,它们在 relationship.primaryjoin 表达式中内联。这种语法表示 relationship() 通常根据 relationship.foreign_keysrelationship.remote_side 参数自动应用于连接条件的注解。当存在显式连接条件时,这些函数可能更简洁,并且还可以准确地标记“外键”或“远程”的列,无论该列是否在复杂的 SQL 表达式中多次出现。

from sqlalchemy.orm import foreign, remote


class HostEntry(Base):
    __tablename__ = "host_entry"

    id = mapped_column(Integer, primary_key=True)
    ip_address = mapped_column(INET)
    content = mapped_column(String(50))

    # relationship() using explicit foreign() and remote() annotations
    # in lieu of separate arguments
    parent_host = relationship(
        "HostEntry",
        primaryjoin=remote(ip_address) == cast(foreign(content), INET),
    )

在连接条件中使用自定义操作符

关系的另一个用例是使用自定义操作符,例如 PostgreSQL 的“包含在内”<< 操作符,在与诸如 INETCIDR 这样的类型连接时。对于自定义布尔操作符,我们使用 Operators.bool_op() 函数。

inet_column.bool_op("<<")(cidr_column)

像上面这样的比较可以在构造 relationship() 时直接与 relationship.primaryjoin 一起使用。

class IPA(Base):
    __tablename__ = "ip_address"

    id = mapped_column(Integer, primary_key=True)
    v4address = mapped_column(INET)

    network = relationship(
        "Network",
        primaryjoin="IPA.v4address.bool_op('<<')(foreign(Network.v4representation))",
        viewonly=True,
    )


class Network(Base):
    __tablename__ = "network"

    id = mapped_column(Integer, primary_key=True)
    v4representation = mapped_column(CIDR)

上面,一个像这样的查询:

select(IPA).join(IPA.network)

将呈现为

SELECT ip_address.id AS ip_address_id, ip_address.v4address AS ip_address_v4address
FROM ip_address JOIN network ON ip_address.v4address << network.v4representation

基于 SQL 函数的自定义操作符

使用 Operators.op.is_comparison 的用例的一个变体是当我们不使用操作符,而是使用 SQL 函数时。这个用例的典型例子是 PostgreSQL 的 PostGIS 函数,但是任何数据库上解析为二元条件的 SQL 函数都可能适用。为了适应这个用例,FunctionElement.as_comparison() 方法可以修改任何 SQL 函数,例如从 func 命名空间调用的函数,以指示 ORM 该函数产生两个表达式的比较。下面的例子说明了这一点,使用了 Geoalchemy2 库。

from geoalchemy2 import Geometry
from sqlalchemy import Column, Integer, func
from sqlalchemy.orm import relationship, foreign


class Polygon(Base):
    __tablename__ = "polygon"
    id = mapped_column(Integer, primary_key=True)
    geom = mapped_column(Geometry("POLYGON", srid=4326))
    points = relationship(
        "Point",
        primaryjoin="func.ST_Contains(foreign(Polygon.geom), Point.geom).as_comparison(1, 2)",
        viewonly=True,
    )


class Point(Base):
    __tablename__ = "point"
    id = mapped_column(Integer, primary_key=True)
    geom = mapped_column(Geometry("POINT", srid=4326))

上面,FunctionElement.as_comparison() 指示 func.ST_Contains() SQL 函数正在比较 Polygon.geomPoint.geom 表达式。foreign() 注解还指出在特定关系中哪个列扮演“外键”角色。

版本 1.3 中的新增内容: 添加了 FunctionElement.as_comparison()

重叠外键

当使用复合外键时,可能会出现一种罕见的情况,即单个列可能是通过外键约束引用的多个列的主题。

考虑一个(不可否认很复杂)映射,例如 Magazine 对象,它由 Writer 对象和 Article 对象使用包括 magazine_id 的复合主键方案引用;然后为了让 Article 也引用 WriterArticle.magazine_id 参与了两个不同的关系;Article.magazineArticle.writer

class Magazine(Base):
    __tablename__ = "magazine"

    id = mapped_column(Integer, primary_key=True)


class Article(Base):
    __tablename__ = "article"

    article_id = mapped_column(Integer)
    magazine_id = mapped_column(ForeignKey("magazine.id"))
    writer_id = mapped_column()

    magazine = relationship("Magazine")
    writer = relationship("Writer")

    __table_args__ = (
        PrimaryKeyConstraint("article_id", "magazine_id"),
        ForeignKeyConstraint(
            ["writer_id", "magazine_id"], ["writer.id", "writer.magazine_id"]
        ),
    )


class Writer(Base):
    __tablename__ = "writer"

    id = mapped_column(Integer, primary_key=True)
    magazine_id = mapped_column(ForeignKey("magazine.id"), primary_key=True)
    magazine = relationship("Magazine")

当配置上述映射时,我们将看到发出此警告。

SAWarning: relationship 'Article.writer' will copy column
writer.magazine_id to column article.magazine_id,
which conflicts with relationship(s): 'Article.magazine'
(copies magazine.id to article.magazine_id). Consider applying
viewonly=True to read-only relationships, or provide a primaryjoin
condition marking writable columns with the foreign() annotation.

这指的是 Article.magazine_id 是两个不同外键约束的主题这一事实;它直接引用 Magazine.id 作为源列,但也引用 Writer.magazine_id 作为复合主键到 Writer 的源列。如果我们将一个 Article 与一个特定的 Magazine 关联,但随后将该 Article 与一个与不同 Magazine 关联的 Writer 关联,ORM 将非确定性地覆盖 Article.magazine_id, silently changing which magazine to which we refer; it may also attempt to place NULL into this column if we de-associate a Writer from an Article. The warning lets us know this is the case.

为了解决这个问题,我们需要将 Article 的行为分解为以下三个特征。

  1. Article 首先根据 Article.magazine 关系中持久化的数据写入 Article.magazine_id,即从 Magazine.id 复制的值。

  2. Article 可以代表存储在 Article.writer 关系中的数据写入 Article.writer_id,但只能写入 Writer.id 列;Writer.magazine_id 列不应写入 Article.magazine_id,因为它最终来源于 Magazine.id

  3. Article 在加载 Article.writer 时会考虑 Article.magazine_id,即使它代表此关系写入它。

为了只获得 #1 和 #2,我们可以只指定 Article.writer_id 作为 Article.writer 的“外键”。

class Article(Base):
    # ...

    writer = relationship("Writer", foreign_keys="Article.writer_id")

但是,这会造成 Article.writer 在针对 Writer 查询时不考虑 Article.magazine_id

SELECT article.article_id AS article_article_id,
    article.magazine_id AS article_magazine_id,
    article.writer_id AS article_writer_id
FROM article
JOIN writer ON writer.id = article.writer_id

因此,为了获得 #1、#2 和 #3,我们将联接条件以及要写入的列一起表达,通过结合 relationship.primaryjoin 以及 relationship.foreign_keys 参数,或者更简洁地用 foreign() 进行标注。

class Article(Base):
    # ...

    writer = relationship(
        "Writer",
        primaryjoin="and_(Writer.id == foreign(Article.writer_id), "
        "Writer.magazine_id == Article.magazine_id)",
    )

非关系型比较/物化路径

警告

本节详细介绍一项实验性功能。

使用自定义表达式意味着我们可以创建不遵循常规主键/外键模型的非正统联接条件。一个这样的例子是物化路径模式,我们比较字符串中重叠的路径标记以生成树状结构。

通过谨慎使用 foreign()remote(),我们可以构建一个关系,有效地生成一个基本的物化路径系统。本质上,当 foreign()remote() 位于比较表达式的同一侧时,关系被认为是“一对多”;当它们位于不同侧时,关系被认为是“多对一”。对于我们将在这里使用的比较,我们将处理集合,因此我们将配置保持为“一对多”。

class Element(Base):
    __tablename__ = "element"

    path = mapped_column(String, primary_key=True)

    descendants = relationship(
        "Element",
        primaryjoin=remote(foreign(path)).like(path.concat("/%")),
        viewonly=True,
        order_by=path,
    )

上面,如果给定一个路径属性为 "/foo/bar2"Element 对象,我们希望加载 Element.descendants,使其看起来像这样。

SELECT element.path AS element_path
FROM element
WHERE element.path LIKE ('/foo/bar2' || '/%') ORDER BY element.path

自引用多对多关系

另见

本节记录了“邻接表”模式的双表变体,该模式在 邻接表关系 中有记录。请务必查看小节 自引用查询策略配置自引用急切加载 中的自引用查询模式,它们同样适用于此处讨论的映射模式。

多对多关系可以通过 relationship.primaryjoinrelationship.secondaryjoin 之一或两者进行自定义 - 后者对于使用 relationship.secondary 参数指定多对多引用的关系非常重要。一个涉及使用 relationship.primaryjoinrelationship.secondaryjoin 的常见情况是在建立从一个类到自身的许多对多关系时,如下所示。

from typing import List

from sqlalchemy import Integer, ForeignKey, Column, Table
from sqlalchemy.orm import DeclarativeBase, Mapped
from sqlalchemy.orm import mapped_column, relationship


class Base(DeclarativeBase):
    pass


node_to_node = Table(
    "node_to_node",
    Base.metadata,
    Column("left_node_id", Integer, ForeignKey("node.id"), primary_key=True),
    Column("right_node_id", Integer, ForeignKey("node.id"), primary_key=True),
)


class Node(Base):
    __tablename__ = "node"
    id: Mapped[int] = mapped_column(primary_key=True)
    label: Mapped[str]
    right_nodes: Mapped[List["Node"]] = relationship(
        "Node",
        secondary=node_to_node,
        primaryjoin=id == node_to_node.c.left_node_id,
        secondaryjoin=id == node_to_node.c.right_node_id,
        back_populates="left_nodes",
    )
    left_nodes: Mapped[List["Node"]] = relationship(
        "Node",
        secondary=node_to_node,
        primaryjoin=id == node_to_node.c.right_node_id,
        secondaryjoin=id == node_to_node.c.left_node_id,
        back_populates="right_nodes",
    )

上面,SQLAlchemy 无法自动知道应该将哪些列连接到 right_nodesleft_nodes 关系的哪些列。 relationship.primaryjoinrelationship.secondaryjoin 参数建立了我们希望如何联接到关联表。在上面的声明式形式中,由于我们是在与 Node 类相对应的 Python 块内声明这些条件的,因此 id 变量直接可用,作为我们希望与其联接的 Column 对象。

或者,我们可以使用字符串定义 relationship.primaryjoinrelationship.secondaryjoin 参数,这在我们的配置中没有 Node.id 列对象可用或 node_to_node 表可能尚未可用时很合适。在声明式字符串中引用普通 Table 对象时,我们将使用该表在 MetaData 中的字符串名称。

class Node(Base):
    __tablename__ = "node"
    id = mapped_column(Integer, primary_key=True)
    label = mapped_column(String)
    right_nodes = relationship(
        "Node",
        secondary="node_to_node",
        primaryjoin="Node.id==node_to_node.c.left_node_id",
        secondaryjoin="Node.id==node_to_node.c.right_node_id",
        backref="left_nodes",
    )

警告

当作为可评估的 Python 字符串传递时,relationship.primaryjoinrelationship.secondaryjoin 参数将使用 Python 的 eval() 函数进行解释。不要将不可信的输入传递给这些字符串。有关声明式评估 relationship() 参数的详细信息,请参阅 关系参数的评估

这里一个经典的映射情况类似,其中 node_to_node 可以联接到 node.c.id

from sqlalchemy import Integer, ForeignKey, String, Column, Table, MetaData
from sqlalchemy.orm import relationship, registry

metadata_obj = MetaData()
mapper_registry = registry()

node_to_node = Table(
    "node_to_node",
    metadata_obj,
    Column("left_node_id", Integer, ForeignKey("node.id"), primary_key=True),
    Column("right_node_id", Integer, ForeignKey("node.id"), primary_key=True),
)

node = Table(
    "node",
    metadata_obj,
    Column("id", Integer, primary_key=True),
    Column("label", String),
)


class Node:
    pass


mapper_registry.map_imperatively(
    Node,
    node,
    properties={
        "right_nodes": relationship(
            Node,
            secondary=node_to_node,
            primaryjoin=node.c.id == node_to_node.c.left_node_id,
            secondaryjoin=node.c.id == node_to_node.c.right_node_id,
            backref="left_nodes",
        )
    },
)

请注意,在这两个示例中,relationship.backref 关键字指定一个 left_nodes 反向引用 - 当 relationship() 在反方向创建第二个关系时,它足够聪明,可以反转 relationship.primaryjoinrelationship.secondaryjoin 参数。

另见

复合“次要”联接

注意

本节介绍了 SQLAlchemy 在某种程度上支持的边缘案例,但是建议尽可能以更简单的方式解决这些问题,例如使用合理的关联布局和/或 Python 内属性

有时,当人们想要在两个表之间构建一个 relationship() 时,需要不止两个或三个表来参与联接。这是 relationship() 中一个需要突破限制的地方,通常这些奇特的用例的最终解决方案需要在 SQLAlchemy 邮件列表中进行讨论。

在较新版本的 SQLAlchemy 中,relationship.secondary 参数可以在某些情况下使用,以提供包含多个表的复合目标。下面是一个这样的联接条件示例(至少需要版本 0.9.2 才能正常工作)。

class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)
    b_id = mapped_column(ForeignKey("b.id"))

    d = relationship(
        "D",
        secondary="join(B, D, B.d_id == D.id).join(C, C.d_id == D.id)",
        primaryjoin="and_(A.b_id == B.id, A.id == C.a_id)",
        secondaryjoin="D.id == B.d_id",
        uselist=False,
        viewonly=True,
    )


class B(Base):
    __tablename__ = "b"

    id = mapped_column(Integer, primary_key=True)
    d_id = mapped_column(ForeignKey("d.id"))


class C(Base):
    __tablename__ = "c"

    id = mapped_column(Integer, primary_key=True)
    a_id = mapped_column(ForeignKey("a.id"))
    d_id = mapped_column(ForeignKey("d.id"))


class D(Base):
    __tablename__ = "d"

    id = mapped_column(Integer, primary_key=True)

在上面的例子中,我们提供了所有三个relationship.secondaryrelationship.primaryjoin,和relationship.secondaryjoin,在声明式风格中引用了命名表abcd。从AD的查询看起来像

sess.scalars(select(A).join(A.d)).all()

SELECT a.id AS a_id, a.b_id AS a_b_id FROM a JOIN ( b AS b_1 JOIN d AS d_1 ON b_1.d_id = d_1.id JOIN c AS c_1 ON c_1.d_id = d_1.id) ON a.b_id = b_1.id AND a.id = c_1.a_id JOIN d ON d.id = b_1.d_id

在上面的例子中,我们利用了将多个表塞入“secondary”容器的能力,这样我们就可以跨多个表进行连接,同时仍然保持relationship()的“简单性”,因为在“左边”和“右边”都只有一个表;复杂性被保留在中间。

警告

像上面的关系通常被标记为viewonly=True,使用relationship.viewonly,并且应该被视为只读。虽然有时有一些方法可以使像上面这样的关系可写,但这通常很复杂且容易出错。

与别名类的关系

在上一节中,我们演示了一种技术,我们使用relationship.secondary将额外的表放置在连接条件中。有一种复杂的连接情况,即使这种技术也不够;当我们试图从A连接到B时,利用任意数量的CD等在它们之间,但是AB之间也存在直接的连接条件。在这种情况下,从AB的连接可能很难用一个复杂的relationship.primaryjoin条件来表达,因为中间表可能需要特殊处理,并且也不能用relationship.secondary对象来表达,因为A->secondary->B模式不支持AB之间的任何直接引用。当这种**极其高级**的情况出现时,我们可以求助于创建一个第二个映射作为关系的目标。这就是我们使用AliasedClass来创建一个包含我们需要的此连接的所有额外表的类的映射。为了将此映射器作为我们类的“替代”映射,我们使用aliased()函数生成新的构造,然后使用relationship()针对对象,就好像它是一个简单的映射类一样。

下面说明了relationship(),它有一个简单的连接,从AB,但是primaryjoin条件用两个额外的实体CD进行了增强,这两个实体也必须具有与AB中的行同时对齐的行

class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)
    b_id = mapped_column(ForeignKey("b.id"))


class B(Base):
    __tablename__ = "b"

    id = mapped_column(Integer, primary_key=True)


class C(Base):
    __tablename__ = "c"

    id = mapped_column(Integer, primary_key=True)
    a_id = mapped_column(ForeignKey("a.id"))

    some_c_value = mapped_column(String)


class D(Base):
    __tablename__ = "d"

    id = mapped_column(Integer, primary_key=True)
    c_id = mapped_column(ForeignKey("c.id"))
    b_id = mapped_column(ForeignKey("b.id"))

    some_d_value = mapped_column(String)


# 1. set up the join() as a variable, so we can refer
# to it in the mapping multiple times.
j = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)

# 2. Create an AliasedClass to B
B_viacd = aliased(B, j, flat=True)

A.b = relationship(B_viacd, primaryjoin=A.b_id == j.c.b_id)

使用上面的映射,一个简单的连接看起来像

sess.scalars(select(A).join(A.b)).all()

SELECT a.id AS a_id, a.b_id AS a_b_id FROM a JOIN (b JOIN d ON d.b_id = b.id JOIN c ON c.id = d.c_id) ON a.b_id = b.id

将别名类映射与类型集成并避免早期映射器配置

针对映射类创建aliased()构造会强制执行configure_mappers()步骤,该步骤将解析所有当前类及其关系。如果当前映射所需的无关映射类尚未声明,或者如果关系本身的配置需要访问尚未声明的类,这可能会出现问题。此外,当关系预先声明时,SQLAlchemy 的声明式模式与 Python 类型最有效地协同工作。

为了组织关系的构建以解决这些问题,可以使用像MapperEvents.before_mapper_configured()这样的配置级事件挂钩,该挂钩仅在所有映射都准备好进行配置时才会调用配置代码

from sqlalchemy import event


class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)
    b_id = mapped_column(ForeignKey("b.id"))


@event.listens_for(A, "before_mapper_configured")
def _configure_ab_relationship(mapper, cls):
    # do the above configuration in a configuration hook

    j = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)
    B_viacd = aliased(B, j, flat=True)
    A.b = relationship(B_viacd, primaryjoin=A.b_id == j.c.b_id)

在上面,函数_configure_ab_relationship()只会在请求完全配置的A版本时调用,此时类BDC将可用。

对于与内联类型集成的方案,可以使用类似的技术来有效地为别名类生成“单例”创建模式,该模式在该类作为全局变量延迟初始化时,可以在关系内联中使用

from typing import Any

B_viacd: Any = None
b_viacd_join: Any = None


class A(Base):
    __tablename__ = "a"

    id: Mapped[int] = mapped_column(primary_key=True)
    b_id: Mapped[int] = mapped_column(ForeignKey("b.id"))

    # 1. the relationship can be declared using lambdas, allowing it to resolve
    #    to targets that are late-configured
    b: Mapped[B] = relationship(
        lambda: B_viacd, primaryjoin=lambda: A.b_id == b_viacd_join.c.b_id
    )


# 2. configure the targets of the relationship using a before_mapper_configured
#    hook.
@event.listens_for(A, "before_mapper_configured")
def _configure_ab_relationship(mapper, cls):
    # 3. set up the join() and AliasedClass as globals from within
    #    the configuration hook.

    global B_viacd, b_viacd_join

    b_viacd_join = join(B, D, D.b_id == B.id).join(C, C.id == D.c_id)
    B_viacd = aliased(B, b_viacd_join, flat=True)

在查询中使用别名类目标

在前面的例子中,A.b关系引用B_viacd实体作为目标,而不是直接引用B类。要添加涉及A.b关系的额外标准,通常需要直接引用B_viacd,而不是使用B,尤其是在A.b的目标实体要转换为别名或子查询的情况下。下面说明了使用子查询而不是连接的相同关系

subq = select(B).join(D, D.b_id == B.id).join(C, C.id == D.c_id).subquery()

B_viacd_subquery = aliased(B, subq)

A.b = relationship(B_viacd_subquery, primaryjoin=A.b_id == subq.c.id)

使用上面A.b关系的查询将呈现一个子查询

sess.scalars(select(A).join(A.b)).all()

SELECT a.id AS a_id, a.b_id AS a_b_id FROM a JOIN (SELECT b.id AS id, b.some_b_column AS some_b_column FROM b JOIN d ON d.b_id = b.id JOIN c ON c.id = d.c_id) AS anon_1 ON a.b_id = anon_1.id

如果我们想基于A.b连接添加额外的标准,我们必须根据B_viacd_subquery而不是直接根据B进行操作

sess.scalars(
    select(A)
    .join(A.b)
    .where(B_viacd_subquery.some_b_column == "some b")
    .order_by(B_viacd_subquery.id)
).all()

SELECT a.id AS a_id, a.b_id AS a_b_id FROM a JOIN (SELECT b.id AS id, b.some_b_column AS some_b_column FROM b JOIN d ON d.b_id = b.id JOIN c ON c.id = d.c_id) AS anon_1 ON a.b_id = anon_1.id WHERE anon_1.some_b_column = ? ORDER BY anon_1.id

使用窗口函数的行限制关系

另一个有趣的关系到AliasedClass对象的用例是,关系需要连接到任何形式的专用SELECT的情况。一种情况是当需要使用窗口函数时,例如为了限制为关系返回的行数。下面的例子说明了一个非主映射器关系,它将为每个集合加载前十项

class A(Base):
    __tablename__ = "a"

    id = mapped_column(Integer, primary_key=True)


class B(Base):
    __tablename__ = "b"
    id = mapped_column(Integer, primary_key=True)
    a_id = mapped_column(ForeignKey("a.id"))


partition = select(
    B, func.row_number().over(order_by=B.id, partition_by=B.a_id).label("index")
).alias()

partitioned_b = aliased(B, partition)

A.partitioned_bs = relationship(
    partitioned_b, primaryjoin=and_(partitioned_b.a_id == A.id, partition.c.index < 10)
)

我们可以使用上面的partitioned_bs关系与大多数加载器策略一起使用,例如selectinload()

for a1 in session.scalars(select(A).options(selectinload(A.partitioned_bs))):
    print(a1.partitioned_bs)  # <-- will be no more than ten objects

在上面,“selectinload”查询看起来像

SELECT
    a_1.id AS a_1_id, anon_1.id AS anon_1_id, anon_1.a_id AS anon_1_a_id,
    anon_1.data AS anon_1_data, anon_1.index AS anon_1_index
FROM a AS a_1
JOIN (
    SELECT b.id AS id, b.a_id AS a_id, b.data AS data,
    row_number() OVER (PARTITION BY b.a_id ORDER BY b.id) AS index
    FROM b) AS anon_1
ON anon_1.a_id = a_1.id AND anon_1.index < %(index_1)s
WHERE a_1.id IN ( ... primary key collection ...)
ORDER BY a_1.id

在上面,对于“a”中的每个匹配主键,我们将获得前十个“bs”,按“b.id”排序。通过对“a_id”进行分区,我们确保每个“行号”都是局部于父“a_id”的。

这样的映射通常还包括从“A”到“B”的“普通”关系,用于持久化操作以及需要每个“A”的完整“B”对象集时。

构建查询启用属性

非常雄心勃勃的自定义连接条件可能无法直接持久化,在某些情况下甚至可能无法正确加载。要删除等式的持久化部分,请在relationship()上使用标志relationship.viewonly,这将它设置为只读属性(写入集合的数据在刷新时将被忽略)。但是,在极端情况下,请考虑使用常规 Python 属性与Query结合使用,如下所示

class User(Base):
    __tablename__ = "user"
    id = mapped_column(Integer, primary_key=True)

    @property
    def addresses(self):
        return object_session(self).query(Address).with_parent(self).filter(...).all()

在其他情况下,可以构建描述符以利用现有的 Python 内数据。有关特殊 Python 属性的更一般讨论,请参阅使用描述符和混合属性部分。

关于使用 viewonly 关系参数的说明

当应用于 relationship() 结构时,relationship.viewonly 参数表示此 relationship() 不会参与任何 ORM 工作单元 操作,并且该属性不希望参与其表示的集合在 Python 中的修改。这意味着,虽然 viewonly 关系可以引用可变的 Python 集合,如列表或集合,但对映射实例中存在的列表或集合进行更改将对 ORM 刷新过程 **没有影响**。

要探索这种情况,请考虑以下映射

from __future__ import annotations

import datetime

from sqlalchemy import and_
from sqlalchemy import ForeignKey
from sqlalchemy import func
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Mapped
from sqlalchemy.orm import mapped_column
from sqlalchemy.orm import relationship


class Base(DeclarativeBase):
    pass


class User(Base):
    __tablename__ = "user_account"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str | None]

    all_tasks: Mapped[list[Task]] = relationship()

    current_week_tasks: Mapped[list[Task]] = relationship(
        primaryjoin=lambda: and_(
            User.id == Task.user_account_id,
            # this expression works on PostgreSQL but may not be supported
            # by other database engines
            Task.task_date >= func.now() - datetime.timedelta(days=7),
        ),
        viewonly=True,
    )


class Task(Base):
    __tablename__ = "task"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_account_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
    description: Mapped[str | None]
    task_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())

    user: Mapped[User] = relationship(back_populates="current_week_tasks")

以下部分将说明此配置的不同方面。

在 viewonly=True 时,包括反向引用在内的 Python 内部修改不适用

上面的映射将 User.current_week_tasks viewonly 关系作为 Task.user 属性的 反向引用 目标。这目前没有被 SQLAlchemy 的 ORM 配置过程标记,但这是一个配置错误。更改 Task 上的 .user 属性不会影响 .current_week_tasks 属性。

>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[]

还有一个名为 relationship.sync_backrefs 的参数,可以在此处打开它以允许 .current_week_tasks 在这种情况下被修改,但是这不被认为是 viewonly 关系的最佳实践,viewonly 关系不应该依赖于 Python 内部的修改。

在此映射中,反向引用可以在 User.all_tasksTask.user 之间进行配置,因为它们都不是 viewonly 并且会正常同步。

除了反向引用修改对 viewonly 关系被禁用之外,对 Python 中 User.all_tasks 集合的普通更改也不会反映在 User.current_week_tasks 集合中,直到更改被刷新到数据库。

总的来说,对于自定义集合应该立即响应 Python 内部修改的用例,viewonly 关系通常不适合。更好的方法是使用 SQLAlchemy 的 混合属性 功能,或者对于仅实例的用例,使用 Python @property,其中可以实现根据当前 Python 实例生成的自定义集合。要将我们的示例更改为这种方式,我们将修复 Task.user 上的 relationship.back_populates 参数以引用 User.all_tasks,然后说明一个简单的 @property,它将根据直接的 User.all_tasks 集合提供结果

class User(Base):
    __tablename__ = "user_account"

    id: Mapped[int] = mapped_column(primary_key=True)
    name: Mapped[str | None]

    all_tasks: Mapped[list[Task]] = relationship(back_populates="user")

    @property
    def current_week_tasks(self) -> list[Task]:
        past_seven_days = datetime.datetime.now() - datetime.timedelta(days=7)
        return [t for t in self.all_tasks if t.task_date >= past_seven_days]


class Task(Base):
    __tablename__ = "task"

    id: Mapped[int] = mapped_column(primary_key=True)
    user_account_id: Mapped[int] = mapped_column(ForeignKey("user_account.id"))
    description: Mapped[str | None]
    task_date: Mapped[datetime.datetime] = mapped_column(server_default=func.now())

    user: Mapped[User] = relationship(back_populates="all_tasks")

使用每次动态计算的 Python 内部集合,我们始终可以保证得到正确的结果,而无需使用数据库。

>>> u1 = User()
>>> t1 = Task(task_date=datetime.datetime.now())
>>> t1.user = u1
>>> u1.current_week_tasks
[<__main__.Task object at 0x7f3d699523c0>]

viewonly=True 集合/属性直到过期才会重新查询

继续使用原始的 viewonly 属性,如果我们确实对 持久 对象上的 User.all_tasks 集合进行更改,viewonly 集合只有在发生 **两** 件事之后才能显示此更改的净结果。首先是 User.all_tasks 的更改被 刷新,以便新数据在数据库中可用,至少在本地事务范围内是可用的。第二是 User.current_week_tasks 属性已 过期 并且通过对数据库的新 SQL 查询重新加载。

为了支持此要求,最简单的流程是使用 **viewonly 关系仅在主要用于只读操作的操作中被使用**。例如,如下所示,如果我们从数据库中检索一个 User,集合将是最新的

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b906b0>]

当我们对 u1.all_tasks 进行修改时,如果我们希望在 u1.current_week_tasks viewonly 关系中看到这些更改的反映,这些更改需要被刷新,并且 u1.current_week_tasks 属性需要被过期,以便它在下次访问时会进行 延迟加载。最简单的方法是使用 Session.commit(),保持 Session.expire_on_commit 参数设置为默认值 True

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.commit()
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f8711b90ec0>, <__main__.Task object at 0x7f8711b90a10>]

在上面,对 Session.commit() 的调用将 u1.all_tasks 的更改刷新到数据库,然后过期所有对象,因此当我们访问 u1.current_week_tasks 时,会发生 :term:`延迟加载`,它会从数据库中重新获取此属性的内容。

要拦截操作而不实际提交事务,属性需要先被明确地 过期。一个简单的方法是直接调用它。在下面的示例中,Session.flush() 将待处理的更改发送到数据库,然后 Session.expire() 用于过期 u1.current_week_tasks 集合,以便它在下次访问时重新获取。

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.flush()
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]

我们实际上可以跳过对 Session.flush() 的调用,假设 Session 保持 Session.autoflush 处于其默认值 True,因为过期的 current_week_tasks 属性将在过期后被访问时触发自动刷新。

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     sess.expire(u1, ["current_week_tasks"])
...     print(u1.current_week_tasks)  # triggers autoflush before querying
[<__main__.Task object at 0x7fd95a4c8c50>, <__main__.Task object at 0x7fd95a4c8c80>]

继续使用上述方法进行更复杂的处理,我们可以使用 事件钩子,在相关 User.all_tasks 集合发生更改时以编程方式应用过期。这是一种 **高级技术**,首先应该检查更简单的架构,比如 @property 或坚持使用只读用例。在我们的简单示例中,将配置为

from sqlalchemy import event, inspect


@event.listens_for(User.all_tasks, "append")
@event.listens_for(User.all_tasks, "remove")
@event.listens_for(User.all_tasks, "bulk_replace")
def _expire_User_current_week_tasks(target, value, initiator):
    inspect(target).session.expire(target, ["current_week_tasks"])

使用上述钩子,修改操作会被拦截,并导致 User.current_week_tasks 集合自动过期。

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     u1.all_tasks.append(Task(task_date=datetime.datetime.now()))
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f66d093ccb0>, <__main__.Task object at 0x7f66d093cce0>]

上面使用的 AttributeEvents 事件钩子也会被反向引用修改触发,因此,使用上述钩子,对 Task.user 的更改也会被拦截。

>>> with Session(e) as sess:
...     u1 = sess.scalar(select(User).where(User.id == 1))
...     t1 = Task(task_date=datetime.datetime.now())
...     t1.user = u1
...     sess.add(t1)
...     print(u1.current_week_tasks)
[<__main__.Task object at 0x7f3b0c070d10>, <__main__.Task object at 0x7f3b0c057d10>]