From f4e1c91fdcbaf21a6e63f037d3bc80315dd539fd Mon Sep 17 00:00:00 2001 From: Timo Walther Date: Thu, 7 Aug 2025 10:00:43 +0200 Subject: [PATCH 1/2] [FLINK-38201][table-planner] SinkUpsertMaterializer should not be inserted for retract sinks --- .../FlinkChangelogModeInferenceProgram.scala | 50 +++++------ .../nodes/exec/stream/SinkSemanticTests.java | 35 ++++++++ .../nodes/exec/stream/SinkTestPrograms.java | 89 +++++++++++++++++++ 3 files changed, 147 insertions(+), 27 deletions(-) create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java create mode 100644 flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala index a66394bfd292c..8134ea7d13762 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala @@ -1052,33 +1052,29 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti val inputChangelogMode = ChangelogPlanUtils.getChangelogMode(sink.getInput.asInstanceOf[StreamPhysicalRel]).get val primaryKeys = sink.contextResolvedTable.getResolvedSchema.getPrimaryKeyIndexes - val upsertMaterialize = - tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE) match { - case UpsertMaterialize.FORCE => primaryKeys.nonEmpty - case UpsertMaterialize.NONE => false - case UpsertMaterialize.AUTO => - val sinkAcceptInsertOnly = sink.tableSink - .getChangelogMode(inputChangelogMode) - .containsOnly(RowKind.INSERT) - val inputInsertOnly = inputChangelogMode.containsOnly(RowKind.INSERT) - - if (!sinkAcceptInsertOnly && !inputInsertOnly && primaryKeys.nonEmpty) { - val pks = ImmutableBitSet.of(primaryKeys: _*) - val fmq = FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery) - val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput) - // if input has update and primary key != upsert key (upsert key can be null) we should - // enable upsertMaterialize. An optimize is: do not enable upsertMaterialize when sink - // pk(s) contains input changeLogUpsertKeys - if (changeLogUpsertKeys == null || !changeLogUpsertKeys.exists(pks.contains)) { - true - } else { - false - } - } else { - false - } - } - upsertMaterialize + val sinkChangelogMode = sink.tableSink.getChangelogMode(inputChangelogMode) + val inputIsAppend = inputChangelogMode.containsOnly(RowKind.INSERT) + val sinkIsAppend = sinkChangelogMode.containsOnly(RowKind.INSERT) + val sinkIsRetract = sinkChangelogMode.contains(RowKind.UPDATE_BEFORE) + + tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE) match { + case UpsertMaterialize.FORCE => primaryKeys.nonEmpty && !sinkIsRetract + case UpsertMaterialize.NONE => false + case UpsertMaterialize.AUTO => + if (inputIsAppend || sinkIsAppend || sinkIsRetract) { + return false + } + if (primaryKeys.isEmpty) { + return false + } + val pks = ImmutableBitSet.of(primaryKeys: _*) + val fmq = FlinkRelMetadataQuery.reuseOrCreate(sink.getCluster.getMetadataQuery) + val changeLogUpsertKeys = fmq.getUpsertKeys(sink.getInput) + // if input has updates and primary key != upsert key (upsert key can be null) we should + // enable upsertMaterialize. An optimize is: do not enable upsertMaterialize when sink + // pk(s) contains input changeLogUpsertKeys + changeLogUpsertKeys == null || !changeLogUpsertKeys.exists(pks.contains) + } } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java new file mode 100644 index 0000000000000..747b6e91a4dd8 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkSemanticTests.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.planner.plan.nodes.exec.testutils.SemanticTestBase; +import org.apache.flink.table.test.program.TableTestProgram; + +import java.util.List; + +/** Semantic tests for {@link StreamExecSink}. */ +public class SinkSemanticTests extends SemanticTestBase { + + @Override + public List programs() { + return List.of( + SinkTestPrograms.INSERT_RETRACT_WITHOUT_PK, + SinkTestPrograms.INSERT_RETRACT_WITH_PK); + } +} diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java new file mode 100644 index 0000000000000..deed442ce3b89 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/SinkTestPrograms.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.plan.nodes.exec.stream; + +import org.apache.flink.table.test.program.SinkTestStep; +import org.apache.flink.table.test.program.SourceTestStep; +import org.apache.flink.table.test.program.TableTestProgram; +import org.apache.flink.types.Row; +import org.apache.flink.types.RowKind; + +/** Tests for verifying sink semantics. */ +public class SinkTestPrograms { + + public static final TableTestProgram INSERT_RETRACT_WITHOUT_PK = + TableTestProgram.of( + "insert-retract-without-pk", + "The sink accepts retract input. Retract is directly passed through.") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema("name STRING", "score INT") + .addOption("changelog-mode", "I") + .producedValues( + Row.ofKind(RowKind.INSERT, "Alice", 3), + Row.ofKind(RowKind.INSERT, "Bob", 5), + Row.ofKind(RowKind.INSERT, "Bob", 6), + Row.ofKind(RowKind.INSERT, "Charly", 33)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema("name STRING", "score BIGINT") + .addOption("sink-changelog-mode-enforced", "I,UB,UA,D") + .consumedValues( + "+I[Alice, 3]", + "+I[Bob, 5]", + "-U[Bob, 5]", + "+U[Bob, 11]", + "+I[Charly, 33]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT name, SUM(score) FROM source_t GROUP BY name") + .build(); + + public static final TableTestProgram INSERT_RETRACT_WITH_PK = + TableTestProgram.of( + "insert-retract-with-pk", + "The sink accepts retract input. Although upsert keys (name) and primary keys (UPPER(name))" + + "don't match, the retract changelog is passed through.") + .setupTableSource( + SourceTestStep.newBuilder("source_t") + .addSchema("name STRING", "score INT") + .addOption("changelog-mode", "I") + .producedValues( + Row.ofKind(RowKind.INSERT, "Alice", 3), + Row.ofKind(RowKind.INSERT, "Bob", 5), + Row.ofKind(RowKind.INSERT, "Bob", 6), + Row.ofKind(RowKind.INSERT, "Charly", 33)) + .build()) + .setupTableSink( + SinkTestStep.newBuilder("sink_t") + .addSchema( + "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT") + .addOption("sink-changelog-mode-enforced", "I,UB,UA,D") + .consumedValues( + "+I[ALICE, 3]", + "+I[BOB, 5]", + "-U[BOB, 5]", + "+U[BOB, 11]", + "+I[CHARLY, 33]") + .build()) + .runSql( + "INSERT INTO sink_t SELECT UPPER(name), SUM(score) FROM source_t GROUP BY name") + .build(); +} From 234e48edd0b65c13fb33b201733ef41a80860732 Mon Sep 17 00:00:00 2001 From: Timo Walther Date: Mon, 11 Aug 2025 13:31:44 +0200 Subject: [PATCH 2/2] Feedback addressed --- .../stream/DuplicateChangesInferRuleTest.xml | 8 +++---- .../planner/plan/stream/sql/DeltaJoinTest.xml | 6 ++--- .../planner/plan/stream/sql/TableSinkTest.xml | 23 ++++++++++++++++++ .../plan/stream/sql/TableSinkTest.scala | 24 +++++++++++++++++++ 4 files changed, 54 insertions(+), 7 deletions(-) diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.xml index 27aeaa5243134..b2fccab63efa6 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/rules/physical/stream/DuplicateChangesInferRuleTest.xml @@ -753,10 +753,10 @@ LogicalSink(table=[default_catalog.default_database.another_pk_snk], fields=[a, diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml index d689955746bb0..201f4ec63bfbb 100644 --- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml +++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/DeltaJoinTest.xml @@ -31,7 +31,7 @@ LogicalSink(table=[default_catalog.default_database.snk], fields=[a0, a1, a2, a3 + + + + + + + +