diff --git a/SingularityService/src/main/java/com/hubspot/singularity/SingularityManagedThreadPoolFactory.java b/SingularityService/src/main/java/com/hubspot/singularity/SingularityManagedThreadPoolFactory.java index ff8a0d6b14..bc44b24646 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/SingularityManagedThreadPoolFactory.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/SingularityManagedThreadPoolFactory.java @@ -56,9 +56,16 @@ public synchronized ExecutorService get(String name, int maxSize) { return service; } - public synchronized ExecutorAndQueue get(String name, int maxSize, int queueSize) { + public synchronized ExecutorAndQueue get( + String name, + int maxSize, + int queueSize, + boolean blockWhenFull + ) { checkState(!stopped.get(), "already stopped"); - LinkedBlockingQueue queue = new LinkedBlockingQueue(queueSize); + LinkedBlockingQueue queue = blockWhenFull + ? new ThreadPoolQueue(queueSize) + : new LinkedBlockingQueue<>(queueSize); ExecutorService service = new ThreadPoolExecutor( maxSize, maxSize, @@ -102,4 +109,21 @@ public void stop() throws Exception { } } } + + public static final class ThreadPoolQueue extends LinkedBlockingQueue { + + public ThreadPoolQueue(int capacity) { + super(capacity); + } + + @Override + public boolean offer(Runnable e) { + try { + put(e); + } catch (InterruptedException e1) { + return false; + } + return true; + } + } } diff --git a/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosStatusUpdateHandler.java b/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosStatusUpdateHandler.java index 9cafce0a25..defd10eb9b 100644 --- a/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosStatusUpdateHandler.java +++ b/SingularityService/src/main/java/com/hubspot/singularity/mesos/SingularityMesosStatusUpdateHandler.java @@ -139,7 +139,8 @@ public SingularityMesosStatusUpdateHandler( threadPoolFactory.get( "status-updates", configuration.getMesosConfiguration().getStatusUpdateConcurrencyLimit(), - configuration.getMesosConfiguration().getMaxStatusUpdateQueueSize() + configuration.getMesosConfiguration().getMaxStatusUpdateQueueSize(), + true ); } diff --git a/SingularityService/src/test/java/com/hubspot/singularity/helpers/SingularityBlockingThreadPoolTest.java b/SingularityService/src/test/java/com/hubspot/singularity/helpers/SingularityBlockingThreadPoolTest.java new file mode 100644 index 0000000000..a75c31cc14 --- /dev/null +++ b/SingularityService/src/test/java/com/hubspot/singularity/helpers/SingularityBlockingThreadPoolTest.java @@ -0,0 +1,113 @@ +package com.hubspot.singularity.helpers; + +import com.hubspot.singularity.SingularityManagedThreadPoolFactory; +import com.hubspot.singularity.async.ExecutorAndQueue; +import com.hubspot.singularity.config.SingularityConfiguration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.RejectedExecutionException; +import java.util.stream.IntStream; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class SingularityBlockingThreadPoolTest { + + @Test + public void testBoundedQueueBlocksWhenFull() { + SingularityManagedThreadPoolFactory threadPoolFactory = new SingularityManagedThreadPoolFactory( + new SingularityConfiguration() + ); + Assertions.assertThrows( + RejectedExecutionException.class, + () -> { + ExecutorAndQueue executorAndQueue = threadPoolFactory.get("test", 2, 5, false); + IntStream + .range(0, 10) + .forEach( + i -> + executorAndQueue + .getExecutorService() + .submit( + () -> { + try { + Thread.sleep(2000); + } catch (InterruptedException ie) { + // didn't see that... + } + } + ) + ); + } + ); + + Assertions.assertDoesNotThrow( + () -> { + ExecutorAndQueue executorAndQueue = threadPoolFactory.get("test", 2, 5, true); + IntStream + .range(0, 10) + .forEach( + i -> + executorAndQueue + .getExecutorService() + .submit( + () -> { + try { + Thread.sleep(2000); + } catch (InterruptedException ie) { + // didn't see that... + } + } + ) + ); + } + ); + } + + @Test + public void testBoundedQueueBlocksWhenFullForCompletableFutures() { + SingularityManagedThreadPoolFactory threadPoolFactory = new SingularityManagedThreadPoolFactory( + new SingularityConfiguration() + ); + Assertions.assertThrows( + RejectedExecutionException.class, + () -> { + ExecutorAndQueue executorAndQueue = threadPoolFactory.get("test", 2, 5, false); + IntStream + .range(0, 10) + .forEach( + i -> + CompletableFuture.runAsync( + () -> { + try { + Thread.sleep(2000); + } catch (InterruptedException ie) { + // didn't see that... + } + }, + executorAndQueue.getExecutorService() + ) + ); + } + ); + + Assertions.assertDoesNotThrow( + () -> { + ExecutorAndQueue executorAndQueue = threadPoolFactory.get("test", 2, 5, true); + IntStream + .range(0, 10) + .forEach( + i -> + CompletableFuture.runAsync( + () -> { + try { + Thread.sleep(2000); + } catch (InterruptedException ie) { + // didn't see that... + } + }, + executorAndQueue.getExecutorService() + ) + ); + } + ); + } +}