-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
[fix] [broker] fix write all compacted out entry into compacted topic #21917
[fix] [broker] fix write all compacted out entry into compacted topic #21917
Conversation
PTAL, thanks. @coderzc |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it possible to add a test to reproduce this issue?
@thetumbled @codelipenghui The following test can reproduce the problem, and this PR can fix it. @Test
public void testAllCompactedOut() throws Exception {
String topicName = "persistent://my-property/use/my-ns/testAllCompactedOut";
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
.enableBatching(true).topic(topicName).batchingMaxMessages(3).create();
producer.newMessage().key("K1").value("V1").sendAsync();
producer.newMessage().key("K2").value("V2").sendAsync();
producer.newMessage().key("K2").value(null).sendAsync();
producer.flush();
admin.topics().triggerCompaction(topicName);
Awaitility.await().untilAsserted(() -> {
assertEquals(admin.topics().compactionStatus(topicName).status,
LongRunningProcessStatus.Status.SUCCESS);
});
producer.newMessage().key("K1").value(null).sendAsync();
producer.flush();
admin.topics().triggerCompaction(topicName);
Awaitility.await().untilAsserted(() -> {
assertEquals(admin.topics().compactionStatus(topicName).status,
LongRunningProcessStatus.Status.SUCCESS);
});
@Cleanup
Reader<String> reader = pulsarClient.newReader(Schema.STRING)
.subscriptionName("reader-test")
.topic(topicName)
.readCompacted(true)
.startMessageId(MessageId.earliest)
.create();
Assert.assertEquals(reader.getLastMessageIds().get(0), MessageIdImpl.earliest);
while (reader.hasMessageAvailable()) {
Message<String> message = reader.readNext(3, TimeUnit.SECONDS);
Assert.assertNotNull(message);
}
} |
pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
Outdated
Show resolved
Hide resolved
Need to add this test to this patch |
Great job, but there is something wrong with the test code. We need to set the config |
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
Show resolved
Hide resolved
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #21917 +/- ##
=========================================
Coverage 73.57% 73.58%
- Complexity 32375 32395 +20
=========================================
Files 1861 1861
Lines 138568 138591 +23
Branches 15185 15185
=========================================
+ Hits 101958 101978 +20
- Misses 28704 28730 +26
+ Partials 7906 7883 -23
Flags with carried forward coverage won't be shown. Click here to find out more.
|
…apache#21917) (cherry picked from commit e7f1d03)
…apache#21917) (cherry picked from commit e7f1d03)
Fixes #21916
Motivation
The method
reader.hasMessageAvailable()
return true, butreader.readNext
can't return any messages because the messages in compacted topic are all compacted out.Modifications
Check if the message without partition key is a valid message.
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: thetumbled#33