其他持久化技术

将 SQL 插入/更新表达式嵌入到 Flush 中

此功能允许将数据库列的值设置为 SQL 表达式而不是字面值。它对于原子更新、调用存储过程等特别有用。您只需将表达式分配给属性

class SomeClass(Base):
    __tablename__ = "some_table"

    # ...

    value = mapped_column(Integer)


someobject = session.get(SomeClass, 5)

# set 'value' attribute to a SQL expression adding one
someobject.value = SomeClass.value + 1

# issues "UPDATE some_table SET value=value+1"
session.commit()

此技术适用于 INSERT 和 UPDATE 语句。在 flush/commit 操作之后,上面 someobject 上的 value 属性将过期,以便下次访问时,将从数据库加载新生成的值。

该功能还具有条件支持,可以与主键列结合使用。对于具有 RETURNING 支持的后端(包括 Oracle 数据库、SQL Server、MariaDB 10.5、SQLite 3.35),SQL 表达式也可以分配给主键列。这允许评估 SQL 表达式,并允许服务器端触发器修改 INSERT 上的主键值,以便 ORM 成功检索为主键的一部分

class Foo(Base):
    __tablename__ = "foo"
    pk = mapped_column(Integer, primary_key=True)
    bar = mapped_column(Integer)


e = create_engine("postgresql+psycopg2://scott:tiger@localhost/test", echo=True)
Base.metadata.create_all(e)

session = Session(e)

foo = Foo(pk=sql.select(sql.func.coalesce(sql.func.max(Foo.pk) + 1, 1)))
session.add(foo)
session.commit()

在 PostgreSQL 上,上面的 Session 将发出以下 INSERT

INSERT INTO foo (foopk, bar) VALUES
((SELECT coalesce(max(foo.foopk) + %(max_1)s, %(coalesce_2)s) AS coalesce_1
FROM foo), %(bar)s) RETURNING foo.foopk

1.3 版本新增: SQL 表达式现在可以在 ORM flush 期间传递到主键列;如果数据库支持 RETURNING,或者如果正在使用 pysqlite,则 ORM 将能够检索服务器生成的值作为主键属性的值。

将 SQL 表达式与 Session 一起使用

SQL 表达式和字符串可以通过 Session 在其事务上下文中执行。这最容易通过 Session.execute() 方法完成,该方法返回一个 CursorResult,其方式与 EngineConnection 相同

Session = sessionmaker(bind=engine)
session = Session()

# execute a string statement
result = session.execute(text("select * from table where id=:id"), {"id": 7})

# execute a SQL expression construct
result = session.execute(select(mytable).where(mytable.c.id == 7))

当前由 Connection 持有的 Session 可以使用 Session.connection() 方法访问

connection = session.connection()

上面的示例处理绑定到单个 SessionEngineConnectionSession。要使用绑定到多个引擎或根本没有绑定的引擎(即依赖于绑定的元数据)的 Session 执行语句,Session.execute()Session.connection() 都接受绑定参数字典 Session.execute.bind_arguments,其中可能包括 “mapper”,它传递一个映射类或 Mapper 实例,用于查找所需引擎的正确上下文

Session = sessionmaker()
session = Session()

# need to specify mapper or class when executing
result = session.execute(
    text("select * from table where id=:id"),
    {"id": 7},
    bind_arguments={"mapper": MyMappedClass},
)

result = session.execute(
    select(mytable).where(mytable.c.id == 7), bind_arguments={"mapper": MyMappedClass}
)

connection = session.connection(MyMappedClass)

1.4 版本更改: Session.execute()mapperclause 参数现在作为字典的一部分传递,作为 Session.execute.bind_arguments 参数发送。以前的参数仍然被接受,但是这种用法已被弃用。

强制将默认值的列设置为 NULL

ORM 将对象上从未设置的任何属性都视为“默认”情况;该属性将从 INSERT 语句中省略

class MyObject(Base):
    __tablename__ = "my_table"
    id = mapped_column(Integer, primary_key=True)
    data = mapped_column(String(50), nullable=True)


