核心事件

本节介绍 SQLAlchemy 核心提供的事件接口。有关事件监听 API 的介绍,请参见 事件。ORM 事件在 ORM 事件 中介绍。

对象名称 描述

事件

为特定目标类型定义事件监听函数。

class sqlalchemy.event.base.Events

为特定目标类型定义事件监听函数。

成员

dispatch

类签名

class sqlalchemy.event.Events (sqlalchemy.event._HasEventsDispatch)

attribute sqlalchemy.event.base.Events.dispatch: _Dispatch[_ET] = <sqlalchemy.event.base.EventsDispatch object>

引用回 _Dispatch 类。

与 _Dispatch._events 双向关联

连接池事件

对象名称 描述

PoolEvents

针对 Pool 可用的事件。

PoolResetState

描述 DBAPI 连接在传递给 PoolEvents.reset() 连接池事件时的状态。

class sqlalchemy.events.PoolEvents

针对 Pool 可用的事件。

这里的方法定义了事件名称以及传递给监听器函数的成员名称。

例如:

from sqlalchemy import event

def my_on_checkout(dbapi_conn, connection_rec, connection_proxy):
    "handle an on checkout event"

event.listen(Pool, 'checkout', my_on_checkout)

除了接受 Pool 类和 Pool 实例外,PoolEvents 还接受 Engine 对象和 Engine 类作为目标,这些目标将被解析为给定引擎的 .pool 属性或 Pool

engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")

# will associate with engine.pool
event.listen(engine, 'checkout', my_on_checkout)

类签名

class sqlalchemy.events.PoolEvents (sqlalchemy.event.Events)

method sqlalchemy.events.PoolEvents.checkin(dbapi_connection: DBAPIConnection | None, connection_record: ConnectionPoolEntry) None

当连接返回到池中时调用。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngineOrPool, 'checkin')
def receive_checkin(dbapi_connection, connection_record):
    "listen for the 'checkin' event"

    # ... (event handling logic) ...

请注意,连接可能已关闭,如果连接已被失效,则可能为 None。对于分离的连接,不会调用 checkin。(它们不会返回到池中。)

参数:
method sqlalchemy.events.PoolEvents.checkout(dbapi_connection: DBAPIConnection, connection_record: ConnectionPoolEntry, connection_proxy: PoolProxiedConnection) None

当从池中检索连接时调用。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngineOrPool, 'checkout')
def receive_checkout(dbapi_connection, connection_record, connection_proxy):
    "listen for the 'checkout' event"

    # ... (event handling logic) ...
参数:

如果您引发 DisconnectionError,当前连接将被释放,并检索一个新的连接。所有检出监听器的处理将中止,并使用新连接重新启动。

另请参阅

ConnectionEvents.engine_connect() - 一个类似的事件,它在创建新的 Connection 时发生。

method sqlalchemy.events.PoolEvents.close(dbapi_connection: DBAPIConnection, connection_record: ConnectionPoolEntry) None

当 DBAPI 连接关闭时调用。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngineOrPool, 'close')
def receive_close(dbapi_connection, connection_record):
    "listen for the 'close' event"

    # ... (event handling logic) ...

该事件在关闭发生之前发出。

连接的关闭可能会失败;通常这是因为连接已经关闭。如果关闭操作失败,则连接将被丢弃。

close() 事件对应于仍然与池关联的连接。要拦截分离连接的关闭事件,请使用 close_detached().

参数:
method sqlalchemy.events.PoolEvents.close_detached(dbapi_connection: DBAPIConnection) None

当分离的 DBAPI 连接关闭时调用。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngineOrPool, 'close_detached')
def receive_close_detached(dbapi_connection):
    "listen for the 'close_detached' event"

    # ... (event handling logic) ...

该事件在关闭发生之前发出。

连接的关闭可能会失败;通常这是因为连接已经关闭。如果关闭操作失败,则连接将被丢弃。

参数:

dbapi_connection – DBAPI 连接。该 ConnectionPoolEntry.dbapi_connection 属性。

method sqlalchemy.events.PoolEvents.connect(dbapi_connection: DBAPIConnection, connection_record: ConnectionPoolEntry) None

在为给定的 Pool 首次创建特定 DBAPI 连接时调用。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngineOrPool, 'connect')
def receive_connect(dbapi_connection, connection_record):
    "listen for the 'connect' event"

    # ... (event handling logic) ...

此事件允许您捕获在使用 DBAPI 模块级 .connect() 方法以生成新的 DBAPI 连接之后的点。

参数:
method sqlalchemy.events.PoolEvents.detach(dbapi_connection: DBAPIConnection, connection_record: ConnectionPoolEntry) None

当 DBAPI 连接从池中“分离”时调用。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngineOrPool, 'detach')
def receive_detach(dbapi_connection, connection_record):
    "listen for the 'detach' event"

    # ... (event handling logic) ...

该事件在分离发生后发出。连接不再与给定的连接记录关联。

参数:
attribute sqlalchemy.events.PoolEvents.dispatch: _Dispatch[_ET] = <sqlalchemy.event.base.PoolEventsDispatch object>

引用回 _Dispatch 类。

与 _Dispatch._events 双向关联

method sqlalchemy.events.PoolEvents.first_connect(dbapi_connection: DBAPIConnection, connection_record: ConnectionPoolEntry) None

当从特定 Pool 中检出 DBAPI 连接时,恰好调用一次。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngineOrPool, 'first_connect')
def receive_first_connect(dbapi_connection, connection_record):
    "listen for the 'first_connect' event"

    # ... (event handling logic) ...

PoolEvents.first_connect() 的基本原理是根据用于所有连接的设置来确定有关特定数据库连接系列的信息。由于特定 Pool 指向单个“创建者”函数(就 Engine 而言,指向使用的 URL 和连接选项),通常可以对单个连接进行观察,可以安全地假定该观察对所有后续连接有效,例如数据库版本、服务器和客户端编码设置、排序规则设置等等。

参数:
method sqlalchemy.events.PoolEvents.invalidate(dbapi_connection: DBAPIConnection, connection_record: ConnectionPoolEntry, exception: BaseException | None) None

当一个 DBAPI 连接要被“失效”时调用。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngineOrPool, 'invalidate')
def receive_invalidate(dbapi_connection, connection_record, exception):
    "listen for the 'invalidate' event"

    # ... (event handling logic) ...

