# [Spark] Fix AttributeReference mismatch for readChangeFeed queries coming from Spark Connect (#3451)


#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description
This fixes an issue in the `DeltaAnalysis` rule, specifically in `fromV2Relation`, that causes Spark Connect `readChangeFeed` queries to fail with an `AttributeReference` mismatch when projections, selections, etc. are applied to the underlying table's columns.

Spark Connect query:
```
spark.read.format("delta").option("readChangeFeed", "true").option("startingVersion", 0)
   .table("main.dillitz.test").select("id").show()
```
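For context, a minimal sketch of issuing this query through the Spark Connect Scala client. The endpoint is a placeholder, and the table name is just the one from the example above:

```
import org.apache.spark.sql.SparkSession

// Placeholder Connect endpoint; any server with the Delta table registered works.
val spark = SparkSession.builder()
  .remote("sc://localhost:15002")
  .getOrCreate()

spark.read.format("delta")
  .option("readChangeFeed", "true")
  .option("startingVersion", 0)
  .table("main.dillitz.test")
  .select("id") // projecting a base-table column is what triggered the mismatch
  .show()
```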
Spark Connect plan (protobuf):
```
common {
  plan_id: 15
}
project {
  input {
    common {
      plan_id: 14
    }
    read {
      named_table {
        unparsed_identifier: "main.dillitz.test"
        options {
          key: "startingVersion"
          value: "0"
        }
        options {
          key: "readChangeFeed"
          value: "true"
        }
      }
    }
  }
  expressions {
    unresolved_attribute {
      unparsed_identifier: "id"
    }
  }
}
```
Catalyst plan after Connect translation (still unresolved):
```
'Project ['id]
+- 'UnresolvedRelation [dillitz, default, test], [startingVersion=0, readChangeFeed=true], false 
```
Plan before DeltaAnalysis rule:
```
Project [id#594L]
+- SubqueryAlias dillitz.default.test
   +- RelationV2[id#594L] dillitz.default.test dillitz.default.test
```
Plan after DeltaAnalysis rule (the rewritten relation carries fresh attributes, e.g. id#595L instead of id#594L, so the reference in the Project above no longer matches):
```
!Project [id#594L]
+- SubqueryAlias spark_catalog.delta.`/private/var/folders/11/kfrr0zqj4w3_lb6mpjk76q_00000gp/T/spark-8f2dc5b0-6722-4928-90bb-fba73bd9ce87`
   +- Relation [id#595L,_change_type#596,_commit_version#597L,_commit_timestamp#598] DeltaCDFRelation(SnapshotWithSchemaMode(...))
```
Error:
```
org.apache.spark.sql.catalyst.ExtendedAnalysisException: [MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_APPEAR_IN_OPERATION] Resolved attribute(s) "id" missing from "id", "_change_type", "_commit_version", "_commit_timestamp" in operator !Project [id#493L]. Attribute(s) with the same name appear in the operation: "id".
Please check if the right attribute(s) are used. SQLSTATE: XX000;
!Project [id#493L]
+- SubqueryAlias dillitz.default.test
   +- Relation dillitz.default.test[id#494L,_change_type#495,_commit_version#496L,_commit_timestamp#497] DeltaCDFRelation(SnapshotWithSchemaMode(...))

	at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:55)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2(CheckAnalysis.scala:694)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2$adapted(CheckAnalysis.scala:197)
	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:287)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0(CheckAnalysis.scala:197)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0$(CheckAnalysis.scala:179)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis0(Analyzer.scala:341)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:167)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:155)
	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:155)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:341)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$2(Analyzer.scala:396)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:169)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:396)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:443)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:393)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:260)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:441)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$4(QueryExecution.scala:600)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1152)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:600)
	at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:596)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:596)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:254)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:253)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:235)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:105)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
	at org.apache.spark.sql.SparkSession.$anonfun$withActiveAndFrameProfiler$1(SparkSession.scala:1187)
	at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
	at org.apache.spark.sql.SparkSession.withActiveAndFrameProfiler(SparkSession.scala:1187)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:103)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformShowString(SparkConnectPlanner.scala:323)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.$anonfun$transformRelation$1(SparkConnectPlanner.scala:169)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$usePlanCache$3(SessionHolder.scala:480)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.connect.service.SessionHolder.usePlanCache(SessionHolder.scala:479)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:166)
	at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:90)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handlePlan(ExecuteThreadRunner.scala:312)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:244)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:176)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:343)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:343)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:83)
	at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:237)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:82)
	at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:342)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:176)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:126)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.$anonfun$run$2(ExecuteThreadRunner.scala:530)
	at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
	at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:103)
	at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:108)
	at scala.util.Using$.resource(Using.scala:269)
	at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:107)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:529)
```
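The root cause is that Catalyst binds column references by `ExprId`, not by name: the old code built brand-new attributes for the CDF schema via `toAttributes(relation.schema)`, so the `Project` created on the Spark Connect path still pointed at the old ids. A minimal sketch of this behavior, separate from the patch itself:

```
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.LongType

// Two attributes with identical name and type but freshly allocated ExprIds:
val a = AttributeReference("id", LongType)() // empty second list => new ExprId
val b = AttributeReference("id", LongType)()

assert(a.name == b.name)     // same name...
assert(a.exprId != b.exprId) // ...but different identities
assert(!a.semanticEquals(b)) // so a Project bound to `a` cannot use `b`

// Reusing the existing ExprId (the pattern applied by this fix) keeps identity:
val c = b.copy()(exprId = a.exprId, qualifier = a.qualifier)
assert(c.semanticEquals(a))
```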
## How was this patch tested?
Created an E2E Spark Connect test that runs queries like the one above; it is not yet clear how that test could be merged into this repository.

## Does this PR introduce _any_ user-facing changes?
No.

dillitz authored Aug 7, 2024 · 1 parent 100cc4d · commit 4f768f1

Showing 1 changed file with 9 additions and 1 deletion.
```
@@ -1218,7 +1218,15 @@ object DeltaRelation extends DeltaLogging {
     val relation = d.withOptions(options.asScala.toMap).toBaseRelation
     val output = if (CDCReader.isCDCRead(options)) {
       // Handles cdc for the spark.read.options().table() code path
-      toAttributes(relation.schema)
+      // Mapping needed for references to the table's columns coming from Spark Connect.
+      val newOutput = toAttributes(relation.schema)
+      newOutput.map { a =>
+        val existingReference = v2Relation.output
+          .find(e => e.name == a.name && e.dataType == a.dataType && e.nullable == a.nullable)
+        existingReference.map { e =>
+          e.copy(metadata = a.metadata)(exprId = e.exprId, qualifier = e.qualifier)
+        }.getOrElse(a)
+      }
     } else {
       v2Relation.output
     }
```
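The change still builds the CDF output with `toAttributes(relation.schema)`, but then, for each new attribute, looks for a matching attribute (same name, data type, and nullability) in the existing `v2Relation.output` and reuses its `exprId` and `qualifier`, keeping only the new metadata. References bound earlier in the plan, such as `id#594L` in the Project above, therefore continue to resolve against the rewritten relation.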
