Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Added many new table feature reference tables #45

Merged
merged 2 commits into from
Jan 31, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 152 additions & 16 deletions dat/generated_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
from datetime import date, datetime, timedelta
from decimal import Decimal
from pathlib import Path
from typing import Callable, List, Tuple
from typing import Callable, Tuple, List
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A linter make this change?


import pyspark.sql
import pyspark.sql.types as types
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, now

from dat.models import TableVersionMetadata, TestCaseInfo
from dat.spark_builder import get_spark_session
Expand All @@ -30,7 +31,7 @@ def get_version_metadata(case: TestCaseInfo) -> TableVersionMetadata:


def save_expected(case: TestCaseInfo, as_latest=False) -> None:
'''Save the specified version of a Delta Table as a Parquet file.'''
"""Save the specified version of a Delta Table as a Parquet file."""
spark = get_spark_session()
df = spark.read.format('delta').load(case.delta_root)

Expand Down Expand Up @@ -270,7 +271,7 @@ def create_nested_types(case: TestCaseInfo, spark: SparkSession):
[types.StructField(
'float64', types.DoubleType()),
types.StructField(
'bool', types.BooleanType()), ])),
'bool', types.BooleanType()), ])),
types.StructField(
'array', types.ArrayType(
types.ShortType())),
Expand Down Expand Up @@ -311,6 +312,7 @@ def get_sample_data(
description='Table with a checkpoint',
)
def create_with_checkpoint(case: TestCaseInfo, spark: SparkSession):
spark.conf.set('spark.databricks.delta.legacy.allowAmbiguousPathsInCreateTable', 'true')
df = get_sample_data(spark)

(DeltaTable.create(spark)
Expand Down Expand Up @@ -371,12 +373,12 @@ def create_no_replay(case: TestCaseInfo, spark: SparkSession):
def create_stats_as_struct(case: TestCaseInfo, spark: SparkSession):
df = get_sample_data(spark)
(DeltaTable.create(spark)
.location(str(Path(case.delta_root).absolute()))
.addColumns(df.schema)
.property('delta.checkpointInterval', '2')
.property('delta.checkpoint.writeStatsAsStruct', 'true')
.property('delta.checkpoint.writeStatsAsJson', 'false')
.execute())
.location(str(Path(case.delta_root).absolute()))
.addColumns(df.schema)
.property('delta.checkpointInterval', '2')
.property('delta.checkpoint.writeStatsAsStruct', 'true')
.property('delta.checkpoint.writeStatsAsJson', 'false')
.execute())

for i in range(3):
df = get_sample_data(spark, seed=i, nrows=5)
Expand All @@ -391,15 +393,149 @@ def create_stats_as_struct(case: TestCaseInfo, spark: SparkSession):
def create_no_stats(case: TestCaseInfo, spark: SparkSession):
df = get_sample_data(spark)
(DeltaTable.create(spark)
.location(str(Path(case.delta_root).absolute()))
.addColumns(df.schema)
.property('delta.checkpointInterval', '2')
.property('delta.checkpoint.writeStatsAsStruct', 'false')
.property('delta.checkpoint.writeStatsAsJson', 'false')
.property('delta.dataSkippingNumIndexedCols', '0')
.execute())
.location(str(Path(case.delta_root).absolute()))
.addColumns(df.schema)
.property('delta.checkpointInterval', '2')
.property('delta.checkpoint.writeStatsAsStruct', 'false')
.property('delta.checkpoint.writeStatsAsJson', 'false')
.property('delta.dataSkippingNumIndexedCols', '0')
.execute())

for i in range(3):
df = get_sample_data(spark, seed=i, nrows=5)
df.repartition(1).write.format('delta').mode(
'overwrite').save(case.delta_root)


@reference_table(
name='deletion_vectors',
description='Table with deletion vectors',
)
def create_deletion_vectors(case: TestCaseInfo, spark: SparkSession):
df = get_sample_data(spark)

delta_path = str(Path(case.delta_root).absolute())
delta_table: DeltaTable = (DeltaTable.create(spark)
.location(delta_path)
.addColumns(df.schema)
.property('delta.enableDeletionVectors', 'true')
.execute())