ConnectionPoolEntry.invalidate() 方法被调用时,无论是通过 API 使用还是通过“自动失效”,都会调用此事件,但没有 soft 标志。

该事件在最终尝试对连接调用 .close() 之前发生。

参数:
method sqlalchemy.events.PoolEvents.reset(dbapi_connection: DBAPIConnection, connection_record: ConnectionPoolEntry, reset_state: PoolResetState) None

在对池化连接执行“重置”操作之前调用。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngineOrPool, 'reset')
def receive_reset(dbapi_connection, connection_record, reset_state):
    "listen for the 'reset' event"

    # ... (event handling logic) ...

# DEPRECATED calling style (pre-2.0, will be removed in a future release)
@event.listens_for(SomeEngineOrPool, 'reset')
def receive_reset(dbapi_connection, connection_record):
    "listen for the 'reset' event"

    # ... (event handling logic) ...

在 2.0 版更改: The PoolEvents.reset() 事件现在接受参数 PoolEvents.reset.dbapi_connection, PoolEvents.reset.connection_record, PoolEvents.reset.reset_state. 对接受上面列出的先前参数签名的监听器函数的“弃用”支持将在未来版本中删除。

该事件表示在将 DBAPI 连接返回池或丢弃之前,在 DBAPI 连接上调用 rollback() 方法时。可以使用此事件钩子实现自定义的“重置”策略,也可以将其与使用 Pool.reset_on_return 参数禁用默认“重置”行为相结合。

The PoolEvents.reset()PoolEvents.checkin() 事件的主要区别在于,PoolEvents.reset() 不仅对正在返回池的池化连接调用,还对使用 Connection.detach() 方法分离的连接以及由于在连接检查之前发生的垃圾回收而被丢弃的 asyncio 连接调用。

请注意,该事件不会针对使用 Connection.invalidate() 使失效的连接调用。可以使用 PoolEvents.soft_invalidate()PoolEvents.invalidate() 事件钩子拦截这些事件,并且可以使用 PoolEvents.close() 拦截所有“连接关闭”事件。

The PoolEvents.reset() 事件通常紧随 PoolEvents.checkin() 事件,但连接在重置后立即被丢弃的情况除外。

参数:
method sqlalchemy.events.PoolEvents.soft_invalidate(dbapi_connection: DBAPIConnection, connection_record: ConnectionPoolEntry, exception: BaseException | None) None

当一个 DBAPI 连接要被“软失效”时调用。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngineOrPool, 'soft_invalidate')
def receive_soft_invalidate(dbapi_connection, connection_record, exception):
    "listen for the 'soft_invalidate' event"

    # ... (event handling logic) ...

ConnectionPoolEntry.invalidate() 方法被调用时,无论是否有 soft 标志,都会调用此事件。

软失效是指跟踪此连接的连接记录将在当前连接被检查后强制重新连接。它不会在调用时主动关闭 dbapi_connection。

参数:
class sqlalchemy.events.PoolResetState

描述 DBAPI 连接在传递给 PoolEvents.reset() 连接池事件时的状态。

新增于版本 2.0.0b3。

attribute sqlalchemy.events.PoolResetState.asyncio_safe: bool

指示重置操作是否发生在 asyncio 应用程序中预期存在封闭事件循环的范围内。

如果连接正在被垃圾回收,则为 False。

attribute sqlalchemy.events.PoolResetState.terminate_only: bool

指示连接是否应立即终止,而不签入到连接池。

这发生在连接无效的情况下,以及 asyncio 连接未被调用代码干净地处理而被垃圾回收的情况下。 在后一种情况下,无法在垃圾回收中安全地在 asyncio 连接上运行操作,因为不一定存在事件循环。

attribute sqlalchemy.events.PoolResetState.transaction_was_reset: bool

指示 DBAPI 连接上的事务是否已由 Connection 对象基本“重置”。

如果 Connection 上存在事务状态,并且未使用 Connection.rollback()Connection.commit() 方法关闭,则此布尔值为 True;相反,事务是在 Connection.close() 方法中内联关闭的,因此当达到此事件时,保证不出现。

SQL 执行和连接事件

对象名称 描述

ConnectionEvents

ConnectionEngine 可用的事件。

DialectEvents

用于执行替换函数的事件接口。

class sqlalchemy.events.ConnectionEvents

ConnectionEngine 可用的事件。

这里的方法定义了事件名称以及传递给监听器函数的成员名称。

事件监听器可以与任何 ConnectionEngine 类或实例相关联,例如 Engine,例如

from sqlalchemy import event, create_engine

def before_cursor_execute(conn, cursor, statement, parameters, context,
                                                executemany):
    log.info("Received statement: %s", statement)

engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/test')
event.listen(engine, "before_cursor_execute", before_cursor_execute)

或与特定 Connection

with engine.begin() as conn:
    @event.listens_for(conn, 'before_cursor_execute')
    def before_cursor_execute(conn, cursor, statement, parameters,
                                    context, executemany):
        log.info("Received statement: %s", statement)

当方法使用 statement 参数调用时,例如在 after_cursor_execute()before_cursor_execute() 中,该语句是为传输到连接的 Dialect 中的 DBAPI cursor 而准备的准确 SQL 字符串。

before_execute()before_cursor_execute() 事件也可以使用 retval=True 标志建立,它允许修改发送到数据库的语句和参数。 before_cursor_execute() 事件在这里特别有用,可以对所有执行添加临时字符串转换,例如注释。

from sqlalchemy.engine import Engine
from sqlalchemy import event

@event.listens_for(Engine, "before_cursor_execute", retval=True)
def comment_sql_calls(conn, cursor, statement, parameters,
                                    context, executemany):
    statement = statement + " -- some comment"
    return statement, parameters

注意

ConnectionEvents 可以建立在 EngineConnection 以及每个类的实例的任何组合上。 对于给定的 Connection 实例,所有四个范围内的事件都会触发。 但是,出于性能原因,Connection 对象在实例化时确定其父 Engine 是否已建立事件监听器。 添加到 Engine 类或 Engine 实例上的事件监听器,相关联的 Connection 实例实例化之后,通常不会在该 Connection 实例上可用。 新添加的监听器将改为对在父 Engine 类或实例上建立这些事件监听器之后创建的 Connection 实例生效。

