Skip to content

Commit

Permalink
Merge #2958 into 2.0.0-M4
Browse files Browse the repository at this point in the history
  • Loading branch information
violetagg committed Nov 2, 2023
2 parents 68f3048 + e2a5439 commit 707f0d6
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2022 VMware, Inc. or its affiliates, All Rights Reserved.
* Copyright (c) 2021-2023 VMware, Inc. or its affiliates, All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -44,6 +44,8 @@ public abstract class AbstractChannelMetricsHandler extends ChannelHandlerAdapte

final boolean onServer;

boolean channelOpened;

protected AbstractChannelMetricsHandler(@Nullable SocketAddress remoteAddress, boolean onServer) {
this.remoteAddress = remoteAddress;
this.onServer = onServer;
Expand All @@ -53,6 +55,7 @@ protected AbstractChannelMetricsHandler(@Nullable SocketAddress remoteAddress, b
public void channelActive(ChannelHandlerContext ctx) {
if (onServer) {
try {
channelOpened = true;
recorder().recordServerConnectionOpened(ctx.channel().localAddress());
}
catch (RuntimeException e) {
Expand All @@ -69,7 +72,10 @@ public void channelActive(ChannelHandlerContext ctx) {
public void channelInactive(ChannelHandlerContext ctx) {
if (onServer) {
try {
recorder().recordServerConnectionClosed(ctx.channel().localAddress());
if (channelOpened) {
channelOpened = false;
recorder().recordServerConnectionClosed(ctx.channel().localAddress());
}
}
catch (RuntimeException e) {
if (log.isWarnEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ abstract class AbstractHttpServerMetricsHandler extends ChannelHandlerAdapter {
private static final Logger log = Loggers.getLogger(AbstractHttpServerMetricsHandler.class);

boolean channelActivated;
boolean channelOpened;

long dataReceived;

Expand Down Expand Up @@ -78,6 +79,7 @@ public void channelActive(ChannelHandlerContext ctx) {
if (!(ctx.channel() instanceof Http2StreamChannel) && recorder() instanceof MicrometerHttpServerMetricsRecorder) {
try {
// Always use the real connection local address without any proxy information
channelOpened = true;
recorder().recordServerConnectionOpened(ctx.channel().localAddress());
}
catch (RuntimeException e) {
Expand All @@ -95,7 +97,10 @@ public void channelInactive(ChannelHandlerContext ctx) {
if (!(ctx.channel() instanceof Http2StreamChannel) && recorder() instanceof MicrometerHttpServerMetricsRecorder) {
try {
// Always use the real connection local address without any proxy information
recorder().recordServerConnectionClosed(ctx.channel().localAddress());
if (channelOpened) {
channelOpened = false;
recorder().recordServerConnectionClosed(ctx.channel().localAddress());
}
}
catch (RuntimeException e) {
if (log.isWarnEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,7 @@ static void configureHttp11Pipeline(ChannelPipeline p,
boolean accessLogEnabled,
@Nullable Function<AccessLogArgProvider, AccessLog> accessLog,
@Nullable BiPredicate<HttpServerRequest, HttpServerResponse> compressPredicate,
boolean channelOpened,
HttpRequestDecoderSpec decoder,
HttpServerFormDecoderProvider formDecoderProvider,
@Nullable BiFunction<ConnectionInfo, HttpRequest, ConnectionInfo> forwardedHeaderHandler,
Expand Down Expand Up @@ -700,7 +701,7 @@ static void configureHttp11Pipeline(ChannelPipeline p,

if (metricsRecorder != null) {
if (metricsRecorder instanceof HttpServerMetricsRecorder) {
ChannelHandler handler;
AbstractHttpServerMetricsHandler handler;
if (metricsRecorder instanceof MicrometerHttpServerMetricsRecorder micrometerHttpServerMetricsRecorder) {
handler = new MicrometerHttpServerMetricsHandler(micrometerHttpServerMetricsRecorder, uriTagValue);
}
Expand All @@ -710,6 +711,9 @@ else if (metricsRecorder instanceof ContextAwareHttpServerMetricsRecorder contex
else {
handler = new HttpServerMetricsHandler((HttpServerMetricsRecorder) metricsRecorder, uriTagValue);
}
if (channelOpened) {
handler.channelOpened = true;
}
p.addAfter(NettyPipeline.HttpTrafficHandler, NettyPipeline.HttpMetricsHandler, handler);
if (metricsRecorder instanceof MicrometerHttpServerMetricsRecorder) {
// For sake of performance, we can remove the ChannelMetricsHandler because the MicrometerHttpServerMetricsRecorder
Expand Down Expand Up @@ -1118,7 +1122,7 @@ protected void configurePipeline(ChannelHandlerContext ctx, String protocol) {
}

if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
configureHttp11Pipeline(p, accessLogEnabled, accessLog, compressPredicate,
configureHttp11Pipeline(p, accessLogEnabled, accessLog, compressPredicate, true,
decoder, formDecoderProvider, forwardedHeaderHandler, httpMessageLogFactory, idleTimeout, listener, mapHandle,
maxKeepAliveRequests, metricsRecorder, minCompressionSize, readTimeout, requestTimeout, uriTagValue);

Expand Down Expand Up @@ -1210,6 +1214,7 @@ else if ((protocols & h11) == h11) {
accessLogEnabled,
accessLog,
compressPredicate(compressPredicate, minCompressionSize),
false,
decoder,
formDecoderProvider,
forwardedHeaderHandler,
Expand Down Expand Up @@ -1277,6 +1282,7 @@ else if ((protocols & h11) == h11) {
accessLogEnabled,
accessLog,
compressPredicate(compressPredicate, minCompressionSize),
false,
decoder,
formDecoderProvider,
forwardedHeaderHandler,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -933,6 +933,55 @@ void testServerConnectionsRecorderBadUriForwarded(HttpProtocol[] serverProtocols
Function.identity());
}


@ParameterizedTest
@MethodSource("combinationsIssue2956")
void testIssue2956(boolean isCustomRecorder, boolean isHttp2) throws Exception {
HttpServer server =
httpServer.secure(spec -> spec.sslContext(isHttp2 ? serverCtx2 : serverCtx11))
.protocol(isHttp2 ? HttpProtocol.H2 : HttpProtocol.HTTP11)
.doOnConnection(conn -> ServerCloseHandler.INSTANCE.register(conn.channel()));

if (isCustomRecorder) {
server = server.metrics(true, ServerRecorder.supplier());
}

disposableServer = server.bindNow();

httpClient.protocol(isHttp2 ? HttpProtocol.H2C : HttpProtocol.HTTP11)
.post()
.uri("/1")
.send(BufferFlux.fromString(Mono.just("hello")))
.responseContent()
.aggregate()
.asString()
.as(StepVerifier::create)
.expectError()
.verify(Duration.ofSeconds(5));

assertThat(ServerCloseHandler.INSTANCE.awaitClientClosedOnServer()).as("awaitClientClosedOnServer timeout").isTrue();

if (isCustomRecorder) {
assertThat(ServerRecorder.INSTANCE.onServerConnectionsAmount.get()).isEqualTo(0);
}
else {
InetSocketAddress sa = (InetSocketAddress) disposableServer.channel().localAddress();
String serverAddress = sa.getHostString() + ":" + sa.getPort();
String[] tags = new String[]{URI, HTTP, LOCAL_ADDRESS, serverAddress};
checkGauge(SERVER_CONNECTIONS_TOTAL, false, 0, tags);
}
}

static Stream<Arguments> combinationsIssue2956() {
return Stream.of(
// isCustomRecorder, isHttp2
Arguments.of(false, false),
Arguments.of(false, true),
Arguments.of(true, false),
Arguments.of(true, true)
);
}

private void testServerConnectionsRecorderBadUri(HttpProtocol[] serverProtocols, HttpProtocol[] clientProtocols,
@Nullable ProtocolSslContextSpec serverCtx, @Nullable ProtocolSslContextSpec clientCtx,
@Nullable String xForwardedFor, int xForwardedPort,
Expand Down

0 comments on commit 707f0d6

Please # to comment.