Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Add a gauge to hold the last restore time that #454

Merged
merged 3 commits into from
Apr 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,12 @@ func RecoverCluster(conf *Config, fsm FSM, logs LogStore, stable StableStore,
continue
}

// Note this is the one place we call fsm.Restore without the
// fsmRestoreAndMeasure wrapper since this function should only be called to
// reset state on disk and the FSM passed will not be used for a running
// server instance. If the same process will eventually become a Raft peer
// then it will call NewRaft and restore again from disk then which will
// report metrics.
err = fsm.Restore(source)
// Close the source after the restore has completed
source.Close()
Expand Down Expand Up @@ -598,16 +604,16 @@ func (r *Raft) restoreSnapshot() error {
continue
}

err = r.fsm.Restore(source)
// Close the source after the restore has completed
source.Close()
if err != nil {
if err := fsmRestoreAndMeasure(r.fsm, source); err != nil {
source.Close()
r.logger.Error("failed to restore snapshot", "id", snapshot.ID, "error", err)
continue
}
source.Close()

r.logger.Info("restored from snapshot", "id", snapshot.ID)
}

// Update the lastApplied so we don't replay old logs
r.setLastApplied(snapshot.Index)

Expand Down
21 changes: 16 additions & 5 deletions fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,16 +175,13 @@ func (r *Raft) runFSM() {
req.respond(fmt.Errorf("failed to open snapshot %v: %v", req.ID, err))
return
}
defer source.Close()

// Attempt to restore
start := time.Now()
if err := r.fsm.Restore(source); err != nil {
if err := fsmRestoreAndMeasure(r.fsm, source); err != nil {
req.respond(fmt.Errorf("failed to restore snapshot %v: %v", req.ID, err))
source.Close()
return
}
source.Close()
metrics.MeasureSince([]string{"raft", "fsm", "restore"}, start)

// Update the last index and term
lastIndex = meta.Index
Expand Down Expand Up @@ -233,3 +230,17 @@ func (r *Raft) runFSM() {
}
}
}

// fsmRestoreAndMeasure wraps the Restore call on an FSM to consistently measure
// and report timing metrics. The caller is still responsible for calling Close
// on the source in all cases.
func fsmRestoreAndMeasure(fsm FSM, source io.ReadCloser) error {
start := time.Now()
if err := fsm.Restore(source); err != nil {
return err
}
metrics.MeasureSince([]string{"raft", "fsm", "restore"}, start)
metrics.SetGauge([]string{"raft", "fsm", "lastRestoreDuration"},
float32(time.Since(start).Milliseconds()))
return nil
}