参数:

retval=False – 仅适用于 before_execute()before_cursor_execute() 事件。 如果为 True,则用户定义的事件函数必须有一个返回值,该返回值是参数元组,用于替换给定的语句和参数。 请参阅这些方法以了解特定返回参数的描述。

类签名

sqlalchemy.events.ConnectionEvents (sqlalchemy.event.Events)

方法 sqlalchemy.events.ConnectionEvents.after_cursor_execute(conn: Connection, cursor: DBAPICursor, statement: str, parameters: _DBAPIAnyExecuteParams, context: ExecutionContext | None, executemany: bool) None

在执行后拦截低级别游标 execute() 事件。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'after_cursor_execute')
def receive_after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    "listen for the 'after_cursor_execute' event"

    # ... (event handling logic) ...
参数:
  • connConnection 对象

  • cursor – DBAPI 游标对象。如果语句是 SELECT,则将有结果待处理,但这些结果不应被消费,因为 CursorResult 会需要它们。

  • statement – 字符串 SQL 语句,如传递给 DBAPI

  • parameters – 传递给 DBAPI cursorexecute()executemany() 方法的字典、元组或参数列表。在某些情况下可能是 None

  • contextExecutionContext 对象正在使用中。可能是 None

  • executemany – 布尔值,如果为 True,则为 executemany() 调用,如果为 False,则为 execute() 调用。

方法 sqlalchemy.events.ConnectionEvents.after_execute(conn: Connection, clauseelement: Executable, multiparams: _CoreMultiExecuteParams, params: _CoreSingleExecuteParams, execution_options: _ExecuteOptions, result: Result[Any]) None

在执行后拦截高级 execute() 事件。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'after_execute')
def receive_after_execute(conn, clauseelement, multiparams, params, execution_options, result):
    "listen for the 'after_execute' event"

    # ... (event handling logic) ...

# DEPRECATED calling style (pre-1.4, will be removed in a future release)
@event.listens_for(SomeEngine, 'after_execute')
def receive_after_execute(conn, clauseelement, multiparams, params, result):
    "listen for the 'after_execute' event"

    # ... (event handling logic) ...
参数:
  • connConnection 对象

  • clauseelement – SQL 表达式构造、Compiled 实例或传递给 Connection.execute() 的字符串语句。

  • multiparams – 多个参数集,一个字典列表。

  • params – 单个参数集,一个字典。

  • execution_options

    与语句一起传递的执行选项字典(如果有)。这是将使用的所有选项的合并,包括语句、连接的选项,以及针对 2.0 风格执行传递给方法本身的选项。

  • result – 由执行生成的 CursorResult

方法 sqlalchemy.events.ConnectionEvents.before_cursor_execute(conn: Connection, cursor: DBAPICursor, statement: str, parameters: _DBAPIAnyExecuteParams, context: ExecutionContext | None, executemany: bool) Tuple[str, _DBAPIAnyExecuteParams] | None

在执行之前拦截低级别游标 execute() 事件,接收要针对游标调用的字符串 SQL 语句和 DBAPI 特定的参数列表。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    "listen for the 'before_cursor_execute' event"

    # ... (event handling logic) ...

此事件是记录和对 SQL 字符串进行后期修改的良好选择。它不太适合参数修改,除了那些特定于目标后端的修改。

此事件可以选择使用 retval=True 标志建立。在这种情况下,statementparameters 参数应作为二元组返回。

@event.listens_for(Engine, "before_cursor_execute", retval=True)
def before_cursor_execute(conn, cursor, statement,
                parameters, context, executemany):
    # do something with statement, parameters
    return statement, parameters

参见 ConnectionEvents 中的示例。

参数:
  • connConnection 对象

  • cursor – DBAPI 游标对象

  • statement – 字符串 SQL 语句,将被传递到 DBAPI

  • parameters – 字典、元组或参数列表,被传递到 DBAPI cursorexecute()executemany() 方法。在某些情况下可能是 None

  • contextExecutionContext 对象正在使用。可能是 None

  • executemany – 布尔值,如果为 True,则为 executemany() 调用,如果为 False,则为 execute() 调用。

method sqlalchemy.events.ConnectionEvents.before_execute(conn: Connection, clauseelement: Executable, multiparams: _CoreMultiExecuteParams, params: _CoreSingleExecuteParams, execution_options: _ExecuteOptions) Tuple[Executable, _CoreMultiExecuteParams, _CoreSingleExecuteParams] | None

拦截高级 execute() 事件,在渲染成 SQL 之前接收未编译的 SQL 结构和其他对象。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'before_execute')
def receive_before_execute(conn, clauseelement, multiparams, params, execution_options):
    "listen for the 'before_execute' event"

    # ... (event handling logic) ...

# DEPRECATED calling style (pre-1.4, will be removed in a future release)
@event.listens_for(SomeEngine, 'before_execute')
def receive_before_execute(conn, clauseelement, multiparams, params):
    "listen for the 'before_execute' event"

    # ... (event handling logic) ...

版本 1.4 中变更: The ConnectionEvents.before_execute() 事件现在接受参数 ConnectionEvents.before_execute.connConnectionEvents.before_execute.clauseelementConnectionEvents.before_execute.multiparamsConnectionEvents.before_execute.paramsConnectionEvents.before_execute.execution_options。对接受上面列出的先前参数签名的监听器函数的“已弃用”支持将在未来的版本中移除。

此事件非常适合调试 SQL 编译问题,以及对发送到数据库的参数进行早期操作,因为参数列表在此处将采用一致的格式。

此事件可以选择使用 retval=True 标志建立。在这种情况下,clauseelementmultiparamsparams 参数应作为三元组返回。

@event.listens_for(Engine, "before_execute", retval=True)
def before_execute(conn, clauseelement, multiparams, params):
    # do something with clauseelement, multiparams, params
    return clauseelement, multiparams, params
参数:
  • connConnection 对象

  • clauseelement – SQL 表达式结构、Compiled 实例,或传递到 Connection.execute() 的字符串语句。

  • multiparams – 多个参数集,字典列表。

  • params – 单个参数集,单个字典。

  • execution_options

    与语句一起传递的执行选项字典(如果有)。这是将使用的所有选项的合并,包括语句、连接的选项,以及针对 2.0 风格执行传递给方法本身的选项。

