Skip to content

3: 详细设计

Colin edited this page Mar 30, 2023 · 10 revisions

ERaftKV RPC

注意:设计文档里面代码都是伪代码

消息定义

  • 投票请求
// Vote request.
// NOTE(review): field numbers were missing in the original pseudo-code; they
// are assigned here in declaration order so the message is valid proto3 and
// matches the style of the other messages in this document.
message RequestVoteReq {
  // True when the request belongs to a pre-vote round.
  bool  prevote = 1;
  int64 term = 2;
  // NOTE(review): spelling "candidtate_id" is kept as-is so generated
  // accessors do not change; presumably means "candidate_id".
  int64 candidtate_id = 3;
  int64 last_log_idx = 4;
  int64 last_log_term = 5;
}
  • 投票响应
// Vote response.
// NOTE(review): field numbers were missing in the original pseudo-code; they
// are assigned here in declaration order so the message is valid proto3.
message RequestVoteResp {
  bool  prevote = 1;
  int64 request_term = 2;
  int64 term = 3;
  bool  vote_granted = 4;
}
  • 日志项
// Kind of a log entry.
enum EntryType {
    Normal     = 0;
    JoinNode   = 1;
    LeaveNode  = 2;
    JoinGroup  = 3;
    LeaveGroup = 4;
    NoOp       = 5;
}

// Log entry.
message Entry {
    int64 term = 1;
    int64 id = 2;
    EntryType e_type = 3;
    // presumably the byte length of `data` — TODO confirm
    int64 data_size = 4;
    bytes data = 5;
}
  • 追加日志请求
// Append-entries request.
message AppendEntriesReq {
    int64 leader_id = 1;
    // NOTE(review): int64 here, while SnapshotReq/SnapshotResp declare
    // message_index as string — confirm which type is intended.
    int64 message_index = 2;
    int64 term = 3;
    int64 prev_log_index = 4;
    int64 prev_log_term = 5;
    int64 leader_commit = 6;
    // presumably the number of items in `entries` — TODO confirm
    int64 entries_size = 7;
    repeated Entry entries = 8;
}
  • 追加日志的响应
// Response to an append-entries request.
message AppendEntriesResp {
    // NOTE(review): the request carries `message_index`; confirm how this
    // token relates to it.
    string message_token = 1;
    int64 term = 2;
    bool success = 3;
    int64 current_index = 4;
}
  • 发送快照的请求
// One chunk of snapshot data carried by a snapshot request.
message SnapshotBlock {
    int64 offset = 1;
    bytes data = 2;
    int64 block_size = 3;
    bool is_last_chunk = 4;
}

// Send-snapshot request.
message SnapshotReq {
    int64 term = 1;
    int64 leader_id = 2;
    string message_index = 3;
    int64 snapshot_index = 4;
    int64 snapshot_term = 5;
    SnapshotBlock block = 6;
}
  • 发送快照的响应

// Response to a send-snapshot request.
message SnapshotResp {
  int64  term = 1;
  string message_index = 2;
  int64  offset = 3;
  bool   success = 4;
  bool   is_last_chunk = 5;
}


* KeyRange 定义

// Status of a key range.
enum KeyRangeStatus {
  Running = 0;
  Migrating = 1;
  Importing = 2;
  Init = 3;
}

// Key range definition.
message KeyRange {
  KeyRangeStatus key_range_status = 1;
  int64          shard_id = 2;
  int64          status_modify_time = 3;
  string         start = 4;
  string         end = 5;
}


* 服务节点(Server)定义

// Status of a server.
enum ServerStatus {
  Up = 0;
  Down = 1;
}

// Server definition: identifier, address and current status.
message Server {
  string       id = 1;
  string       address = 2;
  ServerStatus server_status = 3;
}


* 集群分片定义

// Cluster shard definition: a key range together with the servers in the group.
message ShardGroup {
  int64           id = 1;
  KeyRange        key_range = 2;
  repeated Server servers = 3;
}


* 增删节点到分片的请求和响应

// Kind of membership change requested for a shard.
enum ClusterConfigChangeType {
  AddServer = 0;
  RemoveServer = 1;
}

