diff --git a/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/extensions/SetSnapshotsRetentionPolicyTest.java b/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/extensions/SetSnapshotsRetentionPolicyTest.java
new file mode 100644
index 00000000..643a9f98
--- /dev/null
+++ b/integrations/spark/openhouse-spark-itest/src/test/java/com/linkedin/openhouse/spark/e2e/extensions/SetSnapshotsRetentionPolicyTest.java
@@ -0,0 +1,66 @@
+package com.linkedin.openhouse.spark.e2e.extensions;
+
+import static com.linkedin.openhouse.spark.MockHelpers.*;
+import static com.linkedin.openhouse.spark.SparkTestBase.*;
+
+import com.linkedin.openhouse.spark.SparkTestBase;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+@ExtendWith(SparkTestBase.class)
+public class SetSnapshotsRetentionPolicyTest {
+  @Test
+  public void testSetSnapshotsRetentionPolicy() {
+    Object existingTable =
+        mockGetTableResponseBody(
+            "dbSetSnapshotsRetention",
+            "t1",
+            "c1",
+            "dbSetSnapshotsRetention.t1",
+            "u1",
+            mockTableLocation(
+                TableIdentifier.of("dbSetSnapshotsRetention", "t1"),
+                convertSchemaToDDLComponent(baseSchema),
+                ""),
+            "V1",
+            baseSchema,
+            null,
+            null);
+    Object existingOpenhouseTable =
+        mockGetTableResponseBody(
+            "dbSetSnapshotsRetention",
+            "t1",
+            "c1",
+            "dbSetSnapshotsRetention.t1",
+            "u1",
+            mockTableLocationAfterOperation(
+                TableIdentifier.of("dbSetSnapshotsRetention", "t1"),
+                "ALTER TABLE %t SET TBLPROPERTIES ('openhouse.tableId'='t1')"),
+            "V1",
+            baseSchema,
+            null,
+            null);
+    Object tableAfterSetSnapshotsRetention =
+        mockGetTableResponseBody(
+            "dbSetSnapshotsRetention",
+            "t1",
+            "c1",
+            "dbSetSnapshotsRetention.t1",
+            "u1",
+            mockTableLocationAfterOperation(
+                TableIdentifier.of("dbSetSnapshotsRetention", "t1"),
+                "ALTER TABLE %t SET TBLPROPERTIES('policies'='{\"snapshotsRetention\":{\"timeCount\":24,\"granularity\":\"HOUR\",\"count\":10}}')"),
+            "V2",
+            baseSchema,
+            null,
+            null);
+    mockTableService.enqueue(mockResponse(200, existingOpenhouseTable)); // doRefresh()
+    mockTableService.enqueue(mockResponse(200, existingOpenhouseTable)); // doRefresh()
+    mockTableService.enqueue(mockResponse(201, tableAfterSetSnapshotsRetention)); // doCommit()
+    String ddlWithSchema =
+        "ALTER TABLE openhouse.dbSetSnapshotsRetention.t1 SET POLICY (SNAPSHOTS_RETENTION TTL=24H COUNT=10)";
+    Assertions.assertDoesNotThrow(() -> spark.sql(ddlWithSchema));
+  }
+}
diff --git a/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4 b/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4
index bf762af6..3abacfc0 100644
--- a/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4
+++ b/integrations/spark/openhouse-spark-runtime/src/main/antlr/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensions.g4
@@ -25,6 +25,7 @@ singleStatement
 statement
     : ALTER TABLE multipartIdentifier SET POLICY '(' retentionPolicy (columnRetentionPolicy)? ')' #setRetentionPolicy
     | ALTER TABLE multipartIdentifier SET POLICY '(' sharingPolicy ')' #setSharingPolicy
+    | ALTER TABLE multipartIdentifier SET POLICY '(' snapshotsRetentionPolicy ')' #setSnapshotsRetentionPolicy
     | ALTER TABLE multipartIdentifier MODIFY columnNameClause SET columnPolicy #setColumnPolicyTag
     | GRANT privilege ON grantableResource TO principal #grantStatement
     | REVOKE privilege ON grantableResource FROM principal #revokeStatement
@@ -64,7 +65,7 @@ quotedIdentifier
     ;

 nonReserved
