diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java index c2f33639c3d26..3671385e60f75 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java @@ -2187,8 +2187,7 @@ public void operationFailed(ManagedLedgerException exception) { if (ledger.isNoMessagesAfterPos(mdEntry.newPosition)) { persistPositionToMetaStore(mdEntry, cb); } else { - mdEntry.callback.markDeleteFailed(new ManagedLedgerException("Create new cursor ledger failed"), - mdEntry.ctx); + cb.operationFailed(new ManagedLedgerException("Switch new cursor ledger failed")); } } else { persistPositionToLedger(cursorLedger, mdEntry, cb); @@ -2861,9 +2860,19 @@ public void operationFailed(ManagedLedgerException exception) { synchronized (pendingMarkDeleteOps) { // At this point we don't have a ledger ready STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger); - // Note: if the stat is NoLedger, will persist the mark deleted position to metadata store. - // Before giving up, try to persist the position in the metadata store. - flushPendingMarkDeletes(); + // There are two case may cause switch ledger fails. + // 1. No enough BKs; BKs are in read-only mode... + // 2. Write ZK fails. + // Regarding the case "No enough BKs", try to persist the position in the metadata store before + // giving up. + if (!(exception instanceof MetaStoreException)) { + flushPendingMarkDeletes(); + } else { + while (!pendingMarkDeleteOps.isEmpty()) { + MarkDeleteEntry entry = pendingMarkDeleteOps.poll(); + entry.callback.markDeleteFailed(exception, entry.ctx); + } + } } } }); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 4e3f8b7908438..5c10533e2476b 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -270,6 +270,48 @@ void testPersistentMarkDeleteIfCreateCursorLedgerFailed() throws Exception { ml.delete(); } + @Test + void testSwitchLedgerFailed() throws Exception { + final String cursorName = "c1"; + final String mlName = UUID.randomUUID().toString().replaceAll("-", ""); + final ManagedLedgerConfig mlConfig = new ManagedLedgerConfig(); + mlConfig.setMaxEntriesPerLedger(1); + mlConfig.setMetadataMaxEntriesPerLedger(1); + mlConfig.setThrottleMarkDelete(Double.MAX_VALUE); + ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName, mlConfig); + ManagedCursor cursor = ml.openCursor(cursorName); + + List positionList = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + positionList.add(ml.addEntry(("entry-" + i).getBytes(Encoding))); + } + + // Inject an error when persistent at the third time. + AtomicInteger persistentCounter = new AtomicInteger(); + metadataStore.failConditional(new MetadataStoreException.BadVersionException("mock error"), (op, path) -> { + if (path.equals(String.format("/managed-ledgers/%s/%s", mlName, cursorName)) + && persistentCounter.incrementAndGet() == 3) { + log.info("Trigger an error"); + return true; + } + return false; + }); + + // Verify: the cursor can be recovered after it fails once. + int failedCount = 0; + for (Position position : positionList) { + try { + cursor.markDelete(position); + } catch (Exception ex) { + failedCount++; + } + } + assertEquals(failedCount, 1); + + // cleanup. + ml.delete(); + } + @Test void testPersistentMarkDeleteIfSwitchCursorLedgerFailed() throws Exception { final int entryCount = 10;