diff --git a/src/sqlalchemy_cratedb/polyfill.py b/src/sqlalchemy_cratedb/polyfill.py index 8c0f3a8..6354e11 100644 --- a/src/sqlalchemy_cratedb/polyfill.py +++ b/src/sqlalchemy_cratedb/polyfill.py @@ -95,6 +95,26 @@ def polyfill_refresh_after_dml_session(session: sa.orm.Session): listen(session, "after_flush", do_flush) +def polyfill_refresh_after_dml_engine(engine: sa.engine.Engine): + def receive_after_execute( + conn: sa.engine.Connection, clauseelement, multiparams, params, execution_options, result + ): + """ + Run a `REFRESH TABLE ...` command after each DML operation (INSERT, UPDATE, DELETE). + + This is used by CrateDB's Singer/Meltano and `rdflib-sqlalchemy` adapters. + """ + + if isinstance(clauseelement, (sa.sql.Insert, sa.sql.Update, sa.sql.Delete)): + if not isinstance(clauseelement.table, sa.sql.Join): + full_table_name = f'"{clauseelement.table.name}"' + if clauseelement.table.schema is not None: + full_table_name = f'"{clauseelement.table.schema}".' + full_table_name + conn.execute(sa.text(f'REFRESH TABLE {full_table_name};')) + + sa.event.listen(engine, "after_execute", receive_after_execute) + + def do_flush(session, flush_context): """ SQLAlchemy event handler for the 'after_flush' event,