附加持久化技术

将 SQL 插入/更新表达式嵌入到 Flush 中

此功能允许将数据库列的值设置为 SQL 表达式,而不是字面值。它对于原子更新、调用存储过程等特别有用。您只需将表达式分配给属性即可

class SomeClass(Base):
    __tablename__ = "some_table"

    # ...

    value = mapped_column(Integer)


someobject = session.get(SomeClass, 5)

# set 'value' attribute to a SQL expression adding one
someobject.value = SomeClass.value + 1

# issues "UPDATE some_table SET value=value+1"
session.commit()

此技术适用于 INSERT 和 UPDATE 语句。在 flush/commit 操作之后,上述 someobject 上的 value 属性会失效,因此下次访问时会从数据库加载新生成的值。

该功能还具有条件支持,以便与主键列结合使用。对于支持 RETURNING 的后端(包括 Oracle、SQL Server、MariaDB 10.5、SQLite 3.35),也可以将 SQL 表达式分配给主键列。这允许评估 SQL 表达式,并允许 ORM 在 INSERT 时成功检索修改主键值的任何服务器端触发器,作为对象主键的一部分

class Foo(Base):
    __tablename__ = "foo"
    pk = mapped_column(Integer, primary_key=True)
    bar = mapped_column(Integer)


e = create_engine("postgresql+psycopg2://scott:tiger@localhost/test", echo=True)
Base.metadata.create_all(e)

session = Session(e)

foo = Foo(pk=sql.select(sql.func.coalesce(sql.func.max(Foo.pk) + 1, 1)))
session.add(foo)
session.commit()

在 PostgreSQL 上,上述 Session 将发出以下 INSERT

INSERT INTO foo (foopk, bar) VALUES
((SELECT coalesce(max(foo.foopk) + %(max_1)s, %(coalesce_2)s) AS coalesce_1
FROM foo), %(bar)s) RETURNING foo.foopk

新功能在版本 1.3 中添加: 现在可以在 ORM flush 期间将 SQL 表达式传递给主键列;如果数据库支持 RETURNING,或者如果使用的是 pysqlite,ORM 将能够检索服务器生成的作为主键属性的值。

在 Session 中使用 SQL 表达式

可以通过 Session 在其事务上下文中执行 SQL 表达式和字符串。这最容易使用 Session.execute() 方法完成,该方法返回 CursorResult,与 EngineConnection 相同

Session = sessionmaker(bind=engine)
session = Session()

# execute a string statement
result = session.execute(text("select * from table where id=:id"), {"id": 7})

# execute a SQL expression construct
result = session.execute(select(mytable).where(mytable.c.id == 7))

可以使用 Session.connection() 方法访问 Session 持有的当前 Connection

connection = session.connection()

上面的示例处理绑定到单个 EngineConnectionSession。要使用绑定到多个引擎或根本没有绑定(即依赖于绑定元数据)的 Session 执行语句,Session.execute()Session.connection() 都接受绑定参数字典 Session.execute.bind_arguments,其中可能包括 “mapper”,它传递映射类或 Mapper 实例,用于定位所需引擎的正确上下文

Session = sessionmaker()
session = Session()

# need to specify mapper or class when executing
result = session.execute(
    text("select * from table where id=:id"),
    {"id": 7},
    bind_arguments={"mapper": MyMappedClass},
)

result = session.execute(
    select(mytable).where(mytable.c.id == 7), bind_arguments={"mapper": MyMappedClass}
)

connection = session.connection(MyMappedClass)

版本 1.4 中的更改: 现在将 mapperclause 参数传递给 Session.execute() 作为发送到 Session.execute.bind_arguments 参数的字典的一部分。仍然接受以前的参数,但此用法已弃用。

强制将具有默认值的列设置为 NULL

ORM 将任何从未在对象上设置的属性视为 “默认” 案例;该属性将从 INSERT 语句中省略

class MyObject(Base):
    __tablename__ = "my_table"
    id = mapped_column(Integer, primary_key=True)
    data = mapped_column(String(50), nullable=True)