method sqlalchemy.events.ConnectionEvents.begin(conn: Connection) None

拦截 begin() 事件。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'begin')
def receive_begin(conn):
    "listen for the 'begin' event"

    # ... (event handling logic) ...
参数:

connConnection 对象

method sqlalchemy.events.ConnectionEvents.begin_twophase(conn: Connection, xid: Any) None

拦截 begin_twophase() 事件。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'begin_twophase')
def receive_begin_twophase(conn, xid):
    "listen for the 'begin_twophase' event"

    # ... (event handling logic) ...
参数:
method sqlalchemy.events.ConnectionEvents.commit(conn: Connection) None

拦截 commit() 事件,由 Transaction 启动。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'commit')
def receive_commit(conn):
    "listen for the 'commit' event"

    # ... (event handling logic) ...

请注意,如果将 reset_on_return 标志设置为值 'commit'Pool 还会在检入时“自动提交”DBAPI 连接。要拦截此提交,请使用 PoolEvents.reset() 钩子。

参数:

connConnection 对象

method sqlalchemy.events.ConnectionEvents.commit_twophase(conn: Connection, xid: Any, is_prepared: bool) None

拦截 commit_twophase() 事件。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'commit_twophase')
def receive_commit_twophase(conn, xid, is_prepared):
    "listen for the 'commit_twophase' event"

    # ... (event handling logic) ...
参数:
attribute sqlalchemy.events.ConnectionEvents.dispatch: _Dispatch[_ET] = <sqlalchemy.event.base.ConnectionEventsDispatch object>

引用回 _Dispatch 类。

与 _Dispatch._events 双向关联

method sqlalchemy.events.ConnectionEvents.engine_connect(conn: Connection) None

拦截新创建的 Connection

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'engine_connect')
def receive_engine_connect(conn):
    "listen for the 'engine_connect' event"

    # ... (event handling logic) ...

# DEPRECATED calling style (pre-2.0, will be removed in a future release)
@event.listens_for(SomeEngine, 'engine_connect')
def receive_engine_connect(conn, branch):
    "listen for the 'engine_connect' event"

    # ... (event handling logic) ...

在版本 2.0 中更改: 现在,ConnectionEvents.engine_connect() 事件接受参数 ConnectionEvents.engine_connect.conn。在将来的版本中,将删除对接受上面列出的先前参数签名的监听器函数的支持,这些签名将被视为“已弃用”。

此事件通常是在直接调用 Engine.connect() 方法时被调用。

它与 PoolEvents.connect() 方法不同,后者指的是 DBAPI 级别上的实际数据库连接;DBAPI 连接可以被池化并在许多操作中重复使用。相反,此事件仅指的是在这样的 DBAPI 连接上生成一个更高级别的 Connection 包装器。

它也与 PoolEvents.checkout() 事件不同,因为它与 Connection 对象有关,而不是 PoolEvents.checkout() 处理的 DBAPI 连接,尽管可以通过 Connection.connection 属性在此处获得此 DBAPI 连接。但请注意,如果该 Connection 无效并重新建立,在单个 Connection 对象的生命周期内实际上可能有多个 PoolEvents.checkout() 事件。

参数:

connConnection 对象。

另请参阅

PoolEvents.checkout() 是针对单个 DBAPI 连接的低级池检出事件

method sqlalchemy.events.ConnectionEvents.engine_disposed(engine: Engine) None

拦截调用 Engine.dispose() 方法时的事件。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'engine_disposed')
def receive_engine_disposed(engine):
    "listen for the 'engine_disposed' event"

    # ... (event handling logic) ...

Engine.dispose() 方法指示引擎“处理”它的连接池(例如,Pool),并将其替换为一个新的池。处理旧池会关闭已签入的现有连接。新池不会建立任何新的连接,直到它第一次被使用。

此事件可用于指示与 Engine 相关的资源也应清理,请记住,Engine 仍可用于新请求,在这种情况下,它会重新获取连接资源。

method sqlalchemy.events.ConnectionEvents.prepare_twophase(conn: Connection, xid: Any) None

拦截 prepare_twophase() 事件。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'prepare_twophase')
def receive_prepare_twophase(conn, xid):
    "listen for the 'prepare_twophase' event"

    # ... (event handling logic) ...
参数:
method sqlalchemy.events.ConnectionEvents.release_savepoint(conn: Connection, name: str, context: None) None

拦截 release_savepoint() 事件。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'release_savepoint')
def receive_release_savepoint(conn, name, context):
    "listen for the 'release_savepoint' event"

    # ... (event handling logic) ...
参数:
  • connConnection 对象

  • name – 为保存点指定的名字。

  • context – 不使用

method sqlalchemy.events.ConnectionEvents.rollback(conn: Connection) None

拦截由 Transaction 启动的 rollback() 事件。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'rollback')
def receive_rollback(conn):
    "listen for the 'rollback' event"

    # ... (event handling logic) ...

注意,如果 reset_on_return 标志设置为其默认值 'rollback',则 Pool 也会在检入时“自动回滚”DBAPI连接。要拦截此回滚,请使用 PoolEvents.reset() 钩子。

参数:

connConnection 对象

另请参阅

PoolEvents.reset()

method sqlalchemy.events.ConnectionEvents.rollback_savepoint(conn: Connection, name: str, context: None) None

拦截 rollback_savepoint() 事件。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'rollback_savepoint')
def receive_rollback_savepoint(conn, name, context):
    "listen for the 'rollback_savepoint' event"

    # ... (event handling logic) ...
参数:
  • connConnection 对象

  • name – 为保存点指定的名字。

  • context – 不使用

method sqlalchemy.events.ConnectionEvents.rollback_twophase(conn: Connection, xid: Any, is_prepared: bool) None

拦截 rollback_twophase() 事件。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'rollback_twophase')
def receive_rollback_twophase(conn, xid, is_prepared):
    "listen for the 'rollback_twophase' event"

    # ... (event handling logic) ...
参数:
method sqlalchemy.events.ConnectionEvents.savepoint(conn: Connection, name: str) None

拦截 savepoint() 事件。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'savepoint')
def receive_savepoint(conn, name):
    "listen for the 'savepoint' event"

    # ... (event handling logic) ...
参数:
  • connConnection 对象

  • name – 为保存点指定的名字。

