[FLINK-38201] SinkUpsertMaterializer should not be inserted for retract sinks #26879

Merged
twalthr merged 2 commits into apache:master on Aug 11, 2025

Conversation

twalthr
Contributor

@twalthr twalthr commented Aug 7, 2025

What is the purpose of the change

Fixes retract sinks by not adding a SinkUpsertMaterializer. Since Flink's primary keys are declared as NOT ENFORCED, it is the user's responsibility to ensure uniqueness. Retract streams are passed through directly without modification.

Brief change log

  • Consider retract mode in FlinkChangelogModeInferenceProgram
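
As a rough illustration of the retract check this change relies on, the sketch below classifies a sink by the row kinds it accepts. The `RowKind` and `SinkKind` enums and the `classify` method are hypothetical stand-ins written for this example, not Flink's classes. The two rules it encodes come from the snippets in this conversation: a sink is treated as retract when its changelog mode contains UPDATE_BEFORE, and as append when it contains only INSERT. Treating everything else as upsert is a simplifying assumption.

```java
import java.util.EnumSet;
import java.util.Set;

// Stand-in types for illustration only; these are NOT the Flink classes.
class SinkKindSketch {
    enum RowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

    enum SinkKind { APPEND, UPSERT, RETRACT }

    // Classifies a sink by the set of row kinds it accepts.
    static SinkKind classify(Set<RowKind> accepted) {
        // A sink that asks for UPDATE_BEFORE (-U) is a retract sink.
        if (accepted.contains(RowKind.UPDATE_BEFORE)) {
            return SinkKind.RETRACT;
        }
        // A sink that accepts nothing but INSERT is an append sink.
        if (accepted.equals(EnumSet.of(RowKind.INSERT))) {
            return SinkKind.APPEND;
        }
        // Assumption: every remaining combination is handled as upsert.
        return SinkKind.UPSERT;
    }

    public static void main(String[] args) {
        // Corresponds to 'I,UB,UA,D' in the test connector option used in this PR.
        System.out.println(classify(EnumSet.allOf(RowKind.class)));
        // Corresponds to 'I,UA,D'.
        System.out.println(classify(
                EnumSet.of(RowKind.INSERT, RowKind.UPDATE_AFTER, RowKind.DELETE)));
    }
}
```

Under this reading, only the upsert case is a candidate for a SinkUpsertMaterializer; retract and append sinks receive the stream as is.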

Verifying this change

This change added tests and can be verified as follows: SinkSemanticTests

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@flinkbot
Collaborator

flinkbot commented Aug 7, 2025

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

    SinkTestStep.newBuilder("sink_t")
            .addSchema(
                    "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
            .addOption("sink-changelog-mode-enforced", "I,UB,UA,D")
Contributor

@gustavodemorais gustavodemorais Aug 7, 2025

I'd probably just have added a plan test to check that SUM is not added anymore. Do we usually prefer semantic tests over plan tests, or is it just a matter of preference?

Contributor

@gustavodemorais gustavodemorais left a comment

The PR looks great!

    val sinkIsRetract = sinkChangelogMode.contains(RowKind.UPDATE_BEFORE)

    tableConfig.get(ExecutionConfigOptions.TABLE_EXEC_SINK_UPSERT_MATERIALIZE) match {
      case UpsertMaterialize.FORCE => primaryKeys.nonEmpty && !sinkIsRetract
Contributor

Regarding the two changes

  • Auto mode: it makes sense that we don't add SUM if sinkIsRetract is true.
  • Force: Do we know where UpsertMaterialize.FORCE is used? Is it safe to "not respect" the force here?
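
To make the two cases in this comment concrete, here is a minimal sketch of the decision as a standalone function. It is not the planner code. The FORCE branch mirrors the Scala line quoted above; the NONE branch and the exact AUTO condition are assumptions for illustration, and `inputMayViolateKey` is a hypothetical flag standing in for whatever key analysis the planner performs in auto mode.

```java
// Illustrative sketch only; the enum and method are stand-ins, not Flink classes.
class UpsertMaterializeSketch {
    enum UpsertMaterialize { NONE, AUTO, FORCE }

    // Decides whether a SinkUpsertMaterializer (SUM) would be inserted.
    static boolean needsMaterializer(
            UpsertMaterialize strategy,
            boolean hasPrimaryKey,
            boolean sinkIsRetract,
            boolean inputMayViolateKey) {
        switch (strategy) {
            case FORCE:
                // From the diff: FORCE still skips the SUM for retract sinks.
                return hasPrimaryKey && !sinkIsRetract;
            case NONE:
                // Assumption: NONE never inserts a SUM.
                return false;
            case AUTO:
                // Assumption: AUTO inserts a SUM only for keyed, non-retract sinks
                // whose input might not respect the declared primary key.
                return hasPrimaryKey && !sinkIsRetract && inputMayViolateKey;
            default:
                throw new IllegalStateException("Unknown strategy: " + strategy);
        }
    }

    public static void main(String[] args) {
        // A retract sink with a primary key under FORCE.
        System.out.println(
                needsMaterializer(UpsertMaterialize.FORCE, true, true, true));
    }
}
```

In this sketch `sinkIsRetract` overrides both modes, which is the behavior the question about "not respecting" FORCE refers to.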

@snuyanzin
Contributor

snuyanzin commented Aug 7, 2025

Is it expected that the plans for some delta join queries changed?
Asking because they probably need to be updated.

@github-actions github-actions bot added and removed the community-reviewed label ("PR has been reviewed by the community.") on Aug 7, 2025
@twalthr
Contributor Author

twalthr commented Aug 8, 2025

@xuyangzhong what do you think about this change? Is it acceptable to simply update the affected test, or do we need changes in DuplicateChangesInferRule? The current Flink behavior is incorrect: if a sink requests retract, the engine needs to guarantee that -U will be sent, independent of the primary key declaration.
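
For readers less familiar with the two encodings, the sketch below shows how a single update could be emitted to an upsert sink versus a retract sink. The `encodeUpdate` helper and its string-based row format are hypothetical and exist only for this illustration; the point is that a retract sink expects the -U (UPDATE_BEFORE) message ahead of the +U (UPDATE_AFTER) message.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; rows are plain strings, not Flink RowData.
class UpdateEncodingSketch {
    // Encodes one update of oldRow to newRow as changelog messages.
    static List<String> encodeUpdate(String oldRow, String newRow, boolean retractSink) {
        List<String> out = new ArrayList<>();
        if (retractSink) {
            // Retract sinks need the old value retracted explicitly.
            out.add("-U[" + oldRow + "]");
        }
        // Both encodings carry the new value.
        out.add("+U[" + newRow + "]");
        return out;
    }

    public static void main(String[] args) {
        System.out.println(encodeUpdate("Bob, 1", "Bob, 2", true));
        System.out.println(encodeUpdate("Bob, 1", "Bob, 2", false));
    }
}
```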

@github-actions github-actions bot added and removed the community-reviewed label ("PR has been reviewed by the community.") on Aug 9, 2025
@xuyangzhong
Contributor

xuyangzhong commented Aug 11, 2025

@xuyangzhong what do you think about this change? Is it acceptable to simply update the affected test, or do we need changes in DuplicateChangesInferRule? The current Flink behavior is incorrect: if a sink requests retract, the engine needs to guarantee that -U will be sent, independent of the primary key declaration.

Regarding this change for delta join, I've thought about it, and it is necessary for the DuplicateChangesInferRule to return false when dealing with retract sinks. This is because delta joins might generate redundant data, while retract sinks require data to appear in pairs as both '+' and '-' forms.
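
A small sketch of why unpaired duplicates are a problem for a retract sink: if the sink keeps a count per row and applies '+' and '-' messages to it, an extra '+' without a matching '-' leaves a stale row behind. The `apply` method and the two-element `{sign, row}` message format are assumptions made for this example and do not reflect how any particular Flink sink is implemented.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; models a retract sink as a multiset of rows.
class RetractApplySketch {
    // Applies {sign, row} messages, where sign is "+" or "-".
    static Map<String, Integer> apply(String[][] changes) {
        Map<String, Integer> counts = new HashMap<>();
        for (String[] change : changes) {
            int delta = change[0].equals("+") ? 1 : -1;
            counts.merge(change[1], delta, Integer::sum);
            // Drop rows whose count has returned to zero.
            counts.remove(change[1], 0);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Every '+' for the old row is matched by a '-'.
        String[][] paired = {{"+", "Bob, 1"}, {"-", "Bob, 1"}, {"+", "Bob, 2"}};
        // The old row is emitted twice but retracted only once.
        String[][] duplicated = {
            {"+", "Bob, 1"}, {"+", "Bob, 1"}, {"-", "Bob, 1"}, {"+", "Bob, 2"}
        };
        System.out.println(apply(paired));
        System.out.println(apply(duplicated));
    }
}
```

With the paired input only the latest row remains, while the duplicated input also leaves the old row in place.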

This issue stands on its own, so I can go ahead and create a separate Jira ticket to resolve it if you prefer. It's entirely your decision.

In summary, here are the situations regarding the failed cases below:

  • DuplicateChangesInferRuleTest.testSinkWithMaterialize: the duplicateChanges in upstream operators for sink should be DISALLOW.
  • DeltaJoinTest.testCdcSource: just apply new changes.
  • DeltaJoinTest.testWithAggregatingAfterJoin: just apply new changes.
  • DeltaJoinTest.testWithAggregatingSourceTableBeforeJoin: just apply new changes.

@twalthr
Contributor Author

twalthr commented Aug 11, 2025

Thanks for the quick reply @xuyangzhong. I tried to apply your suggestion to DuplicateChangesInferRule#canConsumeDuplicateChanges:

    private boolean canConsumeDuplicateChanges(StreamPhysicalSink sink) {
        try {
            final ChangelogMode sinkChangelogMode =
                    sink.tableSink().getChangelogMode(ChangelogMode.all());
            final boolean sinkIsAppend = sinkChangelogMode.containsOnly(RowKind.INSERT);
            final boolean sinkIsRetract = sinkChangelogMode.contains(RowKind.UPDATE_BEFORE);
            // Append and retract sinks are treated as unable to consume duplicate changes.
            if (sinkIsAppend || sinkIsRetract) {
                return false;
            }
        } catch (Throwable t) {
            // Be conservative if the sink cannot report its changelog mode.
            return false;
        }
        // Otherwise, duplicates are acceptable only if the sink declares a primary key.
        return sink.contextResolvedTable().getResolvedSchema().getPrimaryKey().isPresent();
    }

A lot of tests will behave differently with this change. I guess that is also because pk_snk in DuplicateChangesInferRuleTest should be declared as 'sink-changelog-mode-enforced' = 'I,UA,D'? In any case, it might be best if you take another look at this issue. I will open a follow-up issue and disable the affected tests temporarily.

@xuyangzhong
Contributor

Sure, @twalthr, let's temporarily disable the test DuplicateChangesInferRuleTest.testSinkWithMaterialize to avoid introducing unrelated changes in this PR. I have created a new Jira ticket, FLINK-38224, for this.

@twalthr twalthr merged commit 00241a6 into apache:master Aug 11, 2025
Labels
community-reviewed PR has been reviewed by the community.
5 participants