// Request to add a server to, or remove a server from, a shard.
message ClusterConfigChangeReq {
  ClusterConfigChangeType change_type = 1;
  int64                   shard_id = 2;
  Server                  server = 3;
  int64                   config_version = 4;
}

// Response to a cluster configuration change request.
message ClusterConfigChangeResp {
  bool                success = 1;
  repeated ShardGroup shard_group = 2;
  int64               config_version = 3;
}


* 客户端数据读写请求和响应

// Operation kinds a client can request.
enum ClientOpType {
  Put = 0;
  Get = 1;
  Del = 2;
  Scan = 3;
}

// A single client operation on one key.
message KvOpPair {
  ClientOpType op_type = 1;
  string       key = 2;
  string       value = 3;
  uint64       cursor = 4;
}

// Client read/write request carrying one or more operations.
message ClientOperationReq {
  uint64            op_timestamp = 1;
  repeated KvOpPair kvs = 2;
}

// Response to a client read/write request.
message ClientOperationResp {
  bool              success = 1;
  repeated KvOpPair ops = 2;
}


## RPC 服务定义

// RPC surface of an ERaftKV node: three Raft-internal RPCs followed by the
// client-facing and cluster-management RPCs.
service ERaftKv {
  rpc RequestVote(RequestVoteReq) returns (RequestVoteResp);
  // Returns AppendEntriesResp (the original pseudo-code returned
  // RequestVoteResp, which does not match the append-entries response message).
  rpc AppendEntries(AppendEntriesReq) returns (AppendEntriesResp);
  rpc Snapshot(SnapshotReq) returns (SnapshotResp);

  rpc ProcessRWOperation(ClientOperationReq) returns (ClientOperationResp);
  rpc ClusterConfigChange(ClusterConfigChangeReq) returns (ClusterConfigChangeResp);
}


#  ERaftKV Raft 服务层数据结构以及操作接口

* raft_server 核心

// States a raft server can be in. Enumerators keep their implicit values
// (Follower == 0 ... Leader == 3).
enum RaftStateEnum {
  Follower,
  PreCandidate,
  Candidate,
  Leader
};

/**

  • @brief

/ class Network { public: /*

  • @brief Destroy the Network object

*/ virtual ~Network() {}

/**

  • @brief
  • @param raft
  • @param target_node
  • @param req
  • @return EStatus / virtual EStatus SendRequestVote(RaftServer raft, RaftNode* target_node, eraftkv::RequestVoteReq* req) = 0;

/**

  • @brief
  • @param raft
  • @param target_node
  • @param req
  • @return EStatus / virtual EStatus SendAppendEntries(RaftServer raft, RaftNode* target_node, eraftkv::AppendEntriesReq* req) = 0;

/**

  • @brief
  • @param raft
  • @param target_node
  • @param req
  • @return EStatus / virtual EStatus SendSnapshot(RaftServer raft, RaftNode* target_node, eraftkv::SnapshotReq* req) = 0; }

/**

  • @brief

/ class Event { public: /*

  • @brief Destroy the Event object

*/ virtual ~Event() {}