df.repartition(1).write.format('delta').mode('append').save(case.delta_root)

delta_table.delete(col("letter") == "a")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe append some more rows here so the tests are a little more reliable?



@reference_table(
name='check_constraints',
description='Table with a check constraint'
)
def check_constraint_table(case: TestCaseInfo, spark: SparkSession):
df = get_sample_data(spark)
delta_path = str(Path(case.delta_root).absolute())
(DeltaTable.create(spark)
.location(delta_path)
.addColumns(df.schema)
.property('delta.enableDeletionVectors', 'true')
.execute())

df.repartition(1).write.format('delta').mode('append').save(case.delta_root)
spark.sql(f"ALTER TABLE delta.`{delta_path}` ADD CONSTRAINT const1 CHECK (int > 0);")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh my, this syntax.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is the only way to add a constraint :-) delta-rs though has it part of the builder methods :-D



@reference_table(
name='cdf',
description='Table with cdf turned on',
)
def create_change_data_feed(case: TestCaseInfo, spark: SparkSession):
df = get_sample_data(spark)
delta_path = str(Path(case.delta_root).absolute())
delta_table: DeltaTable = (DeltaTable.create(spark)
.location(delta_path)
.addColumns(df.schema)
.property('delta.enableChangeDataFeed', 'true')
.execute())

df.repartition(1).write.format('delta').mode('append').save(case.delta_root)

delta_table.update(
condition=col("letter") == "c",
set={"letter": lit("a")}
)
delta_table.delete(col("letter") == "a")


@reference_table(
name='generated_columns',
description='Table with a generated column',
)
def create_generated_columns(case: TestCaseInfo, spark: SparkSession):
df = get_sample_data(spark)
delta_path = str(Path(case.delta_root).absolute())
(DeltaTable.create(spark)
.location(delta_path)
.addColumns(df.schema)
.addColumn("creation", types.DateType(), generatedAlwaysAs="CAST(now() AS DATE)")
.execute())

df.repartition(1).write.format('delta').mode('append').save(case.delta_root)


@reference_table(
name='column_mapping',
description='Table with column mapping turned on',
)
def create_column_mapping(case: TestCaseInfo, spark: SparkSession):
df = get_sample_data(spark)
delta_path = str(Path(case.delta_root).absolute())
(DeltaTable.create(spark)
.location(delta_path)
.addColumns(df.schema)
.property('delta.columnMapping.mode', 'name')
.execute())

df.repartition(1).write.format('delta').mode('append').save(case.delta_root)
spark.sql(f"ALTER TABLE delta.`{delta_path}` RENAME COLUMN int TO new_int;")
(df.withColumnRenamed('int', 'new_int')
.repartition(1)
.write
.format('delta')
.mode('append')
.save(case.delta_root))


@reference_table(
name='timestamp_ntz',
description='Table with not timezone timestamps in it',
)
def create_timestamp_ntz(case: TestCaseInfo, spark: SparkSession):
df = get_sample_data(spark)
delta_path = str(Path(case.delta_root).absolute())
delta_table: DeltaTable = (DeltaTable.create(spark)
.location(delta_path)
.addColumns(df.schema)
.addColumn("timestampNTZ", types.TimestampNTZType())
.execute())
delta_table.upgradeTableProtocol(3, 7)
(df.withColumn("timestampNTZ", now().cast(types.TimestampNTZType()))
.repartition(1)
.write
.format('delta')
.mode('append')
.save(case.delta_root))


@reference_table(
name='iceberg_compat_v1',
description='Table with Iceberg compatability v1 turned on',
)
def create_iceberg_compat_v1(case: TestCaseInfo, spark: SparkSession):
df = get_sample_data(spark)
delta_path = str(Path(case.delta_root).absolute())
delta_table: DeltaTable = (DeltaTable.create(spark)
.location(delta_path)
.addColumns(df.schema)
.property('delta.enableIcebergCompatV1', 'true')
.execute())
delta_table.upgradeTableProtocol(3, 7)
df.repartition(1).write.format('delta').mode('append').save(case.delta_root)
Loading