Skip to content

Commit

Permalink
[feat][broker][PIP-195]Implement Filter out all delayed messages and …
Browse files Browse the repository at this point in the history
…skip them when reading messages from bookies - part7 (#19035)
  • Loading branch information
coderzc authored Jan 3, 2023
1 parent f912fb3 commit 9ec1d07
Show file tree
Hide file tree
Showing 9 changed files with 230 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,21 @@ void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, O
void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition);

/**
* Asynchronously read entries from the ManagedLedger.
*
* @param numberOfEntriesToRead maximum number of entries to return
* @param maxSizeBytes max size in bytes of the entries to return
* @param callback callback object
* @param ctx opaque context
* @param maxPosition max position can read
* @param skipCondition predicate of read filter out
*/
default void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition, Predicate<PositionImpl> skipCondition) {
asyncReadEntries(numberOfEntriesToRead, maxSizeBytes, callback, ctx, maxPosition);
}

/**
* Get 'N'th entry from the mark delete position in the cursor without updating any cursor positions.
*
Expand Down Expand Up @@ -264,6 +279,55 @@ void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallback callb
void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition);

/**
* Asynchronously read entries from the ManagedLedger, up to the specified number and size.
*
* <p/>If no entries are available, the callback will not be triggered. Instead it will be registered to wait until
* a new message will be persisted into the managed ledger
*
* @see #readEntriesOrWait(int, long)
* @param maxEntries
* maximum number of entries to return
* @param callback
* callback object
* @param ctx
* opaque context
* @param maxPosition
* max position can read
* @param skipCondition
* predicate of read filter out
*/
default void asyncReadEntriesWithSkipOrWait(int maxEntries, ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition, Predicate<PositionImpl> skipCondition) {
asyncReadEntriesOrWait(maxEntries, callback, ctx, maxPosition);
}

/**
* Asynchronously read entries from the ManagedLedger, up to the specified number and size.
*
* <p/>If no entries are available, the callback will not be triggered. Instead it will be registered to wait until
* a new message will be persisted into the managed ledger
*
* @see #readEntriesOrWait(int, long)
* @param maxEntries
* maximum number of entries to return
* @param maxSizeBytes
* max size in bytes of the entries to return
* @param callback
* callback object
* @param ctx
* opaque context
* @param maxPosition
* max position can read
* @param skipCondition
* predicate of read filter out
*/
default void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition,
Predicate<PositionImpl> skipCondition) {
asyncReadEntriesOrWait(maxEntries, maxSizeBytes, callback, ctx, maxPosition);
}