obj = MyObject(id=1)
session.add(obj)
session.commit()  # INSERT with the 'data' column omitted; the database
# itself will persist this as the NULL value

从 INSERT 中省略列意味着该列将被设置为 NULL 值,除非该列已设置默认值,在这种情况下,默认值将被持久化。这在纯 SQL 角度(使用服务器端默认值)以及 SQLAlchemy 的插入行为(使用客户端和服务器端默认值)中都成立

class MyObject(Base):
    __tablename__ = "my_table"
    id = mapped_column(Integer, primary_key=True)
    data = mapped_column(String(50), nullable=True, server_default="default")


obj = MyObject(id=1)
session.add(obj)
session.commit()  # INSERT with the 'data' column omitted; the database
# itself will persist this as the value 'default'

但是,在 ORM 中,即使将 Python 值 None 显式分配给对象,其处理方式也与从未分配值相同

class MyObject(Base):
    __tablename__ = "my_table"
    id = mapped_column(Integer, primary_key=True)
    data = mapped_column(String(50), nullable=True, server_default="default")


obj = MyObject(id=1, data=None)
session.add(obj)
session.commit()  # INSERT with the 'data' column explicitly set to None;
# the ORM still omits it from the statement and the
# database will still persist this as the value 'default'

即使传递了 None,上述操作也会将服务器默认值 "default" 而不是 SQL NULL 持久化到 data 列中;这是 ORM 的长期行为,许多应用程序都将其作为假设。

那么,即使列具有默认值,如果我们想实际将 NULL 放入此列中怎么办?有两种方法。一种是在每个实例级别,我们使用 null SQL 构造分配属性

from sqlalchemy import null

obj = MyObject(id=1, data=null())
session.add(obj)
session.commit()  # INSERT with the 'data' column explicitly set as null();
# the ORM uses this directly, bypassing all client-
# and server-side defaults, and the database will
# persist this as the NULL value

null SQL 构造始终转换为直接存在于目标 INSERT 语句中的 SQL NULL 值。

如果我们希望能够使用 Python 值 None,并且即使存在列默认值,也将其持久化为 NULL,我们可以使用 Core 级修饰符 TypeEngine.evaluates_none() 为 ORM 配置此功能,该修饰符指示 ORM 应将值 None 视为与任何其他值相同,并将其传递,而不是将其作为“缺失”值省略的类型

class MyObject(Base):
    __tablename__ = "my_table"
    id = mapped_column(Integer, primary_key=True)
    data = mapped_column(
        String(50).evaluates_none(),  # indicate that None should always be passed
        nullable=True,
        server_default="default",
    )


obj = MyObject(id=1, data=None)
session.add(obj)
session.commit()  # INSERT with the 'data' column explicitly set to None;
# the ORM uses this directly, bypassing all client-
# and server-side defaults, and the database will
# persist this as the NULL value

获取服务器生成的默认值

正如 服务器调用的 DDL 显式默认表达式标记隐式生成的值、时间戳和触发器列 章节中介绍的那样,Core 支持数据库列的概念,数据库本身会在 INSERT 语句上生成值,而在不太常见的情况下会在 UPDATE 语句上生成值。ORM 功能支持此类列,能够获取 flush 时新生成的值。在主键列由服务器生成的情况下,此行为是必需的,因为 ORM 必须知道对象持久化后的主键。

在绝大多数情况下,其值由数据库自动生成的主键列是简单的整数列,这些列由数据库实现为所谓的“自增”列,或来自与该列关联的序列。SQLAlchemy Core 中的每个数据库方言都支持一种检索这些主键值的方法,该方法通常是 Python DBAPI 原生的,并且通常此过程是自动的。有关此的更多文档,请参见 Column.autoincrement

对于非主键列或非简单自增整数列的服务器生成列,ORM 要求使用适当的 server_default 指令标记这些列,以便 ORM 可以检索此值。并非所有方法都受所有后端支持,因此必须注意使用适当的方法。要回答的两个问题是,1. 此列是否是主键的一部分,以及 2. 数据库是否支持 RETURNING 或等效项,例如 “OUTPUT inserted”;这些是 SQL 短语,在调用 INSERT 或 UPDATE 语句的同时返回服务器生成的值。PostgreSQL、Oracle 数据库、MariaDB 10.5、SQLite 3.35 和 SQL Server 当前支持 RETURNING。

