Skip to content

Commit

Permalink
Revert "stub: Have ClientCalls.ThreadlessExecutor reject Runnables af…
Browse files Browse the repository at this point in the history
…ter end of RPC (#8847)" (#8933)

This reverts commit bb33657.
  • Loading branch information
YifeiZhuang authored Feb 19, 2022
1 parent 80a2ca6 commit 6559ef8
Showing 1 changed file with 7 additions and 35 deletions.
42 changes: 7 additions & 35 deletions stub/src/main/java/io/grpc/stub/ClientCalls.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.locks.LockSupport;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -162,7 +161,6 @@ public static <ReqT, RespT> RespT blockingUnaryCall(
// Something very bad happened. All bets are off; it may be dangerous to wait for onClose().
throw cancelThrow(call, e);
} finally {
executor.shutdown();
if (interrupt) {
Thread.currentThread().interrupt();
}
Expand Down Expand Up @@ -628,9 +626,6 @@ private Object waitForNext() {
// Now wait for onClose() to be called, so interceptors can clean up
}
}
if (next == this || next instanceof StatusRuntimeException) {
threadless.shutdown();
}
return next;
}
} finally {
Expand Down Expand Up @@ -717,10 +712,7 @@ private static final class ThreadlessExecutor extends ConcurrentLinkedQueue<Runn
implements Executor {
private static final Logger log = Logger.getLogger(ThreadlessExecutor.class.getName());

private static final Object SHUTDOWN = new Object(); // sentinel

// Set to the calling thread while it's parked, SHUTDOWN on RPC completion
private volatile Object waiter;
private volatile Thread waiter;

// Non private to avoid synthetic class
ThreadlessExecutor() {}
Expand All @@ -744,29 +736,14 @@ public void waitAndDrain() throws InterruptedException {
}
}
do {
runQuietly(runnable);
try {
runnable.run();
} catch (Throwable t) {
log.log(Level.WARNING, "Runnable threw exception", t);
}
} while ((runnable = poll()) != null);
}

/**
* Called after final call to {@link #waitAndDrain()}, from same thread.
*/
public void shutdown() {
waiter = SHUTDOWN;
Runnable runnable;
while ((runnable = poll()) != null) {
runQuietly(runnable);
}
}

private static void runQuietly(Runnable runnable) {
try {
runnable.run();
} catch (Throwable t) {
log.log(Level.WARNING, "Runnable threw exception", t);
}
}

private static void throwIfInterrupted() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
Expand All @@ -776,12 +753,7 @@ private static void throwIfInterrupted() throws InterruptedException {
@Override
public void execute(Runnable runnable) {
add(runnable);
Object waiter = this.waiter;
if (waiter != SHUTDOWN) {
LockSupport.unpark((Thread) waiter); // no-op if null
} else if (remove(runnable)) {
throw new RejectedExecutionException();
}
LockSupport.unpark(waiter); // no-op if null
}
}

Expand Down

0 comments on commit 6559ef8

Please # to comment.