Skip to content

Commit

Permalink
raft: check pending conf change before campaign
Browse files Browse the repository at this point in the history
fix etcd-io#12133

Signed-off-by: Jay Lee <BusyJayLee@gmail.com>
  • Loading branch information
BusyJay committed Jul 14, 2020
1 parent 4c6881f commit d513788
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 24 deletions.
63 changes: 39 additions & 24 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -766,6 +766,40 @@ func (r *raft) becomeLeader() {
r.logger.Infof("%x became leader at term %d", r.id, r.Term)
}

// hup starts a campaign of type t unless this node must not campaign right
// now. It is a no-op (apart from logging) when the node is already leader,
// when it is not promotable, or when committed-but-unapplied entries still
// contain configuration changes.
func (r *raft) hup(t CampaignType) {
	switch {
	case r.state == StateLeader:
		r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
		return
	case !r.promotable():
		r.logger.Warningf("%x is unpromotable and can not campaign; ignoring MsgHup", r.id)
		return
	}

	// Pick the lower bound of the range to scan. When a snapshot is pending,
	// `maybeFirstIndex` reports its index; the snapshot has already updated
	// the configuration, so only the entries after it need to be free of
	// conf changes. Otherwise scanning starts right after the applied index.
	lo, fromUnstable := r.raftLog.unstable.maybeFirstIndex()
	if !fromUnstable {
		lo = r.raftLog.applied + 1
	}
	hi := r.raftLog.committed + 1

	pending, err := r.raftLog.slice(lo, hi, noLimit)
	if err != nil {
		r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
	}

	// Refuse to campaign while conf changes are committed but not yet applied.
	confs := numOfPendingConf(pending)
	hasUnapplied := r.raftLog.committed > r.raftLog.applied
	if confs != 0 && hasUnapplied {
		r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, confs)
		return
	}

	r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
	r.campaign(t)
}

