核心事件

本节介绍 SQLAlchemy Core 中提供的事件接口。 有关事件监听 API 的介绍,请参阅 事件。 ORM 事件在 ORM 事件 中描述。

对象名称 描述

事件

为特定目标类型定义事件监听函数。

class sqlalchemy.event.base.Events

为特定目标类型定义事件监听函数。

成员

dispatch

类签名

class sqlalchemy.event.Events (sqlalchemy.event._HasEventsDispatch)

attribute sqlalchemy.event.base.Events.dispatch: _Dispatch[_ET] = <sqlalchemy.event.base.EventsDispatch object>

引用回 _Dispatch 类。

与 _Dispatch._events 双向关联

连接池事件

对象名称 描述

PoolEvents

Pool 的可用事件。

PoolResetState

描述 DBAPI 连接在传递给 PoolEvents.reset() 连接池事件时的状态。

class sqlalchemy.events.PoolEvents

Pool 的可用事件。

这里的方法定义了事件的名称以及传递给监听器函数的成员名称。

例如:

from sqlalchemy import event


def my_on_checkout(dbapi_conn, connection_rec, connection_proxy):
    "handle an on checkout event"


event.listen(Pool, "checkout", my_on_checkout)

除了接受 Pool 类和 Pool 实例之外,PoolEvents 也接受 Engine 对象和 Engine 类作为目标,这些目标将被解析为给定 engine 的 .pool 属性或 Pool 类。

engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")

# will associate with engine.pool
event.listen(engine, "checkout", my_on_checkout)

类签名

class sqlalchemy.events.PoolEvents (sqlalchemy.event.Events)

method sqlalchemy.events.PoolEvents.checkin(dbapi_connection: DBAPIConnection | None, connection_record: ConnectionPoolEntry) None

当连接返回到连接池时调用。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngineOrPool, 'checkin')
def receive_checkin(dbapi_connection, connection_record):
    "listen for the 'checkin' event"

    # ... (event handling logic) ...

请注意,连接可能已关闭,如果连接已失效,则可能为 None。checkin 不会为分离的连接调用。(它们不返回到连接池。)

参数:
method sqlalchemy.events.PoolEvents.checkout(dbapi_connection: DBAPIConnection, connection_record: ConnectionPoolEntry, connection_proxy: PoolProxiedConnection) None

当从连接池中检索连接时调用。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngineOrPool, 'checkout')
def receive_checkout(dbapi_connection, connection_record, connection_proxy):
    "listen for the 'checkout' event"

    # ... (event handling logic) ...
参数:

如果您引发 DisconnectionError,则当前连接将被释放,并检索一个新的连接。 所有检出监听器的处理都将中止并使用新连接重新启动。

另请参阅

ConnectionEvents.engine_connect() - 类似的事件,在新 Connection 创建时发生。

method sqlalchemy.events.PoolEvents.close(dbapi_connection: DBAPIConnection, connection_record: ConnectionPoolEntry) None

当 DBAPI 连接关闭时调用。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngineOrPool, 'close')
def receive_close(dbapi_connection, connection_record):
    "listen for the 'close' event"

    # ... (event handling logic) ...

该事件在关闭发生之前发出。

连接的关闭可能会失败;通常这是因为连接已经关闭。 如果关闭操作失败,则连接将被丢弃。

close() 事件对应于仍然与连接池关联的连接。 要拦截分离连接的关闭事件,请使用 close_detached()

参数:
method sqlalchemy.events.PoolEvents.close_detached(dbapi_connection: DBAPIConnection) None

当分离的 DBAPI 连接关闭时调用。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngineOrPool, 'close_detached')
def receive_close_detached(dbapi_connection):
    "listen for the 'close_detached' event"

    # ... (event handling logic) ...

该事件在关闭发生之前发出。

连接的关闭可能会失败;通常这是因为连接已经关闭。 如果关闭操作失败,则连接将被丢弃。

参数:

dbapi_connection – 一个 DBAPI 连接。ConnectionPoolEntry.dbapi_connection 属性。

method sqlalchemy.events.PoolEvents.connect(dbapi_connection: DBAPIConnection, connection_record: ConnectionPoolEntry) None

在为给定的 Pool 首次创建特定 DBAPI 连接时调用。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngineOrPool, 'connect')
def receive_connect(dbapi_connection, connection_record):
    "listen for the 'connect' event"

    # ... (event handling logic) ...

此事件允许捕获在使用 DBAPI 模块级 .connect() 方法以生成新的 DBAPI 连接之后立即发生的点。

参数:
method sqlalchemy.events.PoolEvents.detach(dbapi_connection: DBAPIConnection, connection_record: ConnectionPoolEntry) None

当 DBAPI 连接从连接池“分离”时调用。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngineOrPool, 'detach')
def receive_detach(dbapi_connection, connection_record):
    "listen for the 'detach' event"

    # ... (event handling logic) ...

此事件在分离发生后发出。 该连接不再与给定的连接记录关联。

参数:
attribute sqlalchemy.events.PoolEvents.dispatch: _Dispatch[_ET] = <sqlalchemy.event.base.PoolEventsDispatch object>

引用回 _Dispatch 类。

与 _Dispatch._events 双向关联

method sqlalchemy.events.PoolEvents.first_connect(dbapi_connection: DBAPIConnection, connection_record: ConnectionPoolEntry) None

对于从特定 Pool 检出 DBAPI 连接的首次,只调用一次。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngineOrPool, 'first_connect')
def receive_first_connect(dbapi_connection, connection_record):
    "listen for the 'first_connect' event"

    # ... (event handling logic) ...

PoolEvents.first_connect() 的基本原理是基于用于所有连接的设置,确定有关特定系列数据库连接的信息。 由于特定的 Pool 指的是单个“创建器”函数(就 Engine 而言,指的是 URL 和使用的连接选项),因此通常可以对单个连接进行观察,并可以安全地假定这些观察结果对所有后续连接都有效,例如数据库版本、服务器和客户端编码设置、排序规则设置等等。

参数:
method sqlalchemy.events.PoolEvents.invalidate(dbapi_connection: DBAPIConnection, connection_record: ConnectionPoolEntry, exception: BaseException | None) None

当 DBAPI 连接要“失效”时调用。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngineOrPool, 'invalidate')
def receive_invalidate(dbapi_connection, connection_record, exception):
    "listen for the 'invalidate' event"

    # ... (event handling logic) ...

