Flaky-test: SimpleProducerConsumerTest.testMultiTopicsConsumerImplPauseForManualSubscription #23485

Closed
1 of 2 tasks
lhotari opened this issue Oct 18, 2024 · 2 comments · Fixed by #23546

Comments

@lhotari
Member

lhotari commented Oct 18, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Example failure

https://github.com/apache/pulsar/actions/runs/11407423480/job/31743771210?pr=23484#step:11:1686

Exception stacktrace

  Error:  Tests run: 294, Failures: 1, Errors: 0, Skipped: 231, Time elapsed: 352.306 s <<< FAILURE! - in org.apache.pulsar.client.api.SimpleProducerConsumerTest
  Error:  org.apache.pulsar.client.api.SimpleProducerConsumerTest.testMultiTopicsConsumerImplPauseForManualSubscription  Time elapsed: 10.489 s  <<< FAILURE!
  java.lang.AssertionError: expected [30] but found [29]
  	at org.testng.Assert.fail(Assert.java:110)
  	at org.testng.Assert.failNotEquals(Assert.java:1577)
  	at org.testng.Assert.assertEqualsImpl(Assert.java:149)
  	at org.testng.Assert.assertEquals(Assert.java:131)
  	at org.testng.Assert.assertEquals(Assert.java:1418)
  	at org.testng.Assert.assertEquals(Assert.java:1382)
  	at org.testng.Assert.assertEquals(Assert.java:1428)
  	at org.apache.pulsar.client.api.SimpleProducerConsumerTest.testMultiTopicsConsumerImplPauseForManualSubscription(SimpleProducerConsumerTest.java:3580)
  	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
  	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
  	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
  	at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
  	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
  	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
  	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
  	at java.base/java.lang.Thread.run(Thread.java:1583)

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@pdolif
Contributor

pdolif commented Oct 19, 2024

Hi @lhotari
I would like to share my findings about the test case. It tests the pausing of a multi-topic consumer by doing the following:

  • There are 3 topics.
  • Step 1: Produce 5 messages per topic. Create a multi-topic consumer for topics 1 and 2 (but not topic 3) with a receiverQueueSize of 1. Receive 8 messages. It looks to me like this should consume 4 messages from topic 1 and 4 from topic 2, and in successful test runs that is the case: one message then remains in the queue of each of the two internal consumers. In every failing run I observed, however, 5 messages of one topic and 3 of the other were received.
  • Step 2: Pause the multi-topic consumer.
  • Step 3: Add topic 3 to the subscription of the multi-topic consumer. Now there are 3 internal consumers, one per topic.
  • Step 4: Produce 5 more messages per topic.
  • Step 5: Receiving 2 messages should clear the receiver queues of consumers 1 and 2. In successful test runs this is the case; in failing test runs, both messages come from the same topic.
  • Step 6: Both consumer queues are expected to be empty, and since the multi-topic consumer is paused, no message should be consumed. A problem might be that the test uses the following code:
// 6. should not consume any messages
Awaitility.await().untilAsserted(() -> assertNull(consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS)));

instead of:

// 6. should not consume any messages
assertNull(consumer.receive(RECEIVE_TIMEOUT_SECONDS, TimeUnit.SECONDS));

With the current test code, a message can be received in the first evaluation of the await; because Awaitility retries the assertion, the next call receives nothing and assertNull succeeds. The test therefore does not fail even though a message was received. This is why the final count of received messages is 29 instead of 30: one message is consumed in step 6 and never counted.

  • Step 7: Resume multi-topic consumer.
  • Step 8: Consume the remaining messages.
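A minimal, Pulsar-free sketch of why the retrying assertion in step 6 hides the leftover message. It assumes the failing-run state described above: 10 messages counted in steps 1 and 5, one message still buffered after step 5, and the remaining 19 held back until step 7. The ArrayDeque stands in for the consumer's incoming queue, and the untilAsserted method here is a hypothetical stand-in for Awaitility's untilAsserted, which re-evaluates the lambda until it stops throwing or the timeout expires. None of this is Pulsar's or Awaitility's actual code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical, Pulsar-free model of steps 5 to 8 in a failing run. */
public class MaskedAssertionDemo {

    static void assertNull(Object value) {
        if (value != null) {
            throw new AssertionError("expected null but found " + value);
        }
    }

    /** Hypothetical stand-in for Awaitility's untilAsserted: retry until the assertion passes. */
    static void untilAsserted(Runnable assertion, int maxAttempts) {
        AssertionError last = new AssertionError("no attempts made");
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                assertion.run();
                return;                 // passed on this attempt
            } catch (AssertionError e) {
                last = e;               // swallowed; the lambda runs again
            }
        }
        throw last;
    }

    /** Returns the final received-message count, or -1 if step 6 reports the leak. */
    static int run(boolean retrying) {
        final int totalMessages = 30;
        int counted = 10;                                         // 8 in step 1 + 2 in step 5
        Deque<String> incoming = new ArrayDeque<>();
        incoming.add("leftover");                                 // still buffered after step 5 (failing run)
        int heldBack = totalMessages - counted - incoming.size(); // 19, assumed held back by pause()

        // Step 6: "should not consume any messages"
        Runnable step6 = () -> assertNull(incoming.poll());
        try {
            if (retrying) {
                untilAsserted(step6, 3);
            } else {
                step6.run();
            }
        } catch (AssertionError e) {
            return -1;
        }

        // Step 7: resume; the held-back messages arrive.
        for (int i = 0; i < heldBack; i++) {
            incoming.add("msg-" + i);
        }
        // Step 8: drain and count.
        while (incoming.poll() != null) {
            counted++;
        }
        return counted;
    }

    public static void main(String[] args) {
        System.out.println("retrying assertion: " + run(true));  // 29, like the CI failure
        System.out.println("single assertion:   " + run(false)); // -1, fails at step 6
    }
}
```

With the retrying variant the final count is 29, matching the CI failure; with a plain assertNull the model instead fails at step 6, which is where the leftover message is actually received.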

It seems that the multi-topic consumer is automatically resumed between steps 2 and 7 in MultiTopicsConsumerImpl.resumeReceivingFromPausedConsumersIfNeeded().

@lhotari
Member Author

lhotari commented Oct 21, 2024

> I would like to share my findings about the test case. It tests the pausing of a multi-topic consumer by doing the following

Very good analysis, @pdolif!

One detail that caught my eye in the test is step 3:

// 3. manually add the third consumer
((MultiTopicsConsumerImpl)consumer).subscribeAsync(topicNameBase + "3", true).join();

That violates the testing principle that tests should exercise the public API and externally observable behavior. There are many such tests in the Pulsar code base, and we should move in a direction where this practice isn't carried forward.

> It seems that the multi-topic consumer is automatically resumed between steps 2 and 7 in MultiTopicsConsumerImpl.resumeReceivingFromPausedConsumersIfNeeded().

It might be different: the pausing logic seems to differ from what is tested. Calling .pause() stops sending flow permits to the broker, while the pausing referenced in MultiTopicsConsumerImpl.resumeReceivingFromPausedConsumersIfNeeded() is a different matter.
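To illustrate the distinction, here is a toy model of permit-based flow control. It assumes, as the comment above describes, that pause() only stops sending new flow permits, so a message already buffered in a receiver queue can still be received while paused. That is why step 5 has to drain the buffered messages before step 6 can expect nothing. The Broker and Consumer classes are hypothetical; the model refills permits after every receive, covers a single topic only, and does not represent MultiTopicsConsumerImpl's shared queue or resumeReceivingFromPausedConsumersIfNeeded().

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of permit-based flow control with pause/resume (hypothetical, not Pulsar code). */
public class PausableConsumerModel {

    /** Stand-in for one topic's backlog on the broker. */
    static final class Broker {
        private final Deque<String> backlog = new ArrayDeque<>();

        Broker(String topic, int count) {
            for (int i = 0; i < count; i++) {
                backlog.add(topic + "-" + i);
            }
        }

        /** Push at most {@code permits} messages into the consumer's receiver queue. */
        void dispatch(int permits, Deque<String> receiverQueue) {
            for (int i = 0; i < permits && !backlog.isEmpty(); i++) {
                receiverQueue.add(backlog.poll());
            }
        }
    }

    static final class Consumer {
        private final Broker broker;
        private final int receiverQueueSize;
        private final Deque<String> receiverQueue = new ArrayDeque<>();
        private boolean paused;

        Consumer(Broker broker, int receiverQueueSize) {
            this.broker = broker;
            this.receiverQueueSize = receiverQueueSize;
            sendPermits();
        }

        /** Grant permits for the free space in the receiver queue, unless paused. */
        private void sendPermits() {
            if (!paused) {
                broker.dispatch(receiverQueueSize - receiverQueue.size(), receiverQueue);
            }
        }

        /** Non-blocking receive; null stands in for a receive timeout. */
        String receive() {
            String message = receiverQueue.poll();
            sendPermits();   // a freed slot earns a new permit, but only when not paused
            return message;
        }

        void pause() {
            paused = true;   // stop granting permits; already-buffered messages stay receivable
        }

        void resume() {
            paused = false;
            sendPermits();
        }
    }

    public static void main(String[] args) {
        Consumer consumer = new Consumer(new Broker("t1", 10), 1);
        for (int i = 0; i < 4; i++) {
            consumer.receive();                 // t1-0 .. t1-3; t1-4 is now buffered
        }
        consumer.pause();
        System.out.println(consumer.receive()); // t1-4: buffered before pause(), still delivered
        System.out.println(consumer.receive()); // null: no permits were granted while paused
        consumer.resume();
        System.out.println(consumer.receive()); // t1-5: dispatched after resume()
    }
}
```

Under these assumptions, "paused" means "no new messages requested", not "no messages returned by receive()", which matches the distinction drawn above.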
