-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathfsm.go
73 lines (67 loc) · 1.6 KB
/
fsm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package jsn_raft
import (
"crypto/md5"
"sort"
"sync"
"github.com/jsn4ke/jsn_raft/v2/pb"
"google.golang.org/protobuf/proto"
)
type (
// log 提交 与 apply的 序列化反序列化可以走额外的goroutine处理
// 比如 apply时候,进入 marshal 的channel 完成后 投递到 leader的channel
// 可以log记录的时候就反序列化好,等commit完成直接apply,减少时间
// commit update 之后,反序列化然后apply
Fsm struct {
rw sync.RWMutex
pb.KVStore
}
LogOp struct {
Op pb.LogOperation
}
)
func (f *Fsm) Md5() [md5.Size]byte {
f.rw.RLock()
defer f.rw.RUnlock()
arr := new(pb.HelpArray)
for k := range f.Data {
arr.Data = append(arr.Data, k)
}
sort.Slice(arr.Data, func(i, j int) bool {
return arr.Data[i] < arr.Data[j]
})
body1, _ := proto.Marshal(arr)
for i, v := range arr.Data {
arr.Data[i] = f.GetData()[v]
}
body2, _ := proto.Marshal(arr)
return md5.Sum(append(body1, body2...))
}
func (f *Fsm) Apply(jlog *pb.JsnLog) {
f.rw.Lock()
defer f.rw.Unlock()
msg := new(pb.LogCmd)
proto.Unmarshal(jlog.GetCmd(), msg)
switch msg.GetOp() {
case pb.LogOperation_LogOperation_Fsm:
switch tp := msg.GetBehavior().(type) {
case *pb.LogCmd_Update:
update := tp.Update
if nil == f.Data {
f.Data = map[uint64]uint64{}
}
f.Data[update.GetKey()] = update.GetValue()
case *pb.LogCmd_Delete:
del := tp.Delete
delete(f.Data, del.GetKey())
}
}
}
func (r *Raft) applyLog(fromIndex, toIndex int64) {
for idx := fromIndex + 1; idx <= toIndex; idx++ {
jlog := r.getLog(idx)
if nil == jlog {
panic("no log")
}
r.fsm.Apply(jlog)
}
}