From 4f768f1bfb0de003af5e4f20d2cd0fcbb857b996 Mon Sep 17 00:00:00 2001
From: Robert Dillitz
Date: Wed, 7 Aug 2024 17:45:43 +0200
Subject: [PATCH] [Spark] Fix AttributeReference mismatch for readChangeFeed
 queries coming from Spark Connect (#3451)

#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

This fixes an issue in the `DeltaAnalysis` rule, more specifically in `fromV2Relation`, that causes Spark Connect `readChangeFeed` queries to fail with an `AttributeReference` mismatch when projections, selections, etc. are applied to the underlying table's columns: `fromV2Relation` mints fresh attributes for the CDF relation via `toAttributes(relation.schema)`, so the rewritten relation's `exprId`s no longer match those that parent operators such as `Project` were already resolved against.

Spark Connect query:
```
spark.read.format("delta").option("readChangeFeed", "true").option("startingVersion", 0)
  .table("main.dillitz.test").select("id").show()
```

Unresolved logical plan:
```
common { plan_id: 15 }
project {
  input {
    common { plan_id: 14 }
    read {
      named_table {
        unparsed_identifier: "main.dillitz.test"
        options { key: "startingVersion" value: "0" }
        options { key: "readChangeFeed" value: "true" }
      }
    }
  }
  expressions { unresolved_attribute { unparsed_identifier: "id" } }
}
```

Resolved logical plan:
```
'Project ['id]
+- 'UnresolvedRelation [dillitz, default, test], [startingVersion=0, readChangeFeed=true], false
```

Plan before the DeltaAnalysis rule:
```
Project [id#594L]
+- SubqueryAlias dillitz.default.test
   +- RelationV2[id#594L] dillitz.default.test dillitz.default.test
```

Plan after the DeltaAnalysis rule (note the fresh `id#595L` in the relation output, while the `Project` still references `id#594L`):
```
!Project [id#594L]
+- SubqueryAlias spark_catalog.delta.`/private/var/folders/11/kfrr0zqj4w3_lb6mpjk76q_00000gp/T/spark-8f2dc5b0-6722-4928-90bb-fba73bd9ce87`
   +- Relation [id#595L,_change_type#596,_commit_version#597L,_commit_timestamp#598] DeltaCDFRelation(SnapshotWithSchemaMode(...))
```

Error:
```
org.apache.spark.sql.catalyst.ExtendedAnalysisException: [MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_APPEAR_IN_OPERATION] Resolved attribute(s) "id" missing from "id", "_change_type", "_commit_version", "_commit_timestamp" in operator !Project [id#493L]. Attribute(s) with the same name appear in the operation: "id".
Please check if the right attribute(s) are used.
SQLSTATE: XX000;
!Project [id#493L]
+- SubqueryAlias dillitz.default.test
   +- Relation dillitz.default.test[id#494L,_change_type#495,_commit_version#496L,_commit_timestamp#497] DeltaCDFRelation(SnapshotWithSchemaMode(...))
  at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:55)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2(CheckAnalysis.scala:694)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2$adapted(CheckAnalysis.scala:197)
  at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:287)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0(CheckAnalysis.scala:197)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0$(CheckAnalysis.scala:179)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis0(Analyzer.scala:341)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:167)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:155)
  at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:155)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:341)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$2(Analyzer.scala:396)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:169)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:396)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:443)
  at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:393)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:260)
  at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:441)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$4(QueryExecution.scala:600)
  at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1152)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:600)
  at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63)
  at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:596)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
  at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:596)
  at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:254)
  at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:253)
  at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:235)
  at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:105)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
  at org.apache.spark.sql.SparkSession.$anonfun$withActiveAndFrameProfiler$1(SparkSession.scala:1187)
  at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94)
  at org.apache.spark.sql.SparkSession.withActiveAndFrameProfiler(SparkSession.scala:1187)
  at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:103)
  at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformShowString(SparkConnectPlanner.scala:323)
  at org.apache.spark.sql.connect.planner.SparkConnectPlanner.$anonfun$transformRelation$1(SparkConnectPlanner.scala:169)
  at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$usePlanCache$3(SessionHolder.scala:480)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.connect.service.SessionHolder.usePlanCache(SessionHolder.scala:479)
  at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:166)
  at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:90)
  at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handlePlan(ExecuteThreadRunner.scala:312)
  at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:244)
  at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:176)
  at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:343)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
  at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:343)
  at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
  at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:83)
  at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:237)
  at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:82)
  at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:342)
  at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:176)
  at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:126)
  at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.$anonfun$run$2(ExecuteThreadRunner.scala:530)
  at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
  at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:103)
  at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:108)
  at scala.util.Using$.resource(Using.scala:269)
  at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:107)
  at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:529)
```

## How was this patch tested?

Created an E2E Spark Connect test that runs queries like the one above (a sketch is included below); it is not yet clear how to merge that test into this repository.

## Does this PR introduce _any_ user-facing changes?

No.
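For reference, a minimal sketch of such an E2E test, assuming the Spark Connect Scala client, a Connect endpoint at `sc://localhost:15002`, and a CDF-enabled table `main.dillitz.test` (endpoint and table are assumptions; the actual test lives outside this repository):

```scala
import org.apache.spark.sql.SparkSession

object ReadChangeFeedConnectRepro {
  def main(args: Array[String]): Unit = {
    // Connect to a Spark Connect server; the endpoint is an assumption.
    val spark = SparkSession.builder()
      .remote("sc://localhost:15002")
      .getOrCreate()

    // Before this fix, the select() below failed analysis with
    // MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_APPEAR_IN_OPERATION, because
    // DeltaAnalysis replaced the relation's attributes with fresh exprIds.
    spark.read.format("delta")
      .option("readChangeFeed", "true")
      .option("startingVersion", 0)
      .table("main.dillitz.test")
      .select("id")
      .show()
  }
}
```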
---
 .../org/apache/spark/sql/delta/DeltaAnalysis.scala | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
index 86f15942e44..bf393f4ac31 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaAnalysis.scala
@@ -1218,7 +1218,15 @@ object DeltaRelation extends DeltaLogging {
     val relation = d.withOptions(options.asScala.toMap).toBaseRelation
     val output = if (CDCReader.isCDCRead(options)) {
       // Handles cdc for the spark.read.options().table() code path
-      toAttributes(relation.schema)
+      // Mapping needed for references to the table's columns coming from Spark Connect.
+      val newOutput = toAttributes(relation.schema)
+      newOutput.map { a =>
+        val existingReference = v2Relation.output
+          .find(e => e.name == a.name && e.dataType == a.dataType && e.nullable == a.nullable)
+        existingReference.map { e =>
+          e.copy(metadata = a.metadata)(exprId = e.exprId, qualifier = e.qualifier)
+        }.getOrElse(a)
+      }
     } else {
       v2Relation.output
     }
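
For context (not part of the patch), a small sketch of the attribute-identity semantics the fix relies on: in Catalyst, an `AttributeReference` is identified by its `exprId`, not its name, so re-minting attributes orphans any parent operator that was already resolved, while the curried `copy(...)(exprId = ..., qualifier = ...)` call preserves identity and adopts the new metadata. Illustrative only, runnable in a spark-shell with catalyst internals on the classpath:

```scala
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.LongType

// An attribute as produced by the original DSv2 relation, e.g. id#594L.
val original = AttributeReference("id", LongType, nullable = true)()

// toAttributes(relation.schema) mints a fresh exprId (e.g. id#595L); a parent
// Project still referencing the original exprId can no longer resolve.
val reminted = AttributeReference("id", LongType, nullable = true)()
assert(original.exprId != reminted.exprId)

// The fix: keep the existing exprId and qualifier, adopt the new metadata.
val patched = original.copy(metadata = reminted.metadata)(
  exprId = original.exprId, qualifier = original.qualifier)
assert(patched.exprId == original.exprId)
```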