连接 / 引擎

如何配置日志记录?

请参阅 配置日志记录

如何池化数据库连接?我的连接是否被池化?

在大多数情况下,SQLAlchemy 会自动执行应用程序级别的连接池。对于所有包含的方言(除了使用“内存”数据库的 SQLite),Engine 对象引用 QueuePool 作为连接源。

有关更多详细信息,请参阅 引擎配置连接池

如何将自定义连接参数传递给我的数据库 API?

create_engine() 调用接受其他参数,可以直接通过 connect_args 关键字参数

e = create_engine(
    "mysql+mysqldb://scott:tiger@localhost/test", connect_args={"encoding": "utf8"}
)

或者对于基本的字符串和整数参数,它们通常可以在 URL 的查询字符串中指定

e = create_engine("mysql+mysqldb://scott:tiger@localhost/test?encoding=utf8")

“MySQL 服务器已消失”

此错误的主要原因是 MySQL 连接已超时并已被服务器关闭。MySQL 服务器关闭已空闲一段时间的连接,默认时间为八小时。为了解决这个问题,最直接的设置是启用 create_engine.pool_recycle 设置,这将确保当连接超过设定的秒数时,该连接将被丢弃,并在下次检出时替换为新连接。

对于更一般的情况,即适应数据库重启以及由于网络问题导致的其他临时连接丢失,池中的连接可能会响应更通用的断开连接检测技术而回收。处理断开连接 部分提供了关于“悲观”(例如,预 ping)和“乐观”(例如,优雅恢复)技术的背景知识。现代 SQLAlchemy 倾向于采用“悲观”方法。

另请参阅

处理断开连接

“命令不同步;您现在无法运行此命令” / “此结果对象不返回行。它已自动关闭”

MySQL 驱动程序具有相当广泛的故障模式,其中与服务器的连接状态处于无效状态。通常,当再次使用连接时,会发生这两个错误消息之一。原因是服务器的状态已更改为客户端库不期望的状态,这样当客户端库在连接上发出新语句时,服务器不会按预期响应。

在 SQLAlchemy 中,由于数据库连接是池化的,因此连接上的消息传递不同步的问题变得更加重要,因为当操作失败时,如果连接本身处于不可用状态,如果它返回到连接池,则在再次检出时会发生故障。此问题的缓解措施是在发生此类故障模式时使连接无效,以便丢弃与 MySQL 的底层数据库连接。对于许多已知的故障模式,这种失效会自动发生,也可以通过 Connection.invalidate() 方法显式调用。

在这种类别中还有第二类故障模式,其中上下文管理器(例如 with session.begin_nested():)希望在发生错误时“回滚”事务;但是,在某些连接故障模式中,回滚本身(也可能是 RELEASE SAVEPOINT 操作)也会失败,从而导致误导性的堆栈跟踪。

最初,此错误的原因曾经非常简单,这意味着多线程程序正在从多个线程在单个连接上调用命令。这适用于原始的 “MySQLdb” native-C 驱动程序,它几乎是当时唯一使用的驱动程序。但是,随着纯 Python 驱动程序(如 PyMySQL 和 MySQL-connector-Python)的引入,以及 gevent/eventlet、多进程(通常与 Celery)等工具的日益普及,已知有很多因素会导致此问题,其中一些因素已在 SQLAlchemy 版本中得到改进,但其他因素是不可避免的

  • 在线程之间共享连接 - 这是最初发生此类错误的原因。程序在两个或多个线程中同时使用相同的连接,这意味着多组消息在连接上混合在一起,从而将服务器端会话置于客户端不再知道如何解释的状态。但是,今天其他原因通常更可能发生。

  • 在进程之间共享连接的文件句柄 - 当程序使用 os.fork() 生成新进程时,通常会发生这种情况,并且父进程中存在的 TCP 连接被共享到一个或多个子进程中。由于多个进程现在正在向基本上相同的文件句柄发送消息,因此服务器接收到交错的消息并破坏了连接的状态。

    如果程序使用 Python 的 “multiprocessing” 模块并使用在父进程中创建的 Engine,则这种情况很容易发生。当使用 Celery 等工具时,通常会使用 “multiprocessing”。正确的方法应该是,当子进程首次启动时,生成一个新的 Engine,丢弃从父进程传递下来的任何 Engine;或者,从父进程继承的 Engine 可以通过调用 Engine.dispose() 来处理其内部连接池。

  • 带有退出的 Greenlet 猴子补丁 - 当使用 gevent 或 eventlet 等猴子补丁 Python 网络 API 的库时,即使 PyMySQL 等库不是专门针对此模型开发的,它们现在也以异步模式运行。一个常见问题是绿线程被中断,通常是由于应用程序中的超时逻辑。这会导致引发 GreenletExit 异常,并且纯 Python MySQL 驱动程序的工作被中断,这可能是它正在接收来自服务器的响应或准备重置连接状态。当异常中断所有工作时,客户端和服务器之间的对话现在不同步,并且随后对连接的使用可能会失败。从 1.1.0 版本开始,SQLAlchemy 知道如何防止这种情况,因为如果数据库操作被所谓的 “退出异常” 中断,其中包括 GreenletExit 和 Python BaseException 的任何其他子类(但不是 Exception 的子类),则连接将失效。

  • 回滚 / SAVEPOINT 释放失败 - 某些类别的错误会导致连接在事务上下文以及在 “SAVEPOINT” 块中操作时变得不可用。在这些情况下,连接上的故障已使任何 SAVEPOINT 不再存在,但是当 SQLAlchemy 或应用程序尝试 “回滚” 此保存点时,“RELEASE SAVEPOINT” 操作失败,通常会显示 “savepoint does not exist” 之类的消息。在这种情况下,在 Python 3 下,将输出异常链,其中错误的最终 “原因” 也将显示出来。在 Python 2 下,没有 “链式” 异常,但是最新版本的 SQLAlchemy 将尝试发出警告,说明原始故障原因,同时仍然抛出立即发生的错误,即 ROLLBACK 的失败。

