Skip to content

Commit

Permalink
[Kernel] Add Snapshot::getPartitionColumnNames public API (#3916)
Browse files Browse the repository at this point in the history
#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [X] Kernel
- [ ] Other (fill in here)

## Description

Adds a new public API, `Snapshot::getPartitionColumnNames`, which returns the names of the table's partition columns at that snapshot.

## How was this patch tested?

Simple unit tests (see the new `SnapshotSuite`).

## Does this PR introduce _any_ user-facing changes?

Yes. This PR adds the new public `Snapshot::getPartitionColumnNames` API.
  • Loading branch information
scottsand-db authored Dec 4, 2024
1 parent 8ef5efc commit 6345a88
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 0 deletions.
11 changes: 11 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.types.StructType;
import java.util.List;

/**
* Represents the snapshot of a Delta table.
Expand All @@ -36,6 +37,16 @@ public interface Snapshot {
*/
long getVersion(Engine engine);

// NOTE(review): the ordering guarantee stated below ("in the order they are defined in the Delta
// table schema") cannot be confirmed from this declaration. The implementation added in this
// change returns the metadata's partition-column value converted via VectorUtils.toJavaList
// without consulting the schema -- confirm the two orders always agree, or reword the guarantee.
/**
* Get the names of the partition columns in the Delta table at this snapshot.
*
* <p>The partition column names are returned in the order they are defined in the Delta table
* schema. If the table does not define any partition columns, this method returns an empty list.
*
* @param engine {@link Engine} instance to use in Delta Kernel.
* @return a list of partition column names, or an empty list if the table is not partitioned.
*/
List<String> getPartitionColumnNames(Engine engine);
/**
* Get the schema of the table at this snapshot.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.snapshot.LogSegment;
import io.delta.kernel.internal.snapshot.TableCommitCoordinatorClientHandler;
import io.delta.kernel.internal.util.VectorUtils;
import io.delta.kernel.types.StructType;
import java.util.List;
import java.util.Map;
import java.util.Optional;

Expand Down Expand Up @@ -85,6 +87,10 @@ public Protocol getProtocol() {
return protocol;
}

/**
* Returns the partition column names held by this snapshot's metadata.
*
* <p>The value returned by {@code getMetadata().getPartitionColumns()} is handed directly to
* {@code VectorUtils.toJavaList}; this method itself applies no filtering or reordering.
*
* <p>NOTE(review): presumably this implements {@code Snapshot#getPartitionColumnNames}; the
* enclosing class declaration is not visible here, so confirm before adding {@code @Override}.
*
* @param engine not used by this implementation.
* @return the partition column names as a Java list.
*/
public List<String> getPartitionColumnNames(Engine engine) {
return VectorUtils.toJavaList(getMetadata().getPartitionColumns());
}

/**
* Get the domain metadata map from the log replay, which lazily loads and replays a history of
* domain metadata actions, resolving them to produce the current state of the domain metadata.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.defaults

import io.delta.kernel.{Operation, Table}
import io.delta.kernel.defaults.utils.TestUtils
import io.delta.kernel.types.{IntegerType, StructField, StructType}
import io.delta.kernel.utils.CloseableIterable
import org.scalatest.funsuite.AnyFunSuite

import scala.collection.JavaConverters._

/**
 * Checks that `getPartitionColumnNames` on the latest snapshot of a freshly created table
 * returns exactly the partition columns the table was created with.
 */
class SnapshotSuite extends AnyFunSuite with TestUtils {

  // Each entry is the list of partition columns used to create one test table.
  private val partitionColumnCases: Seq[Seq[String]] = Seq(
    Seq("part1"), // simple case
    Seq("part1", "part2", "part3"), // multiple partition columns
    Seq(), // non-partitioned
    Seq("PART1", "part2") // case-sensitive
  )

  for (partCols <- partitionColumnCases) {
    test(s"Snapshot getPartitionColumnNames - partCols=$partCols") {
      withTempDir { dir =>
        // Step 1: Create a table with the given partition columns

        // The schema is the partition columns followed by two regular columns, all nullable ints.
        val fields = (partCols ++ Seq("col1", "col2")).map { name =>
          new StructField(name, IntegerType.INTEGER, true /* nullable */)
        }
        val schema = new StructType(fields.asJava)

        val table = Table.forPath(defaultEngine, dir.getCanonicalPath)

        val baseBuilder = table
          .createTransactionBuilder(defaultEngine, "engineInfo", Operation.CREATE_TABLE)
          .withSchema(defaultEngine, schema)

        // Partition columns are only set on the builder when there are any to set.
        val txnBuilder =
          if (partCols.isEmpty) {
            baseBuilder
          } else {
            baseBuilder.withPartitionColumns(defaultEngine, partCols.asJava)
          }

        // Commit with no data actions; only the table creation itself is needed.
        txnBuilder.build(defaultEngine).commit(defaultEngine, CloseableIterable.emptyIterable())

        // Step 2: Check the partition columns
        val actualPartCols =
          table.getLatestSnapshot(defaultEngine).getPartitionColumnNames(defaultEngine)

        assert(partCols.asJava === actualPartCols)
      }
    }
  }

}

0 comments on commit 6345a88

Please sign in to comment.