Skip to content

Commit

Permalink
Return warmup permits in case primary allocation fails
Browse files Browse the repository at this point in the history
Pool resources should be correctly restored according to size based allocation strategy after allocator problems disappear

Fixes #174

---------

Co-authored-by: k.tokarev <k.tokarev@a1-systems.com>
  • Loading branch information
k-tokarev and Konstriktor authored Sep 12, 2023
1 parent 6d98818 commit 9d61d0f
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ else if (sig.isOnError()) {
final int mergeConcurrency = Math.min(poolConfig.allocationStrategy().warmupParallelism(), toWarmup + 1);
Flux.range(1, toWarmup)
.map(i -> warmupMono(i, toWarmup, startWarmupIteration, allocator).doOnSuccess(__ -> drain()))
.startWith(primary.doOnSuccess(__ -> drain()).then())
.startWith(primary.doOnSuccess(__ -> drain()).onErrorComplete().then())
.flatMap(Function.identity(), mergeConcurrency, 1) // since we dont store anything the inner buffer can be simplified
.onErrorResume(e -> Mono.empty())
.subscribe(aVoid -> { }, alreadyPropagatedOrLogged -> drain(), this::drain);
Expand Down
46 changes: 46 additions & 0 deletions reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -2634,4 +2634,50 @@ void poolExposesConfig(PoolStyle style) {
assertThat(config.allocationStrategy().estimatePermitCount()).as("maxSize").isEqualTo(123);
assertThat(config.clock()).as("clock").isSameAs(clock);
}

@ParameterizedTestWithName
@EnumSource
void testIssue_174(PoolStyle style) {
final AtomicBoolean canAllocateResource = new AtomicBoolean(true);
final Mono<String> allocator = Mono.defer(() ->
canAllocateResource.get() ?
Mono.just("value") :
Mono.error(new IllegalStateException("Can't allocate"))
);

final PoolBuilder<String, PoolConfig<String>> configBuilder = PoolBuilder
.from(allocator)
.maxPendingAcquireUnbounded()
.sizeBetween(10, 10); // Spring Boot R2DBC connections pool default
final InstrumentedPool<String> pool = style.apply(configBuilder);

// New empty pool. No resources allocated yet, but has min-size (10) permits
assertThat(pool.config().allocationStrategy().estimatePermitCount()).isEqualTo(10);
assertThat(pool.metrics().idleSize()).isEqualTo(0);

// Try to acquire one resource. This should trigger pool "warmup" to min-size of resources
StepVerifier.create(pool.acquire().flatMap(PooledRef::release)).verifyComplete();
assertThat(pool.config().allocationStrategy().estimatePermitCount()).isEqualTo(0);
assertThat(pool.metrics().idleSize()).isEqualTo(10);

// Now allocator will return errors (simulating inaccessible DB server for R2DBC connections pool)
canAllocateResource.set(false);

// We have 10 allocated resources in the pool, but they are not valid anymore, so invalidate them
StepVerifier.create(Flux.range(0, 10).concatMap(ignore -> pool.acquire().flatMap(PooledRef::invalidate)))
.verifyComplete();
assertThat(pool.metrics().idleSize()).isEqualTo(0);
assertThat(pool.config().allocationStrategy().estimatePermitCount()).isEqualTo(10);

// Now we have empty pool, so it should be warmed up again but allocator still not working
StepVerifier.create(pool.acquire()).verifyError();
assertThat(pool.metrics().idleSize()).isEqualTo(0);
assertThat(pool.config().allocationStrategy().estimatePermitCount()).isEqualTo(10);

// Return allocator to "working" state and check what pool warms up correctly
canAllocateResource.set(true);
StepVerifier.create(pool.acquire().flatMap(PooledRef::release)).verifyComplete();
assertThat(pool.config().allocationStrategy().estimatePermitCount()).isEqualTo(0);
assertThat(pool.metrics().idleSize()).isEqualTo(10);
}
}

0 comments on commit 9d61d0f

Please # to comment.