情况 1:非主键,支持 RETURNING 或等效项

在这种情况下,列应标记为 FetchedValue 或显式的 Column.server_default。当执行 INSERT 语句时,ORM 将自动将这些列添加到 RETURNING 子句中,假设 Mapper.eager_defaults 参数设置为 True,或者如果保留其默认设置 "auto",则对于支持 RETURNING 和 insertmanyvalues 的方言

class MyModel(Base):
    __tablename__ = "my_table"

    id = mapped_column(Integer, primary_key=True)

    # server-side SQL date function generates a new timestamp
    timestamp = mapped_column(DateTime(), server_default=func.now())

    # some other server-side function not named here, such as a trigger,
    # populates a value into this column during INSERT
    special_identifier = mapped_column(String(50), server_default=FetchedValue())

    # set eager defaults to True.  This is usually optional, as if the
    # backend supports RETURNING + insertmanyvalues, eager defaults
    # will take place regardless on INSERT
    __mapper_args__ = {"eager_defaults": True}

在上面,一个不从客户端指定 “timestamp” 或 “special_identifier” 显式值的 INSERT 语句将在 RETURNING 子句中包含 “timestamp” 和 “special_identifier” 列,以便它们可以立即使用。在 PostgreSQL 数据库上,上面表的 INSERT 将如下所示

INSERT INTO my_table DEFAULT VALUES RETURNING my_table.id, my_table.timestamp, my_table.special_identifier

2.0.0rc1 版本更改: Mapper.eager_defaults 参数现在默认为新设置 "auto",如果后备数据库同时支持 RETURNING 和 insertmanyvalues,则会自动使用 RETURNING 来获取 INSERT 上服务器生成的默认值。

注意

Mapper.eager_defaults"auto" 值仅适用于 INSERT 语句。即使 Mapper.eager_defaults 设置为 True,UPDATE 语句也不会使用 RETURNING。这是因为 UPDATE 没有等效的 “insertmanyvalues” 功能,因此 UPDATE RETURNING 将要求为正在 UPDATEd 的每一行单独发出 UPDATE 语句。

情况 2:表包含与 RETURNING 不兼容的触发器生成的值

Mapper.eager_defaults"auto" 设置意味着支持 RETURNING 的后端通常会在 INSERT 语句中使用 RETURNING,以便检索新生成的默认值。但是,使用触发器生成的服务器生成值存在限制,因此无法使用 RETURNING

  • SQL Server 不允许在 INSERT 语句中使用 RETURNING 来检索触发器生成的值;该语句将失败。

  • SQLite 在将 RETURNING 与触发器结合使用方面存在限制,因此 RETURNING 子句将不具有可用的 INSERTed 值

  • 其他后端可能在 RETURNING 与触发器或其他类型的服务器生成值结合使用方面存在限制。

要禁用对此类值使用 RETURNING,不仅包括服务器生成的默认值,还包括确保 ORM 永远不会对特定表使用 RETURNING,请为映射的 Table 指定 Table.implicit_returningFalse。使用声明式映射,如下所示

class MyModel(Base):
    __tablename__ = "my_table"

    id: Mapped[int] = mapped_column(primary_key=True)
    data: Mapped[str] = mapped_column(String(50))

    # assume a database trigger populates a value into this column
    # during INSERT
    special_identifier = mapped_column(String(50), server_default=FetchedValue())

    # disable all use of RETURNING for the table
    __table_args__ = {"implicit_returning": False}

在使用 pyodbc 驱动程序的 SQL Server 上,上面表的 INSERT 将不会使用 RETURNING,而是使用 SQL Server scope_identity() 函数来检索新生成的主键值

INSERT INTO my_table (data) VALUES (?); select scope_identity()

另请参阅

INSERT 行为 - 关于 SQL Server 方言的获取新生成的主键值的方法的背景信息

