Skip to content

Commit

Permalink
[Spark] Restrict partition-like data filters to whitelist of known-go…
Browse files Browse the repository at this point in the history
…od expressions (#3872)
  • Loading branch information
chirag-s-db authored Dec 2, 2024
1 parent 4d2c5cf commit 81f27b3
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.Literal.{FalseLiteral, TrueLiteral}
import org.apache.spark.sql.catalyst.expressions.objects.InvokeLike
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.execution.InSubqueryExec
import org.apache.spark.sql.expressions.SparkUserDefinedFunction
Expand Down Expand Up @@ -597,6 +596,93 @@ trait DataSkippingReaderBase
maxExpr: Expression,
nullCountExpr: Expression)

/**
* Whitelist of expressions that can be rewritten as partition-like.
* Set to a finite list to avoid having to silently introducing correctness issues as new
* expressions that violate the assumptions of partition-like skipping are introduced.
* There's no need to include [[SkippingEligibleColumn]] here - it's already handled explicitly.
*
* The following expressions have been intentionally excluded from the whitelist of supported
* expressions:
* - [[AttributeReference]]: Any non-skipping eligible column references can't be rewritten as
* partition-like.
* - Any nondeterministic expression: The value returned while skipping might be different when
* the expression is evaluated again. For example, rand() > 0.5 would return ~25% of records
* if used in data skipping, while the user would expect ~50% of records to be returned.
* - [[UserDefinedExpression]]: Often nondeterministic, and may have side effects when executed
* multiple times.
* - [[RegExpReplace]], [[RegExpExtractBase]], [[Like]], [[MultiLikeBase]], [[InvokeLike]], and
* [[JsonToStructs]]: These expressions might be very expensive to evalute more than once.
*/
private def shouldRewriteAsPartitionLike(expr: Expression): Boolean = expr match {
// Expressions supported by traditional data skipping.
// Boolean operators. AND is explicitly handled by the caller.
case _: Not | _: Or => true
// Comparison operators.
case _: EqualNullSafe | _: EqualTo | _: GreaterThan | _: GreaterThanOrEqual | _: IsNull |
_: IsNotNull | _: LessThan | _: LessThanOrEqual => true
// String and set operators. InSubqueryExec is explicitly handled by the caller.
case _: In | _: InSet | _: StartsWith => true
case _: Literal => true

// Expressions only supported for partition-like data skipping.
// Date and time conversions.
case _: ConvertTimezone | _: DateFormatClass | _: Extract | _: GetDateField |
_: GetTimeField | _: IntegralToTimestampBase | _: MakeDate | _: MakeTimestamp |
_: ParseToDate | _: ParseToTimestamp | _: ToTimestamp | _: TruncDate |
_: TruncTimestamp | _: UTCTimestamp => true
// Unix date and timestamp conversions.
case _: DateFromUnixDate | _: FromUnixTime | _: TimestampToLongBase | _: ToUnixTimestamp |
_: UnixDate | _: UnixTime | _: UnixTimestamp => true
// Date and time arithmetic.
case _: AddMonthsBase | _: DateAdd | _: DateAddInterval | _: DateDiff | _: DateSub |
_: DatetimeSub | _: LastDay | _: MonthsBetween | _: NextDay | _: SubtractDates |
_: SubtractTimestamps | _: TimeAdd | _: TimestampAdd | _: TimestampAddYMInterval |
_: TimestampDiff | _: TruncInstant => true
// String expressions.
case _: Base64 | _: BitLength | _: Chr | _: ConcatWs | _: Decode | _: Elt | _: Empty2Null |
_: Encode | _: FormatNumber | _: FormatString | _: ILike | _: InitCap | _: Left |
_: Length | _: Levenshtein | _: Luhncheck | _: OctetLength | _: Overlay | _: Right |
_: Sentences | _: SoundEx | _: SplitPart | _: String2StringExpression |
_: String2TrimExpression | _: StringDecode | _: StringInstr | _: StringLPad |
_: StringLocate | _: StringPredicate | _: StringRPad | _: StringRepeat |
_: StringReplace | _: StringSpace | _: StringSplit | _: StringSplitSQL |
_: StringTranslate | _: StringTrimBoth | _: Substring | _: SubstringIndex | _: ToBinary |
_: TryToBinary | _: UnBase64 => true
// Arithmetic expressions.
case _: Abs | _: BinaryArithmetic | _: Greatest | _: Least | _: UnaryMinus |
_: UnaryPositive => true
// Array expressions.
case _: ArrayBinaryLike | _: ArrayCompact | _: ArrayContains | _: ArrayInsert | _: ArrayJoin |
_: ArrayMax | _: ArrayMin | _: ArrayPosition | _: ArrayRemove | _: ArrayRepeat |
_: ArraySetLike | _: ArraySize | _: ArraysZip |
_: BinaryArrayExpressionWithImplicitCast | _: Concat | _: CreateArray | _: ElementAt |
_: Flatten | _: Get | _: GetArrayItem | _: GetArrayStructFields |
_: Reverse | _: Sequence | _: Size | _: Slice | _: SortArray | _: TryElementAt => true
// Map expressions.
case _: CreateMap | _: GetMapValue | _: MapConcat | _: MapContainsKey | _: MapEntries |
_: MapFromArrays | _: MapFromEntries | _: MapKeys | _: MapValues | _: StringToMap => true
// Struct expressions.
case _: CreateNamedStruct | _: DropField | _: GetStructField | _: UpdateFields |
_: WithField => true
// Hash expressions.
case _: Crc32 | _: HashExpression[_] | _: Md5 | _: Sha1 | _: Sha2 => true
// URL expressions.
case _: ParseUrl | _: UrlDecode | _: UrlEncode => true
// NULL expressions.
case _: AtLeastNNonNulls | _: Coalesce | _: IsNaN | _: NaNvl | _: NullIf | _: Nvl |
_: Nvl2 => true
// Cast expressions.
case _: Cast | _: UpCast => true
// Conditional expressions.
case _: If | _: CaseWhen => true
case _: Alias => true

// Don't attempt partition-like skipping on any unknown expressions: there's no way to
// guarantee it's safe to do so.
case _ => false
}

