-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathphi.go
233 lines (199 loc) · 8.45 KB
/
phi.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
package failuredetector
// Port of the Akka PhiAccrualFailureDetector
// https://github.com/akka/akka/blob/master/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala
import (
"errors"
"math"
"sync/atomic"
"time"
"unsafe"
)
// PhiAccuralFailureDetector is an implementation of 'The Phi Accrual Failure
// Detector' by Hayashibara et al., as defined in their paper:
// http://www.jaist.ac.jp/~defago/files/pdf/IS_RR_2004_010.pdf
//
// The suspicion level of failure is expressed as a value called φ (phi). The
// basic idea of the φ failure detector is to express φ on a scale that is
// dynamically adjusted to reflect current network conditions; a configurable
// threshold decides whether a given φ is considered a failure.
//
// φ is calculated as
//
//	φ = -log10(1 - F(timeSinceLastHeartbeat))
//
// where F is the cumulative distribution function of a normal distribution
// whose mean and standard deviation are estimated from historical heartbeat
// inter-arrival times.
//
// This implementation is a port of Akka's PhiAccrualFailureDetector:
// https://github.com/akka/akka/blob/master/akka-remote/src/main/scala/akka/remote/PhiAccrualFailureDetector.scala
//
// The detector is safe for concurrent use by multiple goroutines without
// additional synchronization.
//
// Configuration (see New):
//
//   - threshold: a low threshold is prone to generate many wrong suspicions
//     but ensures quick detection of a real crash; a high threshold generates
//     fewer mistakes but needs more time to detect actual crashes.
//   - maxSampleSize: number of samples used to calculate the mean and
//     standard deviation of inter-arrival times.
//   - minStdDeviation: minimum standard deviation of the normal distribution
//     used when calculating φ. Too low a standard deviation may make the
//     detector overly sensitive to sudden but normal deviations in heartbeat
//     inter-arrival times.
//   - acceptableHeartbeatPause: duration corresponding to the number of
//     potentially lost or delayed heartbeats that are accepted before they
//     are considered an anomaly. This margin lets the detector survive
//     sudden, occasional pauses in heartbeat arrivals, for example due to
//     garbage collection or a network drop.
//   - firstHeartbeatEstimate: bootstraps the statistics with heartbeats that
//     correspond to this duration, with a rather high standard deviation
//     (since the environment is unknown in the beginning).
type PhiAccuralFailureDetector struct {
	// Configuration as supplied to New.
	threshold     float64
	maxSampleSize uint

	minStdDeviation, acceptableHeartbeatPause, firstHeartbeatEstimate time.Duration

	// Optional sink for long heartbeat intervals; nil disables it.
	eventStream chan<- time.Duration

	// Bootstrap history installed when the first heartbeat arrives.
	firstHeartbeat heartbeatHistory

	// Whole-millisecond copies of the durations above, used by the phi math.
	acceptableHeartbeatPauseMS, minStdDeviationMS uint64

	// Current snapshot. Read and replaced only through loadState/casState;
	// the pointed-to value is replaced wholesale, never mutated in place.
	state *state

	// Source of the current time.
	clock clock
}
// state is the snapshot of a PhiAccuralFailureDetector's changing data.
// Heartbeat never modifies an existing value: it builds a new state and
// publishes it with casState, so readers holding an old pointer always see a
// consistent pair of fields.
type state struct {
	// history holds the recorded heartbeat inter-arrival samples (Heartbeat
	// appends intervals converted to whole milliseconds).
	history heartbeatHistory
	// timestamp is the arrival time of the latest heartbeat; nil until the
	// first heartbeat has been received.
	timestamp *time.Time
}
// New creates and returns a new failure detector.
//
// The parameters are described on PhiAccuralFailureDetector. eventStream is
// optional and may be nil. The returned detector is not yet monitoring: its
// history is seeded from firstHeartbeatEstimate, but no heartbeat timestamp
// is recorded until Heartbeat is first called. Time is read from
// defaultClock.
//
// An error is returned when threshold is not a positive number (NaN
// included), maxSampleSize is 0, minStdDeviation or firstHeartbeatEstimate
// is not positive, or acceptableHeartbeatPause is negative.
//
// NOTE(review): a minStdDeviation below one millisecond passes validation but
// truncates to 0 in minStdDeviationMS, which makes the standard-deviation
// floor ineffective.
func New(
	threshold float64,
	maxSampleSize uint,
	minStdDeviation time.Duration,
	acceptableHeartbeatPause time.Duration,
	firstHeartbeatEstimate time.Duration,
	eventStream chan<- time.Duration) (*PhiAccuralFailureDetector, error) {
	// Negated comparison so that NaN (for which every comparison is false)
	// is rejected too: a NaN threshold would make phi < threshold always
	// false, i.e. the resource would never be reported available.
	if !(threshold > 0.0) {
		return nil, errors.New("threshold must be > 0")
	}
	if maxSampleSize == 0 {
		return nil, errors.New("maxSampleSize must be > 0")
	}
	if minStdDeviation <= 0 {
		return nil, errors.New("minStdDeviation must be > 0")
	}
	if acceptableHeartbeatPause < 0 {
		return nil, errors.New("acceptableHeartbeatPause must be >= 0")
	}
	if firstHeartbeatEstimate <= 0 {
		return nil, errors.New("firstHeartbeatEstimate must be > 0")
	}
	firstHeartbeat := initHeartbeat(maxSampleSize, firstHeartbeatEstimate)
	return &PhiAccuralFailureDetector{
		threshold:                  threshold,
		maxSampleSize:              maxSampleSize,
		minStdDeviation:            minStdDeviation,
		acceptableHeartbeatPause:   acceptableHeartbeatPause,
		firstHeartbeatEstimate:     firstHeartbeatEstimate,
		eventStream:                eventStream,
		firstHeartbeat:             firstHeartbeat,
		acceptableHeartbeatPauseMS: toMillis(acceptableHeartbeatPause),
		minStdDeviationMS:          toMillis(minStdDeviation),
		state:                      &state{history: firstHeartbeat, timestamp: nil},
		clock:                      defaultClock,
	}, nil
}
// initHeartbeat builds the bootstrap history used before any real
// inter-arrival times are known. It holds two samples placed a quarter of
// firstHeartbeatEstimate below and above the estimate (all in whole
// milliseconds), giving a deliberately wide initial spread.
func initHeartbeat(maxSampleSize uint, firstHeartbeatEstimate time.Duration) heartbeatHistory {
	estimateMS := toMillis(firstHeartbeatEstimate)
	spreadMS := estimateMS / 4
	lowMS, highMS := estimateMS-spreadMS, estimateMS+spreadMS

	seeded := newHeartbeatHistory(maxSampleSize)
	seeded = seeded.append(lowMS)
	return seeded.append(highMS)
}
// loadState atomically reads the detector's current state pointer. It is the
// read half of the lock-free update protocol whose write half is casState.
func (d *PhiAccuralFailureDetector) loadState() *state {
	slot := (*unsafe.Pointer)(unsafe.Pointer(&d.state))
	raw := atomic.LoadPointer(slot)
	return (*state)(raw)
}
// casState atomically installs next as the detector's state, but only if the
// current state is still prev. It reports whether the swap took place; a
// false result means another goroutine updated the state first.
func (d *PhiAccuralFailureDetector) casState(prev, next *state) bool {
	slot := (*unsafe.Pointer)(unsafe.Pointer(&d.state))
	expected, replacement := unsafe.Pointer(prev), unsafe.Pointer(next)
	return atomic.CompareAndSwapPointer(slot, expected, replacement)
}
// IsAvailable reports whether the monitored resource is currently considered
// up and healthy, i.e. whether φ at the current clock time is below the
// configured threshold.
func (d *PhiAccuralFailureDetector) IsAvailable() bool {
	now := d.clock()
	return d.isAvailableAt(now)
}
// isAvailableAt reports whether φ evaluated at ts is strictly below the
// configured threshold.
func (d *PhiAccuralFailureDetector) isAvailableAt(ts time.Time) bool {
	suspicion := d.phiAt(ts)
	return suspicion < d.threshold
}
// IsMonitoring reports whether the failure detector has received at least
// one heartbeat and has therefore started monitoring the resource.
func (d *PhiAccuralFailureDetector) IsMonitoring() bool {
	current := d.loadState()
	return current.timestamp != nil
}
// Heartbeat of a monitored resource.
// Notifies the detector that a heartbeat arrived from the monitored resource.
// This causes the detector to update its state.
func (d *PhiAccuralFailureDetector) Heartbeat() {
for {
timestamp := d.clock()
oldState := d.loadState()
var newHistory heartbeatHistory
if latestTimestamp := oldState.timestamp; latestTimestamp == nil {
// this is heartbeat from a new resource
// add starter records for this new resource
newHistory = d.firstHeartbeat
} else {
// this is a known connection
interval := timestamp.Sub(*latestTimestamp)
// don't use the first heartbeat after failure for the history, since a long pause will skew the stats
if d.isAvailableAt(timestamp) {
intervalMS := toMillis(interval)
if intervalMS >= (d.acceptableHeartbeatPauseMS/2) && d.eventStream != nil {
// heartbeat interval is growing too large (by interval)
d.eventStream <- interval
}
newHistory = oldState.history.append(intervalMS)
} else {
newHistory = oldState.history
}
}
newState := &state{history: newHistory, timestamp: ×tamp} // record new timestamp
// if we won the race then update else try again
if d.casState(oldState, newState) {
break
}
}
}
// Phi returns the current suspicion level φ of the accrual failure detector.
// Larger values mean a failure is more likely; 0 is returned while no
// heartbeat has been received yet.
func (d *PhiAccuralFailureDetector) Phi() float64 {
	now := d.clock()
	return d.phiAt(now)
}
// phiAt returns the suspicion level φ as observed at timestamp.
//
// A resource that has not sent any heartbeat yet reports 0, i.e. it is
// treated as healthy. Otherwise the time elapsed since the last heartbeat
// (in whole milliseconds) is evaluated against a normal distribution whose
// mean is the history's mean shifted by acceptableHeartbeatPause and whose
// standard deviation is the history's, floored at minStdDeviation.
func (d *PhiAccuralFailureDetector) phiAt(timestamp time.Time) float64 {
	current := d.loadState()
	last := current.timestamp
	if last == nil {
		return 0.0 // treat unmanaged connections, e.g. with zero heartbeats, as healthy connections
	}

	elapsed := timestamp.Sub(*last)
	// Callers read the clock before loading the state, so a concurrent
	// Heartbeat can store a timestamp later than ours. Treat that as "no
	// time elapsed" instead of passing a negative duration on to toMillis.
	if elapsed < 0 {
		elapsed = 0
	}

	history := current.history
	mean := history.mean()
	stdDeviation := d.ensureValidStdDeviation(history.stdDeviation())
	return phi(float64(toMillis(elapsed)), mean+float64(d.acceptableHeartbeatPauseMS), stdDeviation)
}
// ensureValidStdDeviation returns stdDeviation (milliseconds), raised to the
// configured minStdDeviation if it is smaller. Limiting how small the spread
// can get keeps φ from overreacting to normal jitter in arrival times.
func (d *PhiAccuralFailureDetector) ensureValidStdDeviation(stdDeviation float64) float64 {
	floorMS := float64(d.minStdDeviationMS)
	return math.Max(floorMS, stdDeviation)
}
// toMillis converts d to whole milliseconds, discarding any sub-millisecond
// remainder.
//
// Non-positive durations yield 0. A negative elapsed time can appear when a
// timestamp is taken just before a concurrent heartbeat records a later one.
// Converting a negative float64 to uint64 is implementation-dependent in Go
// (on amd64 it wraps to an enormous value), so negatives are clamped
// explicitly.
func toMillis(d time.Duration) uint64 {
	if d <= 0 {
		return 0
	}
	// Integer division is exact, unlike d.Seconds()*1e3, whose floating-point
	// rounding could land just below an integer and truncate one too low.
	return uint64(d / time.Millisecond)
}
// phi returns -log10(1 - F(timeDiff)), where F approximates the cumulative
// distribution function of a normal distribution with the given mean and
// standard deviation by the logistic form
//
//	F(y) ≈ 1 / (1 + exp(-y*(1.5976 + 0.070566*y²))),  y = (timeDiff-mean)/stdDeviation
//
// so that 1 - F = e/(1+e) with e = exp(-y*(...)). Above the mean, where e is
// tiny, that form is evaluated directly; computing 1 - 1/(1+e) there would
// lose precision to cancellation.
func phi(timeDiff, mean, stdDeviation float64) float64 {
	z := (timeDiff - mean) / stdDeviation
	expTerm := math.Exp(-z * (1.5976 + 0.070566*z*z))

	var tail float64 // 1 - F(timeDiff)
	if timeDiff > mean {
		tail = expTerm / (1.0 + expTerm)
	} else {
		tail = 1.0 - 1.0/(1.0+expTerm)
	}
	return -math.Log10(tail)
}