Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

raftstore: change the condition of proposing rollback merge #6584

Merged
merged 15 commits into from
Apr 27, 2020
8 changes: 4 additions & 4 deletions components/raftstore/src/store/fsm/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1994,12 +1994,11 @@ where
self.tag, source_region, e
),
};
match state.get_state() {
PeerState::Normal | PeerState::Merging => {}
_ => panic!(
if state.get_state() != PeerState::Merging {
panic!(
"{} unexpected state of merging region {:?}",
self.tag, state
),
);
}
let exist_region = state.get_region().to_owned();
if *source_region != exist_region {
Expand Down Expand Up @@ -2650,6 +2649,7 @@ where
}

fail_point!("on_handle_apply_1003", self.delegate.id() == 1003, |_| {});
fail_point!("on_handle_apply_2", self.delegate.id() == 2, |_| {});
fail_point!("on_handle_apply", |_| {});

if apply.entries.is_empty() || self.delegate.pending_remove || self.delegate.stopped {
Expand Down
70 changes: 61 additions & 9 deletions components/raftstore/src/store/fsm/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,7 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
}

if msg.has_merge_target() {
fail_point!("on_has_merge_target", |_| Ok(()));
if self.need_gc_merge(&msg)? {
self.on_stale_merge();
}
Expand Down Expand Up @@ -950,7 +951,9 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
self.reset_raft_tick(GroupState::Ordered);
}
ExtraMessageType::MsgWantRollbackMerge => {
unimplemented!("remove this after #6584 merged")
self.fsm
.peer
.maybe_add_want_rollback_merge_peer(msg.get_from_peer().get_id(), &extra_msg);
}
}
}
Expand Down Expand Up @@ -1905,18 +1908,64 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
}

fn on_check_merge(&mut self) {
if self.fsm.stopped || self.fsm.peer.pending_merge_state.is_none() {
if self.fsm.stopped
|| self.fsm.peer.pending_remove
|| self.fsm.peer.pending_merge_state.is_none()
{
return;
}
self.register_merge_check_tick();
fail_point!(
"on_check_merge_not_1001",
self.fsm.peer_id() != 1001,
|_| {}
);
if let Err(e) = self.schedule_merge() {
info!(
"failed to schedule merge, rollback";
"region_id" => self.fsm.region_id(),
"peer_id" => self.fsm.peer_id(),
"err" => %e,
);
self.rollback_merge();
if self.fsm.peer.is_leader() {
self.fsm
.peer
.add_want_rollback_merge_peer(self.fsm.peer_id());
if self.fsm.peer.want_rollback_merge_peers.len()
>= raft::majority(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How to ensure peers in want_rollback_merge_peers matches the current peer_list?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be ensured. If a peer can be added to want_rollback_merge_peers, it must be in current peer_list because its PrepareMerge commit index is the same as the leader's.

self.fsm
.peer
.raft_group
.status()
.progress
.unwrap()
.voter_ids()
.len(),
)
{
info!(
"failed to schedule merge, rollback";
"region_id" => self.fsm.region_id(),
"peer_id" => self.fsm.peer_id(),
"err" => %e,
);
self.rollback_merge();
}
} else if !self.fsm.peer.peer.get_is_learner() {
info!(
"want to rollback merge";
"region_id" => self.fsm.region_id(),
"peer_id" => self.fsm.peer_id(),
"leader_id" => self.fsm.peer.leader_id(),
"err" => %e,
);
if self.fsm.peer.leader_id() != raft::INVALID_ID {
self.ctx.need_flush_trans = true;
self.fsm.peer.send_want_rollback_merge(
self.fsm
.peer
.pending_merge_state
.as_ref()
.unwrap()
.get_commit(),
&mut self.ctx.trans,
);
}
}
}
}

Expand Down Expand Up @@ -2084,7 +2133,10 @@ impl<'a, T: Transport, C: PdClient> PeerFsmDelegate<'a, T, C> {
self.fsm.peer.tag, pending_commit, commit
);
}
// Clear merge related data
self.fsm.peer.pending_merge_state = None;
self.fsm.peer.want_rollback_merge_peers.clear();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should clear when applying snapshot.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After applying a snapshot, on_ready_rollback_merge will be called.


