From c0f13bf868b11f61abb9579c3503c4e4f0d326b1 Mon Sep 17 00:00:00 2001
From: Stephen Carman
Date: Tue, 30 Jan 2024 16:46:06 -0500
Subject: [PATCH] Added many new table feature reference tables

---
 dat/generated_tables.py | 168 ++++++++++++++++++++++++++++++++++++----
 1 file changed, 152 insertions(+), 16 deletions(-)

diff --git a/dat/generated_tables.py b/dat/generated_tables.py
index d6e9239..4671cc4 100644
--- a/dat/generated_tables.py
+++ b/dat/generated_tables.py
@@ -3,12 +3,13 @@
 from datetime import date, datetime, timedelta
 from decimal import Decimal
 from pathlib import Path
-from typing import Callable, List, Tuple
+from typing import Callable, Tuple, List
 
 import pyspark.sql
 import pyspark.sql.types as types
 from delta.tables import DeltaTable
 from pyspark.sql import SparkSession
+from pyspark.sql.functions import col, lit, now
 
 from dat.models import TableVersionMetadata, TestCaseInfo
 from dat.spark_builder import get_spark_session
@@ -30,7 +31,7 @@ def get_version_metadata(case: TestCaseInfo) -> TableVersionMetadata:
 
 
 def save_expected(case: TestCaseInfo, as_latest=False) -> None:
-    '''Save the specified version of a Delta Table as a Parquet file.'''
+    """Save the specified version of a Delta Table as a Parquet file."""
     spark = get_spark_session()
 
     df = spark.read.format('delta').load(case.delta_root)
@@ -270,7 +271,7 @@ def create_nested_types(case: TestCaseInfo, spark: SparkSession):
             [types.StructField(
                 'float64', types.DoubleType()),
                 types.StructField(
-                'bool', types.BooleanType()), ])),
+                    'bool', types.BooleanType()), ])),
         types.StructField(
             'array', types.ArrayType(
                 types.ShortType())),
@@ -311,6 +312,7 @@ def get_sample_data(
     description='Table with a checkpoint',
 )
 def create_with_checkpoint(case: TestCaseInfo, spark: SparkSession):
+    spark.conf.set('spark.databricks.delta.legacy.allowAmbiguousPathsInCreateTable', 'true')
     df = get_sample_data(spark)
 
     (DeltaTable.create(spark)
@@ -371,12 +373,12 @@ def create_no_replay(case: TestCaseInfo, spark: SparkSession):
 def create_stats_as_struct(case: TestCaseInfo, spark: SparkSession):
     df = get_sample_data(spark)
     (DeltaTable.create(spark)
-        .location(str(Path(case.delta_root).absolute()))
-        .addColumns(df.schema)
-        .property('delta.checkpointInterval', '2')
-        .property('delta.checkpoint.writeStatsAsStruct', 'true')
-        .property('delta.checkpoint.writeStatsAsJson', 'false')
-        .execute())
+     .location(str(Path(case.delta_root).absolute()))
+     .addColumns(df.schema)
+     .property('delta.checkpointInterval', '2')
+     .property('delta.checkpoint.writeStatsAsStruct', 'true')
+     .property('delta.checkpoint.writeStatsAsJson', 'false')
+     .execute())
 
     for i in range(3):
         df = get_sample_data(spark, seed=i, nrows=5)
@@ -391,15 +393,149 @@ def create_no_stats(case: TestCaseInfo, spark: SparkSession):
     df = get_sample_data(spark)
 
     (DeltaTable.create(spark)
-        .location(str(Path(case.delta_root).absolute()))
-        .addColumns(df.schema)
-        .property('delta.checkpointInterval', '2')
-        .property('delta.checkpoint.writeStatsAsStruct', 'false')
-        .property('delta.checkpoint.writeStatsAsJson', 'false')
-        .property('delta.dataSkippingNumIndexedCols', '0')
-        .execute())
+     .location(str(Path(case.delta_root).absolute()))
+     .addColumns(df.schema)
+     .property('delta.checkpointInterval', '2')
+     .property('delta.checkpoint.writeStatsAsStruct', 'false')
+     .property('delta.checkpoint.writeStatsAsJson', 'false')
+     .property('delta.dataSkippingNumIndexedCols', '0')
+     .execute())
 
     for i in range(3):
         df = get_sample_data(spark, seed=i, nrows=5)
         df.repartition(1).write.format('delta').mode(
             'overwrite').save(case.delta_root)