obj = MyObject(id=1)
session.add(obj)
session.commit()  # INSERT with the 'data' column omitted; the database
# itself will persist this as the NULL value

从 INSERT 中省略列意味着该列将设置为 NULL 值,除非该列设置了默认值,在这种情况下,将持久化默认值。这不仅适用于服务器端默认值的纯 SQL 角度,而且适用于 SQLAlchemy 的插入行为,包括客户端和服务器端默认值

class MyObject(Base):
    __tablename__ = "my_table"
    id = mapped_column(Integer, primary_key=True)
    data = mapped_column(String(50), nullable=True, server_default="default")


obj = MyObject(id=1)
session.add(obj)
session.commit()  # INSERT with the 'data' column omitted; the database
# itself will persist this as the value 'default'

但是,在 ORM 中,即使显式将 Python 值 None 分配给对象,这也与从未分配值的情况相同

class MyObject(Base):
    __tablename__ = "my_table"
    id = mapped_column(Integer, primary_key=True)
    data = mapped_column(String(50), nullable=True, server_default="default")


obj = MyObject(id=1, data=None)
session.add(obj)
session.commit()  # INSERT with the 'data' column explicitly set to None;
# the ORM still omits it from the statement and the
# database will still persist this as the value 'default'

上述操作将持久化到 data 列中,值为服务器默认值 "default",而不是 SQL NULL,即使传递了 None;这是 ORM 的长期行为,许多应用程序都假设了这一行为。

那么,如果我们想真正将 NULL 放入此列,即使该列有默认值呢?有两种方法。一种是在每个实例级别上,我们使用 null SQL 结构分配属性

from sqlalchemy import null

obj = MyObject(id=1, data=null())
session.add(obj)
session.commit()  # INSERT with the 'data' column explicitly set as null();
# the ORM uses this directly, bypassing all client-
# and server-side defaults, and the database will
# persist this as the NULL value

null SQL 结构始终转换为 SQL NULL 值直接出现在目标 INSERT 语句中。

如果我们希望能够使用 Python 值 None,并且即使存在列默认值,它也被持久化为 NULL,我们可以使用核心级修饰符 TypeEngine.evaluates_none() 为 ORM 配置此功能,该修饰符表示 ORM 应该将值 None 与任何其他值相同,并将其传递,而不是将其省略为 “丢失” 值

class MyObject(Base):
    __tablename__ = "my_table"
    id = mapped_column(Integer, primary_key=True)
    data = mapped_column(
        String(50).evaluates_none(),  # indicate that None should always be passed
        nullable=True,
        server_default="default",
    )


obj = MyObject(id=1, data=None)
session.add(obj)
session.commit()  # INSERT with the 'data' column explicitly set to None;
# the ORM uses this directly, bypassing all client-
# and server-side defaults, and the database will
# persist this as the NULL value

获取服务器生成的默认值

服务器调用 DDL - 显式默认表达式标记隐式生成的值、时间戳和触发列 部分介绍,Core 支持数据库列的概念,对于这些列,数据库本身会在 INSERT 操作时生成值,在较不常见的情况下也会在 UPDATE 语句中生成值。ORM 特性支持此类列,以便在 flush 时能够获取这些新生成的值。在由服务器生成的、作为主键的列的情况下,需要此行为,因为 ORM 必须在对象持久化后了解该对象的主键。

在绝大多数情况下,值由数据库自动生成的、作为主键的列,都是简单的整数列,数据库将这些列实现为所谓的“自动递增”列,或者来自与该列关联的序列。SQLAlchemy Core 中的每个数据库方言都支持一种方法来检索这些主键值,该方法通常是 Python DBAPI 的原生方法,并且通常这个过程是自动的。有关这方面的更多文档,请参见Column.autoincrement

对于不是主键的、由服务器生成的列,或者不是简单的自动递增整数列的列,ORM 要求使用适当的server_default 指令标记这些列,以便 ORM 能够检索此值。但是,并非所有方法都支持所有后端,因此必须注意使用适当的方法。需要回答的两个问题是:1. 此列是否是主键的一部分?2. 数据库是否支持 RETURNING 或等效方法,例如“OUTPUT inserted”;这些是 SQL 短语,它们在调用 INSERT 或 UPDATE 语句时返回服务器生成的值。RETURNING 目前受 PostgreSQL、Oracle、MariaDB 10.5、SQLite 3.35 和 SQL Server 支持。

