From c616deb1dc580d564a3f4694f23c6be96ccfe316 Mon Sep 17 00:00:00 2001 From: coderzc Date: Mon, 13 Nov 2023 18:07:55 +0800 Subject: [PATCH] Revert "[fix][broker] Fix issue with consumer read uncommitted messages from compacted topic (#21465)" This reverts commit 80f921a45bb023fca36faf98038f3ec687e05f16. --- ...sistentDispatcherSingleActiveConsumer.java | 6 +- .../pulsar/compaction/CompactedTopic.java | 5 +- .../pulsar/compaction/CompactedTopicImpl.java | 3 +- .../compaction/CompactedTopicUtils.java | 10 ++-- .../broker/transaction/TransactionTest.java | 55 ------------------- .../compaction/CompactedTopicUtilsTest.java | 4 +- 6 files changed, 12 insertions(+), 71 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java index 5e9183df0b1df..d96429693fda8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java @@ -55,7 +55,6 @@ import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; import org.apache.pulsar.common.util.Codec; import org.apache.pulsar.compaction.CompactedTopicUtils; -import org.apache.pulsar.compaction.TopicCompactionService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -351,9 +350,8 @@ protected void readMoreEntries(Consumer consumer) { havePendingRead = true; if (consumer.readCompacted()) { boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId()); - TopicCompactionService topicCompactionService = topic.getTopicCompactionService(); - CompactedTopicUtils.asyncReadCompactedEntries(topicCompactionService, cursor, messagesToRead, - bytesToRead, topic.getMaxReadPosition(), readFromEarliest, this, true, consumer); + CompactedTopicUtils.asyncReadCompactedEntries(topic.getTopicCompactionService(), cursor, + messagesToRead, bytesToRead, readFromEarliest, this, true, consumer); } else { ReadEntriesCtx readEntriesCtx = ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch()); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java index 146ba4327d252..8c17e0f3ca34d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java @@ -24,7 +24,6 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.Position; -import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.pulsar.broker.service.Consumer; public interface CompactedTopic { @@ -35,14 +34,12 @@ public interface CompactedTopic { * Read entries from compacted topic. * * @deprecated Use {@link CompactedTopicUtils#asyncReadCompactedEntries(TopicCompactionService, ManagedCursor, - * int, long, org.apache.bookkeeper.mledger.impl.PositionImpl, boolean, ReadEntriesCallback, boolean, Consumer)} - * instead. + * int, long, boolean, ReadEntriesCallback, boolean, Consumer)} instead. */ @Deprecated void asyncReadEntriesOrWait(ManagedCursor cursor, int maxEntries, long bytesToRead, - PositionImpl maxReadPosition, boolean isFirstRead, ReadEntriesCallback callback, Consumer consumer); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java index 8794e2736d4d4..b028b708c49e4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java @@ -96,7 +96,6 @@ public CompletableFuture deleteCompactedLedger(long compactedLedgerId) { public void asyncReadEntriesOrWait(ManagedCursor cursor, int maxEntries, long bytesToRead, - PositionImpl maxReadPosition, boolean isFirstRead, ReadEntriesCallback callback, Consumer consumer) { PositionImpl cursorPosition; @@ -113,7 +112,7 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor, if (currentCompactionHorizon == null || currentCompactionHorizon.compareTo(cursorPosition) < 0) { - cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, maxReadPosition); + cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, PositionImpl.LATEST); } else { ManagedCursorImpl managedCursor = (ManagedCursorImpl) cursor; int numberOfEntriesToRead = managedCursor.applyMaxSizeCap(maxEntries, bytesToRead); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java index d3464d402e9c6..66bcf4c3002bd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicUtils.java @@ -42,8 +42,8 @@ public class CompactedTopicUtils { @Beta public static void asyncReadCompactedEntries(TopicCompactionService topicCompactionService, ManagedCursor cursor, int maxEntries, - long bytesToRead, PositionImpl maxReadPosition, - boolean readFromEarliest, AsyncCallbacks.ReadEntriesCallback callback, + long bytesToRead, boolean readFromEarliest, + AsyncCallbacks.ReadEntriesCallback callback, boolean wait, @Nullable Consumer consumer) { Objects.requireNonNull(topicCompactionService); Objects.requireNonNull(cursor); @@ -68,9 +68,11 @@ public static void asyncReadCompactedEntries(TopicCompactionService topicCompact || readPosition.compareTo( lastCompactedPosition.getLedgerId(), lastCompactedPosition.getEntryId()) > 0) { if (wait) { - cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, maxReadPosition); + cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, + PositionImpl.LATEST); } else { - cursor.asyncReadEntries(maxEntries, bytesToRead, callback, readEntriesCtx, maxReadPosition); + cursor.asyncReadEntries(maxEntries, bytesToRead, callback, readEntriesCtx, + PositionImpl.LATEST); } return CompletableFuture.completedFuture(null); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index e4cc33de14bf1..cf389824794e5 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -1783,59 +1783,4 @@ private void getTopic(String topicName) { }); } - @Test - public void testReadCommittedWithReadCompacted() throws Exception{ - final String namespace = "tnx/ns-prechecks"; - final String topic = "persistent://" + namespace + "/test_transaction_topic"; - admin.namespaces().createNamespace(namespace); - admin.topics().createNonPartitionedTopic(topic); - - admin.topicPolicies().setCompactionThreshold(topic, 100 * 1024 * 1024); - - @Cleanup - Consumer consumer = this.pulsarClient.newConsumer(Schema.STRING) - .topic(topic) - .subscriptionName("sub") - .subscriptionType(SubscriptionType.Exclusive) - .readCompacted(true) - .subscribe(); - - @Cleanup - Producer producer = this.pulsarClient.newProducer(Schema.STRING) - .topic(topic) - .create(); - - producer.newMessage().key("K1").value("V1").send(); - - Transaction txn = pulsarClient.newTransaction() - .withTransactionTimeout(1, TimeUnit.MINUTES).build().get(); - producer.newMessage(txn).key("K2").value("V2").send(); - producer.newMessage(txn).key("K3").value("V3").send(); - - List messages = new ArrayList<>(); - while (true) { - Message message = consumer.receive(5, TimeUnit.SECONDS); - if (message == null) { - break; - } - messages.add(message.getValue()); - } - - Assert.assertEquals(messages, List.of("V1")); - - txn.commit(); - - messages.clear(); - - while (true) { - Message message = consumer.receive(5, TimeUnit.SECONDS); - if (message == null) { - break; - } - messages.add(message.getValue()); - } - - Assert.assertEquals(messages, List.of("V2", "V3")); - } - } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java index 2545c0362e82a..94f2a17a2a3f4 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicUtilsTest.java @@ -69,8 +69,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { } }; - CompactedTopicUtils.asyncReadCompactedEntries(service, cursor, 1, 100, - PositionImpl.LATEST, false, readEntriesCallback, false, null); + CompactedTopicUtils.asyncReadCompactedEntries(service, cursor, 1, 100, false, + readEntriesCallback, false, null); List entries = completableFuture.get(); Assert.assertTrue(entries.isEmpty());