Skip to content

Commit

Permalink
Fix a bug that CDCReader.changesToDF does not produce a plan with cor…
Browse files Browse the repository at this point in the history
…rect stats

CDCReader.changesToDF always creates an empty LogicalRDD to retain the schema and some properties, but by default a LogicalRDD reports stats with a sizeInBytes of INT_MAX, which can cause CDF-based queries to be optimized incorrectly in many cases.

GitOrigin-RevId: 80bfe6b50d8504d6d013e102366b6225cfafa3f7
  • Loading branch information
minyyy authored and vkorukanti committed Jul 6, 2023
1 parent bf482d5 commit 8a2da73
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.execution.LogicalRDD
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
Expand Down Expand Up @@ -620,8 +622,15 @@ trait CDCReaderImpl extends DeltaLogging {

val readSchema = cdcReadSchema(readSchemaSnapshot.metadata.schema)
// build an empty DS. This DS retains the table schema and the isStreaming property
val emptyDf = spark.sqlContext.internalCreateDataFrame(
spark.sparkContext.emptyRDD[InternalRow], readSchema, isStreaming)
// NOTE: We need to manually set the stats to 0; otherwise the default stats of INT_MAX are used,
// which causes many optimizations to be applied incorrectly.
val emptyRdd = LogicalRDD(
readSchema.toAttributes,
spark.sparkContext.emptyRDD[InternalRow],
isStreaming = isStreaming
)(spark.sqlContext.sparkSession, Some(Statistics(0, Some(0))))
val emptyDf =
Dataset.ofRows(spark.sqlContext.sparkSession, emptyRdd)

CDCVersionDiffInfo(
(emptyDf +: dfs).reduce((df1, df2) => df1.union(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, QueryTest}
import org.apache.spark.sql.{Row, SaveMode}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.{LogicalRDD, SQLExecution}
import org.apache.spark.sql.execution.datasources.FileFormatWriter
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSparkSession
Expand Down Expand Up @@ -145,6 +145,30 @@ class CDCReaderSuite
}
}

test("CDC has correct stats") {
  // Checks that the optimized plan of a batch CDC read contains a non-streaming
  // LogicalRDD leaf whose sizeInBytes statistic is 0.
  withTempDir { dir =>
    val log = DeltaLog.forTable(spark, dir.getAbsolutePath)
    val data = spark.range(10)
    val cdcData = spark.range(20, 25).withColumn(CDC_TYPE_COLUMN_NAME, lit("insert"))

    // Populate the table: initial write, a full delete, then CDC data via writeCdcData.
    data.write.format("delta").save(dir.getAbsolutePath)
    sql(s"DELETE FROM delta.`${dir.getAbsolutePath}`")
    writeCdcData(log, cdcData)

    // Leaves of the optimized plan for the batch change feed over versions 0 to 2.
    val planLeaves = CDCReader
      .changesToBatchDF(log, 0, 2, spark)
      .queryExecution
      .optimizedPlan
      .collectLeaves()

    // Only LogicalRDD leaves are relevant; every other leaf kind is ignored.
    val rddLeaves = planLeaves.collect { case rdd: LogicalRDD => rdd }

    assert(rddLeaves.exists(rdd => rdd.stats.sizeInBytes == 0 && !rdd.isStreaming))
  }
}

test("cdc update ops") {
withTempDir { dir =>
val log = DeltaLog.forTable(spark, dir.getAbsolutePath)
Expand Down

0 comments on commit 8a2da73

Please # to comment.