Skip to content

Commit

Permalink
broker: allow further banking of unused flow-control credit
Browse files Browse the repository at this point in the history
Allow up to ten seconds of unused flow-control credit to be banked
against a future burst of traffic.

This is in service of workloads where clients want to write short
burts of data and then go away for a while.
  • Loading branch information
jgraettinger committed Jun 27, 2023
1 parent 3826e46 commit fc3ba77
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 4 deletions.
6 changes: 5 additions & 1 deletion broker/append_flow_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ var (
flowControlBurstFactor = time.Second
// flowControlQuantum is the time quantum with which flow control is evaluated.
flowControlQuantum = time.Millisecond * 50
// How many multiples of unused MaxAppendRate credit may be banked against
// a future burst of data? We want some flexibility for spiky workloads
// where clients send short bursts of data and then go away for a while.
flowControlBankFactor int64 = 10
)

type appendFlowControl struct {
Expand Down Expand Up @@ -151,7 +155,7 @@ func (fc *appendFlowControl) onTick(millis int64) error {

// Add |d| interval bytes to |balance|, capping at |maxRate|.
var d = fc.maxRate * (millis - fc.lastMillis) / 1e3 // Millis => seconds.
fc.balance = min64(fc.balance+d, fc.maxRate)
fc.balance = min64(fc.balance+d, fc.maxRate*flowControlBankFactor)

// Deduct |d| interval bytes from |spent|.
d = fc.minRate * (millis - fc.lastMillis) / 1e3
Expand Down
6 changes: 3 additions & 3 deletions broker/append_flow_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,19 @@ func TestAppendFlowCallbackCases(t *testing.T) {
// because we didn't have an opportunity to read another chunk (we were
// waiting on the delayed ticker to discharge the last one).
require.NoError(t, fc.onTick(fc.lastMillis+100000))
expect(1e4-10, fc.balance)
expect(1e4*flowControlBankFactor-10, fc.balance)
expect(10, fc.spent)
expect(0, fc.charge)

// A small chunk proceeds immediately.
fc.onChunk(40)
expect(1e4-50, fc.balance)
expect(1e4*flowControlBankFactor-50, fc.balance)
expect(50, fc.spent)
expect(0, fc.charge)

// Another modest tick. This time, we do underflow.
require.Equal(t, ErrFlowControlUnderflow, fc.onTick(fc.lastMillis+51))
expect(1e4, fc.balance)
expect(1e4*flowControlBankFactor, fc.balance)
expect(0, fc.spent)
expect(0, fc.charge)

Expand Down

0 comments on commit fc3ba77

Please # to comment.