From 689776333418f2cda4351ecb2bd42d0cf0fb13cd Mon Sep 17 00:00:00 2001 From: Paul Banks Date: Fri, 26 Mar 2021 22:55:01 +0000 Subject: [PATCH 1/3] Add oldest log metric to leader --- log.go | 68 +++++++++++++++++++++++ log_test.go | 155 ++++++++++++++++++++++++++++++++++++++++++++++++++++ raft.go | 10 +++- 3 files changed, 232 insertions(+), 1 deletion(-) create mode 100644 log_test.go diff --git a/log.go b/log.go index ad3bf0f09..9238468fb 100644 --- a/log.go +++ b/log.go @@ -1,5 +1,11 @@ package raft +import ( + "time" + + metrics "github.com/armon/go-metrics" +) + // LogType describes various types of log entries. type LogType uint8 @@ -62,6 +68,19 @@ type Log struct { // trouble, so gating extension behavior via some flag in the client // program is also a good idea. Extensions []byte + + // AppendedAt stores the time the leader first appended this log to its + // LogStore. Followers will observe the leader's time. It is not used for + // coordination or as part of the replication protocol at all. It exists only + // to provide operational information, for example how many seconds worth of + // logs are present on the leader which might impact follower's ability to + // catch up after restoring a large snapshot. We should never rely on this + // being in the past when appending on a follower or reading a log back since + // the clock skew can mean a follower could see a log with a future timestamp. + // In general too the leader is not required to persist the log before + // delivering to followers although the current implementation happens to do + // this. + AppendedAt time.Time } // LogStore is used to provide an interface for storing @@ -85,3 +104,52 @@ type LogStore interface { // DeleteRange deletes a range of log entries. The range is inclusive. 
DeleteRange(min, max uint64) error } + +func oldestLog(s LogStore) (Log, error) { + var l Log + + // We might get unlucky and have a truncate right between getting first log + // index and fetching it so keep trying until we succeed or hard fail. + var lastFailIdx uint64 + var lastErr error + for { + firstIdx, err := s.FirstIndex() + if err != nil { + return l, err + } + if firstIdx == 0 { + return l, ErrLogNotFound + } + if firstIdx == lastFailIdx { + // Got same index as last time around which errored, don't bother trying + // to fetch it again just return the error. + return l, lastErr + } + err = s.GetLog(firstIdx, &l) + if err == nil { + // We found the oldest log, break the loop + break + } + // We failed, keep trying to see if there is a new firstIndex + lastFailIdx = firstIdx + lastErr = err + } + return l, nil +} + +func emitLogStoreMetrics(s LogStore, prefix []string, interval time.Duration, stopCh <-chan struct{}) { + for { + select { + case <-time.After(interval): + // In error case emit 0 as the age + ageMs := float32(0.0) + l, err := oldestLog(s) + if err == nil && !l.AppendedAt.IsZero() { + ageMs = float32(time.Since(l.AppendedAt).Milliseconds()) + } + metrics.SetGauge(append(prefix, "oldestLogAge"), ageMs) + case <-stopCh: + return + } + } +} diff --git a/log_test.go b/log_test.go new file mode 100644 index 000000000..3ce0990ac --- /dev/null +++ b/log_test.go @@ -0,0 +1,155 @@ +package raft + +import ( + "bytes" + "fmt" + "testing" + "time" + + metrics "github.com/armon/go-metrics" +) + +func TestOldestLog(t *testing.T) { + cases := []struct { + Name string + Logs []*Log + WantIdx uint64 + WantErr bool + }{ + { + Name: "empty logs", + Logs: nil, + WantIdx: 0, + WantErr: true, + }, + { + Name: "simple case", + Logs: []*Log{ + &Log{ + Index: 1234, + Term: 1, + }, + &Log{ + Index: 1235, + Term: 1, + }, + &Log{ + Index: 1236, + Term: 2, + }, + }, + WantIdx: 1234, + WantErr: false, + }, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.Name, 
func(t *testing.T) { + s := NewInmemStore() + if err := s.StoreLogs(tc.Logs); err != nil { + t.Fatalf("expected store logs not to fail: %s", err) + } + + got, err := oldestLog(s) + switch { + case tc.WantErr && err == nil: + t.Fatalf("wanted error got nil") + case !tc.WantErr && err != nil: + t.Fatalf("wanted no error got: %s", err) + } + + if got.Index != tc.WantIdx { + t.Fatalf("got index %v, want %v", got.Index, tc.WantIdx) + } + }) + } +} + +func testSetupMetrics(t *testing.T) *metrics.InmemSink { + // Record for ages (5 mins) so we can be confident that our assertions won't + // fail on silly long test runs due to dropped data. + s := metrics.NewInmemSink(10*time.Second, 300*time.Second) + cfg := metrics.DefaultConfig("raft.test") + cfg.EnableHostname = false + metrics.NewGlobal(cfg, s) + return s +} + +func getCurrentGaugeValue(t *testing.T, sink *metrics.InmemSink, name string) float32 { + t.Helper() + + data := sink.Data() + + // Loop backward through intervals until there is a non-empty one + // Addresses flakiness around recording to one interval but accessing during the next + for i := len(data) - 1; i >= 0; i-- { + currentInterval := data[i] + + currentInterval.RLock() + if gv, ok := currentInterval.Gauges[name]; ok { + currentInterval.RUnlock() + return gv.Value + } + currentInterval.RUnlock() + } + + // Debug print all the gauges + buf := bytes.NewBuffer(nil) + for _, intv := range data { + intv.RLock() + for name, val := range intv.Gauges { + fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val.Value) + } + intv.RUnlock() + } + t.Log(buf.String()) + + t.Fatalf("didn't find gauge %q", name) + return 0 +} + +func TestEmitsLogStoreMetrics(t *testing.T) { + sink := testSetupMetrics(t) + + start := time.Now() + + s := NewInmemStore() + logs := []*Log{ + &Log{ + Index: 1234, + Term: 1, + AppendedAt: time.Now(), + }, + &Log{ + Index: 1235, + Term: 1, + }, + &Log{ + Index: 1236, + Term: 2, + }, + } + if err := s.StoreLogs(logs); err != nil { + 
t.Fatalf("expected store logs not to fail: %s", err) + } + + stopCh := make(chan struct{}) + defer close(stopCh) + + go emitLogStoreMetrics(s, []string{"foo"}, time.Millisecond, stopCh) + + // Wait for at least one interval + time.Sleep(5 * time.Millisecond) + + got := getCurrentGaugeValue(t, sink, "raft.test.foo.oldestLogAge") + + // Assert the age is in a reasonable range. + if got > float32(time.Since(start).Milliseconds()) { + t.Fatalf("max age before test start: %v", got) + } + + if got < 1 { + t.Fatalf("max age less than interval: %v", got) + } +} diff --git a/raft.go b/raft.go index e12768fac..e9d2eb14f 100644 --- a/raft.go +++ b/raft.go @@ -15,7 +15,8 @@ import ( ) const ( - minCheckInterval = 10 * time.Millisecond + minCheckInterval = 10 * time.Millisecond + oldestLogGaugeInterval = 10 * time.Second ) var ( @@ -382,8 +383,14 @@ func (r *Raft) runLeader() { // leaderloop. r.setupLeaderState() + // Run a background goroutine to emit metrics on log age + stopCh := make(chan struct{}) + go emitLogStoreMetrics(r.logs, []string{"raft", "leader"}, oldestLogGaugeInterval, stopCh) + // Cleanup state on step down defer func() { + close(stopCh) + // Since we were the leader previously, we update our // last contact time when we step down, so that we are not // reporting a last contact time from before we were the @@ -1080,6 +1087,7 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) { lastIndex++ applyLog.log.Index = lastIndex applyLog.log.Term = term + applyLog.log.AppendedAt = time.Now() logs[idx] = &applyLog.log r.leaderState.inflight.PushBack(applyLog) } From feaa535c9d1076ef3ffb8b022e859b6e6813c680 Mon Sep 17 00:00:00 2001 From: Paul Banks Date: Tue, 30 Mar 2021 15:18:31 +0100 Subject: [PATCH 2/3] Pull time.Now out of a tight loop --- raft.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft.go b/raft.go index e9d2eb14f..c57bda79c 100644 --- a/raft.go +++ b/raft.go @@ -1087,7 +1087,7 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) 
{ lastIndex++ applyLog.log.Index = lastIndex applyLog.log.Term = term - applyLog.log.AppendedAt = time.Now() + applyLog.log.AppendedAt = now logs[idx] = &applyLog.log r.leaderState.inflight.PushBack(applyLog) } From e5de142e3919ae624be0036f18e15f3440757941 Mon Sep 17 00:00:00 2001 From: Paul Banks Date: Thu, 1 Apr 2021 12:21:45 +0100 Subject: [PATCH 3/3] Move test helpers --- log_test.go | 86 ++++++++++++++++++++++++++--------------------------- 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/log_test.go b/log_test.go index 3ce0990ac..92a7da0c2 100644 --- a/log_test.go +++ b/log_test.go @@ -66,49 +66,6 @@ func TestOldestLog(t *testing.T) { } } -func testSetupMetrics(t *testing.T) *metrics.InmemSink { - // Record for ages (5 mins) so we can be confident that our assertions won't - // fail on silly long test runs due to dropped data. - s := metrics.NewInmemSink(10*time.Second, 300*time.Second) - cfg := metrics.DefaultConfig("raft.test") - cfg.EnableHostname = false - metrics.NewGlobal(cfg, s) - return s -} - -func getCurrentGaugeValue(t *testing.T, sink *metrics.InmemSink, name string) float32 { - t.Helper() - - data := sink.Data() - - // Loop backward through intervals until there is a non-empty one - // Addresses flakiness around recording to one interval but accessing during the next - for i := len(data) - 1; i >= 0; i-- { - currentInterval := data[i] - - currentInterval.RLock() - if gv, ok := currentInterval.Gauges[name]; ok { - currentInterval.RUnlock() - return gv.Value - } - currentInterval.RUnlock() - } - - // Debug print all the gauges - buf := bytes.NewBuffer(nil) - for _, intv := range data { - intv.RLock() - for name, val := range intv.Gauges { - fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val.Value) - } - intv.RUnlock() - } - t.Log(buf.String()) - - t.Fatalf("didn't find gauge %q", name) - return 0 -} - func TestEmitsLogStoreMetrics(t *testing.T) { sink := testSetupMetrics(t) @@ -153,3 +110,46 @@ func 
TestEmitsLogStoreMetrics(t *testing.T) { t.Fatalf("max age less than interval: %v", got) } } + +func testSetupMetrics(t *testing.T) *metrics.InmemSink { + // Record for ages (5 mins) so we can be confident that our assertions won't + // fail on silly long test runs due to dropped data. + s := metrics.NewInmemSink(10*time.Second, 300*time.Second) + cfg := metrics.DefaultConfig("raft.test") + cfg.EnableHostname = false + metrics.NewGlobal(cfg, s) + return s +} + +func getCurrentGaugeValue(t *testing.T, sink *metrics.InmemSink, name string) float32 { + t.Helper() + + data := sink.Data() + + // Loop backward through intervals until there is a non-empty one + // Addresses flakiness around recording to one interval but accessing during the next + for i := len(data) - 1; i >= 0; i-- { + currentInterval := data[i] + + currentInterval.RLock() + if gv, ok := currentInterval.Gauges[name]; ok { + currentInterval.RUnlock() + return gv.Value + } + currentInterval.RUnlock() + } + + // Debug print all the gauges + buf := bytes.NewBuffer(nil) + for _, intv := range data { + intv.RLock() + for name, val := range intv.Gauges { + fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val.Value) + } + intv.RUnlock() + } + t.Log(buf.String()) + + t.Fatalf("didn't find gauge %q", name) + return 0 +}