From 1e79f7acb36d98ff9ad3521bdd8ba81eecf8fd61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez=20Mondrag=C3=B3n?= Date: Wed, 13 Dec 2023 20:55:17 -0600 Subject: [PATCH] fix(targets): Default handling of `ACTIVATE_VERSION` messages to soft deletes --- .github/workflows/test.yml | 4 ++-- singer_sdk/connectors/sql.py | 25 +++++++++++++++++++++++++ singer_sdk/helpers/capabilities.py | 8 ++++++++ singer_sdk/sinks/sql.py | 14 ++++++-------- singer_sdk/target_base.py | 11 ++++++++++- tests/samples/test_target_sqlite.py | 10 +++++----- 6 files changed, 56 insertions(+), 16 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 0f1fef77e9..3a6d27967f 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -40,14 +40,14 @@ jobs: name: "Test on ${{ matrix.python-version }} (${{ matrix.session }}) / ${{ matrix.os }} / SQLAlchemy: ${{ matrix.sqlalchemy }}" runs-on: ${{ matrix.os }} env: - NOXPYTHON: ${{ matrix.python-version }} + NOXFORCEPYTHON: ${{ matrix.python-version }} NOXSESSION: ${{ matrix.session }} strategy: fail-fast: false matrix: session: [tests] os: ["ubuntu-latest", "macos-latest", "windows-latest"] - python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12"] + python-version: ["3.7", "3.8", "3.9", "3.10", "3.11", "3.12", "3.13"] sqlalchemy: ["2.*"] include: - { session: tests, python-version: "3.11", os: "ubuntu-latest", sqlalchemy: "1.*" } diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index 5f61908e7f..e459b70bbb 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -1189,3 +1189,28 @@ def deserialize_json(self, json_str: str) -> object: .. versionadded:: 0.31.0 """ return json.loads(json_str, parse_float=decimal.Decimal) + + def delete_old_versions( + self, + *, + full_table_name: str, + version_column_name: str, + current_version: int, + ) -> None: + """Hard-deletes any old version rows from the table. 
+ This is used to clean up old versions when an ACTIVATE_VERSION message is + received. + + Args: + full_table_name: The fully qualified table name. + version_column_name: The name of the version column. + current_version: The current ACTIVATE version of the table. + """ + with self._connect() as conn, conn.begin(): + conn.execute( + sqlalchemy.text( + f"DELETE FROM {full_table_name} " # noqa: S608 + f"WHERE {version_column_name} < {current_version}", + ), + ) diff --git a/singer_sdk/helpers/capabilities.py b/singer_sdk/helpers/capabilities.py index c3d1240932..b18d537483 100644 --- a/singer_sdk/helpers/capabilities.py +++ b/singer_sdk/helpers/capabilities.py @@ -106,6 +106,14 @@ description="Add metadata to records.", ), ).to_dict() +TARGET_HARD_DELETE_CONFIG = PropertiesList( + Property( + "hard_delete", + BooleanType(), + description="Hard delete records.", + default=False, + ), +).to_dict() class TargetLoadMethods(str, Enum): diff --git a/singer_sdk/sinks/sql.py b/singer_sdk/sinks/sql.py index 9d1428c4e8..0e4b6823a1 100644 --- a/singer_sdk/sinks/sql.py +++ b/singer_sdk/sinks/sql.py @@ -378,14 +378,12 @@ def activate_version(self, new_version: int) -> None: sql_type=sqlalchemy.types.Integer(), ) - if self.config.get("hard_delete", True): - with self.connector._connect() as conn, conn.begin(): # noqa: SLF001 - conn.execute( - sqlalchemy.text( - f"DELETE FROM {self.full_table_name} " # noqa: S608 - f"WHERE {self.version_column_name} <= {new_version}", - ), - ) + if self.config.get("hard_delete", False): + self.connector.delete_old_versions( + full_table_name=self.full_table_name, + version_column_name=self.version_column_name, + current_version=new_version, + ) return if not self.connector.column_exists( diff --git a/singer_sdk/target_base.py b/singer_sdk/target_base.py index bef581601f..8b29414ca6 100644 --- a/singer_sdk/target_base.py +++ b/singer_sdk/target_base.py @@ -19,6 +19,7 @@ from singer_sdk.helpers.capabilities import ( ADD_RECORD_METADATA_CONFIG, 
BATCH_CONFIG, + TARGET_HARD_DELETE_CONFIG, TARGET_LOAD_METHOD_CONFIG, TARGET_SCHEMA_CONFIG, CapabilitiesEnum, @@ -636,7 +637,12 @@ def capabilities(self) -> list[CapabilitiesEnum]: A list of capabilities supported by this target. """ sql_target_capabilities: list[CapabilitiesEnum] = super().capabilities - sql_target_capabilities.extend([TargetCapabilities.TARGET_SCHEMA]) + sql_target_capabilities.extend( + [ + TargetCapabilities.TARGET_SCHEMA, + TargetCapabilities.HARD_DELETE, + ] + ) return sql_target_capabilities @@ -668,6 +674,9 @@ def _merge_missing(source_jsonschema: dict, target_jsonschema: dict) -> None: if TargetCapabilities.TARGET_SCHEMA in capabilities: _merge_missing(TARGET_SCHEMA_CONFIG, config_jsonschema) + if TargetCapabilities.HARD_DELETE in capabilities: + _merge_missing(TARGET_HARD_DELETE_CONFIG, config_jsonschema) + super().append_builtin_config(config_jsonschema) @final diff --git a/tests/samples/test_target_sqlite.py b/tests/samples/test_target_sqlite.py index 59c8565c1f..58eedb43b3 100644 --- a/tests/samples/test_target_sqlite.py +++ b/tests/samples/test_target_sqlite.py @@ -48,9 +48,9 @@ def sqlite_sample_target(sqlite_target_test_config): @pytest.fixture -def sqlite_sample_target_soft_delete(sqlite_target_test_config): +def sqlite_sample_target_hard_delete(sqlite_target_test_config): - """Get a sample target object with hard_delete disabled.""" + """Get a sample target object with hard_delete enabled.""" - return SQLiteTarget(config={**sqlite_target_test_config, "hard_delete": False}) + return SQLiteTarget(config={**sqlite_target_test_config, "hard_delete": True}) @pytest.fixture @@ -217,7 +217,7 @@ def test_sqlite_column_addition(sqlite_sample_target: SQLTarget): def test_sqlite_activate_version( sqlite_sample_target: SQLTarget, - sqlite_sample_target_soft_delete: SQLTarget, + sqlite_sample_target_hard_delete: SQLTarget, ): """Test handling the activate_version message for the SQLite target. 
@@ -249,7 +249,7 @@ def test_sqlite_activate_version( target_sync_test(sqlite_sample_target, input=StringIO(tap_output), finalize=True) target_sync_test( - sqlite_sample_target_soft_delete, + sqlite_sample_target_hard_delete, input=StringIO(tap_output), finalize=True, )