diff --git a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java index 7a80131f..ae3ab338 100644 --- a/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java +++ b/reactor-pool/src/main/java/reactor/pool/SimpleDequePool.java @@ -451,7 +451,7 @@ else if (sig.isOnError()) { final int mergeConcurrency = Math.min(poolConfig.allocationStrategy().warmupParallelism(), toWarmup + 1); Flux.range(1, toWarmup) .map(i -> warmupMono(i, toWarmup, startWarmupIteration, allocator).doOnSuccess(__ -> drain())) - .startWith(primary.doOnSuccess(__ -> drain()).then()) + .startWith(primary.doOnSuccess(__ -> drain()).onErrorComplete().then()) .flatMap(Function.identity(), mergeConcurrency, 1) // since we dont store anything the inner buffer can be simplified .onErrorResume(e -> Mono.empty()) .subscribe(aVoid -> { }, alreadyPropagatedOrLogged -> drain(), this::drain); diff --git a/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java b/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java index 43824fdc..e066337b 100644 --- a/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java +++ b/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java @@ -2634,4 +2634,50 @@ void poolExposesConfig(PoolStyle style) { assertThat(config.allocationStrategy().estimatePermitCount()).as("maxSize").isEqualTo(123); assertThat(config.clock()).as("clock").isSameAs(clock); } + + @ParameterizedTestWithName + @EnumSource + void testIssue_174(PoolStyle style) { + final AtomicBoolean canAllocateResource = new AtomicBoolean(true); + final Mono allocator = Mono.defer(() -> + canAllocateResource.get() ? + Mono.just("value") : + Mono.error(new IllegalStateException("Can't allocate")) + ); + + final PoolBuilder> configBuilder = PoolBuilder + .from(allocator) + .maxPendingAcquireUnbounded() + .sizeBetween(10, 10); // Spring Boot R2DBC connections pool default + final InstrumentedPool pool = style.apply(configBuilder); + + // New empty pool. No resources allocated yet, but has min-size (10) permits + assertThat(pool.config().allocationStrategy().estimatePermitCount()).isEqualTo(10); + assertThat(pool.metrics().idleSize()).isEqualTo(0); + + // Try to acquire one resource. This should trigger pool "warmup" to min-size of resources + StepVerifier.create(pool.acquire().flatMap(PooledRef::release)).verifyComplete(); + assertThat(pool.config().allocationStrategy().estimatePermitCount()).isEqualTo(0); + assertThat(pool.metrics().idleSize()).isEqualTo(10); + + // Now allocator will return errors (simulating inaccessible DB server for R2DBC connections pool) + canAllocateResource.set(false); + + // We have 10 allocated resources in the pool, but they are not valid anymore, so invalidate them + StepVerifier.create(Flux.range(0, 10).concatMap(ignore -> pool.acquire().flatMap(PooledRef::invalidate))) + .verifyComplete(); + assertThat(pool.metrics().idleSize()).isEqualTo(0); + assertThat(pool.config().allocationStrategy().estimatePermitCount()).isEqualTo(10); + + // Now we have empty pool, so it should be warmed up again but allocator still not working + StepVerifier.create(pool.acquire()).verifyError(); + assertThat(pool.metrics().idleSize()).isEqualTo(0); + assertThat(pool.config().allocationStrategy().estimatePermitCount()).isEqualTo(10); + + // Return allocator to "working" state and check what pool warms up correctly + canAllocateResource.set(true); + StepVerifier.create(pool.acquire().flatMap(PooledRef::release)).verifyComplete(); + assertThat(pool.config().allocationStrategy().estimatePermitCount()).isEqualTo(0); + assertThat(pool.metrics().idleSize()).isEqualTo(10); + } }