Skip to content

Commit

Permalink
[fix][broker][branch-3.1] Fix issue with consumer read uncommitted me…
Browse files Browse the repository at this point in the history
…ssages from compacted topic (apache#21465) (apache#21571)
  • Loading branch information
coderzc authored and srinath-ctds committed Dec 20, 2023
1 parent 1ee83ff commit b26a52f
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.compaction.CompactedTopicUtils;
import org.apache.pulsar.compaction.TopicCompactionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -350,8 +351,9 @@ protected void readMoreEntries(Consumer consumer) {
havePendingRead = true;
if (consumer.readCompacted()) {
boolean readFromEarliest = isFirstRead && MessageId.earliest.equals(consumer.getStartMessageId());
CompactedTopicUtils.asyncReadCompactedEntries(topic.getTopicCompactionService(), cursor,
messagesToRead, bytesToRead, readFromEarliest, this, true, consumer);
TopicCompactionService topicCompactionService = topic.getTopicCompactionService();
CompactedTopicUtils.asyncReadCompactedEntries(topicCompactionService, cursor, messagesToRead,
bytesToRead, topic.getMaxReadPosition(), readFromEarliest, this, true, consumer);
} else {
ReadEntriesCtx readEntriesCtx =
ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.service.Consumer;

public interface CompactedTopic {
Expand All @@ -34,12 +35,14 @@ public interface CompactedTopic {
* Read entries from compacted topic.
*
* @deprecated Use {@link CompactedTopicUtils#asyncReadCompactedEntries(TopicCompactionService, ManagedCursor,
* int, long, boolean, ReadEntriesCallback, boolean, Consumer)} instead.
* int, long, org.apache.bookkeeper.mledger.impl.PositionImpl, boolean, ReadEntriesCallback, boolean, Consumer)}
* instead.
*/
@Deprecated
void asyncReadEntriesOrWait(ManagedCursor cursor,
int maxEntries,
long bytesToRead,
PositionImpl maxReadPosition,
boolean isFirstRead,
ReadEntriesCallback callback,
Consumer consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public CompletableFuture<Void> deleteCompactedLedger(long compactedLedgerId) {
public void asyncReadEntriesOrWait(ManagedCursor cursor,
int maxEntries,
long bytesToRead,
PositionImpl maxReadPosition,
boolean isFirstRead,
ReadEntriesCallback callback, Consumer consumer) {
PositionImpl cursorPosition;
Expand All @@ -112,7 +113,7 @@ public void asyncReadEntriesOrWait(ManagedCursor cursor,

if (currentCompactionHorizon == null
|| currentCompactionHorizon.compareTo(cursorPosition) < 0) {
cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, PositionImpl.LATEST);
cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, maxReadPosition);
} else {
ManagedCursorImpl managedCursor = (ManagedCursorImpl) cursor;
int numberOfEntriesToRead = managedCursor.applyMaxSizeCap(maxEntries, bytesToRead);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,21 @@

public class CompactedTopicUtils {

@Beta
public static void asyncReadCompactedEntries(TopicCompactionService topicCompactionService,
ManagedCursor cursor, int maxEntries,
long bytesToRead, boolean readFromEarliest,
AsyncCallbacks.ReadEntriesCallback callback,
boolean wait, @Nullable Consumer consumer) {
asyncReadCompactedEntries(topicCompactionService, cursor, maxEntries, bytesToRead, PositionImpl.LATEST,
readFromEarliest, callback, wait, consumer);
}

@Beta
public static void asyncReadCompactedEntries(TopicCompactionService topicCompactionService,
ManagedCursor cursor, int maxEntries,
long bytesToRead, PositionImpl maxReadPosition,
boolean readFromEarliest, AsyncCallbacks.ReadEntriesCallback callback,
boolean wait, @Nullable Consumer consumer) {
Objects.requireNonNull(topicCompactionService);
Objects.requireNonNull(cursor);
checkArgument(maxEntries > 0);
Expand All @@ -68,11 +77,9 @@ public static void asyncReadCompactedEntries(TopicCompactionService topicCompact
|| readPosition.compareTo(
lastCompactedPosition.getLedgerId(), lastCompactedPosition.getEntryId()) > 0) {
if (wait) {
cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx,
PositionImpl.LATEST);
cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, maxReadPosition);
} else {
cursor.asyncReadEntries(maxEntries, bytesToRead, callback, readEntriesCtx,
PositionImpl.LATEST);
cursor.asyncReadEntries(maxEntries, bytesToRead, callback, readEntriesCtx, maxReadPosition);
}
return CompletableFuture.completedFuture(null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
}
};

CompactedTopicUtils.asyncReadCompactedEntries(service, cursor, 1, 100, false,
readEntriesCallback, false, null);
CompactedTopicUtils.asyncReadCompactedEntries(service, cursor, 1, 100,
PositionImpl.LATEST, false, readEntriesCallback, false, null);

List<Entry> entries = completableFuture.get();
Assert.assertTrue(entries.isEmpty());
Expand Down

0 comments on commit b26a52f

Please # to comment.