-
Notifications
You must be signed in to change notification settings - Fork 0
/
protocol.go
136 lines (105 loc) · 3.19 KB
/
protocol.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package raft
import (
"errors"
"time"
)
// MessagePtrFactory emits a new request struct ptr
type MessagePtrFactory func() interface{}
// MessageTypeTranslator is a map associating rpc types
// (uint8) to MessagePtrFactorys
type MessageTypeTranslator map[uint8]MessagePtrFactory
// MessageTypeTranslatorGetter implements
// GetTypeTranslator() func
type MessageTypeTranslatorGetter interface {
// GetTypeTranslator returns the protocol's
// type translator
GetTypeTranslator() MessageTypeTranslator
}
// MessageReceiveEventHandler represents a protocol interface which is
// notified about incoming messages, properly decoded into the
// interface{} with the rpc type
type MessageReceiveEventHandler interface {
// OnMessageReceive is called when a message was recieved and has been
// properly decoded; return either an error or an struct ptr; both will
// be sent back to the requesting client
OnMessageReceive(uint8, interface{}) (interface{}, error)
}
// RemoteNode is a raft node which is not the local node.
// This is supposed to only be a reference to this node.
type RemoteNode interface {
GetID() string
GetAddress() string
}
// LocalNode is a raft node which is the local node.
type LocalNode interface {
RemoteNode
AppendLogMessage([]byte, time.Duration) error
RemoteProcedureCall(RemoteNode, uint8, interface{}, interface{}) error
GetRemoteNodes() ([]RemoteNode, error)
GetAuthToken() string
}
// LocalNodeSetter represents a protocol interface which
// receives a reference to a LocalNode once a cluster has been
// joined / created.
type LocalNodeSetter interface {
SetLocalNode(LocalNode)
}
// SharedStateGetter implements the GetSharedState() func
type SharedStateGetter interface {
// GetSharedState returns the SharedState
GetSharedState() SharedState
}
// MessageProtocol is a message oriented protocol
// sharing a common state over all nodes of the "overlay"
// network
type MessageProtocol interface {
SharedStateGetter
LocalNodeSetter
MessageTypeTranslatorGetter
MessageReceiveEventHandler
}
type protocolWrapper struct {
joinProtoc MessageProtocol
custProtoc MessageProtocol
}
func (c *protocolWrapper) GetSharedState() SharedState {
if c.custProtoc != nil {
return c.custProtoc.GetSharedState()
}
return nil
}
func (c *protocolWrapper) GetTypeTranslator() MessageTypeTranslator {
jtt := c.joinProtoc.GetTypeTranslator()
if c.custProtoc == nil {
return jtt
}
ctt := c.custProtoc.GetTypeTranslator()
for k, v := range ctt {
// block any rpcTypes that might interfere with raft protocol
if k < RPCHeaderOffset {
return nil
}
jtt[k] = v
}
return jtt
}
func (c *protocolWrapper) OnMessageReceive(u uint8, i interface{}) (interface{}, error) {
ji, je := c.joinProtoc.OnMessageReceive(u, i)
if !(je == nil && ji == nil) {
return ji, je
}
if c.custProtoc == nil {
return ji, je
}
// block any rpcType that might interfere with raft protocol
if u <= rpcJoinCluster {
return nil, errors.New("[ERR] raft internal rpc message reached custom protocol")
}
return c.custProtoc.OnMessageReceive(u, i)
}
func (c *protocolWrapper) SetLocalNode(node LocalNode) {
c.joinProtoc.SetLocalNode(node)
if c.custProtoc != nil {
c.custProtoc.SetLocalNode(node)
}
}