Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Add oldest log metric to leader #452

Merged
merged 3 commits into from
Apr 6, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions log.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package raft

import (
"time"

metrics "github.com/armon/go-metrics"
)

// LogType describes various types of log entries.
type LogType uint8

Expand Down Expand Up @@ -62,6 +68,19 @@ type Log struct {
// trouble, so gating extension behavior via some flag in the client
// program is also a good idea.
Extensions []byte

// AppendedAt stores the time the leader first appended this log to it's
// LogStore. Followers will observe the leader's time. It is not used for
// coordination or as part of the replication protocol at all. It exists only
// to provide operational information for example how many seconds worth of
// logs are present on the leader which might impact follower's ability to
// catch up after restoring a large snapshot. We should never rely on this
// being in the past when appending on a follower or reading a log back since
// the clock skew can mean a follower could see a log with a future timestamp.
// In general too the leader is not required to persist the log before
// delivering to followers although the current implementation happens to do
// this.
AppendedAt time.Time
}

// LogStore is used to provide an interface for storing
Expand All @@ -85,3 +104,52 @@ type LogStore interface {
// DeleteRange deletes a range of log entries. The range is inclusive.
DeleteRange(min, max uint64) error
}

func oldestLog(s LogStore) (Log, error) {
var l Log

// We might get unlucky and have a truncate right between getting first log
// index and fetching it so keep trying until we succeed or hard fail.
var lastFailIdx uint64
var lastErr error
for {
firstIdx, err := s.FirstIndex()
if err != nil {
return l, err
}
if firstIdx == 0 {
return l, ErrLogNotFound
}
if firstIdx == lastFailIdx {
// Got same index as last time around which errored, don't bother trying
// to fetch it again just return the error.
return l, lastErr
}
err = s.GetLog(firstIdx, &l)
if err == nil {
// We found the oldest log, break the loop
break
}
// We failed, keep trying to see if there is a new firstIndex
lastFailIdx = firstIdx
lastErr = err
}
return l, nil
}

func emitLogStoreMetrics(s LogStore, prefix []string, interval time.Duration, stopCh <-chan struct{}) {
for {
select {
case <-time.After(interval):
// In error case emit 0 as the age
ageMs := float32(0.0)
l, err := oldestLog(s)
if err == nil && !l.AppendedAt.IsZero() {
ageMs = float32(time.Since(l.AppendedAt).Milliseconds())
}
metrics.SetGauge(append(prefix, "oldestLogAge"), ageMs)
case <-stopCh:
return
}
}
}
155 changes: 155 additions & 0 deletions log_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package raft

import (
"bytes"
"fmt"
"testing"
"time"

metrics "github.com/armon/go-metrics"
)

func TestOldestLog(t *testing.T) {
cases := []struct {
Name string
Logs []*Log
WantIdx uint64
WantErr bool
}{
{
Name: "empty logs",
Logs: nil,
WantIdx: 0,
WantErr: true,
},
{
Name: "simple case",
Logs: []*Log{
&Log{
Index: 1234,
Term: 1,
},
&Log{
Index: 1235,
Term: 1,
},
&Log{
Index: 1236,
Term: 2,
},
},
WantIdx: 1234,
WantErr: false,
},
}

for _, tc := range cases {
tc := tc
t.Run(tc.Name, func(t *testing.T) {
s := NewInmemStore()
if err := s.StoreLogs(tc.Logs); err != nil {
t.Fatalf("expected store logs not to fail: %s", err)
}

got, err := oldestLog(s)
switch {
case tc.WantErr && err == nil:
t.Fatalf("wanted error got nil")
case !tc.WantErr && err != nil:
t.Fatalf("wanted no error got: %s", err)
}

if got.Index != tc.WantIdx {
t.Fatalf("got index %v, want %v", got.Index, tc.WantIdx)
}
})
}
}

func testSetupMetrics(t *testing.T) *metrics.InmemSink {
// Record for ages (5 mins) so we can be confident that our assertions won't
// fail on silly long test runs due to dropped data.
s := metrics.NewInmemSink(10*time.Second, 300*time.Second)
cfg := metrics.DefaultConfig("raft.test")
cfg.EnableHostname = false
metrics.NewGlobal(cfg, s)
return s
}

func getCurrentGaugeValue(t *testing.T, sink *metrics.InmemSink, name string) float32 {
t.Helper()

data := sink.Data()

// Loop backward through intervals until there is a non-empty one
// Addresses flakiness around recording to one interval but accessing during the next
for i := len(data) - 1; i >= 0; i-- {
currentInterval := data[i]

currentInterval.RLock()
if gv, ok := currentInterval.Gauges[name]; ok {
currentInterval.RUnlock()
return gv.Value
}
currentInterval.RUnlock()
}

// Debug print all the gauges
buf := bytes.NewBuffer(nil)
for _, intv := range data {
intv.RLock()
for name, val := range intv.Gauges {
fmt.Fprintf(buf, "[%v][G] '%s': %0.3f\n", intv.Interval, name, val.Value)
}
intv.RUnlock()
}
t.Log(buf.String())

t.Fatalf("didn't find gauge %q", name)
return 0
}

func TestEmitsLogStoreMetrics(t *testing.T) {
sink := testSetupMetrics(t)

start := time.Now()

s := NewInmemStore()
logs := []*Log{
&Log{
Index: 1234,
Term: 1,
AppendedAt: time.Now(),
},
&Log{
Index: 1235,
Term: 1,
},
&Log{
Index: 1236,
Term: 2,
},
}
if err := s.StoreLogs(logs); err != nil {
t.Fatalf("expected store logs not to fail: %s", err)
}

stopCh := make(chan struct{})
defer close(stopCh)

go emitLogStoreMetrics(s, []string{"foo"}, time.Millisecond, stopCh)

// Wait for at least one interval
time.Sleep(5 * time.Millisecond)

got := getCurrentGaugeValue(t, sink, "raft.test.foo.oldestLogAge")

// Assert the age is in a reasonable range.
if got > float32(time.Since(start).Milliseconds()) {
t.Fatalf("max age before test start: %v", got)
}

if got < 1 {
t.Fatalf("max age less than interval: %v", got)
}
}
10 changes: 9 additions & 1 deletion raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import (
)

const (
minCheckInterval = 10 * time.Millisecond
minCheckInterval = 10 * time.Millisecond
oldestLogGaugeInterval = 10 * time.Second
)

var (
Expand Down Expand Up @@ -382,8 +383,14 @@ func (r *Raft) runLeader() {
// leaderloop.
r.setupLeaderState()

// Run a background go-routine to emit metrics on log age
stopCh := make(chan struct{})
go emitLogStoreMetrics(r.logs, []string{"raft", "leader"}, oldestLogGaugeInterval, stopCh)

// Cleanup state on step down
defer func() {
close(stopCh)

// Since we were the leader previously, we update our
// last contact time when we step down, so that we are not
// reporting a last contact time from before we were the
Expand Down Expand Up @@ -1080,6 +1087,7 @@ func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
lastIndex++
applyLog.log.Index = lastIndex
applyLog.log.Term = term
applyLog.log.AppendedAt = now
logs[idx] = &applyLog.log
r.leaderState.inflight.PushBack(applyLog)
}
Expand Down