情况 1:非主键,支持 RETURNING 或等效方法

在这种情况下,应该将列标记为FetchedValue 或显式Column.server_default。ORM 将自动在执行 INSERT 语句时将这些列添加到 RETURNING 子句中,假设Mapper.eager_defaults 参数设置为 True,或者如果将其保留在默认设置 "auto",则适用于支持 RETURNING 以及insertmanyvalues 的方言。

class MyModel(Base):
    __tablename__ = "my_table"

    id = mapped_column(Integer, primary_key=True)

    # server-side SQL date function generates a new timestamp
    timestamp = mapped_column(DateTime(), server_default=func.now())

    # some other server-side function not named here, such as a trigger,
    # populates a value into this column during INSERT
    special_identifier = mapped_column(String(50), server_default=FetchedValue())

    # set eager defaults to True.  This is usually optional, as if the
    # backend supports RETURNING + insertmanyvalues, eager defaults
    # will take place regardless on INSERT
    __mapper_args__ = {"eager_defaults": True}

在上面,一个从客户端侧没有为“timestamp”或“special_identifier”指定显式值的 INSERT 语句将包括 RETURNING 子句中的“timestamp”和“special_identifier”列,以便立即获取它们。在 PostgreSQL 数据库上,上面表格的 INSERT 将看起来像

INSERT INTO my_table DEFAULT VALUES RETURNING my_table.id, my_table.timestamp, my_table.special_identifier

Changed in version 2.0.0rc1: Mapper.eager_defaults 参数现在默认为一个新的设置 "auto",该设置将自动使用 RETURNING 来获取 INSERT 操作时服务器生成的默认值,前提是后端数据库同时支持 RETURNING 以及insertmanyvalues

注意

Mapper.eager_defaults"auto" 值仅适用于 INSERT 语句。UPDATE 语句不会使用 RETURNING,即使可用,除非Mapper.eager_defaults 设置为 True。这是因为 UPDATE 没有等效的“insertmanyvalues”功能,因此 UPDATE RETURNING 将需要为每个要 UPDATEd 的行单独发出 UPDATE 语句。

情况 2:表包含不兼容 RETURNING 的触发器生成的值

Mapper.eager_defaults"auto" 设置意味着,支持 RETURNING 的后端通常会使用 RETURNING 和 INSERT 语句来检索新生成的默认值。但是,使用触发器生成的服务器生成值的限制是,RETURNING 不能使用。

  • SQL Server 不允许在 INSERT 语句中使用 RETURNING 来检索触发器生成的值;语句将失败。

  • SQLite 在将 RETURNING 与触发器结合使用时存在限制,因此 RETURNING 子句将无法使用 INSERTed 的值。

  • 其他后端可能在将 RETURNING 与触发器或其他类型的服务器生成值结合使用时存在限制。

要禁用此类值的 RETURNING 使用,包括不仅禁用服务器生成的默认值,而且确保 ORM 永远不会对特定表使用 RETURNING,请将Table.implicit_returning 指定为映射的TableFalse。使用声明性映射,这看起来像

class MyModel(Base):
    __tablename__ = "my_table"

    id: Mapped[int] = mapped_column(primary_key=True)
    data: Mapped[str] = mapped_column(String(50))

    # assume a database trigger populates a value into this column
    # during INSERT
    special_identifier = mapped_column(String(50), server_default=FetchedValue())

    # disable all use of RETURNING for the table
    __table_args__ = {"implicit_returning": False}

在 SQL Server 上使用 pyodbc 驱动程序,上面表格的 INSERT 将不会使用 RETURNING,而是会使用 SQL Server scope_identity() 函数来检索新生成的主键值。

INSERT INTO my_table (data) VALUES (?); select scope_identity()

另见

