diff --git a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java index 3c8d381d61..edd19aba14 100644 --- a/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java +++ b/reactor-netty-core/src/main/java/reactor/netty/resources/PooledConnectionProvider.java @@ -355,7 +355,19 @@ final void disposeInactivePoolsInBackground() { if (log.isDebugEnabled()) { log.debug("ConnectionProvider[name={}]: Disposing inactive pool for [{}]", name, e.getKey().holder); } - e.getValue().dispose(); + SocketAddress address = e.getKey().holder; + String id = e.getKey().hashCode() + ""; + PoolFactory poolFactory = poolFactory(address); + e.getValue().disposeLater().then( + Mono.fromRunnable(() -> { + if (poolFactory.registrar != null) { + poolFactory.registrar.get().deRegisterMetrics(name, id, address); + } + else if (Metrics.isInstrumentationAvailable()) { + deRegisterDefaultMetrics(id, address); + } + }) + ).subscribe(); } }); } diff --git a/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java b/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java index 772be14553..870b1f78f3 100644 --- a/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java +++ b/reactor-netty-http/src/test/java/reactor/netty/resources/DefaultPooledConnectionProviderTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2022 VMware, Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2020-2023 VMware, Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -73,6 +73,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Function; @@ -649,7 +650,7 @@ void testMinConnections() throws Exception { @ParameterizedTest @MethodSource("disposeInactivePoolsInBackgroundCombinations") - void testDisposeInactivePoolsInBackground(boolean enableEvictInBackground, boolean isHttp2) throws Exception { + void testDisposeInactivePoolsInBackground(boolean enableEvictInBackground, boolean isHttp2, boolean isBuiltInMetrics) throws Exception { disposableServer = createServer() .wiretap(false) @@ -668,6 +669,19 @@ void testDisposeInactivePoolsInBackground(boolean enableEvictInBackground, boole builder.evictInBackground(Duration.ofMillis(50)); } + MeterRegistrarImpl meterRegistrar; + String metricsName = ""; + if (isBuiltInMetrics) { + meterRegistrar = null; + builder.metrics(true); + + metricsName = isHttp2 ? "http2.testDisposeInactivePoolsInBackground" : "testDisposeInactivePoolsInBackground"; + } + else { + meterRegistrar = new MeterRegistrarImpl(); + builder.metrics(true, () -> meterRegistrar); + } + CountDownLatch latch = new CountDownLatch(10); DefaultPooledConnectionProvider provider = (DefaultPooledConnectionProvider) builder.build(); HttpClient client = @@ -694,6 +708,12 @@ void testDisposeInactivePoolsInBackground(boolean enableEvictInBackground, boole .verify(Duration.ofSeconds(5)); assertThat(provider.channelPools.size()).isEqualTo(1); + if (meterRegistrar != null) { + assertThat(meterRegistrar.registered.get()).isTrue(); + } + else { + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, NAME, metricsName)).isNotEqualTo(-1); + } if (enableEvictInBackground) { assertThat(latch.await(5, TimeUnit.SECONDS)).isTrue(); @@ -706,6 +726,17 @@ void testDisposeInactivePoolsInBackground(boolean enableEvictInBackground, boole .isEqualTo(enableEvictInBackground ? 0 : 1)); assertThat(provider.isDisposed()).isEqualTo(enableEvictInBackground); + if (meterRegistrar != null) { + assertThat(meterRegistrar.deRegistered.get()).isEqualTo(enableEvictInBackground); + } + else { + if (enableEvictInBackground) { + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, NAME, metricsName)).isEqualTo(-1); + } + else { + assertThat(getGaugeValue(CONNECTION_PROVIDER_PREFIX + ACTIVE_CONNECTIONS, NAME, metricsName)).isNotEqualTo(-1); + } + } } finally { if (!enableEvictInBackground) { @@ -717,10 +748,15 @@ void testDisposeInactivePoolsInBackground(boolean enableEvictInBackground, boole static Stream disposeInactivePoolsInBackgroundCombinations() { return Stream.of( - Arguments.of(false, false), - Arguments.of(false, true), - Arguments.of(true, false), - Arguments.of(true, true) + // enableEvictInBackground, isHttp2, isBuiltInMetrics + Arguments.of(false, false, false), + Arguments.of(false, false, true), + Arguments.of(false, true, false), + Arguments.of(false, true, true), + Arguments.of(true, false, false), + Arguments.of(true, false, true), + Arguments.of(true, true, false), + Arguments.of(true, true, true) ); } @@ -830,4 +866,22 @@ void testHttp2PoolAndGoAway() { .block(Duration.ofSeconds(5)); } } + + static final class MeterRegistrarImpl implements ConnectionProvider.MeterRegistrar { + AtomicBoolean registered = new AtomicBoolean(); + AtomicBoolean deRegistered = new AtomicBoolean(); + + MeterRegistrarImpl() { + } + + @Override + public void registerMetrics(String poolName, String id, SocketAddress remoteAddress, ConnectionPoolMetrics metrics) { + registered.compareAndSet(false, true); + } + + @Override + public void deRegisterMetrics(String poolName, String id, SocketAddress remoteAddress) { + deRegistered.compareAndSet(false, true); + } + } }