-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcandidate.go
75 lines (67 loc) · 1.69 KB
/
candidate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package jsn_raft
import (
"time"
"github.com/jsn4ke/jsn_raft/v2/pb"
)
func (r *Raft) runCandidate() {
r.logger.Info("[%v] run candidate",
r.who)
r.addCurrentTerm()
lastLogIndex, lastLogTerm := r.lastLog()
req := &pb.VoteRequest{
Term: r.getCurrentTerm(),
CandidateId: []byte(r.who),
LastLogIndex: lastLogIndex,
LastLogTerm: lastLogTerm,
}
var (
needVotes = len(r.config.List)/2 + 1
grantedVotes = 0
)
voteResponseChannel := make(chan *pb.VoteResponse, len(r.config.List)-1)
for _, v := range r.config.List {
who := v.Who
if who == r.who {
grantedVotes++
r.setVoteFor(who)
} else {
r.safeGo("peer vote request", func() {
resp := &pb.VoteResponse{}
// err := r.rpcClients[who].Call(req, resp, nil, r.rpcTimeout())
err := r.rpcCall(who, req, resp, nil, r.rpcTimeout())
if nil != err {
r.logger.Error("`[%v] vote to %v err %v",
r.who, who, err.Error())
}
voteResponseChannel <- resp
})
}
}
electionTimer := time.After(randomTimeout(r.electionTimeout()))
for r.getServerState() == candidate {
select {
case wrap := <-r.rpcChannel:
r.handlerRpc(wrap)
case resp := <-voteResponseChannel:
if resp.CurrentTerm > r.getCurrentTerm() {
r.setServerState(follower)
r.setCurrentTerm(resp.CurrentTerm)
return
}
if resp.VoteGranted {
grantedVotes++
r.logger.Debug("[%v] receive vote, current votes %v need %v",
r.who, grantedVotes, needVotes)
}
if grantedVotes >= needVotes {
r.setServerState(leader)
r.voteFor = r.who
}
case <-electionTimer:
// 重新开始投票
r.logger.Debug("[%v] current votes %v need %v timeout",
r.who, grantedVotes, needVotes)
return
}
}
}