连接 / 引擎

如何配置日志记录?

参见 配置日志记录.

如何池化数据库连接?我的连接被池化了吗?

在大多数情况下,SQLAlchemy 会自动执行应用程序级连接池化。对于所有包含的方言(使用“内存”数据库的 SQLite 除外),Engine 对象将 QueuePool 作为连接来源。

有关更多详细信息,请参见 引擎配置连接池化.

如何将自定义连接参数传递给我的数据库 API?

create_engine() 调用接受通过 connect_args 关键字参数直接传递的附加参数

e = create_engine(
    "mysql+mysqldb://scott:tiger@localhost/test", connect_args={"encoding": "utf8"}
)

或者,对于基本字符串和整型参数,通常可以在 URL 的查询字符串中指定它们

e = create_engine("mysql+mysqldb://scott:tiger@localhost/test?encoding=utf8")

“MySQL Server has gone away”

此错误的主要原因是 MySQL 连接已超时并已由服务器关闭。MySQL 服务器会关闭已空闲一段时间(默认情况下为八小时)的连接。为了解决此问题,最直接的设置是启用 create_engine.pool_recycle 设置,这将确保比设定时间更旧的连接将在下次检出时被丢弃并替换为新连接。

对于更普遍的情况,例如处理数据库重启和其他由于网络问题导致的临时连接丢失,池中的连接可能会响应更通用的断开连接检测技术而被回收。部分 处理断开连接 提供了关于“悲观” (例如预先 ping) 和“乐观” (例如优雅恢复) 技术的背景。现代 SQLAlchemy 倾向于采用“悲观”方法。

另请参见

处理断开连接

“Commands out of sync; you can’t run this command now” / “This result object does not return rows. It has been closed automatically”

MySQL 驱动程序有一系列故障模式,这些模式会导致与服务器的连接状态无效。通常,当再次使用连接时,会发生这两个错误消息之一。原因是服务器的状态已更改为客户端库不期望的状态,因此当客户端库在连接上发出新语句时,服务器不会按预期响应。

在 SQLAlchemy 中,由于数据库连接被池化,因此连接上消息不同步的问题变得更加重要,因为当操作失败时,如果连接本身处于不可用状态,如果它返回连接池,则在再次检出时它将出现故障。对此问题的缓解措施是在发生此类故障模式时使连接失效,以便丢弃与 MySQL 的底层数据库连接。此失效对于许多已知的故障模式会自动发生,也可以通过 Connection.invalidate() 方法显式调用。

在此类别中还存在第二类故障模式,其中上下文管理器(例如 with session.begin_nested():)希望在发生错误时“回滚”事务;但是,在连接的某些故障模式下,回滚本身(也可以是 RELEASE SAVEPOINT 操作)也会失败,从而导致误导性的堆栈跟踪。

最初,此错误的原因非常简单,这意味着多线程程序正在从多个线程在单个连接上调用命令。这适用于最初的“MySQLdb”原生 C 驱动程序,它几乎是当时唯一使用的驱动程序。但是,随着纯 Python 驱动程序(如 PyMySQL 和 MySQL-connector-Python)的引入,以及对 gevent/eventlet、多处理(通常使用 Celery)等工具的使用增加,出现了一系列已知会导致此问题的原因,其中一些已在 SQLAlchemy 版本中得到改进,但其他问题则无法避免

  • 在多个线程之间共享连接 - 这是这些错误最初发生的原因。一个程序同时在两个或多个线程中使用了相同的连接,这意味着多个消息集在连接上混合在一起,将服务器端会话置于客户端不再知道如何解释的状态。但是,现在通常其他原因更有可能。

  • 在多个进程之间共享连接的文件句柄 - 通常发生在程序使用 os.fork() 生成新进程时,以及父进程中的 TCP 连接被共享到一个或多个子进程中。由于多个进程现在本质上向相同的文件句柄发送消息,因此服务器会收到交织的消息并破坏连接的状态。

    如果程序使用 Python 的“multiprocessing”模块并利用在父进程中创建的 Engine,则这种场景很容易发生。当使用 Celery 等工具时,通常会使用“multiprocessing”。正确的方法应该是:当子进程首次启动时,要么生成一个新的 Engine,丢弃来自父进程的任何 Engine;或者,从父进程继承的 Engine 可以通过调用 Engine.dispose() 来处理其内部连接池。

  • Greenlet Monkeypatching w/ Exits - 当使用 gevent 或 eventlet 等库来修补 Python 网络 API 时,PyMySQL 等库现在以异步模式运行,即使它们不是针对此模型专门开发的。一个常见的问题是绿线程被中断,通常是由于应用程序中的超时逻辑造成的。这会导致 GreenletExit 异常被抛出,纯 Python MySQL 驱动程序会被中断,它可能正在接收来自服务器的响应或准备重置连接的状态。当异常中断所有这些工作时,客户端和服务器之间的对话现在不同步,连接的后续使用可能会失败。从版本 1.1.0 开始,SQLAlchemy 知道如何防御这种情况,如果数据库操作被所谓的“退出异常”中断,其中包括 GreenletExit 和 Python BaseException 的任何其他子类,但不是 Exception 的子类,则连接会失效。

  • 回滚 / SAVEPOINT 释放失败 - 一些错误会导致连接在事务的上下文中以及在“SAVEPOINT”块中不可用。在这些情况下,连接上的故障会导致任何 SAVEPOINT 不再存在,但是当 SQLAlchemy 或应用程序尝试“回滚”此 SAVEPOINT 时,“RELEASE SAVEPOINT”操作会失败,通常会显示消息“savepoint does not exist”。在这种情况下,在 Python 3 中,会输出一系列异常,其中错误的最终“原因”将显示为最终原因。在 Python 2 中,没有“链式”异常,但是 SQLAlchemy 的最新版本会尝试发出警告说明原始故障原因,同时仍然抛出立即错误,即 ROLLBACK 失败。