情况 3:非主键,不支持或不需要 RETURNING 或等效项

这种情况与上面的情况 1 相同,除了我们通常不希望使用 Mapper.eager_defaults,因为其当前在缺少 RETURNING 支持的情况下的实现是发出每个行的 SELECT,这性能不高。因此,在下面的映射中省略了该参数

class MyModel(Base):
    __tablename__ = "my_table"

    id = mapped_column(Integer, primary_key=True)
    timestamp = mapped_column(DateTime(), server_default=func.now())

    # assume a database trigger populates a value into this column
    # during INSERT
    special_identifier = mapped_column(String(50), server_default=FetchedValue())

在不支持 RETURNING 或 “insertmanyvalues” 的后端上 INSERT 具有上述映射的记录后,“timestamp” 和 “special_identifier” 列将保持为空,并且将在 flush 后首次访问时通过第二个 SELECT 语句获取它们,例如,它们被标记为 “expired”。

如果显式提供了 Mapper.eager_defaults 值为 True,并且后端数据库不支持 RETURNING 或等效项,则 ORM 将在 INSERT 语句之后立即发出 SELECT 语句,以便获取新生成的值;如果 RETURNING 不可用,则 ORM 当前无法批量 SELECT 多个新插入的行。这通常是不希望的,因为它会向 flush 过程添加可能不需要的额外 SELECT 语句。使用上述映射,并将 Mapper.eager_defaults 标志设置为 True 对 MySQL(非 MariaDB)进行操作,会导致 flush 时产生如下 SQL

INSERT INTO my_table () VALUES ()

-- when eager_defaults **is** used, but RETURNING is not supported
SELECT my_table.timestamp AS my_table_timestamp, my_table.special_identifier AS my_table_special_identifier
FROM my_table WHERE my_table.id = %s

未来版本的 SQLAlchemy 可能会寻求提高在缺少 RETURNING 的情况下急切默认值的效率,以便在单个 SELECT 语句中批量处理多行。

情况 4:主键,支持 RETURNING 或等效项

具有服务器生成值的主键列必须在 INSERT 后立即获取;ORM 只能访问其具有主键值的行,因此如果主键由服务器生成,则 ORM 需要一种在 INSERT 后立即检索该新值的方法。

如上所述,对于整数 “自增” 列,以及标记为 Identity 和特殊构造(例如 PostgreSQL SERIAL)的列,Core 会自动处理这些类型;数据库包括在不支持 RETURNING 的情况下获取 “last inserted id” 的函数,并且在支持 RETURNING 的情况下,SQLAlchemy 将使用它。

例如,将 Oracle 数据库与标记为 Identity 的列一起使用时,会自动使用 RETURNING 来获取新的主键值

class MyOracleModel(Base):
    __tablename__ = "my_table"

    id: Mapped[int] = mapped_column(Identity(), primary_key=True)
    data: Mapped[str] = mapped_column(String(50))

在 Oracle 数据库上,上述模型的 INSERT 如下所示

INSERT INTO my_table (data) VALUES (:data) RETURNING my_table.id INTO :ret_0

SQLAlchemy 为 “data” 字段呈现 INSERT,但仅在 RETURNING 子句中包含 “id”,以便对 “id” 进行服务器端生成,并且立即返回新值。

对于由服务器端函数或触发器生成的非整数值,以及来自表本身之外的构造(包括显式序列和触发器)的整数值,必须在表元数据中标记服务器默认值生成。再次以 Oracle 数据库为例,我们可以说明一个类似的表,该表使用 Sequence 构造命名显式序列

class MyOracleModel(Base):
    __tablename__ = "my_table"

    id: Mapped[int] = mapped_column(Sequence("my_oracle_seq"), primary_key=True)
    data: Mapped[str] = mapped_column(String(50))

Oracle 数据库上此版本的模型的 INSERT 将如下所示

INSERT INTO my_table (id, data) VALUES (my_oracle_seq.nextval, :data) RETURNING my_table.id INTO :ret_0