method sqlalchemy.events.ConnectionEvents.set_connection_execution_options(conn: Connection, opts: Dict[str, Any]) None

拦截 Connection.execution_options() 方法被调用时的情况。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'set_connection_execution_options')
def receive_set_connection_execution_options(conn, opts):
    "listen for the 'set_connection_execution_options' event"

    # ... (event handling logic) ...

此方法在生成新的 Connection 并更新其执行选项集合后,但在 Dialect 对任何这些新选项进行操作之前被调用。

注意,当生成新的 Connection 并且该连接继承其父 Engine 的执行选项时,不会调用此方法;要拦截这种情况,请使用 ConnectionEvents.engine_connect() 事件。

参数:
  • conn – 新复制的 Connection 对象

  • opts

    传递给 Connection.execution_options() 方法的选项字典。可以对该字典进行就地修改,以影响最终生效的选项。

    2.0 版新增: 可以对 opts 字典进行就地修改。

另请参阅

ConnectionEvents.set_engine_execution_options() - 当调用 Engine.execution_options() 时调用的事件。

method sqlalchemy.events.ConnectionEvents.set_engine_execution_options(engine: Engine, opts: Dict[str, Any]) None

拦截调用 Engine.execution_options() 方法时的操作。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'set_engine_execution_options')
def receive_set_engine_execution_options(engine, opts):
    "listen for the 'set_engine_execution_options' event"

    # ... (event handling logic) ...

Engine.execution_options() 方法会生成 Engine 的浅拷贝,该拷贝存储新的选项。这里会传递该新的 Engine 对象。此方法的一个特定应用是,将 ConnectionEvents.engine_connect() 事件处理程序添加到给定的 Engine,该处理程序将针对这些执行选项执行一些特定于每个 Connection 的任务。

参数:
  • conn – 新复制的 Engine 对象

  • opts

    传递给 Connection.execution_options() 方法的选项字典。可以对该字典进行就地修改,以影响最终生效的选项。

    2.0 版新增: 可以对 opts 字典进行就地修改。

class sqlalchemy.events.DialectEvents

用于执行替换函数的事件接口。

这些事件允许直接检测和替换与 DBAPI 交互的关键方言函数。

注意

DialectEvents 挂钩应被视为 **半公开** 且处于实验阶段。这些挂钩不适用于一般用途,仅适用于必须将 DBAPI 机制精细地重新声明到现有方言上的情况。对于通用语句拦截事件,请使用 ConnectionEvents 接口。

类签名

class sqlalchemy.events.DialectEvents (sqlalchemy.event.Events)

attribute sqlalchemy.events.DialectEvents.dispatch: _Dispatch[_ET] = <sqlalchemy.event.base.DialectEventsDispatch object>

引用回 _Dispatch 类。

与 _Dispatch._events 双向关联

method sqlalchemy.events.DialectEvents.do_connect(dialect: Dialect, conn_rec: ConnectionPoolEntry, cargs: Tuple[Any, ...], cparams: Dict[str, Any]) DBAPIConnection | None

在建立连接之前接收连接参数。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'do_connect')
def receive_do_connect(dialect, conn_rec, cargs, cparams):
    "listen for the 'do_connect' event"

    # ... (event handling logic) ...

此事件很有用,因为它允许处理程序操作控制如何调用 DBAPI connect() 函数的 cargs 和/或 cparams 集合。 cargs 将始终是 Python 列表,可以在其中就地修改,而 cparams 是 Python 字典,也可以在其中修改

e = create_engine("postgresql+psycopg2://user@host/dbname")

@event.listens_for(e, 'do_connect')
def receive_do_connect(dialect, conn_rec, cargs, cparams):
    cparams["password"] = "some_password"

事件挂钩也可以用于完全覆盖对 connect() 的调用,方法是返回一个非 None DBAPI 连接对象

e = create_engine("postgresql+psycopg2://user@host/dbname")

@event.listens_for(e, 'do_connect')
def receive_do_connect(dialect, conn_rec, cargs, cparams):
    return psycopg2.connect(*cargs, **cparams)
method sqlalchemy.events.DialectEvents.do_execute(cursor: DBAPICursor, statement: str, parameters: _DBAPISingleExecuteParams, context: ExecutionContext) Literal[True] | None

接收一个要执行 execute() 的游标。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'do_execute')
def receive_do_execute(cursor, statement, parameters, context):
    "listen for the 'do_execute' event"

    # ... (event handling logic) ...

返回值 True 以停止进一步的事件调用,并表明游标执行已在事件处理程序中完成。

method sqlalchemy.events.DialectEvents.do_execute_no_params(cursor: DBAPICursor, statement: str, context: ExecutionContext) Literal[True] | None

接收一个游标,以便调用不带参数的 execute() 方法。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'do_execute_no_params')
def receive_do_execute_no_params(cursor, statement, context):
    "listen for the 'do_execute_no_params' event"

    # ... (event handling logic) ...

返回值 True 以停止进一步的事件调用,并表明游标执行已在事件处理程序中完成。

method sqlalchemy.events.DialectEvents.do_executemany(cursor: DBAPICursor, statement: str, parameters: _DBAPIMultiExecuteParams, context: ExecutionContext) Literal[True] | None

接收一个游标,以便调用 executemany() 方法。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'do_executemany')
def receive_do_executemany(cursor, statement, parameters, context):
    "listen for the 'do_executemany' event"

    # ... (event handling logic) ...

返回值 True 以停止进一步的事件调用,并表明游标执行已在事件处理程序中完成。

method sqlalchemy.events.DialectEvents.do_setinputsizes(inputsizes: Dict[BindParameter[Any], Any], cursor: DBAPICursor, statement: str, parameters: _DBAPIAnyExecuteParams, context: ExecutionContext) None

接收 setinputsizes 字典以供修改。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'do_setinputsizes')
def receive_do_setinputsizes(inputsizes, cursor, statement, parameters, context):
    "listen for the 'do_setinputsizes' event"

    # ... (event handling logic) ...

当方言使用 DBAPI 的 cursor.setinputsizes() 方法传递有关特定语句的参数绑定的信息时,会发出此事件。给定的 inputsizes 字典将包含 BindParameter 对象作为键,链接到 DBAPI 特定类型的对象作为值;对于未绑定的参数,它们将被添加到字典中,值是 None,这意味着该参数不会包含在最终的 setinputsizes 调用中。此事件可用于检查和/或记录正在绑定的数据类型,以及修改字典。可以在此字典中添加、修改或删除参数。调用者通常需要检查给定绑定对象的 BindParameter.type 属性以做出有关 DBAPI 对象的决定。