/**

  • @brief
  • @param raft
  • @param state / virtual void RaftStateChangeEvent(RaftServer raft, RaftStateEnum* state) = 0;

/**

  • @brief
  • @param raft
  • @param node
  • @param ety / virtual void RaftGroupMembershipChangeEvent(RaftServer raft, RaftNode* node, eraftkv::Entry* ety) = 0; }

/**

  • @brief

*/ class Storage {

public: /**

  • @brief Destroy the Storage object

*/ virtual ~Storage() {}

/**

  • @brief Get the Node Address object
  • @param raft
  • @param id
  • @return std::string / virtual std::string GetNodeAddress(RaftServer raft, std::string id) = 0;

/**

  • @brief
  • @param raft
  • @param id
  • @param address
  • @return EStatus / virtual EStatus SaveNodeAddress(RaftServer raft, std::string id, std::string address) = 0;

/**

  • @brief
  • @param raft
  • @param snapshot_index
  • @param snapshot_term
  • @return EStatus / virtual EStatus ApplyLog(RaftServer raft, int64_t snapshot_index, int64_t snapshot_term) = 0;

/**

  • @brief Get the Snapshot Block object
  • @param raft
  • @param node
  • @param offset
  • @param block
  • @return EStatus / virtual EStatus GetSnapshotBlock(RaftServer raft, RaftNode* node, int64_t offset, eraftkv::SnapshotBlock* block) = 0;

/**

  • @brief
  • @param raft
  • @param snapshot_index
  • @param offset
  • @param block
  • @return EStatus / virtual EStatus StoreSnapshotBlock(RaftServer raft, int64_t snapshot_index, int64_t offset, eraftkv::SnapshotBlock* block) = 0;

/**

  • @brief
  • @param raft
  • @return EStatus / virtual EStatus ClearSnapshot(RaftServer raft) = 0;

/**

  • @brief
  • @return EStatus */ virtual EStatus CreateDBSnapshot() = 0;

/**

  • @brief
  • @param raft
  • @param term
  • @param vote
  • @return EStatus / virtual EStatus SaveRaftMeta(RaftServer raft, int64_t term, int64_t vote) = 0;

/**

  • @brief
  • @param raft
  • @param term
  • @param vote
  • @return EStatus / virtual EStatus ReadRaftMeta(RaftServer raft, int64_t* term, int64_t* vote) = 0; };

/**

  • @brief

/ class LogStore { public: /*

  • @brief Destroy the Log Store object

*/ virtual ~LogStore() {}

/**

  • @brief

*/ virtual void Init() = 0;

/**

  • @brief

*/ virtual void Free() = 0;

/**

  • @brief Append add new entries
  • @param ety
  • @return EStatus / virtual EStatus Append(eraftkv::Entry ety) = 0;

/**

  • @brief EraseBefore erase all entries before the given index
  • @param first_index
  • @return EStatus */ virtual EStatus EraseBefore(int64_t first_index) = 0;

/**

  • @brief EraseAfter erase all entries after the given index
  • @param from_index
  • @return EStatus */ virtual EStatus EraseAfter(int64_t from_index) = 0;

/**

  • @brief Get get the given index entry
  • @param index
  • @return eraftkv::Entry* / virtual eraftkv::Entry Get(int64_t index) = 0;

/**

  • @brief Gets get the given index range entry
  • @param start_index
  • @param end_index
  • @return std::vectoreraftkv::Entry* */ virtual std::vectoreraftkv::Entry* Gets(int64_t start_index, int64_t end_index) = 0;

/**

  • @brief FirstIndex get the first index in the entry
  • @return int64_t */ virtual int64_t FirstIndex() = 0;

/**

  • @brief LastIndex get the last index in the entry
  • @return int64_t */ virtual int64_t LastIndex() = 0;

/**

  • @brief LogCount get the number of entries
  • @return int64_t */ virtual int64_t LogCount() = 0; }