在上面,SQLAlchemy 为主键列呈现 my_sequence.nextval,以便将其用于新的主键生成,并使用 RETURNING 立即取回新值。

如果数据源不由简单的 SQL 函数或 Sequence 表示,例如在使用触发器或生成新值的数据库特定数据类型时,值生成默认值的存在可以通过在列定义中使用 FetchedValue 来指示。以下模型使用 SQL Server TIMESTAMP 列作为主键;在 SQL Server 上,此数据类型会自动生成新值,因此这在表元数据中通过指示 FetchedValue 作为 Column.server_default 参数来指示

class MySQLServerModel(Base):
    __tablename__ = "my_table"

    timestamp: Mapped[datetime.datetime] = mapped_column(
        TIMESTAMP(), server_default=FetchedValue(), primary_key=True
    )
    data: Mapped[str] = mapped_column(String(50))

SQL Server 上上述表的 INSERT 如下所示

INSERT INTO my_table (data) OUTPUT inserted.timestamp VALUES (?)

情况 5:主键,不支持 RETURNING 或等效项

在此区域中,我们正在为 MySQL 等数据库生成行,其中服务器上正在发生某种生成默认值的方式,但这超出了数据库通常的自增例程。在这种情况下,我们必须确保 SQLAlchemy 可以“预执行”默认值,这意味着它必须是显式的 SQL 表达式。

注意

本节将说明涉及 MySQL 日期时间值的多个配方,因为此后端上的日期时间数据类型具有额外的特殊要求,这些要求有助于说明。但是请记住,MySQL 要求为任何用作主键的自动生成数据类型使用显式的 “预执行” 默认生成器,而不是通常的单列自增整数值。

MySQL 与 DateTime 主键

以 MySQL 的 DateTime 列为例,我们使用 “NOW()” SQL 函数添加显式的预执行支持的默认值

class MyModel(Base):
    __tablename__ = "my_table"

    timestamp = mapped_column(DateTime(), default=func.now(), primary_key=True)

在上面,我们选择 “NOW()” 函数来为列传递日期时间值。上面生成的 SQL 是

SELECT now() AS anon_1
INSERT INTO my_table (timestamp) VALUES (%s)
('2018-08-09 13:08:46',)

MySQL 与 TIMESTAMP 主键

当将 TIMESTAMP 数据类型与 MySQL 一起使用时,MySQL 通常会自动将服务器端默认值与此数据类型关联。但是,当我们将其用作主键时,除非我们自己执行该函数,否则 Core 无法检索新生成的值。由于 MySQL 上的 TIMESTAMP 实际上存储的是二进制值,因此我们需要在 “NOW()” 的用法中添加额外的 “CAST”,以便我们检索可以持久化到列中的二进制值

from sqlalchemy import cast, Binary


class MyModel(Base):
    __tablename__ = "my_table"

    timestamp = mapped_column(
        TIMESTAMP(), default=cast(func.now(), Binary), primary_key=True
    )

在上面,除了选择 “NOW()” 函数之外,我们还结合 cast() 使用 Binary 数据类型,以便返回的值是二进制的。从上面的 INSERT 中呈现的 SQL 如下所示

SELECT CAST(now() AS BINARY) AS anon_1
INSERT INTO my_table (timestamp) VALUES (%s)
(b'2018-08-09 13:08:46',)

关于急切获取客户端调用的 SQL 表达式用于 INSERT 或 UPDATE 的说明

前面的示例指示使用 Column.server_default 创建在其 DDL 中包含默认值生成函数的表。

SQLAlchemy 还支持非 DDL 服务器端默认值,如 客户端调用的 SQL 表达式 中所述;这些 “客户端调用的 SQL 表达式” 使用 Column.defaultColumn.onupdate 参数设置。

这些 SQL 表达式目前在 ORM 中受到与真实服务器端默认值相同的限制;除非 FetchedValue 指令与 Column 关联,否则即使 Mapper.eager_defaults 设置为 "auto"True,它们也不会使用 RETURNING 急切获取,即使这些表达式不是 DDL 服务器默认值,并且是由 SQLAlchemy 本身主动呈现的。此限制可能会在未来的 SQLAlchemy 版本中得到解决。

