diff --git a/reactor-pool/src/main/java/reactor/pool/AbstractPool.java b/reactor-pool/src/main/java/reactor/pool/AbstractPool.java index 249a3581..822c4b21 100644 --- a/reactor-pool/src/main/java/reactor/pool/AbstractPool.java +++ b/reactor-pool/src/main/java/reactor/pool/AbstractPool.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2019-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -427,9 +427,11 @@ public void request(long n) { boolean noIdle = pool.idleSize() == 0; boolean noPermits = pool.poolConfig.allocationStrategy().estimatePermitCount() == 0; - if (!pendingAcquireTimeout.isZero() && noIdle && noPermits) { + if (noIdle && noPermits) { pendingAcquireStart = pool.clock.millis(); - timeoutTask = this.pool.config().pendingAcquireTimer().apply(this, pendingAcquireTimeout); + if (!pendingAcquireTimeout.isZero()) { + timeoutTask = this.pool.config().pendingAcquireTimer().apply(this, pendingAcquireTimeout); + } } //doAcquire should interrupt the countdown if there is either an available //resource or the pool can allocate one @@ -441,12 +443,14 @@ public void request(long n) { * Stop the countdown started when calling {@link AbstractPool#doAcquire(Borrower)}. */ void stopPendingCountdown(boolean success) { - if (!timeoutTask.isDisposed()) { + if (pendingAcquireStart > 0) { if (success) { pool.metricsRecorder.recordPendingSuccessAndLatency(pool.clock.millis() - pendingAcquireStart); } else { pool.metricsRecorder.recordPendingFailureAndLatency(pool.clock.millis() - pendingAcquireStart); } + + pendingAcquireStart = 0; } timeoutTask.dispose(); } diff --git a/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java b/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java index e93e59f2..6760b9d0 100644 --- a/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java +++ b/reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2019-2024 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -2689,31 +2689,39 @@ void recordsPendingCountAndLatencies(PoolStyle configAdjuster) { PoolBuilder builder = PoolBuilder .from(Mono.defer(() -> Mono.just("foo"))) .metricsRecorder(recorder) - .sizeBetween(0, 1) + .sizeBetween(0, 2) .clock(recorder.getClock()); Pool pool = configAdjuster.apply(builder); //success, acquisition happens immediately - PooledRef pooledRef = pool.acquire(Duration.ofMillis(1)).block(Duration.ofSeconds(1)); - assertThat(pooledRef).isNotNull(); + PooledRef pooledRef1 = pool.acquire(Duration.ofMillis(1)).block(Duration.ofSeconds(1)); + assertThat(pooledRef1).isNotNull(); + + // success, acquisition happens immediately without timeout + PooledRef pooledRef2 = pool.acquire().block(Duration.ofSeconds(1)); + assertThat(pooledRef2).isNotNull(); //success, acquisition happens after pending some time pool.acquire(Duration.ofMillis(50)).subscribe(); + // success, acquisition happens after pending some time without timeout + pool.acquire().subscribe(); + //error, timed out pool.acquire(Duration.ofMillis(1)) .as(StepVerifier::create) .expectError(PoolAcquireTimeoutException.class) .verify(Duration.ofSeconds(1)); - pooledRef.release().block(Duration.ofSeconds(1)); + pooledRef1.release().block(Duration.ofSeconds(1)); + pooledRef2.release().block(Duration.ofSeconds(1)); assertThat(recorder.getPendingTotalCount()) .as("total pending") - .isEqualTo(2); + .isEqualTo(3); assertThat(recorder.getPendingSuccessCount()) .as("pending success") - .isEqualTo(1) + .isEqualTo(2) .isEqualTo(recorder.getPendingSuccessHistogram().getTotalCount()); assertThat(recorder.getPendingErrorCount()) .as("pending errors")