diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerRpcNettyService.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerRpcNettyService.java index 79185991..73b3ce09 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerRpcNettyService.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerRpcNettyService.java @@ -26,6 +26,8 @@ import io.openmessaging.storage.dledger.protocol.GetEntriesResponse; import io.openmessaging.storage.dledger.protocol.HeartBeatRequest; import io.openmessaging.storage.dledger.protocol.HeartBeatResponse; +import io.openmessaging.storage.dledger.protocol.InstallSnapshotRequest; +import io.openmessaging.storage.dledger.protocol.InstallSnapshotResponse; import io.openmessaging.storage.dledger.protocol.LeadershipTransferRequest; import io.openmessaging.storage.dledger.protocol.LeadershipTransferResponse; import io.openmessaging.storage.dledger.protocol.MetadataRequest; @@ -73,7 +75,7 @@ public class DLedgerRpcNettyService extends DLedgerRpcService { private AbstractDLedgerServer dLedger; - private final ConcurrentHashMap> userDefineProcessors = new ConcurrentHashMap>(); + private final ConcurrentHashMap> userDefineProcessors = new ConcurrentHashMap>(); private final ExecutorService futureExecutor = Executors.newFixedThreadPool(4, new NamedThreadFactory("FutureExecutor")); @@ -118,7 +120,7 @@ public boolean rejectRequest() { } @Override - public void registerUserDefineProcessor(UserDefineProcessor userDefineProcessor) { + public void registerUserDefineProcessor(UserDefineProcessor userDefineProcessor) { this.userDefineProcessors.put(userDefineProcessor.getRequestTypeCode(), userDefineProcessor); } @@ -132,6 +134,7 @@ private void registerProcessor(NettyRemotingServer remotingServer, NettyRequestP remotingServer.registerProcessor(DLedgerRequestCode.HEART_BEAT.getCode(), protocolProcessor, null); remotingServer.registerProcessor(DLedgerRequestCode.LEADERSHIP_TRANSFER.getCode(), protocolProcessor, null); remotingServer.registerProcessor(DLedgerRequestCode.USER_DEFINE_REQUEST.getCode(), protocolProcessor, null); + remotingServer.registerProcessor(DLedgerRequestCode.INSTALL_SNAPSHOT.getCode(), protocolProcessor, null); } private NettyRemotingServer registerRemotingServer(NettyServerConfig nettyServerConfig, ChannelEventListener channelEventListener, NettyRequestProcessor protocolProcessor) { @@ -277,6 +280,11 @@ public CompletableFuture push(PushEntryRequest request) { return future; } + @Override + public CompletableFuture installSnapshot(InstallSnapshotRequest request) throws Exception { + return null; + } + @Override public CompletableFuture leadershipTransfer( LeadershipTransferRequest request) { @@ -395,6 +403,14 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand }, futureExecutor); break; } + case INSTALL_SNAPSHOT: { + InstallSnapshotRequest installSnapshotRequest = JSON.parseObject(request.getBody(), InstallSnapshotRequest.class); + CompletableFuture future = handleInstallSnapshot(installSnapshotRequest); + future.whenCompleteAsync((x, y) -> { + writeResponse(x, y, request, ctx); + }, futureExecutor); + break; + } case USER_DEFINE_REQUEST: UserDefineCommandHeader header = (UserDefineCommandHeader) request.decodeCommandCustomHeader(UserDefineCommandHeader.class); UserDefineProcessor userDefineProcessor = this.userDefineProcessors.get(header.getRequestTypeCode()); @@ -453,6 +469,11 @@ public CompletableFuture handlePush(PushEntryRequest request) return this.dLedger.handlePush(request); } + @Override + public CompletableFuture handleInstallSnapshot(InstallSnapshotRequest request) throws Exception { + return this.dLedger.handleInstallSnapshot(request); + } + public RemotingCommand handleResponse(RequestOrResponse response, RemotingCommand request) { RemotingCommand remotingCommand = RemotingCommand.createResponseCommand(DLedgerResponseCode.SUCCESS.getCode(), null); remotingCommand.setBody(JSON.toJSONBytes(response)); diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java index 42bb9442..5cbd69ff 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/DLedgerServer.java @@ -26,6 +26,8 @@ import io.openmessaging.storage.dledger.protocol.GetEntriesResponse; import io.openmessaging.storage.dledger.protocol.HeartBeatRequest; import io.openmessaging.storage.dledger.protocol.HeartBeatResponse; +import io.openmessaging.storage.dledger.protocol.InstallSnapshotRequest; +import io.openmessaging.storage.dledger.protocol.InstallSnapshotResponse; import io.openmessaging.storage.dledger.protocol.LeadershipTransferRequest; import io.openmessaging.storage.dledger.protocol.LeadershipTransferResponse; import io.openmessaging.storage.dledger.protocol.MetadataRequest; @@ -429,7 +431,11 @@ public CompletableFuture handlePush(PushEntryRequest request) response.setLeaderId(memberState.getLeaderId()); return CompletableFuture.completedFuture(response); } + } + @Override + public CompletableFuture handleInstallSnapshot(InstallSnapshotRequest request) throws Exception { + return null; } @Override diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerRequestCode.java b/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerRequestCode.java index 75234e6d..5260af97 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerRequestCode.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/DLedgerRequestCode.java @@ -29,8 +29,11 @@ public enum DLedgerRequestCode { PULL(51003, ""), PUSH(51004, ""), LEADERSHIP_TRANSFER(51005, ""), + + INSTALL_SNAPSHOT(51006, ""), USER_DEFINE_REQUEST(59999, ""); + private static Map codeMap = new HashMap<>(); static { diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/InstallSnapshotRequest.java b/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/InstallSnapshotRequest.java new file mode 100644 index 00000000..5faeaf48 --- /dev/null +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/InstallSnapshotRequest.java @@ -0,0 +1,59 @@ +/** + * Copyright 2017-2022 The DLedger Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.storage.dledger.protocol; + +public class InstallSnapshotRequest extends RequestOrResponse { + + private long lastIncludedIndex; + + private long lastIncludedTerm; + + private byte[] data; + + public InstallSnapshotRequest() { + } + + public InstallSnapshotRequest(long lastIncludedIndex, long lastIncludedTerm, byte[] data) { + this.lastIncludedIndex = lastIncludedIndex; + this.lastIncludedTerm = lastIncludedTerm; + this.data = data; + } + + public long getLastIncludedIndex() { + return lastIncludedIndex; + } + + public void setLastIncludedIndex(long lastIncludedIndex) { + this.lastIncludedIndex = lastIncludedIndex; + } + + public long getLastIncludedTerm() { + return lastIncludedTerm; + } + + public void setLastIncludedTerm(long lastIncludedTerm) { + this.lastIncludedTerm = lastIncludedTerm; + } + + public byte[] getData() { + return data; + } + + public void setData(byte[] data) { + this.data = data; + } +} diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/InstallSnapshotResponse.java b/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/InstallSnapshotResponse.java new file mode 100644 index 00000000..b01e9e7c --- /dev/null +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/InstallSnapshotResponse.java @@ -0,0 +1,30 @@ +/** + * Copyright 2017-2022 The DLedger Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.openmessaging.storage.dledger.protocol; + +public class InstallSnapshotResponse extends RequestOrResponse { + + public InstallSnapshotResponse(int term) { + this.term = term; + } + + @Override + public RequestOrResponse code(int code) { + this.code = code; + return this; + } +} diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/handler/DLedgerRaftProtocolHandler.java b/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/handler/DLedgerRaftProtocolHandler.java index 81e1a309..ee25d40d 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/handler/DLedgerRaftProtocolHandler.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/handler/DLedgerRaftProtocolHandler.java @@ -18,6 +18,8 @@ import io.openmessaging.storage.dledger.protocol.HeartBeatRequest; import io.openmessaging.storage.dledger.protocol.HeartBeatResponse; +import io.openmessaging.storage.dledger.protocol.InstallSnapshotRequest; +import io.openmessaging.storage.dledger.protocol.InstallSnapshotResponse; import io.openmessaging.storage.dledger.protocol.PullEntriesRequest; import io.openmessaging.storage.dledger.protocol.PullEntriesResponse; import io.openmessaging.storage.dledger.protocol.PushEntryRequest; @@ -37,4 +39,6 @@ public interface DLedgerRaftProtocolHandler { CompletableFuture handlePush(PushEntryRequest request) throws Exception; + CompletableFuture handleInstallSnapshot(InstallSnapshotRequest request) throws Exception; + } diff --git a/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/protocol/DLedgerRaftProtocol.java b/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/protocol/DLedgerRaftProtocol.java index b37b576d..52614fe7 100644 --- a/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/protocol/DLedgerRaftProtocol.java +++ b/dledger/src/main/java/io/openmessaging/storage/dledger/protocol/protocol/DLedgerRaftProtocol.java @@ -18,6 +18,8 @@ import io.openmessaging.storage.dledger.protocol.HeartBeatRequest; import io.openmessaging.storage.dledger.protocol.HeartBeatResponse; +import io.openmessaging.storage.dledger.protocol.InstallSnapshotRequest; +import io.openmessaging.storage.dledger.protocol.InstallSnapshotResponse; import io.openmessaging.storage.dledger.protocol.PullEntriesRequest; import io.openmessaging.storage.dledger.protocol.PullEntriesResponse; import io.openmessaging.storage.dledger.protocol.PushEntryRequest; @@ -37,4 +39,6 @@ public interface DLedgerRaftProtocol { CompletableFuture push(PushEntryRequest request) throws Exception; + CompletableFuture installSnapshot(InstallSnapshotRequest request) throws Exception; + } diff --git a/proxy/src/main/java/io/openmessaging/storage/dledger/proxy/DLedgerProxy.java b/proxy/src/main/java/io/openmessaging/storage/dledger/proxy/DLedgerProxy.java index b165e316..0892c01c 100644 --- a/proxy/src/main/java/io/openmessaging/storage/dledger/proxy/DLedgerProxy.java +++ b/proxy/src/main/java/io/openmessaging/storage/dledger/proxy/DLedgerProxy.java @@ -1,4 +1,4 @@ -/* +/** * Copyright 2017-2022 The DLedger Authors. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -31,6 +31,8 @@ import io.openmessaging.storage.dledger.protocol.GetEntriesResponse; import io.openmessaging.storage.dledger.protocol.HeartBeatRequest; import io.openmessaging.storage.dledger.protocol.HeartBeatResponse; +import io.openmessaging.storage.dledger.protocol.InstallSnapshotRequest; +import io.openmessaging.storage.dledger.protocol.InstallSnapshotResponse; import io.openmessaging.storage.dledger.protocol.LeadershipTransferRequest; import io.openmessaging.storage.dledger.protocol.LeadershipTransferResponse; import io.openmessaging.storage.dledger.protocol.MetadataRequest; @@ -247,6 +249,11 @@ public CompletableFuture handlePush(PushEntryRequest request) } } + @Override + public CompletableFuture handleInstallSnapshot(InstallSnapshotRequest request) throws Exception { + return null; + } + public void startup() { this.dLedgerRpcService.startup(); this.dLedgerManager.startup();