[SPARK-19529] TransportClientFactory.createClient() shouldn't call awaitUninterruptibly() #16866
Conversation
Test build #72633 has started for PR 16866 at commit

jenkins retest this please

Test build #72635 has finished for PR 16866 at commit
How about throwing `InterruptedIOException` and also setting the interrupted state? E.g.:

```java
try {
  cf.await();
} catch (InterruptedException e) {
  Thread.currentThread().interrupt();
  throw new InterruptedIOException();
}
```
@zsxwing, as described in the PR description, if we throw an `IOException` (and `InterruptedIOException` is a subclass of `IOException`), then `RetryingBlockFetcher` will treat it as a transient failure and may retry the fetch, which is exactly what we want to avoid for a cancelled task.
Actually, to clarify: @zsxwing, are you suggesting to check for `InterruptedIOException` at the call sites instead?
On second thought, I think your current approach is better, since we don't check `InterruptedIOException` in many places.
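For illustration only: if interruption were surfaced as an `InterruptedIOException`, every call site that treats `IOException` as retryable would need a check along these lines (the `shouldRetry` helper below is hypothetical, not Spark code):

```java
import java.io.IOException;
import java.io.InterruptedIOException;

// Hypothetical helper showing why per-call-site checks would be needed:
// an InterruptedIOException *is* an IOException, so a generic retry loop
// would treat it as a transient failure unless it tests for it explicitly.
final class RetryDecisionSketch {
  static boolean shouldRetry(IOException e) {
    if (e instanceof InterruptedIOException) {
      return false;  // interruption means the task is being cancelled; don't retry
    }
    return true;  // other I/O failures are treated as transient and retried
  }
}
```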
LGTM
In terms of binary compatibility, I believe that the affected classes are effectively internal, so this change doesn't significantly impact binary compatibility. In terms of source compatibility, this would have an impact on external Java code building against these sources, since that code would now need to declare (or handle) the new checked exception. I don't think this is a common use case, though, and there are easy workarounds in such cases: either update the code, or update the build to compile against the old interface even though you run against the new implementation (since binary compatibility is unchanged).
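A minimal sketch of the kind of signature change involved (the `ConnectionStarter` interface below is hypothetical; the real interfaces are internal to `network-common` and `network-shuffle`):

```java
import java.io.IOException;

// Hypothetical interface illustrating the shape of the change: a method that
// used to hide interruption behind awaitUninterruptibly() now declares it,
// so external Java implementations and callers must add the new checked exception.
interface ConnectionStarter {
  // before: void createAndStart(String[] blockIds) throws IOException;
  void createAndStart(String[] blockIds) throws IOException, InterruptedException;
}
```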
I prefer to just view these APIs as private, since they are not in the public docs. We have some similar APIs in SQL as well.
Merging to master, branch-2.1, branch-2.0, and branch-1.6. Conflicts occurred; I'm fixing them manually.

Branch-2.1 test compilation happens to be broken right now. Trying to fix the compilation failure first.
[SPARK-19529] TransportClientFactory.createClient() shouldn't call awaitUninterruptibly()

This patch replaces a single `awaitUninterruptibly()` call with a plain `await()` call in Spark's `network-common` library in order to fix a bug which may cause tasks to be uncancellable.

In Spark's Netty RPC layer, `TransportClientFactory.createClient()` calls `awaitUninterruptibly()` on a Netty future while waiting for a connection to be established. This creates a problem when a Spark task is interrupted while blocking in this call (which can happen in the event of a slow connection that will eventually time out). This has a bad impact on task cancellation when `interruptOnCancel = true`.

As an example of the impact of this problem, I experienced significant numbers of uncancellable "zombie tasks" on a production cluster where several tasks were blocked trying to connect to a dead shuffle server and then continued running as zombies after I cancelled the associated Spark stage. The zombie tasks ran for several minutes with the following stack:

```
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:460)
io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607)
io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179) => holding Monitor(java.lang.Object1849476028})
org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114)
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:350)
org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286)
org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
[...]
```

As far as I can tell, `awaitUninterruptibly()` might have been used in order to avoid having to declare that these methods throw `InterruptedException` (this code is written in Java, hence the need to deal with checked exceptions). This patch simply replaces it with a regular, interruptible `await()` call. This required several interface changes to declare a new checked exception (these are internal interfaces, though, and the change doesn't significantly impact binary compatibility).

An alternative approach would be to wrap `InterruptedException` into `IOException` in order to avoid having to change interfaces. The problem with that approach is that the `network-shuffle` project's `RetryingBlockFetcher` code treats `IOException`s as transient failures when deciding whether to retry fetches, so throwing a wrapped `IOException` might cause an interrupted shuffle fetch to be retried, further prolonging the lifetime of a cancelled zombie task.

Note that there are three other `awaitUninterruptibly()` calls in the codebase, but those calls have a hard 10-second timeout and are waiting on a `close()` operation which is expected to complete near instantaneously, so the impact of uninterruptibility there is much smaller.

Tested manually.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #16866 from JoshRosen/SPARK-19529.

(cherry picked from commit 1c4d10b)
Signed-off-by: Cheng Lian <lian@databricks.com>
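As a minimal illustration (not part of the patch) of why the interruptible wait matters, assuming Netty's `io.netty.util.concurrent` promise API:

```java
import java.util.concurrent.TimeUnit;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.ImmediateEventExecutor;

public class AwaitDemo {
  public static void main(String[] args) throws Exception {
    DefaultPromise<Void> promise =
        new DefaultPromise<>(ImmediateEventExecutor.INSTANCE);
    Thread waiter = new Thread(() -> {
      try {
        // await() wakes up with InterruptedException as soon as the thread is
        // interrupted, which is what lets a cancelled task stop blocking here.
        promise.await(30, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        System.out.println("await() observed the interrupt promptly");
      }
      // awaitUninterruptibly(...) would instead keep waiting for the full
      // timeout and only restore the thread's interrupt flag on return.
    });
    waiter.start();
    Thread.sleep(100);
    waiter.interrupt();  // analogous to cancellation with interruptOnCancel = true
    waiter.join();
  }
}
```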
Merged to master, branch-2.1, and branch-2.0. The files involved in branch-1.6 had been moved to new directories, which made it hard to cherry-pick, so I created PR #16917 to backport this one to 1.6.
## What changes were proposed in this pull request?

This PR backports PR #16866 to branch-1.6.

## How was this patch tested?

Existing tests.

Author: Cheng Lian <lian@databricks.com>

Closes #16917 from liancheng/spark-19529-1.6-backport.
## What changes were proposed in this pull request?

This PR backports PR apache#16866 to branch-1.6.

## How was this patch tested?

Existing tests.

Author: Cheng Lian <lian@databricks.com>

Closes apache#16917 from liancheng/spark-19529-1.6-backport.

(cherry picked from commit a50ef3d)
[SPARK-19529] TransportClientFactory.createClient() shouldn't call awaitUninterruptibly()

## What changes were proposed in this pull request?

This patch replaces a single `awaitUninterruptibly()` call with a plain `await()` call in Spark's `network-common` library in order to fix a bug which may cause tasks to be uncancellable.

In Spark's Netty RPC layer, `TransportClientFactory.createClient()` calls `awaitUninterruptibly()` on a Netty future while waiting for a connection to be established. This creates a problem when a Spark task is interrupted while blocking in this call (which can happen in the event of a slow connection that will eventually time out). This has a bad impact on task cancellation when `interruptOnCancel = true`.

As an example of the impact of this problem, I experienced significant numbers of uncancellable "zombie tasks" on a production cluster where several tasks were blocked trying to connect to a dead shuffle server and then continued running as zombies after I cancelled the associated Spark stage. The zombie tasks ran for several minutes with the following stack:

```
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:460)
io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607)
io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179) => holding Monitor(java.lang.Object1849476028})
org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114)
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:350)
org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286)
org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
[...]
```

As far as I can tell, `awaitUninterruptibly()` might have been used in order to avoid having to declare that these methods throw `InterruptedException` (this code is written in Java, hence the need to deal with checked exceptions). This patch simply replaces it with a regular, interruptible `await()` call. This required several interface changes to declare a new checked exception (these are internal interfaces, though, and the change doesn't significantly impact binary compatibility).

An alternative approach would be to wrap `InterruptedException` into `IOException` in order to avoid having to change interfaces. The problem with that approach is that the `network-shuffle` project's `RetryingBlockFetcher` code treats `IOException`s as transient failures when deciding whether to retry fetches, so throwing a wrapped `IOException` might cause an interrupted shuffle fetch to be retried, further prolonging the lifetime of a cancelled zombie task.

Note that there are three other `awaitUninterruptibly()` calls in the codebase, but those calls have a hard 10-second timeout and are waiting on a `close()` operation which is expected to complete near instantaneously, so the impact of uninterruptibility there is much smaller.

## How was this patch tested?

Manually.

Author: Josh Rosen <joshrosen@databricks.com>

Closes apache#16866 from JoshRosen/SPARK-19529.
…TransportClientFactory.createClient()

### What changes were proposed in this pull request?

#41785 / SPARK-44241 introduced a new `awaitUninterruptibly()` call in one branch of `TransportClientFactory.createClient()` (executed when the connection-create timeout is non-positive). This PR replaces that call with an interruptible `await()` call. Note that the other pre-existing branches in this method were already using `await()`.

### Why are the changes needed?

Uninterruptible waiting can cause problems when cancelling tasks. For details, see #16866 / SPARK-19529, an older PR fixing a similar issue in this same `TransportClientFactory.createClient()` method.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #42619 from JoshRosen/remove-awaitUninterruptibly.

Authored-by: Josh Rosen <joshrosen@databricks.com>
Signed-off-by: Kent Yao <yao@apache.org>
…TransportClientFactory.createClient()

### What changes were proposed in this pull request?

#41785 / SPARK-44241 introduced a new `awaitUninterruptibly()` call in one branch of `TransportClientFactory.createClient()` (executed when the connection-create timeout is non-positive). This PR replaces that call with an interruptible `await()` call. Note that the other pre-existing branches in this method were already using `await()`.

### Why are the changes needed?

Uninterruptible waiting can cause problems when cancelling tasks. For details, see #16866 / SPARK-19529, an older PR fixing a similar issue in this same `TransportClientFactory.createClient()` method.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #42619 from JoshRosen/remove-awaitUninterruptibly.

Authored-by: Josh Rosen <joshrosen@databricks.com>
Signed-off-by: Kent Yao <yao@apache.org>

(cherry picked from commit 2137606)
Signed-off-by: Kent Yao <yao@apache.org>
…TransportClientFactory.createClient()

### What changes were proposed in this pull request?

apache#41785 / SPARK-44241 introduced a new `awaitUninterruptibly()` call in one branch of `TransportClientFactory.createClient()` (executed when the connection-create timeout is non-positive). This PR replaces that call with an interruptible `await()` call. Note that the other pre-existing branches in this method were already using `await()`.

### Why are the changes needed?

Uninterruptible waiting can cause problems when cancelling tasks. For details, see apache#16866 / SPARK-19529, an older PR fixing a similar issue in this same `TransportClientFactory.createClient()` method.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#42619 from JoshRosen/remove-awaitUninterruptibly.

Authored-by: Josh Rosen <joshrosen@databricks.com>
Signed-off-by: Kent Yao <yao@apache.org>

(cherry picked from commit 2137606)
Signed-off-by: Kent Yao <yao@apache.org>
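For context, a rough sketch of the branch this later fix touches; the structure and names are approximate, based on the descriptions above rather than the exact Spark source:

```java
import java.io.IOException;
import io.netty.channel.ChannelFuture;

// Approximate structure of the branch described above: a non-positive
// connection-create timeout means "wait with no deadline", and that wait
// used to be uninterruptible; the later fix switches it to await().
class ConnectionCreateSketch {
  void awaitConnection(ChannelFuture cf, long connCreateTimeoutMs)
      throws IOException, InterruptedException {
    if (connCreateTimeoutMs <= 0) {
      // before: cf.awaitUninterruptibly();
      cf.await();  // now responds to Thread.interrupt() (e.g. task cancellation)
    } else if (!cf.await(connCreateTimeoutMs)) {
      throw new IOException("Connecting timed out (" + connCreateTimeoutMs + " ms)");
    }
  }
}
```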
What changes were proposed in this pull request?

This patch replaces a single `awaitUninterruptibly()` call with a plain `await()` call in Spark's `network-common` library in order to fix a bug which may cause tasks to be uncancellable.

In Spark's Netty RPC layer, `TransportClientFactory.createClient()` calls `awaitUninterruptibly()` on a Netty future while waiting for a connection to be established. This creates a problem when a Spark task is interrupted while blocking in this call (which can happen in the event of a slow connection that will eventually time out). This has a bad impact on task cancellation when `interruptOnCancel = true`.

As an example of the impact of this problem, I experienced significant numbers of uncancellable "zombie tasks" on a production cluster where several tasks were blocked trying to connect to a dead shuffle server and then continued running as zombies after I cancelled the associated Spark stage. The zombie tasks ran for several minutes with the following stack:
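```
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:460)
io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607)
io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179) => holding Monitor(java.lang.Object1849476028})
org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114)
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:350)
org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286)
org.apache.spark.storage.ShuffleBlockFetcherIterator.<init>(ShuffleBlockFetcherIterator.scala:120)
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
[...]
```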
As far as I can tell, `awaitUninterruptibly()` might have been used in order to avoid having to declare that these methods throw `InterruptedException` (this code is written in Java, hence the need to deal with checked exceptions). This patch simply replaces it with a regular, interruptible `await()` call. This required several interface changes to declare a new checked exception (these are internal interfaces, though, and the change doesn't significantly impact binary compatibility).

An alternative approach would be to wrap `InterruptedException` into `IOException` in order to avoid having to change interfaces. The problem with that approach is that the `network-shuffle` project's `RetryingBlockFetcher` code treats `IOException`s as transient failures when deciding whether to retry fetches, so throwing a wrapped `IOException` might cause an interrupted shuffle fetch to be retried, further prolonging the lifetime of a cancelled zombie task.

Note that there are three other `awaitUninterruptibly()` calls in the codebase, but those calls have a hard 10-second timeout and are waiting on a `close()` operation which is expected to complete near instantaneously, so the impact of uninterruptibility there is much smaller.

How was this patch tested?

Manually.
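A rough sketch of the shape of the fix in `TransportClientFactory.createClient()` (simplified and illustrative; the method name, variable names, and error messages below are not the exact diff):

```java
import java.io.IOException;
import io.netty.channel.ChannelFuture;

// Simplified sketch; the real logic lives in
// org.apache.spark.network.client.TransportClientFactory#createClient.
class ConnectionWaitSketch {
  void waitForConnection(ChannelFuture cf, long connectionTimeoutMs)
      throws IOException, InterruptedException {
    // Before the patch: cf.awaitUninterruptibly(connectionTimeoutMs) ignored
    // Thread.interrupt(), so a cancelled task stayed blocked here until the
    // connection attempt timed out on its own.
    // After the patch: an interruptible wait lets cancellation with
    // interruptOnCancel = true abort the attempt promptly via InterruptedException.
    if (!cf.await(connectionTimeoutMs)) {
      throw new IOException("Connecting timed out (" + connectionTimeoutMs + " ms)");
    }
    if (cf.cause() != null) {
      throw new IOException("Failed to connect", cf.cause());
    }
  }
}
```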