if let Some(r) = region {
let mut meta = self.ctx.store_meta.lock().unwrap();
meta.set_region(&self.ctx.coprocessor_host, r, &mut self.fsm.peer);
Expand Down
61 changes: 59 additions & 2 deletions components/raftstore/src/store/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ use kvproto::raft_cmdpb::{
TransferLeaderResponse,
};
use kvproto::raft_serverpb::{
ExtraMessageType, MergeState, PeerState, RaftApplyState, RaftMessage, RaftSnapshotData,
ExtraMessage, ExtraMessageType, MergeState, PeerState, RaftApplyState, RaftMessage,
RaftSnapshotData,
};
use protobuf::Message;
use raft::eraftpb::{self, ConfChangeType, EntryType, MessageType};
Expand All @@ -40,7 +41,7 @@ use crate::store::{Callback, Config, PdTask, ReadResponse, RegionSnapshot};
use crate::{Error, Result};
use keys::{enc_end_key, enc_start_key};
use pd_client::INVALID_ID;
use tikv_util::collections::HashMap;
use tikv_util::collections::{HashMap, HashSet};
use tikv_util::time::Instant as UtilInstant;
use tikv_util::time::{duration_to_sec, monotonic_raw_now};
use tikv_util::worker::Scheduler;
Expand Down Expand Up @@ -235,6 +236,12 @@ pub struct Peer {
last_committed_prepare_merge_idx: u64,
/// The merge related state. It indicates this Peer is in merging.
pub pending_merge_state: Option<MergeState>,
/// The rollback merge proposal can be proposed only when the number
/// of peers is greater than the majority of all peers.
/// There are more details in the annotation above
/// `test_node_merge_write_data_to_source_region_after_merging`
/// The peers who want to rollback merge
pub want_rollback_merge_peers: HashSet<u64>,
/// source region is catching up logs for merge
pub catch_up_logs: Option<CatchUpLogs>,

Expand Down Expand Up @@ -301,6 +308,7 @@ impl Peer {
pending_remove: false,
should_wake_up: false,
pending_merge_state: None,
want_rollback_merge_peers: HashSet::default(),
pending_request_snapshot_count: Arc::new(AtomicUsize::new(0)),
last_proposed_prepare_merge_idx: 0,
last_committed_prepare_merge_idx: 0,
Expand Down Expand Up @@ -2468,6 +2476,22 @@ impl Peer {
self.mut_store().cancel_applying_snap();
self.pending_reads.clear_all(None);
}

/// Records that `peer_id` wants to roll back the pending merge, but only
/// when this peer is the leader and the sender's `premerge_commit` matches
/// the commit index of our own `pending_merge_state` — i.e. both peers are
/// stalled on the same PrepareMerge entry. Stale or mismatched requests
/// are silently ignored.
pub fn maybe_add_want_rollback_merge_peer(&mut self, peer_id: u64, extra_msg: &ExtraMessage) {
    // Only the leader collects rollback-merge votes.
    if !self.is_leader() {
        return;
    }
    if let Some(ref state) = self.pending_merge_state {
        // Ignore messages that refer to a different PrepareMerge proposal.
        if state.get_commit() == extra_msg.get_premerge_commit() {
            self.add_want_rollback_merge_peer(peer_id);
        }
    }
}

/// Unconditionally adds `peer_id` to the set of peers that want to roll
/// back the merge. Panics if there is no pending merge — callers must only
/// invoke this while a PrepareMerge is in flight.
pub fn add_want_rollback_merge_peer(&mut self, peer_id: u64) {
    assert!(self.pending_merge_state.is_some());
    // HashSet semantics: re-adding the same peer is a harmless no-op.
    self.want_rollback_merge_peers.insert(peer_id);
}
}

impl Peer {
Expand Down Expand Up @@ -2618,6 +2642,39 @@ impl Peer {
}
}
}

/// Sends a `MsgWantRollbackMerge` extra message to the current leader,
/// asking it to roll back the pending merge identified by
/// `premerge_commit` (the commit index of the PrepareMerge entry).
///
/// Best-effort: if the leader cannot be resolved from the peer cache we
/// return without sending, and a transport error is only logged — the
/// message is re-sent by subsequent merge-check ticks, so losing one
/// attempt is harmless.
pub fn send_want_rollback_merge<T: Transport>(&self, premerge_commit: u64, trans: &mut T) {
    let mut send_msg = RaftMessage::default();
    send_msg.set_region_id(self.region_id);
    send_msg.set_from_peer(self.peer.clone());
    send_msg.set_region_epoch(self.region().get_region_epoch().clone());
    // The recipient is the leader; without a cached peer struct for it we
    // cannot address the message, so give up until the next tick.
    let to_peer = match self.get_peer_from_cache(self.leader_id()) {
        Some(p) => p,
        None => {
            warn!(
                "failed to look up recipient peer";
                "region_id" => self.region_id,
                "peer_id" => self.peer.get_id(),
                "to_peer" => self.leader_id(),
            );
            return;
        }
    };
    send_msg.set_to_peer(to_peer.clone());
    let extra_msg = send_msg.mut_extra_msg();
    extra_msg.set_type(ExtraMessageType::MsgWantRollbackMerge);
    extra_msg.set_premerge_commit(premerge_commit);
    if let Err(e) = trans.send(send_msg) {
        error!(
            "failed to send want rollback merge message";
            "region_id" => self.region_id,
            "peer_id" => self.peer.get_id(),
            "target_peer_id" => to_peer.get_id(),
            "target_store_id" => to_peer.get_store_id(),
            "err" => ?e
        );
    }
}
}

