diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java index 8dfc0af8e1d93..5cbbcc4429883 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerStatsRecorderImpl.java @@ -146,15 +146,16 @@ private void init(ConsumerConfigurationData conf) { receivedMsgsRate = currentNumMsgsReceived / elapsed; receivedBytesRate = currentNumBytesReceived / elapsed; + int prefetchQueueSize = consumerImpl.incomingMessages.size(); if ((currentNumMsgsReceived | currentNumBytesReceived | currentNumReceiveFailed | currentNumAcksSent - | currentNumAcksFailed) != 0) { + | currentNumAcksFailed | prefetchQueueSize) != 0) { log.info( "[{}] [{}] [{}] Prefetched messages: {} --- " + "Consume throughput received: {} msgs/s --- {} Mbit/s --- " + "Ack sent rate: {} ack/s --- " + "Failed messages: {} --- batch messages: {} ---" + "Failed acks: {}", consumerImpl.getTopic(), consumerImpl.getSubscription(), consumerImpl.consumerName, - consumerImpl.incomingMessages.size(), THROUGHPUT_FORMAT.format(receivedMsgsRate), + prefetchQueueSize, THROUGHPUT_FORMAT.format(receivedMsgsRate), THROUGHPUT_FORMAT.format(receivedBytesRate * 8 / 1024 / 1024), THROUGHPUT_FORMAT.format(currentNumAcksSent / elapsed), currentNumReceiveFailed, currentNumBatchReceiveFailed, currentNumAcksFailed);