diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java index 9cb82fde04118..cd598585c8e87 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java @@ -30,6 +30,7 @@ import java.util.Map; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.Data; @@ -45,6 +46,7 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import org.testng.collections.Lists; @Test(groups = "broker-api") public class RetryTopicTest extends ProducerConsumerBase { @@ -713,4 +715,70 @@ public void testRetryProducerWillCloseByConsumer() throws Exception { admin.topics().delete(topicDLQ, false); } + + @Test(timeOut = 30000L) + public void testRetryTopicExceptionWithConcurrent() throws Exception { + final String topic = "persistent://my-property/my-ns/retry-topic"; + final int maxRedeliveryCount = 2; + final int sendMessages = 10; + // subscribe before publish + Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) + .topic(topic) + .subscriptionName("my-subscription") + .subscriptionType(SubscriptionType.Shared) + .enableRetry(true) + .receiverQueueSize(100) + .deadLetterPolicy(DeadLetterPolicy.builder() + .maxRedeliverCount(maxRedeliveryCount) + .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry") + .build()) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + Producer producer = pulsarClient.newProducer(Schema.BYTES) + .topic(topic) + .create(); + for (int i = 0; i < sendMessages; i++) { + producer.newMessage().key("1").value(String.format("Hello Pulsar [%d]", i).getBytes()).send(); + } + producer.close(); + + // mock a retry producer exception when reconsumelater is called + MultiTopicsConsumerImpl multiTopicsConsumer = (MultiTopicsConsumerImpl) consumer; + List> consumers = multiTopicsConsumer.getConsumers(); + for (ConsumerImpl c : consumers) { + Set deadLetterPolicyField = + ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy")); + + if (deadLetterPolicyField.size() != 0) { + Field field = deadLetterPolicyField.iterator().next(); + field.setAccessible(true); + DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c); + deadLetterPolicy.setRetryLetterTopic("#persistent://invalid-topic#"); + } + } + + List> messages = Lists.newArrayList(); + for (int i = 0; i < sendMessages; i++) { + messages.add(consumer.receive()); + } + + // mock call the reconsumeLater method concurrently + CountDownLatch latch = new CountDownLatch(messages.size()); + for (Message message : messages) { + new Thread(() -> { + try { + consumer.reconsumeLater(message, 1, TimeUnit.SECONDS); + } catch (Exception ignore) { + + } finally { + latch.countDown(); + } + }).start(); + } + + latch.await(); + consumer.close(); + } + } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index a3f209fcd9ed6..fdc966f4f2e02 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -708,6 +708,8 @@ protected CompletableFuture doReconsumeLater(Message message, AckType a } catch (Exception e) { result.completeExceptionally(e); } + } else { + result.completeExceptionally(new PulsarClientException("Retry letter producer is null.")); } MessageId finalMessageId = messageId; result.exceptionally(ex -> {