/// `RequestPolicy` decides how we handle a request.
Expand Down
75 changes: 74 additions & 1 deletion tests/failpoints/cases/test_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -846,7 +846,7 @@ fn test_node_merge_transfer_leader() {
}

#[test]
fn test_merge_cascade_merge_with_apply_yield() {
fn test_node_merge_cascade_merge_with_apply_yield() {
let mut cluster = new_node_cluster(0, 3);
configure_for_merge(&mut cluster);
let pd_client = Arc::clone(&cluster.pd_client);
Expand Down Expand Up @@ -882,3 +882,76 @@ fn test_merge_cascade_merge_with_apply_yield() {
cluster.must_put(format!("k{}", i).as_bytes(), b"v3");
}
}

// Test that a rollback-merge proposal is NOT proposed until a majority of
// peers have reported (via MsgWantRollbackMerge) that they want to roll
// back — the source leader alone must not roll back the merge.
// NOTE(review): "mutiple" in the function name is a typo for "multiple".
#[test]
fn test_node_mutiple_rollback_merge() {
    let mut cluster = new_node_cluster(0, 3);
    configure_for_merge(&mut cluster);
    cluster.cfg.raft_store.right_derive_when_split = true;
    // Short merge-check interval so rollback decisions happen quickly.
    cluster.cfg.raft_store.merge_check_tick_interval = ReadableDuration::millis(20);
    let pd_client = Arc::clone(&cluster.pd_client);
    pd_client.disable_default_operator();

    cluster.run();

    for i in 0..10 {
        cluster.must_put(format!("k{}", i).as_bytes(), b"v");
    }

    let region = pd_client.get_region(b"k1").unwrap();
    cluster.must_split(&region, b"k2");

    let left = pd_client.get_region(b"k1").unwrap();
    let right = pd_client.get_region(b"k2").unwrap();

    // Pin the source-region leader on store 1; the failpoint below keys
    // off this exact peer id (1001).
    let left_peer_1 = find_peer(&left, 1).unwrap().to_owned();
    cluster.must_transfer_leader(left.get_id(), left_peer_1.clone());
    assert_eq!(left_peer_1.get_id(), 1001);

    let on_schedule_merge_fp = "on_schedule_merge";
    let on_check_merge_not_1001_fp = "on_check_merge_not_1001";

    let mut right_peer_1_id = find_peer(&right, 1).unwrap().get_id();

    for i in 0..3 {
        // Block schedule_merge so the merge stays pending while we mutate
        // the target region's epoch.
        fail::cfg(on_schedule_merge_fp, "return()").unwrap();
        cluster.try_merge(left.get_id(), right.get_id());
        // Change the epoch of target region and the merge will fail
        pd_client.must_remove_peer(right.get_id(), new_peer(1, right_peer_1_id));
        right_peer_1_id += 100;
        pd_client.must_add_peer(right.get_id(), new_peer(1, right_peer_1_id));
        // Only the source leader is running `on_check_merge`
        fail::cfg(on_check_merge_not_1001_fp, "return()").unwrap();
        fail::remove(on_schedule_merge_fp);
        // In previous implementation, rollback merge proposal can be proposed by leader itself
        // So wait for the leader propose rollback merge if possible
        sleep_ms(100);
        // Check if the source region is still in merging mode.
        let mut l_r = pd_client.get_region(b"k1").unwrap();
        let req = new_request(
            l_r.get_id(),
            l_r.take_region_epoch(),
            vec![new_put_cf_cmd(
                "default",
                format!("k1{}", i).as_bytes(),
                b"vv",
            )],
            false,
        );
        let resp = cluster
            .call_command_on_leader(req, Duration::from_millis(100))
            .unwrap();
        // Writes must still be rejected: the leader alone (a minority)
        // must not have rolled back the merge.
        assert!(resp
            .get_header()
            .get_error()
            .get_message()
            .contains("merging mode"));

        // Unblock the followers so a majority can request rollback.
        fail::remove(on_check_merge_not_1001_fp);
        // Write data for waiting the merge to rollback easily
        cluster.must_put(format!("k1{}", i).as_bytes(), b"vv");
        // Make sure source region is not merged to target region
        assert_eq!(pd_client.get_region(b"k1").unwrap().get_id(), left.get_id());
    }
}
Loading