每当调用 ConnectionPoolEntry.invalidate() 方法时,无论来自 API 使用还是通过“自动失效”,并且没有 soft 标志,都会调用此事件。

该事件发生在最终尝试在连接上调用 .close() 之前。

参数:

另请参阅

更多关于失效

method sqlalchemy.events.PoolEvents.reset(dbapi_connection: DBAPIConnection, connection_record: ConnectionPoolEntry, reset_state: PoolResetState) None

在池连接的“重置”操作发生之前调用。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngineOrPool, 'reset')
def receive_reset(dbapi_connection, connection_record, reset_state):
    "listen for the 'reset' event"

    # ... (event handling logic) ...

# DEPRECATED calling style (pre-2.0, will be removed in a future release)
@event.listens_for(SomeEngineOrPool, 'reset')
def receive_reset(dbapi_connection, connection_record):
    "listen for the 'reset' event"

    # ... (event handling logic) ...

在版本 2.0 中更改: PoolEvents.reset() 事件现在接受参数 PoolEvents.reset.dbapi_connectionPoolEvents.reset.connection_recordPoolEvents.reset.reset_state。 对接受上面列出的先前参数签名作为“已弃用”的监听器函数的支持将在未来版本中删除。

此事件表示在 DBAPI 连接返回到连接池或被丢弃之前,rollback() 方法何时在 DBAPI 连接上被调用。 可以使用此事件挂钩实现自定义“重置”策略,该策略也可以与使用 Pool.reset_on_return 参数禁用默认“重置”行为相结合。

PoolEvents.reset()PoolEvents.checkin() 事件之间的主要区别在于,PoolEvents.reset() 不仅对正在返回到连接池的池连接调用,而且还对使用 Connection.detach() 方法分离的连接以及由于垃圾回收发生在连接检入之前而在连接上进行的 asyncio 连接调用。

请注意,对于使用 Connection.invalidate() 失效的连接,不会调用该事件。 可以使用 PoolEvents.soft_invalidate()PoolEvents.invalidate() 事件挂钩拦截这些事件,并且可以使用 PoolEvents.close() 拦截所有“连接关闭”事件。

PoolEvents.reset() 事件通常后跟 PoolEvents.checkin() 事件,除非在连接在重置后立即被丢弃的情况下。

参数:
method sqlalchemy.events.PoolEvents.soft_invalidate(dbapi_connection: DBAPIConnection, connection_record: ConnectionPoolEntry, exception: BaseException | None) None

当 DBAPI 连接要被“软失效”时调用。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngineOrPool, 'soft_invalidate')
def receive_soft_invalidate(dbapi_connection, connection_record, exception):
    "listen for the 'soft_invalidate' event"

    # ... (event handling logic) ...

每当使用 soft 标志调用 ConnectionPoolEntry.invalidate() 方法时,都会调用此事件。

软失效指的是跟踪此连接的连接记录将在当前连接检入后强制重新连接。 它不会在调用时主动关闭 dbapi_connection。

参数:
class sqlalchemy.events.PoolResetState

描述 DBAPI 连接在传递给 PoolEvents.reset() 连接池事件时的状态。

2.0.0b3 版本新增。

attribute sqlalchemy.events.PoolResetState.asyncio_safe: bool

指示重置操作是否发生在期望为 asyncio 应用程序提供封闭事件循环的范围内。

如果连接正在被垃圾回收,则将为 False。

attribute sqlalchemy.events.PoolResetState.terminate_only: bool

指示连接是否将立即终止,而不是检入连接池。

这发生在失效的连接,以及未被调用代码干净处理,而是被垃圾回收的 asyncio 连接。 在后一种情况下,由于不一定存在事件循环,因此无法在垃圾回收中安全地对 asyncio 连接运行操作。

attribute sqlalchemy.events.PoolResetState.transaction_was_reset: bool

指示 DBAPI 连接上的事务是否已被 Connection 对象基本“重置”回滚。

如果 Connection 在其上存在事务状态,然后未使用 Connection.rollback()Connection.commit() 方法关闭事务状态,则此布尔值为 True;相反,事务在 Connection.close() 方法中内联关闭,因此保证在此事件到达时保持不存在。

SQL 执行和连接事件

对象名称 描述

ConnectionEvents

用于 ConnectionEngine 的可用事件。

DialectEvents

用于执行替换功能的事件接口。

class sqlalchemy.events.ConnectionEvents

用于 ConnectionEngine 的可用事件。

这里的方法定义了事件的名称以及传递给监听器函数的成员名称。

事件监听器可以与任何 ConnectionEngine 类或实例关联,例如 Engine,例如

from sqlalchemy import event, create_engine


def before_cursor_execute(
    conn, cursor, statement, parameters, context, executemany
):
    log.info("Received statement: %s", statement)


engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
event.listen(engine, "before_cursor_execute", before_cursor_execute)

或与特定的 Connection

with engine.begin() as conn:

    @event.listens_for(conn, "before_cursor_execute")
    def before_cursor_execute(
        conn, cursor, statement, parameters, context, executemany
    ):
        log.info("Received statement: %s", statement)

当使用 statement 参数调用方法时,例如在 after_cursor_execute()before_cursor_execute() 中,statement 是为传输到连接的 cursor 中的 DBAPI Dialect 准备的确切 SQL 字符串。

before_execute()before_cursor_execute() 事件也可以使用 retval=True 标志建立,这允许修改要发送到数据库的 statement 和参数。 before_cursor_execute() 事件在这里特别有用,可以向所有执行添加临时的字符串转换,例如注释

from sqlalchemy.engine import Engine
from sqlalchemy import event


@event.listens_for(Engine, "before_cursor_execute", retval=True)
def comment_sql_calls(
    conn, cursor, statement, parameters, context, executemany
):
    statement = statement + " -- some comment"
    return statement, parameters

注意

ConnectionEvents 可以在 EngineConnection 以及这些类的实例的任何组合上建立。 对于给定的 Connection 实例,所有四个范围的事件都将触发。 但是,出于性能原因,Connection 对象在实例化时确定其父 Engine 是否已建立事件侦听器。 在依赖的 Connection 实例实例化添加到 Engine 类或 Engine 实例的事件侦听器通常不会在该 Connection 实例上可用。 新添加的侦听器将改为在父 Engine 类或实例上建立这些事件侦听器之后创建的 Connection 实例上生效。

