- 前情回顾:当节点成为 Leader 时会为每个 Follower 创建
Replicator
,并通过发送空的AppendEntries
请求确认各 Follower 的nextIndex
- 客户端通过
Node::apply
接口向 Leader 提交操作日志 - Leader 向本地追加日志:
- 3.1 为日志分配
index
,并将其追加到内存存储中 - 3.2 异步将内存中的日志持久化到磁盘
- 3.3 唤醒所有
Replicator
准备向 Follower 同步日志
- 3.1 为日志分配
- Leader 的
Replicator
将内存中的日志通过AppendEntries
请求并行地复制给所有 Follower - Follower 收到
AppenEntries
请求,将日志持久化到本地后返回成功响应 - Leader 若收到大多数确定,则提交日志,更新
commitIndex
并应用日志 - Leader 调用用户状态机的
on_apply
应用日志 - 待
on_apply
返回后,更新applyIndex
,并删除内存中的日志
备注:文章的结尾有一张整体流程图
日志复制是树形复制的模型,采用本地写入和复制同时进行的方式,并且只要大多数写入成功就可以应用日志,不必等待 Leader 本地写入成功。
实现利用异步+并行,全程流水线式处理,并且全链路采用了 Batch,整体十分高效:
- Batch:全链路 Batch,包括持久化、复制以及应用
- 异步:Leader 和 Follower 的日志持久化都是异步的
- 并行:Leader 本地持久化和复制是并行的,并且开启
pipeline
后可以保证复制和复制之间也是并行的
关于性能优化的详情,可参考<4.2 复制优化>。
节点在刚成为 Leader 时会为每个 Follower 创建一个 Replicator
,其运行在单独的 bthread
上,主要有以下几个作用:
- 记录 Follower 的一些状态,如
nextIndex
、flyingAppendEntriesSize
等 - 作为 RPC Client,所有从 Leader 发往 Follower 的 RPC 请求都由它发送,包括心跳、
AppendEntriesRequest
、InstallSnapshotRequest
- 同步日志:
Replicator
会不断地向 Follower 同步日志,直到 Follower 成功复制了 Leader 的所有日志,之后将在后台等待新日志的到来
nextIndex
是 Leader 记录下一个要发往 Follower 的日志 index
,只有确认了 nextIndex
才能给 Follower 发送日志,不然不知道要给 Follower 发送哪些日志。
节点刚成为 Leader 时是不知道每个 Follower 的 nextIndex
的,需要通过发送一次或多次探测信息来确认,其探测算法如下:
- (1)
matchIndex
为最后一条 Leader 与 Follower 匹配的日志,而nextIndex=matchIndex+1
- (2) 初始化
matchIndex
为当前 Leader 的最后一条日志的index
- (3) Leader 发送探测请求,携带
matchIndex
以及matchIndex
日志对应的term
- (4) Follower 接收请求后,根据自身日志获取
matchIndex
对应的term
:- 若和请求中的
term
相等,则代表日志匹配,返回成功响应; - 若日志不存在或者
term
不匹配,则返回失败响应; - 不管成功失败,响应中都携自身的
lastLogIndex
- 若和请求中的
- (5) Leader 接收到成功响应,则表示探测成功;否则回退
matchIndex
并重复步骤 (3):- 若 Follower 的
lastLogIndex<matchIndex
,则回退matchIndex
为lastLogIndex
- 否则回退
matchIndex
为matchIndex-1
- 若 Follower 的
下图展示的是一次探测过程,图中的数字代表的是日志的
term
enum EntryType {
ENTRY_TYPE_UNKNOWN = 0;
ENTRY_TYPE_NO_OP = 1;
ENTRY_TYPE_DATA = 2;
ENTRY_TYPE_CONFIGURATION= 3;
};
message EntryMeta {
required int64 term = 1;
required EntryType type = 2;
repeated string peers = 3;
optional int64 data_len = 4;
// Don't change field id of `old_peers' in the consideration of backward
// compatibility
repeated string old_peers = 5;
};
message AppendEntriesRequest {
required string group_id = 1;
required string server_id = 2;
required string peer_id = 3;
required int64 term = 4;
required int64 prev_log_term = 5; // matchIndex
required int64 prev_log_index = 6; // matchIndex
repeated EntryMeta entries = 7;
required int64 committed_index = 8;
};
message AppendEntriesResponse {
required int64 term = 1;
required bool success = 2;
optional int64 last_log_index = 3;
optional bool readonly = 4;
};
service RaftService {
rpc append_entries(AppendEntriesRequest) returns (AppendEntriesResponse);
};
需要注意的是,探测 nextIndex
、心跳、复制日志都是用的 append_entries
方法,区别在于其请求中携带的参数不同:
用途 | entries | committed_index |
---|---|---|
探测 | 空 | 0 |
心跳 | 空 | 当前 Leader 的 commitIndex |
复制日志 | 携带日志 | 当前 Leader 的 commitIndex |
除以上 2 个参数不同外,其余的参数都是一样的
用户提交任务:
class Node {
public:
// [Thread-safe and wait-free]
// apply task to the replicated-state-machine
//
// About the ownership:
// |task.data|: for the performance consideration, we will take away the
// content. If you want keep the content, copy it before call
// this function
// |task.done|: If the data is successfully committed to the raft group. We
// will pass the ownership to StateMachine::on_apply.
// Otherwise we will specify the error and call it.
//
void apply(const Task& task);
};
应用日志:
class StateMachine {
public:
// Update the StateMachine with a batch a tasks that can be accessed
// through |iterator|.
//
// Invoked when one or more tasks that were passed to Node::apply have been
// committed to the raft group (quorum of the group peers have received
// those tasks and stored them on the backing storage).
//
// Once this function returns to the caller, we will regard all the iterated
// tasks through |iter| have been successfully applied. And if you didn't
// apply all the the given tasks, we would regard this as a critical error
// and report a error whose type is ERROR_TYPE_STATE_MACHINE.
virtual void on_apply(::braft::Iterator& iter) = 0;
};
Replicator
在创建时会通过发送空的 AppendEntries
请求来探测 Follower 的 nextIndex
,只有确定了 nextIndex
才能正式向 Follower 发送日志。具体的匹配算法我们已经在 nextIndex 中详细介绍过了。
Replicator
调用 _send_empty_entries
向 Follower 发送空的 AppendEntries
请求,并设置响应回调函数为 _on_rpc_returned
:
void Replicator::_send_empty_entries(bool is_heartbeat) {
...
// (1) 填充请求中的字段
if (_fill_common_fields(
request.get(), _next_index - 1, is_heartbeat) != 0) {
...
}
// (2) 设置响应回调函数为 _on_rpc_returned
google::protobuf::Closure* done = brpc::NewCallback(
is_heartbeat ? _on_heartbeat_returned : _on_rpc_returned, ...);
// (3) 发送空的 AppendEntries 请求
RaftService_Stub stub(&_sending_channel);
stub.append_entries(cntl.release(), request.release(),
response.release(), done);
}
// 填充字段
int Replicator::_fill_common_fields(AppendEntriesRequest* request,
int64_t prev_log_index,
bool is_heartbeat) {
// 获取 matchIndex 对应的 term,我们称之为 matchTerm
const int64_t prev_log_term = _options.log_manager->get_term(prev_log_index);
...
request->set_term(_options.term); // (1) 当前 Leader 的 term
...
request->set_prev_log_index(prev_log_index); // (2) matchIndex
request->set_prev_log_term(prev_log_term); // (3) matchTerm
...
return 0;
}
Follower 收到 AppendEntries
请求后,会调用 handle_append_entries_request
处理:
void NodeImpl::handle_append_entries_request(brpc::Controller* cntl,
const AppendEntriesRequest* request,
AppendEntriesResponse* response,
google::protobuf::Closure* done,
bool from_append_entries_cache) {
...
const int64_t prev_log_index = request->prev_log_index(); // macthIndex
const int64_t prev_log_term = request->prev_log_term(); // macthTerm
const int64_t local_prev_log_term = _log_manager->get_term(prev_log_index); // 获取 macthIndex 对应的 term
// (1) 如果本地的 term 和请求的 matchTerm 不一致,则代表日志不匹配,返回失败响应
if (local_prev_log_term != prev_log_term) {
int64_t last_index = _log_manager->last_log_index();
...
response->set_success(false); // (1.1) 失败响应
response->set_term(_current_term); // (1.2) 当前 term
response->set_last_log_index(last_index); // (1.3) 携带当前节点的 lastLogIndex
...
return;
}
// (2) 本地的 term 和请求的 matchTerm 一致,则代表日志匹配,返回成功响应
if (request->entries_size() == 0) {
response->set_success(true); // (2.1) 成功响应
response->set_term(_current_term); // (2.2) 当前 term
response->set_last_log_index(_log_manager->last_log_index()); // (2.3) 携带当前节点的 lastLogIndex
...
return;
}
...
}
Leader 收到 AppendEntries
响应后,调用 on_rpc_returned
处理响应:
void Replicator::_on_rpc_returned(ReplicatorId id, brpc::Controller* cntl,
AppendEntriesRequest* request,
AppendEntriesResponse* response,
int64_t rpc_send_time) {
...
// (1) 返回不匹配响应
// 注意:matchIndex = nextIndex - 1
// 其实代码中都是直接修改的 nextIndex,再在发送请求的时候设置 macthIndex = nextIndex - 1
// 但是为了便于理解,下面我们都将由 macthIndex 代替讲解
if (!response->success()) {
...
// (1.1) 如果 follower 的 lastLogIndex < matchIndex,则回退 matchIndex 到 lastLogIndex
if (response->last_log_index() + 1 < r->_next_index) {
...
r->_next_index = response->last_log_index() + 1;
} else { // (1.2) 否则,将 macthIndex 设置为 matchIndex - 1
// The peer contains logs from old term which should be truncated,
// decrease _last_log_at_peer by one to test the right index to keep
if (BAIDU_LIKELY(r->_next_index > 1)) {
...
--r->_next_index;
}
...
}
// (1.3) 再发送空的 `AppendEntries` 请求进行探测
r->_send_empty_entries(false);
return;
}
// (2) 返回成功响应,那么就代表探测成功,nextIndex 就是之前设置的(如 1.1),无需修改
// 这时候就调用 _send_entries 向 Follower 同步日志
r->_send_entries();
return;
}
确定 nextIndex
后,Replicator
就会调用 _send_entries
向 Follower 同步日志,直至 Follower 成功复制了 Leader 的所有日志后,将会注册一个 waiter
,在后台等待新日志的到来,其整体流程如下:
void Replicator::_send_entries() {
...
// (3) 如果已经复制了全部日志,调用 _wait_more_entries 后在后台等待
if (request->entries_size() == 0) {
...
return _wait_more_entries();
}
...
// (1) 同步日志,并设置响应的回调函数为 _on_rpc_returned
google::protobuf::Closure* done = brpc::NewCallback(
_on_rpc_returned, _id.value, cntl.get(),
request.get(), response.get(), butil::monotonic_time_ms());
RaftService_Stub stub(&_sending_channel);
stub.append_entries(cntl.release(), request.release(),
response.release(), done);
...
}
void Replicator::_on_rpc_returned(ReplicatorId id, brpc::Controller* cntl,
AppendEntriesRequest* request,
AppendEntriesResponse* response,
int64_t rpc_send_time) {
...
// (2) 成功同步一批日志,继续同步日志
r->_send_entries();
return;
}
// (4) 注册一个 `waiter`,其回调函数是 `_continue_sending`
// 当有新日志到来时,这些 waiter 将被唤醒,并调用 `_continue_sending`
void Replicator::_wait_more_entries() {
...
_wait_id = _options.log_manager->wait(
_next_index - 1, _continue_sending, (void*)_id.value);
...
}
// (5) Replicator 被唤醒,继续调用 `_send_entries` 发送日志
int Replicator::_continue_sending(void* arg, int error_code) {
...
r->_send_entries();
...
}
#include <braft/raft.h>
...
void function(op, callback) {
butil::IOBuf data;
serialize(op, &data);
braft::Task task;
// The data applied to StateMachine
task.data = &data;
// Continuation when the data is applied to StateMachine or error occurs.
task.done = make_closure(callback);
// Reject this task if expected_term doesn't match the current term of
// this Node if the value is not -1
task.expected_term = expected_term;
return _node->apply(task);
}
客户端需要将操作序列化成 IOBuf,并构建一个 Task
向 braft::Node
提交。
Node 在收到 Task
后,会将其转换成 LogEntryAndClosure
并放入 ApplyQueue
中。至此,客户端的 apply 就完成返回了。ApplyQueue
也是个串行队列,由 BRPC ExecutionQueue 实现:
void NodeImpl::apply(const Task& task) {
...
LogEntryAndClosure m;
m.entry = entry; // m.entry = task.data
m.done = task.done;
m.expected_term = task.expected_term;
if (_apply_queue->execute(m, &bthread::TASK_OPTIONS_INPLACE, NULL) != 0) {
...
}
}
ApplyQueue
的消费函数是 execute_applying_tasks
,在该函数中会将 LogEntryAndClosure
进行 bacth 打包处理,并调用批量 apply
接口来处理:
int NodeImpl::execute_applying_tasks(void* meta, bthread::TaskIterator<LogEntryAndClosure>& iter) {
NodeImpl* m = (NodeImpl*)meta;
for (; iter; ++iter) {
if (cur_size == batch_size) { // batch_size = 256
m->apply(tasks, cur_size); // (2) 调用批量 apply 接口处理
cur_size = 0;
}
tasks[cur_size++] = *iter; // (1) 批处理
}
...
}
批量 apply
接口在接收到这些 tasks
后,主要做以下几件事情:
void NodeImpl::apply(LogEntryAndClosure tasks[], size_t size) {
std::vector<LogEntry*> entries;
...
for (size_t i = 0; i < size; ++i) {
...
// (1)获取 LogEntryAndClosure 中的 LogEntry,并填充 term 和 type
entries.push_back(tasks[i].entry);
entries.back()->id.term = _current_term;
entries.back()->type = ENTRY_TYPE_DATA;
...
// (2) 追加日志对应的回调函数(LogEntryAndClosure 中的 Closure)
// 当日志被应用(on_apply)后会调用该 Closure
_ballot_box->append_pending_task(_conf.conf,
_conf.stable() ? NULL : &_conf.old_conf,
tasks[i].done);
}
...
// (3)调用 `LogManager::append_entries` 接口进行追加日志,该接口:
// (3.1) 会对日志进行持久化存储,持久化完成后会调用回调函数 LeaderStableClosure,
// LeaderStableClosure 会调用 BallotBox::commit_at 将复制计数加一
// (3.2) 唤醒 `Replicator` 将日志发送给 Follower
// 每收到一个成功响应,都会调用 BallotBox::commit_at 将复制计数加一
// (3.3) 待日志复制数达到 `Quorum` 后,会提交该日志,并将其应用
_log_manager->append_entries(&entries,
new LeaderStableClosure(
NodeId(_group_id, _server_id),
entries.size(),
_ballot_box));
...
}
LogManager
在接收到这些日志(LogEntry
)后会做以下几件事情:
void LogManager::append_entries(std::vector<LogEntry*> *entries, StableClosure* done) {
...
// (1) 为每一个 LogEntry 分配 index
if (!entries->empty() && check_and_resolve_conflict(entries, done) != 0) {
...
return;
}
// (2) 将日志追加到内存存储中
if (!entries->empty()) {
_logs_in_memory.insert(_logs_in_memory.end(), entries->begin(), entries->end());
}
...
// (3) 往 DiskQueue 中追加一个持久化的任务
// done: LeaderStableClosure
done->_entries.swap(*entries);
int ret = bthread::execution_queue_execute(_disk_queue, done);
// (4) 唤醒所有 Replicator 向 Follower 同步日志
wakeup_all_waiter(lck);
}
LogManager
调用 check_and_resolve_conflict
为每一个 LogEntry
分配 index
:
int LogManager::check_and_resolve_conflict(
std::vector<LogEntry*> *entries, StableClosure* done) {
...
// 这个函数 Leader 和 Follower 都会调用,
// Leader 调用时日志的 index 都是没确定的(即是 0)
// last_log_index:当前 Leader 最后一条日志的 index
if (entries->front()->id.index == 0) {
for (size_t i = 0; i < entries->size(); ++i) {
(*entries)[i]->id.index = ++_last_log_index;
}
...
return 0;
} else {
...
}
...
}
DiskQueue
的消费函数是 disk_thread
,在该函数中,会利用 AppendBatcher
对持久化任务做 Batch 打包处理,最终调用 AppendBatcher::flush
将日志写入磁盘:
int LogManager::disk_thread(void* meta,
bthread::TaskIterator<StableClosure*>& iter) {
...
LogManager* log_manager = static_cast<LogManager*>(meta);
...
StableClosure* storage[256];
AppendBatcher ab(storage, ARRAY_SIZE(storage), &last_id, log_manager);
for (; iter; ++iter) {
// ^^^ Must iterate to the end to release to corresponding
// even if some error has occurred
StableClosure* done = *iter;
...
if (!done->_entries.empty()) {
ab.append(done); // (1) Batch 处理
} else {
ab.flush(); // (2) 写入磁盘
...
}
}
...
ab.flush();
...
return 0;
}
AppendBatcher
负责将数据写入磁盘,并调用最终的回调函数,具体的存储写入我们将在 <4.3 日志存储> 中详细介绍:
class AppendBatcher {
public:
void flush() {
// (1) 最终会调用 `SegmentLogStorage::append_entries` 将日志写入磁盘
_lm->append_to_storage(&_to_append, _last_id, &metric);
for (size_t i = 0; i < _size; ++i) {
// (2) 成功持久化后,调用回调函数,即 `LeaderStableClosure`
_storage[i]->Run();
}
}
void append(LogManager::StableClosure* done) {
// FLAGS_raft_max_append_buffer_size 默认 256KB
if (_size == _cap ||
_buffer_size >= (size_t)FLAGS_raft_max_append_buffer_size) {
flush();
}
_storage[_size++] = done;
_to_append.insert(_to_append.end(),
done->_entries.begin(), done->_entries.end());
for (size_t i = 0; i < done->_entries.size(); ++i) {
_buffer_size += done->_entries[i]->data.length();
}
}
...
};
待持久化成功后,会调用之前设置的回调函数 LeaderStableClosure::Run
,该函数会调用 BallotBox::commit_at
将对应日志的计数加一,详见以下小节 <提交日志>:
void LeaderStableClosure::Run() {
if (status().ok()) {
...
_ballot_box->commit_at(_first_log_index, _first_log_index + _nentries - 1, _node_id.peer_id);
}
...
}
LogManager
调用 wakeup_all_waiter
唤醒所有 Replicator
向 Follower 同步日志,Replicator
唤醒会调用 _continue_sending
继续发送 AppendEntries
请求:
void LogManager::wakeup_all_waiter(std::unique_lock<raft_mutex_t>& lck) {
...
// (1) 清空 `wait_map`
for (butil::FlatMap<int64_t, WaitMeta*>::const_iterator
iter = _wait_map.begin(); iter != _wait_map.end(); ++iter) {
wm[nwm++] = iter->second;
}
_wait_map.clear();
...
// (2) 唤醒所有 `Replicator`
for (size_t i = 0; i < nwm; ++i) {
...
if (bthread_start_background( &tid, &attr, run_on_new_log, wm[i]) != 0) { ...
}
}
}
void* LogManager::run_on_new_log(void *arg) {
...
// wm: Replicator
// on_new_log: _continue_sending
wm->on_new_log(wm->arg, wm->error_code);
...
}
Replicator
被唤醒后,会调用 _send_entries
发送 AppendEntries
请求:
int Replicator::_continue_sending(void* arg, int error_code) {
...
r->_send_entries();
...
}
在 _send_entries
函数中主要做以下几件事情:
void Replicator::_send_entries() {
// (1) 填充 AppendEntries 请求
if (_fill_common_fields(request.get(), _next_index - 1, false) != 0) {
...
// (1.1) 如果发现 Follower 需要的日志(nextIndex)已经被快照压缩了
// 则向 Follower 发送安装快照指令
return _install_snapshot();
}
...
// (2) 填充 AppendEntries 请求中的 entries 字段
for (int i = 0; i < max_entries_size; ++i) {
// (2.1) 调用 `_prepare_entry` 准备实际的日志数据
// 日志数据放在 RPC 请求的 attch 中
prepare_entry_rc = _prepare_entry(i, &em, &cntl->request_attachment());
if (prepare_entry_rc != 0) {
break;
}
request->add_entries()->Swap(&em);
}
...
// (3) 先更新 Follower 的 nextIndex
_next_index += request->entries_size();
...
// (4) 设置响应回调函数 _on_rpc_returned
google::protobuf::Closure* done = brpc::NewCallback(_on_rpc_returned, ...);
// (5) 发送 AppendEntries 请求
RaftService_Stub stub(&_sending_channel);
stub.append_entries(cntl.release(), request.release(),
response.release(), done);
// (6) 如果还有日志,则继续发送;否则注册 waiter 后,就在后台等待新日志到来
_wait_more_entries();
}
_fill_common_fields
函数负责填充 AppendEntries
请求中除 entries
的其余字段:
int Replicator::_fill_common_fields(AppendEntriesRequest* request,
int64_t prev_log_index // matchIndex
bool is_heartbeat) {
const int64_t prev_log_term = _options.log_manager->get_term(prev_log_index);
// (1) 查不到对应日志的 term,代表已经被快照压缩了
if (prev_log_term == 0 && prev_log_index != 0) {
...
return -1;
}
...
// (2) 填充字段
request->set_term(_options.term);
request->set_prev_log_index(prev_log_index);
request->set_prev_log_term(prev_log_term);
request->set_committed_index(_options.ballot_box->last_committed_index()); // commitIndex
return 0;
}
Follower 接收到 AppendEntries
请求后,会调用 handle_append_entries_request
处理请求。其实 Follower 持久化逻辑和 Leader 是一样的,都是调用 LogManager::append_entries
函数进行持久化,只不过在持久化成功后各自的回调函数不一样:
- Leader:回调函数是
LeaderStableClosure
;该回调函数主要是将Quorum
计数加一 - Follower:回调函数是
FollowerStableClosure
;该回调函数主要是发送AppendEntries
响应,并根据请求中携带的 LeadercommitIndex
更新自身的commitIndex
并应用日志
还有一个小的不同点是,Leader 端日志的 index
是自己生成的,而 Follower 中的日志完全来自于 Leader。
void NodeImpl::handle_append_entries_request(brpc::Controller* cntl,
const AppendEntriesRequest* request,
AppendEntriesResponse* response,
google::protobuf::Closure* done,
bool from_append_entries_cache) {
...
// (1) 持久化成功后的回调函数
FollowerStableClosure* c = new FollowerStableClosure(
cntl, request, response, done_guard.release(),
this, _current_term);
// (2) 追加日志
_log_manager->append_entries(&entries, c);
}
持久化成功后将运行回调函数 FollowerStableClosure::Run
:
class FollowerStableClosure : public LogManager::StableClosure {
public:
...
void Run() {
run();
...
}
private:
...
void run() {
// (3) 发送响应
brpc::ClosureGuard done_guard(_done);
...
_response->set_success(true);
_response->set_term(_term);
// (1) 计算 commitIndex
const int64_t committed_index =
std::min(_request->committed_index(),
// ^^^ committed_index is likely less than the
// last_log_index
_request->prev_log_index() + _request->entries_size()
// ^^^ The logs after the appended entries are
// untrustable so we can't commit them even if their
// indexes are less than request->committed_index()
);
// (2) 更新 commitIndex,并应用日志
//_ballot_box is thread safe and tolerates disorder.
_node->_ballot_box->set_last_committed_index(committed_index);
}
};
调用 set_last_committed_index
更新 commitIndex
并应用日志:
int BallotBox::set_last_committed_index(int64_t last_committed_index) {
...
if (last_committed_index > _last_committed_index.load(...)) {
// (1) 更新 `commitIndex`
_last_committed_index.store(last_committed_index, ...);
// (2) 调用 FSMCaller::on_committed 应用日志
// 注意,这里只是放队列里放入一个任务就返回了,
// 最终的 `on_apply` 回调由队列的消费函数调用
_waiter->on_committed(last_committed_index);
}
return 0;
}
Leader 在收到 AppendEntries
响应后,会根据响应的不同类型对相应的处理:
- RPC 失败:调用
_block
阻塞当前Replicator
一段时间(默认 100 毫秒),超时后调用_continue_sending
重新发送当前AppendEntries
请求。出现这种情况一般是对应的 Follower Crash 了,需要不断重试直到其恢复正常或被剔除集群 - 响应失败:这里又细分为 2 种情况
- Follower 的
term
比leader
高:调用increase_term_to
将自己step_down
成 Follower,并以错误状态调用所有日志的回调函数,通知用户apply
失败了 - 日志不匹配:重新探测
nextIndex
,待其确认后重新发送日志
- Follower 的
- 响应成功:调用
BallotBox::commit_at
对复制计算加一
void Replicator::_on_rpc_returned(ReplicatorId id, brpc::Controller* cntl,
AppendEntriesRequest* request,
AppendEntriesResponse* response,
int64_t rpc_send_time) {
// 情况 1:RPC 请求失败
if (cntl->Failed()) {
...
r->_reset_next_index();
return r->_block(start_time_us, cntl->ErrorCode()); // 阻塞一段时间后再重试
}
// 情况 2:收到失败响应
if (!response->success()) {
// 2.1 如果发现 Follower 的日志 term 比 Leader 大
// 则调用 `increase_term_to` 将自己 step_down 成 Follower
// 并以错误状态调用所有日志的回调函数,表明用户 `apply` 失败
if (response->term() > r->_options.term) {
...
butil::Status status;
status.set_error(EHIGHERTERMRESPONSE, "Leader receives higher term "
"%s from peer:%s", response->GetTypeName().c_str(), r->_options.peer_id.to_string().c_str());
r->_destroy();
node_impl->increase_term_to(response->term(), status);
...
return;
}
...
// 2.2 发现发送的日志不匹配,需要重新探测 nextIndex
// 确定 nextIndex 后重新发送 Follower 需要的日志
if (response->last_log_index() + 1 < r->_next_index) {
...
r->_next_index = response->last_log_index() + 1;
} else {
...
if (BAIDU_LIKELY(r->_next_index > 1)) {
...
--r->_next_index;
}
...
}
...
// 重新发送空的 AppendEntries 请求探测 nextIndex
r->_send_empty_entries(false);
return;
}
...
// 情况 3:响应成功
if (entries_size > 0) {
r->_options.ballot_box->commit_at(
min_flying_index, rpc_last_log_index,
r->_options.peer_id);
...
}
}
increase_term_to
函数处理失败的任务,调用 step_down
将自己降级成 Follower:
int NodeImpl::increase_term_to(int64_t new_term, const butil::Status& status) {
...
if (new_term <= _current_term) {
return EINVAL;
}
step_down(new_term, false, status);
return 0;
}
在 step_down
中会调用 BallotBox::clear_pending_tasks
,该函数将以失败状态调用所有用户任务的 Closure
:
void NodeImpl::step_down(const int64_t term, bool wakeup_a_candidate,
const butil::Status& status) {
...
// (1) BallotBox::clear_pending_tasks
_ballot_box->clear_pending_tasks();
...
}
int BallotBox::clear_pending_tasks() {
...
// (2) ClosureQueue::clear
_closure_queue->clear();
return 0;
}
void ClosureQueue::clear() {
...
// (3) saved_queue 中保存的是用户任务的 Closure
for (std::deque<Closure*>::iterator
it = saved_queue.begin(); it != saved_queue.end(); ++it) {
if (*it) {
// (4) 运行用户任务的 Closure
(*it)->status().set_error(EPERM, "leader stepped down");
run_closure_in_bthread_nosig(*it, _usercode_in_pthread);
...
}
}
...
}
Leader 本地持久化成功或每成功复制日志给一个 Follower,都会调用 BallotBox::commit_at
将对应日志的复制计数加一,如果达到 Quorum
,则更新 commitIndex
,并将其应用:
int BallotBox::commit_at(
int64_t first_log_index, int64_t last_log_index, const PeerId& peer) {
...
// (1) 将 index 在 [first_log_index, last_log_index] 之间的日志计数加一
int64_t last_committed_index = 0;
const int64_t start_at = std::max(_pending_index, first_log_index);
Ballot::PosHint pos_hint;
for (int64_t log_index = start_at; log_index <= last_log_index; ++log_index) {
Ballot& bl = _pending_meta_queue[log_index - _pending_index];
pos_hint = bl.grant(peer, pos_hint);
if (bl.granted()) { // (2) 该日志已经达到 `Quorum`,保存 commitIndex
last_committed_index = log_index;
}
}
if (last_committed_index == 0) {
return 0;
}
...
_pending_index = last_committed_index + 1;
// (3) 更新 commitIndex
_last_committed_index.store(last_committed_index, butil::memory_order_relaxed);
// (4) 调用 FSMCaller::do_committed 开始应用日志
// The order doesn't matter
_waiter->on_committed(last_committed_index);
return 0;
}
当日志复制数已达到 Quorum
,则调用 FSMCaller::on_committed
应用日志,该函数会将应用任务放如串行队列 ApplyTaskQueue
当中。ApplyTaskQueue
也是个串行队列,由 BRPC ExecutionQueue 实现:
int FSMCaller::on_committed(int64_t committed_index) {
ApplyTask t;
t.type = COMMITTED;
t.committed_index = committed_index;
return bthread::execution_queue_execute(_queue_id, t);
}
ApplyTaskQueue
的消费函数是 FSMCaller::run
,在该函数中会对应用任务进行 Bacth 打包,并调用 FSMCaller::do_committed
进行批处理:
int FSMCaller::run(void* meta, bthread::TaskIterator<ApplyTask>& iter) {
for (; iter; ++iter) {
if (iter->type == COMMITTED && counter < batch_size) {
// (1) Batch 打包
if (iter->committed_index > max_committed_index) {
max_committed_index = iter->committed_index;
counter++;
}
} else {
// (2) 调用 `do_committed` 进行批处理
if (max_committed_index >= 0) {
caller->_cur_task = COMMITTED;
caller->do_committed(max_committed_index);
max_committed_index = -1;
counter = 0;
}
...
}
...
}
}
在 do_committed
函数中会将这批应用任务生成迭代器 braft::iterator,并将其作为参数调用用户状态机的 on_apply
,待这批日志全部被 on_apply
后,再更新 applyIndex
。最后调用 LogManager::set_applied_id
删除内存中的日志:
void FSMCaller::do_committed(int64_t committed_index) {
// (1) 生产迭代器 Iterator
IteratorImpl iter_impl(_fsm, _log_manager, ...);
for (; iter_impl.is_good();) {
...
Iterator iter(&iter_impl);
_fsm->on_apply(iter); // (2) 调用状态机的 on_apply
...
iter.next();
}
...
// (3) 更新 applyIndex
LogId last_applied_id(last_index, last_term);
_last_applied_index.store(committed_index, butil::memory_order_release);
_last_applied_term = last_term;
// (4) 删除内存中的日志
_log_manager->set_applied_id(last_applied_id);
}
调用 set_applied_id
删除内存中的日志。注意,不删除 Leader 未持久化的日志,即使其已被 apply
:
void LogManager::set_applied_id(const LogId& applied_id) {
if (applied_id < _applied_id) {
return;
}
...
// _disk_id:已经持久化的日志 Id
// _applied_id:已经应用的日志 Id
// 正常情况下,_applied_id >= _disk_id
// 但是为了性能考虑,实现中只要复制数达到 `Quorum` 就可以提交日志,即使 Leader 未持久化
// 所以可能出现日志已经被 apply 了,但是 Leader 还没有持久化
_applied_id = applied_id;
LogId clear_id = std::min(_disk_id, _applied_id);
return clear_memory_logs(clear_id);
}
删除内存中的日志,并释放其内存:
void LogManager::clear_memory_logs(const LogId& id) {
LogEntry* entries_to_clear[256];
size_t nentries = 0;
do {
nentries = 0;
{
BAIDU_SCOPED_LOCK(_mutex);
while (!_logs_in_memory.empty()
&& nentries < ARRAY_SIZE(entries_to_clear)) {
LogEntry* entry = _logs_in_memory.front();
if (entry->id > id) {
break;
}
entries_to_clear[nentries++] = entry;
_logs_in_memory.pop_front();
}
} // out of _mutex
for (size_t i = 0; i < nentries; ++i) {
entries_to_clear[i]->Release();
}
} while (nentries == ARRAY_SIZE(entries_to_clear));
}