/**

  • @brief

*/ class RaftServer {

public: /**

  • @brief Construct a new Raft Server object
  • @param raft_config / RaftServer(RaftConfig raft_config) {}

/**

  • @brief Get the Entries To Be Send object
  • @param node
  • @param index
  • @param count
  • @return std::vectoreraftkv::Entry* / std::vectoreraftkv::Entry* GetEntriesToBeSend(RaftNode node, int64_t index, int64_t count) {}

/**

  • @brief
  • @param term
  • @param vote
  • @return EStatus */ EStatus SaveMetaData(int64 term, int64 vote) {}

/**

  • @brief
  • @return EStatus */ EStatus ReadMetaData() {}

/**

  • @brief
  • @param id
  • @param is_self
  • @return RaftNode* / RaftNode JoinNode(int64 id, bool is_self) {}

/**

  • @brief
  • @param node
  • @return EStatus / EStatus RemoveNode(RaftNode node) {}

/**

  • @brief raft core cycle
  • @return EStatus */ EStatus RunCycle() { // 1. only one node, to become the leader // call BecomeLeader
// 2. is leader, send append entries to other node
// call net_-> SendAppendEntries

// 3. if not leader, inter election
// call ElectionStart

}

/**

  • @brief
  • @return EStatus */ EStatus ApplyEntries() {}

/**

  • @brief
  • @param from_node
  • @param req
  • @param resp
  • @return EStatus / EStatus HandleRequestVoteReq(RaftNode from_node, eraftkv::RequestVoteReq* req, eraftkv::RequestVoteResp* resp) { // 1. deal request vote with raft paper description. }

/**

  • @brief
  • @param from_node
  • @param resp
  • @return EStatus / EStatus HandleRequestVoteResp(RaftNode from_node, eraftkv::RequestVoteResp* resp) { // 1.if resp term > this node, call become follower.
// 2.get majority vote, become candidate or leader.

}

/**

  • @brief
  • @param from_node
  • @param req
  • @param resp
  • @return EStatus / EStatus HandleAppendEntriesReq(RaftNode from_node, eraftkv::AppendEntriesReq* req, eraftkv::AppendEntriesResp* resp) { // 1. deal append entries req with raft paper description. }

/**

  • @brief
  • @param from_node
  • @param resp
  • @return EStatus / EStatus HandleAppendEntriesResp(RaftNode from_node, eraftkv::AppendEntriesResp* resp) { // 1.deal append entries resp with raft paper description. }

/**

  • @brief
  • @param from_node
  • @param req
  • @param resp
  • @return EStatus / EStatus HandleSnapshotReq(RaftNode from_node, eraftkv::SnapshotReq* req, eraftkv::SnapshotResp* resp) { // 1. deal snapshot req with raft paper description. }

/**

  • @brief
  • @param from_node
  • @param resp
  • @return EStatus / EStatus HandleSnapshotResp(RaftNode from_node, eraftkv::SnapshotResp* resp) { // 1. deal snapshot resp with raft paper description. }

/**

  • @brief
  • @param from_node
  • @param ety
  • @param ety_index
  • @return EStatus / EStatus HandleApplyConfigChange(RaftNode from_node, eraftkv::Entry* ety, int64_t ety_index) {}

/**

  • @brief
  • @param ety
  • @return EStatus / EStatus ProposeEntry(eraftkv::Entry ety) { // 1.append entry to log }

/**

  • @brief
  • @return EStatus */ EStatus BecomeLeader() { // 1.call net_->SendAppendEntries() }

/**

  • @brief
  • @return EStatus */ EStatus BecomeFollower() { // 1.reset election time out }

/**

  • @brief
  • @return EStatus */ EStatus BecomeCandidate() { // 1. set status canditate
// 2. incr current node term + 1
// call net_->SendRequestVote();

}

/**

  • @brief
  • @return EStatus */ EStatus BecomePreCandidate() { // 1. set status pre canditate
// 2. set request vote without current node term + 1
// call net_->SendRequestVote();

}

/**

  • @brief
  • @param is_prevote
  • @return EStatus */ EStatus ElectionStart(bool is_prevote) { // 1. set random election timeout
// 2. if is_prevote = true, BecomePreCandidate else BecomeCandidate

}

/**

  • @brief
  • @return EStatus */ EStatus BeginSnapshot() { // 1. apply all commited log // 2. set up snapshot status }

/**

  • @brief
  • @return EStatus */ EStatus EndSnapshot() { // 1. call net_->SendSnapshot() }

/**

  • @brief
  • @return true
  • @return false */ bool SnapshotRunning() {}

/**

  • @brief Get the Last Applied Entry object
  • @return Entry* / Entry GetLastAppliedEntry() {}

/**

  • @brief Get the First Entry Idx object
  • @return int64 */ int64 GetFirstEntryIdx() {}

/**

  • @brief
  • @return EStatus */ EStatus RestoreSnapshotAfterRestart() {}

/**

  • @brief
  • @param last_included_term
  • @param last_included_index
  • @return EStatus */ EStatus BeginLoadSnapshot(int64 last_included_term, int64 last_included_index) {}

/**

  • @brief
  • @return EStatus */ EStatus EndLoadSnapshot() {}

/**

  • @brief
  • @return EStatus */ EStatus ProposeReadReq() {}

/**

  • @brief Get the Logs Count Can Snapshot object
  • @return int64 */ int64 GetLogsCountCanSnapshot() {}

/**

  • @brief
  • @return EStatus */ EStatus RestoreLog() {}

private: /**

  • @brief

*/ std::string id_;

/**

  • @brief

/ int64_t current_term_; /*

  • @brief

/ int64_t voted_for_; /*

  • @brief

/ int64_t commit_idx_; /*

  • @brief

/ int64_t last_applied_idx_; /*

  • @brief

/ int64_t last_applied_term_; /*

  • @brief

/ RaftStateEnum state_; /*

  • @brief

/ int64_t leader_id_; /*

  • @brief

/ int64_t message_index_; /*

  • @brief

/ int64_t last_acked_message_index_; /*

  • @brief

/ int64_t node_count_; /*

  • @brief

/ std::vector nodes_; /*

  • @brief

/ RaftConfig config_; /*

  • @brief

/ Network net_; /**

  • @brief

/ Storage store_; /**

  • @brief

/ LogStore log_store_; };


#  ERaftKV KvServer 层数据结构以及操作接口

/**
 * @brief Options used to configure an ERaftKvServer.
 *
 * Numeric options are zero-initialised so that a default-constructed
 * instance never exposes indeterminate values; callers are expected to set
 * the ones they need.
 */
struct ERaftKvServerOptions {
  std::string svr_version;
  std::string svr_addr;  // address the RPC server listens on
  std::string kv_db_path;
  std::string log_db_path;

  int64_t tick_interval{0};
  int64_t request_timeout{0};
  int64_t election_timeout{0};

  int64_t response_timeout{0};

  int64_t ae_max_count{0};
  int64_t ae_max_size{0};

  int64_t snap_max_count{0};
  int64_t snap_max_size{0};

  int64_t grpc_max_recv_msg_size{0};
  int64_t grpc_max_send_msg_size{0};
};

class ERaftKvServer : public grpc::EraftKv::Service {

/**

  • @brief Construct a new ERaftKvServer object
  • @param config */ ERaftKvServer(ERaftKvServerOptions config) { // init raft lib RaftConfig raft_config; raft_config.net_impl = new GRpcNetworkImpl(); raft_config.store_impl = new RocksDBStorageImpl(); raft_config.log_impl = new RocksDBLogStorageImpl(); raft_context_ = new RaftServer(raft_config); };

/**

  • @brief
  • @param req
  • @param resp
  • @return grpc::Status / grpc::Status RequestVote(RequestVoteReq req, RequestVoteResp* resp){ // // call raft_context_->HandleRequestVoteReq() // };

/**

  • @brief
  • @param req
  • @param resp
  • @return grpc::Status / grpc::Status AppendEntries(AppendEntriesReq req, RequestVoteResp* resp){ // 1.call raft_context_->HandleAppendEntriesReq() };

/**

  • @brief
  • @param req
  • @param resp
  • @return grpc::Status / grpc::Status Snapshot(SnapshotReq req, SnapshotResp* resp){ // raftcore // 1.call raft_context_->HandleSnapshotReq(); };

/**

  • @brief
  • @param req
  • @param resp
  • @return grpc::Status / grpc::Status ProcessRWOperation(ClientOperationReq req, ClientOperationResp* resp){ // 1. req into log entry // 2. call raft_context_->ProposeEntry() // 3. wait commit };

/**

  • @brief
  • @return grpc::Status */ grpc::Status ClusterConfigChange(ClusterConfigChangeReq, ClusterConfigChangeResp) { return EStatus::NotSupport(); }

/**

  • @brief
  • @param interval
  • @return EStatus */ EStatus InitTicker(int interval){ // 1.set up raft_context_->RunCycle() run interval with periodic_caller_ };

/**

  • @brief
  • @return EStatus */ EStatus BuildAndRunRpcServer() { // set up rpc ERaftKvServer service; grpc::EnableDefaultHealthCheckService(true); grpc::ServerBuilder builder; builder.AddListeningPort(this->options_.svr_addr, grpc::InsecureServerCredentials()); builder.RegisterService(&service); std::unique_ptrgrpc::Server server(builder.BuildAndStart()); server->Wait(); };

private: /**

  • @brief

/ RaftServer raft_context_;

/**

  • @brief

/ PeriodicCaller periodic_caller_;

/**

  • @brief

*/ ERaftKvServerOptions options_; };

Clone this wiki locally