From cdbb6cd08ea1576be782bec429eec07dde56ed19 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Fri, 26 May 2023 00:46:02 +0800 Subject: [PATCH] feat(core): add switch to enable/disable snapshot feature 1. add switch to enable/disable snapshot feature Closes https://github.com/openmessaging/dledger/issues/290 --- .../storage/dledger/DLedgerConfig.java | 10 +++++++ .../storage/dledger/DLedgerServer.java | 6 +++-- .../statemachine/StateMachineCaller.java | 12 +++++---- .../storage/dledger/AppendAndReadTest.java | 3 +-- .../storage/dledger/ServerTestHarness.java | 26 +++++++++++++++---- .../dledger/snapshot/SnapshotManagerTest.java | 16 ++++++------ .../statemachine/StateMachineCallerTest.java | 13 +++++----- 7 files changed, 58 insertions(+), 28 deletions(-) diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java index 244e107e..dbc72cb4 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerConfig.java @@ -91,6 +91,8 @@ public class DLedgerConfig { private long leadershipTransferWaitTimeout = 1000; + private boolean enableSnapshot = false; + private int snapshotThreshold = 1000; private int maxSnapshotReservedNum = 3; @@ -485,4 +487,12 @@ public int getMaxSnapshotReservedNum() { public void setMaxSnapshotReservedNum(int maxSnapshotReservedNum) { this.maxSnapshotReservedNum = maxSnapshotReservedNum; } + + public boolean isEnableSnapshot() { + return enableSnapshot; + } + + public void setEnableSnapshot(boolean enableSnapshot) { + this.enableSnapshot = enableSnapshot; + } } diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java index 42bb9442..aecf7863 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java @@ -144,7 +144,7 @@ public synchronized void startup() { this.fsmCaller.ifPresent(x -> { // Start state machine caller and load existing snapshots for data recovery x.start(); - x.getSnapshotManager().loadSnapshot(); + Optional.ofNullable(x.getSnapshotManager()).ifPresent(sm -> sm.loadSnapshot()); }); if (RpcServiceMode.EXCLUSIVE.equals(this.rpcServiceMode)) { this.dLedgerRpcService.startup(); @@ -191,7 +191,9 @@ public synchronized void registerStateMachine(final StateMachine fsm) { throw new IllegalStateException("can not register statemachine after dledger server starts"); } final StateMachineCaller fsmCaller = new StateMachineCaller(this.dLedgerStore, fsm, this.dLedgerEntryPusher); - fsmCaller.registerSnapshotManager(new SnapshotManager(this)); + if (this.dLedgerConfig.isEnableSnapshot()) { + fsmCaller.registerSnapshotManager(new SnapshotManager(this)); + } this.fsmCaller = Optional.of(fsmCaller); // Register state machine caller to entry pusher this.dLedgerEntryPusher.registerStateMachine(this.fsmCaller); diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java b/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java index 8fc5373a..79496689 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/statemachine/StateMachineCaller.java @@ -32,6 +32,7 @@ import io.openmessaging.storage.dledger.store.DLedgerStore; import java.io.IOException; +import java.util.Optional; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; @@ -87,7 +88,7 @@ public Thread newThread(Runnable r) { }); private final Function completeEntryCallback; private volatile DLedgerException error; - private SnapshotManager snapshotManager; + private Optional snapshotManager; public StateMachineCaller(final DLedgerStore dLedgerStore, final StateMachine statemachine, final DLedgerEntryPusher entryPusher) { @@ -103,6 +104,7 @@ public StateMachineCaller(final DLedgerStore dLedgerStore, final StateMachine st } else { this.completeEntryCallback = index -> true; } + this.snapshotManager = Optional.empty(); } private boolean enqueueTask(final ApplyTask task) { @@ -170,7 +172,7 @@ private void doCommitted(final long committedIndex) { if (this.error != null) { return; } - if (this.snapshotManager.isLoadingSnapshot() || this.snapshotManager.isSavingSnapshot()) { + if (this.snapshotManager.isPresent() && (this.snapshotManager.get().isLoadingSnapshot() || this.snapshotManager.get().isSavingSnapshot())) { this.scheduledExecutorService.schedule(() -> { try { onCommitted(committedIndex); @@ -196,7 +198,7 @@ private void doCommitted(final long committedIndex) { this.lastAppliedTerm = dLedgerEntry.getTerm(); } // Take snapshot - snapshotManager.saveSnapshot(dLedgerEntry); + snapshotManager.ifPresent(x -> x.saveSnapshot(dLedgerEntry)); // Check response timeout. if (iter.getCompleteAckNums() == 0) { if (this.entryPusher != null) { @@ -300,11 +302,11 @@ public long getLastAppliedTerm() { } public void registerSnapshotManager(SnapshotManager snapshotManager) { - this.snapshotManager = snapshotManager; + this.snapshotManager = Optional.of(snapshotManager); } public SnapshotManager getSnapshotManager() { - return this.snapshotManager; + return this.snapshotManager.orElse(null); } public DLedgerStore getdLedgerStore() { diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/AppendAndReadTest.java b/dledger/src/test/java/io/openmessaging/storage/dledger/AppendAndReadTest.java index dde14cec..56dd0755 100644 --- a/dledger/src/test/java/io/openmessaging/storage/dledger/AppendAndReadTest.java +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/AppendAndReadTest.java @@ -19,7 +19,6 @@ import io.openmessaging.storage.dledger.client.DLedgerClient; import io.openmessaging.storage.dledger.protocol.AppendEntryResponse; import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode; -import io.openmessaging.storage.dledger.protocol.RequestOrResponse; import io.openmessaging.storage.dledger.statemachine.register.RegisterReadProcessor; import io.openmessaging.storage.dledger.statemachine.register.RegisterReadRequest; import io.openmessaging.storage.dledger.statemachine.register.RegisterReadResponse; @@ -38,7 +37,7 @@ public void testSingleServerInMemory() throws Exception { String group = UUID.randomUUID().toString(); String selfId = "n0"; String peers = "n0-localhost:11001"; - DLedgerServer dLedgerServer = launchServerWithStateMachine(group, peers, selfId, selfId, DLedgerConfig.MEMORY, + DLedgerServer dLedgerServer = launchServerWithStateMachineEnableSnapshot(group, peers, selfId, selfId, DLedgerConfig.MEMORY, 100000, 102400, new RegisterStateMachine()); dLedgerServer.registerUserDefineProcessors(Collections.singletonList(new RegisterReadProcessor(dLedgerServer))); diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/ServerTestHarness.java b/dledger/src/test/java/io/openmessaging/storage/dledger/ServerTestHarness.java index 2cf531ac..bf6c600a 100644 --- a/dledger/src/test/java/io/openmessaging/storage/dledger/ServerTestHarness.java +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/ServerTestHarness.java @@ -36,7 +36,8 @@ protected synchronized DLedgerServer launchServer(String group, String peers, St return dLedgerServer; } - protected synchronized DLedgerServer launchServer(String group, String peers, String selfId, String preferredLeaderId) { + protected synchronized DLedgerServer launchServer(String group, String peers, String selfId, + String preferredLeaderId) { DLedgerConfig config = new DLedgerConfig(); config.setStoreBaseDir(FileTestUtil.TEST_BASE + File.separator + group); config.group(group).selfId(selfId).peers(peers); @@ -73,12 +74,27 @@ protected synchronized DLedgerServer launchServer(String group, String peers, St return dLedgerServer; } - protected synchronized DLedgerServer launchServerWithStateMachine(String group, String peers, String selfId, String leaderId, - String storeType, int snapshotThreshold, int mappedFileSizeForEntryData, StateMachine stateMachine) { + protected DLedgerServer launchServerWithStateMachineDisableSnapshot(String group, String peers, + String selfIf, String leaderId, String storeType, int mappedFileSizeForEntryData, StateMachine stateMachine) { + return this.launchServerWithStateMachine(group, peers, selfIf, leaderId, storeType, false, 0, + mappedFileSizeForEntryData, stateMachine); + } + + protected DLedgerServer launchServerWithStateMachineEnableSnapshot(String group, String peers, + String selfId, String leaderId, String storeType, int snapshotThreshold, int mappedFileSizeForEntryData, + StateMachine stateMachine) { + return this.launchServerWithStateMachine(group, peers, selfId, leaderId, storeType, true, snapshotThreshold, + mappedFileSizeForEntryData, stateMachine); + } + + protected synchronized DLedgerServer launchServerWithStateMachine(String group, String peers, + String selfId, String leaderId, String storeType, boolean enableSnapshot, int snapshotThreshold, int mappedFileSizeForEntryData, + StateMachine stateMachine) { DLedgerConfig config = new DLedgerConfig(); config.group(group).selfId(selfId).peers(peers); config.setStoreBaseDir(FileTestUtil.TEST_BASE + File.separator + group); config.setStoreType(storeType); + config.setEnableSnapshot(enableSnapshot); config.setSnapshotThreshold(snapshotThreshold); config.setMappedFileSizeForEntryData(mappedFileSizeForEntryData); config.setEnableLeaderElector(false); @@ -100,8 +116,8 @@ protected synchronized DLedgerServer launchServerWithStateMachine(String group, return dLedgerServer; } - protected synchronized DLedgerServer launchServerEnableBatchPush(String group, String peers, String selfId, String leaderId, - String storeType) { + protected synchronized DLedgerServer launchServerEnableBatchPush(String group, String peers, String selfId, + String leaderId, String storeType) { DLedgerConfig config = new DLedgerConfig(); config.group(group).selfId(selfId).peers(peers); config.setStoreBaseDir(FileTestUtil.TEST_BASE + File.separator + group); diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotManagerTest.java b/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotManagerTest.java index 8527cd25..bbb628db 100644 --- a/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotManagerTest.java +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/snapshot/SnapshotManagerTest.java @@ -29,9 +29,9 @@ public void testSaveAndLoadSnapshot() throws InterruptedException { // Launch server String group = UUID.randomUUID().toString(); String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", nextPort(), nextPort(), nextPort()); - DLedgerServer dLedgerServer0 = launchServerWithStateMachine(group, peers, "n0", "n1", DLedgerConfig.FILE, 10, 1024, new MockStateMachine()); - DLedgerServer dLedgerServer1 = launchServerWithStateMachine(group, peers, "n1", "n1", DLedgerConfig.FILE, 10, 1024, new MockStateMachine()); - DLedgerServer dLedgerServer2 = launchServerWithStateMachine(group, peers, "n2", "n1", DLedgerConfig.FILE, 10, 1024, new MockStateMachine()); + DLedgerServer dLedgerServer0 = launchServerWithStateMachineEnableSnapshot(group, peers, "n0", "n1", DLedgerConfig.FILE, 10, 1024, new MockStateMachine()); + DLedgerServer dLedgerServer1 = launchServerWithStateMachineEnableSnapshot(group, peers, "n1", "n1", DLedgerConfig.FILE, 10, 1024, new MockStateMachine()); + DLedgerServer dLedgerServer2 = launchServerWithStateMachineEnableSnapshot(group, peers, "n2", "n1", DLedgerConfig.FILE, 10, 1024, new MockStateMachine()); final List serverList = new ArrayList() { { add(dLedgerServer0); @@ -77,9 +77,9 @@ public void testSaveAndLoadSnapshot() throws InterruptedException { dLedgerServer2.shutdown(); serverList.clear(); // Restart server and apply snapshot - DLedgerServer newDLedgerServer0 = launchServerWithStateMachine(group, peers, "n0", "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine()); - DLedgerServer newDLedgerServer1 = launchServerWithStateMachine(group, peers, "n1", "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine()); - DLedgerServer newDLedgerServer2 = launchServerWithStateMachine(group, peers, "n2", "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine()); + DLedgerServer newDLedgerServer0 = launchServerWithStateMachineEnableSnapshot(group, peers, "n0", "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine()); + DLedgerServer newDLedgerServer1 = launchServerWithStateMachineEnableSnapshot(group, peers, "n1", "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine()); + DLedgerServer newDLedgerServer2 = launchServerWithStateMachineEnableSnapshot(group, peers, "n2", "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine()); serverList.add(newDLedgerServer0); serverList.add(newDLedgerServer1); serverList.add(newDLedgerServer2); @@ -99,7 +99,7 @@ public void testSnapshotReservedNum() throws InterruptedException { String group = UUID.randomUUID().toString(); String selfId = "n0"; String peers = String.format("%s-localhost:%d", selfId, nextPort()); - DLedgerServer server = launchServerWithStateMachine(group, peers, selfId, "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine()); + DLedgerServer server = launchServerWithStateMachineEnableSnapshot(group, peers, selfId, "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine()); DLedgerClient dLedgerClient = launchClient(group, peers); for (int i = 0; i < 120; i++) { @@ -139,7 +139,7 @@ public void testLoadErrorSnapshot() throws Exception { IOUtils.string2File(JSON.toJSONString(snapshotMeta), snapshotStoreBasePath + File.separator + SnapshotManager.SNAPSHOT_META_FILE); IOUtils.string2File("80", snapshotStoreBasePath + File.separator + SnapshotManager.SNAPSHOT_DATA_FILE); - DLedgerServer server = launchServerWithStateMachine(group, peers, "n0", "n0", DLedgerConfig.FILE, 10, 10 * 1024 * 1024, new MockStateMachine()); + DLedgerServer server = launchServerWithStateMachineEnableSnapshot(group, peers, "n0", "n0", DLedgerConfig.FILE, 10, 10 * 1024 * 1024, new MockStateMachine()); Thread.sleep(1000); StateMachineCaller caller = server.getFsmCaller(); diff --git a/dledger/src/test/java/io/openmessaging/storage/dledger/statemachine/StateMachineCallerTest.java b/dledger/src/test/java/io/openmessaging/storage/dledger/statemachine/StateMachineCallerTest.java index 470d79c5..2f8f07b9 100644 --- a/dledger/src/test/java/io/openmessaging/storage/dledger/statemachine/StateMachineCallerTest.java +++ b/dledger/src/test/java/io/openmessaging/storage/dledger/statemachine/StateMachineCallerTest.java @@ -57,7 +57,7 @@ public void testOnCommittedAndOnSnapshotSave() throws Exception { String leaderId = "n0"; String peers = String.format("%s-localhost:%d", selfId, nextPort()); - final DLedgerServer dLedgerServer = createDLedgerServer(group, peers, selfId, leaderId); + final DLedgerServer dLedgerServer = createDLedgerServerInStateMachineMode(group, peers, selfId, leaderId); final Pair result = mockCaller(dLedgerServer); updateFileStore((DLedgerMmapFileStore) dLedgerServer.getDLedgerStore(), 10); final StateMachineCaller caller = result.getKey(); @@ -88,7 +88,7 @@ public void testOnSnapshotLoad() throws Exception { String leaderId = "n0"; String peers = String.format("%s-localhost:%d", selfId, nextPort()); - final DLedgerServer dLedgerServer = createDLedgerServer(group, peers, selfId, leaderId); + final DLedgerServer dLedgerServer = createDLedgerServerInStateMachineMode(group, peers, selfId, leaderId); final Pair result = mockCaller(dLedgerServer); final StateMachineCaller caller = result.getKey(); final MockStateMachine fsm = result.getValue(); @@ -124,7 +124,7 @@ public void doCallBack(SnapshotStatus status) { caller.shutdown(); } - private DLedgerServer createDLedgerServer(String group, String peers, String selfId, String leaderId) { + private DLedgerServer createDLedgerServerInStateMachineMode(String group, String peers, String selfId, String leaderId) { this.config = new DLedgerConfig(); this.config.group(group).selfId(selfId).peers(peers); this.config.setStoreBaseDir(FileTestUtil.TEST_BASE + File.separator + group); @@ -134,6 +134,7 @@ private DLedgerServer createDLedgerServer(String group, String peers, String sel this.config.setEnableLeaderElector(false); this.config.setEnableDiskForceClean(false); this.config.setDiskSpaceRatioToForceClean(0.90f); + this.config.setEnableSnapshot(true); DLedgerServer dLedgerServer = new DLedgerServer(this.config); MemberState memberState = dLedgerServer.getMemberState(); memberState.setCurrTermForTest(0); @@ -176,9 +177,9 @@ private void updateFileStore(DLedgerMmapFileStore fileStore, int entryNum) { public void testServerWithStateMachine() throws InterruptedException { String group = UUID.randomUUID().toString(); String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", nextPort(), nextPort(), nextPort()); - DLedgerServer dLedgerServer0 = launchServerWithStateMachine(group, peers, "n0", "n1", DLedgerConfig.FILE, 0, 10 * 1024 * 1024, new MockStateMachine()); - DLedgerServer dLedgerServer1 = launchServerWithStateMachine(group, peers, "n1", "n1", DLedgerConfig.FILE, 0, 10 * 1024 * 1024, new MockStateMachine()); - DLedgerServer dLedgerServer2 = launchServerWithStateMachine(group, peers, "n2", "n1", DLedgerConfig.FILE, 0, 10 * 1024 * 1024, new MockStateMachine()); + DLedgerServer dLedgerServer0 = launchServerWithStateMachineEnableSnapshot(group, peers, "n0", "n1", DLedgerConfig.FILE, 0, 10 * 1024 * 1024, new MockStateMachine()); + DLedgerServer dLedgerServer1 = launchServerWithStateMachineEnableSnapshot(group, peers, "n1", "n1", DLedgerConfig.FILE, 0, 10 * 1024 * 1024, new MockStateMachine()); + DLedgerServer dLedgerServer2 = launchServerWithStateMachineEnableSnapshot(group, peers, "n2", "n1", DLedgerConfig.FILE, 0, 10 * 1024 * 1024, new MockStateMachine()); final List serverList = new ArrayList() { { add(dLedgerServer0);