Skip to content

Commit

Permalink
[improve][ml] Avoid repetitive nested lock for isMessageDeleted in Ma…
Browse files Browse the repository at this point in the history
…nagedCursorImpl (apache#23609)

(cherry picked from commit 895e968)
  • Loading branch information
Denovo1998 authored and lhotari committed Nov 21, 2024
1 parent ea8b659 commit 96cf000
Showing 1 changed file with 10 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1514,7 +1514,7 @@ public Set<? extends Position> asyncReplayEntries(Set<? extends Position> positi
Set<Position> alreadyAcknowledgedPositions = new HashSet<>();
lock.readLock().lock();
try {
positions.stream().filter(this::isMessageDeleted).forEach(alreadyAcknowledgedPositions::add);
positions.stream().filter(this::internalIsMessageDeleted).forEach(alreadyAcknowledgedPositions::add);
} finally {
lock.readLock().unlock();
}
Expand Down Expand Up @@ -2286,7 +2286,7 @@ public void asyncDelete(Iterable<Position> positions, AsyncCallbacks.DeleteCallb
return;
}

if (isMessageDeleted(position)) {
if (internalIsMessageDeleted(position)) {
if (getConfig().isDeletionAtBatchIndexLevelEnabled()) {
BitSetRecyclable bitSetRecyclable = batchDeletedIndexes.remove(position);
if (bitSetRecyclable != null) {
Expand Down Expand Up @@ -3436,13 +3436,19 @@ public boolean isMessageDeleted(Position position) {
checkArgument(position instanceof PositionImpl);
lock.readLock().lock();
try {
return ((PositionImpl) position).compareTo(markDeletePosition) <= 0
|| individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId());
return internalIsMessageDeleted(position);
} finally {
lock.readLock().unlock();
}
}

// When this method is called while the external has already acquired a write lock or a read lock,
// it avoids unnecessary lock nesting.
private boolean internalIsMessageDeleted(Position position) {
return ((PositionImpl) position).compareTo(markDeletePosition) <= 0
|| individualDeletedMessages.contains(position.getLedgerId(), position.getEntryId());
}

//this method will return a copy of the position's ack set
public long[] getBatchPositionAckSet(Position position) {
if (!(position instanceof PositionImpl)) {
Expand Down

0 comments on commit 96cf000

Please # to comment.