Skip to content

Commit

Permalink
Fix recordPendingSuccess/FailureAndLatency not recorded without timeo…
Browse files Browse the repository at this point in the history
…ut (#214)
  • Loading branch information
HeartPattern authored May 17, 2024
1 parent 243f877 commit 2e297a8
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 11 deletions.
12 changes: 8 additions & 4 deletions reactor-pool/src/main/java/reactor/pool/AbstractPool.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -427,9 +427,11 @@ public void request(long n) {
boolean noIdle = pool.idleSize() == 0;
boolean noPermits = pool.poolConfig.allocationStrategy().estimatePermitCount() == 0;

if (!pendingAcquireTimeout.isZero() && noIdle && noPermits) {
if (noIdle && noPermits) {
pendingAcquireStart = pool.clock.millis();
timeoutTask = this.pool.config().pendingAcquireTimer().apply(this, pendingAcquireTimeout);
if (!pendingAcquireTimeout.isZero()) {
timeoutTask = this.pool.config().pendingAcquireTimer().apply(this, pendingAcquireTimeout);
}
}
//doAcquire should interrupt the countdown if there is either an available
//resource or the pool can allocate one
Expand All @@ -441,12 +443,14 @@ public void request(long n) {
* Stop the countdown started when calling {@link AbstractPool#doAcquire(Borrower)}.
*/
void stopPendingCountdown(boolean success) {
if (!timeoutTask.isDisposed()) {
if (pendingAcquireStart > 0) {
if (success) {
pool.metricsRecorder.recordPendingSuccessAndLatency(pool.clock.millis() - pendingAcquireStart);
} else {
pool.metricsRecorder.recordPendingFailureAndLatency(pool.clock.millis() - pendingAcquireStart);
}

pendingAcquireStart = 0;
}
timeoutTask.dispose();
}
Expand Down
22 changes: 15 additions & 7 deletions reactor-pool/src/test/java/reactor/pool/CommonPoolTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019-2023 VMware Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2019-2024 VMware Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -2689,31 +2689,39 @@ void recordsPendingCountAndLatencies(PoolStyle configAdjuster) {
PoolBuilder<String, ?> builder = PoolBuilder
.from(Mono.defer(() -> Mono.just("foo")))
.metricsRecorder(recorder)
.sizeBetween(0, 1)
.sizeBetween(0, 2)
.clock(recorder.getClock());
Pool<String> pool = configAdjuster.apply(builder);

//success, acquisition happens immediately
PooledRef<String> pooledRef = pool.acquire(Duration.ofMillis(1)).block(Duration.ofSeconds(1));
assertThat(pooledRef).isNotNull();
PooledRef<String> pooledRef1 = pool.acquire(Duration.ofMillis(1)).block(Duration.ofSeconds(1));
assertThat(pooledRef1).isNotNull();

// success, acquisition happens immediately without timeout
PooledRef<String> pooledRef2 = pool.acquire().block(Duration.ofSeconds(1));
assertThat(pooledRef2).isNotNull();

//success, acquisition happens after pending some time
pool.acquire(Duration.ofMillis(50)).subscribe();

// success, acquisition happens after pending some time without timeout
pool.acquire().subscribe();

//error, timed out
pool.acquire(Duration.ofMillis(1))
.as(StepVerifier::create)
.expectError(PoolAcquireTimeoutException.class)
.verify(Duration.ofSeconds(1));

pooledRef.release().block(Duration.ofSeconds(1));
pooledRef1.release().block(Duration.ofSeconds(1));
pooledRef2.release().block(Duration.ofSeconds(1));

assertThat(recorder.getPendingTotalCount())
.as("total pending")
.isEqualTo(2);
.isEqualTo(3);
assertThat(recorder.getPendingSuccessCount())
.as("pending success")
.isEqualTo(1)
.isEqualTo(2)
.isEqualTo(recorder.getPendingSuccessHistogram().getTotalCount());
assertThat(recorder.getPendingErrorCount())
.as("pending errors")
Expand Down

0 comments on commit 2e297a8

Please # to comment.