// campaign transitions the raft instance to candidate state. This must only be
// called after verifying that this is a legitimate transition.
func (r *raft) campaign(t CampaignType) {
Expand Down Expand Up @@ -907,30 +941,11 @@ func (r *raft) Step(m pb.Message) error {

switch m.Type {
case pb.MsgHup:
if r.state != StateLeader {
if !r.promotable() {
r.logger.Warningf("%x is unpromotable and can not campaign; ignoring MsgHup", r.id)
return nil
}
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
}
if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
r.logger.Warningf("%x cannot campaign at term %d since there are still %d pending configuration changes to apply", r.id, r.Term, n)
return nil
}

r.logger.Infof("%x is starting a new election at term %d", r.id, r.Term)
if r.preVote {
r.campaign(campaignPreElection)
} else {
r.campaign(campaignElection)
}
if r.preVote {
r.hup(campaignPreElection)
} else {
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
r.hup(campaignElection)
}

case pb.MsgVote, pb.MsgPreVote:
// We can vote if this is a repeat of a vote we've already cast...
canVote := r.Vote == m.From ||
Expand Down Expand Up @@ -1349,7 +1364,7 @@ func stepFollower(r *raft, m pb.Message) error {
// Leadership transfers never use pre-vote even if r.preVote is true; we
// know we are not recovering from a partition so there is no need for the
// extra round trip.
r.campaign(campaignTransfer)
r.hup(campaignTransfer)
} else {
r.logger.Infof("%x received MsgTimeoutNow from %x but is not promotable", r.id, m.From)
}
Expand Down Expand Up @@ -1675,7 +1690,7 @@ func (r *raft) reduceUncommittedSize(ents []pb.Entry) {
func numOfPendingConf(ents []pb.Entry) int {
n := 0
for i := range ents {
if ents[i].Type == pb.EntryConfChange {
if ents[i].Type == pb.EntryConfChange || ents[i].Type == pb.EntryConfChangeV2 {
n++
}
}
Expand Down
91 changes: 91 additions & 0 deletions raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4141,6 +4141,97 @@ func TestPreVoteMigrationWithFreeStuckPreCandidate(t *testing.T) {
}
}

// testConfChangeCheckBeforeCampaign checks that a node does not campaign,
// either on election timeout or on a leadership transfer, while it still
// holds a conf change entry it has not processed via nextEnts. When v2 is
// true the change is proposed as pb.EntryConfChangeV2, otherwise as
// pb.EntryConfChange.
func testConfChangeCheckBeforeCampaign(t *testing.T, v2 bool) {
nt := newNetwork(nil, nil, nil)
n1 := nt.peers[1].(*raft)
n2 := nt.peers[2].(*raft)
// Make node 1 the leader first.
nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgHup})
if n1.state != StateLeader {
t.Errorf("node 1 state: %s, want %s", n1.state, StateLeader)
}

// Begin to remove node 2 (the conf change targets NodeID 2).
cc := pb.ConfChange{
Type: pb.ConfChangeRemoveNode,
NodeID: 2,
}
// Encode the conf change in the requested format and pick the matching
// entry type.
var ccData []byte
var err error
var ty pb.EntryType
if v2 {
ccv2 := cc.AsV2()
ccData, err = ccv2.Marshal()
ty = pb.EntryConfChangeV2
} else {
ccData, err = cc.Marshal()
ty = pb.EntryConfChange
}
if err != nil {
t.Fatal(err)
}
// Propose the conf change through the leader.
nt.send(pb.Message{
From: 1,
To: 1,
Type: pb.MsgProp,
Entries: []pb.Entry{
{Type: ty, Data: ccData},
},
})

// Trigger campaign in node 2
for i := 0; i < n2.randomizedElectionTimeout; i++ {
n2.tick()
}
// It's still follower because committed conf change is not applied.
if n2.state != StateFollower {
t.Errorf("node 2 state: %s, want %s", n2.state, StateFollower)
}

// Transfer leadership to peer 2.
nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader})
if n1.state != StateLeader {
t.Errorf("node 1 state: %s, want %s", n1.state, StateLeader)
}
// It's still follower because committed conf change is not applied.
if n2.state != StateFollower {
t.Errorf("node 2 state: %s, want %s", n2.state, StateFollower)
}
// Abort transfer leader
for i := 0; i < n1.electionTimeout; i++ {
n1.tick()
}

// NOTE(review): nextEnts presumably applies node 2's committed entries,
// which would lift the campaign restriction checked above; confirm
// against the helper's definition.
nextEnts(n2, nt.storage[2])

// Transfer leadership to peer 2 again.
nt.send(pb.Message{From: 2, To: 1, Type: pb.MsgTransferLeader})
if n1.state != StateFollower {
t.Errorf("node 1 state: %s, want %s", n1.state, StateFollower)
}
if n2.state != StateLeader {
t.Errorf("node 2 state: %s, want %s", n2.state, StateLeader)
}

nextEnts(n1, nt.storage[1])
// Trigger campaign in node 1
for i := 0; i < n1.randomizedElectionTimeout; i++ {
n1.tick()
}
// After nextEnts on node 1, the election timeout is expected to turn it
// into a candidate.
if n1.state != StateCandidate {
t.Errorf("node 1 state: %s, want %s", n1.state, StateCandidate)
}
}

// TestConfChangeCheckBeforeCampaign runs the shared campaign-check scenario
// with the conf change proposed in the original (non-V2) entry format.
func TestConfChangeCheckBeforeCampaign(t *testing.T) {
	const useV2 = false
	testConfChangeCheckBeforeCampaign(t, useV2)
}

// TestConfChangeV2CheckBeforeCampaign runs the shared campaign-check scenario
// with the conf change proposed in the V2 entry format.
func TestConfChangeV2CheckBeforeCampaign(t *testing.T) {
	const useV2 = true
	testConfChangeCheckBeforeCampaign(t, useV2)
}

func entsWithConfig(configFunc func(*Config), terms ...uint64) *raft {
storage := NewMemoryStorage()
for i, term := range terms {
Expand Down

0 comments on commit d513788

Please sign in to comment.