如何自动“重试”语句执行?

文档部分 处理断开连接 讨论了可用于自上次检出特定连接以来已断开连接的池连接的策略。在这方面,最现代的功能是 create_engine.pre_ping 参数,该参数允许在从池中检索数据库连接时发出 “ping”,如果当前连接已断开连接,则重新连接。

重要的是要注意,此 “ping” 仅在实际用于操作之前发出。一旦连接传递给调用者,根据 Python DBAPI 规范,它现在受到自动开始操作的约束,这意味着当首次使用它时,它将自动 BEGIN 一个新的事务,该事务在后续语句中保持有效,直到调用 DBAPI 级别的 connection.commit()connection.rollback() 方法。

在现代 SQLAlchemy 的使用中,一系列 SQL 语句始终在此事务状态内调用,假设未启用 DBAPI 自动提交模式(在下一节中详细介绍),这意味着没有单个语句会自动提交;如果操作失败,则当前事务中所有语句的效果将丢失。

这对 “重试” 语句的概念的含义是,在默认情况下,当连接丢失时,整个事务都会丢失。数据库无法“重新连接并重试” 并从中断处继续,因为数据已丢失。因此,对于数据库连接在使用时断开连接的情况,SQLAlchemy 没有在事务中间工作的透明 “重新连接” 功能。处理操作中途断开连接的规范方法是从事务开始处重试整个操作,通常使用自定义 Python 装饰器,该装饰器将 “重试” 特定函数多次,直到成功,或者以其他方式架构应用程序,使其能够抵抗被删除然后导致操作失败的事务。

还有扩展的概念,它可以跟踪事务中已进行的所有语句,然后在新事务中重放所有语句,以近似 “重试” 操作。SQLAlchemy 的 事件系统 确实允许构建这样的系统,但是这种方法通常也没有用,因为无法保证这些 DML 语句将针对相同的状态工作,因为一旦事务结束,新事务中数据库的状态可能完全不同。在事务操作开始和提交的点上将 “重试” 显式地架构到应用程序中仍然是更好的方法,因为应用程序级别的事务方法最了解如何重新运行其步骤。

否则,如果 SQLAlchemy 提供一个透明且静默地 “重新连接” 事务中间连接的功能,则效果将是数据被静默丢失。通过试图隐藏问题,SQLAlchemy 会使情况变得更糟。

但是,如果我们使用事务,那么有更多选项可用,如下一节所述。

使用 DBAPI 自动提交允许透明重新连接的只读版本

在说明了没有透明重新连接机制的基本原理之后,上一节基于应用程序实际上正在使用 DBAPI 级别事务的假设。由于大多数 DBAPI 现在都提供 本机 “自动提交” 设置,我们可以利用这些功能为 只读、仅自动提交操作 提供有限形式的透明重新连接。透明语句重试可以应用于 DBAPI 的 cursor.execute() 方法,但是仍然不安全应用于 DBAPI 的 cursor.executemany() 方法,因为该语句可能已消耗给定的任何部分参数。

警告

以下配方不应用于写入数据的操作。用户应仔细阅读并理解配方的工作原理,并在生产中使用此配方之前针对特定目标 DBAPI 驱动程序非常仔细地测试故障模式。重试机制不能保证在所有情况下都能防止断开连接错误。