+
+
+@reference_table(
+    name='deletion_vectors',
+    description='Table with deletion vectors',
+)
+def create_deletion_vectors(case: TestCaseInfo, spark: SparkSession):
+    df = get_sample_data(spark)
+
+    delta_path = str(Path(case.delta_root).absolute())
+    delta_table: DeltaTable = (DeltaTable.create(spark)
+                               .location(delta_path)
+                               .addColumns(df.schema)
+                               .property('delta.enableDeletionVectors', 'true')
+                               .execute())
+
+    df.repartition(1).write.format('delta').mode('append').save(case.delta_root)
+
+    delta_table.delete(col("letter") == "a")
+
+
+@reference_table(
+    name='check_constraints',
+    description='Table with a check constraint'
+)
+def check_constraint_table(case: TestCaseInfo, spark: SparkSession):
+    df = get_sample_data(spark)
+    delta_path = str(Path(case.delta_root).absolute())
+    (DeltaTable.create(spark)
+     .location(delta_path)
+     .addColumns(df.schema)
+     .property('delta.enableDeletionVectors', 'true')
+     .execute())
+
+    df.repartition(1).write.format('delta').mode('append').save(case.delta_root)
+    spark.sql(f"ALTER TABLE delta.`{delta_path}` ADD CONSTRAINT const1 CHECK (int > 0);")
+
+
+@reference_table(
+    name='cdf',
+    description='Table with change data feed (CDF) turned on',
+)
+def create_change_data_feed(case: TestCaseInfo, spark: SparkSession):
+    df = get_sample_data(spark)
+    delta_path = str(Path(case.delta_root).absolute())
+    delta_table: DeltaTable = (DeltaTable.create(spark)
+                               .location(delta_path)
+                               .addColumns(df.schema)
+                               .property('delta.enableChangeDataFeed', 'true')
+                               .execute())
+
+    df.repartition(1).write.format('delta').mode('append').save(case.delta_root)
+
+    delta_table.update(
+        condition=col("letter") == "c",
+        set={"letter": lit("a")}
+    )
+    delta_table.delete(col("letter") == "a")
+
+
+@reference_table(
+    name='generated_columns',
+    description='Table with a generated column',
+)
+def create_generated_columns(case: TestCaseInfo, spark: SparkSession):
+    df = get_sample_data(spark)
+    delta_path = str(Path(case.delta_root).absolute())
+    (DeltaTable.create(spark)
+     .location(delta_path)
+     .addColumns(df.schema)
+     .addColumn("creation", types.DateType(), generatedAlwaysAs="CAST(now() AS DATE)")
+     .execute())
+
+    df.repartition(1).write.format('delta').mode('append').save(case.delta_root)
+
+
+@reference_table(
+    name='column_mapping',
+    description='Table with column mapping turned on',
+)
+def create_column_mapping(case: TestCaseInfo, spark: SparkSession):
+    df = get_sample_data(spark)
+    delta_path = str(Path(case.delta_root).absolute())
+    (DeltaTable.create(spark)
+     .location(delta_path)
+     .addColumns(df.schema)
+     .property('delta.columnMapping.mode', 'name')
+     .execute())
+
+    df.repartition(1).write.format('delta').mode('append').save(case.delta_root)
+    spark.sql(f"ALTER TABLE delta.`{delta_path}` RENAME COLUMN int TO new_int;")
+    (df.withColumnRenamed('int', 'new_int')
+     .repartition(1)
+     .write
+     .format('delta')
+     .mode('append')
+     .save(case.delta_root))
+
+
+@reference_table(
+    name='timestamp_ntz',
+    description='Table with timestamps without a timezone (TIMESTAMP_NTZ)',
+)
+def create_timestamp_ntz(case: TestCaseInfo, spark: SparkSession):
+    df = get_sample_data(spark)
+    delta_path = str(Path(case.delta_root).absolute())
+    delta_table: DeltaTable = (DeltaTable.create(spark)
+                               .location(delta_path)
+                               .addColumns(df.schema)
+                               .addColumn("timestampNTZ", types.TimestampNTZType())
+                               .execute())
+    delta_table.upgradeTableProtocol(3, 7)
+    (df.withColumn("timestampNTZ", now().cast(types.TimestampNTZType()))
+     .repartition(1)
+     .write
+     .format('delta')
+     .mode('append')
+     .save(case.delta_root))
+
+
+@reference_table(
+    name='iceberg_compat_v1',
+    description='Table with Iceberg compatibility v1 turned on',
+)
+def create_iceberg_compat_v1(case: TestCaseInfo, spark: SparkSession):
+    df = get_sample_data(spark)
+    delta_path = str(Path(case.delta_root).absolute())
+    delta_table: DeltaTable = (DeltaTable.create(spark)
+                               .location(delta_path)
+                               .addColumns(df.schema)
+                               .property('delta.enableIcebergCompatV1', 'true')
+                               .execute())
+    delta_table.upgradeTableProtocol(3, 7)
+    df.repartition(1).write.format('delta').mode('append').save(case.delta_root)