Skip to content

Commit

Permalink
[CELEBORN-1763] Fix DataPusher being blocked for a long time
Browse files Browse the repository at this point in the history
Fix DataPusher being blocked for a long time.

When a worker has been at a performance bottleneck for a long time, the slow-start strategy adjusts its maxInFlight down to 1, which may cause RequestInFlight to exceed maxInFlight. If the task's main thread is blocked in the waitIdleQueueFullWithLock call at that point, it cannot detect a push failure: the failure only sets the exception in the push state, and waitIdleQueueFullWithLock does not check that exception. This change makes checkException, waitIdleQueueFullWithLock, and stillRunning also check pushState.exception, so the main thread stops waiting and surfaces the failure.

NO

GA

Closes #2978 from zhaostu4/fix_pusher_block.

Authored-by: zhangzhao.08 <zhangzhao.08@bytedance.com>
Signed-off-by: Wang, Fei <fwang12@ebay.com>
(cherry picked from commit 9e04ff4)
Signed-off-by: Wang, Fei <fwang12@ebay.com>
  • Loading branch information
zhaostu4 authored and turboFei committed Dec 23, 2024
1 parent de436f7 commit c6855e6
Showing 1 changed file with 12 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.celeborn.common.CelebornConf;
import org.apache.celeborn.common.exception.CelebornIOException;
import org.apache.celeborn.common.util.ThreadExceptionHandler;
import org.apache.celeborn.common.util.Utils;
import org.apache.celeborn.common.write.PushState;

public class DataPusher {
private static final Logger logger = LoggerFactory.getLogger(DataPusher.class);
Expand All @@ -43,6 +45,7 @@ public class DataPusher {

private LinkedBlockingQueue<PushTask> idleQueue;
// partition -> PushTask Queue
private final PushState pushState;
private final DataPushQueue dataPushQueue;
private final ReentrantLock idleLock = new ReentrantLock();
private final Condition idleFull = idleLock.newCondition();
Expand Down Expand Up @@ -98,6 +101,8 @@ public DataPusher(
this.client = client;
this.afterPush = afterPush;
this.mapStatusLengths = mapStatusLengths;
final String mapKey = Utils.makeMapKey(shuffleId, mapId, attemptId);
this.pushState = client.getPushState(mapKey);

pushThread =
new Thread("celeborn-client-data-pusher-" + taskId) {
Expand Down Expand Up @@ -193,6 +198,9 @@ public void checkException() throws IOException {
if (exceptionRef.get() != null) {
throw exceptionRef.get();
}
if (pushState.exception.get() != null) {
throw pushState.exception.get();
}
}

protected void pushData(PushTask task) throws IOException {
Expand All @@ -216,6 +224,7 @@ private void waitIdleQueueFullWithLock() throws InterruptedException {
while (idleQueue != null
&& idleQueue.remainingCapacity() > 0
&& exceptionRef.get() == null
&& pushState.exception.get() == null
&& (pushThread != null && pushThread.isAlive())) {
idleFull.await(WAIT_TIME_NANOS, TimeUnit.NANOSECONDS);
}
Expand All @@ -228,7 +237,9 @@ private void waitIdleQueueFullWithLock() throws InterruptedException {
}

protected boolean stillRunning() {
return !terminated && !Objects.nonNull(exceptionRef.get());
return !terminated
&& !Objects.nonNull(exceptionRef.get())
&& !Objects.nonNull(pushState.exception.get());
}

public DataPushQueue getDataPushQueue() {
Expand Down

0 comments on commit c6855e6

Please sign in to comment.