-    : ALTER | TABLE | SET | POLICY | RETENTION | SHARING
+    : ALTER | TABLE | SET | POLICY | RETENTION | SHARING | SNAPSHOTS_RETENTION | TTL | COUNT
     | GRANT | REVOKE | ON | TO | SHOW | GRANTS | PATTERN | WHERE | COLUMN
     ;

@@ -131,6 +132,27 @@ policyTag
     : PII | HC
     ;

+snapshotsRetentionPolicy
+    : SNAPSHOTS_RETENTION snapshotsCombinedRetention
+    ;
+
+snapshotsCombinedRetention
+    : snapshotsTTL (snapshotsCount)?
+    ;
+
+snapshotsTTL
+    : TTL '=' snapshotsTTLValue
+    ;
+
+snapshotsCount
+    : COUNT '=' POSITIVE_INTEGER
+    ;
+
+snapshotsTTLValue
+    : RETENTION_DAY
+    | RETENTION_HOUR
+    ;
+
 ALTER: 'ALTER';
 TABLE: 'TABLE';
 SET: 'SET';
@@ -157,6 +179,9 @@ HC: 'HC';
 MODIFY: 'MODIFY';
 TAG: 'TAG';
 NONE: 'NONE';
+TTL: 'TTL';
+COUNT: 'COUNT';
+SNAPSHOTS_RETENTION : 'SNAPSHOTS_RETENTION';

 POSITIVE_INTEGER
     : DIGIT+
diff --git a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala
index 4b8fc405..582c458e 100644
--- a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala
+++ b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/parser/extensions/OpenhouseSqlExtensionsAstBuilder.scala
@@ -2,7 +2,7 @@ package com.linkedin.openhouse.spark.sql.catalyst.parser.extensions

 import com.linkedin.openhouse.spark.sql.catalyst.enums.GrantableResourceTypes
 import com.linkedin.openhouse.spark.sql.catalyst.parser.extensions.OpenhouseSqlExtensionsParser._
-import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetRetentionPolicy, SetSharingPolicy, SetColumnPolicyTag, ShowGrantsStatement}
+import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetRetentionPolicy, SetSharingPolicy, SetSnapshotsRetentionPolicy, ShowGrantsStatement}
 import com.linkedin.openhouse.spark.sql.catalyst.enums.GrantableResourceTypes.GrantableResourceType
 import com.linkedin.openhouse.gen.tables.client.model.TimePartitionSpec
 import org.antlr.v4.runtime.tree.ParseTree
@@ -18,10 +18,11 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh

   override def visitSetRetentionPolicy(ctx: SetRetentionPolicyContext): SetRetentionPolicy = {
     val tableName = typedVisit[Seq[String]](ctx.multipartIdentifier)
-    val (granularity, count) = typedVisit[(String, Int)](ctx.retentionPolicy())
+    val retentionPolicy = ctx.retentionPolicy()
+    val (granularity, count) = typedVisit[(String, Int)](retentionPolicy)
     val (colName, colPattern) =
       if (ctx.columnRetentionPolicy() != null)
-          typedVisit[(String, String)](ctx.columnRetentionPolicy())
+        typedVisit[(String, String)](ctx.columnRetentionPolicy())
       else (null, null)
     SetRetentionPolicy(tableName, granularity, count, Option(colName), Option(colPattern))
   }
@@ -94,8 +95,8 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
     }
   }