如何自动“重试”语句执行?

文档部分 处理断开连接 讨论了针对连接池中自上次检查后断开的连接的可用策略。这方面最现代的特性是 create_engine.pre_ping 参数,它允许在从连接池中检索数据库连接时发出一个“ping”,如果当前连接已断开,则重新连接。

需要注意的是,这个“ping”只在连接实际用于操作之前发出。一旦连接被传递给调用者,根据 Python DBAPI 规范,它现在将受到自动开始操作的影响,这意味着它将在第一次使用时自动开始一个新的事务,该事务对后续语句保持有效,直到 DBAPI 级的 connection.commit()connection.rollback() 方法被调用。

在 SQLAlchemy 的现代使用中,一系列 SQL 语句总是在这个事务状态内执行,假设 DBAPI 自动提交模式 未启用(下一节将详细介绍),这意味着没有单个语句会自动提交;如果操作失败,当前事务内所有语句的效果将丢失。

这对“重试”语句概念的影响是,在默认情况下,当连接丢失时,整个事务将丢失。数据库没有有效的方法可以“重新连接并重试”并从中断的地方继续,因为数据已经丢失。出于这个原因,SQLAlchemy 没有一个透明的“重新连接”功能,它可以在事务中间工作,用于数据库连接在使用过程中断开的情况。处理操作中间断开的规范方法是从事务开始重试整个操作,通常是使用自定义 Python 装饰器,该装饰器会多次“重试”特定函数,直到成功,或者以其他方式设计应用程序,使其对被丢弃的事务具有弹性,这些事务会导致操作失败。

还有一种概念是扩展,它可以跟踪事务中进行的所有语句,然后将它们全部重播到新的事务中,以近似于“重试”操作。SQLAlchemy 的 事件系统 确实允许构建这样的系统,但是这种方法通常也不实用,因为无法保证这些 DML 语句将针对相同的状态运行,因为一旦事务结束,数据库在新的事务中的状态可能完全不同。在事务操作开始和提交的点上显式地将“重试”设计到应用程序中仍然是更好的方法,因为应用程序级事务方法最了解如何重新运行其步骤。

否则,如果 SQLAlchemy 要提供一个透明地和静默地“重新连接”事务中连接的功能,其结果将是数据静默丢失。通过试图隐藏问题,SQLAlchemy 会使情况变得更糟。

但是,如果我们使用事务,那么将有更多选项可用,正如下一节所述。

使用 DBAPI 自动提交允许使用透明重新连接的只读版本

在说明了没有透明重新连接机制的原因之后,前一节依赖于应用程序实际上正在使用 DBAPI 级事务的假设。由于大多数 DBAPI 现在提供 原生“自动提交”设置,我们可以利用这些特性为只读、仅自动提交操作提供有限形式的透明重新连接。可以将透明语句重试应用于 DBAPI 的 cursor.execute() 方法,但是仍然不安全将其应用于 DBAPI 的 cursor.executemany() 方法,因为语句可能已使用给定参数的任何部分。

警告

以下方法不应用于写入数据的操作。用户应仔细阅读和理解该方法的工作原理,并在将该方法用于生产环境之前,针对特定的目标 DBAPI 驱动程序非常仔细地测试故障模式。重试机制不能保证在所有情况下都能防止断开连接错误。

