From 96a06033e68975a9de88a4ff2bd3bd178b4bc68f Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Mon, 12 Feb 2024 21:43:38 -0800 Subject: [PATCH] fix(ewma): reduce the chances of fake bandwidth spikes (#8) * fix(ewma): reduce the chances of fake bandwidth spikes ignore updates that are less than 100ms. Otherwise, we could attribute a large amount of bandwidth to a very short period of time and get a _huge_ spike. * correctly handle non-monotonic clocks * fix: explain why we ignore small time deltas --- sweeper.go | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/sweeper.go b/sweeper.go index ec291ad..d2bfbdb 100644 --- a/sweeper.go +++ b/sweeper.go @@ -29,6 +29,9 @@ func SetClock(c clock.Clock) { cl = c } +// We tick every second. +var ewmaRate = time.Second + type sweeper struct { sweepOnce sync.Once @@ -62,7 +65,7 @@ func (sw *sweeper) register(m *Meter) { } func (sw *sweeper) runActive() { - ticker := cl.Ticker(time.Second) + ticker := cl.Ticker(ewmaRate) defer ticker.Stop() sw.lastUpdateTime = cl.Now() @@ -91,11 +94,29 @@ func (sw *sweeper) update() { now := cl.Now() tdiff := now.Sub(sw.lastUpdateTime) - if tdiff <= 0 { + if tdiff < 0 { + // we went back in time, skip this update. + // note: if we go _forward_ in time, we don't really care as + // we'll just log really low bandwidth for a second. + sw.lastUpdateTime = now + + // update the totals but leave the rates alone. + for _, m := range sw.meters { + m.snapshot.Total = atomic.LoadUint64(&m.accumulator) + } + return + } else if tdiff <= ewmaRate/10 { + // If the time-delta is too small, wait a bit. Otherwise, we can end up logging a + // very large spike. + // + // This won't fix the case where a user passes a large update (spanning multiple + // seconds) to `Meter.Mark`, but it will fix the case where the system fails to + // accurately schedule the sweeper goroutine. return } + sw.lastUpdateTime = now - timeMultiplier := float64(time.Second) / float64(tdiff) + timeMultiplier := float64(ewmaRate) / float64(tdiff) // Calculate the bandwidth for all active meters. for i, m := range sw.meters[:sw.activeMeters] {