在事件发生后,inputsizes 字典将被转换为适合传递给 cursor.setinputsizes 的数据结构;对于位置绑定参数执行样式,它是列表,对于命名绑定参数执行样式,它是字符串参数键到 DBAPI 类型对象的字典。

setinputsizes 钩子通常仅用于包含标志 use_setinputsizes=True 的方言。使用此标志的方言包括 cx_Oracle、pg8000、asyncpg 和 pyodbc 方言。

注意

对于 pyodbc 的使用,必须将 use_setinputsizes 标志传递给方言,例如:

create_engine("mssql+pyodbc://...", use_setinputsizes=True)

另请参阅

Setinputsizes 支持

新添加于 1.2.9 版本。

method sqlalchemy.events.DialectEvents.handle_error(exception_context: ExceptionContext) BaseException | None

拦截由 Dialect 处理的所有异常,通常包括但在不限于在 Connection 范围内发出的异常。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'handle_error')
def receive_handle_error(exception_context):
    "listen for the 'handle_error' event"

    # ... (event handling logic) ...

更改于 2.0 版本: DialectEvents.handle_error() 事件被移动到 DialectEvents 类,从 ConnectionEvents 类移动,以便它也可以参与使用 create_engine.pool_pre_ping 参数配置的“预 ping”操作。该事件仍然使用 Engine 作为事件目标进行注册,但请注意,将 Connection 作为 DialectEvents.handle_error() 的事件目标不再受支持。

这包括 DBAPI 发出的所有异常以及 SQLAlchemy 语句调用过程中发出的异常,包括编码错误和其他语句验证错误。该事件调用的其他区域包括事务开始和结束、结果行提取、游标创建。

请注意,handle_error() 可能会在 任何时候 支持与异常相对应的新类型异常和新调用场景。使用此事件的代码必须期望在次要版本中出现新的调用模式。

为了支持与异常相对应的各种成员,以及允许在不破坏向后兼容性的情况下扩展事件,接收的唯一参数是 ExceptionContext 的实例。此对象包含表示有关异常的详细信息的数据成员。

此钩子支持的用例包括

  • 用于记录和调试目的的只读、低级别异常处理

  • 确定 DBAPI 连接错误消息是否指示需要重新连接数据库连接,包括某些方言使用的“pre_ping”处理程序

  • 根据特定异常,设置或禁用连接或所属连接池是否被标记为无效或过期。

  • 异常重写

当失败操作(如果有)的游标仍然处于打开且可访问状态时,会调用该钩子。可以对该游标调用特殊的清理操作;SQLAlchemy 会尝试在调用该钩子之后关闭该游标。

从 SQLAlchemy 2.0 开始,使用 create_engine.pool_pre_ping 参数启用的“pre_ping”处理程序也将参与 handle_error() 过程,**对于那些依赖断开连接代码来检测数据库活动状态的方言来说**。请注意,某些方言,例如 psycopg、psycopg2 以及大多数 MySQL 方言,使用 DBAPI 提供的原生 ping() 方法,该方法不使用断开连接代码。

版本 2.0.0 中的变更: DialectEvents.handle_error() 事件钩子参与连接池的“预 ping”操作。在这种情况下,ExceptionContext.engine 属性将为 None,但是正在使用的 Dialect 始终可以通过 ExceptionContext.dialect 属性获取。

版本 2.0.5 中的变更: 添加了 ExceptionContext.is_pre_ping 属性,当 DialectEvents.handle_error() 事件钩子在连接池预 ping 操作中触发时,该属性将被设置为 True

版本 2.0.5 中的变更: 修复了一个问题,该问题允许 PostgreSQL psycopgpsycopg2 驱动程序以及所有 MySQL 驱动程序在连接池“预 ping”操作期间正确参与 DialectEvents.handle_error() 事件钩子;之前,这些驱动程序的实现无法正常工作。

处理程序函数有两个选项可以将 SQLAlchemy 生成的异常替换为用户定义的异常。它可以直接抛出此新异常,在这种情况下,将绕过所有其他事件监听器,并且异常将在进行适当的清理后抛出。

@event.listens_for(Engine, "handle_error")
def handle_exception(context):
    if isinstance(context.original_exception,
        psycopg2.OperationalError) and \
        "failed" in str(context.original_exception):
        raise MySpecialException("failed operation")

警告

由于 DialectEvents.handle_error() 事件专门允许将异常重新抛出为失败语句最终抛出的异常,如果用户定义的事件处理程序本身失败并抛出意外异常,堆栈跟踪将具有误导性;堆栈跟踪可能无法显示实际失败的代码行!建议在此处谨慎编写代码,并在发生意外异常时使用日志记录和/或内联调试。

或者,可以使用一种“链式”的事件处理方式,通过使用 retval=True 修饰符配置处理程序,并从函数中返回新的异常实例。在这种情况下,事件处理将继续进行到下一个处理程序。“链式”异常可以通过 ExceptionContext.chained_exception 获取。

@event.listens_for(Engine, "handle_error", retval=True)
def handle_exception(context):
    if context.chained_exception is not None and \
        "special" in context.chained_exception.message:
        return MySpecialException("failed",
            cause=context.chained_exception)

返回 None 的处理程序可以在链中使用;当处理程序返回 None 时,将保留以前的异常实例(如果有)作为当前异常,并将其传递到下一个处理程序。

当抛出或返回自定义异常时,SQLAlchemy 会照原样抛出此新异常,不会将其包装在任何 SQLAlchemy 对象中。如果异常不是 sqlalchemy.exc.StatementError 的子类,则某些功能可能不可用;目前,这包括 ORM 在自动刷新过程中抛出的异常中添加有关“自动刷新”的详细信息提示的功能。

参数:

context – 一个 ExceptionContext 对象。有关所有可用成员的详细信息,请参见此类。

模式事件

对象名称 描述

DDLEvents

为模式对象定义事件监听器,即 SchemaItem 和其他 SchemaEventTarget 子类,包括 MetaDataTableColumn 等。

