Skip to content

Commit

Permalink
[Spark] Add DeltaTable.addFeatureSupport API to PySpark (#3786)
Browse files Browse the repository at this point in the history
#### Which Delta project/connector is this regarding?

- [X] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This PR introduces a `DeltaTable.addFeatureSupport` API, which was
missing in PySpark. This API is used to add support for a table feature
to a given Delta table.

## How was this patch tested?

A new test is added.

## Does this PR introduce _any_ user-facing changes?

Yes. See the above `Description` section.
  • Loading branch information
xupefei authored Nov 7, 2024
1 parent cb352c2 commit 520c8e8
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 0 deletions.
13 changes: 13 additions & 0 deletions python/delta/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,19 @@ def upgradeTableProtocol(self, readerVersion: int, writerVersion: int) -> None:
type(writerVersion))
jdt.upgradeTableProtocol(readerVersion, writerVersion)

@since(3.3)  # type: ignore[arg-type]
def addFeatureSupport(self, featureName: str) -> None:
    """
    Add support for the given table feature to this table's protocol.

    If the table does not yet support table features, its protocol is upgraded automatically:
    a writer-only feature raises the table's writer version to `7`, while a reader-writer
    feature raises both the reader and writer versions, to `(3, 7)`.
    See online documentation and Delta's protocol specification at PROTOCOL.md for more details.

    :param featureName: name of the table feature to add support for; must be a string
    """
    # Validate the argument type on the Python side before delegating to the underlying table.
    DeltaTable._verify_type_str(featureName, "featureName")
    jdt = self._jdt
    jdt.addFeatureSupport(featureName)

@since(1.2) # type: ignore[arg-type]
def restoreToVersion(self, version: int) -> DataFrame:
"""
Expand Down
37 changes: 37 additions & 0 deletions python/delta/tests/test_deltatable.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from multiprocessing.pool import ThreadPool
from typing import List, Set, Dict, Optional, Any, Callable, Union, Tuple

from py4j.protocol import Py4JJavaError
from pyspark.errors.exceptions.base import UnsupportedOperationException
from pyspark.sql import DataFrame, Row
from pyspark.sql.functions import col, lit, expr, floor
Expand Down Expand Up @@ -1187,6 +1188,42 @@ def test_protocolUpgrade(self) -> None:
with self.assertRaisesRegex(ValueError, "writerVersion"):
dt.upgradeTableProtocol(1, {}) # type: ignore[arg-type]

def test_addFeatureSupport(self) -> None:
    """
    Exercise `DeltaTable.addFeatureSupport` against a table created with protocol (1, 2).

    Covers rejection of an unknown feature name, rejection of non-string arguments, a feature
    that leaves the protocol versions unchanged, and a feature that upgrades them to (3, 7).
    """
    try:
        # Pin the protocol defaults only while the table is created, so that later
        # assertions start from a known (reader=1, writer=2) protocol.
        self.spark.conf.set('spark.databricks.delta.minReaderVersion', '1')
        self.spark.conf.set('spark.databricks.delta.minWriterVersion', '2')
        self.__writeDeltaTable([('a', 1), ('b', 2), ('c', 3), ('d', 4)])
        dt = DeltaTable.forPath(self.spark, self.tempFile)
    finally:
        self.spark.conf.unset('spark.databricks.delta.minReaderVersion')
        self.spark.conf.unset('spark.databricks.delta.minWriterVersion')

    # bad args
    with self.assertRaisesRegex(Py4JJavaError, "DELTA_UNSUPPORTED_FEATURES_IN_CONFIG"):
        dt.addFeatureSupport("abc")
    for bad_name in (12345, [12345], {}, []):
        with self.assertRaisesRegex(ValueError, "featureName needs to be a string"):
            dt.addFeatureSupport(bad_name)  # type: ignore[arg-type]

    # good args
    dt.addFeatureSupport("appendOnly")
    dt_details = dt.detail().collect()[0].asDict()
    # assertEqual (rather than assertTrue(a == b)) reports the actual value on failure.
    self.assertEqual(dt_details["minReaderVersion"], 1, "The upgrade should be a no-op")
    self.assertEqual(dt_details["minWriterVersion"], 2, "The upgrade should be a no-op")
    self.assertEqual(sorted(dt_details["tableFeatures"]), ["appendOnly", "invariants"])

    dt.addFeatureSupport("deletionVectors")
    dt_details = dt.detail().collect()[0].asDict()
    self.assertEqual(dt_details["minReaderVersion"], 3, "DV requires reader version 3")
    self.assertEqual(dt_details["minWriterVersion"], 7, "DV requires writer version 7")
    self.assertEqual(sorted(dt_details["tableFeatures"]),
                     ["appendOnly", "deletionVectors", "invariants"])

def test_restore_to_version(self) -> None:
self.__writeDeltaTable([('a', 1), ('b', 2)])
self.__overwriteDeltaTable([('a', 3), ('b', 2)],
Expand Down

0 comments on commit 520c8e8

Please sign in to comment.