INSERT 行为 - 有关 SQL Server 方言获取新生成的主键值的方法的背景信息。

情况 3:非主键,不支持或不需要 RETURNING 或等效方法

此情况与上面情况 1 相同,只是我们通常不希望使用Mapper.eager_defaults,因为在没有 RETURNING 支持的情况下,其当前实现是发出每个行的 SELECT,这在性能上并不理想。因此,在下面的映射中省略了该参数。

class MyModel(Base):
    __tablename__ = "my_table"

    id = mapped_column(Integer, primary_key=True)
    timestamp = mapped_column(DateTime(), server_default=func.now())

    # assume a database trigger populates a value into this column
    # during INSERT
    special_identifier = mapped_column(String(50), server_default=FetchedValue())

在不支持 RETURNING 或“insertmanyvalues”的后端上,使用上面映射插入记录后,“timestamp”和“special_identifier”列将保持为空,并在首次访问(例如,标记为“过期”)时通过第二个 SELECT 语句获取。

如果Mapper.eager_defaults 显式提供 True 值,并且后端数据库不支持 RETURNING 或等效方法,ORM 将在 INSERT 语句之后立即发出 SELECT 语句来获取新生成的值;ORM 目前没有在 RETURNING 不可用的情况下,以批处理方式选择多个新插入行的功能。这通常是不可取的,因为它会在 flush 过程中添加可能不需要的额外 SELECT 语句。使用上面的映射,将Mapper.eager_defaults 标志设置为 True,针对 MySQL(而不是 MariaDB)进行操作,将在 flush 操作时产生以下 SQL。

INSERT INTO my_table () VALUES ()

-- when eager_defaults **is** used, but RETURNING is not supported
SELECT my_table.timestamp AS my_table_timestamp, my_table.special_identifier AS my_table_special_identifier
FROM my_table WHERE my_table.id = %s

SQLAlchemy 的未来版本可能会寻求提高在没有 RETURNING 的情况下,急切默认值的效率,以便在单个 SELECT 语句中批处理多个行。

情况 4:主键,支持 RETURNING 或等效方法

具有服务器生成值的、作为主键的列必须在 INSERT 之后立即获取;ORM 只能访问它有主键值的那些行,因此如果主键是由服务器生成的,ORM 需要一种方法来立即在 INSERT 之后检索该新值。

如上所述,对于整数“自动递增”列,以及用Identity 标记的列和特殊结构(例如 PostgreSQL SERIAL),这些类型由 Core 自动处理;数据库包括用于获取“最后插入的 ID”的函数,在不支持 RETURNING 的情况下,以及在支持 RETURNING 的情况下,SQLAlchemy 将使用它。

例如,使用 Oracle 和用Identity 标记的列时,RETURNING 将自动用于获取新的主键值。

class MyOracleModel(Base):
    __tablename__ = "my_table"

    id: Mapped[int] = mapped_column(Identity(), primary_key=True)
    data: Mapped[str] = mapped_column(String(50))

上面模型在 Oracle 上的 INSERT 看起来像

INSERT INTO my_table (data) VALUES (:data) RETURNING my_table.id INTO :ret_0

SQLAlchemy 呈现了“data”字段的 INSERT,但只在 RETURNING 子句中包含“id”,以便服务器端的“id”生成将发生,并且新值将立即返回。

对于由服务器端函数或触发器生成的非整数值,以及来自表本身外部结构的整数值(包括显式序列和触发器),服务器默认生成必须在表元数据中标记。再次以 Oracle 为例,我们可以说明上面类似的表,使用 Sequence 结构命名一个显式序列

class MyOracleModel(Base):
    __tablename__ = "my_table"

    id: Mapped[int] = mapped_column(Sequence("my_oracle_seq"), primary_key=True)
    data: Mapped[str] = mapped_column(String(50))

对 Oracle 上此模型版本的 INSERT 操作将如下所示

INSERT INTO my_table (id, data) VALUES (my_oracle_seq.nextval, :data) RETURNING my_table.id INTO :ret_0

