diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala
index d367f05a65d..322ef885eca 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/cdc/CDCReader.scala
@@ -35,6 +35,8 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
@@ -620,8 +622,15 @@ trait CDCReaderImpl extends DeltaLogging {
     val readSchema = cdcReadSchema(readSchemaSnapshot.metadata.schema)

     // build an empty DS. This DS retains the table schema and the isStreaming property
-    val emptyDf = spark.sqlContext.internalCreateDataFrame(
-      spark.sparkContext.emptyRDD[InternalRow], readSchema, isStreaming)
+    // NOTE: We need to manually set the stats to 0 otherwise we will use default stats of INT_MAX,
+    // which causes lots of optimizations to be applied wrong.
+    val emptyRdd = LogicalRDD(
+      readSchema.toAttributes,
+      spark.sparkContext.emptyRDD[InternalRow],
+      isStreaming = isStreaming
+    )(spark.sqlContext.sparkSession, Some(Statistics(0, Some(0))))
+    val emptyDf =
+      Dataset.ofRows(spark.sqlContext.sparkSession, emptyRdd)

     CDCVersionDiffInfo(
       (emptyDf +: dfs).reduce((df1, df2) => df1.union(
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/cdc/CDCReaderSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/cdc/CDCReaderSuite.scala
index c71255b794d..cec5b734769 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/cdc/CDCReaderSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/cdc/CDCReaderSuite.scala
@@ -33,7 +33,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{DataFrame, QueryTest}
 import org.apache.spark.sql.{Row, SaveMode}
-import org.apache.spark.sql.execution.SQLExecution
+import org.apache.spark.sql.execution.{LogicalRDD, SQLExecution}
 import org.apache.spark.sql.execution.datasources.FileFormatWriter
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SharedSparkSession
@@ -145,6 +145,30 @@ class CDCReaderSuite
     }
   }

+  test("CDC has correct stats") {
+    withTempDir { dir =>
+      val log = DeltaLog.forTable(spark, dir.getAbsolutePath)
+      val data = spark.range(10)
+      val cdcData = spark.range(20, 25).withColumn(CDC_TYPE_COLUMN_NAME, lit("insert"))
+
+      data.write.format("delta").save(dir.getAbsolutePath)
+      sql(s"DELETE FROM delta.`${dir.getAbsolutePath}`")
+      writeCdcData(log, cdcData)
+
+      assert(
+        CDCReader
+          .changesToBatchDF(log, 0, 2, spark)
+          .queryExecution
+          .optimizedPlan
+          .collectLeaves()
+          .exists {
+            case l: LogicalRDD => l.stats.sizeInBytes == 0 && !l.isStreaming
+            case _ => false
+          }
+      )
+    }
+  }
+
   test("cdc update ops") {
     withTempDir { dir =>
       val log = DeltaLog.forTable(spark, dir.getAbsolutePath)
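
Background sketch (illustrative, not part of the patch): the snippet below shows the shape of the change made in CDCReader in isolation, building an empty DataFrame from a LogicalRDD with explicit zero Statistics rather than internalCreateDataFrame, so the empty leaf does not report the default (very large) size estimate. It assumes a Spark version matching the patch, where StructType.toAttributes and this LogicalRDD constructor exist, and it is placed in a hypothetical package under org.apache.spark.sql so the private[sql] Dataset.ofRows helper is visible, mirroring how the Delta classes can call it. The object name, package, and schema are made up for illustration.

// Illustrative only. The package placement is an assumption so that
// private[sql] helpers (Dataset.ofRows, StructType.toAttributes) resolve.
package org.apache.spark.sql.delta.sketches

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object EmptyCdcStatsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").getOrCreate()

    // Stand-in for cdcReadSchema(...): the table schema plus a CDC metadata column.
    val readSchema = StructType(Seq(
      StructField("id", LongType),
      StructField("_change_type", StringType)))

    // Empty DataFrame with explicit zero stats, as in the patched CDCReader.
    // Without the second argument list, the leaf falls back to the default
    // size estimate, which can skew planning of the union it participates in.
    val emptyRdd = LogicalRDD(
      readSchema.toAttributes,
      spark.sparkContext.emptyRDD[InternalRow],
      isStreaming = false
    )(spark, Some(Statistics(sizeInBytes = 0, rowCount = Some(0))))

    val emptyDf = Dataset.ofRows(spark, emptyRdd)
    // Expected to print 0; with internalCreateDataFrame it would be the default size.
    println(emptyDf.queryExecution.optimizedPlan.stats.sizeInBytes)

    spark.stop()
  }
}

The test added in CDCReaderSuite checks the same property end to end: the optimized plan of changesToBatchDF must contain a non-streaming LogicalRDD leaf whose sizeInBytes is 0.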