From c6855e6e806a69d0c77a601953fffc17378257ee Mon Sep 17 00:00:00 2001 From: "zhangzhao.08" Date: Sun, 22 Dec 2024 23:08:36 -0800 Subject: [PATCH] [CELEBORN-1763] Fix DataPusher be blocked for a long time MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix DataPusher being blocked for a long time. When the worker has been at a performance bottleneck for a long time, the slow-start strategy adjusts its maxInFlight to 1, which may cause RequestInFlight to exceed maxInFlight. If the task’s main thread is blocked in the waitIdleQueueFullWithLock call, it cannot detect the sending failure, because the failure only changes the exception in the push state and the waitIdleQueueFullWithLock function does not check for it. NO GA Closes #2978 from zhaostu4/fix_pusher_block. Authored-by: zhangzhao.08 Signed-off-by: Wang, Fei (cherry picked from commit 9e04ff4a9fe12473be10b043887206134fcc66fd) Signed-off-by: Wang, Fei --- .../apache/celeborn/client/write/DataPusher.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java b/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java index 311a8da9900..6e344e917e6 100644 --- a/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java +++ b/client/src/main/java/org/apache/celeborn/client/write/DataPusher.java @@ -35,6 +35,8 @@ import org.apache.celeborn.common.CelebornConf; import org.apache.celeborn.common.exception.CelebornIOException; import org.apache.celeborn.common.util.ThreadExceptionHandler; +import org.apache.celeborn.common.util.Utils; +import org.apache.celeborn.common.write.PushState; public class DataPusher { private static final Logger logger = LoggerFactory.getLogger(DataPusher.class); @@ -43,6 +45,7 @@ public class DataPusher { private LinkedBlockingQueue idleQueue; // partition -> PushTask Queue + private final PushState pushState; 
private final DataPushQueue dataPushQueue; private final ReentrantLock idleLock = new ReentrantLock(); private final Condition idleFull = idleLock.newCondition(); @@ -98,6 +101,8 @@ public DataPusher( this.client = client; this.afterPush = afterPush; this.mapStatusLengths = mapStatusLengths; + final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId); + this.pushState = client.getPushState(mapKey); pushThread = new Thread("celeborn-client-data-pusher-" + taskId) { @@ -193,6 +198,9 @@ public void checkException() throws IOException { if (exceptionRef.get() != null) { throw exceptionRef.get(); } + if (pushState.exception.get() != null) { + throw pushState.exception.get(); + } } protected void pushData(PushTask task) throws IOException { @@ -216,6 +224,7 @@ private void waitIdleQueueFullWithLock() throws InterruptedException { while (idleQueue != null && idleQueue.remainingCapacity() > 0 && exceptionRef.get() == null + && pushState.exception.get() == null && (pushThread != null && pushThread.isAlive())) { idleFull.await(WAIT_TIME_NANOS, TimeUnit.NANOSECONDS); } @@ -228,7 +237,9 @@ private void waitIdleQueueFullWithLock() throws InterruptedException { } protected boolean stillRunning() { - return !terminated && !Objects.nonNull(exceptionRef.get()); + return !terminated + && !Objects.nonNull(exceptionRef.get()) + && !Objects.nonNull(pushState.exception.get()); } public DataPushQueue getDataPushQueue() {