Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Spark] Fix AttributeReference mismatch for readChangeFeed queries co…
…ming from Spark Connect (#3451) <!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description This fixes an issue in the `DeltaAnalysis` rule, more specifically `fromV2Relation`, that leads to Spark Connect `readChangeFeed` queries failing when applying Projections, Selections, etc. to the underlying table's columns due to an `AttributeReference` mismatch: Spark Connect Query: ``` spark.read.format("delta").option("readChangeFeed", "true").option("startingVersion", 0) .table("main.dillitz.test").select("id").show() ``` Unresolved Logical Plan: ``` common { plan_id: 15 } project { input { common { plan_id: 14 } read { named_table { unparsed_identifier: "main.dillitz.test" options { key: "startingVersion" value: "0" } options { key: "readChangeFeed" value: "true" } } } } expressions { unresolved_attribute { unparsed_identifier: "id" } } } ``` Resolved Logical Plan: ``` 'Project ['id] +- 'UnresolvedRelation [dillitz, default, test], [startingVersion=0, readChangeFeed=true], false ``` Plan before DeltaAnalysis rule: ``` Project [id#594L] +- SubqueryAlias dillitz.default.test +- RelationV2[id#594L] dillitz.default.test dillitz.default.test ``` Plan after DeltaAnalysis rule: ``` !Project [id#594L] +- SubqueryAlias spark_catalog.delta.`/private/var/folders/11/kfrr0zqj4w3_lb6mpjk76q_00000gp/T/spark-8f2dc5b0-6722-4928-90bb-fba73bd9ce87` +- Relation [id#595L,_change_type#596,_commit_version#597L,_commit_timestamp#598] DeltaCDFRelation(SnapshotWithSchemaMode(...)) ``` Error: ``` org.apache.spark.sql.catalyst.ExtendedAnalysisException: [MISSING_ATTRIBUTES.RESOLVED_ATTRIBUTE_APPEAR_IN_OPERATION] Resolved attribute(s) "id" missing from "id", "_change_type", "_commit_version", "_commit_timestamp" in operator !Project [id#493L]. Attribute(s) with the same name appear in the operation: "id". Please check if the right attribute(s) are used. SQLSTATE: XX000; !Project [id#493L] +- SubqueryAlias dillitz.default.test +- Relation dillitz.default.test[id#494L,_change_type#495,_commit_version#496L,_commit_timestamp#497] DeltaCDFRelation(SnapshotWithSchemaMode(...)) at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:55) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2(CheckAnalysis.scala:694) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis0$2$adapted(CheckAnalysis.scala:197) at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:287) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0(CheckAnalysis.scala:197) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis0$(CheckAnalysis.scala:179) at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis0(Analyzer.scala:341) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.$anonfun$checkAnalysis$1(CheckAnalysis.scala:167) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis(CheckAnalysis.scala:155) at org.apache.spark.sql.catalyst.analysis.CheckAnalysis.checkAnalysis$(CheckAnalysis.scala:155) at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:341) at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$2(Analyzer.scala:396) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:169) at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:396) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:443) at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:393) at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:260) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:441) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$4(QueryExecution.scala:600) at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:1152) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:600) at com.databricks.util.LexicalThreadLocal$Handle.runWith(LexicalThreadLocal.scala:63) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:596) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:596) at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:254) at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:253) at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:235) at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:105) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180) at org.apache.spark.sql.SparkSession.$anonfun$withActiveAndFrameProfiler$1(SparkSession.scala:1187) at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:94) at org.apache.spark.sql.SparkSession.withActiveAndFrameProfiler(SparkSession.scala:1187) at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:103) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformShowString(SparkConnectPlanner.scala:323) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.$anonfun$transformRelation$1(SparkConnectPlanner.scala:169) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$usePlanCache$3(SessionHolder.scala:480) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.sql.connect.service.SessionHolder.usePlanCache(SessionHolder.scala:479) at org.apache.spark.sql.connect.planner.SparkConnectPlanner.transformRelation(SparkConnectPlanner.scala:166) at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:90) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handlePlan(ExecuteThreadRunner.scala:312) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:244) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:176) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:343) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180) at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:343) at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97) at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:83) at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:237) at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:82) at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:342) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:176) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:126) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.$anonfun$run$2(ExecuteThreadRunner.scala:530) at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51) at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:103) at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:108) at scala.util.Using$.resource(Using.scala:269) at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:107) at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:529) ``` ## How was this patch tested? Created an E2E Spark Connect test running queries like the one above. Not sure how to merge it into this repository. ## Does this PR introduce _any_ user-facing changes? No.
- Loading branch information