Skip to content

Commit

Permalink
Avoid compaction task stuck when the last message is marker
Browse files Browse the repository at this point in the history
  • Loading branch information
coderzc committed Dec 13, 2023
1 parent 50007c3 commit b6b3ea7
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.compaction.Compactor;
import org.checkerframework.checker.nullness.qual.Nullable;

@Slf4j
Expand Down Expand Up @@ -174,13 +175,15 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i
if (msgMetadata != null && msgMetadata.hasTxnidMostBits()
&& msgMetadata.hasTxnidLeastBits()) {
if (Markers.isTxnMarker(msgMetadata)) {
// because consumer can receive message is smaller than maxReadPosition,
// so this marker is useless for this subscription
individualAcknowledgeMessageIfNeeded(Collections.singletonList(entry.getPosition()),
Collections.emptyMap());
entries.set(i, null);
entry.release();
continue;
if (cursor == null || !cursor.getName().equals(Compactor.COMPACTION_SUBSCRIPTION)) {
// because consumer can receive message is smaller than maxReadPosition,
// so this marker is useless for this subscription
individualAcknowledgeMessageIfNeeded(Collections.singletonList(entry.getPosition()),
Collections.emptyMap());
entries.set(i, null);
entry.release();
continue;
}
} else if (((PersistentTopic) subscription.getTopic())
.isTxnAborted(new TxnID(msgMetadata.getTxnidMostBits(), msgMetadata.getTxnidLeastBits()),
(PositionImpl) entry.getPosition())) {
Expand All @@ -192,7 +195,7 @@ public int filterEntriesForConsumer(@Nullable MessageMetadata[] metadataArray, i
}
}

if (msgMetadata == null || Markers.isServerOnlyMarker(msgMetadata)) {
if (msgMetadata == null || (Markers.isServerOnlyMarker(msgMetadata) && !Markers.isTxnMarker(msgMetadata))) {
PositionImpl pos = (PositionImpl) entry.getPosition();
// Message metadata was corrupted or the messages was a server-only marker

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.pulsar.client.impl.RawBatchConverter;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -130,7 +131,10 @@ private void phaseOneLoop(RawReader reader,
boolean replaceMessage = false;
mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload());
if (RawBatchConverter.isReadableBatch(metadata)) {
if (Markers.isTxnMarker(metadata)) {
mxBean.addCompactionRemovedEvent(reader.getTopic());
deletedMessage = true;
} else if (RawBatchConverter.isReadableBatch(metadata)) {
try {
int numMessagesInBatch = metadata.getNumMessagesInBatch();
int deleteCnt = 0;
Expand Down Expand Up @@ -262,7 +266,10 @@ private void phaseTwoLoop(RawReader reader, MessageId to, Map<String, MessageId>
MessageId id = m.getMessageId();
Optional<RawMessage> messageToAdd = Optional.empty();
mxBean.addCompactionReadOp(reader.getTopic(), m.getHeadersAndPayload().readableBytes());
if (RawBatchConverter.isReadableBatch(m)) {
MessageMetadata metadata = Commands.parseMessageMetadata(m.getHeadersAndPayload());
if (Markers.isTxnMarker(metadata)) {
messageToAdd = Optional.empty();
} else if (RawBatchConverter.isReadableBatch(metadata)) {
try {
messageToAdd = rebatchMessage(reader.getTopic(),
m, (key, subid) -> subid.equals(latestForKey.get(key)), topicCompactionRetainNullKey);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider;
import org.apache.pulsar.broker.transaction.pendingack.impl.PendingAckHandleImpl;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -1849,4 +1850,35 @@ public void testReadCommittedWithReadCompacted() throws Exception{
Assert.assertEquals(messages, List.of("V2", "V3"));
}


@Test
public void testReadCommittedWithCompaction() throws Exception{
final String namespace = "tnx/ns-prechecks";
final String topic = "persistent://" + namespace + "/test_transaction_topic" + UUID.randomUUID();
admin.namespaces().createNamespace(namespace);
admin.topics().createNonPartitionedTopic(topic);

admin.topicPolicies().setCompactionThreshold(topic, 100 * 1024 * 1024);

@Cleanup
Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
.topic(topic)
.create();

producer.newMessage().key("K1").value("V1").send();

Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
producer.newMessage(txn).key("K2").value("V2").send();
producer.newMessage(txn).key("K3").value("V3").send();
txn.commit();

admin.topics().triggerCompaction(topic);

Awaitility.await().untilAsserted(() -> {
assertEquals(admin.topics().compactionStatus(topic).status,
LongRunningProcessStatus.Status.SUCCESS);
});
}

}

0 comments on commit b6b3ea7

Please # to comment.