FetchedValue 构造可以应用于 Column.server_defaultColumn.server_onupdate,同时将 SQL 表达式与 Column.defaultColumn.onupdate 一起使用,例如在下面的示例中,func.now() 构造用作 Column.defaultColumn.onupdate 的客户端调用 SQL 表达式。为了使 Mapper.eager_defaults 的行为包括在使用 RETURNING 时获取这些值,Column.server_defaultColumn.server_onupdateFetchedValue 一起使用,以确保发生获取

class MyModel(Base):
    __tablename__ = "my_table"

    id = mapped_column(Integer, primary_key=True)

    created = mapped_column(
        DateTime(), default=func.now(), server_default=FetchedValue()
    )
    updated = mapped_column(
        DateTime(),
        onupdate=func.now(),
        server_default=FetchedValue(),
        server_onupdate=FetchedValue(),
    )

    __mapper_args__ = {"eager_defaults": True}

使用类似于上述的映射,ORM 为 INSERT 和 UPDATE 呈现的 SQL 将在 RETURNING 子句中包含 createdupdated

INSERT INTO my_table (created) VALUES (now()) RETURNING my_table.id, my_table.created, my_table.updated

UPDATE my_table SET updated=now() WHERE my_table.id = %(my_table_id)s RETURNING my_table.updated

使用 INSERT、UPDATE 和 ON CONFLICT(即 upsert)返回 ORM 对象

SQLAlchemy 2.0 包含增强的功能,用于发出多种类型的支持 ORM 的 INSERT、UPDATE 和 upsert 语句。有关文档,请参阅 支持 ORM 的 INSERT、UPDATE 和 DELETE 语句 文档。有关 upsert,请参阅 ORM “upsert” 语句

使用 PostgreSQL ON CONFLICT 和 RETURNING 返回 upsert 后的 ORM 对象

本节已移至 ORM “upsert” 语句

分区策略(例如,每个 Session 多个数据库后端)

简单的垂直分区

垂直分区通过配置 SessionSession.binds 参数,将不同的类、类层次结构或映射表放置在多个数据库中。此参数接收一个字典,其中包含 ORM 映射的类、映射层次结构中的任意类(例如声明性基类或 mixin)、 Table 对象和 Mapper 对象的任意组合作为键,这些键通常指向 Engine 对象,或者较少情况下指向 Connection 对象作为目标。当 Session 需要代表特定类型的映射类发出 SQL 以查找适当的数据库连接源时,将查询此字典

engine1 = create_engine("postgresql+psycopg2://db1")
engine2 = create_engine("postgresql+psycopg2://db2")

Session = sessionmaker()

# bind User operations to engine 1, Account operations to engine 2
Session.configure(binds={User: engine1, Account: engine2})

session = Session()

如上所述,针对任何一个类的 SQL 操作都将使用链接到该类的 Engine。此功能全面覆盖读取和写入操作;针对映射到 engine1 的实体(通过查看请求项列表中的第一个实体来确定)的 Query 将使用 engine1 来运行查询。flush 操作将在每个类的基础上使用两个引擎,因为它会 flush 类型为 UserAccount 的对象。

在更常见的情况下,通常存在基类或 mixin 类,可用于区分目标为不同数据库连接的操作。Session.binds 参数可以接受任何任意 Python 类作为键,如果在特定映射类的 __mro__ (Python 方法解析顺序) 中找到该键,则将使用该键。假设两个声明性基类代表两个不同的数据库连接

from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Session


class BaseA(DeclarativeBase):
    pass


class BaseB(DeclarativeBase):
    pass


class User(BaseA): ...


class Address(BaseA): ...


class GameInfo(BaseB): ...


class GameStats(BaseB): ...


Session = sessionmaker()

# all User/Address operations will be on engine 1, all
# Game operations will be on engine 2
Session.configure(binds={BaseA: engine1, BaseB: engine2})

如上所述,从 BaseABaseB 继承的类,其 SQL 操作将根据它们继承自哪个超类(如果有)路由到两个引擎之一。如果一个类从多个“绑定”超类继承,则将选择目标类层次结构中最高的超类来表示应使用哪个引擎。

