From 5541298b83d714971b9a1e63219fbd68d60aa98d Mon Sep 17 00:00:00 2001 From: Damien Neil Date: Wed, 24 May 2023 15:37:11 -0700 Subject: [PATCH] quic: add packet pacer The pacer rate-limits the transmission of packets to avoid creating bursts that may cause short-term congestion or loss. See RFC 9002, Section 7.7. For golang/go#58547 Change-Id: I75285c194a1048f988e4d5a829602d199829669d Reviewed-on: https://go-review.googlesource.com/c/net/+/499287 Run-TryBot: Damien Neil TryBot-Result: Gopher Robot Reviewed-by: Jonathan Amsterdam --- internal/quic/pacer.go | 131 +++++++++++++++++++++ internal/quic/pacer_test.go | 224 ++++++++++++++++++++++++++++++++++++ 2 files changed, 355 insertions(+) create mode 100644 internal/quic/pacer.go create mode 100644 internal/quic/pacer_test.go diff --git a/internal/quic/pacer.go b/internal/quic/pacer.go new file mode 100644 index 000000000..bcba76936 --- /dev/null +++ b/internal/quic/pacer.go @@ -0,0 +1,131 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build go1.21 + +package quic + +import ( + "time" +) + +// A pacerState controls the rate at which packets are sent using a leaky-bucket rate limiter. +// +// The pacer limits the maximum size of a burst of packets. +// When a burst exceeds this limit, it spreads subsequent packets +// over time. +// +// The bucket is initialized to the maximum burst size (ten packets by default), +// and fills at the rate: +// +// 1.25 * congestion_window / smoothed_rtt +// +// A sender can send one congestion window of packets per RTT, +// since the congestion window consumed by each packet is returned +// one round-trip later by the responding ack. +// The pacer permits sending at slightly faster than this rate to +// avoid underutilizing the congestion window. +// +// The pacer permits the bucket to become negative, and permits +// sending when non-negative. This biases slightly in favor of +// sending packets over limiting them, and permits bursts one +// packet greater than the configured maximum, but permits the pacer +// to be ignorant of the maximum packet size. +// +// https://www.rfc-editor.org/rfc/rfc9002.html#section-7.7 +type pacerState struct { + bucket int // measured in bytes + maxBucket int + timerGranularity time.Duration + lastUpdate time.Time + nextSend time.Time +} + +func (p *pacerState) init(now time.Time, maxBurst int, timerGranularity time.Duration) { + // Bucket is limited to maximum burst size, which is the initial congestion window. + // https://www.rfc-editor.org/rfc/rfc9002#section-7.7-2 + p.maxBucket = maxBurst + p.bucket = p.maxBucket + p.timerGranularity = timerGranularity + p.lastUpdate = now + p.nextSend = now +} + +// pacerBytesForInterval returns the number of bytes permitted over an interval. +// +// rate = 1.25 * congestion_window / smoothed_rtt +// bytes = interval * rate +// +// https://www.rfc-editor.org/rfc/rfc9002#section-7.7-6 +func pacerBytesForInterval(interval time.Duration, congestionWindow int, rtt time.Duration) int { + bytes := (int64(interval) * int64(congestionWindow)) / int64(rtt) + bytes = (bytes * 5) / 4 // bytes *= 1.25 + return int(bytes) +} + +// pacerIntervalForBytes returns the amount of time required for a number of bytes. +// +// time_per_byte = (smoothed_rtt / congestion_window) / 1.25 +// interval = time_per_byte * bytes +// +// https://www.rfc-editor.org/rfc/rfc9002#section-7.7-8 +func pacerIntervalForBytes(bytes int, congestionWindow int, rtt time.Duration) time.Duration { + interval := (int64(rtt) * int64(bytes)) / int64(congestionWindow) + interval = (interval * 4) / 5 // interval /= 1.25 + return time.Duration(interval) +} + +// advance is called when time passes. +func (p *pacerState) advance(now time.Time, congestionWindow int, rtt time.Duration) { + elapsed := now.Sub(p.lastUpdate) + if elapsed < 0 { + // Time has gone backward? + elapsed = 0 + p.nextSend = now // allow a packet through to get back on track + if p.bucket < 0 { + p.bucket = 0 + } + } + p.lastUpdate = now + if rtt == 0 { + // Avoid divide by zero in the implausible case that we measure no RTT. + p.bucket = p.maxBucket + return + } + // Refill the bucket. + delta := pacerBytesForInterval(elapsed, congestionWindow, rtt) + p.bucket = min(p.bucket+delta, p.maxBucket) +} + +// packetSent is called to record transmission of a packet. +func (p *pacerState) packetSent(now time.Time, size, congestionWindow int, rtt time.Duration) { + p.bucket -= size + if p.bucket < -congestionWindow { + // Never allow the bucket to fall more than one congestion window in arrears. + // We can only fall this far behind if the sender is sending unpaced packets, + // the congestion window has been exceeded, or the RTT is less than the + // timer granularity. + // + // Limiting the minimum bucket size limits the maximum pacer delay + // to RTT/1.25. + p.bucket = -congestionWindow + } + if p.bucket >= 0 { + p.nextSend = now + return + } + // Next send occurs when the bucket has refilled to 0. + delay := pacerIntervalForBytes(-p.bucket, congestionWindow, rtt) + p.nextSend = now.Add(delay) +} + +// canSend reports whether a packet can be sent now. +// If it returns false, next is the time when the next packet can be sent. +func (p *pacerState) canSend(now time.Time) (canSend bool, next time.Time) { + // If the next send time is within the timer granularity, send immediately. + if p.nextSend.After(now.Add(p.timerGranularity)) { + return false, p.nextSend + } + return true, time.Time{} +} diff --git a/internal/quic/pacer_test.go b/internal/quic/pacer_test.go new file mode 100644 index 000000000..9c69da038 --- /dev/null +++ b/internal/quic/pacer_test.go @@ -0,0 +1,224 @@ +// Copyright 2023 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +//go:build go1.21 + +package quic + +import ( + "testing" + "time" +) + +func TestPacerStartup(t *testing.T) { + p := &pacerTest{ + cwnd: 10000, + rtt: 100 * time.Millisecond, + timerGranularity: 1 * time.Millisecond, + } + p.init(t) + t.Logf("# initial burst permits sending ten packets") + for i := 0; i < 10; i++ { + p.sendPacket(1000) + } + + t.Logf("# empty bucket allows for one more packet") + p.sendPacket(1000) + + t.Logf("# sending 1000 byte packets with 8ms interval:") + t.Logf("# (smoothed_rtt * packet_size / congestion_window) / 1.25") + t.Logf("# (100ms * 1000 / 10000) / 1.25 = 8ms") + p.wantSendDelay(8 * time.Millisecond) + p.advance(8 * time.Millisecond) + p.sendPacket(1000) + p.wantSendDelay(8 * time.Millisecond) + + t.Logf("# accumulate enough window for two packets") + p.advance(16 * time.Millisecond) + p.sendPacket(1000) + p.sendPacket(1000) + p.wantSendDelay(8 * time.Millisecond) + + t.Logf("# window does not grow to more than burst limit") + p.advance(1 * time.Second) + for i := 0; i < 11; i++ { + p.sendPacket(1000) + } + p.wantSendDelay(8 * time.Millisecond) +} + +func TestPacerTimerGranularity(t *testing.T) { + p := &pacerTest{ + cwnd: 10000, + rtt: 100 * time.Millisecond, + timerGranularity: 1 * time.Millisecond, + } + p.init(t) + t.Logf("# consume initial burst") + for i := 0; i < 11; i++ { + p.sendPacket(1000) + } + p.wantSendDelay(8 * time.Millisecond) + + t.Logf("# small advance in time does not permit sending") + p.advance(4 * time.Millisecond) + p.wantSendDelay(4 * time.Millisecond) + + t.Logf("# advancing to within timerGranularity of next send permits send") + p.advance(3 * time.Millisecond) + p.wantSendDelay(0) + + t.Logf("# early send adds skipped delay (1ms) to next send (8ms)") + p.sendPacket(1000) + p.wantSendDelay(9 * time.Millisecond) +} + +func TestPacerChangingRate(t *testing.T) { + p := &pacerTest{ + cwnd: 10000, + rtt: 100 * time.Millisecond, + timerGranularity: 0, + } + p.init(t) + t.Logf("# consume initial burst") + for i := 0; i < 11; i++ { + p.sendPacket(1000) + } + p.wantSendDelay(8 * time.Millisecond) + p.advance(8 * time.Millisecond) + + t.Logf("# set congestion window to 20000, 1000 byte interval is 4ms") + p.cwnd = 20000 + p.sendPacket(1000) + p.wantSendDelay(4 * time.Millisecond) + p.advance(4 * time.Millisecond) + + t.Logf("# set rtt to 200ms, 1000 byte interval is 8ms") + p.rtt = 200 * time.Millisecond + p.sendPacket(1000) + p.wantSendDelay(8 * time.Millisecond) + p.advance(8 * time.Millisecond) + + t.Logf("# set congestion window to 40000, 1000 byte interval is 4ms") + p.cwnd = 40000 + p.advance(8 * time.Millisecond) + p.sendPacket(1000) + p.sendPacket(1000) + p.sendPacket(1000) + p.wantSendDelay(4 * time.Millisecond) +} + +func TestPacerTimeReverses(t *testing.T) { + p := &pacerTest{ + cwnd: 10000, + rtt: 100 * time.Millisecond, + timerGranularity: 0, + } + p.init(t) + t.Logf("# consume initial burst") + for i := 0; i < 11; i++ { + p.sendPacket(1000) + } + p.wantSendDelay(8 * time.Millisecond) + t.Logf("# reverse time") + p.advance(-4 * time.Millisecond) + p.sendPacket(1000) + p.wantSendDelay(8 * time.Millisecond) + p.advance(8 * time.Millisecond) + p.sendPacket(1000) + p.wantSendDelay(8 * time.Millisecond) +} + +func TestPacerZeroRTT(t *testing.T) { + p := &pacerTest{ + cwnd: 10000, + rtt: 0, + timerGranularity: 0, + } + p.init(t) + t.Logf("# with rtt 0, the pacer does not limit sending") + for i := 0; i < 20; i++ { + p.sendPacket(1000) + } + p.advance(1 * time.Second) + for i := 0; i < 20; i++ { + p.sendPacket(1000) + } +} + +func TestPacerZeroCongestionWindow(t *testing.T) { + p := &pacerTest{ + cwnd: 10000, + rtt: 100 * time.Millisecond, + timerGranularity: 0, + } + p.init(t) + p.cwnd = 0 + t.Logf("# with cwnd 0, the pacer does not limit sending") + for i := 0; i < 20; i++ { + p.sendPacket(1000) + } +} + +type pacerTest struct { + t *testing.T + p pacerState + timerGranularity time.Duration + cwnd int + rtt time.Duration + now time.Time +} + +func newPacerTest(t *testing.T, congestionWindow int, rtt time.Duration) *pacerTest { + p := &pacerTest{ + now: time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC), + cwnd: congestionWindow, + rtt: rtt, + } + p.p.init(p.now, congestionWindow, p.timerGranularity) + return p +} + +func (p *pacerTest) init(t *testing.T) { + p.t = t + p.now = time.Date(2000, 1, 1, 0, 0, 0, 0, time.UTC) + p.p.init(p.now, p.cwnd, p.timerGranularity) + t.Logf("# initial congestion window: %v", p.cwnd) + t.Logf("# timer granularity: %v", p.timerGranularity) +} + +func (p *pacerTest) advance(d time.Duration) { + p.t.Logf("advance time %v", d) + p.now = p.now.Add(d) + p.p.advance(p.now, p.cwnd, p.rtt) +} + +func (p *pacerTest) sendPacket(size int) { + if canSend, next := p.p.canSend(p.now); !canSend { + p.t.Fatalf("ERROR: pacer unexpectedly blocked send, delay=%v", next.Sub(p.now)) + } + p.t.Logf("send packet of size %v", size) + p.p.packetSent(p.now, size, p.cwnd, p.rtt) +} + +func (p *pacerTest) wantSendDelay(want time.Duration) { + wantCanSend := want == 0 + gotCanSend, next := p.p.canSend(p.now) + var got time.Duration + if !gotCanSend { + got = next.Sub(p.now) + } + p.t.Logf("# pacer send delay: %v", got) + if got != want || gotCanSend != wantCanSend { + p.t.Fatalf("ERROR: pacer send delay = %v (can send: %v); want %v, %v", got, gotCanSend, want, wantCanSend) + } +} + +func (p *pacerTest) sendDelay() time.Duration { + canSend, next := p.p.canSend(p.now) + if canSend { + return 0 + } + return next.Sub(p.now) +}