SQLAlchemy 2.0 文档
常见问题解答
- 安装
- 连接 / 引擎¶
- MetaData / 模式
- SQL 表达式
- ORM 配置
- 性能
- 会话 / 查询
- 第三方集成问题
项目版本
- 上一篇: 安装
- 下一篇: MetaData / 模式
- 上级: 首页
- 本页内容
连接 / 引擎¶
如何配置日志记录?¶
请参阅 配置日志记录。
如何池化数据库连接?我的连接是否被池化?¶
在大多数情况下,SQLAlchemy 会自动执行应用程序级别的连接池。对于所有包含的方言(除了使用“内存”数据库的 SQLite),Engine
对象引用 QueuePool
作为连接源。
如何将自定义连接参数传递给我的数据库 API?¶
create_engine()
调用接受其他参数,可以直接通过 connect_args
关键字参数
e = create_engine(
"mysql+mysqldb://scott:tiger@localhost/test", connect_args={"encoding": "utf8"}
)
或者对于基本的字符串和整数参数,它们通常可以在 URL 的查询字符串中指定
e = create_engine("mysql+mysqldb://scott:tiger@localhost/test?encoding=utf8")
“MySQL 服务器已消失”¶
此错误的主要原因是 MySQL 连接已超时并已被服务器关闭。MySQL 服务器关闭已空闲一段时间的连接,默认时间为八小时。为了解决这个问题,最直接的设置是启用 create_engine.pool_recycle
设置,这将确保当连接超过设定的秒数时,该连接将被丢弃,并在下次检出时替换为新连接。
对于更一般的情况,即适应数据库重启以及由于网络问题导致的其他临时连接丢失,池中的连接可能会响应更通用的断开连接检测技术而回收。处理断开连接 部分提供了关于“悲观”(例如,预 ping)和“乐观”(例如,优雅恢复)技术的背景知识。现代 SQLAlchemy 倾向于采用“悲观”方法。
另请参阅
“命令不同步;您现在无法运行此命令” / “此结果对象不返回行。它已自动关闭”¶
MySQL 驱动程序具有相当广泛的故障模式,其中与服务器的连接状态处于无效状态。通常,当再次使用连接时,会发生这两个错误消息之一。原因是服务器的状态已更改为客户端库不期望的状态,这样当客户端库在连接上发出新语句时,服务器不会按预期响应。
在 SQLAlchemy 中,由于数据库连接是池化的,因此连接上的消息传递不同步的问题变得更加重要,因为当操作失败时,如果连接本身处于不可用状态,如果它返回到连接池,则在再次检出时会发生故障。此问题的缓解措施是在发生此类故障模式时使连接无效,以便丢弃与 MySQL 的底层数据库连接。对于许多已知的故障模式,这种失效会自动发生,也可以通过 Connection.invalidate()
方法显式调用。
在这种类别中还有第二类故障模式,其中上下文管理器(例如 with session.begin_nested():
)希望在发生错误时“回滚”事务;但是,在某些连接故障模式中,回滚本身(也可能是 RELEASE SAVEPOINT 操作)也会失败,从而导致误导性的堆栈跟踪。
最初,此错误的原因曾经非常简单,这意味着多线程程序正在从多个线程在单个连接上调用命令。这适用于原始的 “MySQLdb” native-C 驱动程序,它几乎是当时唯一使用的驱动程序。但是,随着纯 Python 驱动程序(如 PyMySQL 和 MySQL-connector-Python)的引入,以及 gevent/eventlet、多进程(通常与 Celery)等工具的日益普及,已知有很多因素会导致此问题,其中一些因素已在 SQLAlchemy 版本中得到改进,但其他因素是不可避免的
在线程之间共享连接 - 这是最初发生此类错误的原因。程序在两个或多个线程中同时使用相同的连接,这意味着多组消息在连接上混合在一起,从而将服务器端会话置于客户端不再知道如何解释的状态。但是,今天其他原因通常更可能发生。
在进程之间共享连接的文件句柄 - 当程序使用
os.fork()
生成新进程时,通常会发生这种情况,并且父进程中存在的 TCP 连接被共享到一个或多个子进程中。由于多个进程现在正在向基本上相同的文件句柄发送消息,因此服务器接收到交错的消息并破坏了连接的状态。如果程序使用 Python 的 “multiprocessing” 模块并使用在父进程中创建的
Engine
,则这种情况很容易发生。当使用 Celery 等工具时,通常会使用 “multiprocessing”。正确的方法应该是,当子进程首次启动时,生成一个新的Engine
,丢弃从父进程传递下来的任何Engine
;或者,从父进程继承的Engine
可以通过调用Engine.dispose()
来处理其内部连接池。带有退出的 Greenlet 猴子补丁 - 当使用 gevent 或 eventlet 等猴子补丁 Python 网络 API 的库时,即使 PyMySQL 等库不是专门针对此模型开发的,它们现在也以异步模式运行。一个常见问题是绿线程被中断,通常是由于应用程序中的超时逻辑。这会导致引发
GreenletExit
异常,并且纯 Python MySQL 驱动程序的工作被中断,这可能是它正在接收来自服务器的响应或准备重置连接状态。当异常中断所有工作时,客户端和服务器之间的对话现在不同步,并且随后对连接的使用可能会失败。从 1.1.0 版本开始,SQLAlchemy 知道如何防止这种情况,因为如果数据库操作被所谓的 “退出异常” 中断,其中包括GreenletExit
和 PythonBaseException
的任何其他子类(但不是Exception
的子类),则连接将失效。回滚 / SAVEPOINT 释放失败 - 某些类别的错误会导致连接在事务上下文以及在 “SAVEPOINT” 块中操作时变得不可用。在这些情况下,连接上的故障已使任何 SAVEPOINT 不再存在,但是当 SQLAlchemy 或应用程序尝试 “回滚” 此保存点时,“RELEASE SAVEPOINT” 操作失败,通常会显示 “savepoint does not exist” 之类的消息。在这种情况下,在 Python 3 下,将输出异常链,其中错误的最终 “原因” 也将显示出来。在 Python 2 下,没有 “链式” 异常,但是最新版本的 SQLAlchemy 将尝试发出警告,说明原始故障原因,同时仍然抛出立即发生的错误,即 ROLLBACK 的失败。
如何自动“重试”语句执行?¶
文档部分 处理断开连接 讨论了可用于自上次检出特定连接以来已断开连接的池连接的策略。在这方面,最现代的功能是 create_engine.pre_ping
参数,该参数允许在从池中检索数据库连接时发出 “ping”,如果当前连接已断开连接,则重新连接。
重要的是要注意,此 “ping” 仅在实际用于操作之前发出。一旦连接传递给调用者,根据 Python DBAPI 规范,它现在受到自动开始操作的约束,这意味着当首次使用它时,它将自动 BEGIN 一个新的事务,该事务在后续语句中保持有效,直到调用 DBAPI 级别的 connection.commit()
或 connection.rollback()
方法。
在现代 SQLAlchemy 的使用中,一系列 SQL 语句始终在此事务状态内调用,假设未启用 DBAPI 自动提交模式(在下一节中详细介绍),这意味着没有单个语句会自动提交;如果操作失败,则当前事务中所有语句的效果将丢失。
这对 “重试” 语句的概念的含义是,在默认情况下,当连接丢失时,整个事务都会丢失。数据库无法“重新连接并重试” 并从中断处继续,因为数据已丢失。因此,对于数据库连接在使用时断开连接的情况,SQLAlchemy 没有在事务中间工作的透明 “重新连接” 功能。处理操作中途断开连接的规范方法是从事务开始处重试整个操作,通常使用自定义 Python 装饰器,该装饰器将 “重试” 特定函数多次,直到成功,或者以其他方式架构应用程序,使其能够抵抗被删除然后导致操作失败的事务。
还有扩展的概念,它可以跟踪事务中已进行的所有语句,然后在新事务中重放所有语句,以近似 “重试” 操作。SQLAlchemy 的 事件系统 确实允许构建这样的系统,但是这种方法通常也没有用,因为无法保证这些 DML 语句将针对相同的状态工作,因为一旦事务结束,新事务中数据库的状态可能完全不同。在事务操作开始和提交的点上将 “重试” 显式地架构到应用程序中仍然是更好的方法,因为应用程序级别的事务方法最了解如何重新运行其步骤。
否则,如果 SQLAlchemy 提供一个透明且静默地 “重新连接” 事务中间连接的功能,则效果将是数据被静默丢失。通过试图隐藏问题,SQLAlchemy 会使情况变得更糟。
但是,如果我们不使用事务,那么有更多选项可用,如下一节所述。
使用 DBAPI 自动提交允许透明重新连接的只读版本¶
在说明了没有透明重新连接机制的基本原理之后,上一节基于应用程序实际上正在使用 DBAPI 级别事务的假设。由于大多数 DBAPI 现在都提供 本机 “自动提交” 设置,我们可以利用这些功能为 只读、仅自动提交操作 提供有限形式的透明重新连接。透明语句重试可以应用于 DBAPI 的 cursor.execute()
方法,但是仍然不安全应用于 DBAPI 的 cursor.executemany()
方法,因为该语句可能已消耗给定的任何部分参数。
警告
以下配方不应用于写入数据的操作。用户应仔细阅读并理解配方的工作原理,并在生产中使用此配方之前针对特定目标 DBAPI 驱动程序非常仔细地测试故障模式。重试机制不能保证在所有情况下都能防止断开连接错误。
通过使用 DialectEvents.do_execute()
和 DialectEvents.do_execute_no_params()
挂钩,可以将简单的重试机制应用于 DBAPI 级别的 cursor.execute()
方法,这将能够拦截语句执行期间的断开连接。它将不会拦截结果集获取操作期间的连接失败,对于那些不完全缓冲结果集的 DBAPI。该配方要求数据库支持 DBAPI 级别的自动提交,并且不保证用于特定后端。提供了一个简单的函数 reconnecting_engine()
,它将事件挂钩应用于给定的 Engine
对象,返回一个始终自动提交的版本,该版本启用 DBAPI 级别的自动提交。对于单参数和无参数语句执行,连接将透明地重新连接
import time
from sqlalchemy import event
def reconnecting_engine(engine, num_retries, retry_interval):
def _run_with_retries(fn, context, cursor_obj, statement, *arg, **kw):
for retry in range(num_retries + 1):
try:
fn(cursor_obj, statement, context=context, *arg)
except engine.dialect.dbapi.Error as raw_dbapi_err:
connection = context.root_connection
if engine.dialect.is_disconnect(raw_dbapi_err, connection, cursor_obj):
engine.logger.error(
"disconnection error, attempt %d/%d",
retry + 1,
num_retries + 1,
exc_info=True,
)
connection.invalidate()
# use SQLAlchemy 2.0 API if available
if hasattr(connection, "rollback"):
connection.rollback()
else:
trans = connection.get_transaction()
if trans:
trans.rollback()
if retry == num_retries:
raise
time.sleep(retry_interval)
context.cursor = cursor_obj = connection.connection.cursor()
else:
raise
else:
return True
e = engine.execution_options(isolation_level="AUTOCOMMIT")
@event.listens_for(e, "do_execute_no_params")
def do_execute_no_params(cursor_obj, statement, context):
return _run_with_retries(
context.dialect.do_execute_no_params, context, cursor_obj, statement
)
@event.listens_for(e, "do_execute")
def do_execute(cursor_obj, statement, parameters, context):
return _run_with_retries(
context.dialect.do_execute, context, cursor_obj, statement, parameters
)
return e
给定上述配方,可以使用以下概念验证脚本来演示事务中间的重新连接。运行后,它将每五秒向数据库发出 SELECT 1
语句
from sqlalchemy import create_engine
from sqlalchemy import select
if __name__ == "__main__":
engine = create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True)
def do_a_thing(engine):
with engine.begin() as conn:
while True:
print("ping: %s" % conn.execute(select([1])).scalar())
time.sleep(5)
e = reconnecting_engine(
create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True),
num_retries=5,
retry_interval=2,
)
do_a_thing(e)
在脚本运行时重新启动数据库以演示透明的重新连接操作
$ python reconnect_test.py
ping: 1
ping: 1
disconnection error, retrying operation
Traceback (most recent call last):
...
MySQLdb._exceptions.OperationalError: (2006, 'MySQL server has gone away')
2020-10-19 16:16:22,624 INFO sqlalchemy.pool.impl.QueuePool Invalidate connection <_mysql.connection open to 'localhost' at 0xf59240>
ping: 1
ping: 1
...
上述配方已针对 SQLAlchemy 1.4 进行测试。
为什么 SQLAlchemy 发出如此多的 ROLLBACK?¶
SQLAlchemy 当前假定 DBAPI 连接处于 “非自动提交” 模式 - 这是 Python 数据库 API 的默认行为,这意味着必须假定事务始终在进行中。当连接返回时,连接池会发出 connection.rollback()
。这是为了释放连接上剩余的任何事务资源。在像 PostgreSQL 或 MSSQL 这样的数据库上,表资源被积极锁定,这至关重要,这样行和表就不会在不再使用的连接中保持锁定状态。否则,应用程序可能会挂起。但这不仅仅是为了锁定,而且对于任何具有任何类型的事务隔离的数据库(包括带有 InnoDB 的 MySQL)也同样至关重要。任何仍处于旧事务中的连接都将返回过时的数据,如果该数据已在该连接上的隔离中查询过。有关为什么即使在 MySQL 上也可能看到过时数据的背景知识,请参阅 https://dev.mysqlserver.cn/doc/refman/5.1/en/innodb-transaction-model.html
我正在使用 MyISAM - 如何关闭它?¶
可以使用 reset_on_return
配置连接池的连接返回行为
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
engine = create_engine(
"mysql+mysqldb://scott:tiger@localhost/myisam_database",
pool=QueuePool(reset_on_return=False),
)
我正在使用 SQL Server - 如何将这些 ROLLBACK 转换为 COMMIT?¶
reset_on_return
除了 True
、False
和 None
之外,还接受值 commit
、rollback
。设置为 commit
将导致在任何连接返回到池时执行 COMMIT
engine = create_engine(
"mssql+pyodbc://scott:tiger@mydsn", pool=QueuePool(reset_on_return="commit")
)
我正在对 SQLite 数据库使用多个连接(通常用于测试事务操作),并且我的测试程序无法正常工作!¶
如果使用 SQLite :memory:
数据库,则默认连接池是 SingletonThreadPool
,它为每个线程维护一个 SQLite 连接。因此,在同一线程中使用的两个连接实际上将是相同的 SQLite 连接。确保您没有使用 :memory: 数据库,以便引擎将使用 QueuePool
(当前 SQLAlchemy 版本中非内存数据库的默认值)。
另请参阅
线程/池化行为 - 有关 PySQLite 行为的信息。
使用引擎时,如何访问原始 DBAPI 连接?¶
对于常规的 SA 引擎级连接,您可以通过 Connection.connection
属性在 Connection
上访问 DBAPI 连接的池代理版本,对于真正的 DBAPI 连接,您可以调用 PoolProxiedConnection.dbapi_connection
属性。在常规同步驱动程序上,通常不需要访问非池代理的 DBAPI 连接,因为所有方法都通过代理传递
engine = create_engine(...)
conn = engine.connect()
# pep-249 style PoolProxiedConnection (historically called a "connection fairy")
connection_fairy = conn.connection
# typically to run statements one would get a cursor() from this
# object
cursor_obj = connection_fairy.cursor()
# ... work with cursor_obj
# to bypass "connection_fairy", such as to set attributes on the
# unproxied pep-249 DBAPI connection, use .dbapi_connection
raw_dbapi_connection = connection_fairy.dbapi_connection
# the same thing is available as .driver_connection (more on this
# in the next section)
also_raw_dbapi_connection = connection_fairy.driver_connection
在版本 1.4.24 中更改: 添加了 PoolProxiedConnection.dbapi_connection
属性,该属性取代了之前的 PoolProxiedConnection.connection
属性,该属性仍然可用;此属性始终提供 pep-249 同步风格的连接对象。PoolProxiedConnection.driver_connection
属性也已添加,它将始终引用真正的驱动程序级连接,而不管它呈现什么 API。
访问 asyncio 驱动程序的底层连接¶
当使用 asyncio 驱动程序时,上述方案有两个变化。第一个是当使用 AsyncConnection
时,必须使用可等待方法 AsyncConnection.get_raw_connection()
访问 PoolProxiedConnection
。在这种情况下,返回的 PoolProxiedConnection
保留了同步风格的 pep-249 使用模式,并且 PoolProxiedConnection.dbapi_connection
属性引用了一个 SQLAlchemy 调整的连接对象,该对象将 asyncio 连接调整为同步风格的 pep-249 API,换句话说,当使用 asyncio 驱动程序时,存在两个级别的代理。实际的 asyncio 连接可从 driver_connection
属性获得。用 asyncio 的术语重述前面的示例看起来像
async def main():
engine = create_async_engine(...)
conn = await engine.connect()
# pep-249 style ConnectionFairy connection pool proxy object
# presents a sync interface
connection_fairy = await conn.get_raw_connection()
# beneath that proxy is a second proxy which adapts the
# asyncio driver into a pep-249 connection object, accessible
# via .dbapi_connection as is the same with a sync API
sqla_sync_conn = connection_fairy.dbapi_connection
# the really-real innermost driver connection is available
# from the .driver_connection attribute
raw_asyncio_connection = connection_fairy.driver_connection
# work with raw asyncio connection
result = await raw_asyncio_connection.execute(...)
在版本 1.4.24 中更改: 添加了 PoolProxiedConnection.dbapi_connection
和 PoolProxiedConnection.driver_connection
属性,以允许使用一致的接口访问 pep-249 连接、pep-249 适配层和底层驱动程序连接。
当使用 asyncio 驱动时,上述 “DBAPI” 连接实际上是 SQLAlchemy 适配的连接形式,它呈现同步风格的 pep-249 风格 API。要访问实际的 asyncio 驱动连接,它将呈现正在使用的驱动程序的原始 asyncio API,可以通过 PoolProxiedConnection.driver_connection
属性访问 PoolProxiedConnection
。对于标准的 pep-249 驱动,PoolProxiedConnection.dbapi_connection
和 PoolProxiedConnection.driver_connection
是同义的。
您必须确保在将连接返回到连接池之前,将连接上的任何隔离级别设置或其他特定于操作的设置恢复为正常。
作为恢复设置的替代方法,您可以调用 Connection.detach()
方法,无论是在 Connection
还是代理连接上,这将使连接与连接池解除关联,以便在调用 Connection.close()
时将其关闭并丢弃
conn = engine.connect()
conn.detach() # detaches the DBAPI connection from the connection pool
conn.connection.<go nuts>
conn.close() # connection is closed for real, the pool replaces it with a new connection
如何将引擎 / 连接 / 会话与 Python 多进程或 os.fork() 一起使用?¶
这在 将连接池与多进程或 os.fork() 一起使用 部分中进行了介绍。
flambé! 龙和 炼金术士 图像设计由 Rotem Yaari 创作并慷慨捐赠。
使用 Sphinx 7.2.6 创建。文档最后生成时间:Tue 11 Mar 2025 02:40:17 PM EDT