/**
* Cancel a previously scheduled asyncReadEntriesOrWait operation.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -762,13 +762,19 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {

@Override
public void asyncReadEntries(final int numberOfEntriesToRead, final ReadEntriesCallback callback,
final Object ctx, PositionImpl maxPosition) {
final Object ctx, PositionImpl maxPosition) {
asyncReadEntries(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx, maxPosition);
}

@Override
public void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition) {
asyncReadEntriesWithSkip(numberOfEntriesToRead, maxSizeBytes, callback, ctx, maxPosition, null);
}

@Override
public void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition, Predicate<PositionImpl> skipCondition) {
checkArgument(numberOfEntriesToRead > 0);
if (isClosed()) {
callback.readEntriesFailed(new ManagedLedgerException
Expand All @@ -779,7 +785,8 @@ public void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadE
int numOfEntriesToRead = applyMaxSizeCap(numberOfEntriesToRead, maxSizeBytes);

PENDING_READ_OPS_UPDATER.incrementAndGet(this);
OpReadEntry op = OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition);
OpReadEntry op =
OpReadEntry.create(this, readPosition, numOfEntriesToRead, callback, ctx, maxPosition, skipCondition);
ledger.asyncReadEntries(op);
}

Expand Down Expand Up @@ -901,6 +908,20 @@ public void asyncReadEntriesOrWait(int numberOfEntriesToRead, ReadEntriesCallbac
@Override
public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback, Object ctx,
PositionImpl maxPosition) {
asyncReadEntriesWithSkipOrWait(maxEntries, maxSizeBytes, callback, ctx, maxPosition, null);
}

@Override
public void asyncReadEntriesWithSkipOrWait(int maxEntries, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition,
Predicate<PositionImpl> skipCondition) {
asyncReadEntriesWithSkipOrWait(maxEntries, NO_MAX_SIZE_LIMIT, callback, ctx, maxPosition, skipCondition);
}

@Override
public void asyncReadEntriesWithSkipOrWait(int maxEntries, long maxSizeBytes, ReadEntriesCallback callback,
Object ctx, PositionImpl maxPosition,
Predicate<PositionImpl> skipCondition) {
checkArgument(maxEntries > 0);
if (isClosed()) {
callback.readEntriesFailed(new CursorAlreadyClosedException("Cursor was already closed"), ctx);
Expand All @@ -914,10 +935,11 @@ public void asyncReadEntriesOrWait(int maxEntries, long maxSizeBytes, ReadEntrie
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Read entries immediately", ledger.getName(), name);
}
asyncReadEntries(numberOfEntriesToRead, callback, ctx, maxPosition);
asyncReadEntriesWithSkip(numberOfEntriesToRead, NO_MAX_SIZE_LIMIT, callback, ctx,
maxPosition, skipCondition);
} else {
OpReadEntry op = OpReadEntry.create(this, readPosition, numberOfEntriesToRead, callback,
ctx, maxPosition);
ctx, maxPosition, skipCondition);

if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
op.recycle();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2054,6 +2054,39 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)

long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger);

// Filer out and skip unnecessary read entry
if (opReadEntry.skipCondition != null) {
long firstValidEntry = -1L;
long lastValidEntry = -1L;
long entryId = firstEntry;
for (; entryId <= lastEntry; entryId++) {
if (opReadEntry.skipCondition.test(PositionImpl.get(ledger.getId(), entryId))) {
if (firstValidEntry == -1L) {
firstValidEntry = entryId;
}
} else {
if (firstValidEntry != -1L) {
break;
}
}

if (firstValidEntry != -1L) {
lastValidEntry = entryId;
}
}

// If all messages in [firstEntry...lastEntry] are filter out,
// then manual call internalReadEntriesComplete to advance read position.
if (firstValidEntry == -1L) {
opReadEntry.internalReadEntriesComplete(Collections.emptyList(), opReadEntry.ctx,
PositionImpl.get(ledger.getId(), lastEntry));
return;
}

firstEntry = firstValidEntry;
lastEntry = lastValidEntry;
}

if (log.isDebugEnabled()) {
log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry,
lastEntry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
Expand All @@ -45,8 +47,10 @@ class OpReadEntry implements ReadEntriesCallback {
private PositionImpl nextReadPosition;
PositionImpl maxPosition;

Predicate<PositionImpl> skipCondition;

public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPositionRef, int count,
ReadEntriesCallback callback, Object ctx, PositionImpl maxPosition) {
ReadEntriesCallback callback, Object ctx, PositionImpl maxPosition, Predicate<PositionImpl> skipCondition) {
OpReadEntry op = RECYCLER.get();
op.readPosition = cursor.ledger.startReadOperationOnLedger(readPositionRef);
op.cursor = cursor;
Expand All @@ -57,13 +61,13 @@ public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPosi
maxPosition = PositionImpl.LATEST;
}
op.maxPosition = maxPosition;
op.skipCondition = skipCondition;
op.ctx = ctx;
op.nextReadPosition = PositionImpl.get(op.readPosition);
return op;
}

@Override
public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
void internalReadEntriesComplete(List<Entry> returnedEntries, Object ctx, PositionImpl lastPosition) {
// Filter the returned entries for individual deleted messages
int entriesCount = returnedEntries.size();
long entriesSize = 0;
Expand All @@ -72,13 +76,19 @@ public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
}
cursor.updateReadStats(entriesCount, entriesSize);

final PositionImpl lastPosition = (PositionImpl) returnedEntries.get(entriesCount - 1).getPosition();
if (entriesCount != 0) {
lastPosition = (PositionImpl) returnedEntries.get(entriesCount - 1).getPosition();
}
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Read entries succeeded batch_size={} cumulative_size={} requested_count={}",
cursor.ledger.getName(), cursor.getName(), returnedEntries.size(), entries.size(), count);
}
List<Entry> filteredEntries = cursor.filterReadEntries(returnedEntries);
entries.addAll(filteredEntries);

List<Entry> filteredEntries = Collections.emptyList();
if (entriesCount != 0) {
filteredEntries = cursor.filterReadEntries(returnedEntries);
entries.addAll(filteredEntries);
}

// if entries have been filtered out then try to skip reading of already deletedMessages in that range
final Position nexReadPosition = entriesCount != filteredEntries.size()
Expand All @@ -87,6 +97,11 @@ public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
checkReadCompletion();
}

@Override
public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
internalReadEntriesComplete(returnedEntries, ctx, null);
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
cursor.readOperationCompleted();
Expand Down Expand Up @@ -190,6 +205,7 @@ public void recycle() {
nextReadPosition = null;
maxPosition = null;
recyclerHandle.recycle(this);
skipCondition = null;
}

private static final Logger log = LoggerFactory.getLogger(OpReadEntry.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public void find() {
}
if (cursor.hasMoreEntries(searchPosition)) {
OpReadEntry opReadEntry = OpReadEntry.create(cursor, searchPosition, batchSize,
this, OpScan.this.ctx, null);
this, OpScan.this.ctx, null, null);
ledger.asyncReadEntries(opReadEntry);
} else {
callback.scanComplete(lastSeenPosition, ScanOutcome.COMPLETED, OpScan.this.ctx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ClearBacklogCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCallback;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -4126,7 +4127,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {

// op readPosition is bigger than maxReadPosition
OpReadEntry opReadEntry = OpReadEntry.create(cursor, ledger.lastConfirmedEntry, 10, callback,
null, PositionImpl.get(lastPosition.getLedgerId(), -1));
null, PositionImpl.get(lastPosition.getLedgerId(), -1), null);
Field field = ManagedCursorImpl.class.getDeclaredField("readPosition");
field.setAccessible(true);
field.set(cursor, PositionImpl.EARLIEST);
Expand All @@ -4148,7 +4149,7 @@ public void testOpReadEntryRecycle() throws Exception {
};

@Cleanup final MockedStatic<OpReadEntry> mockedStaticOpReadEntry = Mockito.mockStatic(OpReadEntry.class);
mockedStaticOpReadEntry.when(() -> OpReadEntry.create(any(), any(), anyInt(), any(), any(), any()))
mockedStaticOpReadEntry.when(() -> OpReadEntry.create(any(), any(), anyInt(), any(), any(), any(), any()))
.thenAnswer(__ -> createOpReadEntry.get());

final ManagedLedgerConfig ledgerConfig = new ManagedLedgerConfig();
Expand Down Expand Up @@ -4252,5 +4253,67 @@ public void testLazyCursorLedgerCreationForSubscriptionCreation() throws Excepti
factory2.shutdown();
}

@Test
public void testReadEntriesWithFilterOut() throws ManagedLedgerException, InterruptedException, ExecutionException {
int readMaxNumber = 10;
int sendNumber = 20;
ManagedLedger ledger = factory.open("testReadEntriesWithFilter");
ManagedCursor cursor = ledger.openCursor("c");
Position position = PositionImpl.EARLIEST;
Position maxCanReadPosition = PositionImpl.EARLIEST;
for (int i = 0; i < sendNumber; i++) {
if (i == readMaxNumber - 1) {
position = ledger.addEntry(new byte[1024]);
} else if (i == sendNumber - 1) {
maxCanReadPosition = ledger.addEntry(new byte[1024]);
} else {
ledger.addEntry(new byte[1024]);
}

}
CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
cursor.asyncReadEntriesWithSkipOrWait(sendNumber, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
completableFuture.complete(entries.size());
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
completableFuture.completeExceptionally(exception);
}
}, null, (PositionImpl) position, pos -> {
return pos.getEntryId() % 2 != 0;
});

int number = completableFuture.get();
assertEquals(number, readMaxNumber / 2);

assertEquals(cursor.getReadPosition().getEntryId(), 10);

CompletableFuture<Integer> completableFuture2 = new CompletableFuture<>();
cursor.asyncReadEntriesWithSkipOrWait(sendNumber, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
completableFuture2.complete(entries.size());
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
completableFuture2.completeExceptionally(exception);
}
}, null, (PositionImpl) maxCanReadPosition, pos -> {
return pos.getEntryId() % 2 != 0;
});

int number2 = completableFuture2.get();
assertEquals(number2, readMaxNumber / 2);

assertEquals(cursor.getReadPosition().getEntryId(), 20);

cursor.close();
ledger.close();
}

private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
}
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ public void readEntriesComplete(List<Entry> entries, Object ctx) {
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {

}
}, null, maxPosition);
}, null, maxPosition, null);
Assert.assertEquals(opReadEntry.readPosition, position);
}

Expand Down Expand Up @@ -3030,7 +3030,7 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
responseException2.set(exception);
}

}, null, PositionImpl.LATEST);
}, null, PositionImpl.LATEST, null);
ledger.asyncReadEntry(ledgerHandle, PositionImpl.EARLIEST.getEntryId(), PositionImpl.EARLIEST.getEntryId(),
opReadEntry, ctxStr);
retryStrategically((test) -> {
Expand Down
Loading

0 comments on commit 9ec1d07

Please # to comment.