From e516f05a3f181ec0981c91a1fa34e5e2729d4381 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 24 Apr 2024 17:24:52 +0300 Subject: [PATCH 1/9] [fix][broker] Fix BufferOverflowException bug in /metrics gzip compression --- .../prometheus/PrometheusMetricsGenerator.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index 8cd68caf1ee26..c2687fdc66dc7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -24,6 +24,7 @@ import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.PooledByteBufAllocator; +import io.netty.buffer.Unpooled; import io.prometheus.client.Collector; import java.io.BufferedOutputStream; import java.io.IOException; @@ -233,17 +234,22 @@ private void compressAndAppend(ByteBuffer nioBuffer, boolean isFirst, boolean is } } if (isLast) { - // write gzip footer, integer values are in little endian byte order - compressBuffer.order(ByteOrder.LITTLE_ENDIAN); - // write CRC32 checksum - compressBuffer.putInt((int) crc.getValue()); - // write uncompressed size - compressBuffer.putInt(deflater.getTotalIn()); // append the last compressed buffer backingCompressBuffer.setIndex(0, compressBuffer.position()); resultBuffer.addComponent(true, backingCompressBuffer); backingCompressBuffer = null; compressBuffer = null; + + // write gzip trailer + ByteBuffer trailer = ByteBuffer.allocate(8); + // write gzip trailer, integer values are in little endian byte order + trailer.order(ByteOrder.LITTLE_ENDIAN); + // write CRC32 checksum + trailer.putInt((int) crc.getValue()); + // write uncompressed size + trailer.putInt(deflater.getTotalIn()); + trailer.flip(); + resultBuffer.addComponent(true, Unpooled.wrappedBuffer(trailer)); } } From 377b1c24830aae61c6fb3f2133d08600a1fc2e07 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 25 Apr 2024 08:03:21 +0300 Subject: [PATCH 2/9] Add test case --- .../PrometheusMetricsGenerator.java | 4 +- .../PrometheusMetricsGeneratorTest.java | 85 +++++++++++++++++++ 2 files changed, 87 insertions(+), 2 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index c2687fdc66dc7..7e6399c3d4e61 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -288,7 +288,7 @@ public PrometheusMetricsGenerator(PulsarService pulsar, boolean includeTopicMetr this.clock = clock; } - private ByteBuf generate0(List metricsProviders) { + protected ByteBuf generateMetrics(List metricsProviders) { ByteBuf buf = allocateMultipartCompositeDirectBuffer(); boolean exceptionHappens = false; //Used in namespace/topic and transaction aggregators as share metric names @@ -498,7 +498,7 @@ public MetricsBuffer renderToBuffer(Executor executor, List bufferFuture = newMetricsBuffer.getBufferFuture(); executor.execute(() -> { try { - bufferFuture.complete(new ResponseBuffer(generate0(metricsProviders))); + bufferFuture.complete(new ResponseBuffer(generateMetrics(metricsProviders))); } catch (Exception e) { bufferFuture.completeExceptionally(e); } finally { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorTest.java new file mode 100644 index 0000000000000..ed5c5a6335ceb --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGeneratorTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker.stats.prometheus; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import com.google.common.util.concurrent.MoreExecutors; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.time.Clock; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutionException; +import java.util.zip.GZIPInputStream; +import org.apache.commons.io.IOUtils; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.testng.annotations.Test; + +public class PrometheusMetricsGeneratorTest { + + // reproduce issue #22575 + @Test + public void testReproducingBufferOverflowExceptionAndEOFExceptionBugsInGzipCompression() + throws ExecutionException, InterruptedException, IOException { + PulsarService pulsar = mock(PulsarService.class); + ServiceConfiguration serviceConfiguration = new ServiceConfiguration(); + when(pulsar.getConfiguration()).thenReturn(serviceConfiguration); + + // generate a random byte buffer which is 8 bytes less than the minimum compress buffer size limit + // this will trigger the BufferOverflowException bug in writing the gzip trailer + // it will also trigger another bug in finishing the gzip compression stream when the compress buffer is full + // which results in EOFException + Random random = new Random(); + byte[] inputBytes = new byte[8192 - 8]; + random.nextBytes(inputBytes); + ByteBuf byteBuf = Unpooled.wrappedBuffer(inputBytes); + + PrometheusMetricsGenerator generator = + new PrometheusMetricsGenerator(pulsar, false, false, false, false, Clock.systemUTC()) { + // override the generateMetrics method to return the random byte buffer for gzip compression + // instead of the actual metrics + @Override + protected ByteBuf generateMetrics(List metricsProviders) { + return byteBuf; + } + }; + + PrometheusMetricsGenerator.MetricsBuffer metricsBuffer = + generator.renderToBuffer(MoreExecutors.directExecutor(), Collections.emptyList()); + try { + PrometheusMetricsGenerator.ResponseBuffer responseBuffer = metricsBuffer.getBufferFuture().get(); + + ByteBuf compressed = responseBuffer.getCompressedBuffer(MoreExecutors.directExecutor()).get(); + byte[] compressedBytes = new byte[compressed.readableBytes()]; + compressed.readBytes(compressedBytes); + try (GZIPInputStream gzipInputStream = new GZIPInputStream(new ByteArrayInputStream(compressedBytes))) { + byte[] uncompressedBytes = IOUtils.toByteArray(gzipInputStream); + assertEquals(uncompressedBytes, inputBytes); + } + } finally { + metricsBuffer.release(); + } + } +} \ No newline at end of file From ad79ddfe107cd56a2d53be0b6f03a7f9f1ab9916 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 25 Apr 2024 08:59:19 +0300 Subject: [PATCH 3/9] Fix max components now that gzip trailer is a separate buffer --- .../broker/stats/prometheus/PrometheusMetricsGenerator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index 7e6399c3d4e61..0a9499aa896b4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -192,7 +192,7 @@ private static class GzipByteBufferWriter { crc = new CRC32(); this.bufferSize = Math.max(Math.min(resolveChunkSize(bufAllocator), readableBytes), 8192); this.bufAllocator = bufAllocator; - this.resultBuffer = bufAllocator.compositeDirectBuffer(readableBytes / bufferSize + 1); + this.resultBuffer = bufAllocator.compositeDirectBuffer(readableBytes / bufferSize + 2); allocateBuffer(); } From ce32fbda46bdba07b41b5341d6eae0c24fa9f838 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 25 Apr 2024 08:59:52 +0300 Subject: [PATCH 4/9] Fix another issue with GZIP compression when compress buffer is full at finishing --- .../PrometheusMetricsGenerator.java | 54 +++++++++++++------ 1 file changed, 37 insertions(+), 17 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index 0a9499aa896b4..344b0eee7e13f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -218,38 +218,58 @@ private void compressAndAppend(ByteBuffer nioBuffer, boolean isFirst, boolean is // write gzip header compressBuffer.put(GZIP_HEADER); } + // update the CRC32 checksum calculation nioBuffer.mark(); crc.update(nioBuffer); nioBuffer.reset(); + // pass the input buffer to the deflater deflater.setInput(nioBuffer); + // when the input buffer is the last one, set the flag to finish the deflater if (isLast) { deflater.finish(); } - while (!deflater.needsInput() && !deflater.finished()) { - int written = deflater.deflate(compressBuffer); - if (written == 0 && !compressBuffer.hasRemaining()) { - backingCompressBuffer.setIndex(0, compressBuffer.position()); - resultBuffer.addComponent(true, backingCompressBuffer); - allocateBuffer(); + int written = -1; + // the deflater may need multiple calls to deflate the input buffer + // the completion is checked by the deflater.needsInput() method for buffers that aren't the last buffer + // for the last buffer, the completion is checked by the deflater.finished() method + while (!isLast && !deflater.needsInput() || isLast && !deflater.finished()) { + // when the previous deflater.deflate() call returns 0, it means the output buffer is full + // append the compressed buffer to the result buffer and allocate a new buffer + if (written == 0) { + if (compressBuffer.position() > 0) { + backingCompressBuffer.setIndex(0, compressBuffer.position()); + resultBuffer.addComponent(true, backingCompressBuffer); + allocateBuffer(); + } else { + // this is an unexpected case, throw an exception to prevent infinite loop + throw new IllegalStateException( + "Deflater didn't write any bytes while the compress buffer is empty."); + } } + written = deflater.deflate(compressBuffer); } if (isLast) { - // append the last compressed buffer - backingCompressBuffer.setIndex(0, compressBuffer.position()); - resultBuffer.addComponent(true, backingCompressBuffer); + // append the last compressed buffer when it is not empty + if (compressBuffer.position() > 0) { + backingCompressBuffer.setIndex(0, compressBuffer.position()); + resultBuffer.addComponent(true, backingCompressBuffer); + } else { + // release unused empty buffer + backingCompressBuffer.release(); + } backingCompressBuffer = null; compressBuffer = null; - // write gzip trailer - ByteBuffer trailer = ByteBuffer.allocate(8); - // write gzip trailer, integer values are in little endian byte order - trailer.order(ByteOrder.LITTLE_ENDIAN); + // write gzip trailer, 2 integers (CRC32 checksum and uncompressed size) + ByteBuffer trailerBuf = ByteBuffer.allocate(2 * Integer.BYTES); + // integer values are in little endian byte order + trailerBuf.order(ByteOrder.LITTLE_ENDIAN); // write CRC32 checksum - trailer.putInt((int) crc.getValue()); + trailerBuf.putInt((int) crc.getValue()); // write uncompressed size - trailer.putInt(deflater.getTotalIn()); - trailer.flip(); - resultBuffer.addComponent(true, Unpooled.wrappedBuffer(trailer)); + trailerBuf.putInt(deflater.getTotalIn()); + trailerBuf.flip(); + resultBuffer.addComponent(true, Unpooled.wrappedBuffer(trailerBuf)); } } From 3f14ddbd96829d9acc149402b64baa113c784698 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 25 Apr 2024 09:54:00 +0300 Subject: [PATCH 5/9] Improve comments --- .../stats/prometheus/PrometheusMetricsGenerator.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index 344b0eee7e13f..c699ed6443d42 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -233,15 +233,15 @@ private void compressAndAppend(ByteBuffer nioBuffer, boolean isFirst, boolean is // the completion is checked by the deflater.needsInput() method for buffers that aren't the last buffer // for the last buffer, the completion is checked by the deflater.finished() method while (!isLast && !deflater.needsInput() || isLast && !deflater.finished()) { - // when the previous deflater.deflate() call returns 0, it means the output buffer is full - // append the compressed buffer to the result buffer and allocate a new buffer + // when the previous deflater.deflate() call returns 0, it means that the output buffer is full. + // append the compressed buffer to the result buffer and allocate a new buffer. if (written == 0) { if (compressBuffer.position() > 0) { backingCompressBuffer.setIndex(0, compressBuffer.position()); resultBuffer.addComponent(true, backingCompressBuffer); allocateBuffer(); } else { - // this is an unexpected case, throw an exception to prevent infinite loop + // this is an unexpected case, throw an exception to prevent an infinite loop throw new IllegalStateException( "Deflater didn't write any bytes while the compress buffer is empty."); } @@ -254,7 +254,7 @@ private void compressAndAppend(ByteBuffer nioBuffer, boolean isFirst, boolean is backingCompressBuffer.setIndex(0, compressBuffer.position()); resultBuffer.addComponent(true, backingCompressBuffer); } else { - // release unused empty buffer + // release an unused empty buffer backingCompressBuffer.release(); } backingCompressBuffer = null; From 230a5bebf8453c4d0bda7362dad2b4e5efbb6ed8 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 25 Apr 2024 09:58:35 +0300 Subject: [PATCH 6/9] Make comment more accurate --- .../broker/stats/prometheus/PrometheusMetricsGenerator.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index c699ed6443d42..6d633b4862d46 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -233,7 +233,8 @@ private void compressAndAppend(ByteBuffer nioBuffer, boolean isFirst, boolean is // the completion is checked by the deflater.needsInput() method for buffers that aren't the last buffer // for the last buffer, the completion is checked by the deflater.finished() method while (!isLast && !deflater.needsInput() || isLast && !deflater.finished()) { - // when the previous deflater.deflate() call returns 0, it means that the output buffer is full. + // when the previous deflater.deflate() call returns 0 (and needsInput/finished returns false), + // it means that the output buffer is full. // append the compressed buffer to the result buffer and allocate a new buffer. if (written == 0) { if (compressBuffer.position() > 0) { From d22853faf7f2c27fa77ba7f352a0611e349967c0 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 25 Apr 2024 10:00:00 +0300 Subject: [PATCH 7/9] Polish comment --- .../broker/stats/prometheus/PrometheusMetricsGenerator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index 6d633b4862d46..01ec684ce94cd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -233,7 +233,7 @@ private void compressAndAppend(ByteBuffer nioBuffer, boolean isFirst, boolean is // the completion is checked by the deflater.needsInput() method for buffers that aren't the last buffer // for the last buffer, the completion is checked by the deflater.finished() method while (!isLast && !deflater.needsInput() || isLast && !deflater.finished()) { - // when the previous deflater.deflate() call returns 0 (and needsInput/finished returns false), + // when the previous deflater.deflate call returns 0 (and needsInput/finished returns false), // it means that the output buffer is full. // append the compressed buffer to the result buffer and allocate a new buffer. if (written == 0) { From 90f6612c3a0144e4cc98390d5783a6f6be8a09f1 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 25 Apr 2024 10:13:46 +0300 Subject: [PATCH 8/9] Polish --- .../prometheus/PrometheusMetricsGenerator.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index 01ec684ce94cd..7b5b3c870e215 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -193,7 +193,7 @@ private static class GzipByteBufferWriter { this.bufferSize = Math.max(Math.min(resolveChunkSize(bufAllocator), readableBytes), 8192); this.bufAllocator = bufAllocator; this.resultBuffer = bufAllocator.compositeDirectBuffer(readableBytes / bufferSize + 2); - allocateBuffer(); + allocateCompressBuffer(); } /** @@ -238,9 +238,8 @@ private void compressAndAppend(ByteBuffer nioBuffer, boolean isFirst, boolean is // append the compressed buffer to the result buffer and allocate a new buffer. if (written == 0) { if (compressBuffer.position() > 0) { - backingCompressBuffer.setIndex(0, compressBuffer.position()); - resultBuffer.addComponent(true, backingCompressBuffer); - allocateBuffer(); + appendCompressBufferToResultBuffer(); + allocateCompressBuffer(); } else { // this is an unexpected case, throw an exception to prevent an infinite loop throw new IllegalStateException( @@ -252,8 +251,7 @@ private void compressAndAppend(ByteBuffer nioBuffer, boolean isFirst, boolean is if (isLast) { // append the last compressed buffer when it is not empty if (compressBuffer.position() > 0) { - backingCompressBuffer.setIndex(0, compressBuffer.position()); - resultBuffer.addComponent(true, backingCompressBuffer); + appendCompressBufferToResultBuffer(); } else { // release an unused empty buffer backingCompressBuffer.release(); @@ -274,7 +272,12 @@ private void compressAndAppend(ByteBuffer nioBuffer, boolean isFirst, boolean is } } - private void allocateBuffer() { + private void appendCompressBufferToResultBuffer() { + backingCompressBuffer.setIndex(0, compressBuffer.position()); + resultBuffer.addComponent(true, backingCompressBuffer); + } + + private void allocateCompressBuffer() { backingCompressBuffer = bufAllocator.directBuffer(bufferSize); compressBuffer = backingCompressBuffer.nioBuffer(0, bufferSize); } From 4bd5888f32bd96a0cf1ed36f2d3acf28f16212c4 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Thu, 25 Apr 2024 10:46:34 +0300 Subject: [PATCH 9/9] Fix buffer preallocation --- .../broker/stats/prometheus/PrometheusMetricsGenerator.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java index 7b5b3c870e215..6b4d08c359d42 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsGenerator.java @@ -372,7 +372,9 @@ private ByteBuf allocateMultipartCompositeDirectBuffer() { int totalLen = 0; while (totalLen < initialBufferSize) { totalLen += chunkSize; - buf.addComponent(false, byteBufAllocator.directBuffer(chunkSize)); + // increase the capacity in increments of chunkSize to preallocate the buffers + // in the composite buffer + buf.capacity(totalLen); } return buf; }