通过使用 DialectEvents.do_execute()DialectEvents.do_execute_no_params() 挂钩,可以将简单的重试机制应用于 DBAPI 级别的 cursor.execute() 方法,这将能够拦截语句执行期间的断开连接。它将不会拦截结果集获取操作期间的连接失败,对于那些不完全缓冲结果集的 DBAPI。该配方要求数据库支持 DBAPI 级别的自动提交,并且不保证用于特定后端。提供了一个简单的函数 reconnecting_engine(),它将事件挂钩应用于给定的 Engine 对象,返回一个始终自动提交的版本,该版本启用 DBAPI 级别的自动提交。对于单参数和无参数语句执行,连接将透明地重新连接

import time

from sqlalchemy import event


def reconnecting_engine(engine, num_retries, retry_interval):
    def _run_with_retries(fn, context, cursor_obj, statement, *arg, **kw):
        for retry in range(num_retries + 1):
            try:
                fn(cursor_obj, statement, context=context, *arg)
            except engine.dialect.dbapi.Error as raw_dbapi_err:
                connection = context.root_connection
                if engine.dialect.is_disconnect(raw_dbapi_err, connection, cursor_obj):
                    engine.logger.error(
                        "disconnection error, attempt %d/%d",
                        retry + 1,
                        num_retries + 1,
                        exc_info=True,
                    )
                    connection.invalidate()

                    # use SQLAlchemy 2.0 API if available
                    if hasattr(connection, "rollback"):
                        connection.rollback()
                    else:
                        trans = connection.get_transaction()
                        if trans:
                            trans.rollback()

                    if retry == num_retries:
                        raise

                    time.sleep(retry_interval)
                    context.cursor = cursor_obj = connection.connection.cursor()
                else:
                    raise
            else:
                return True

    e = engine.execution_options(isolation_level="AUTOCOMMIT")

    @event.listens_for(e, "do_execute_no_params")
    def do_execute_no_params(cursor_obj, statement, context):
        return _run_with_retries(
            context.dialect.do_execute_no_params, context, cursor_obj, statement
        )

    @event.listens_for(e, "do_execute")
    def do_execute(cursor_obj, statement, parameters, context):
        return _run_with_retries(
            context.dialect.do_execute, context, cursor_obj, statement, parameters
        )

    return e

给定上述配方,可以使用以下概念验证脚本来演示事务中间的重新连接。运行后,它将每五秒向数据库发出 SELECT 1 语句

from sqlalchemy import create_engine
from sqlalchemy import select

if __name__ == "__main__":
    engine = create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True)

    def do_a_thing(engine):
        with engine.begin() as conn:
            while True:
                print("ping: %s" % conn.execute(select([1])).scalar())
                time.sleep(5)

    e = reconnecting_engine(
        create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True),
        num_retries=5,
        retry_interval=2,
    )

    do_a_thing(e)

在脚本运行时重新启动数据库以演示透明的重新连接操作

$ python reconnect_test.py
ping: 1
ping: 1
disconnection error, retrying operation
Traceback (most recent call last):
  ...
MySQLdb._exceptions.OperationalError: (2006, 'MySQL server has gone away')
2020-10-19 16:16:22,624 INFO sqlalchemy.pool.impl.QueuePool Invalidate connection <_mysql.connection open to 'localhost' at 0xf59240>
ping: 1
ping: 1
...

上述配方已针对 SQLAlchemy 1.4 进行测试。

为什么 SQLAlchemy 发出如此多的 ROLLBACK?

SQLAlchemy 当前假定 DBAPI 连接处于 “非自动提交” 模式 - 这是 Python 数据库 API 的默认行为,这意味着必须假定事务始终在进行中。当连接返回时,连接池会发出 connection.rollback()。这是为了释放连接上剩余的任何事务资源。在像 PostgreSQL 或 MSSQL 这样的数据库上,表资源被积极锁定,这至关重要,这样行和表就不会在不再使用的连接中保持锁定状态。否则,应用程序可能会挂起。但这不仅仅是为了锁定,而且对于任何具有任何类型的事务隔离的数据库(包括带有 InnoDB 的 MySQL)也同样至关重要。任何仍处于旧事务中的连接都将返回过时的数据,如果该数据已在该连接上的隔离中查询过。有关为什么即使在 MySQL 上也可能看到过时数据的背景知识,请参阅 https://dev.mysqlserver.cn/doc/refman/5.1/en/innodb-transaction-model.html

我正在使用 MyISAM - 如何关闭它?

可以使用 reset_on_return 配置连接池的连接返回行为

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "mysql+mysqldb://scott:tiger@localhost/myisam_database",
    pool=QueuePool(reset_on_return=False),
)

我正在使用 SQL Server - 如何将这些 ROLLBACK 转换为 COMMIT?

reset_on_return 除了 TrueFalseNone 之外,还接受值 commitrollback。设置为 commit 将导致在任何连接返回到池时执行 COMMIT

engine = create_engine(
    "mssql+pyodbc://scott:tiger@mydsn", pool=QueuePool(reset_on_return="commit")
)

我正在对 SQLite 数据库使用多个连接(通常用于测试事务操作),并且我的测试程序无法正常工作!