在上面,SQLAlchemy 为主键列呈现 my_sequence.nextval,以便它用于新的主键生成,并且还使用 RETURNING 立即获取新值。

如果数据源不由简单的 SQL 函数或 Sequence 表示,例如使用触发器或产生新值的特定于数据库的数据类型时,可以使用 FetchedValue 在列定义中指示值生成默认值的存在。下面是一个使用 SQL Server TIMESTAMP 列作为主键的模型;在 SQL Server 上,此数据类型会自动生成新值,因此这在表元数据中通过指示 FetchedValueColumn.server_default 参数来指示

class MySQLServerModel(Base):
    __tablename__ = "my_table"

    timestamp: Mapped[datetime.datetime] = mapped_column(
        TIMESTAMP(), server_default=FetchedValue(), primary_key=True
    )
    data: Mapped[str] = mapped_column(String(50))

在 SQL Server 上对上述表的 INSERT 操作如下所示

INSERT INTO my_table (data) OUTPUT inserted.timestamp VALUES (?)

情况 5:不支持主键、RETURNING 或等效项

在这个区域,我们正在为诸如 MySQL 之类的数据库生成行,其中一些生成默认值的方法正在服务器端发生,但超出了数据库的常规自动递增例程。在这种情况下,我们必须确保 SQLAlchemy 可以“预执行”默认值,这意味着它必须是一个显式 SQL 表达式。

注意

本节将说明涉及 MySQL 日期时间值的多种方法,因为此后端上的日期时间数据类型具有额外的特殊要求,这些要求有助于说明。但请记住,MySQL 要求为任何用作主键的自动生成数据类型提供显式“预执行”默认生成器,而不是常规的单列自动递增整数值。

带 DateTime 主键的 MySQL

以 MySQL 上的 DateTime 列为例,我们使用“NOW()”SQL 函数添加一个显式的预执行支持默认值

class MyModel(Base):
    __tablename__ = "my_table"

    timestamp = mapped_column(DateTime(), default=func.now(), primary_key=True)

在上面,我们选择“NOW()”函数来向列提供日期时间值。上面生成的 SQL 为

SELECT now() AS anon_1
INSERT INTO my_table (timestamp) VALUES (%s)
('2018-08-09 13:08:46',)

带 TIMESTAMP 主键的 MySQL

当在 MySQL 中使用 TIMESTAMP 数据类型时,MySQL 通常会自动将服务器端默认值与该数据类型相关联。但是,当我们将其用作主键时,Core 无法检索新生成的 value,除非我们自己执行函数。由于 MySQL 上的 TIMESTAMP 实际上存储的是二进制值,我们需要在使用“NOW()”时添加额外的“CAST”,以便我们检索可以持久化到列中的二进制值

from sqlalchemy import cast, Binary


class MyModel(Base):
    __tablename__ = "my_table"

    timestamp = mapped_column(
        TIMESTAMP(), default=cast(func.now(), Binary), primary_key=True
    )

在上面,除了选择“NOW()”函数之外,我们还与 cast() 结合使用 Binary 数据类型,以便返回的值为二进制。从上面在 INSERT 中呈现的 SQL 如下所示

SELECT CAST(now() AS BINARY) AS anon_1
INSERT INTO my_table (timestamp) VALUES (%s)
(b'2018-08-09 13:08:46',)

关于为 INSERT 或 UPDATE 积极获取客户端调用 SQL 表达式的说明

前面的示例指示使用 Column.server_default 创建包含其 DDL 中的默认生成函数的表。

SQLAlchemy 还支持非 DDL 服务器端默认值,如 客户端调用 SQL 表达式 中所述;这些“客户端调用 SQL 表达式”是使用 Column.defaultColumn.onupdate 参数设置的。

这些 SQL 表达式当前在 ORM 中受到与真实服务器端默认值相同的限制;它们不会在 Mapper.eager_defaults 设置为 "auto"True 时使用 RETURNING 积极获取,除非 FetchedValue 指令与 Column 相关联,即使这些表达式不是 DDL 服务器端默认值,并且是由 SQLAlchemy 本身积极呈现的。此限制可能会在将来的 SQLAlchemy 版本中解决。

