Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Add a cache eviction policy:Evicting cache data by the slowest markDeletedPosition #14985

Merged
merged 8 commits into from
Apr 6, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.common.annotation.InterfaceAudience;
Expand Down Expand Up @@ -75,6 +77,9 @@ public class ManagedLedgerConfig {
private ManagedLedgerInterceptor managedLedgerInterceptor;
private Map<String, String> properties;
private int inactiveLedgerRollOverTimeMs = 0;
@Getter
@Setter
private boolean cacheEvictionByMarkDeletedPosition = false;

public boolean isCreateIfMissing() {
return createIfMissing;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ public PositionImpl getSlowestReadPositionForActiveCursors() {
return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getReadPosition();
}

public PositionImpl getSlowestMarkDeletedPositionForActiveCursors() {
return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getMarkDeletedPosition();
}

public ManagedCursor get(String name) {
long stamp = rwLock.readLock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2166,10 +2166,15 @@ void doCacheEviction(long maxTimestamp) {
if (entryCache.getSize() <= 0) {
return;
}
// Always remove all entries already read by active cursors
PositionImpl slowestReaderPos = getEarlierReadPositionForActiveCursors();
if (slowestReaderPos != null) {
entryCache.invalidateEntries(slowestReaderPos);
PositionImpl evictionPos;
if (config.isCacheEvictionByMarkDeletedPosition()) {
evictionPos = getEarlierMarkDeletedPositionForActiveCursors().getNext();
} else {
// Always remove all entries already read by active cursors
evictionPos = getEarlierReadPositionForActiveCursors();
}
if (evictionPos != null) {
entryCache.invalidateEntries(evictionPos);
}

// Remove entries older than the cutoff threshold
Expand All @@ -2188,6 +2193,19 @@ private PositionImpl getEarlierReadPositionForActiveCursors() {
return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition;
}

private PositionImpl getEarlierMarkDeletedPositionForActiveCursors() {
PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestMarkDeletedPositionForActiveCursors();
PositionImpl durablePosition = activeCursors.getSlowestMarkDeletedPositionForActiveCursors();
if (nonDurablePosition == null) {
return durablePosition;
}
if (durablePosition == null) {
return nonDurablePosition;
}
return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition;
}


void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) {
Pair<PositionImpl, PositionImpl> pair = cursors.cursorUpdated(cursor, newPosition);
if (pair == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,189 @@ public void acknowledge1() throws Exception {
ledger.close();
}

@Test
public void testCacheEvictionByMarkDeletedPosition() throws Throwable {
final CountDownLatch counter = new CountDownLatch(1);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setCacheEvictionByMarkDeletedPosition(true);
factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
.toNanos(30000));
factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() {
@Override
public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
ManagedLedger ledger = (ManagedLedger) ctx;
String message1 = "test";
ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() {
@Override
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
@SuppressWarnings("unchecked")
Pair<ManagedLedger, ManagedCursor> pair = (Pair<ManagedLedger, ManagedCursor>) ctx;
ManagedLedger ledger = pair.getLeft();
ManagedCursor cursor = pair.getRight();
assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), message1.getBytes(Encoding).length);

cursor.asyncReadEntries(1, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
ManagedCursor cursor = (ManagedCursor) ctx;
assertEquals(entries.size(), 1);
Entry entry = entries.get(0);
final Position position = entry.getPosition();
assertEquals(new String(entry.getDataAndRelease(), Encoding), message1);
assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), message1.getBytes(Encoding).length);

log.debug("Mark-Deleting to position {}", position);
cursor.asyncMarkDelete(position, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
log.debug("Mark delete complete");
ManagedCursor cursor = (ManagedCursor) ctx;
assertFalse(cursor.hasMoreEntries());
// wait eviction finish.
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), 0);

counter.countDown();
}

@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
fail(exception.getMessage());
}

}, cursor);
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
fail(exception.getMessage());
}
}, cursor, PositionImpl.LATEST);
}

@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
fail(exception.getMessage());
}
}, Pair.of(ledger, cursor));
}

@Override
public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
fail(exception.getMessage());
}

}, ledger);
}

@Override
public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
fail(exception.getMessage());
}
}, null, null);

counter.await();

log.info("Test completed");
}

@Test
public void testCacheEvictionByReadPosition() throws Throwable {
final CountDownLatch counter = new CountDownLatch(1);
ManagedLedgerConfig config = new ManagedLedgerConfig();
factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
.toNanos(30000));
factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() {
@Override
public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
ManagedLedger ledger = (ManagedLedger) ctx;
String message1 = "test";
ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() {
@Override
public void addComplete(Position position, ByteBuf entryData, Object ctx) {
@SuppressWarnings("unchecked")
Pair<ManagedLedger, ManagedCursor> pair = (Pair<ManagedLedger, ManagedCursor>) ctx;
ManagedLedger ledger = pair.getLeft();
ManagedCursor cursor = pair.getRight();
assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), message1.getBytes(Encoding).length);

cursor.asyncReadEntries(1, new ReadEntriesCallback() {
@Override
public void readEntriesComplete(List<Entry> entries, Object ctx) {
ManagedCursor cursor = (ManagedCursor) ctx;
assertEquals(entries.size(), 1);
Entry entry = entries.get(0);
final Position position = entry.getPosition();
assertEquals(new String(entry.getDataAndRelease(), Encoding), message1);
// wait eviction finish.
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), 0);

log.debug("Mark-Deleting to position {}", position);
cursor.asyncMarkDelete(position, new MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
log.debug("Mark delete complete");
ManagedCursor cursor = (ManagedCursor) ctx;
assertFalse(cursor.hasMoreEntries());
assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), 0);

counter.countDown();
}

@Override
public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
fail(exception.getMessage());
}

}, cursor);
}

@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
fail(exception.getMessage());
}
}, cursor, PositionImpl.LATEST);
}

@Override
public void addFailed(ManagedLedgerException exception, Object ctx) {
fail(exception.getMessage());
}
}, Pair.of(ledger, cursor));
}

@Override
public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
fail(exception.getMessage());
}

}, ledger);
}

@Override
public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
fail(exception.getMessage());
}
}, null, null);

counter.await();

log.info("Test completed");
}

@Test(timeOut = 20000)
public void asyncAPI() throws Throwable {
final CountDownLatch counter = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2422,6 +2422,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
)
private int managedLedgerInactiveLedgerRolloverTimeSeconds = 0;

@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Evicting cache data by the slowest markDeletedPosition or readPosition. "
+ "The default is to evict through readPosition."
)
private boolean cacheEvictionByMarkDeletedPosition = false;

/**** --- Transaction config variables. --- ****/
@FieldContext(
category = CATEGORY_TRANSACTION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1598,6 +1598,8 @@ public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName t
managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery());
managedLedgerConfig.setInactiveLedgerRollOverTime(
serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS);
managedLedgerConfig.setCacheEvictionByMarkDeletedPosition(
serviceConfig.isCacheEvictionByMarkDeletedPosition());

OffloadPoliciesImpl nsLevelOffloadPolicies =
(OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null);
Expand Down