Skip to content

Commit

Permalink
[improve][broker] Reduce the re-schedule message read operation for P…
Browse files Browse the repository at this point in the history
…ersistentDispatcherMultipleConsumers (#16241)

### Motivation

Fix the CPU consumption while having many consumers (> 100k) and enabled dispatch rate limit.

![image](https://user-images.githubusercontent.com/12592133/175940861-7be13d62-042d-46b9-923d-3b1e8354d331.png)

[broker_perf.html.txt](https://github.com/apache/pulsar/files/8991916/broker_perf.html.txt)

### Modification

- Added `isRescheduleReadInProgress` to ensure the dispatcher only has one pending re-schedule read task at a time.
- Added DEBUG log for the re-schedule read operation

(cherry picked from commit eec46dd)
(cherry picked from commit e83c26e)
  • Loading branch information
codelipenghui authored and nicoloboschi committed Jul 4, 2022
1 parent da72a32 commit 8b1d7a0
Showing 1 changed file with 14 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
Expand Down Expand Up @@ -105,6 +106,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
"blockedDispatcherOnUnackedMsgs");
protected Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();

private AtomicBoolean isRescheduleReadInProgress = new AtomicBoolean(false);

protected enum ReadType {
Normal, Replay
}
Expand Down Expand Up @@ -290,8 +293,17 @@ public synchronized void readMoreEntries() {

@Override
protected void reScheduleRead() {
topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
TimeUnit.MILLISECONDS);
if (isRescheduleReadInProgress.compareAndSet(false, true)) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Reschedule message read in {} ms", topic.getName(), name, MESSAGE_RATE_BACKOFF_MS);
}
topic.getBrokerService().executor().schedule(
() -> {
isRescheduleReadInProgress.set(false);
readMoreEntries();
},
MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
}
}

// left pair is messagesToRead, right pair is bytesToRead
Expand Down

0 comments on commit 8b1d7a0

Please # to comment.