Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[feat][broker] Implementation of PIP-323: Complete Backlog Quota Telemetry #21816

Merged
merged 11 commits into from
Jan 17, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ public interface ManagedLedgerMXBean {
*/
long getAddEntryErrors();

/**
* @return the number of entries read from the managed ledger (from cache or BK)
*/
long getEntriesReadTotalCount();

/**
* @return the number of readEntries requests that succeeded
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,46 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.StampedLock;
import lombok.Value;
import lombok.experimental.UtilityClass;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.lang3.tuple.Pair;

/**
* Contains cursors for a ManagedLedger.
*
* <p/>The goal is to always know the slowest consumer and hence decide which is the oldest ledger we need to keep.
*
* <p/>This data structure maintains a heap and a map of cursors. The map is used to relate a cursor name with
* <p>
* The goal is to always know the slowest consumer and hence decide which is the oldest ledger we need to keep.
* <p>
* This data structure maintains a heap and a map of cursors. The map is used to relate a cursor name with
* an entry index in the heap. The heap data structure sorts cursors in a binary tree which is represented
* in a single array. More details about heap implementations:
* https://en.wikipedia.org/wiki/Heap_(data_structure)#Implementation
*
* <p/>The heap is updated and kept sorted when a cursor is updated.
* <a href="https://en.wikipedia.org/wiki/Heap_(data_structure)#Implementation">here</a>
* <p>
* The heap is updated and kept sorted when a cursor is updated.
*
*/
public class ManagedCursorContainer implements Iterable<ManagedCursor> {

/**
* This field is incremented everytime the cursor information is updated.
*/
private long version;

@Value
public static class CursorInfo {
ManagedCursor cursor;
PositionImpl position;

/**
* Cursor info's version.
* <p>
* Use {@link DataVersion#compareVersions(long, long)} to compare between two versions,
* since it rolls over to 0 once reaching Long.MAX_VALUE
*/
long version;
}

private static class Item {
final ManagedCursor cursor;
PositionImpl position;
Expand All @@ -56,10 +77,66 @@ private static class Item {
}
}

public ManagedCursorContainer() {
/**
* Utility class to manage a data version, which rolls over to 0 when reaching Long.MAX_VALUE.
*/
@UtilityClass
public class DataVersion {

/**
* Compares two data versions, which either rolls overs to 0 when reaching Long.MAX_VALUE.
* <p>
* Use {@link DataVersion#getNextVersion(long)} to increment the versions. The assumptions
* are that metric versions are compared with close time proximity one to another, hence,
* they are expected not close to each other in terms of distance, hence we don't
* expect the distance ever to exceed Long.MAX_VALUE / 2, otherwise we wouldn't be able
* to know which one is a later version in case the furthest rolls over to beyond 0. We
* assume the shortest distance between them dictates that.
* <p>
* @param v1 First version to compare
* @param v2 Second version to compare
* @return the value {@code 0} if {@code v1 == v2};
* a value less than {@code 0} if {@code v1 < v2}; and
* a value greater than {@code 0} if {@code v1 > v2}
*/
public static int compareVersions(long v1, long v2) {
if (v1 == v2) {
return 0;
}

// 0-------v1--------v2--------MAX_LONG
if (v2 > v1) {
long distance = v2 - v1;
long wrapAroundDistance = (Long.MAX_VALUE - v2) + v1;
if (distance < wrapAroundDistance) {
return -1;
} else {
return 1;
}

// 0-------v2--------v1--------MAX_LONG
} else {
long distance = v1 - v2;
long wrapAroundDistance = (Long.MAX_VALUE - v1) + v2;
if (distance < wrapAroundDistance) {
return 1; // v1 is bigger
} else {
return -1; // v2 is bigger
}
}
}

public static long getNextVersion(long existingVersion) {
if (existingVersion == Long.MAX_VALUE) {
return 0;
} else {
return existingVersion + 1;
}
}
}

public ManagedCursorContainer() {}

// Used to keep track of slowest cursor.
private final ArrayList<Item> heap = new ArrayList<>();

Expand Down Expand Up @@ -94,6 +171,7 @@ public void add(ManagedCursor cursor, Position position) {
if (cursor.isDurable()) {
durableCursorCount++;
}
version = DataVersion.getNextVersion(version);
} finally {
rwLock.unlockWrite(stamp);
}
Expand Down Expand Up @@ -129,6 +207,7 @@ public boolean removeCursor(String name) {
if (item.cursor.isDurable()) {
durableCursorCount--;
}
version = DataVersion.getNextVersion(version);
return true;
} else {
return false;
Expand Down Expand Up @@ -162,6 +241,7 @@ public Pair<PositionImpl, PositionImpl> cursorUpdated(ManagedCursor cursor, Posi

PositionImpl previousSlowestConsumer = heap.get(0).position;
item.position = (PositionImpl) newPosition;
version = DataVersion.getNextVersion(version);

if (heap.size() == 1) {
return Pair.of(previousSlowestConsumer, item.position);
Expand Down Expand Up @@ -204,6 +284,24 @@ public ManagedCursor getSlowestReader() {
}
}

/**
* @return Returns the CursorInfo for the cursor with the oldest position,
* or null if there aren't any tracked cursors
*/
public CursorInfo getCursorWithOldestPosition() {
long stamp = rwLock.readLock();
try {
if (heap.isEmpty()) {
return null;
} else {
Item item = heap.get(0);
return new CursorInfo(item.cursor, item.position, version);
}
} finally {
rwLock.unlockRead(stamp);
}
}

/**
* Check whether there are any cursors.
* @return true is there are no cursors and false if there are
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ public enum PositionBound {

/**
* This variable is used for testing the tests.
* {@link ManagedLedgerTest#testManagedLedgerWithPlacementPolicyInCustomMetadata()}
* ManagedLedgerTest#testManagedLedgerWithPlacementPolicyInCustomMetadata()
*/
@VisibleForTesting
Map<String, byte[]> createdLedgerCustomMetadata;
Expand Down Expand Up @@ -2129,6 +2129,7 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
}

protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntryCallback callback, Object ctx) {
mbean.addEntriesRead(1);
if (config.getReadEntryTimeoutSeconds() > 0) {
// set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public class ManagedLedgerMBeanImpl implements ManagedLedgerMXBean {
private final Rate readEntriesOpsFailed = new Rate();
private final Rate readEntriesOpsCacheMisses = new Rate();
private final Rate markDeleteOps = new Rate();
private final Rate entriesRead = new Rate();

private final LongAdder dataLedgerOpenOp = new LongAdder();
private final LongAdder dataLedgerCloseOp = new LongAdder();
Expand Down Expand Up @@ -80,6 +81,7 @@ public void refreshStats(long period, TimeUnit unit) {
ledgerAddEntryLatencyStatsUsec.refresh();
ledgerSwitchLatencyStatsUsec.refresh();
entryStats.refresh();
entriesRead.calculateRate(seconds);
}

public void addAddEntrySample(long size) {
Expand Down Expand Up @@ -120,6 +122,10 @@ public void addReadEntriesSample(int count, long totalSize) {
readEntriesOps.recordMultipleEvents(count, totalSize);
}

public void addEntriesRead(int count) {
entriesRead.recordEvent(count);
}

public void startDataLedgerOpenOp() {
dataLedgerOpenOp.increment();
}
Expand Down Expand Up @@ -189,6 +195,11 @@ public String getName() {
return managedLedger.getName();
}

@Override
public long getEntriesReadTotalCount() {
return entriesRead.getTotalCount();
}

@Override
public double getAddEntryMessagesRate() {
return addEntryOps.getRate();
Expand Down Expand Up @@ -333,5 +344,4 @@ public PendingBookieOpsStats getPendingBookieOpsStats() {
result.cursorLedgerDeleteOp = cursorLedgerDeleteOp.longValue();
return result;
}

}
Loading
Loading