另请参阅

Session.binds

多引擎 Session 的事务协调

使用多个绑定引擎的一个注意事项是,在提交操作在一个后端成功后,可能会在另一个后端失败的情况下。这是一个不一致性问题,在关系数据库中,通过使用“两阶段事务”来解决,这在提交序列中增加了一个额外的“准备”步骤,允许多个数据库在实际完成事务之前同意提交。

由于 DBAPI 中的支持有限,SQLAlchemy 对跨后端的两阶段事务的支持也有限。最常见的情况是,它已知与 PostgreSQL 后端配合良好,并在较小程度上与 MySQL 后端配合良好。但是,当后端支持两阶段事务功能时,Session 完全能够利用该功能,方法是在 sessionmakerSession 中设置 Session.use_twophase 标志。有关示例,请参阅 启用两阶段提交

自定义垂直分区

可以通过覆盖 Session.get_bind() 方法来构建更全面的基于规则的类级别分区。下面我们演示一个自定义的 Session,它提供以下规则

  1. Flush 操作以及批量“update”和“delete”操作,都将传递到名为 leader 的引擎。

  2. 对子类化 MyOtherClass 的对象执行的所有操作都在 other 引擎上进行。

  3. 所有其他类的读取操作都在 follower1follower2 数据库的随机选择中进行。

engines = {
    "leader": create_engine("sqlite:///leader.db"),
    "other": create_engine("sqlite:///other.db"),
    "follower1": create_engine("sqlite:///follower1.db"),
    "follower2": create_engine("sqlite:///follower2.db"),
}

from sqlalchemy.sql import Update, Delete
from sqlalchemy.orm import Session, sessionmaker
import random


class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None):
        if mapper and issubclass(mapper.class_, MyOtherClass):
            return engines["other"]
        elif self._flushing or isinstance(clause, (Update, Delete)):
            # NOTE: this is for example, however in practice reader/writer
            # splits are likely more straightforward by using two distinct
            # Sessions at the top of a "reader" or "writer" operation.
            # See note below
            return engines["leader"]
        else:
            return engines[random.choice(["follower1", "follower2"])]

上面的 Session 类是使用 class_ 参数插入到 sessionmaker 中的

Session = sessionmaker(class_=RoutingSession)

这种方法可以与多个 MetaData 对象结合使用,例如使用声明性 __abstract__ 关键字的方法,如 __abstract__ 中所述。

注意

虽然上面的示例说明了如何根据语句是否期望写入数据将特定 SQL 语句路由到所谓的“leader”或“follower”数据库,但这可能不是一种实用的方法,因为它会导致同一操作中读取和写入之间出现不协调的事务行为。在实践中,最好根据正在进行的整体操作/事务,预先将 Session 构建为“reader”或“writer”会话。这样,将写入数据的操作也将在同一事务范围内发出其读取查询。有关如何设置一个 sessionmaker 用于使用自动提交连接的“只读”操作,以及另一个用于将包含 DML/COMMIT 的“写入”操作的配方,请参阅 为 Sessionmaker/Engine 范围设置隔离级别 中的示例。

另请参阅

SQLAlchemy 中的 Django 风格数据库路由器 - 关于 Session.get_bind() 更全面示例的博客文章

水平分区

水平分区将单个表(或一组表)的行跨多个数据库进行分区。SQLAlchemy Session 包含对此概念的支持,但是要完全使用它,需要使用 SessionQuery 子类。这些子类的基本版本在 水平分片 ORM 扩展中可用。使用示例位于:水平分片

批量操作

遗留功能

SQLAlchemy 2.0 已将 Session 的“批量插入”和“批量更新”功能集成到 2.0 风格的 Session.execute() 方法中,直接使用 InsertUpdate 构造。有关文档,请参阅 支持 ORM 的 INSERT、UPDATE 和 DELETE 语句 文档,包括 遗留 Session 批量 INSERT 方法,其中说明了从旧方法迁移到新方法的过程。