From fc3ba772a67798314b8fe54f1e4d2976e4c527f0 Mon Sep 17 00:00:00 2001 From: Johnny Graettinger Date: Tue, 27 Jun 2023 06:33:59 +0000 Subject: [PATCH] broker: allow further banking of unused flow-control credit Allow up to ten seconds of unused flow-control credit to be banked against a future burst of traffic. This is in service of workloads where clients want to write short burts of data and then go away for a while. --- broker/append_flow_control.go | 6 +++++- broker/append_flow_control_test.go | 6 +++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/broker/append_flow_control.go b/broker/append_flow_control.go index da1f3a3b..2e0a2066 100644 --- a/broker/append_flow_control.go +++ b/broker/append_flow_control.go @@ -35,6 +35,10 @@ var ( flowControlBurstFactor = time.Second // flowControlQuantum is the time quantum with which flow control is evaluated. flowControlQuantum = time.Millisecond * 50 + // How many multiples of unused MaxAppendRate credit may be banked against + // a future burst of data? We want some flexibility for spiky workloads + // where clients send short bursts of data and then go away for a while. + flowControlBankFactor int64 = 10 ) type appendFlowControl struct { @@ -151,7 +155,7 @@ func (fc *appendFlowControl) onTick(millis int64) error { // Add |d| interval bytes to |balance|, capping at |maxRate|. var d = fc.maxRate * (millis - fc.lastMillis) / 1e3 // Millis => seconds. - fc.balance = min64(fc.balance+d, fc.maxRate) + fc.balance = min64(fc.balance+d, fc.maxRate*flowControlBankFactor) // Deduct |d| interval bytes from |spent|. d = fc.minRate * (millis - fc.lastMillis) / 1e3 diff --git a/broker/append_flow_control_test.go b/broker/append_flow_control_test.go index 7f5d1d75..943fad2c 100644 --- a/broker/append_flow_control_test.go +++ b/broker/append_flow_control_test.go @@ -55,19 +55,19 @@ func TestAppendFlowCallbackCases(t *testing.T) { // because we didn't have an opportunity to read another chunk (we were // waiting on the delayed ticker to discharge the last one). require.NoError(t, fc.onTick(fc.lastMillis+100000)) - expect(1e4-10, fc.balance) + expect(1e4*flowControlBankFactor-10, fc.balance) expect(10, fc.spent) expect(0, fc.charge) // A small chunk proceeds immediately. fc.onChunk(40) - expect(1e4-50, fc.balance) + expect(1e4*flowControlBankFactor-50, fc.balance) expect(50, fc.spent) expect(0, fc.charge) // Another modest tick. This time, we do underflow. require.Equal(t, ErrFlowControlUnderflow, fc.onTick(fc.lastMillis+51)) - expect(1e4, fc.balance) + expect(1e4*flowControlBankFactor, fc.balance) expect(0, fc.spent) expect(0, fc.charge)