SchemaEventTarget

作为 DDLEvents 事件目标的元素的基类。

class sqlalchemy.events.DDLEvents

为模式对象定义事件监听器,即 SchemaItem 和其他 SchemaEventTarget 子类,包括 MetaDataTableColumn 等。

创建/删除事件

当向数据库发出 CREATE 和 DROP 命令时,会发出事件。此类别的事件钩子包括 DDLEvents.before_create()DDLEvents.after_create()DDLEvents.before_drop()DDLEvents.after_drop()

当使用模式级方法(例如 MetaData.create_all()MetaData.drop_all())时,会发出这些事件。每个对象的创建/删除方法(例如 Table.create()Table.drop()Index.create())也包括在内,以及方言特定的方法,例如 ENUM.create()

版本 2.0 中的新增功能: DDLEvents 事件钩子现在适用于非表格对象,包括约束、索引和方言特定的模式类型。

事件钩子可以直接附加到 Table 对象或 MetaData 集合,以及可以单独创建和删除的任何 SchemaItem 类或对象,这些对象可以使用不同的 SQL 命令进行单独创建和删除。此类类包括 IndexSequence 以及方言特定的类,例如 ENUM

使用 DDLEvents.after_create() 事件的示例,其中自定义事件钩子将在发出 CREATE TABLE 之后,在当前连接上发出 ALTER TABLE 命令

from sqlalchemy import create_engine
from sqlalchemy import event
from sqlalchemy import Table, Column, Metadata, Integer

m = MetaData()
some_table = Table('some_table', m, Column('data', Integer))

@event.listens_for(some_table, "after_create")
def after_create(target, connection, **kw):
    connection.execute(text(
        "ALTER TABLE %s SET name=foo_%s" % (target.name, target.name)
    ))


some_engine = create_engine("postgresql://scott:tiger@host/test")

# will emit "CREATE TABLE some_table" as well as the above
# "ALTER TABLE" statement afterwards
m.create_all(some_engine)

约束对象,例如 ForeignKeyConstraintUniqueConstraintCheckConstraint 也可以订阅这些事件,但是它们通常不会产生事件,因为这些对象通常在封闭的 CREATE TABLE 语句中内联渲染,并在 DROP TABLE 语句中隐式删除。

对于 Index 结构,事件钩子将为 CREATE INDEX 发出,但是 SQLAlchemy 通常不会在删除表时发出 DROP INDEX,因为这在 DROP TABLE 语句中是隐式的。

版本 2.0 中新增: SchemaItem 对象在创建/删除事件中的支持扩展了其之前对 MetaDataTable 的支持,还包括 Constraint 和所有子类,IndexSequence 和一些与类型相关的结构,例如 ENUM

注意

这些事件钩子仅在 SQLAlchemy 的创建/删除方法范围内发出;它们不一定受 alembic 等工具支持。

附加事件

提供附加事件来自定义行为,无论何时将子模式元素与父元素关联,例如,当将 Column 与其 Table 关联时,当将 ForeignKeyConstraintTable 关联时,等等。这些事件包括 DDLEvents.before_parent_attach()DDLEvents.after_parent_attach()

反射事件

当对数据库表的 反射 进行时,DDLEvents.column_reflect() 事件用于拦截和修改数据库列的 Python 中定义。

与通用 DDL 结合使用

DDL 事件与 DDL 类和 ExecutableDDLElement 的 DDL 子句结构层次结构紧密集成,这些结构本身也适合作为监听器可调用对象

from sqlalchemy import DDL
event.listen(
    some_table,
    "after_create",
    DDL("ALTER TABLE %(table)s SET name=foo_%(table)s")
)

事件传播到 MetaData 副本

对于所有 DDLEvent 事件,propagate=True 关键字参数将确保将给定的事件处理程序传播到对象的副本,这些副本是在使用 Table.to_metadata() 方法时创建的

from sqlalchemy import DDL

metadata = MetaData()
some_table = Table("some_table", metadata, Column("data", Integer))

event.listen(
    some_table,
    "after_create",
    DDL("ALTER TABLE %(table)s SET name=foo_%(table)s"),
    propagate=True
)

new_metadata = MetaData()
new_table = some_table.to_metadata(new_metadata)

上面的 DDL 对象将与 some_tablenew_tableDDLEvents.after_create() 事件关联 Table 对象。

类签名

class sqlalchemy.events.DDLEvents (sqlalchemy.event.Events)

method sqlalchemy.events.DDLEvents.after_create(target: SchemaEventTarget, connection: Connection, **kw: Any) None

在发出 CREATE 语句后调用。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeSchemaClassOrObject, 'after_create')
def receive_after_create(target, connection, **kw):
    "listen for the 'after_create' event"

    # ... (event handling logic) ...
参数:
  • target

    the SchemaObject, such as a MetaData or Table but also including all create/drop objects such as Index, Sequence, etc., object which is the target of the event.

    版本 2.0 中新增: 添加了对所有 SchemaItem 对象的支持。

  • connection – the Connection where the CREATE statement or statements have been emitted.

  • **kw – additional keyword arguments relevant to the event. The contents of this dictionary may vary across releases, and include the list of tables being generated for a metadata-level event, the checkfirst flag, and other elements used by internal events.

listen() 还接受 propagate=True 修饰符用于此事件;如果为 True,则监听器函数将为目标对象的任何副本建立,即在使用 Table.to_metadata() 时生成的副本。

method sqlalchemy.events.DDLEvents.after_drop(target: SchemaEventTarget, connection: Connection, **kw: Any) None

在发出 DROP 语句后调用。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeSchemaClassOrObject, 'after_drop')
def receive_after_drop(target, connection, **kw):
    "listen for the 'after_drop' event"

    # ... (event handling logic) ...
参数:
  • target

    the SchemaObject, such as a MetaData or Table but also including all create/drop objects such as Index, Sequence, etc., object which is the target of the event.

    版本 2.0 中新增: 添加了对所有 SchemaItem 对象的支持。

  • connection – 发出 DROP 语句的 Connection

  • **kw – 与事件相关的其他关键字参数。此字典的内容可能在不同版本之间有所不同,并包含为元数据级事件生成的表列表、checkfirst 标志以及内部事件使用的其他元素。

listen() 还接受 propagate=True 修饰符用于此事件;如果为 True,则监听器函数将为目标对象的任何副本建立,即在使用 Table.to_metadata() 时生成的副本。

