-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
[fix][broker] Avoid compaction task stuck when the last message to compact is a marker #21718
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -45,6 +45,7 @@ | |
import org.apache.pulsar.client.impl.RawBatchConverter; | ||
import org.apache.pulsar.common.api.proto.MessageMetadata; | ||
import org.apache.pulsar.common.protocol.Commands; | ||
import org.apache.pulsar.common.protocol.Markers; | ||
import org.apache.pulsar.common.util.FutureUtil; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
@@ -130,7 +131,10 @@ private void phaseOneLoop(RawReader reader, | |
boolean replaceMessage = false; | ||
mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes()); | ||
MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload()); | ||
if (RawBatchConverter.isReadableBatch(metadata)) { | ||
if (Markers.isTxnMarker(metadata)) { | ||
mxBean.addCompactionRemovedEvent(reader.getTopic()); | ||
deletedMessage = true; | ||
} else if (RawBatchConverter.isReadableBatch(metadata)) { | ||
try { | ||
int numMessagesInBatch = metadata.getNumMessagesInBatch(); | ||
int deleteCnt = 0; | ||
|
@@ -262,7 +266,10 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId> | |
MessageId id = m.getMessageId(); | ||
Optional<RawMessage> messageToAdd = Optional.empty(); | ||
mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes()); | ||
if (RawBatchConverter.isReadableBatch(m)) { | ||
MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload()); | ||
if (Markers.isTxnMarker(metadata)) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why only check the txn marker here? Should we use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, I will fix it. |
||
messageToAdd = Optional.empty(); | ||
} else if (RawBatchConverter.isReadableBatch(metadata)) { | ||
try { | ||
messageToAdd = rebatchMessage(reader.getTopic(), | ||
m, (key, subid) -> subid.equals(latestForKey.get(key)), topicCompactionRetainNullKey); | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -106,6 +106,7 @@ | |
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; | ||
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider; | ||
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl; | ||
import org.apache.pulsar.client.admin.LongRunningProcessStatus; | ||
import org.apache.pulsar.client.admin.PulsarAdminException; | ||
import org.apache.pulsar.client.api.Consumer; | ||
import org.apache.pulsar.client.api.Message; | ||
|
@@ -1849,4 +1850,62 @@ public void testReadCommittedWithReadCompacted() throws Exception{ | |
Assert.assertEquals(messages, List.of("V2", "V3")); | ||
} | ||
|
||
|
||
@Test | ||
public void testReadCommittedWithCompaction() throws Exception{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we add a test to send a replicated subscription snapshot marker and test the compaction works? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I add test |
||
final String namespace = "tnx/ns-prechecks"; | ||
final String topic = "persistent://" + namespace + "/test_transaction_topic" + UUID.randomUUID(); | ||
admin.namespaces().createNamespace(namespace); | ||
admin.topics().createNonPartitionedTopic(topic); | ||
|
||
admin.topicPolicies().setCompactionThreshold(topic, 100 * 1024 * 1024); | ||
|
||
@Cleanup | ||
Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING) | ||
.topic(topic) | ||
.create(); | ||
|
||
producer.newMessage().key("K1").value("V1").send(); | ||
|
||
Transaction txn = pulsarClient.newTransaction() | ||
.withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); | ||
producer.newMessage(txn).key("K2").value("V2").send(); | ||
producer.newMessage(txn).key("K3").value("V3").send(); | ||
txn.commit().get(); | ||
|
||
producer.newMessage().key("K1").value("V4").send(); | ||
|
||
Transaction txn2 = pulsarClient.newTransaction() | ||
.withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); | ||
producer.newMessage(txn2).key("K2").value("V5").send(); | ||
producer.newMessage(txn2).key("K3").value("V6").send(); | ||
txn2.commit().get(); | ||
|
||
admin.topics().triggerCompaction(topic); | ||
|
||
Awaitility.await().untilAsserted(() -> { | ||
assertEquals(admin.topics().compactionStatus(topic).status, | ||
LongRunningProcessStatus.Status.SUCCESS); | ||
}); | ||
|
||
@Cleanup | ||
Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING) | ||
.topic(topic) | ||
.subscriptionName("sub") | ||
.subscriptionType(SubscriptionType.Exclusive) | ||
.readCompacted(true) | ||
.subscribe(); | ||
List<String> result = new ArrayList<>(); | ||
while (true) { | ||
Message<String> receive = consumer.receive(2, TimeUnit.SECONDS); | ||
if (receive == null) { | ||
break; | ||
} | ||
|
||
result.add(receive.getValue()); | ||
} | ||
|
||
Assert.assertEquals(result, List.of("V4", "V5", "V6")); | ||
} | ||
coderzc marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should pass all the markers and filter them at the client side when doing compaction.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.