diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java index 021ac2a6..ca1f96e5 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerEntryPusher.java @@ -222,7 +222,7 @@ private void checkResponseFuturesElapsed(final long endIndex) { private void updateCommittedIndex(final long term, final long committedIndex) { dLedgerStore.updateCommittedIndex(term, committedIndex); - this.fsmCaller.ifPresent(caller -> caller.onCommitted(committedIndex)); + this.fsmCaller.ifPresent(caller -> caller.onCommitted(dLedgerStore.getCommittedIndex())); } /** @@ -874,7 +874,7 @@ private void handleDoAppend(long writeIndex, PushEntryRequest request, future.complete(buildResponse(request, DLedgerResponseCode.SUCCESS.getCode())); updateCommittedIndex(request.getTerm(), request.getCommitIndex()); } catch (Throwable t) { - logger.error("[HandleDoWrite] writeIndex={}", writeIndex, t); + logger.error("[HandleDoAppend] writeIndex={}", writeIndex, t); future.complete(buildResponse(request, DLedgerResponseCode.INCONSISTENT_STATE.getCode())); } } diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java b/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java index 3f8ab7f6..78bb59ae 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java @@ -114,6 +114,7 @@ public StateMachine getStateMachine() { } public boolean onCommitted(final long committedIndex) { + if (committedIndex <= this.lastAppliedIndex.get()) return false; final ApplyTask task = new ApplyTask(); task.type = TaskType.COMMITTED; task.committedIndex = committedIndex; diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java b/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java index c9a08366..d3969a0a 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/store/file/DLedgerMmapFileStore.java @@ -609,7 +609,7 @@ public DLedgerEntry get(Long index) { } } - public Pair getEntryPosAndSize(Long index) { + private Pair getEntryPosAndSize(Long index) { indexCheck(index); SelectMmapBufferResult indexSbr = null; try { @@ -625,7 +625,7 @@ public Pair getEntryPosAndSize(Long index) { } } - public void indexCheck(Long index) { + private void indexCheck(Long index) { PreConditions.check(index >= 0, DLedgerResponseCode.INDEX_OUT_OF_RANGE, "%d should gt 0", index); PreConditions.check(index > ledgerBeforeBeginIndex, DLedgerResponseCode.INDEX_LESS_THAN_LOCAL_BEGIN, "%d should be gt %d, beforeBeginIndex may be revised", index, ledgerBeforeBeginIndex); PreConditions.check(index <= ledgerEndIndex, DLedgerResponseCode.INDEX_OUT_OF_RANGE, "%d should between (%d-%d]", index, ledgerBeforeBeginIndex, ledgerEndIndex);