From afb3b9a1c9807cc1518354e5749cf45f13c8b61e Mon Sep 17 00:00:00 2001 From: chenlin <1572139390@qq.com> Date: Fri, 1 Apr 2022 17:48:04 +0800 Subject: [PATCH 1/8] Evicting cache data by the slowest markDeletedPosition --- .../mledger/ManagedLedgerConfig.java | 6 +++++ .../mledger/impl/ManagedCursorContainer.java | 4 +++ .../mledger/impl/ManagedLedgerImpl.java | 26 ++++++++++++++++--- .../pulsar/broker/ServiceConfiguration.java | 7 +++++ .../pulsar/broker/service/BrokerService.java | 2 ++ 5 files changed, 41 insertions(+), 4 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 1d30e169bfe20..181f66587a436 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -24,6 +24,9 @@ import java.util.Arrays; import java.util.Map; import java.util.concurrent.TimeUnit; + +import lombok.Getter; +import lombok.Setter; import org.apache.bookkeeper.client.EnsemblePlacementPolicy; import org.apache.bookkeeper.client.api.DigestType; import org.apache.bookkeeper.common.annotation.InterfaceAudience; @@ -75,6 +78,9 @@ public class ManagedLedgerConfig { private ManagedLedgerInterceptor managedLedgerInterceptor; private Map properties; private int inactiveLedgerRollOverTimeMs = 0; + @Getter + @Setter + private boolean cacheEvictionByMarkDeletedPosition = false; public boolean isCreateIfMissing() { return createIfMissing; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java index 65d254112d157..ef9a546a50781 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorContainer.java @@ -108,6 +108,10 @@ public PositionImpl getSlowestReadPositionForActiveCursors() { return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getReadPosition(); } + public PositionImpl getSlowestMarkDeletedPositionForActiveCursors() { + return heap.isEmpty() ? null : (PositionImpl) heap.get(0).cursor.getMarkDeletedPosition(); + } + public ManagedCursor get(String name) { long stamp = rwLock.readLock(); try { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 7eccca1c17a9b..bc53ac5793e7c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2166,10 +2166,15 @@ void doCacheEviction(long maxTimestamp) { if (entryCache.getSize() <= 0) { return; } - // Always remove all entries already read by active cursors - PositionImpl slowestReaderPos = getEarlierReadPositionForActiveCursors(); - if (slowestReaderPos != null) { - entryCache.invalidateEntries(slowestReaderPos); + PositionImpl evictionPos; + if (config.isCacheEvictionByMarkDeletedPosition()) { + evictionPos = getEarlierMarkDeletedPositionForActiveCursors(); + } else { + // Always remove all entries already read by active cursors + evictionPos = getEarlierReadPositionForActiveCursors(); + } + if (evictionPos != null) { + entryCache.invalidateEntries(evictionPos); } // Remove entries older than the cutoff threshold @@ -2188,6 +2193,19 @@ private PositionImpl getEarlierReadPositionForActiveCursors() { return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition; } + private PositionImpl getEarlierMarkDeletedPositionForActiveCursors() { + PositionImpl nonDurablePosition = nonDurableActiveCursors.getSlowestMarkDeletedPositionForActiveCursors(); + PositionImpl durablePosition = activeCursors.getSlowestMarkDeletedPositionForActiveCursors(); + if (nonDurablePosition == null) { + return durablePosition; + } + if (durablePosition == null) { + return nonDurablePosition; + } + return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition; + } + + void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) { Pair pair = cursors.cursorUpdated(cursor, newPosition); if (pair == null) { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index f75de1d63ae4e..60cade2dcd9ed 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2422,6 +2422,13 @@ public class ServiceConfiguration implements PulsarConfiguration { ) private int managedLedgerInactiveLedgerRolloverTimeSeconds = 0; + @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "Evicting cache data by the slowest markDeletedPosition or readPosition. " + + "The default is to evict through readPosition." + ) + private boolean cacheEvictionByMarkDeletedPosition = false; + /**** --- Transaction config variables. --- ****/ @FieldContext( category = CATEGORY_TRANSACTION, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 83d509cc1c323..60ab332777b9f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1598,6 +1598,8 @@ public CompletableFuture getManagedLedgerConfig(TopicName t managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery()); managedLedgerConfig.setInactiveLedgerRollOverTime( serviceConfig.getManagedLedgerInactiveLedgerRolloverTimeSeconds(), TimeUnit.SECONDS); + managedLedgerConfig.setCacheEvictionByMarkDeletedPosition( + serviceConfig.isCacheEvictionByMarkDeletedPosition()); OffloadPoliciesImpl nsLevelOffloadPolicies = (OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null); From d24a134eadfaeee3253781948ddf35d27a35e3ae Mon Sep 17 00:00:00 2001 From: chenlin <1572139390@qq.com> Date: Fri, 1 Apr 2022 19:03:24 +0800 Subject: [PATCH 2/8] check style --- .../java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java | 1 - 1 file changed, 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 181f66587a436..8b0375d23f3f0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -24,7 +24,6 @@ import java.util.Arrays; import java.util.Map; import java.util.concurrent.TimeUnit; - import lombok.Getter; import lombok.Setter; import org.apache.bookkeeper.client.EnsemblePlacementPolicy; From 5497474e5cff1bf75d4bb92776538fc8d38ac799 Mon Sep 17 00:00:00 2001 From: chenlin <1572139390@qq.com> Date: Fri, 1 Apr 2022 21:15:58 +0800 Subject: [PATCH 3/8] check style --- .../java/org/apache/pulsar/broker/ServiceConfiguration.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 60cade2dcd9ed..7f73b84fc4ac5 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2424,8 +2424,8 @@ public class ServiceConfiguration implements PulsarConfiguration { @FieldContext( category = CATEGORY_STORAGE_ML, - doc = "Evicting cache data by the slowest markDeletedPosition or readPosition. " + - "The default is to evict through readPosition." + doc = "Evicting cache data by the slowest markDeletedPosition or readPosition. " + + "The default is to evict through readPosition." ) private boolean cacheEvictionByMarkDeletedPosition = false; From deeaec2ef701c3f1b10cd339d1b98e53a5c47c26 Mon Sep 17 00:00:00 2001 From: chenlin <1572139390@qq.com> Date: Sat, 2 Apr 2022 16:03:05 +0800 Subject: [PATCH 4/8] 1.add unit test; --- .../mledger/ManagedLedgerConfig.java | 2 +- .../mledger/impl/ManagedLedgerImpl.java | 2 +- .../mledger/impl/ManagedLedgerTest.java | 183 ++++++++++++++++++ .../pulsar/broker/ServiceConfiguration.java | 2 +- 4 files changed, 186 insertions(+), 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 8b0375d23f3f0..767fdba055ce9 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -79,7 +79,7 @@ public class ManagedLedgerConfig { private int inactiveLedgerRollOverTimeMs = 0; @Getter @Setter - private boolean cacheEvictionByMarkDeletedPosition = false; + private boolean cacheEvictionByMarkDeletedPosition = true; public boolean isCreateIfMissing() { return createIfMissing; diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index bc53ac5793e7c..484689fd5149b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2168,7 +2168,7 @@ void doCacheEviction(long maxTimestamp) { } PositionImpl evictionPos; if (config.isCacheEvictionByMarkDeletedPosition()) { - evictionPos = getEarlierMarkDeletedPositionForActiveCursors(); + evictionPos = getEarlierMarkDeletedPositionForActiveCursors().getNext(); } else { // Always remove all entries already read by active cursors evictionPos = getEarlierReadPositionForActiveCursors(); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 601913c7dfdaf..2f2bb0b61903a 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -291,6 +291,189 @@ public void acknowledge1() throws Exception { ledger.close(); } + @Test + public void testCacheEvictionByMarkDeletedPosition() throws Throwable { + final CountDownLatch counter = new CountDownLatch(1); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS + .toNanos(30000)); + factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() { + @Override + public void openLedgerComplete(ManagedLedger ledger, Object ctx) { + ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() { + @Override + public void openCursorComplete(ManagedCursor cursor, Object ctx) { + ManagedLedger ledger = (ManagedLedger) ctx; + String message1 = "test"; + ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() { + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + @SuppressWarnings("unchecked") + Pair pair = (Pair) ctx; + ManagedLedger ledger = pair.getLeft(); + ManagedCursor cursor = pair.getRight(); + assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), message1.getBytes(Encoding).length); + + cursor.asyncReadEntries(1, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + ManagedCursor cursor = (ManagedCursor) ctx; + assertEquals(entries.size(), 1); + Entry entry = entries.get(0); + final Position position = entry.getPosition(); + assertEquals(new String(entry.getDataAndRelease(), Encoding), message1); + assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), message1.getBytes(Encoding).length); + + log.debug("Mark-Deleting to position {}", position); + cursor.asyncMarkDelete(position, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + log.debug("Mark delete complete"); + ManagedCursor cursor = (ManagedCursor) ctx; + assertFalse(cursor.hasMoreEntries()); + // wait eviction finish. + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } + assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), 0); + + counter.countDown(); + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + fail(exception.getMessage()); + } + + }, cursor); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + fail(exception.getMessage()); + } + }, cursor, PositionImpl.LATEST); + } + + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + fail(exception.getMessage()); + } + }, Pair.of(ledger, cursor)); + } + + @Override + public void openCursorFailed(ManagedLedgerException exception, Object ctx) { + fail(exception.getMessage()); + } + + }, ledger); + } + + @Override + public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { + fail(exception.getMessage()); + } + }, null, null); + + counter.await(); + + log.info("Test completed"); + } + + @Test + public void testCacheEvictionByReadPosition() throws Throwable { + final CountDownLatch counter = new CountDownLatch(1); + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setCacheEvictionByMarkDeletedPosition(false); + factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS + .toNanos(30000)); + factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() { + @Override + public void openLedgerComplete(ManagedLedger ledger, Object ctx) { + ledger.asyncOpenCursor("test-cursor", new OpenCursorCallback() { + @Override + public void openCursorComplete(ManagedCursor cursor, Object ctx) { + ManagedLedger ledger = (ManagedLedger) ctx; + String message1 = "test"; + ledger.asyncAddEntry(message1.getBytes(Encoding), new AddEntryCallback() { + @Override + public void addComplete(Position position, ByteBuf entryData, Object ctx) { + @SuppressWarnings("unchecked") + Pair pair = (Pair) ctx; + ManagedLedger ledger = pair.getLeft(); + ManagedCursor cursor = pair.getRight(); + assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), message1.getBytes(Encoding).length); + + cursor.asyncReadEntries(1, new ReadEntriesCallback() { + @Override + public void readEntriesComplete(List entries, Object ctx) { + ManagedCursor cursor = (ManagedCursor) ctx; + assertEquals(entries.size(), 1); + Entry entry = entries.get(0); + final Position position = entry.getPosition(); + assertEquals(new String(entry.getDataAndRelease(), Encoding), message1); + // wait eviction finish. + try { + Thread.sleep(100); + } catch (InterruptedException e) { + } + assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), 0); + + log.debug("Mark-Deleting to position {}", position); + cursor.asyncMarkDelete(position, new MarkDeleteCallback() { + @Override + public void markDeleteComplete(Object ctx) { + log.debug("Mark delete complete"); + ManagedCursor cursor = (ManagedCursor) ctx; + assertFalse(cursor.hasMoreEntries()); + assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), 0); + + counter.countDown(); + } + + @Override + public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { + fail(exception.getMessage()); + } + + }, cursor); + } + + @Override + public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { + fail(exception.getMessage()); + } + }, cursor, PositionImpl.LATEST); + } + + @Override + public void addFailed(ManagedLedgerException exception, Object ctx) { + fail(exception.getMessage()); + } + }, Pair.of(ledger, cursor)); + } + + @Override + public void openCursorFailed(ManagedLedgerException exception, Object ctx) { + fail(exception.getMessage()); + } + + }, ledger); + } + + @Override + public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { + fail(exception.getMessage()); + } + }, null, null); + + counter.await(); + + log.info("Test completed"); + } + @Test(timeOut = 20000) public void asyncAPI() throws Throwable { final CountDownLatch counter = new CountDownLatch(1); diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 7f73b84fc4ac5..04b0f44af13e7 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2427,7 +2427,7 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "Evicting cache data by the slowest markDeletedPosition or readPosition. " + "The default is to evict through readPosition." ) - private boolean cacheEvictionByMarkDeletedPosition = false; + private boolean cacheEvictionByMarkDeletedPosition = true; /**** --- Transaction config variables. --- ****/ @FieldContext( From 4b976cd5542cc656a8402996ca25b79ac63ef9cf Mon Sep 17 00:00:00 2001 From: chenlin <1572139390@qq.com> Date: Sat, 2 Apr 2022 16:45:02 +0800 Subject: [PATCH 5/8] 1.use readposition by default; --- .../java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java | 2 +- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 2 +- .../java/org/apache/pulsar/broker/ServiceConfiguration.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 767fdba055ce9..8b0375d23f3f0 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -79,7 +79,7 @@ public class ManagedLedgerConfig { private int inactiveLedgerRollOverTimeMs = 0; @Getter @Setter - private boolean cacheEvictionByMarkDeletedPosition = true; + private boolean cacheEvictionByMarkDeletedPosition = false; public boolean isCreateIfMissing() { return createIfMissing; diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 2f2bb0b61903a..344b127639862 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -295,6 +295,7 @@ public void acknowledge1() throws Exception { public void testCacheEvictionByMarkDeletedPosition() throws Throwable { final CountDownLatch counter = new CountDownLatch(1); ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setCacheEvictionByMarkDeletedPosition(true); factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS .toNanos(30000)); factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() { @@ -386,7 +387,6 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { public void testCacheEvictionByReadPosition() throws Throwable { final CountDownLatch counter = new CountDownLatch(1); ManagedLedgerConfig config = new ManagedLedgerConfig(); - config.setCacheEvictionByMarkDeletedPosition(false); factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS .toNanos(30000)); factory.asyncOpen("my_test_ledger", config, new OpenLedgerCallback() { diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 04b0f44af13e7..7f73b84fc4ac5 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2427,7 +2427,7 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "Evicting cache data by the slowest markDeletedPosition or readPosition. " + "The default is to evict through readPosition." ) - private boolean cacheEvictionByMarkDeletedPosition = true; + private boolean cacheEvictionByMarkDeletedPosition = false; /**** --- Transaction config variables. --- ****/ @FieldContext( From 0afb77d3a62bc35ce7776b10edda0fdc585d8db0 Mon Sep 17 00:00:00 2001 From: chenlin <1572139390@qq.com> Date: Sat, 2 Apr 2022 21:38:10 +0800 Subject: [PATCH 6/8] use doCacheEviction instead of using sleep. --- .../mledger/impl/ManagedLedgerTest.java | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index 344b127639862..cfd15db3c41f7 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -323,6 +323,8 @@ public void readEntriesComplete(List entries, Object ctx) { Entry entry = entries.get(0); final Position position = entry.getPosition(); assertEquals(new String(entry.getDataAndRelease(), Encoding), message1); + ((ManagedLedgerImpl) ledger).doCacheEviction( + System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000)); assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), message1.getBytes(Encoding).length); log.debug("Mark-Deleting to position {}", position); @@ -332,11 +334,8 @@ public void markDeleteComplete(Object ctx) { log.debug("Mark delete complete"); ManagedCursor cursor = (ManagedCursor) ctx; assertFalse(cursor.hasMoreEntries()); - // wait eviction finish. - try { - Thread.sleep(100); - } catch (InterruptedException e) { - } + ((ManagedLedgerImpl) ledger).doCacheEviction( + System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000)); assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), 0); counter.countDown(); @@ -414,11 +413,8 @@ public void readEntriesComplete(List entries, Object ctx) { Entry entry = entries.get(0); final Position position = entry.getPosition(); assertEquals(new String(entry.getDataAndRelease(), Encoding), message1); - // wait eviction finish. - try { - Thread.sleep(100); - } catch (InterruptedException e) { - } + ((ManagedLedgerImpl) ledger).doCacheEviction( + System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000)); assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), 0); log.debug("Mark-Deleting to position {}", position); From dc7ff9698addbcf442e352aae66414cccdcb2981 Mon Sep 17 00:00:00 2001 From: chenlin <1572139390@qq.com> Date: Sun, 3 Apr 2022 11:55:55 +0800 Subject: [PATCH 7/8] check style --- .../org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 1 - 1 file changed, 1 deletion(-) diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index 484689fd5149b..c7ad5fd4f225b 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -2205,7 +2205,6 @@ private PositionImpl getEarlierMarkDeletedPositionForActiveCursors() { return durablePosition.compareTo(nonDurablePosition) > 0 ? nonDurablePosition : durablePosition; } - void updateCursor(ManagedCursorImpl cursor, PositionImpl newPosition) { Pair pair = cursors.cursorUpdated(cursor, newPosition); if (pair == null) { From bda7422b128e62f075a37e875114a4937617cace Mon Sep 17 00:00:00 2001 From: chenlin <1572139390@qq.com> Date: Sun, 3 Apr 2022 20:34:18 +0800 Subject: [PATCH 8/8] use CompletableFuture in unit test --- .../mledger/impl/ManagedLedgerTest.java | 77 ++++++++++++------- 1 file changed, 48 insertions(+), 29 deletions(-) diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index cfd15db3c41f7..2cb14c6c37f4f 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -293,7 +293,7 @@ public void acknowledge1() throws Exception { @Test public void testCacheEvictionByMarkDeletedPosition() throws Throwable { - final CountDownLatch counter = new CountDownLatch(1); + CompletableFuture result = new CompletableFuture<>(); ManagedLedgerConfig config = new ManagedLedgerConfig(); config.setCacheEvictionByMarkDeletedPosition(true); factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS @@ -313,8 +313,10 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) { Pair pair = (Pair) ctx; ManagedLedger ledger = pair.getLeft(); ManagedCursor cursor = pair.getRight(); - assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), message1.getBytes(Encoding).length); - + if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) { + result.complete(false); + return; + } cursor.asyncReadEntries(1, new ReadEntriesCallback() { @Override public void readEntriesComplete(List entries, Object ctx) { @@ -322,10 +324,16 @@ public void readEntriesComplete(List entries, Object ctx) { assertEquals(entries.size(), 1); Entry entry = entries.get(0); final Position position = entry.getPosition(); - assertEquals(new String(entry.getDataAndRelease(), Encoding), message1); + if (!message1.equals(new String(entry.getDataAndRelease(), Encoding))) { + result.complete(false); + return; + } ((ManagedLedgerImpl) ledger).doCacheEviction( System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000)); - assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), message1.getBytes(Encoding).length); + if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) { + result.complete(false); + return; + } log.debug("Mark-Deleting to position {}", position); cursor.asyncMarkDelete(position, new MarkDeleteCallback() { @@ -333,17 +341,18 @@ public void readEntriesComplete(List entries, Object ctx) { public void markDeleteComplete(Object ctx) { log.debug("Mark delete complete"); ManagedCursor cursor = (ManagedCursor) ctx; - assertFalse(cursor.hasMoreEntries()); + if (cursor.hasMoreEntries()) { + result.complete(false); + return; + } ((ManagedLedgerImpl) ledger).doCacheEviction( System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000)); - assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), 0); - - counter.countDown(); + result.complete(((ManagedLedgerImpl) ledger).getCacheSize() == 0); } @Override public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { - fail(exception.getMessage()); + result.completeExceptionally(exception); } }, cursor); @@ -351,21 +360,21 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { @Override public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - fail(exception.getMessage()); + result.completeExceptionally(exception); } }, cursor, PositionImpl.LATEST); } @Override public void addFailed(ManagedLedgerException exception, Object ctx) { - fail(exception.getMessage()); + result.completeExceptionally(exception); } }, Pair.of(ledger, cursor)); } @Override public void openCursorFailed(ManagedLedgerException exception, Object ctx) { - fail(exception.getMessage()); + result.completeExceptionally(exception); } }, ledger); @@ -373,18 +382,18 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) { @Override public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { - fail(exception.getMessage()); + result.completeExceptionally(exception); } }, null, null); - counter.await(); + assertTrue(result.get()); log.info("Test completed"); } @Test public void testCacheEvictionByReadPosition() throws Throwable { - final CountDownLatch counter = new CountDownLatch(1); + CompletableFuture result = new CompletableFuture<>(); ManagedLedgerConfig config = new ManagedLedgerConfig(); factory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS .toNanos(30000)); @@ -403,7 +412,10 @@ public void addComplete(Position position, ByteBuf entryData, Object ctx) { Pair pair = (Pair) ctx; ManagedLedger ledger = pair.getLeft(); ManagedCursor cursor = pair.getRight(); - assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), message1.getBytes(Encoding).length); + if (((ManagedLedgerImpl) ledger).getCacheSize() != message1.getBytes(Encoding).length) { + result.complete(false); + return; + } cursor.asyncReadEntries(1, new ReadEntriesCallback() { @Override @@ -412,10 +424,16 @@ public void readEntriesComplete(List entries, Object ctx) { assertEquals(entries.size(), 1); Entry entry = entries.get(0); final Position position = entry.getPosition(); - assertEquals(new String(entry.getDataAndRelease(), Encoding), message1); + if (!message1.equals(new String(entry.getDataAndRelease(), Encoding))) { + result.complete(false); + return; + } ((ManagedLedgerImpl) ledger).doCacheEviction( System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(30000)); - assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), 0); + if (((ManagedLedgerImpl) ledger).getCacheSize() != 0) { + result.complete(false); + return; + } log.debug("Mark-Deleting to position {}", position); cursor.asyncMarkDelete(position, new MarkDeleteCallback() { @@ -423,15 +441,16 @@ public void readEntriesComplete(List entries, Object ctx) { public void markDeleteComplete(Object ctx) { log.debug("Mark delete complete"); ManagedCursor cursor = (ManagedCursor) ctx; - assertFalse(cursor.hasMoreEntries()); - assertEquals(((ManagedLedgerImpl) ledger).getCacheSize(), 0); - - counter.countDown(); + if (cursor.hasMoreEntries()) { + result.complete(false); + return; + } + result.complete(((ManagedLedgerImpl) ledger).getCacheSize() == 0); } @Override public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { - fail(exception.getMessage()); + result.completeExceptionally(exception); } }, cursor); @@ -439,21 +458,21 @@ public void markDeleteFailed(ManagedLedgerException exception, Object ctx) { @Override public void readEntriesFailed(ManagedLedgerException exception, Object ctx) { - fail(exception.getMessage()); + result.completeExceptionally(exception); } }, cursor, PositionImpl.LATEST); } @Override public void addFailed(ManagedLedgerException exception, Object ctx) { - fail(exception.getMessage()); + result.completeExceptionally(exception); } }, Pair.of(ledger, cursor)); } @Override public void openCursorFailed(ManagedLedgerException exception, Object ctx) { - fail(exception.getMessage()); + result.completeExceptionally(exception); } }, ledger); @@ -461,11 +480,11 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) { @Override public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { - fail(exception.getMessage()); + result.completeExceptionally(exception); } }, null, null); - counter.await(); + assertTrue(result.get()); log.info("Test completed"); }