method sqlalchemy.events.DDLEvents.after_parent_attach(target: SchemaEventTarget, parent: SchemaItem) None

SchemaItem 与父 SchemaItem 关联后调用。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeSchemaClassOrObject, 'after_parent_attach')
def receive_after_parent_attach(target, parent):
    "listen for the 'after_parent_attach' event"

    # ... (event handling logic) ...
参数:
  • target – 目标对象

  • parent – 目标附加到的父对象。

listen() 还接受 propagate=True 修饰符用于此事件;如果为 True,则监听器函数将为目标对象的任何副本建立,即在使用 Table.to_metadata() 时生成的副本。

method sqlalchemy.events.DDLEvents.before_create(target: SchemaEventTarget, connection: Connection, **kw: Any) None

在发出 CREATE 语句之前调用。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeSchemaClassOrObject, 'before_create')
def receive_before_create(target, connection, **kw):
    "listen for the 'before_create' event"

    # ... (event handling logic) ...
参数:
  • target

    the SchemaObject, such as a MetaData or Table but also including all create/drop objects such as Index, Sequence, etc., object which is the target of the event.

    版本 2.0 中新增: 添加了对所有 SchemaItem 对象的支持。

  • connection – 将发出 CREATE 语句的 Connection

  • **kw – 与事件相关的其他关键字参数。此字典的内容可能在不同版本之间有所不同,并包含为元数据级事件生成的表列表、checkfirst 标志以及内部事件使用的其他元素。

listen() 接受 propagate=True 修饰符用于此事件;如果为 True,则监听器函数将为目标对象的任何副本建立,即在使用 Table.to_metadata() 时生成的副本。

listen() 接受 insert=True 修饰符用于此事件;如果为 True,则监听器函数将在发现时被追加到内部事件列表,并在未传递此参数的已注册监听器函数之前执行。

method sqlalchemy.events.DDLEvents.before_drop(target: SchemaEventTarget, connection: Connection, **kw: Any) None

在发出 DROP 语句之前调用。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeSchemaClassOrObject, 'before_drop')
def receive_before_drop(target, connection, **kw):
    "listen for the 'before_drop' event"

    # ... (event handling logic) ...
参数:
  • target

    the SchemaObject, such as a MetaData or Table but also including all create/drop objects such as Index, Sequence, etc., object which is the target of the event.

    版本 2.0 中新增: 添加了对所有 SchemaItem 对象的支持。

  • connection – 将发出 DROP 语句的 Connection

  • **kw – 与事件相关的其他关键字参数。此字典的内容可能在不同版本之间有所不同,并包含为元数据级事件生成的表列表、checkfirst 标志以及内部事件使用的其他元素。

listen() 还接受 propagate=True 修饰符用于此事件;如果为 True,则监听器函数将为目标对象的任何副本建立,即在使用 Table.to_metadata() 时生成的副本。

method sqlalchemy.events.DDLEvents.before_parent_attach(target: SchemaEventTarget, parent: SchemaItem) None

SchemaItem 与父 SchemaItem 关联之前调用。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeSchemaClassOrObject, 'before_parent_attach')
def receive_before_parent_attach(target, parent):
    "listen for the 'before_parent_attach' event"

    # ... (event handling logic) ...
参数:
  • target – 目标对象

  • parent – 目标附加到的父对象。

listen() 还接受 propagate=True 修饰符用于此事件;如果为 True,则监听器函数将为目标对象的任何副本建立,即在使用 Table.to_metadata() 时生成的副本。

方法 sqlalchemy.events.DDLEvents.column_reflect(inspector: Inspector, table: Table, column_info: ReflectedColumn) None

在反射 Table 时,针对每个检索到的“列信息”单元调用此事件。

示例参数形式

from sqlalchemy import event


@event.listens_for(SomeSchemaClassOrObject, 'column_reflect')
def receive_column_reflect(inspector, table, column_info):
    "listen for the 'column_reflect' event"

    # ... (event handling logic) ...

此事件最常用于将其应用于特定 MetaData 实例,该事件将对该 MetaData 中所有经过反射的 Table 对象生效。

metadata = MetaData()

@event.listens_for(metadata, 'column_reflect')
def receive_column_reflect(inspector, table, column_info):
    # receives for all Table objects that are reflected
    # under this MetaData


# will use the above event hook
my_table = Table("my_table", metadata, autoload_with=some_engine)

版本 1.4.0b2 中新增: 现在可以将 DDLEvents.column_reflect() 钩子应用于 MetaData 对象以及 MetaData 类本身,它将在与目标 MetaData 关联的所有 Table 对象上生效。

它还可以应用于全局的 Table 类。

from sqlalchemy import Table

@event.listens_for(Table, 'column_reflect')
def receive_column_reflect(inspector, table, column_info):
    # receives for all Table objects that are reflected

它也可以在使用 Table.listeners 参数反射特定 Table 时应用于该特定 Table

t1 = Table(
    "my_table",
    autoload_with=some_engine,
    listeners=[
        ('column_reflect', receive_column_reflect)
    ]
)

将传递方言返回的列信息字典,并且可以对其进行修改。该字典是 Inspector.get_columns() 返回的列表中每个元素返回的字典。

在对该字典执行任何操作之前调用该事件,并且可以修改其内容;可以将以下附加键添加到字典中,以进一步修改 Column 的构造方式。

listen() 还接受 propagate=True 修饰符用于此事件;如果为 True,则监听器函数将为目标对象的任何副本建立,即在使用 Table.to_metadata() 时生成的副本。

另请参阅

从反射表自动执行列命名方案 - 在 ORM 映射文档中。

拦截列定义 - 在 Automap 文档中。

使用数据库无关类型进行反射 - 在 反射数据库对象 文档中。

属性 sqlalchemy.events.DDLEvents.dispatch: _Dispatch[_ET] = <sqlalchemy.event.base.DDLEventsDispatch object>

引用回 _Dispatch 类。

与 _Dispatch._events 双向关联

sqlalchemy.events.SchemaEventTarget

作为 DDLEvents 事件目标的元素的基类。

这包括 SchemaItem 以及 SchemaType

类签名

class sqlalchemy.events.SchemaEventTarget (sqlalchemy.event.registry.EventTarget)