diff --git a/raft/raft.go b/raft/raft.go index 3aa645b63060..56ed7db7b437 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -766,6 +766,40 @@ func (r *raft) becomeLeader() { r.logger.Infof("%x became leader at term %d", r.id, r.Term) } +func (r *raft) hup(t CampaignType) { + if r.state == StateLeader { + r.logger.Debugf("%x ignoring MsgHup because already leader", r.id) + return + } + + if !r.promotable() { + r.logger.Warningf("%x is unpromotable and can not campaign; ignoring MsgHup", r.id) + return + } + + // If there is a pending snapshot, its index will be returned by + // `maybeFirstIndex`. Note that snapshot updates configuration + // already, so as long as pending entries don't contain conf change + // it's safe to start campaign. + var firstIndex uint64 + if i, ok := r.raftLog.unstable.maybeFirstIndex(); ok { + firstIndex = i + } else { + firstIndex = r.raftLog.applied + 1 + } + ents, err := r.raftLog.slice(firstIndex, r.raftLog.committed+1, noLimit) + if err != nil { + r.logger.Panicf("unexpected error getting unapplied entries (%v)", err) + } + if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied { + r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n) + return + } + + r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term) + r.campaign(t) +} + // campaign transitions the raft instance to candidate state. This must only be // called after verifying that this is a legitimate transition. func (r *raft) campaign(t CampaignType) { @@ -907,30 +941,11 @@ func (r *raft) Step(m pb.Message) error { switch m.Type { case pb.MsgHup: - if r.state != StateLeader { - if !r.promotable() { - r.logger.Warningf("%x is unpromotable and can not campaign; ignoring MsgHup", r.id) - return nil - } - ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit) - if err != nil { - r.logger.Panicf("unexpected error getting unapplied entries (%v)", err) - } - if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied { - r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n) - return nil - } - - r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term) - if r.preVote { - r.campaign(campaignPreElection) - } else { - r.campaign(campaignElection) - } + if r.preVote { + r.hup(campaignPreElection) } else { - r.logger.Debugf("%x ignoring MsgHup because already leader", r.id) + r.hup(campaignElection) } - case pb.MsgVote, pb.MsgPreVote: // We can vote if this is a repeat of a vote we've already cast... canVote := r.Vote == m.From || @@ -1349,7 +1364,7 @@ func stepFollower(r *raft, m pb.Message) error { // Leadership transfers never use pre-vote even if r.preVote is true; we // know we are not recovering from a partition so there is no need for the // extra round trip. - r.campaign(campaignTransfer) + r.hup(campaignTransfer) } else { r.logger.Infof("%x received MsgTimeoutNow from %x but is not promotable", r.id, m.From) } @@ -1675,7 +1690,7 @@ func (r *raft) reduceUncommittedSize(ents []pb.Entry) { func numOfPendingConf(ents []pb.Entry) int { n := 0 for i := range ents { - if ents[i].Type == pb.EntryConfChange { + if ents[i].Type == pb.EntryConfChange || ents[i].Type == pb.EntryConfChangeV2 { n++ } } diff --git a/raft/raft_test.go b/raft/raft_test.go index e6558701beb1..2874766c70d1 100644 --- a/raft/raft_test.go +++ b/raft/raft_test.go @@ -4141,6 +4141,97 @@ func TestPreVoteMigrationWithFreeStuckPreCandidate(t *testing.T) { } } +func testConfChangeCheckBeforeCampaign(t *testing.T, v2 bool) { + nt := newNetwork(nil, nil, nil) + n1 := nt.peers[1].(*raft) + n2 := nt.peers[2].(*raft) + nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup}) + if n1.state != StateLeader { + t.Errorf("node 1 state: %s, want %s", n1.state, StateLeader) + } + + // Begin to remove the third node. + cc := pb.ConfChange{ + Type: pb.ConfChangeRemoveNode, + NodeID: 2, + } + var ccData []byte + var err error + var ty pb.EntryType + if v2 { + ccv2 := cc.AsV2() + ccData, err = ccv2.Marshal() + ty = pb.EntryConfChangeV2 + } else { + ccData, err = cc.Marshal() + ty = pb.EntryConfChange + } + if err != nil { + t.Fatal(err) + } + nt.send(pb.Message{ + From: 1, + To: 1, + Type: pb.MsgProp, + Entries: []pb.Entry{ + {Type: ty, Data: ccData}, + }, + }) + + // Trigger campaign in node 2 + for i := 0; i < n2.randomizedElectionTimeout; i++ { + n2.tick() + } + // It's still follower because committed conf change is not applied. + if n2.state != StateFollower { + t.Errorf("node 2 state: %s, want %s", n2.state, StateFollower) + } + + // Transfer leadership to peer 2. + nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader}) + if n1.state != StateLeader { + t.Errorf("node 1 state: %s, want %s", n1.state, StateLeader) + } + // It's still follower because committed conf change is not applied. + if n2.state != StateFollower { + t.Errorf("node 2 state: %s, want %s", n2.state, StateFollower) + } + // Abort transfer leader + for i := 0; i < n1.electionTimeout; i++ { + n1.tick() + } + + nextEnts(n2, nt.storage[2]) + + // Transfer leadership to peer 2 again. + nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader}) + if n1.state != StateFollower { + t.Errorf("node 1 state: %s, want %s", n1.state, StateFollower) + } + if n2.state != StateLeader { + t.Errorf("node 2 state: %s, want %s", n2.state, StateLeader) + } + + nextEnts(n1, nt.storage[1]) + // Trigger campaign in node 2 + for i := 0; i < n1.randomizedElectionTimeout; i++ { + n1.tick() + } + if n1.state != StateCandidate { + t.Errorf("node 1 state: %s, want %s", n1.state, StateCandidate) + } +} + +// Tests if unapplied ConfChange is checked before campaign. +func TestConfChangeCheckBeforeCampaign(t *testing.T) { + testConfChangeCheckBeforeCampaign(t, false) +} + +// Tests if unapplied ConfChangeV2 is checked before campaign. +func TestConfChangeV2CheckBeforeCampaign(t *testing.T) { + testConfChangeCheckBeforeCampaign(t, true) +} + func entsWithConfig(configFunc func(*Config), terms ...uint64) *raft { storage := NewMemoryStorage() for i, term := range terms {