参数:

retval=False – 仅适用于 before_execute()before_cursor_execute() 事件。 当为 True 时,用户定义的事件函数必须具有返回值,该返回值是替换给定 statement 和参数的参数元组。 有关特定返回参数的说明,请参阅这些方法。

method sqlalchemy.events.ConnectionEvents.after_cursor_execute(conn: Connection, cursor: DBAPICursor, statement: str, parameters: _DBAPIAnyExecuteParams, context: ExecutionContext | None, executemany: bool) None

在执行后拦截低级游标 execute() 事件。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'after_cursor_execute')
def receive_after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    "listen for the 'after_cursor_execute' event"

    # ... (event handling logic) ...
参数:
  • connConnection 对象

  • cursor – DBAPI 游标对象。 如果语句是 SELECT,则将有结果挂起,但不应使用这些结果,因为 CursorResult 将需要它们。

  • statement – 字符串 SQL 语句,按原样传递到 DBAPI

  • parameters – 字典、元组或参数列表,这些参数将传递到 DBAPI cursorexecute()executemany() 方法。 在某些情况下,可能为 None

  • context – 使用中的 ExecutionContext 对象。 可以是 None

  • executemany – 布尔值,如果 True,则为 executemany() 调用,如果 False,则为 execute() 调用。

method sqlalchemy.events.ConnectionEvents.after_execute(conn: Connection, clauseelement: Executable, multiparams: _CoreMultiExecuteParams, params: _CoreSingleExecuteParams, execution_options: _ExecuteOptions, result: Result[Any]) None

在执行后拦截高级 execute() 事件。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'after_execute')
def receive_after_execute(conn, clauseelement, multiparams, params, execution_options, result):
    "listen for the 'after_execute' event"

    # ... (event handling logic) ...

# DEPRECATED calling style (pre-1.4, will be removed in a future release)
@event.listens_for(SomeEngine, 'after_execute')
def receive_after_execute(conn, clauseelement, multiparams, params, result):
    "listen for the 'after_execute' event"

    # ... (event handling logic) ...

Changed in version 1.4: ConnectionEvents.after_execute() 事件现在接受参数 ConnectionEvents.after_execute.conn, ConnectionEvents.after_execute.clauseelement, ConnectionEvents.after_execute.multiparams, ConnectionEvents.after_execute.params, ConnectionEvents.after_execute.execution_options, ConnectionEvents.after_execute.result。 对接受上面列出的先前参数签名作为“已弃用”的侦听器函数的支持将在未来版本中删除。

参数:
  • connConnection 对象

  • clauseelement – SQL 表达式构造,Compiled 实例,或传递给 Connection.execute() 的字符串语句。

  • multiparams – 多个参数集,字典列表。

  • params – 单个参数集,单个字典。

  • execution_options

    与语句一起传递的执行选项字典(如果有)。 这是将要使用的所有选项的合并,包括语句、连接的选项以及为 2.0 风格的执行传递到方法本身的选项。

  • result – 执行生成的 CursorResult

method sqlalchemy.events.ConnectionEvents.before_cursor_execute(conn: Connection, cursor: DBAPICursor, statement: str, parameters: _DBAPIAnyExecuteParams, context: ExecutionContext | None, executemany: bool) Tuple[str, _DBAPIAnyExecuteParams] | None

在执行前拦截低级游标 execute() 事件,接收将针对游标调用的字符串 SQL 语句和 DBAPI 特定的参数列表。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'before_cursor_execute')
def receive_before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    "listen for the 'before_cursor_execute' event"

    # ... (event handling logic) ...

此事件是日志记录以及 SQL 字符串的后期修改的不错选择。 对于参数修改,除了特定于目标后端的参数修改外,它不太理想。

此事件可以选择使用 retval=True 标志建立。 在这种情况下,statementparameters 参数应作为二元组返回

@event.listens_for(Engine, "before_cursor_execute", retval=True)
def before_cursor_execute(
    conn, cursor, statement, parameters, context, executemany
):
    # do something with statement, parameters
    return statement, parameters

请参阅 ConnectionEvents 中的示例。

参数:
  • connConnection 对象

  • cursor – DBAPI 游标对象

  • statement – 字符串 SQL 语句,将按原样传递到 DBAPI

  • parameters – 字典、元组或参数列表,这些参数将传递到 DBAPI cursorexecute()executemany() 方法。 在某些情况下,可能为 None

  • context – 正在使用的 ExecutionContext 对象。可能是 None

  • executemany – 布尔值。如果为 True,则这是一个 executemany() 调用;如果为 False,则这是一个 execute() 调用。

method sqlalchemy.events.ConnectionEvents.before_execute(conn: Connection, clauseelement: Executable, multiparams: _CoreMultiExecuteParams, params: _CoreSingleExecuteParams, execution_options: _ExecuteOptions) Tuple[Executable, _CoreMultiExecuteParams, _CoreSingleExecuteParams] | None

拦截高阶 execute() 事件,在渲染为 SQL 之前接收未编译的 SQL 构造和其他对象。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'before_execute')
def receive_before_execute(conn, clauseelement, multiparams, params, execution_options):
    "listen for the 'before_execute' event"

    # ... (event handling logic) ...

# DEPRECATED calling style (pre-1.4, will be removed in a future release)
@event.listens_for(SomeEngine, 'before_execute')
def receive_before_execute(conn, clauseelement, multiparams, params):
    "listen for the 'before_execute' event"

    # ... (event handling logic) ...

在版本 1.4 中更改: ConnectionEvents.before_execute() 事件现在接受参数 ConnectionEvents.before_execute.connConnectionEvents.before_execute.clauseelementConnectionEvents.before_execute.multiparamsConnectionEvents.before_execute.paramsConnectionEvents.before_execute.execution_options。对接受上述作为“已弃用”列出的先前参数签名的监听器函数的支持将在未来的版本中移除。

此事件非常适合调试 SQL 编译问题以及早期操作发送到数据库的参数,因为参数列表在此处将采用一致的格式。

此事件可以选择使用 retval=True 标志建立。在这种情况下,clauseelementmultiparamsparams 参数应作为三元组返回

@event.listens_for(Engine, "before_execute", retval=True)
def before_execute(conn, clauseelement, multiparams, params):
    # do something with clauseelement, multiparams, params
    return clauseelement, multiparams, params
