Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[ISSUE #290]: add switch to enable/disable snapshot feature #291

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ public class DLedgerConfig {

private long leadershipTransferWaitTimeout = 1000;

private boolean enableSnapshot = false;

private int snapshotThreshold = 1000;
private int maxSnapshotReservedNum = 3;

Expand Down Expand Up @@ -485,4 +487,12 @@ public int getMaxSnapshotReservedNum() {
public void setMaxSnapshotReservedNum(int maxSnapshotReservedNum) {
this.maxSnapshotReservedNum = maxSnapshotReservedNum;
}

public boolean isEnableSnapshot() {
return enableSnapshot;
}

public void setEnableSnapshot(boolean enableSnapshot) {
this.enableSnapshot = enableSnapshot;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public synchronized void startup() {
this.fsmCaller.ifPresent(x -> {
// Start state machine caller and load existing snapshots for data recovery
x.start();
x.getSnapshotManager().loadSnapshot();
Optional.ofNullable(x.getSnapshotManager()).ifPresent(sm -> sm.loadSnapshot());
});
if (RpcServiceMode.EXCLUSIVE.equals(this.rpcServiceMode)) {
this.dLedgerRpcService.startup();
Expand Down Expand Up @@ -191,7 +191,9 @@ public synchronized void registerStateMachine(final StateMachine fsm) {
throw new IllegalStateException("can not register statemachine after dledger server starts");
}
final StateMachineCaller fsmCaller = new StateMachineCaller(this.dLedgerStore, fsm, this.dLedgerEntryPusher);
fsmCaller.registerSnapshotManager(new SnapshotManager(this));
if (this.dLedgerConfig.isEnableSnapshot()) {
fsmCaller.registerSnapshotManager(new SnapshotManager(this));
}
this.fsmCaller = Optional.of(fsmCaller);
// Register state machine caller to entry pusher
this.dLedgerEntryPusher.registerStateMachine(this.fsmCaller);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.openmessaging.storage.dledger.store.DLedgerStore;

import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -87,7 +88,7 @@ public Thread newThread(Runnable r) {
});
private final Function<Long, Boolean> completeEntryCallback;
private volatile DLedgerException error;
private SnapshotManager snapshotManager;
private Optional<SnapshotManager> snapshotManager;

public StateMachineCaller(final DLedgerStore dLedgerStore, final StateMachine statemachine,
final DLedgerEntryPusher entryPusher) {
Expand All @@ -103,6 +104,7 @@ public StateMachineCaller(final DLedgerStore dLedgerStore, final StateMachine st
} else {
this.completeEntryCallback = index -> true;
}
this.snapshotManager = Optional.empty();
}

private boolean enqueueTask(final ApplyTask task) {
Expand Down Expand Up @@ -170,7 +172,7 @@ private void doCommitted(final long committedIndex) {
if (this.error != null) {
return;
}
if (this.snapshotManager.isLoadingSnapshot() || this.snapshotManager.isSavingSnapshot()) {
if (this.snapshotManager.isPresent() && (this.snapshotManager.get().isLoadingSnapshot() || this.snapshotManager.get().isSavingSnapshot())) {
this.scheduledExecutorService.schedule(() -> {
try {
onCommitted(committedIndex);
Expand All @@ -196,7 +198,7 @@ private void doCommitted(final long committedIndex) {
this.lastAppliedTerm = dLedgerEntry.getTerm();
}
// Take snapshot
snapshotManager.saveSnapshot(dLedgerEntry);
snapshotManager.ifPresent(x -> x.saveSnapshot(dLedgerEntry));
// Check response timeout.
if (iter.getCompleteAckNums() == 0) {
if (this.entryPusher != null) {
Expand Down Expand Up @@ -300,11 +302,11 @@ public long getLastAppliedTerm() {
}

public void registerSnapshotManager(SnapshotManager snapshotManager) {
this.snapshotManager = snapshotManager;
this.snapshotManager = Optional.of(snapshotManager);
}

public SnapshotManager getSnapshotManager() {
return this.snapshotManager;
return this.snapshotManager.orElse(null);
}

public DLedgerStore getdLedgerStore() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import io.openmessaging.storage.dledger.client.DLedgerClient;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
import io.openmessaging.storage.dledger.protocol.RequestOrResponse;
import io.openmessaging.storage.dledger.statemachine.register.RegisterReadProcessor;
import io.openmessaging.storage.dledger.statemachine.register.RegisterReadRequest;
import io.openmessaging.storage.dledger.statemachine.register.RegisterReadResponse;
Expand All @@ -38,7 +37,7 @@ public void testSingleServerInMemory() throws Exception {
String group = UUID.randomUUID().toString();
String selfId = "n0";
String peers = "n0-localhost:11001";
DLedgerServer dLedgerServer = launchServerWithStateMachine(group, peers, selfId, selfId, DLedgerConfig.MEMORY,
DLedgerServer dLedgerServer = launchServerWithStateMachineEnableSnapshot(group, peers, selfId, selfId, DLedgerConfig.MEMORY,
100000, 102400, new RegisterStateMachine());
dLedgerServer.registerUserDefineProcessors(Collections.singletonList(new RegisterReadProcessor(dLedgerServer)));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ protected synchronized DLedgerServer launchServer(String group, String peers, St
return dLedgerServer;
}

protected synchronized DLedgerServer launchServer(String group, String peers, String selfId, String preferredLeaderId) {
protected synchronized DLedgerServer launchServer(String group, String peers, String selfId,
String preferredLeaderId) {
DLedgerConfig config = new DLedgerConfig();
config.setStoreBaseDir(FileTestUtil.TEST_BASE + File.separator + group);
config.group(group).selfId(selfId).peers(peers);
Expand Down Expand Up @@ -73,12 +74,27 @@ protected synchronized DLedgerServer launchServer(String group, String peers, St
return dLedgerServer;
}

protected synchronized DLedgerServer launchServerWithStateMachine(String group, String peers, String selfId, String leaderId,
String storeType, int snapshotThreshold, int mappedFileSizeForEntryData, StateMachine stateMachine) {
protected DLedgerServer launchServerWithStateMachineDisableSnapshot(String group, String peers,
String selfIf, String leaderId, String storeType, int mappedFileSizeForEntryData, StateMachine stateMachine) {
return this.launchServerWithStateMachine(group, peers, selfIf, leaderId, storeType, false, 0,
mappedFileSizeForEntryData, stateMachine);
}

protected DLedgerServer launchServerWithStateMachineEnableSnapshot(String group, String peers,
String selfId, String leaderId, String storeType, int snapshotThreshold, int mappedFileSizeForEntryData,
StateMachine stateMachine) {
return this.launchServerWithStateMachine(group, peers, selfId, leaderId, storeType, true, snapshotThreshold,
mappedFileSizeForEntryData, stateMachine);
}

protected synchronized DLedgerServer launchServerWithStateMachine(String group, String peers,
String selfId, String leaderId, String storeType, boolean enableSnapshot, int snapshotThreshold, int mappedFileSizeForEntryData,
StateMachine stateMachine) {
DLedgerConfig config = new DLedgerConfig();
config.group(group).selfId(selfId).peers(peers);
config.setStoreBaseDir(FileTestUtil.TEST_BASE + File.separator + group);
config.setStoreType(storeType);
config.setEnableSnapshot(enableSnapshot);
config.setSnapshotThreshold(snapshotThreshold);
config.setMappedFileSizeForEntryData(mappedFileSizeForEntryData);
config.setEnableLeaderElector(false);
Expand All @@ -100,8 +116,8 @@ protected synchronized DLedgerServer launchServerWithStateMachine(String group,
return dLedgerServer;
}

protected synchronized DLedgerServer launchServerEnableBatchPush(String group, String peers, String selfId, String leaderId,
String storeType) {
protected synchronized DLedgerServer launchServerEnableBatchPush(String group, String peers, String selfId,
String leaderId, String storeType) {
DLedgerConfig config = new DLedgerConfig();
config.group(group).selfId(selfId).peers(peers);
config.setStoreBaseDir(FileTestUtil.TEST_BASE + File.separator + group);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ public void testSaveAndLoadSnapshot() throws InterruptedException {
// Launch server
String group = UUID.randomUUID().toString();
String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", nextPort(), nextPort(), nextPort());
DLedgerServer dLedgerServer0 = launchServerWithStateMachine(group, peers, "n0", "n1", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
DLedgerServer dLedgerServer1 = launchServerWithStateMachine(group, peers, "n1", "n1", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
DLedgerServer dLedgerServer2 = launchServerWithStateMachine(group, peers, "n2", "n1", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
DLedgerServer dLedgerServer0 = launchServerWithStateMachineEnableSnapshot(group, peers, "n0", "n1", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
DLedgerServer dLedgerServer1 = launchServerWithStateMachineEnableSnapshot(group, peers, "n1", "n1", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
DLedgerServer dLedgerServer2 = launchServerWithStateMachineEnableSnapshot(group, peers, "n2", "n1", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
final List<DLedgerServer> serverList = new ArrayList<DLedgerServer>() {
{
add(dLedgerServer0);
Expand Down Expand Up @@ -77,9 +77,9 @@ public void testSaveAndLoadSnapshot() throws InterruptedException {
dLedgerServer2.shutdown();
serverList.clear();
// Restart server and apply snapshot
DLedgerServer newDLedgerServer0 = launchServerWithStateMachine(group, peers, "n0", "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
DLedgerServer newDLedgerServer1 = launchServerWithStateMachine(group, peers, "n1", "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
DLedgerServer newDLedgerServer2 = launchServerWithStateMachine(group, peers, "n2", "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
DLedgerServer newDLedgerServer0 = launchServerWithStateMachineEnableSnapshot(group, peers, "n0", "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
DLedgerServer newDLedgerServer1 = launchServerWithStateMachineEnableSnapshot(group, peers, "n1", "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
DLedgerServer newDLedgerServer2 = launchServerWithStateMachineEnableSnapshot(group, peers, "n2", "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
serverList.add(newDLedgerServer0);
serverList.add(newDLedgerServer1);
serverList.add(newDLedgerServer2);
Expand All @@ -99,7 +99,7 @@ public void testSnapshotReservedNum() throws InterruptedException {
String group = UUID.randomUUID().toString();
String selfId = "n0";
String peers = String.format("%s-localhost:%d", selfId, nextPort());
DLedgerServer server = launchServerWithStateMachine(group, peers, selfId, "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());
DLedgerServer server = launchServerWithStateMachineEnableSnapshot(group, peers, selfId, "n0", DLedgerConfig.FILE, 10, 1024, new MockStateMachine());

DLedgerClient dLedgerClient = launchClient(group, peers);
for (int i = 0; i < 120; i++) {
Expand Down Expand Up @@ -139,7 +139,7 @@ public void testLoadErrorSnapshot() throws Exception {
IOUtils.string2File(JSON.toJSONString(snapshotMeta), snapshotStoreBasePath + File.separator + SnapshotManager.SNAPSHOT_META_FILE);
IOUtils.string2File("80", snapshotStoreBasePath + File.separator + SnapshotManager.SNAPSHOT_DATA_FILE);

DLedgerServer server = launchServerWithStateMachine(group, peers, "n0", "n0", DLedgerConfig.FILE, 10, 10 * 1024 * 1024, new MockStateMachine());
DLedgerServer server = launchServerWithStateMachineEnableSnapshot(group, peers, "n0", "n0", DLedgerConfig.FILE, 10, 10 * 1024 * 1024, new MockStateMachine());
Thread.sleep(1000);

StateMachineCaller caller = server.getFsmCaller();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void testOnCommittedAndOnSnapshotSave() throws Exception {
String leaderId = "n0";
String peers = String.format("%s-localhost:%d", selfId, nextPort());

final DLedgerServer dLedgerServer = createDLedgerServer(group, peers, selfId, leaderId);
final DLedgerServer dLedgerServer = createDLedgerServerInStateMachineMode(group, peers, selfId, leaderId);
final Pair<StateMachineCaller, MockStateMachine> result = mockCaller(dLedgerServer);
updateFileStore((DLedgerMmapFileStore) dLedgerServer.getDLedgerStore(), 10);
final StateMachineCaller caller = result.getKey();
Expand Down Expand Up @@ -88,7 +88,7 @@ public void testOnSnapshotLoad() throws Exception {
String leaderId = "n0";
String peers = String.format("%s-localhost:%d", selfId, nextPort());

final DLedgerServer dLedgerServer = createDLedgerServer(group, peers, selfId, leaderId);
final DLedgerServer dLedgerServer = createDLedgerServerInStateMachineMode(group, peers, selfId, leaderId);
final Pair<StateMachineCaller, MockStateMachine> result = mockCaller(dLedgerServer);
final StateMachineCaller caller = result.getKey();
final MockStateMachine fsm = result.getValue();
Expand Down Expand Up @@ -124,7 +124,7 @@ public void doCallBack(SnapshotStatus status) {
caller.shutdown();
}

private DLedgerServer createDLedgerServer(String group, String peers, String selfId, String leaderId) {
private DLedgerServer createDLedgerServerInStateMachineMode(String group, String peers, String selfId, String leaderId) {
this.config = new DLedgerConfig();
this.config.group(group).selfId(selfId).peers(peers);
this.config.setStoreBaseDir(FileTestUtil.TEST_BASE + File.separator + group);
Expand All @@ -134,6 +134,7 @@ private DLedgerServer createDLedgerServer(String group, String peers, String sel
this.config.setEnableLeaderElector(false);
this.config.setEnableDiskForceClean(false);
this.config.setDiskSpaceRatioToForceClean(0.90f);
this.config.setEnableSnapshot(true);
DLedgerServer dLedgerServer = new DLedgerServer(this.config);
MemberState memberState = dLedgerServer.getMemberState();
memberState.setCurrTermForTest(0);
Expand Down Expand Up @@ -176,9 +177,9 @@ private void updateFileStore(DLedgerMmapFileStore fileStore, int entryNum) {
public void testServerWithStateMachine() throws InterruptedException {
String group = UUID.randomUUID().toString();
String peers = String.format("n0-localhost:%d;n1-localhost:%d;n2-localhost:%d", nextPort(), nextPort(), nextPort());
DLedgerServer dLedgerServer0 = launchServerWithStateMachine(group, peers, "n0", "n1", DLedgerConfig.FILE, 0, 10 * 1024 * 1024, new MockStateMachine());
DLedgerServer dLedgerServer1 = launchServerWithStateMachine(group, peers, "n1", "n1", DLedgerConfig.FILE, 0, 10 * 1024 * 1024, new MockStateMachine());
DLedgerServer dLedgerServer2 = launchServerWithStateMachine(group, peers, "n2", "n1", DLedgerConfig.FILE, 0, 10 * 1024 * 1024, new MockStateMachine());
DLedgerServer dLedgerServer0 = launchServerWithStateMachineEnableSnapshot(group, peers, "n0", "n1", DLedgerConfig.FILE, 0, 10 * 1024 * 1024, new MockStateMachine());
DLedgerServer dLedgerServer1 = launchServerWithStateMachineEnableSnapshot(group, peers, "n1", "n1", DLedgerConfig.FILE, 0, 10 * 1024 * 1024, new MockStateMachine());
DLedgerServer dLedgerServer2 = launchServerWithStateMachineEnableSnapshot(group, peers, "n2", "n1", DLedgerConfig.FILE, 0, 10 * 1024 * 1024, new MockStateMachine());
final List<DLedgerServer> serverList = new ArrayList<DLedgerServer>() {
{
add(dLedgerServer0);
Expand Down