From 3b0ab984dd641d155428ec791d7108be9628c20e Mon Sep 17 00:00:00 2001 From: Damien Neil Date: Wed, 20 Sep 2023 14:13:20 -0700 Subject: [PATCH] quic: avoid deadlock on listener close Avoid holding Listener.connsMu while blocking on a Conn's loop, since the Conn can acquire the mutex while shutting down. Fix Conn.waitReady to check conn readiness before checking the Context doneness. This doesn't make a difference in the current exported API, but this simplifies some tests and will be useful once 0-RTT is implemented. Refactor a bit of the testConn datagram handling to use a testListener type, which helped expose the above deadlock and will be useful for writing tests which don't involve a Conn. Change-Id: I064fca99ae9a165631fc0ff46eb334d25d7dd957 Reviewed-on: https://go-review.googlesource.com/c/net/+/529935 LUCI-TryBot-Result: Go LUCI Reviewed-by: Jonathan Amsterdam --- internal/quic/conn_close.go | 20 +++++++-- internal/quic/conn_test.go | 41 ++++--------------- internal/quic/listener.go | 2 +- internal/quic/listener_test.go | 75 ++++++++++++++++++++++++++++++++++ 4 files changed, 100 insertions(+), 38 deletions(-) diff --git a/internal/quic/conn_close.go b/internal/quic/conn_close.go index ec0b7a327..b8b86fd6f 100644 --- a/internal/quic/conn_close.go +++ b/internal/quic/conn_close.go @@ -168,6 +168,13 @@ func (c *Conn) enterDraining(err error) { } func (c *Conn) waitReady(ctx context.Context) error { + select { + case <-c.lifetime.readyc: + return nil + case <-c.lifetime.drainingc: + return c.lifetime.finalErr + default: + } select { case <-c.lifetime.readyc: return nil @@ -215,7 +222,7 @@ func (c *Conn) Abort(err error) { if err == nil { err = localTransportError(errNo) } - c.runOnLoop(func(now time.Time, c *Conn) { + c.sendMsg(func(now time.Time, c *Conn) { c.abort(now, err) }) } @@ -228,11 +235,18 @@ func (c *Conn) abort(now time.Time, err error) { c.lifetime.localErr = err } +// abortImmediately terminates a connection. +// The connection does not send a CONNECTION_CLOSE, and skips the draining period. +func (c *Conn) abortImmediately(now time.Time, err error) { + c.abort(now, err) + c.enterDraining(err) + c.exited = true +} + // exit fully terminates a connection immediately. func (c *Conn) exit() { - c.runOnLoop(func(now time.Time, c *Conn) { + c.sendMsg(func(now time.Time, c *Conn) { c.enterDraining(errors.New("connection closed")) c.exited = true }) - <-c.donec } diff --git a/internal/quic/conn_test.go b/internal/quic/conn_test.go index ac0543b1e..d75b2eb69 100644 --- a/internal/quic/conn_test.go +++ b/internal/quic/conn_test.go @@ -13,9 +13,7 @@ import ( "errors" "flag" "fmt" - "io" "math" - "net" "net/netip" "reflect" "strings" @@ -112,7 +110,7 @@ const maxTestKeyPhases = 3 type testConn struct { t *testing.T conn *Conn - listener *Listener + listener *testListener now time.Time timer time.Time timerLastFired time.Time @@ -231,8 +229,8 @@ func newTestConn(t *testing.T, side connSide, opts ...any) *testConn { tc.peerTLSConn.SetTransportParameters(marshalTransportParameters(peerProvidedParams)) tc.peerTLSConn.Start(context.Background()) - tc.listener = newListener((*testConnUDPConn)(tc), config, (*testConnHooks)(tc)) - conn, err := tc.listener.newConn( + tc.listener = newTestListener(t, config, (*testConnHooks)(tc)) + conn, err := tc.listener.l.newConn( tc.now, side, initialConnID, @@ -335,7 +333,7 @@ func (tc *testConn) cleanup() { return } tc.conn.exit() - tc.listener.Close(context.Background()) + <-tc.conn.donec } func (tc *testConn) logDatagram(text string, d *testDatagram) { @@ -388,6 +386,7 @@ func (tc *testConn) write(d *testDatagram) { for len(buf) < d.paddedSize { buf = append(buf, 0) } + // TODO: This should use tc.listener.write. tc.conn.sendMsg(&datagram{ b: buf, }) @@ -457,11 +456,10 @@ func (tc *testConn) readDatagram() *testDatagram { tc.wait() tc.sentPackets = nil tc.sentFrames = nil - if len(tc.sentDatagrams) == 0 { + buf := tc.listener.read() + if buf == nil { return nil } - buf := tc.sentDatagrams[0] - tc.sentDatagrams = tc.sentDatagrams[1:] d := tc.parseTestDatagram(buf) // Log the datagram before removing ignored frames. // When things go wrong, it's useful to see all the frames. @@ -982,31 +980,6 @@ func testPeerConnID(seq int64) []byte { return []byte{0xbe, 0xee, 0xff, byte(seq)} } -// testConnUDPConn implements UDPConn. -type testConnUDPConn testConn - -func (tc *testConnUDPConn) Close() error { - close(tc.recvDatagram) - return nil -} - -func (tc *testConnUDPConn) LocalAddr() net.Addr { - return net.UDPAddrFromAddrPort(netip.MustParseAddrPort("127.0.0.1:443")) -} - -func (tc *testConnUDPConn) ReadMsgUDPAddrPort(b, control []byte) (n, controln, flags int, _ netip.AddrPort, _ error) { - for d := range tc.recvDatagram { - n = copy(b, d.b) - return n, 0, 0, d.addr, nil - } - return 0, 0, 0, netip.AddrPort{}, io.EOF -} - -func (tc *testConnUDPConn) WriteToUDPAddrPort(b []byte, addr netip.AddrPort) (int, error) { - tc.sentDatagrams = append(tc.sentDatagrams, append([]byte(nil), b...)) - return len(b), nil -} - // canceledContext returns a canceled Context. // // Functions which take a context preference progress over cancelation. diff --git a/internal/quic/listener.go b/internal/quic/listener.go index a84286e89..03d8ec65f 100644 --- a/internal/quic/listener.go +++ b/internal/quic/listener.go @@ -104,7 +104,7 @@ func (l *Listener) Close(ctx context.Context) error { if !l.closing { l.closing = true for c := range l.conns { - c.Close() + c.Abort(errors.New("listener closed")) } if len(l.conns) == 0 { l.udpConn.Close() diff --git a/internal/quic/listener_test.go b/internal/quic/listener_test.go index a6e0b3464..9d0f314ec 100644 --- a/internal/quic/listener_test.go +++ b/internal/quic/listener_test.go @@ -10,6 +10,8 @@ import ( "bytes" "context" "io" + "net" + "net/netip" "testing" ) @@ -86,3 +88,76 @@ func newLocalListener(t *testing.T, side connSide, conf *Config) *Listener { }) return l } + +type testListener struct { + t *testing.T + l *Listener + recvc chan *datagram + idlec chan struct{} + sentDatagrams [][]byte +} + +func newTestListener(t *testing.T, config *Config, testHooks connTestHooks) *testListener { + tl := &testListener{ + t: t, + recvc: make(chan *datagram), + idlec: make(chan struct{}), + } + tl.l = newListener((*testListenerUDPConn)(tl), config, testHooks) + t.Cleanup(tl.cleanup) + return tl +} + +func (tl *testListener) cleanup() { + tl.l.Close(canceledContext()) +} + +func (tl *testListener) wait() { + tl.idlec <- struct{}{} +} + +func (tl *testListener) write(d *datagram) { + tl.recvc <- d + tl.wait() +} + +func (tl *testListener) read() []byte { + tl.wait() + if len(tl.sentDatagrams) == 0 { + return nil + } + d := tl.sentDatagrams[0] + tl.sentDatagrams = tl.sentDatagrams[1:] + return d +} + +// testListenerUDPConn implements UDPConn. +type testListenerUDPConn testListener + +func (tl *testListenerUDPConn) Close() error { + close(tl.recvc) + return nil +} + +func (tl *testListenerUDPConn) LocalAddr() net.Addr { + return net.UDPAddrFromAddrPort(netip.MustParseAddrPort("127.0.0.1:443")) +} + +func (tl *testListenerUDPConn) ReadMsgUDPAddrPort(b, control []byte) (n, controln, flags int, _ netip.AddrPort, _ error) { + for { + select { + case d, ok := <-tl.recvc: + if !ok { + return 0, 0, 0, netip.AddrPort{}, io.EOF + } + n = copy(b, d.b) + return n, 0, 0, d.addr, nil + case <-tl.idlec: + } + } +} + +func (tl *testListenerUDPConn) WriteToUDPAddrPort(b []byte, addr netip.AddrPort) (int, error) { + tl.sentDatagrams = append(tl.sentDatagrams, append([]byte(nil), b...)) + return len(b), nil +}