参数:
  • connConnection 对象

  • clauseelement – SQL 表达式构造,Compiled 实例,或传递给 Connection.execute() 的字符串语句。

  • multiparams – 多个参数集,字典列表。

  • params – 单个参数集,单个字典。

  • execution_options

    与语句一起传递的执行选项字典(如果有)。 这是将要使用的所有选项的合并,包括语句、连接的选项以及为 2.0 风格的执行传递到方法本身的选项。

method sqlalchemy.events.ConnectionEvents.begin(conn: Connection) None

拦截 begin() 事件。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'begin')
def receive_begin(conn):
    "listen for the 'begin' event"

    # ... (event handling logic) ...
参数:

connConnection 对象

method sqlalchemy.events.ConnectionEvents.begin_twophase(conn: Connection, xid: Any) None

拦截 begin_twophase() 事件。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'begin_twophase')
def receive_begin_twophase(conn, xid):
    "listen for the 'begin_twophase' event"

    # ... (event handling logic) ...
参数:
method sqlalchemy.events.ConnectionEvents.commit(conn: Connection) None

拦截由 Transaction 发起的 commit() 事件。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'commit')
def receive_commit(conn):
    "listen for the 'commit' event"

    # ... (event handling logic) ...

请注意,如果 reset_on_return 标志设置为 'commit' 值,则 Pool 也可能在检入时“自动提交” DBAPI 连接。要拦截此提交,请使用 PoolEvents.reset() 钩子。

参数:

connConnection 对象

method sqlalchemy.events.ConnectionEvents.commit_twophase(conn: Connection, xid: Any, is_prepared: bool) None

拦截 commit_twophase() 事件。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'commit_twophase')
def receive_commit_twophase(conn, xid, is_prepared):
    "listen for the 'commit_twophase' event"

    # ... (event handling logic) ...
参数:
attribute sqlalchemy.events.ConnectionEvents.dispatch: _Dispatch[_ET] = <sqlalchemy.event.base.ConnectionEventsDispatch object>

引用回 _Dispatch 类。

与 _Dispatch._events 双向关联

method sqlalchemy.events.ConnectionEvents.engine_connect(conn: Connection) None

拦截新 Connection 的创建。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'engine_connect')
def receive_engine_connect(conn):
    "listen for the 'engine_connect' event"

    # ... (event handling logic) ...

# DEPRECATED calling style (pre-2.0, will be removed in a future release)
@event.listens_for(SomeEngine, 'engine_connect')
def receive_engine_connect(conn, branch):
    "listen for the 'engine_connect' event"

    # ... (event handling logic) ...

在版本 2.0 中更改: ConnectionEvents.engine_connect() 事件现在接受参数 ConnectionEvents.engine_connect.conn。对接受上述作为“已弃用”列出的先前参数签名的监听器函数的支持将在未来的版本中移除。

此事件通常在调用 Engine.connect() 方法的直接结果时调用。

它与 PoolEvents.connect() 方法不同,后者指的是在 DBAPI 级别实际连接到数据库;DBAPI 连接可以被池化并重用于许多操作。相比之下,此事件仅指围绕此类 DBAPI 连接生成更高级别的 Connection 包装器。

它也与 PoolEvents.checkout() 事件不同,因为它特定于 Connection 对象,而不是 PoolEvents.checkout() 处理的 DBAPI 连接,尽管此 DBAPI 连接可通过 Connection.connection 属性在此处获得。但请注意,如果 Connection 对象失效并重新建立,则在单个 Connection 对象的生命周期内,实际上可能存在多个 PoolEvents.checkout() 事件。

参数:

connConnection 对象。

另请参阅

PoolEvents.checkout() 用于个体 DBAPI 连接的较低级别池检出事件

method sqlalchemy.events.ConnectionEvents.engine_disposed(engine: Engine) None

拦截 Engine.dispose() 方法被调用时。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'engine_disposed')
def receive_engine_disposed(engine):
    "listen for the 'engine_disposed' event"

    # ... (event handling logic) ...

Engine.dispose() 方法指示引擎“处置”其连接池 (例如 Pool),并用新的连接池替换它。处置旧池的效果是现有检入的连接被关闭。新池在首次使用之前不会建立任何新连接。

此事件可用于指示与 Engine 相关的资源也应被清理,请记住,Engine 仍然可以用于新的请求,在这种情况下,它会重新获取连接资源。

method sqlalchemy.events.ConnectionEvents.prepare_twophase(conn: Connection, xid: Any) None

拦截 prepare_twophase() 事件。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'prepare_twophase')
def receive_prepare_twophase(conn, xid):
    "listen for the 'prepare_twophase' event"

    # ... (event handling logic) ...
参数:
method sqlalchemy.events.ConnectionEvents.release_savepoint(conn: Connection, name: str, context: None) None

拦截 release_savepoint() 事件。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'release_savepoint')
def receive_release_savepoint(conn, name, context):
    "listen for the 'release_savepoint' event"

    # ... (event handling logic) ...
参数:
  • connConnection 对象

  • name – 用于保存点的指定名称。

  • context – 未使用

method sqlalchemy.events.ConnectionEvents.rollback(conn: Connection) None

拦截由 Transaction 发起的 rollback() 事件。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'rollback')
def receive_rollback(conn):
    "listen for the 'rollback' event"

    # ... (event handling logic) ...

请注意,如果 reset_on_return 标志设置为其默认值 'rollback',则 Pool 也会在检入时“自动回滚” DBAPI 连接。要拦截此回滚,请使用 PoolEvents.reset() 钩子。

参数:

connConnection 对象

另请参阅

PoolEvents.reset()

method sqlalchemy.events.ConnectionEvents.rollback_savepoint(conn: Connection, name: str, context: None) None

拦截 rollback_savepoint() 事件。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'rollback_savepoint')
def receive_rollback_savepoint(conn, name, context):
    "listen for the 'rollback_savepoint' event"

    # ... (event handling logic) ...
参数:
  • connConnection 对象

  • name – 用于保存点的指定名称。

  • context – 未使用

method sqlalchemy.events.ConnectionEvents.rollback_twophase(conn: Connection, xid: Any, is_prepared: bool) None

拦截 rollback_twophase() 事件。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'rollback_twophase')
def receive_rollback_twophase(conn, xid, is_prepared):
    "listen for the 'rollback_twophase' event"

    # ... (event handling logic) ...