通过利用 DialectEvents.do_execute()DialectEvents.do_execute_no_params() 钩子,可以将简单的重试机制应用于 DBAPI 级 cursor.execute() 方法,这些钩子将能够在语句执行期间拦截断开连接。它不会拦截结果集获取操作期间的连接失败,对于那些没有完全缓冲结果集的 DBAPI 来说。该方法要求数据库支持 DBAPI 级自动提交,并且不能保证适用于特定的后端。一个名为 reconnecting_engine() 的单个函数被呈现出来,它将事件钩子应用于给定的 Engine 对象,并返回一个始终自动提交的版本,该版本启用 DBAPI 级自动提交。连接将为单参数和无参数语句执行透明地重新连接。

import time

from sqlalchemy import event


def reconnecting_engine(engine, num_retries, retry_interval):
    def _run_with_retries(fn, context, cursor_obj, statement, *arg, **kw):
        for retry in range(num_retries + 1):
            try:
                fn(cursor_obj, statement, context=context, *arg)
            except engine.dialect.dbapi.Error as raw_dbapi_err:
                connection = context.root_connection
                if engine.dialect.is_disconnect(raw_dbapi_err, connection, cursor_obj):
                    if retry > num_retries:
                        raise
                    engine.logger.error(
                        "disconnection error, retrying operation",
                        exc_info=True,
                    )
                    connection.invalidate()

                    # use SQLAlchemy 2.0 API if available
                    if hasattr(connection, "rollback"):
                        connection.rollback()
                    else:
                        trans = connection.get_transaction()
                        if trans:
                            trans.rollback()

                    time.sleep(retry_interval)
                    context.cursor = cursor_obj = connection.connection.cursor()
                else:
                    raise
            else:
                return True

    e = engine.execution_options(isolation_level="AUTOCOMMIT")

    @event.listens_for(e, "do_execute_no_params")
    def do_execute_no_params(cursor_obj, statement, context):
        return _run_with_retries(
            context.dialect.do_execute_no_params, context, cursor_obj, statement
        )

    @event.listens_for(e, "do_execute")
    def do_execute(cursor_obj, statement, parameters, context):
        return _run_with_retries(
            context.dialect.do_execute, context, cursor_obj, statement, parameters
        )

    return e

鉴于上述方法,可以使用以下概念验证脚本演示事务中重新连接。运行后,它将每五秒向数据库发出一个 SELECT 1 语句。

from sqlalchemy import create_engine
from sqlalchemy import select

if __name__ == "__main__":
    engine = create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True)

    def do_a_thing(engine):
        with engine.begin() as conn:
            while True:
                print("ping: %s" % conn.execute(select([1])).scalar())
                time.sleep(5)

    e = reconnecting_engine(
        create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True),
        num_retries=5,
        retry_interval=2,
    )

    do_a_thing(e)

在脚本运行时重新启动数据库,以演示透明重新连接操作。

$ python reconnect_test.py
ping: 1
ping: 1
disconnection error, retrying operation
Traceback (most recent call last):
  ...
MySQLdb._exceptions.OperationalError: (2006, 'MySQL server has gone away')
2020-10-19 16:16:22,624 INFO sqlalchemy.pool.impl.QueuePool Invalidate connection <_mysql.connection open to 'localhost' at 0xf59240>
ping: 1
ping: 1
...

以上方法已针对 SQLAlchemy 1.4 进行测试。

为什么 SQLAlchemy 会发出如此多的 ROLLBACK?

SQLAlchemy 目前假设 DBAPI 连接处于“非自动提交”模式 - 这是 Python 数据库 API 的默认行为,这意味着必须假设始终正在进行事务。当连接被返回时,连接池会发出 connection.rollback()。这是为了释放连接上剩余的任何事务资源。在 PostgreSQL 或 MSSQL 等数据库中,表资源会积极地锁定,这至关重要,这样行和表就不会在不再使用的连接中保持锁定状态。否则应用程序可能会挂起。但是,这不仅限于锁定,对于任何具有任何类型的事务隔离的数据库来说,包括具有 InnoDB 的 MySQL,它也同样重要。任何仍然处于旧事务中的连接将返回过时的数据,如果该数据已在该连接中使用隔离进行过查询。有关在 MySQL 上可能看到过时数据的背景信息,请参见 https://dev.mysqlserver.cn/doc/refman/5.1/en/innodb-transaction-model.html

我在使用 MyISAM - 如何关闭它?

可以使用 reset_on_return 配置连接池的连接返回行为。

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "mysql+mysqldb://scott:tiger@localhost/myisam_database",
    pool=QueuePool(reset_on_return=False),
)

我在使用 SQL Server - 如何将这些 ROLLBACK 转换为 COMMIT?

reset_on_return 除了接受 TrueFalseNone 之外,还接受值 commitrollback。设置为 commit 将在任何连接返回到池时导致 COMMIT。

engine = create_engine(
    "mssql+pyodbc://scott:tiger@mydsn", pool=QueuePool(reset_on_return="commit")
)

我正在使用 SQLite 数据库的多个连接(通常用于测试事务操作),我的测试程序无法工作!