/**
* Rewrites the references in an expression to point to the collected stats over that column
* (if possible).
Expand Down Expand Up @@ -650,10 +736,10 @@ trait DataSkippingReaderBase
}
// For other attribute references, we can't safely rewrite the expression.
case SkippingEligibleColumn(_, _) => None
// Don't attempt data skipping on a nondeterministic expression, since the value returned
// might be different when executed twice on the same input.
// For example, rand() > 0.5 would return ~25% of records if used in data skipping, while the
// user would expect ~50% of records to be returned.
// Explicitly disallow rewriting nondeterministic expressions. Even though this check isn't
// strictly necessary (there shouldn't be any nondeterministic expressions in the whitelist),
// defensively keep it due to the extreme risk of correctness issues if any nondeterministic
// expressions sneak into the whitelist.
case other if !other.deterministic => None
// Inline subquery results to support InSet. The subquery should generally have already been
// evaluated.
Expand All @@ -667,13 +753,6 @@ trait DataSkippingReaderBase
Some(InSet(rewrittenChildren, possiblyNullValues.toSet), referencedStats)
}
}
// Don't allow rewriting UDFs - even if deterministic, UDFs might have some unexpected
// side effects when executed twice.
case _: UserDefinedExpression => None
// Don't attempt to rewrite expressions might be extremely expensive to invoke twice.
case _: RegExpReplace | _: RegExpExtractBase | _: Like | _: MultiLikeBase => None
case _: InvokeLike => None
case _: JsonToStructs => None
// Pushdown NOT through OR - we prefer AND to OR because AND can tolerate one branch not being
// rewriteable.
case Not(Or(e1, e2)) =>
Expand All @@ -688,14 +767,16 @@ trait DataSkippingReaderBase
Some((And(newLeft, newRight), statsLeft ++ statsRight))
case _ => leftResult.orElse(rightResult)
}
// For all other expressions, recursively rewrite the children.
case other =>
// For all other eligible expressions, recursively rewrite the children.
case other if shouldRewriteAsPartitionLike(other) =>
val childResults = other.children.map(
rewriteDataFiltersAsPartitionLikeInternal(_, clusteringColumnPaths))
Option.whenNot (childResults.exists(_.isEmpty)) {
val (children, stats) = childResults.map(_.get).unzip
(other.withNewChildren(children), stats.flatten.toSet)
}
// Don't attempt rewriting any non-whitelisted expressions.
case _ => None
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.SparkConf
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.functions.{col, concat, lit, struct}
import org.apache.spark.sql.functions.{array, col, concat, lit, struct}
import org.apache.spark.sql.test.SharedSparkSession

trait PartitionLikeDataSkippingSuiteBase
Expand Down Expand Up @@ -341,6 +341,38 @@ trait PartitionLikeDataSkippingSuiteBase
allPredicatesUsed = true,
minNumFilesToApply = 1)
}

test("partition-like data skipping expression references non-skipping eligible columns") {
val tbl = "tbl"
withClusteredTable(
table = tbl,
schema = "a BIGINT, b ARRAY<BIGINT>, c STRUCT<d ARRAY<BIGINT>, e BIGINT>",
clusterBy = "a") {
spark.range(10)
.withColumnRenamed("id", "a")
.withColumn("b", array(col("a"), lit(0L)))
.withColumn("c", struct(array(col("a"), lit(0L)), lit(0L)))
.select("a", "b", "c") // Reorder columns to ensure the schema matches.
.repartitionByRange(10, col("a"))
.write.format("delta").mode("append").insertInto(tbl)

// All files should be read because the filters are on columns that aren't skipping eligible.
validateExpectedScanMetrics(
tableName = tbl,
query = s"SELECT * FROM $tbl WHERE GET(b, 1) = 0",
expectedNumFiles = 10,
expectedNumPartitionLikeDataFilters = 0,
allPredicatesUsed = false,
minNumFilesToApply = 1)
validateExpectedScanMetrics(
tableName = tbl,
query = s"SELECT * FROM $tbl WHERE GET(c.d, 1) = 0",
expectedNumFiles = 10,
expectedNumPartitionLikeDataFilters = 0,
allPredicatesUsed = false,
minNumFilesToApply = 1)
}
}
}

class PartitionLikeDataSkippingSuite extends PartitionLikeDataSkippingSuiteBase
Expand Down

0 comments on commit 81f27b3

Please # to comment.