diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala b/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala
index 79e66d41346..5d8bb434f7d 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala
@@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
-import org.apache.spark.sql.catalyst.expressions.objects.InvokeLike
 import org.apache.spark.sql.catalyst.util.TypeUtils
 import org.apache.spark.sql.execution.InSubqueryExec
 import org.apache.spark.sql.expressions.SparkUserDefinedFunction
@@ -597,6 +596,93 @@ trait DataSkippingReaderBase
       maxExpr: Expression,
       nullCountExpr: Expression)

+  /**
+   * Whitelist of expressions that can be rewritten as partition-like.
+   * Set to a finite list to avoid silently introducing correctness issues as new
+   * expressions that violate the assumptions of partition-like skipping are introduced.
+   * There's no need to include [[SkippingEligibleColumn]] here - it's already handled explicitly.
+   *
+   * The following expressions have been intentionally excluded from the whitelist of supported
+   * expressions:
+   * - [[AttributeReference]]: Any non-skipping-eligible column reference can't be rewritten as
+   *   partition-like.
+   * - Any nondeterministic expression: The value returned while skipping might be different when
+   *   the expression is evaluated again. For example, rand() > 0.5 would return ~25% of records
+   *   if used in data skipping, while the user would expect ~50% of records to be returned.
+   * - [[UserDefinedExpression]]: Often nondeterministic, and may have side effects when executed
+   *   multiple times.
+   * - [[RegExpReplace]], [[RegExpExtractBase]], [[Like]], [[MultiLikeBase]], [[InvokeLike]], and
+   *   [[JsonToStructs]]: These expressions might be very expensive to evaluate more than once.
+   */
+  private def shouldRewriteAsPartitionLike(expr: Expression): Boolean = expr match {
+    // Expressions supported by traditional data skipping.
+    // Boolean operators. AND is explicitly handled by the caller.
+    case _: Not | _: Or => true
+    // Comparison operators.
+    case _: EqualNullSafe | _: EqualTo | _: GreaterThan | _: GreaterThanOrEqual | _: IsNull |
+        _: IsNotNull | _: LessThan | _: LessThanOrEqual => true
+    // String and set operators. InSubqueryExec is explicitly handled by the caller.
+    case _: In | _: InSet | _: StartsWith => true
+    case _: Literal => true
+
+    // Expressions only supported for partition-like data skipping.
+    // Date and time conversions.
+    case _: ConvertTimezone | _: DateFormatClass | _: Extract | _: GetDateField |
+        _: GetTimeField | _: IntegralToTimestampBase | _: MakeDate | _: MakeTimestamp |
+        _: ParseToDate | _: ParseToTimestamp | _: ToTimestamp | _: TruncDate |
+        _: TruncTimestamp | _: UTCTimestamp => true
+    // Unix date and timestamp conversions.
+    case _: DateFromUnixDate | _: FromUnixTime | _: TimestampToLongBase | _: ToUnixTimestamp |
+        _: UnixDate | _: UnixTime | _: UnixTimestamp => true
+    // Date and time arithmetic.
+    case _: AddMonthsBase | _: DateAdd | _: DateAddInterval | _: DateDiff | _: DateSub |
+        _: DatetimeSub | _: LastDay | _: MonthsBetween | _: NextDay | _: SubtractDates |
+        _: SubtractTimestamps | _: TimeAdd | _: TimestampAdd | _: TimestampAddYMInterval |
+        _: TimestampDiff | _: TruncInstant => true
+    // String expressions.
+    case _: Base64 | _: BitLength | _: Chr | _: ConcatWs | _: Decode | _: Elt | _: Empty2Null |
+        _: Encode | _: FormatNumber | _: FormatString | _: ILike | _: InitCap | _: Left |
+        _: Length | _: Levenshtein | _: Luhncheck | _: OctetLength | _: Overlay | _: Right |
+        _: Sentences | _: SoundEx | _: SplitPart | _: String2StringExpression |
+        _: String2TrimExpression | _: StringDecode | _: StringInstr | _: StringLPad |
+        _: StringLocate | _: StringPredicate | _: StringRPad | _: StringRepeat |
+        _: StringReplace | _: StringSpace | _: StringSplit | _: StringSplitSQL |
+        _: StringTranslate | _: StringTrimBoth | _: Substring | _: SubstringIndex | _: ToBinary |
+        _: TryToBinary | _: UnBase64 => true
+    // Arithmetic expressions.
+    case _: Abs | _: BinaryArithmetic | _: Greatest | _: Least | _: UnaryMinus |
+        _: UnaryPositive => true
+    // Array expressions.
+    case _: ArrayBinaryLike | _: ArrayCompact | _: ArrayContains | _: ArrayInsert | _: ArrayJoin |
+        _: ArrayMax | _: ArrayMin | _: ArrayPosition | _: ArrayRemove | _: ArrayRepeat |
+        _: ArraySetLike | _: ArraySize | _: ArraysZip |
+        _: BinaryArrayExpressionWithImplicitCast | _: Concat | _: CreateArray | _: ElementAt |
+        _: Flatten | _: Get | _: GetArrayItem | _: GetArrayStructFields |
+        _: Reverse | _: Sequence | _: Size | _: Slice | _: SortArray | _: TryElementAt => true
+    // Map expressions.
+    case _: CreateMap | _: GetMapValue | _: MapConcat | _: MapContainsKey | _: MapEntries |
+        _: MapFromArrays | _: MapFromEntries | _: MapKeys | _: MapValues | _: StringToMap => true
+    // Struct expressions.
+    case _: CreateNamedStruct | _: DropField | _: GetStructField | _: UpdateFields |
+        _: WithField => true
+    // Hash expressions.
+    case _: Crc32 | _: HashExpression[_] | _: Md5 | _: Sha1 | _: Sha2 => true
+    // URL expressions.
+    case _: ParseUrl | _: UrlDecode | _: UrlEncode => true
+    // NULL expressions.
+    case _: AtLeastNNonNulls | _: Coalesce | _: IsNaN | _: NaNvl | _: NullIf | _: Nvl |
+        _: Nvl2 => true
+    // Cast expressions.
+    case _: Cast | _: UpCast => true
+    // Conditional expressions.
+    case _: If | _: CaseWhen => true
+    case _: Alias => true
+
+    // Don't attempt partition-like skipping on any unknown expressions: there's no way to
+    // guarantee it's safe to do so.
+    case _ => false
+  }
+
   /**
    * Rewrites the references in an expression to point to the collected stats over that column
    * (if possible).
@@ -650,10 +736,10 @@ trait DataSkippingReaderBase
     }
     // For other attribute references, we can't safely rewrite the expression.
     case SkippingEligibleColumn(_, _) => None
-    // Don't attempt data skipping on a nondeterministic expression, since the value returned
-    // might be different when executed twice on the same input.
-    // For example, rand() > 0.5 would return ~25% of records if used in data skipping, while the
-    // user would expect ~50% of records to be returned.
+    // Explicitly disallow rewriting nondeterministic expressions. Even though this check isn't
+    // strictly necessary (there shouldn't be any nondeterministic expressions in the whitelist),
+    // defensively keep it due to the extreme risk of correctness issues if any nondeterministic
+    // expressions sneak into the whitelist.
     case other if !other.deterministic => None
     // Inline subquery results to support InSet. The subquery should generally have already been
     // evaluated.
@@ -667,13 +753,6 @@ trait DataSkippingReaderBase
         Some(InSet(rewrittenChildren, possiblyNullValues.toSet), referencedStats)
       }
     }
-    // Don't allow rewriting UDFs - even if deterministic, UDFs might have some unexpected
-    // side effects when executed twice.
-    case _: UserDefinedExpression => None
-    // Don't attempt to rewrite expressions might be extremely expensive to invoke twice.
-    case _: RegExpReplace | _: RegExpExtractBase | _: Like | _: MultiLikeBase => None
-    case _: InvokeLike => None
-    case _: JsonToStructs => None
     // Pushdown NOT through OR - we prefer AND to OR because AND can tolerate one branch not being
     // rewriteable.
     case Not(Or(e1, e2)) =>
@@ -688,14 +767,16 @@ trait DataSkippingReaderBase
         Some((And(newLeft, newRight), statsLeft ++ statsRight))
       case _ => leftResult.orElse(rightResult)
     }
-    // For all other expressions, recursively rewrite the children.
-    case other =>
+    // For all other eligible expressions, recursively rewrite the children.
+    case other if shouldRewriteAsPartitionLike(other) =>
       val childResults = other.children.map(
         rewriteDataFiltersAsPartitionLikeInternal(_, clusteringColumnPaths))
       Option.whenNot (childResults.exists(_.isEmpty)) {
        val (children, stats) = childResults.map(_.get).unzip
        (other.withNewChildren(children), stats.flatten.toSet)
      }
+    // Don't attempt rewriting any non-whitelisted expressions.
+    case _ => None
   }

   /**
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/stats/PartitionLikeDataSkippingSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/stats/PartitionLikeDataSkippingSuite.scala
index c03d8fb371d..531f4fad93f 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/stats/PartitionLikeDataSkippingSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/stats/PartitionLikeDataSkippingSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.functions.{col, concat, lit, struct}
+import org.apache.spark.sql.functions.{array, col, concat, lit, struct}
 import org.apache.spark.sql.test.SharedSparkSession

 trait PartitionLikeDataSkippingSuiteBase
@@ -341,6 +341,38 @@ trait PartitionLikeDataSkippingSuiteBase
       allPredicatesUsed = true,
       minNumFilesToApply = 1)
   }
+
+  test("partition-like data skipping expression references non-skipping eligible columns") {
+    val tbl = "tbl"
+    withClusteredTable(
+      table = tbl,
+      schema = "a BIGINT, b ARRAY<BIGINT>, c STRUCT<d ARRAY<BIGINT>, e BIGINT>",
+      clusterBy = "a") {
+      spark.range(10)
+        .withColumnRenamed("id", "a")
+        .withColumn("b", array(col("a"), lit(0L)))
+        .withColumn("c", struct(array(col("a"), lit(0L)), lit(0L)))
+        .select("a", "b", "c") // Reorder columns to ensure the schema matches.
+        .repartitionByRange(10, col("a"))
+        .write.format("delta").mode("append").insertInto(tbl)
+
+      // All files should be read because the filters are on columns that aren't skipping eligible.
+      validateExpectedScanMetrics(
+        tableName = tbl,
+        query = s"SELECT * FROM $tbl WHERE GET(b, 1) = 0",
+        expectedNumFiles = 10,
+        expectedNumPartitionLikeDataFilters = 0,
+        allPredicatesUsed = false,
+        minNumFilesToApply = 1)
+      validateExpectedScanMetrics(
+        tableName = tbl,
+        query = s"SELECT * FROM $tbl WHERE GET(c.d, 1) = 0",
+        expectedNumFiles = 10,
+        expectedNumPartitionLikeDataFilters = 0,
+        allPredicatesUsed = false,
+        minNumFilesToApply = 1)
+    }
+  }
 }

 class PartitionLikeDataSkippingSuite extends PartitionLikeDataSkippingSuiteBase
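
To make the ~25% vs. ~50% claim in the whitelist doc comment concrete: below is a minimal standalone sketch, not part of the patch, showing how evaluating a nondeterministic predicate twice (once for skipping, once during the scan) compounds. The local-mode session setup is illustrative.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.rand

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // One filter on rand() > 0.5 keeps ~50% of rows. Applying the "same" filter
    // twice draws two independent samples, keeping only ~0.5 * 0.5 = 25% of rows,
    // which is the effect a user would see if data skipping evaluated the
    // predicate once and the scan evaluated it again.
    val once = spark.range(1000000L).filter(rand() > 0.5).count()
    val twice = spark.range(1000000L).filter(rand() > 0.5).filter(rand() > 0.5).count()
    println(s"one evaluation: ~$once rows, two evaluations: ~$twice rows")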
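The retained `Not(Or(e1, e2))` case is De Morgan's law: NOT (a OR b) is equivalent to (NOT a) AND (NOT b). The AND form is preferred because a conjunction remains a sound (if weaker) skipping predicate even when only one branch can be rewritten, whereas an OR needs both. A sketch of just that normalization step over Catalyst expressions, with the stats-rewriting recursion elided:

    import org.apache.spark.sql.catalyst.expressions.{And, Expression, Not, Or}

    // Push NOT through OR so each negated branch can be rewritten independently:
    // if only one branch is rewriteable, the AND can drop the other branch and
    // still safely skip files; an OR would require both branches.
    def pushNotThroughOr(expr: Expression): Expression = expr match {
      case Not(Or(e1, e2)) => And(Not(e1), Not(e2))
      case other => other
    }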
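For a sense of what the whitelist gates in practice, here is a hypothetical example; the `events` table and `event_time` clustering column are illustrative and not from the patch. A whitelisted expression such as `date_format` (which lowers to `DateFormatClass`) over a clustering column remains a candidate for the partition-like rewrite, while the equivalent predicate written as a Scala UDF lowers to a `UserDefinedExpression` and is deliberately excluded.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.{col, date_format, udf}

    val spark = SparkSession.active // assumes an active session

    // Whitelisted: eligible for the partition-like rewrite on a table
    // clustered by event_time.
    val skippable = spark.table("events")
      .where(date_format(col("event_time"), "yyyy-MM") === "2024-01")

    // Not whitelisted: the UDF call is a UserDefinedExpression, so this
    // predicate is never rewritten as partition-like.
    val monthOf = udf((s: String) => s.take(7))
    val notSkippable = spark.table("events")
      .where(monthOf(col("event_time").cast("string")) === "2024-01")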