diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java index afb17a186477c..4fac35aa78f1e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/InterceptorsTest.java @@ -475,6 +475,84 @@ public void onAckTimeoutSend(Consumer consumer, Set messageId consumer.close(); } + @Test(dataProvider = "topicPartition") + public void testDoNotEarlierHitBeforeConsumerWithMessageListener(int partitions) throws Exception { + + AtomicInteger beforeConsumeCount = new AtomicInteger(0); + PulsarClient client = PulsarClient.builder() + .serviceUrl(lookupUrl.toString()) + .listenerThreads(1) + .build(); + + ConsumerInterceptor interceptor = new ConsumerInterceptor<>() { + @Override + public void close() { + } + + @Override + public Message beforeConsume(Consumer consumer, Message message) { + beforeConsumeCount.incrementAndGet(); + log.info("beforeConsume messageId: {}", message.getMessageId()); + return message; + } + + @Override + public void onAcknowledge(Consumer consumer, MessageId messageId, Throwable cause) { + } + + @Override + public void onAcknowledgeCumulative(Consumer consumer, MessageId messageId, Throwable cause) { + } + + @Override + public void onNegativeAcksSend(Consumer consumer, Set messageIds) { + } + + @Override + public void onAckTimeoutSend(Consumer consumer, Set messageIds) { + } + }; + + final String topicName = "persistent://my-property/my-ns/my-topic"; + + if (partitions > 0) { + admin.topics().createPartitionedTopic(topicName, partitions); + } else { + admin.topics().createNonPartitionedTopic(topicName); + } + + Consumer consumer = client.newConsumer(Schema.STRING) + .topic(topicName) + .subscriptionType(SubscriptionType.Shared) + .intercept(interceptor) + .subscriptionName("my-subscription") + .messageListener((c, m) -> { + // Simulate a long processing time + try { + Thread.sleep(60000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + }) + .subscribe(); + + Producer producer = client.newProducer(Schema.STRING) + .topic("persistent://my-property/my-ns/my-topic") + .create(); + + final int messages = 10; + for (int i = 0; i < messages; i++) { + producer.newMessage().value("Hello Pulsar!").send(); + } + Awaitility.await().untilAsserted(() -> { + // Ensure that the interceptor is not hit before the message listener + Assert.assertEquals(beforeConsumeCount.get(), 1); + }); + producer.close(); + consumer.close(); + client.close(); + } + @Test public void testConsumerInterceptorWithPatternTopicSubscribe() throws PulsarClientException { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 74abb82bfe809..12f0d75c3a416 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -1166,6 +1166,7 @@ protected void callMessageListener(Message msg) { id = msg.getMessageId(); } unAckedMessageTracker.add(id, msg.getRedeliveryCount()); + beforeConsume(msg); listener.received(ConsumerBase.this, msg); } catch (Throwable t) { log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index b47b3f833084d..c05fd021a990b 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -545,7 +545,8 @@ protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarC return null; } messageProcessed(message); - return beforeConsume(message); + message = listener == null ? beforeConsume(message) : message; + return message; } catch (InterruptedException e) { ExceptionHandler.handleInterruptedException(e); State state = getState(); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index bf8bd6cc95117..4270e0036afd4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -396,7 +396,7 @@ protected Message internalReceive(long timeout, TimeUnit unit) throws PulsarC decreaseIncomingMessageSize(message); checkArgument(message instanceof TopicMessageImpl); trackUnAckedMsgIfNoListener(message.getMessageId(), message.getRedeliveryCount()); - message = beforeConsume(message); + message = listener == null ? beforeConsume(message) : message; } resumeReceivingFromPausedConsumersIfNeeded(); return message;