-  override def visitColumnRetentionPolicyPatternClause(ctx: ColumnRetentionPolicyPatternClauseContext): (String) = {
-    (ctx.retentionColumnPatternClause().STRING().getText)
+  override def visitColumnRetentionPolicyPatternClause(ctx: ColumnRetentionPolicyPatternClauseContext): String = {
+    ctx.retentionColumnPatternClause().STRING().getText
   }

   override def visitSharingPolicy(ctx: SharingPolicyContext): String = {
@@ -115,7 +116,7 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
   }

   override def visitDuration(ctx: DurationContext): (String, Int) = {
-    val granularity = if (ctx.RETENTION_DAY != null) {
+    val granularity: String = if (ctx.RETENTION_DAY != null) {
       TimePartitionSpec.GranularityEnum.DAY.getValue()
     } else if (ctx.RETENTION_YEAR() != null) {
       TimePartitionSpec.GranularityEnum.YEAR.getValue()
@@ -124,13 +125,50 @@ class OpenhouseSqlExtensionsAstBuilder (delegate: ParserInterface) extends Openh
     } else {
       TimePartitionSpec.GranularityEnum.HOUR.getValue()
     }
-
     val count = ctx.getText.substring(0, ctx.getText.length - 1).toInt
     (granularity, count)
   }

+  /** Builds the logical plan for `ALTER TABLE ... SET POLICY (SNAPSHOTS_RETENTION ...)`. */
+  override def visitSetSnapshotsRetentionPolicy(ctx: SetSnapshotsRetentionPolicyContext): SetSnapshotsRetentionPolicy = {
+    val tableName = typedVisit[Seq[String]](ctx.multipartIdentifier)
+    val (granularity, timeCount, count) = typedVisit[(String, Int, Int)](ctx.snapshotsRetentionPolicy())
+    SetSnapshotsRetentionPolicy(tableName, granularity, timeCount, count)
+  }
+
+  override def visitSnapshotsRetentionPolicy(ctx: SnapshotsRetentionPolicyContext): (String, Int, Int) = {
+    typedVisit[(String, Int, Int)](ctx.snapshotsCombinedRetention())
+  }
+
+  /** Returns (granularity, timeCount, count); count is 0 when no COUNT clause is present. */
+  override def visitSnapshotsCombinedRetention(ctx: SnapshotsCombinedRetentionContext): (String, Int, Int) = {
+    val snapshotsTTL = ctx.snapshotsTTL()
+    val (granularity, timeCount) = typedVisit[(String, Int)](snapshotsTTL)
+    val count =
+      if (ctx.snapshotsCount() != null) {
+        typedVisit[Int](ctx.snapshotsCount())
+      } else 0
+    (granularity, timeCount, count)
+  }
+
+  /** Splits the TTL literal (e.g. `24H`) into its granularity and leading numeric value. */
+  override def visitSnapshotsTTL(ctx: SnapshotsTTLContext): (String, Int) = {
+    val ttl = ctx.snapshotsTTLValue()
+    val granularity: String = if (ttl.RETENTION_DAY() != null) {
+      TimePartitionSpec.GranularityEnum.DAY.getValue()
+    } else {
+      TimePartitionSpec.GranularityEnum.HOUR.getValue()
+    }
+    val count = ttl.getText.substring(0, ttl.getText.length - 1).toInt
+    (granularity, count)
+  }
+
+  override def visitSnapshotsCount(ctx: SnapshotsCountContext): Integer = {
+    ctx.POSITIVE_INTEGER().getText.toInt
+  }
+
   private def toBuffer[T](list: java.util.List[T]) = list.asScala
-  private def toSeq[T](list: java.util.List[T]): Seq[T] = toBuffer(list).toSeq
+  private def toSeq[T](list: java.util.List[T]) = toBuffer(list).toSeq

   private def typedVisit[T](ctx: ParseTree): T = {
     ctx.accept(this).asInstanceOf[T]
diff --git a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetSnapshotsRetentionPolicy.scala b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetSnapshotsRetentionPolicy.scala
new file mode 100644
index 00000000..46e16d92
--- /dev/null
+++ b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/catalyst/plans/logical/SetSnapshotsRetentionPolicy.scala
@@ -0,0 +1,17 @@
+package com.linkedin.openhouse.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.plans.logical.Command
+
+/**
+ * Logical plan node for `ALTER TABLE ... SET POLICY (SNAPSHOTS_RETENTION TTL=... COUNT=...)`.
+ *
+ * @param tableName   multipart identifier of the target table
+ * @param granularity time unit of the TTL
+ * @param timeCount   numeric part of the TTL
+ * @param count       value of the COUNT clause; 0 when the clause was omitted
+ */
+case class SetSnapshotsRetentionPolicy (tableName: Seq[String], granularity: String, timeCount: Int, count: Int) extends Command {
+  override def simpleString(maxFields: Int): String = {
+    s"SetSnapshotsRetentionPolicy: ${tableName} ${timeCount} ${granularity} ${count}"
+  }
+}
diff --git a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala
index 595c6e06..658e5138 100644
--- a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala
+++ b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/OpenhouseDataSourceV2Strategy.scala
@@ -1,6 +1,6 @@
 package com.linkedin.openhouse.spark.sql.execution.datasources.v2

-import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetRetentionPolicy, SetSharingPolicy, SetColumnPolicyTag, ShowGrantsStatement}
+import com.linkedin.openhouse.spark.sql.catalyst.plans.logical.{GrantRevokeStatement, SetColumnPolicyTag, SetRetentionPolicy, SetSharingPolicy, SetSnapshotsRetentionPolicy, ShowGrantsStatement}
 import org.apache.iceberg.spark.{Spark3Util, SparkCatalog, SparkSessionCatalog}
 import org.apache.spark.sql.{SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.expressions.PredicateHelper
@@ -15,6 +15,8 @@ case class OpenhouseDataSourceV2Strategy(spark: SparkSession) extends Strategy w
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
     case SetRetentionPolicy(CatalogAndIdentifierExtractor(catalog, ident), granularity, count, colName, colPattern) =>
       SetRetentionPolicyExec(catalog, ident, granularity, count, colName, colPattern) :: Nil
+    case SetSnapshotsRetentionPolicy(CatalogAndIdentifierExtractor(catalog, ident), granularity, timeCount, count) =>
+      SetSnapshotsRetentionPolicyExec(catalog, ident, granularity, timeCount, count) :: Nil
     case SetSharingPolicy(CatalogAndIdentifierExtractor(catalog, ident), sharing) =>
       SetSharingPolicyExec(catalog, ident, sharing) :: Nil
     case SetColumnPolicyTag(CatalogAndIdentifierExtractor(catalog, ident), policyTag, cols) =>
diff --git a/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetSnapshotsRetentionPolicyExec.scala b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetSnapshotsRetentionPolicyExec.scala
new file mode 100644
index 00000000..183d8621
--- /dev/null
+++ b/integrations/spark/openhouse-spark-runtime/src/main/scala/com/linkedin/openhouse/spark/sql/execution/datasources/v2/SetSnapshotsRetentionPolicyExec.scala
@@ -0,0 +1,53 @@
+package com.linkedin.openhouse.spark.sql.execution.datasources.v2
+
+import org.apache.iceberg.spark.source.SparkTable
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
+import org.apache.spark.sql.execution.datasources.v2.V2CommandExec
+
+/**
+ * Physical command that stores a snapshots-retention policy on an OpenHouse table.
+ *
+ * The policy is serialized to JSON and written to the `updated.openhouse.policy` table property.
+ *
+ * @param catalog     catalog used to load the target table
+ * @param ident       identifier of the target table
+ * @param granularity time unit of the TTL
+ * @param timeCount   numeric part of the TTL
+ * @param count       value of the COUNT clause; 0 means the clause was omitted
+ */
+case class SetSnapshotsRetentionPolicyExec(
+  catalog: TableCatalog,
+  ident: Identifier,
+  granularity: String,
+  timeCount: Int,
+  count: Int
+) extends V2CommandExec {
+
+  override lazy val output: Seq[Attribute] = Nil
+
+  override protected def run(): Seq[InternalRow] = {
+    catalog.loadTable(ident) match {
+      case iceberg: SparkTable if iceberg.table().properties().containsKey("openhouse.tableId") =>
+        val key = "updated.openhouse.policy"
+        // Emit "count" only when a COUNT clause was given; both shapes share one compact layout.
+        val countField = if (count == 0) "" else s""","count":${count}"""
+        val value =
+          s"""{"snapshotsRetention":{"timeCount":${timeCount},"granularity":"${granularity}"${countField}}}"""
+
+        iceberg.table().updateProperties()
+          .set(key, value)
+          .commit()
+
+      case table =>
+        throw new UnsupportedOperationException(s"Cannot set snapshots retention policy for non-Openhouse table: $table")
+    }
+
+    Nil
+  }
+
+  override def simpleString(maxFields: Int): String = {
+    s"SetSnapshotsRetentionPolicyExec: ${catalog} ${ident} ${timeCount} ${granularity} ${count}"
+  }
+}