如果使用 SQLite :memory: 数据库,默认连接池是 SingletonThreadPool,它为每个线程维护正好一个 SQLite 连接。因此,在同一线程中使用的两个连接实际上将是同一个 SQLite 连接。确保您没有使用 :memory: 数据库,这样引擎将使用 QueuePool(在当前 SQLAlchemy 版本中,非内存数据库的默认值)。

另请参见

线程/池行为 - 有关 PySQLite 行为的信息。

如何在使用 Engine 时获取原始 DBAPI 连接?

使用常规的 SA 引擎级连接,您可以通过 Connection.connection 属性获取 Connection 上的池代理版本 DBAPI 连接,对于真正真实的 DBAPI 连接,您可以调用该连接上的 PoolProxiedConnection.dbapi_connection 属性。在常规同步驱动程序上,通常不需要访问非池代理的 DBAPI 连接,因为所有方法都通过代理传递。

engine = create_engine(...)
conn = engine.connect()

# pep-249 style PoolProxiedConnection (historically called a "connection fairy")
connection_fairy = conn.connection

# typically to run statements one would get a cursor() from this
# object
cursor_obj = connection_fairy.cursor()
# ... work with cursor_obj

# to bypass "connection_fairy", such as to set attributes on the
# unproxied pep-249 DBAPI connection, use .dbapi_connection
raw_dbapi_connection = connection_fairy.dbapi_connection

# the same thing is available as .driver_connection (more on this
# in the next section)
also_raw_dbapi_connection = connection_fairy.driver_connection

在版本 1.4.24 中更改:添加了 PoolProxiedConnection.dbapi_connection 属性,它取代了先前仍然可用的 PoolProxiedConnection.connection 属性;此属性始终提供 pep-249 同步样式连接对象。还添加了 PoolProxiedConnection.driver_connection 属性,它将始终引用实际的驱动程序级连接,无论其呈现的是什么 API。

访问 asyncio 驱动程序的底层连接

当使用 asyncio 驱动程序时,上述方案有两处变化。首先,当使用 AsyncConnection 时,必须使用可等待方法 AsyncConnection.get_raw_connection() 访问 PoolProxiedConnection。在这种情况下,返回的 PoolProxiedConnection 保留了同步风格的 pep-249 使用模式,而 PoolProxiedConnection.dbapi_connection 属性引用了一个 SQLAlchemy 适配的连接对象,它将 asyncio 连接适配为同步风格的 pep-249 API,换句话说,当使用 asyncio 驱动程序时,存在 _两_ 级代理。实际的 asyncio 连接可以通过 driver_connection 属性获得。为了用 asyncio 重述前面的例子,看起来像

async def main():
    engine = create_async_engine(...)
    conn = await engine.connect()

    # pep-249 style ConnectionFairy connection pool proxy object
    # presents a sync interface
    connection_fairy = await conn.get_raw_connection()

    # beneath that proxy is a second proxy which adapts the
    # asyncio driver into a pep-249 connection object, accessible
    # via .dbapi_connection as is the same with a sync API
    sqla_sync_conn = connection_fairy.dbapi_connection

    # the really-real innermost driver connection is available
    # from the .driver_connection attribute
    raw_asyncio_connection = connection_fairy.driver_connection

    # work with raw asyncio connection
    result = await raw_asyncio_connection.execute(...)

在版本 1.4.24 中变更: 添加了 PoolProxiedConnection.dbapi_connectionPoolProxiedConnection.driver_connection 属性,以便使用一致的接口访问 pep-249 连接、pep-249 适配层和底层驱动程序连接。

当使用 asyncio 驱动程序时,上述“DBAPI”连接实际上是 SQLAlchemy 适配形式的连接,它提供了同步风格的 pep-249 风格 API。要访问实际的 asyncio 驱动程序连接,它将提供所用驱动程序的原始 asyncio API,这可以通过 PoolProxiedConnection.driver_connection 属性访问 PoolProxiedConnection。对于标准的 pep-249 驱动程序,PoolProxiedConnection.dbapi_connectionPoolProxiedConnection.driver_connection 是同义词。

必须确保在将连接返回到连接池之前,将连接上的任何隔离级别设置或其他操作特定设置恢复为正常状态。

作为恢复设置的替代方法,可以调用 Connection.detach() 方法 Connection 或代理连接,这将使连接与连接池分离,以便在调用 Connection.close() 时关闭并丢弃连接

conn = engine.connect()
conn.detach()  # detaches the DBAPI connection from the connection pool
conn.connection.<go nuts>
conn.close()  # connection is closed for real, the pool replaces it with a new connection

如何使用引擎/连接/会话与 Python 多进程或 os.fork()?

这在 使用连接池进行多进程或 os.fork() 部分介绍。