参数:
method sqlalchemy.events.ConnectionEvents.savepoint(conn: Connection, name: str) None

拦截 savepoint() 事件。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'savepoint')
def receive_savepoint(conn, name):
    "listen for the 'savepoint' event"

    # ... (event handling logic) ...
参数:
  • connConnection 对象

  • name – 用于保存点的指定名称。

method sqlalchemy.events.ConnectionEvents.set_connection_execution_options(conn: Connection, opts: Dict[str, Any]) None

拦截 Connection.execution_options() 方法被调用时。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'set_connection_execution_options')
def receive_set_connection_execution_options(conn, opts):
    "listen for the 'set_connection_execution_options' event"

    # ... (event handling logic) ...

此方法在新 Connection 生成后调用,带有新更新的执行选项集合,但在 Dialect 对任何新选项执行操作之前调用。

请注意,当生成新的 Connection,并且该连接从其父 Engine 继承执行选项时,不会调用此方法;要拦截这种情况,请使用 ConnectionEvents.engine_connect() 事件。

参数:
  • conn – 新复制的 Connection 对象

  • opts

    传递给 Connection.execution_options() 方法的选项字典。可以就地修改此字典以影响最终生效的选项。

    2.0 版本新增: opts 字典可以就地修改。

另请参阅

ConnectionEvents.set_engine_execution_options() - 当 Engine.execution_options() 被调用时调用的事件。

method sqlalchemy.events.ConnectionEvents.set_engine_execution_options(engine: Engine, opts: Dict[str, Any]) None

拦截 Engine.execution_options() 方法被调用时。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'set_engine_execution_options')
def receive_set_engine_execution_options(engine, opts):
    "listen for the 'set_engine_execution_options' event"

    # ... (event handling logic) ...

The Engine.execution_options() 方法生成 Engine 的浅拷贝,用于存储新的选项。这个新的 Engine 将在此处传递。此方法的一个特定应用是向给定的 Engine 添加一个 ConnectionEvents.engine_connect() 事件处理程序,该处理程序将执行一些特定于这些执行选项的每个 Connection 任务。

参数:
  • conn – 新复制的 Engine 对象

  • opts

    传递给 Connection.execution_options() 方法的选项字典。可以就地修改此字典以影响最终生效的选项。

    2.0 版本新增: opts 字典可以就地修改。

class sqlalchemy.events.DialectEvents

用于执行替换功能的事件接口。

这些事件允许直接检测和替换与 DBAPI 交互的关键方言函数。

注意

DialectEvents 钩子应被视为**半公开**和实验性的。 这些钩子不适用于通用场景,仅适用于必须将复杂的 DBAPI 机制重新声明注入到现有方言中的情况。 对于通用语句拦截事件,请使用 ConnectionEvents 接口。

类签名

class sqlalchemy.events.DialectEvents (sqlalchemy.event.Events)

attribute sqlalchemy.events.DialectEvents.dispatch: _Dispatch[_ET] = <sqlalchemy.event.base.DialectEventsDispatch object>

引用回 _Dispatch 类。

与 _Dispatch._events 双向关联

method sqlalchemy.events.DialectEvents.do_connect(dialect: Dialect, conn_rec: ConnectionPoolEntry, cargs: Tuple[Any, ...], cparams: Dict[str, Any]) DBAPIConnection | None

在建立连接之前接收连接参数。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'do_connect')
def receive_do_connect(dialect, conn_rec, cargs, cparams):
    "listen for the 'do_connect' event"

    # ... (event handling logic) ...

此事件很有用,因为它允许处理程序操作 cargs 和/或 cparams 集合,这些集合控制将如何调用 DBAPI 的 connect() 函数。 cargs 将始终是一个可以就地修改的 Python 列表,而 cparams 是一个也可以修改的 Python 字典。

e = create_engine("postgresql+psycopg2://user@host/dbname")


@event.listens_for(e, "do_connect")
def receive_do_connect(dialect, conn_rec, cargs, cparams):
    cparams["password"] = "some_password"

事件钩子也可以用于完全覆盖对 connect() 的调用,方法是返回一个非 None 的 DBAPI 连接对象。

e = create_engine("postgresql+psycopg2://user@host/dbname")


@event.listens_for(e, "do_connect")
def receive_do_connect(dialect, conn_rec, cargs, cparams):
    return psycopg2.connect(*cargs, **cparams)
method sqlalchemy.events.DialectEvents.do_execute(cursor: DBAPICursor, statement: str, parameters: _DBAPISingleExecuteParams, context: ExecutionContext) Literal[True] | None

接收一个游标以调用 execute()。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'do_execute')
def receive_do_execute(cursor, statement, parameters, context):
    "listen for the 'do_execute' event"

    # ... (event handling logic) ...

返回 True 值以阻止进一步事件的调用,并指示游标执行已在事件处理程序中发生。

method sqlalchemy.events.DialectEvents.do_execute_no_params(cursor: DBAPICursor, statement: str, context: ExecutionContext) Literal[True] | None

接收一个游标以调用不带参数的 execute()。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'do_execute_no_params')
def receive_do_execute_no_params(cursor, statement, context):
    "listen for the 'do_execute_no_params' event"

    # ... (event handling logic) ...

返回 True 值以阻止进一步事件的调用,并指示游标执行已在事件处理程序中发生。

method sqlalchemy.events.DialectEvents.do_executemany(cursor: DBAPICursor, statement: str, parameters: _DBAPIMultiExecuteParams, context: ExecutionContext) Literal[True] | None

接收一个游标以调用 executemany()。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'do_executemany')
def receive_do_executemany(cursor, statement, parameters, context):
    "listen for the 'do_executemany' event"

    # ... (event handling logic) ...

返回 True 值以阻止进一步事件的调用,并指示游标执行已在事件处理程序中发生。

method sqlalchemy.events.DialectEvents.do_setinputsizes(inputsizes: Dict[BindParameter[Any], Any], cursor: DBAPICursor, statement: str, parameters: _DBAPIAnyExecuteParams, context: ExecutionContext) None

接收 setinputsizes 字典以进行可能的修改。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'do_setinputsizes')
def receive_do_setinputsizes(inputsizes, cursor, statement, parameters, context):
    "listen for the 'do_setinputsizes' event"

    # ... (event handling logic) ...