可以将 FetchedValue 结构应用于 Column.server_defaultColumn.server_onupdate,同时将 SQL 表达式与 Column.defaultColumn.onupdate 一起使用,例如在下面的示例中,func.now() 结构用作 Column.defaultColumn.onupdate 的客户端调用 SQL 表达式。为了使 Mapper.eager_defaults 的行为包括它使用 RETURNING 获取这些值(如果可用),Column.server_defaultColumn.server_onupdateFetchedValue 一起使用以确保获取发生

class MyModel(Base):
    __tablename__ = "my_table"

    id = mapped_column(Integer, primary_key=True)

    created = mapped_column(
        DateTime(), default=func.now(), server_default=FetchedValue()
    )
    updated = mapped_column(
        DateTime(),
        onupdate=func.now(),
        server_default=FetchedValue(),
        server_onupdate=FetchedValue(),
    )

    __mapper_args__ = {"eager_defaults": True}

使用与上面类似的映射,ORM 为 INSERT 和 UPDATE 生成的 SQL 将在 RETURNING 子句中包含 createdupdated

INSERT INTO my_table (created) VALUES (now()) RETURNING my_table.id, my_table.created, my_table.updated

UPDATE my_table SET updated=now() WHERE my_table.id = %(my_table_id)s RETURNING my_table.updated

使用 INSERT、UPDATE 和 ON CONFLICT(即 upsert)返回 ORM 对象

SQLAlchemy 2.0 包含用于发出多种 ORM 启用的 INSERT、UPDATE 和 upsert 语句的增强功能。有关文档,请参阅 ORM 启用的 INSERT、UPDATE 和 DELETE 语句 中的文档。有关 upsert,请参阅 ORM“upsert”语句

使用 PostgreSQL ON CONFLICT 和 RETURNING 返回 upserted ORM 对象

本节已移至 ORM“upsert”语句

分区策略(例如,每个 Session 多个数据库后端)

简单的垂直分区

垂直分区将不同的类、类层次结构或映射表分布在多个数据库中,方法是使用 SessionSession.binds 参数进行配置。此参数接收一个字典,该字典包含 ORM 映射类的任何组合、映射层次结构中的任意类(例如声明性基类或混合类)、Table 对象和 Mapper 对象作为键,这些键通常引用 Engine,较少情况下引用 Connection 对象作为目标。每当 Session 需要代表特定类型的映射类发出 SQL 以定位适当的数据库连接源时,都会参考该字典。

engine1 = create_engine("postgresql+psycopg2://db1")
engine2 = create_engine("postgresql+psycopg2://db2")

Session = sessionmaker()

# bind User operations to engine 1, Account operations to engine 2
Session.configure(binds={User: engine1, Account: engine2})

session = Session()

在上述示例中,针对任一类的 SQL 操作都将使用与该类关联的 Engine。此功能在读写操作中都非常全面;一个针对映射到 engine1(通过查看请求项列表中的第一个实体来确定)的实体的 Query 将使用 engine1 来运行查询。刷新操作将基于每个类使用**两个**引擎,因为它会刷新类型为 UserAccount 的对象。

在更常见的情况下,通常存在基类或混合类,这些类可用于区分目的地为不同数据库连接的操作。 Session.binds 参数可以接受任何任意 Python 类作为键,如果发现该类存在于特定映射类的 __mro__(Python 方法解析顺序)中,则将使用该类。假设两个声明性基类代表两个不同的数据库连接。

from sqlalchemy.orm import DeclarativeBase
from sqlalchemy.orm import Session


class BaseA(DeclarativeBase):
    pass


class BaseB(DeclarativeBase):
    pass


class User(BaseA): ...


class Address(BaseA): ...


class GameInfo(BaseB): ...


class GameStats(BaseB): ...


Session = sessionmaker()

# all User/Address operations will be on engine 1, all
# Game operations will be on engine 2
Session.configure(binds={BaseA: engine1, BaseB: engine2})

