From 520c8e8bb288604ff044ac1f593bedd6aac6a5ca Mon Sep 17 00:00:00 2001 From: Paddy Xu Date: Thu, 7 Nov 2024 17:39:45 +0100 Subject: [PATCH] [Spark] Add `DeltaTable.addFeatureSupport` API to PySpark (#3786) #### Which Delta project/connector is this regarding? - [X] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description This PR introduces a `DeltaTable.addFeatureSupport` API which was missing in PySpark. This API is used to add support of a table feature to a given Delta table. ## How was this patch tested? A new test is added. ## Does this PR introduce _any_ user-facing changes? Yes. See the above `Description` section. --- python/delta/tables.py | 13 ++++++++++ python/delta/tests/test_deltatable.py | 37 +++++++++++++++++++++++++++ 2 files changed, 50 insertions(+) diff --git a/python/delta/tables.py b/python/delta/tables.py index 83ef64408ba..ee52c77e386 100644 --- a/python/delta/tables.py +++ b/python/delta/tables.py @@ -581,6 +581,19 @@ def upgradeTableProtocol(self, readerVersion: int, writerVersion: int) -> None: type(writerVersion)) jdt.upgradeTableProtocol(readerVersion, writerVersion) + @since(3.3) # type: ignore[arg-type] + def addFeatureSupport(self, featureName: str) -> None: + """ + Modify the protocol to add a supported feature, and if the table does not support table + features, upgrade the protocol automatically. In such a case when the provided feature is + writer-only, the table's writer version will be upgraded to `7`, and when the provided + feature is reader-writer, both reader and writer versions will be upgraded, to `(3, 7)`. + + See online documentation and Delta's protocol specification at PROTOCOL.md for more details. + """ + DeltaTable._verify_type_str(featureName, "featureName") + self._jdt.addFeatureSupport(featureName) + @since(1.2) # type: ignore[arg-type] def restoreToVersion(self, version: int) -> DataFrame: """ diff --git a/python/delta/tests/test_deltatable.py b/python/delta/tests/test_deltatable.py index 7d1991041ff..a263e99ec96 100644 --- a/python/delta/tests/test_deltatable.py +++ b/python/delta/tests/test_deltatable.py @@ -23,6 +23,7 @@ from multiprocessing.pool import ThreadPool from typing import List, Set, Dict, Optional, Any, Callable, Union, Tuple +from py4j.protocol import Py4JJavaError from pyspark.errors.exceptions.base import UnsupportedOperationException from pyspark.sql import DataFrame, Row from pyspark.sql.functions import col, lit, expr, floor @@ -1187,6 +1188,42 @@ def test_protocolUpgrade(self) -> None: with self.assertRaisesRegex(ValueError, "writerVersion"): dt.upgradeTableProtocol(1, {}) # type: ignore[arg-type] + def test_addFeatureSupport(self) -> None: + try: + self.spark.conf.set('spark.databricks.delta.minReaderVersion', '1') + self.spark.conf.set('spark.databricks.delta.minWriterVersion', '2') + self.__writeDeltaTable([('a', 1), ('b', 2), ('c', 3), ('d', 4)]) + dt = DeltaTable.forPath(self.spark, self.tempFile) + finally: + self.spark.conf.unset('spark.databricks.delta.minReaderVersion') + self.spark.conf.unset('spark.databricks.delta.minWriterVersion') + + # bad args + with self.assertRaisesRegex(Py4JJavaError, "DELTA_UNSUPPORTED_FEATURES_IN_CONFIG"): + dt.addFeatureSupport("abc") + with self.assertRaisesRegex(ValueError, "featureName needs to be a string"): + dt.addFeatureSupport(12345) # type: ignore[arg-type] + with self.assertRaisesRegex(ValueError, "featureName needs to be a string"): + dt.addFeatureSupport([12345]) # type: ignore[arg-type] + with self.assertRaisesRegex(ValueError, "featureName needs to be a string"): + dt.addFeatureSupport({}) # type: ignore[arg-type] + with self.assertRaisesRegex(ValueError, "featureName needs to be a string"): + dt.addFeatureSupport([]) # type: ignore[arg-type] + + # good args + dt.addFeatureSupport("appendOnly") + dt_details = dt.detail().collect()[0].asDict() + self.assertTrue(dt_details["minReaderVersion"] == 1, "The upgrade should be a no-op") + self.assertTrue(dt_details["minWriterVersion"] == 2, "The upgrade should be a no-op") + self.assertEqual(sorted(dt_details["tableFeatures"]), ["appendOnly", "invariants"]) + + dt.addFeatureSupport("deletionVectors") + dt_details = dt.detail().collect()[0].asDict() + self.assertTrue(dt_details["minReaderVersion"] == 3, "DV requires reader version 3") + self.assertTrue(dt_details["minWriterVersion"] == 7, "DV requires writer version 7") + self.assertEqual(sorted(dt_details["tableFeatures"]), + ["appendOnly", "deletionVectors", "invariants"]) + def test_restore_to_version(self) -> None: self.__writeDeltaTable([('a', 1), ('b', 2)]) self.__overwriteDeltaTable([('a', 3), ('b', 2)],