如果使用 SQLite :memory: 数据库,则默认连接池是 SingletonThreadPool,它为每个线程维护一个 SQLite 连接。因此,在同一线程中使用的两个连接实际上将是相同的 SQLite 连接。确保您没有使用 :memory: 数据库,以便引擎将使用 QueuePool(当前 SQLAlchemy 版本中非内存数据库的默认值)。

另请参阅

线程/池化行为 - 有关 PySQLite 行为的信息。

使用引擎时,如何访问原始 DBAPI 连接?

对于常规的 SA 引擎级连接,您可以通过 Connection.connection 属性在 Connection 上访问 DBAPI 连接的池代理版本,对于真正的 DBAPI 连接,您可以调用 PoolProxiedConnection.dbapi_connection 属性。在常规同步驱动程序上,通常不需要访问非池代理的 DBAPI 连接,因为所有方法都通过代理传递

engine = create_engine(...)
conn = engine.connect()

# pep-249 style PoolProxiedConnection (historically called a "connection fairy")
connection_fairy = conn.connection

# typically to run statements one would get a cursor() from this
# object
cursor_obj = connection_fairy.cursor()
# ... work with cursor_obj

# to bypass "connection_fairy", such as to set attributes on the
# unproxied pep-249 DBAPI connection, use .dbapi_connection
raw_dbapi_connection = connection_fairy.dbapi_connection

# the same thing is available as .driver_connection (more on this
# in the next section)
also_raw_dbapi_connection = connection_fairy.driver_connection

在版本 1.4.24 中更改: 添加了 PoolProxiedConnection.dbapi_connection 属性,该属性取代了之前的 PoolProxiedConnection.connection 属性,该属性仍然可用;此属性始终提供 pep-249 同步风格的连接对象。PoolProxiedConnection.driver_connection 属性也已添加,它将始终引用真正的驱动程序级连接,而不管它呈现什么 API。

访问 asyncio 驱动程序的底层连接

当使用 asyncio 驱动程序时,上述方案有两个变化。第一个是当使用 AsyncConnection 时,必须使用可等待方法 AsyncConnection.get_raw_connection() 访问 PoolProxiedConnection。在这种情况下,返回的 PoolProxiedConnection 保留了同步风格的 pep-249 使用模式,并且 PoolProxiedConnection.dbapi_connection 属性引用了一个 SQLAlchemy 调整的连接对象,该对象将 asyncio 连接调整为同步风格的 pep-249 API,换句话说,当使用 asyncio 驱动程序时,存在个级别的代理。实际的 asyncio 连接可从 driver_connection 属性获得。用 asyncio 的术语重述前面的示例看起来像

async def main():
    engine = create_async_engine(...)
    conn = await engine.connect()

    # pep-249 style ConnectionFairy connection pool proxy object
    # presents a sync interface
    connection_fairy = await conn.get_raw_connection()

    # beneath that proxy is a second proxy which adapts the
    # asyncio driver into a pep-249 connection object, accessible
    # via .dbapi_connection as is the same with a sync API
    sqla_sync_conn = connection_fairy.dbapi_connection

    # the really-real innermost driver connection is available
    # from the .driver_connection attribute
    raw_asyncio_connection = connection_fairy.driver_connection

    # work with raw asyncio connection
    result = await raw_asyncio_connection.execute(...)

在版本 1.4.24 中更改: 添加了 PoolProxiedConnection.dbapi_connectionPoolProxiedConnection.driver_connection 属性,以允许使用一致的接口访问 pep-249 连接、pep-249 适配层和底层驱动程序连接。

当使用 asyncio 驱动时,上述 “DBAPI” 连接实际上是 SQLAlchemy 适配的连接形式,它呈现同步风格的 pep-249 风格 API。要访问实际的 asyncio 驱动连接,它将呈现正在使用的驱动程序的原始 asyncio API,可以通过 PoolProxiedConnection.driver_connection 属性访问 PoolProxiedConnection。对于标准的 pep-249 驱动,PoolProxiedConnection.dbapi_connectionPoolProxiedConnection.driver_connection 是同义的。

您必须确保在将连接返回到连接池之前,将连接上的任何隔离级别设置或其他特定于操作的设置恢复为正常。

作为恢复设置的替代方法,您可以调用 Connection.detach() 方法,无论是在 Connection 还是代理连接上,这将使连接与连接池解除关联,以便在调用 Connection.close() 时将其关闭并丢弃

conn = engine.connect()
conn.detach()  # detaches the DBAPI connection from the connection pool
conn.connection.<go nuts>
conn.close()  # connection is closed for real, the pool replaces it with a new connection

如何将引擎 / 连接 / 会话与 Python 多进程或 os.fork() 一起使用?

这在 将连接池与多进程或 os.fork() 一起使用 部分中进行了介绍。