当方言使用 DBAPI 的 cursor.setinputsizes() 方法时,会发出此事件,该方法传递有关特定语句的参数绑定的信息。 给定的 inputsizes 字典将包含 BindParameter 对象作为键,链接到 DBAPI 特定的类型对象作为值; 对于未绑定的参数,它们会添加到字典中,值为 None,这意味着该参数将不会包含在最终的 setinputsizes 调用中。 该事件可用于检查和/或记录正在绑定的数据类型,以及就地修改字典。 可以在此字典中添加、修改或删除参数。 调用者通常需要检查给定绑定对象的 BindParameter.type 属性,以便确定 DBAPI 对象。

事件发生后,inputsizes 字典将转换为适当的数据结构,以传递给 cursor.setinputsizes; 对于位置绑定参数执行样式,可以是列表;对于命名绑定参数执行样式,可以是字符串参数键到 DBAPI 类型对象的字典。

总体而言,setinputsizes 钩子仅用于包含 use_setinputsizes=True 标志的方言。 使用此标志的方言包括 python-oracledb、cx_Oracle、pg8000、asyncpg 和 pyodbc 方言。

注意

为了与 pyodbc 一起使用,必须将 use_setinputsizes 标志传递给方言,例如:

create_engine("mssql+pyodbc://...", use_setinputsizes=True)

另请参阅

Setinputsizes 支持

1.2.9 版本新增。

method sqlalchemy.events.DialectEvents.handle_error(exception_context: ExceptionContext) BaseException | None

拦截由 Dialect 处理的所有异常,通常但不限于在 Connection 范围内发出的异常。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeEngine, 'handle_error')
def receive_handle_error(exception_context):
    "listen for the 'handle_error' event"

    # ... (event handling logic) ...

2.0 版本更改: DialectEvents.handle_error() 事件已移动到 DialectEvents 类,从 ConnectionEvents 类移动而来,以便它也可以参与使用 create_engine.pool_pre_ping 参数配置的“pre ping”操作。 该事件仍然通过使用 Engine 作为事件目标进行注册,但请注意,不再支持使用 Connection 作为 DialectEvents.handle_error() 的事件目标。

这包括 DBAPI 发出的所有异常以及 SQLAlchemy 的语句调用过程中发生的异常,包括编码错误和其他语句验证错误。 调用该事件的其他领域包括事务开始和结束、结果行获取、游标创建。

请注意,handle_error() 随时 可能支持新的异常类型和新的调用场景。 使用此事件的代码必须预期在次要版本中会出现新的调用模式。

为了支持与异常对应的各种成员,并允许事件的扩展性而无需向后不兼容,接收到的唯一参数是 ExceptionContext 的实例。 此对象包含表示异常详细信息的数据成员。

此钩子支持的用例包括:

  • 用于日志记录和调试目的的只读、低级别异常处理

  • 确定 DBAPI 连接错误消息是否指示数据库连接需要重新连接,包括对于 某些 方言使用的“pre_ping”处理程序

  • 确定或禁用连接或拥有的连接池是否因特定异常而无效或过期

  • 异常重写

当失败操作的游标(如果有)仍然打开且可访问时,将调用此钩子。 可以在此游标上调用特殊的清理操作; 在调用此钩子之后,SQLAlchemy 将尝试关闭此游标。

从 SQLAlchemy 2.0 开始,使用 create_engine.pool_pre_ping 参数启用的“pre-ping”处理程序也将参与 DialectEvents.handle_error() 过程,对于那些依赖断开连接代码来检测数据库活动状态的方言。 请注意,某些方言(例如 psycopg、psycopg2 和大多数 MySQL 方言)使用 DBAPI 提供的原生 ping() 方法,该方法不使用断开连接代码。

2.0.0 版本更改: DialectEvents.handle_error() 事件钩子参与连接池“pre-ping”操作。 在这种用法中,ExceptionContext.engine 属性将为 None,但是正在使用的 Dialect 始终可以通过 ExceptionContext.dialect 属性获得。

2.0.5 版本更改: 添加了 ExceptionContext.is_pre_ping 属性,当在连接池 pre-ping 操作中触发 DialectEvents.handle_error() 事件钩子时,该属性将设置为 True

2.0.5 版本更改: 修复了一个问题,该问题允许 PostgreSQL 的 psycopgpsycopg2 驱动程序以及所有 MySQL 驱动程序在连接池“pre-ping”操作期间正确参与 DialectEvents.handle_error() 事件钩子; 以前,该实现对于这些驱动程序不起作用。

处理程序函数有两种选项可以将 SQLAlchemy 构建的异常替换为用户定义的异常。 它可以直接引发这个新异常,在这种情况下,所有进一步的事件侦听器都将被绕过,并且在进行适当的清理后将引发异常。

@event.listens_for(Engine, "handle_error")
def handle_exception(context):
    if isinstance(
        context.original_exception, psycopg2.OperationalError
    ) and "failed" in str(context.original_exception):
        raise MySpecialException("failed operation")

警告

由于 DialectEvents.handle_error() 事件专门允许将异常重新抛出为失败语句引发的最终异常,因此,如果用户定义的事件处理程序本身失败并抛出意外异常,堆栈跟踪将具有误导性; 堆栈跟踪可能无法说明实际失败的代码行! 建议在此处仔细编码,并在发生意外异常时使用日志记录和/或内联调试。

或者,可以使用“链式”事件处理风格,方法是将处理程序配置为 retval=True 修饰符,并从函数返回新的异常实例。 在这种情况下,事件处理将继续到下一个处理程序。 可以使用 ExceptionContext.chained_exception 获取“链式”异常。

@event.listens_for(Engine, "handle_error", retval=True)
def handle_exception(context):
    if (
        context.chained_exception is not None
        and "special" in context.chained_exception.message
    ):
        return MySpecialException(
            "failed", cause=context.chained_exception
        )

返回 None 的处理程序可以在链中使用; 当处理程序返回 None 时,先前的异常实例(如果有)将保持为传递到下一个处理程序的当前异常。

当引发或返回自定义异常时,SQLAlchemy 会按原样引发此新异常,它不会被任何 SQLAlchemy 对象包装。 如果异常不是 sqlalchemy.exc.StatementError 的子类,则某些功能可能不可用; 目前,这包括 ORM 的功能,即向 autoflush 过程中引发的异常添加有关“autoflush”的详细提示。

参数:

context – 一个 ExceptionContext 对象。 有关所有可用成员的详细信息,请参阅此类。

Schema 事件

对象名称 描述

DDLEvents

为 schema 对象定义事件监听器,即 SchemaItem 和其他 SchemaEventTarget 子类,包括 MetaData, Table, Column 等。

SchemaEventTarget

作为 DDLEvents 事件目标的元素基类。

class sqlalchemy.events.DDLEvents

为 schema 对象定义事件监听器,即 SchemaItem 和其他 SchemaEventTarget 子类,包括 MetaData, Table, Column 等。

创建 / 删除事件

当向数据库发出 CREATE 和 DROP 命令时,将发出事件。 此类别的事件钩子包括 DDLEvents.before_create(), DDLEvents.after_create(), DDLEvents.before_drop()DDLEvents.after_drop()

当使用 schema 级别的方法(例如 MetaData.create_all()MetaData.drop_all())时,将发出这些事件。 还包括每个对象的创建/删除方法,例如 Table.create(), Table.drop(), Index.create(),以及特定于方言的方法,例如 ENUM.create()

2.0 版本新增: DDLEvents 事件钩子现在适用于非表对象,包括约束、索引和特定于方言的 schema 类型。

事件钩子可以直接附加到 Table 对象或 MetaData 集合,以及任何可以使用不同的 SQL 命令单独创建和删除的 SchemaItem 类或对象。 此类类包括 Index, Sequence 和特定于方言的类,例如 ENUM

使用 DDLEvents.after_create() 事件的示例,其中自定义事件钩子将在发出 CREATE TABLE 后在当前连接上发出 ALTER TABLE 命令

from sqlalchemy import create_engine
from sqlalchemy import event
from sqlalchemy import Table, Column, Metadata, Integer

m = MetaData()
some_table = Table("some_table", m, Column("data", Integer))


@event.listens_for(some_table, "after_create")
def after_create(target, connection, **kw):
    connection.execute(
        text("ALTER TABLE %s SET name=foo_%s" % (target.name, target.name))
    )


some_engine = create_engine("postgresql://scott:tiger@host/test")

# will emit "CREATE TABLE some_table" as well as the above
# "ALTER TABLE" statement afterwards
m.create_all(some_engine)

约束对象(例如 ForeignKeyConstraint, UniqueConstraint, CheckConstraint)也可以订阅这些事件,但是它们通常不会产生事件,因为这些对象通常以内联方式呈现到封闭的 CREATE TABLE 语句中,并从 DROP TABLE 语句中隐式删除。

对于 Index 构造,将为 CREATE INDEX 发出事件钩子,但是当删除表时,SQLAlchemy 通常不会发出 DROP INDEX,因为这再次隐式包含在 DROP TABLE 语句中。

2.0 版本新增: SchemaItem 对象的创建/删除事件的支持从以前对 MetaDataTable 的支持扩展到还包括 Constraint 及其所有子类、IndexSequence 和一些类型相关的构造,例如 ENUM

注意

这些事件钩子仅在 SQLAlchemy 的 create/drop 方法的范围内发出; 诸如 alembic 之类的工具不一定支持它们。

附加事件

提供了附加事件,用于自定义子模式元素与父元素关联时的行为,例如当 Column 与其 Table 关联,当 ForeignKeyConstraintTable 关联等等。这些事件包括 DDLEvents.before_parent_attach()DDLEvents.after_parent_attach()

反射事件

当数据库表的 反射 过程进行时,DDLEvents.column_reflect() 事件用于拦截和修改数据库列的 Python 定义。

与通用 DDL 一起使用

DDL 事件与 DDL 类和 DDL 子句构造的 ExecutableDDLElement 层级结构紧密集成,这些结构本身适合作为监听器可调用对象。

from sqlalchemy import DDL

event.listen(
    some_table,
    "after_create",
    DDL("ALTER TABLE %(table)s SET name=foo_%(table)s"),
)

事件传播到 MetaData 副本

对于所有 DDLEvent 事件,propagate=True 关键字参数将确保给定的事件处理程序传播到对象的副本,这些副本在使用 Table.to_metadata() 方法时创建。

from sqlalchemy import DDL

metadata = MetaData()
some_table = Table("some_table", metadata, Column("data", Integer))

event.listen(
    some_table,
    "after_create",
    DDL("ALTER TABLE %(table)s SET name=foo_%(table)s"),
    propagate=True,
)

new_metadata = MetaData()
new_table = some_table.to_metadata(new_metadata)

上述 DDL 对象将与 some_tablenew_table Table 对象的 DDLEvents.after_create() 事件关联。

类签名

class sqlalchemy.events.DDLEvents (sqlalchemy.event.Events)

方法 sqlalchemy.events.DDLEvents.after_create(target: SchemaEventTarget, connection: Connection, **kw: Any) None

在发出 CREATE 语句后调用。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeSchemaClassOrObject, 'after_create')
def receive_after_create(target, connection, **kw):
    "listen for the 'after_create' event"

    # ... (event handling logic) ...
参数:
  • target

    SchemaObject,例如 MetaDataTable,但也包括所有 create/drop 对象,例如 IndexSequence 等,作为事件目标的对象。

    2.0 版本新增: 添加了对所有 SchemaItem 对象的支持。

  • connection – 发出 CREATE 语句的 Connection

  • **kw – 与事件相关的附加关键字参数。此字典的内容可能因版本而异,包括元数据级别事件正在生成的表列表、checkfirst 标志以及内部事件使用的其他元素。

listen() 也接受此事件的 propagate=True 修饰符;当为 True 时,监听器函数将为目标对象的任何副本建立,即使用 Table.to_metadata() 时生成的那些副本。

方法 sqlalchemy.events.DDLEvents.after_drop(target: SchemaEventTarget, connection: Connection, **kw: Any) None

在发出 DROP 语句后调用。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeSchemaClassOrObject, 'after_drop')
def receive_after_drop(target, connection, **kw):
    "listen for the 'after_drop' event"

    # ... (event handling logic) ...
参数:
  • target

    SchemaObject,例如 MetaDataTable,但也包括所有 create/drop 对象,例如 IndexSequence 等,作为事件目标的对象。

    2.0 版本新增: 添加了对所有 SchemaItem 对象的支持。

  • connection – 发出 DROP 语句的 Connection

  • **kw – 与事件相关的附加关键字参数。此字典的内容可能因版本而异,包括元数据级别事件正在生成的表列表、checkfirst 标志以及内部事件使用的其他元素。

