[FLINK-38180][task] Clean up task after switching to FAILED #26861

Open: wants to merge 9 commits into master

pnowojski
Contributor

What is the purpose of the change

This prevents a race condition in which an exception thrown during clean up
hides the real exception when `failExternally` is used in the clean up.
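
The ordering problem described above can be illustrated with a minimal, single-threaded sketch. It assumes that the first recorded failure cause wins; the class `FailureOrderSketch` and its methods are hypothetical stand-ins, not Flink's `Task` implementation.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a task; not Flink's Task class. */
final class FailureOrderSketch {
    // Assumption of this sketch: the first failure cause that is recorded wins.
    private final AtomicReference<Throwable> failureCause = new AtomicReference<>();

    /** Records a failure unless one was already recorded. */
    boolean failExternally(Throwable cause) {
        return failureCause.compareAndSet(null, cause);
    }

    Throwable failureCause() {
        return failureCause.get();
    }

    /** Clean-up that reports its own problem through failExternally. */
    private void cleanUp() {
        failExternally(new RuntimeException("clean-up failure"));
    }

    /** Clean-up runs before the real failure is recorded: the real cause is lost. */
    static Throwable cleanUpThenFail(Throwable real) {
        FailureOrderSketch task = new FailureOrderSketch();
        task.cleanUp();
        task.failExternally(real);
        return task.failureCause();
    }

    /** The real failure is recorded first: the clean-up failure cannot replace it. */
    static Throwable failThenCleanUp(Throwable real) {
        FailureOrderSketch task = new FailureOrderSketch();
        task.failExternally(real);
        task.cleanUp();
        return task.failureCause();
    }
}
```

In this sketch, `cleanUpThenFail` returns the clean-up exception, while `failThenCleanUp` returns the exception passed in, which is the ordering the PR title describes (clean up after switching to FAILED).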

Verifying this change

Added a unit test that covers the bug.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (yes / no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / no)
  • The serializers: (yes / no / don't know)
  • The runtime per-record code paths (performance sensitive): (yes / no / don't know)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / no / don't know)
  • The S3 file system connector: (yes / no / don't know)

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)

@@ -827,6 +829,8 @@ else if (transitionState(current, ExecutionState.FAILED, t)) {
}
// else fall through the loop and
}

cleanUpRegistry.close();
Contributor Author

@pnowojski pnowojski Aug 1, 2025

NOTE! Take a look at the try/catch below. There is a change in behaviour: any exception from the cleanup is no longer suppressed but is treated as a fatal error (I think correctly).

@flinkbot
Collaborator

flinkbot commented Aug 1, 2025

CI report:

Bot commands: the @flinkbot bot supports the following commands:
  • `@flinkbot run azure`: re-run the last Azure build

Contributor

@rkhachatryan rkhachatryan left a comment

LGTM, thanks for the fix

Contributor

@Savonitar Savonitar left a comment

Hi, thanks for the PR!

@@ -612,6 +613,7 @@ private void doRun() {
// need to be undone in the end
Map<String, Future<Path>> distributedCacheEntries = new HashMap<>();
TaskInvokable invokable = null;
AutoCloseableRegistry cleanUpRegistry = new AutoCloseableRegistry();
Contributor

I notice the registry is created for every task run, but it's only actually used in the failure case.
What do you think about adding a comment like

// Registry for actions that should be run if the task fails

That would help future readers understand why it's unused on the success path.

Contributor Author

I was actually hoping for it to be more generic. If someone needs to defer some action, this registry could be used.

I could maybe rephrase this comment to:

// Registry for actions that should be run after the task has already failed 

?

Contributor

Yes, sure. Sounds good.
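
The idea of a generic registry for deferred actions, as discussed in this thread, could look roughly like the sketch below. `DeferredActions` is a hypothetical minimal class written for illustration; it does not reproduce Flink's `AutoCloseableRegistry`, and the reverse-order execution and suppressed-exception handling are assumptions of the sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical minimal registry of deferred actions; not Flink's AutoCloseableRegistry. */
final class DeferredActions implements AutoCloseable {
    private final Deque<AutoCloseable> actions = new ArrayDeque<>();

    /** Defers an action until close() is called. */
    void register(AutoCloseable action) {
        actions.push(action);
    }

    /**
     * Runs every deferred action in reverse registration order (an assumption
     * of this sketch). All actions run even if one fails; the first failure is
     * rethrown with later failures attached as suppressed exceptions.
     */
    @Override
    public void close() throws Exception {
        Exception first = null;
        while (!actions.isEmpty()) {
            try {
                actions.pop().close();
            } catch (Exception e) {
                if (first == null) {
                    first = e;
                } else {
                    first.addSuppressed(e);
                }
            }
        }
        if (first != null) {
            throw first;
        }
    }
}
```

A caller would register actions such as `registry.register(() -> releaseSomething())` (where `releaseSomething` is a placeholder) while the task runs, and call `registry.close()` only once the task has reached its final state.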

Contributor

@Savonitar Savonitar left a comment

Thanks for the fix.
LGTM

@pnowojski
Contributor Author

e2e test was failing due to:

java.lang.NullPointerException: Cannot invoke "org.apache.flink.table.runtime.util.collections.binary.AbstractBytesHashMap.free()" because "this.aggregateMap$8" is null
        at LocalHashAggregateWithKeys$133.close(Unknown Source) ~[?:?]
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163) ~[flink-dist-2.2-SNAPSHOT.jar:2.2-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125) ~[flink-dist-2.2-SNAPSHOT.jar:2.2-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1197) ~[flink-dist-2.2-SNAPSHOT.jar:2.2-SNAPSHOT]
        at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:257) ~[flink-dist-2.2-SNAPSHOT.jar:2.2-SNAPSHOT]
        at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72) ~[flink-dist-2.2-SNAPSHOT.jar:2.2-SNAPSHOT]
        at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127) ~[flink-dist-2.2-SNAPSHOT.jar:2.2-SNAPSHOT]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:1101) ~[flink-dist-2.2-SNAPSHOT.jar:2.2-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:950) ~[flink-dist-2.2-SNAPSHOT.jar:2.2-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:965) ~[flink-dist-2.2-SNAPSHOT.jar:2.2-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$1(Task.java:950) [flink-dist-2.2-SNAPSHOT.jar:2.2-SNAPSHOT]
        at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:257) ~[flink-dist-2.2-SNAPSHOT.jar:2.2-SNAPSHOT]
        at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72) ~[flink-dist-2.2-SNAPSHOT.jar:2.2-SNAPSHOT]
        at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127) ~[flink-dist-2.2-SNAPSHOT.jar:2.2-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:833) [flink-dist-2.2-SNAPSHOT.jar:2.2-SNAPSHOT]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:569) [flink-dist-2.2-SNAPSHOT.jar:2.2-SNAPSHOT]
        at java.base/java.lang.Thread.run(Thread.java:840) [?:?]

I think the problem was that I changed the order of the clean up calls. I'm trying to fix it with the fixup commit.
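
The trace above shows `close()` dereferencing a field that was still null. One defensive option, independent of the call-order fix, is to make `close()` tolerate an operator that was never opened. The sketch below is an assumption-laden illustration: `OperatorSketch` and its field are hypothetical and only loosely mirror the generated operator named in the trace.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical operator whose resource is only allocated in open(). */
final class OperatorSketch implements AutoCloseable {
    // Stays null until open() is called, e.g. if the task is cancelled early.
    private Map<String, Long> aggregateMap;

    void open() {
        aggregateMap = new HashMap<>();
    }

    boolean isOpen() {
        return aggregateMap != null;
    }

    /** Safe to call before open() and safe to call more than once. */
    @Override
    public void close() {
        if (aggregateMap != null) {
            aggregateMap.clear();
            aggregateMap = null;
        }
    }
}
```

With the guard in place, calling `close()` on an operator that never reached `open()` is a no-op instead of a `NullPointerException`.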

@github-actions github-actions bot added community-reviewed PR has been reviewed by the community. and removed community-reviewed PR has been reviewed by the community. labels Aug 1, 2025
@github-actions github-actions bot added community-reviewed PR has been reviewed by the community. and removed community-reviewed PR has been reviewed by the community. labels Aug 9, 2025
@@ -885,6 +850,53 @@ else if (transitionState(current, ExecutionState.FAILED, t)) {
}
}

/**
* Transition into our final state in case of failure. We should be either in DEPLOYING,
* INITIALIZING, RUNNING, CANCELING, or FAILED loop for multiple retries during concurrent state
Contributor

nit: The sentence does not read well. I suggest

  • full stop after FAILED
  • then Loop to asynchronously clean up via calls to cancel() or to failExternally()

Contributor Author

then Loop to asynchronously clean up via calls to cancel() or to failExternally()

I think this is not really better. I've rewritten the original sentence to:

Loop for multiple retries in case of concurrent state changes via calls to cancel() or to failExternally()

private void transitionStateOnFailure(
Throwable t, AutoCloseableRegistry postFailureCleanUpRegistry) throws IOException {
while (true) {
ExecutionState current = this.executionState;
Contributor

I suggest adding a yield() call in the loop to prevent this thread hogging the cpu in a tight loop until done.

Contributor Author

  1. I would prefer to avoid touching this, as this is pre-existing logic that I'm just extracting to a separate method, and I would like to minimise changes for this already pretty fragile bug fix (I've been struggling to fix all of the failing e2e/itcases for quite some time).
  2. This can't loop more than once or maybe a couple of times. A Task can't keep changing its state for a long period of time. At worst this will just make two or three iterations.
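
The retry loop under discussion follows a common compare-and-set pattern, sketched below under stated assumptions. `StateTransitionSketch`, its reduced `State` enum, and the transition rules are hypothetical simplifications, not the actual `Task` state machine. The loop repeats only when another thread changed the state between the read and the compare-and-set, which is why a small, finite state machine bounds the number of iterations.

```java
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, reduced state machine; names and rules are illustrative only. */
final class StateTransitionSketch {
    enum State { RUNNING, CANCELING, CANCELED, FAILED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

    /** Models a concurrent cancel() call: RUNNING -> CANCELING, otherwise no effect. */
    void cancel() {
        state.compareAndSet(State.RUNNING, State.CANCELING);
    }

    /** Moves to a final state after a failure, retrying if the state changed concurrently. */
    State transitionOnFailure() {
        while (true) {
            State current = state.get();
            if (current == State.FAILED || current == State.CANCELED) {
                return current; // already final, nothing to do
            }
            // Assumption of this sketch: a failure while CANCELING ends in CANCELED.
            State target = (current == State.CANCELING) ? State.CANCELED : State.FAILED;
            if (state.compareAndSet(current, target)) {
                return target;
            }
            // The compare-and-set lost against a concurrent change: re-read and retry.
        }
    }
}
```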

@@ -1072,7 +1072,8 @@ public final void cleanUp(Throwable throwable) throws Exception {
LOG.debug(
"Cleanup StreamTask (operators closed: {}, cancelled: {})",
closedOperators,
- canceled);
+ canceled,
+ throwable);
Contributor

Does the throwable information come out in the debug output?

I see the Logger.debug interface is

public void debug(String format, Object... arguments);

  /**
   * Log an exception (throwable) at the DEBUG level with an
   * accompanying message.
   *
   * @param msg the message accompanying the exception
   * @param t   the exception (throwable) to log
   */
  public void debug(String msg, Throwable t);

It looks like we should pass a resolved string and the throwable or have the Throwable as an insert in the message.

Contributor Author

Thanks, fixed. Good catch!

@github-actions github-actions bot added community-reviewed PR has been reviewed by the community. and removed community-reviewed PR has been reviewed by the community. labels Aug 13, 2025
…depend on close not being called on failures
…ing closing

Exceptions thrown during Close can prevent resources from being cleaned up
and can cause TaskManager to exit with fatal error.
…throwing NPE on close

Before, NPE could have been thrown if operator was closed before properly
opening it, for example during task cancelation.
This prevents a race condition of some exception from clean up
hiding the real exception if `failExternally` is used in the clean up
@pnowojski
Contributor Author

@flinkbot run azure

Labels
community-reviewed PR has been reviewed by the community.
5 participants