在上述示例中,从 BaseABaseB 派生的类将根据它们所继承的超类(如果有)将其 SQL 操作路由到两个引擎之一。对于从多个“绑定”超类派生的类,将选择目标类层次结构中最高的超类来表示应使用哪个引擎。

另见

Session.binds

多引擎 Session 的事务协调

使用多个绑定引擎的一个警告是在一个后端上的提交操作可能在另一个后端上提交成功后失败的情况下。这是一个一致性问题,在关系型数据库中,该问题通过使用“两阶段提交”来解决,该提交在提交序列中添加一个额外的“准备”步骤,允许多个数据库在实际完成事务之前同意提交。

由于 DBAPI 中的支持有限,SQLAlchemy 对跨后端的两阶段提交的支持也有限。最典型的是,它已知在 PostgreSQL 后端运行良好,在 MySQL 后端运行良好,但程度较低。但是, Session 完全能够在后端支持的情况下利用两阶段提交功能,方法是在 sessionmakerSession 中设置 Session.use_twophase 标志。有关示例,请参阅 启用两阶段提交

自定义垂直分区

可以通过覆盖 Session.get_bind() 方法来构建更全面的基于规则的类级别分区。下面,我们将说明一个自定义 Session,它提供以下规则。

  1. 刷新操作以及批量“更新”和“删除”操作都将传递给名为 leader 的引擎。

  2. 对子类化 MyOtherClass 的对象的任何操作都将在 other 引擎上发生。

  3. 所有其他类的读取操作都将在 follower1follower2 数据库的随机选择中发生。

engines = {
    "leader": create_engine("sqlite:///leader.db"),
    "other": create_engine("sqlite:///other.db"),
    "follower1": create_engine("sqlite:///follower1.db"),
    "follower2": create_engine("sqlite:///follower2.db"),
}

from sqlalchemy.sql import Update, Delete
from sqlalchemy.orm import Session, sessionmaker
import random


class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None):
        if mapper and issubclass(mapper.class_, MyOtherClass):
            return engines["other"]
        elif self._flushing or isinstance(clause, (Update, Delete)):
            # NOTE: this is for example, however in practice reader/writer
            # splits are likely more straightforward by using two distinct
            # Sessions at the top of a "reader" or "writer" operation.
            # See note below
            return engines["leader"]
        else:
            return engines[random.choice(["follower1", "follower2"])]

上述 Session 类使用 class_ 参数插入到 sessionmaker

Session = sessionmaker(class_=RoutingSession)

此方法可以与多个 MetaData 对象结合使用,方法类似于使用声明性 __abstract__ 关键字,如 __abstract__ 中所述。

注意

虽然上面的示例说明了根据语句是否期望写入数据将特定 SQL 语句路由到所谓的“leader”或“follower”数据库,但这可能不是一种实用的方法,因为它会导致同一操作中读写之间的事务行为不协调。实际上,最好是基于正在进行的整体操作/事务,预先构建一个“reader”或“writer”会话,作为 Session。这样,将写入数据的操作也会在同一个事务范围内发出它的读取查询。有关设置一个 sessionmaker 以使用自动提交连接进行“只读”操作,而另一个 sessionmaker 用于“写入”操作(包括 DML/COMMIT)的方案,请参阅 为 Sessionmaker/Engine 范围设置隔离 中的示例。

另见

SQLAlchemy 中的 Django 风格数据库路由器 - 关于更全面的 Session.get_bind() 示例的博客文章

水平分区

水平分区将单个表(或一组表)的行分布在多个数据库中。SQLAlchemy Session 包含对该概念的支持,但要完全使用它,需要使用 SessionQuery 子类。 水平分片 ORM 扩展提供了这些子类的基本版本。使用示例请参阅:水平分片

批量操作

传统功能

SQLAlchemy 2.0 已将 Session 的“批量插入”和“批量更新”功能集成到 2.0 风格的 Session.execute() 方法中,直接使用 InsertUpdate 结构。有关文档,包括 传统 Session 批量 INSERT 方法(说明了从旧方法迁移到新方法),请参阅 ORM 启用的 INSERT、UPDATE 和 DELETE 语句 中的文档。