diff --git a/api.go b/api.go index 5c17bd6d..4c021659 100644 --- a/api.go +++ b/api.go @@ -207,6 +207,10 @@ type Raft struct { // followerNotifyCh is used to tell followers that config has changed followerNotifyCh chan struct{} + + // thread saturation metric recorders. + mainThreadSaturation *saturationMetric + fsmThreadSaturation *saturationMetric } // BootstrapCluster initializes a server's storage with the given cluster @@ -553,6 +557,8 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna leadershipTransferCh: make(chan *leadershipTransferFuture, 1), leaderNotifyCh: make(chan struct{}, 1), followerNotifyCh: make(chan struct{}, 1), + mainThreadSaturation: newSaturationMetric([]string{"raft", "mainThreadSaturation"}, 1*time.Second), + fsmThreadSaturation: newSaturationMetric([]string{"raft", "fsm", "threadSaturation"}, 1*time.Second), } r.conf.Store(*conf) diff --git a/fsm.go b/fsm.go index 487cb4b7..0ff88831 100644 --- a/fsm.go +++ b/fsm.go @@ -224,8 +224,12 @@ func (r *Raft) runFSM() { } for { + r.fsmThreadSaturation.sleeping() + select { case ptr := <-r.fsmMutateCh: + r.fsmThreadSaturation.working() + switch req := ptr.(type) { case []*commitTuple: applyBatch(req) @@ -238,6 +242,8 @@ func (r *Raft) runFSM() { } case req := <-r.fsmSnapshotCh: + r.fsmThreadSaturation.working() + snapshot(req) case <-r.shutdownCh: diff --git a/raft.go b/raft.go index 4b85ac1e..a509b3b3 100644 --- a/raft.go +++ b/raft.go @@ -159,35 +159,45 @@ func (r *Raft) runFollower() { heartbeatTimer := randomTimeout(r.config().HeartbeatTimeout) for r.getState() == Follower { + r.mainThreadSaturation.sleeping() + select { case rpc := <-r.rpcCh: + r.mainThreadSaturation.working() r.processRPC(rpc) case c := <-r.configurationChangeCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader c.respond(ErrNotLeader) case a := <-r.applyCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader a.respond(ErrNotLeader) case v := <-r.verifyCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader v.respond(ErrNotLeader) - case r := <-r.userRestoreCh: + case ur := <-r.userRestoreCh: + r.mainThreadSaturation.working() // Reject any restores since we are not the leader - r.respond(ErrNotLeader) + ur.respond(ErrNotLeader) - case r := <-r.leadershipTransferCh: + case l := <-r.leadershipTransferCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader - r.respond(ErrNotLeader) + l.respond(ErrNotLeader) case c := <-r.configurationsCh: + r.mainThreadSaturation.working() c.configurations = r.configurations.Clone() c.respond(nil) case b := <-r.bootstrapCh: + r.mainThreadSaturation.working() b.respond(r.liveBootstrap(b.configuration)) case <-r.leaderNotifyCh: @@ -197,6 +207,7 @@ func (r *Raft) runFollower() { heartbeatTimer = time.After(0) case <-heartbeatTimer: + r.mainThreadSaturation.working() // Restart the heartbeat timer hbTimeout := r.config().HeartbeatTimeout heartbeatTimer = randomTimeout(hbTimeout) @@ -290,11 +301,15 @@ func (r *Raft) runCandidate() { r.logger.Debug("votes", "needed", votesNeeded) for r.getState() == Candidate { + r.mainThreadSaturation.sleeping() + select { case rpc := <-r.rpcCh: + r.mainThreadSaturation.working() r.processRPC(rpc) case vote := <-voteCh: + r.mainThreadSaturation.working() // Check if the term is greater than ours, bail if vote.Term > r.getCurrentTerm() { r.logger.Debug("newer term discovered, fallback to follower") @@ -318,30 +333,37 @@ func (r *Raft) runCandidate() { } case c := <-r.configurationChangeCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader c.respond(ErrNotLeader) case a := <-r.applyCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader a.respond(ErrNotLeader) case v := <-r.verifyCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader v.respond(ErrNotLeader) - case r := <-r.userRestoreCh: + case ur := <-r.userRestoreCh: + r.mainThreadSaturation.working() // Reject any restores since we are not the leader - r.respond(ErrNotLeader) + ur.respond(ErrNotLeader) - case r := <-r.leadershipTransferCh: + case l := <-r.leadershipTransferCh: + r.mainThreadSaturation.working() // Reject any operations since we are not the leader - r.respond(ErrNotLeader) + l.respond(ErrNotLeader) case c := <-r.configurationsCh: + r.mainThreadSaturation.working() c.configurations = r.configurations.Clone() c.respond(nil) case b := <-r.bootstrapCh: + r.mainThreadSaturation.working() b.respond(ErrCantBootstrap) case <-r.leaderNotifyCh: @@ -354,6 +376,7 @@ func (r *Raft) runCandidate() { } case <-electionTimer: + r.mainThreadSaturation.working() // Election failed! Restart the election. We simply return, // which will kick us back into runCandidate r.logger.Warn("Election timeout reached, restarting election") @@ -598,14 +621,19 @@ func (r *Raft) leaderLoop() { lease := time.After(r.config().LeaderLeaseTimeout) for r.getState() == Leader { + r.mainThreadSaturation.sleeping() + select { case rpc := <-r.rpcCh: + r.mainThreadSaturation.working() r.processRPC(rpc) case <-r.leaderState.stepDown: + r.mainThreadSaturation.working() r.setState(Follower) case future := <-r.leadershipTransferCh: + r.mainThreadSaturation.working() if r.getLeadershipTransferInProgress() { r.logger.Debug(ErrLeadershipTransferInProgress.Error()) future.respond(ErrLeadershipTransferInProgress) @@ -686,6 +714,7 @@ func (r *Raft) leaderLoop() { go r.leadershipTransfer(*id, *address, state, stopCh, doneCh) case <-r.leaderState.commitCh: + r.mainThreadSaturation.working() // Process the newly committed entries oldCommitIndex := r.getCommitIndex() commitIndex := r.leaderState.commitment.getCommitIndex() @@ -748,6 +777,7 @@ func (r *Raft) leaderLoop() { } case v := <-r.verifyCh: + r.mainThreadSaturation.working() if v.quorumSize == 0 { // Just dispatched, start the verification r.verifyLeader(v) @@ -772,6 +802,7 @@ func (r *Raft) leaderLoop() { } case future := <-r.userRestoreCh: + r.mainThreadSaturation.working() if r.getLeadershipTransferInProgress() { r.logger.Debug(ErrLeadershipTransferInProgress.Error()) future.respond(ErrLeadershipTransferInProgress) @@ -781,6 +812,7 @@ func (r *Raft) leaderLoop() { future.respond(err) case future := <-r.configurationsCh: + r.mainThreadSaturation.working() if r.getLeadershipTransferInProgress() { r.logger.Debug(ErrLeadershipTransferInProgress.Error()) future.respond(ErrLeadershipTransferInProgress) @@ -790,6 +822,7 @@ func (r *Raft) leaderLoop() { future.respond(nil) case future := <-r.configurationChangeChIfStable(): + r.mainThreadSaturation.working() if r.getLeadershipTransferInProgress() { r.logger.Debug(ErrLeadershipTransferInProgress.Error()) future.respond(ErrLeadershipTransferInProgress) @@ -798,9 +831,11 @@ func (r *Raft) leaderLoop() { r.appendConfigurationEntry(future) case b := <-r.bootstrapCh: + r.mainThreadSaturation.working() b.respond(ErrCantBootstrap) case newLog := <-r.applyCh: + r.mainThreadSaturation.working() if r.getLeadershipTransferInProgress() { r.logger.Debug(ErrLeadershipTransferInProgress.Error()) newLog.respond(ErrLeadershipTransferInProgress) @@ -829,6 +864,7 @@ func (r *Raft) leaderLoop() { } case <-lease: + r.mainThreadSaturation.working() // Check if we've exceeded the lease, potentially stepping down maxDiff := r.checkLeaderLease() diff --git a/saturation.go b/saturation.go new file mode 100644 index 00000000..48e480a0 --- /dev/null +++ b/saturation.go @@ -0,0 +1,139 @@ +package raft + +import ( + "math" + "time" + + "github.com/armon/go-metrics" +) + +// saturationMetric measures the saturation (percentage of time spent working vs +// waiting for work) of an event processing loop, such as runFSM. It reports the +// saturation as a gauge metric (at most) once every reportInterval. +// +// Callers must instrument their loop with calls to sleeping and working, starting +// with a call to sleeping. +// +// Note: it is expected that the caller is single-threaded and is not safe for +// concurrent use by multiple goroutines. +type saturationMetric struct { + // reportInterval is the maximum frequency at which the gauge will be + // updated (it may be less frequent if the caller is idle). + reportInterval time.Duration + + // index of the current sample. We rely on it wrapping-around on overflow and + // underflow, to implement a circular buffer (note the matching size of the + // samples array). + index uint8 + + // samples is a fixed-size array of samples (similar to a circular buffer) + // where elements at even-numbered indexes contain time spent sleeping, and + // elements at odd-numbered indexes contain time spent working. + samples [math.MaxUint8 + 1]struct { + v time.Duration // measurement + t time.Time // time at which the measurement was captured + } + + sleepBegan, workBegan time.Time + lastReport time.Time + + // These are overwritten in tests. + nowFn func() time.Time + reportFn func(float32) +} + +// newSaturationMetric creates a saturationMetric that will update the gauge +// with the given name at the given reportInterval. +func newSaturationMetric(name []string, reportInterval time.Duration) *saturationMetric { + return &saturationMetric{ + reportInterval: reportInterval, + lastReport: time.Now(), // Set to now to avoid reporting immediately. + nowFn: time.Now, + reportFn: func(sat float32) { metrics.SetGauge(name, sat) }, + } +} + +// sleeping records the time at which the caller began waiting for work. After +// the initial call it must always be proceeded by a call to working. +func (s *saturationMetric) sleeping() { + s.sleepBegan = s.nowFn() + + // Caller should've called working (we've probably missed a branch of a + // select). Reset sleepBegan without recording a sample to "lose" time + // instead of recording nonsense data. + if s.index%2 == 1 { + return + } + + if !s.workBegan.IsZero() { + sample := &s.samples[s.index-1] + sample.v = s.sleepBegan.Sub(s.workBegan) + sample.t = s.sleepBegan + } + + s.index++ + s.report() +} + +// working records the time at which the caller began working. It must always +// be proceeded by a call to sleeping. +func (s *saturationMetric) working() { + s.workBegan = s.nowFn() + + // Caller should've called sleeping. Reset workBegan without recording a + // sample to "lose" time instead of recording nonsense data. + if s.index%2 == 0 { + return + } + + sample := &s.samples[s.index-1] + sample.v = s.workBegan.Sub(s.sleepBegan) + sample.t = s.workBegan + + s.index++ + s.report() +} + +// report updates the gauge if reportInterval has passed since our last report. +func (s *saturationMetric) report() { + if s.nowFn().Sub(s.lastReport) < s.reportInterval { + return + } + + workSamples := make([]time.Duration, 0, len(s.samples)/2) + sleepSamples := make([]time.Duration, 0, len(s.samples)/2) + + for idx, sample := range s.samples { + if !sample.t.After(s.lastReport) { + continue + } + + if idx%2 == 0 { + sleepSamples = append(sleepSamples, sample.v) + } else { + workSamples = append(workSamples, sample.v) + } + } + + // Ensure we take an equal number of work and sleep samples to avoid + // over/under reporting. + take := len(workSamples) + if len(sleepSamples) < take { + take = len(sleepSamples) + } + + var saturation float32 + if take != 0 { + var work, sleep time.Duration + for _, s := range workSamples[0:take] { + work += s + } + for _, s := range sleepSamples[0:take] { + sleep += s + } + saturation = float32(work) / float32(work+sleep) + } + + s.reportFn(saturation) + s.lastReport = s.nowFn() +} diff --git a/saturation_test.go b/saturation_test.go new file mode 100644 index 00000000..c576e2e1 --- /dev/null +++ b/saturation_test.go @@ -0,0 +1,133 @@ +package raft + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestSaturationMetric(t *testing.T) { + sat := newSaturationMetric([]string{"metric"}, 100*time.Millisecond) + + var now time.Time + sat.nowFn = func() time.Time { return now } + + var reported float32 + sat.reportFn = func(val float32) { reported = val } + + now = time.Now() + sat.sleeping() + + now = now.Add(50 * time.Millisecond) + sat.working() + + now = now.Add(75 * time.Millisecond) + sat.sleeping() + + require.Equal(t, float32(0.6), reported) + + now = now.Add(90 * time.Millisecond) + sat.working() + + now = now.Add(10 * time.Millisecond) + sat.sleeping() + + require.Equal(t, float32(0.1), reported) + + now = now.Add(100 * time.Millisecond) + sat.working() + + require.Equal(t, float32(0), reported) +} + +func TestSaturationMetric_IndexWraparound(t *testing.T) { + sat := newSaturationMetric([]string{"metric"}, 100*time.Millisecond) + + now := time.Now() + sat.nowFn = func() time.Time { return now } + + for i := 0; i < 1024; i++ { + now = now.Add(25 * time.Millisecond) + + if i%2 == 0 { + require.NotPanics(t, sat.sleeping) + } else { + require.NotPanics(t, sat.working) + } + } +} + +func TestSaturationMetric_IncorrectUsage(t *testing.T) { + t.Run("calling sleeping() consecutively", func(t *testing.T) { + sat := newSaturationMetric([]string{"metric"}, 50*time.Millisecond) + + now := time.Now() + sat.nowFn = func() time.Time { return now } + + var reported float32 + sat.reportFn = func(v float32) { reported = v } + + // Calling sleeping() consecutively should reset sleepBegan without recording + // a sample, such that we "lose" time rather than recording nonsense data. + // + // 0 | sleeping() | + // => Sleeping (10ms) + // +10ms | working() | + // => Working (10ms) + // +20ms | sleeping() | + // => [!] LOST [!] (10ms) + // +30ms | sleeping() | + // => Sleeping (10ms) + // +40ms | working() | + // => Working (10ms) + // +50ms | sleeping() | + // + // Total reportable time: 40ms. Saturation: 50%. + sat.sleeping() + now = now.Add(10 * time.Millisecond) + sat.working() + now = now.Add(10 * time.Millisecond) + sat.sleeping() + now = now.Add(10 * time.Millisecond) + sat.sleeping() + now = now.Add(10 * time.Millisecond) + sat.working() + now = now.Add(10 * time.Millisecond) + sat.sleeping() + + require.Equal(t, float32(0.5), reported) + }) + + t.Run("calling working() consecutively", func(t *testing.T) { + sat := newSaturationMetric([]string{"metric"}, 30*time.Millisecond) + + now := time.Now() + sat.nowFn = func() time.Time { return now } + + var reported float32 + sat.reportFn = func(v float32) { reported = v } + + // Calling working() consecutively should reset workBegan without recording + // a sample, such that we "lose" time rather than recording nonsense data. + // + // 0 | sleeping() | + // => Sleeping (10ms) + // +10ms | working() | + // => [!] LOST [!] (10ms) + // +20ms | working() | + // => Working (10ms) + // +30ms | sleeping() | + // + // Total reportable time: 20ms. Saturation: 50%. + sat.sleeping() + now = now.Add(10 * time.Millisecond) + sat.working() + now = now.Add(10 * time.Millisecond) + sat.working() + now = now.Add(10 * time.Millisecond) + sat.sleeping() + + require.Equal(t, float32(0.5), reported) + }) +}