listen() 也接受此事件的 propagate=True 修饰符;当为 True 时,监听器函数将为目标对象的任何副本建立,即使用 Table.to_metadata() 时生成的那些副本。

方法 sqlalchemy.events.DDLEvents.after_parent_attach(target: SchemaEventTarget, parent: SchemaItem) None

SchemaItem 与父 SchemaItem 关联后调用。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeSchemaClassOrObject, 'after_parent_attach')
def receive_after_parent_attach(target, parent):
    "listen for the 'after_parent_attach' event"

    # ... (event handling logic) ...
参数:
  • target – 目标对象

  • parent – 目标对象要附加到的父对象。

listen() 也接受此事件的 propagate=True 修饰符;当为 True 时,监听器函数将为目标对象的任何副本建立,即使用 Table.to_metadata() 时生成的那些副本。

方法 sqlalchemy.events.DDLEvents.before_create(target: SchemaEventTarget, connection: Connection, **kw: Any) None

在发出 CREATE 语句之前调用。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeSchemaClassOrObject, 'before_create')
def receive_before_create(target, connection, **kw):
    "listen for the 'before_create' event"

    # ... (event handling logic) ...
参数:
  • target

    SchemaObject,例如 MetaDataTable,但也包括所有 create/drop 对象,例如 IndexSequence 等,作为事件目标的对象。

    2.0 版本新增: 添加了对所有 SchemaItem 对象的支持。

  • connection – 将要发出 CREATE 语句的 Connection

  • **kw – 与事件相关的附加关键字参数。此字典的内容可能因版本而异,包括元数据级别事件正在生成的表列表、checkfirst 标志以及内部事件使用的其他元素。

listen() 接受此事件的 propagate=True 修饰符;当为 True 时,监听器函数将为目标对象的任何副本建立,即使用 Table.to_metadata() 时生成的那些副本。

listen() 接受此事件的 insert=True 修饰符;当为 True 时,监听器函数将在发现时添加到事件的内部列表的前面,并在未传递此参数的已注册监听器函数之前执行。

方法 sqlalchemy.events.DDLEvents.before_drop(target: SchemaEventTarget, connection: Connection, **kw: Any) None

在发出 DROP 语句之前调用。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeSchemaClassOrObject, 'before_drop')
def receive_before_drop(target, connection, **kw):
    "listen for the 'before_drop' event"

    # ... (event handling logic) ...
参数:
  • target

    SchemaObject,例如 MetaDataTable,但也包括所有 create/drop 对象,例如 IndexSequence 等,作为事件目标的对象。

    2.0 版本新增: 添加了对所有 SchemaItem 对象的支持。

  • connection – 将要发出 DROP 语句的 Connection

  • **kw – 与事件相关的附加关键字参数。此字典的内容可能因版本而异,包括元数据级别事件正在生成的表列表、checkfirst 标志以及内部事件使用的其他元素。

listen() 也接受此事件的 propagate=True 修饰符;当为 True 时,监听器函数将为目标对象的任何副本建立,即使用 Table.to_metadata() 时生成的那些副本。

方法 sqlalchemy.events.DDLEvents.before_parent_attach(target: SchemaEventTarget, parent: SchemaItem) None

SchemaItem 与父 SchemaItem 关联之前调用。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeSchemaClassOrObject, 'before_parent_attach')
def receive_before_parent_attach(target, parent):
    "listen for the 'before_parent_attach' event"

    # ... (event handling logic) ...
参数:
  • target – 目标对象

  • parent – 目标对象要附加到的父对象。

listen() 也接受此事件的 propagate=True 修饰符;当为 True 时,监听器函数将为目标对象的任何副本建立,即使用 Table.to_metadata() 时生成的那些副本。

方法 sqlalchemy.events.DDLEvents.column_reflect(inspector: Inspector, table: Table, column_info: ReflectedColumn) None

Table 正在被反射时,为检索到的每个 ‘列信息’ 单位调用。

参数示例形式

from sqlalchemy import event


@event.listens_for(SomeSchemaClassOrObject, 'column_reflect')
def receive_column_reflect(inspector, table, column_info):
    "listen for the 'column_reflect' event"

    # ... (event handling logic) ...

此事件最容易通过将其应用于特定的 MetaData 实例来使用,在该实例中,它将对该 MetaData 中所有经历反射的 Table 对象生效

metadata = MetaData()


@event.listens_for(metadata, "column_reflect")
def receive_column_reflect(inspector, table, column_info):
    # receives for all Table objects that are reflected
    # under this MetaData
    ...


# will use the above event hook
my_table = Table("my_table", metadata, autoload_with=some_engine)

1.4.0b2 版本新增: DDLEvents.column_reflect() 钩子现在可以应用于 MetaData 对象以及 MetaData 类本身,在该类本身中,它将对与目标 MetaData 关联的所有 Table 对象生效。

它也可以应用于全局的 Table

from sqlalchemy import Table


@event.listens_for(Table, "column_reflect")
def receive_column_reflect(inspector, table, column_info):
    # receives for all Table objects that are reflected
    ...

它也可以应用于特定的 Table,在使用 Table.listeners 参数反射时。

t1 = Table(
    "my_table",
    autoload_with=some_engine,
    listeners=[("column_reflect", receive_column_reflect)],
)

将传递由方言返回的列信息字典,并且可以对其进行修改。该字典是在 Inspector.get_columns() 返回的列表的每个元素中返回的字典

该事件在对该字典执行任何操作之前调用,并且可以修改内容;可以将以下附加键添加到字典中,以进一步修改 Column 的构造方式

listen() 也接受此事件的 propagate=True 修饰符;当为 True 时,监听器函数将为目标对象的任何副本建立,即使用 Table.to_metadata() 时生成的那些副本。

属性 sqlalchemy.events.DDLEvents.dispatch: _Dispatch[_ET] = <sqlalchemy.event.base.DDLEventsDispatch object>

引用回 _Dispatch 类。

与 _Dispatch._events 双向关联

class sqlalchemy.events.SchemaEventTarget

作为 DDLEvents 事件目标的元素基类。

这包括 SchemaItem 以及 SchemaType

类签名

class sqlalchemy.events.SchemaEventTarget (sqlalchemy.event.registry.EventTarget)