Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

webtransport: use the rcmgr to control flow control window increases #1832

Merged
merged 2 commits into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 41 additions & 44 deletions core/network/rcmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,60 +271,57 @@ type ScopeStat struct {
}

// NullResourceManager is a stub for tests and initialization of default values
var NullResourceManager ResourceManager = &nullResourceManager{}

type nullResourceManager struct{}
type nullScope struct{}

var _ ResourceScope = (*nullScope)(nil)
var _ ResourceScopeSpan = (*nullScope)(nil)
var _ ServiceScope = (*nullScope)(nil)
var _ ProtocolScope = (*nullScope)(nil)
var _ PeerScope = (*nullScope)(nil)
var _ ConnManagementScope = (*nullScope)(nil)
var _ ConnScope = (*nullScope)(nil)
var _ StreamManagementScope = (*nullScope)(nil)
var _ StreamScope = (*nullScope)(nil)
type NullResourceManager struct{}

var _ ResourceScope = (*NullScope)(nil)
var _ ResourceScopeSpan = (*NullScope)(nil)
var _ ServiceScope = (*NullScope)(nil)
var _ ProtocolScope = (*NullScope)(nil)
var _ PeerScope = (*NullScope)(nil)
var _ ConnManagementScope = (*NullScope)(nil)
var _ ConnScope = (*NullScope)(nil)
var _ StreamManagementScope = (*NullScope)(nil)
var _ StreamScope = (*NullScope)(nil)

// NullScope is a stub for tests and initialization of default values
var NullScope = &nullScope{}
type NullScope struct{}

func (n *nullResourceManager) ViewSystem(f func(ResourceScope) error) error {
return f(NullScope)
func (n *NullResourceManager) ViewSystem(f func(ResourceScope) error) error {
return f(&NullScope{})
}
func (n *nullResourceManager) ViewTransient(f func(ResourceScope) error) error {
return f(NullScope)
func (n *NullResourceManager) ViewTransient(f func(ResourceScope) error) error {
return f(&NullScope{})
}
func (n *nullResourceManager) ViewService(svc string, f func(ServiceScope) error) error {
return f(NullScope)
func (n *NullResourceManager) ViewService(svc string, f func(ServiceScope) error) error {
return f(&NullScope{})
}
func (n *nullResourceManager) ViewProtocol(p protocol.ID, f func(ProtocolScope) error) error {
return f(NullScope)
func (n *NullResourceManager) ViewProtocol(p protocol.ID, f func(ProtocolScope) error) error {
return f(&NullScope{})
}
func (n *nullResourceManager) ViewPeer(p peer.ID, f func(PeerScope) error) error {
return f(NullScope)
func (n *NullResourceManager) ViewPeer(p peer.ID, f func(PeerScope) error) error {
return f(&NullScope{})
}
func (n *nullResourceManager) OpenConnection(dir Direction, usefd bool, endpoint multiaddr.Multiaddr) (ConnManagementScope, error) {
return NullScope, nil
func (n *NullResourceManager) OpenConnection(dir Direction, usefd bool, endpoint multiaddr.Multiaddr) (ConnManagementScope, error) {
return &NullScope{}, nil
}
func (n *nullResourceManager) OpenStream(p peer.ID, dir Direction) (StreamManagementScope, error) {
return NullScope, nil
func (n *NullResourceManager) OpenStream(p peer.ID, dir Direction) (StreamManagementScope, error) {
return &NullScope{}, nil
}
func (n *nullResourceManager) Close() error {
func (n *NullResourceManager) Close() error {
return nil
}

func (n *nullScope) ReserveMemory(size int, prio uint8) error { return nil }
func (n *nullScope) ReleaseMemory(size int) {}
func (n *nullScope) Stat() ScopeStat { return ScopeStat{} }
func (n *nullScope) BeginSpan() (ResourceScopeSpan, error) { return NullScope, nil }
func (n *nullScope) Done() {}
func (n *nullScope) Name() string { return "" }
func (n *nullScope) Protocol() protocol.ID { return "" }
func (n *nullScope) Peer() peer.ID { return "" }
func (n *nullScope) PeerScope() PeerScope { return NullScope }
func (n *nullScope) SetPeer(peer.ID) error { return nil }
func (n *nullScope) ProtocolScope() ProtocolScope { return NullScope }
func (n *nullScope) SetProtocol(proto protocol.ID) error { return nil }
func (n *nullScope) ServiceScope() ServiceScope { return NullScope }
func (n *nullScope) SetService(srv string) error { return nil }
func (n *NullScope) ReserveMemory(size int, prio uint8) error { return nil }
func (n *NullScope) ReleaseMemory(size int) {}
func (n *NullScope) Stat() ScopeStat { return ScopeStat{} }
func (n *NullScope) BeginSpan() (ResourceScopeSpan, error) { return &NullScope{}, nil }
func (n *NullScope) Done() {}
func (n *NullScope) Name() string { return "" }
func (n *NullScope) Protocol() protocol.ID { return "" }
func (n *NullScope) Peer() peer.ID { return "" }
func (n *NullScope) PeerScope() PeerScope { return &NullScope{} }
func (n *NullScope) SetPeer(peer.ID) error { return nil }
func (n *NullScope) ProtocolScope() ProtocolScope { return &NullScope{} }
func (n *NullScope) SetProtocol(proto protocol.ID) error { return nil }
func (n *NullScope) ServiceScope() ServiceScope { return &NullScope{} }
func (n *NullScope) SetService(srv string) error { return nil }
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/libp2p/zeroconf/v2 v2.2.0
github.com/lucas-clemente/quic-go v0.30.0
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd
github.com/marten-seemann/webtransport-go v0.1.1
github.com/marten-seemann/webtransport-go v0.2.0
github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b
github.com/minio/sha256-simd v1.0.0
github.com/mr-tron/base58 v1.2.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,8 @@ github.com/marten-seemann/qtls-go1-19 v0.1.1 h1:mnbxeq3oEyQxQXwI4ReCgW9DPoPR94sN
github.com/marten-seemann/qtls-go1-19 v0.1.1/go.mod h1:5HTDWtVudo/WFsHKRNuOhWlbdjrfs5JHrYb0wIJqGpI=
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd h1:br0buuQ854V8u83wA0rVZ8ttrq5CpaPZdvrK0LP2lOk=
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd/go.mod h1:QuCEs1Nt24+FYQEqAAncTDPJIuGs+LxK1MCiFL25pMU=
github.com/marten-seemann/webtransport-go v0.1.1 h1:TnyKp3pEXcDooTaNn4s9dYpMJ7kMnTp7k5h+SgYP/mc=
github.com/marten-seemann/webtransport-go v0.1.1/go.mod h1:kBEh5+RSvOA4troP1vyOVBWK4MIMzDICXVrvCPrYcrM=
github.com/marten-seemann/webtransport-go v0.2.0 h1:987jPVqcyE3vF+CHNIxDhT0P21O+bI4fVF+0NoRujSo=
github.com/marten-seemann/webtransport-go v0.2.0/go.mod h1:XmnWYsWXaxUF7kjeIIzLWPyS+q0OcBY5vA64NuyK0ps=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/mock/mock_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,5 +189,5 @@ func (c *conn) Stat() network.ConnStats {
}

func (c *conn) Scope() network.ConnScope {
return network.NullScope
return &network.NullScope{}
}
2 changes: 1 addition & 1 deletion p2p/net/mock/mock_peernet.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,5 +370,5 @@ func (pn *peernet) notifyAll(notification func(f network.Notifiee)) {
}

func (pn *peernet) ResourceManager() network.ResourceManager {
return network.NullResourceManager
return &network.NullResourceManager{}
}
2 changes: 1 addition & 1 deletion p2p/net/mock/mock_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (s *stream) transport() {
}

func (s *stream) Scope() network.StreamScope {
return network.NullScope
return &network.NullScope{}
}

func (s *stream) cancelWrite(err error) {
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/swarm/swarm.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func NewSwarm(local peer.ID, peers peerstore.Peerstore, opts ...Option) (*Swarm,
}
}
if s.rcmgr == nil {
s.rcmgr = network.NullResourceManager
s.rcmgr = &network.NullResourceManager{}
}

s.dsync = newDialSync(s.dialWorkerLoop)
Expand Down
4 changes: 2 additions & 2 deletions p2p/net/swarm/swarm_dial_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestAddrsForDial(t *testing.T) {
ps.AddPrivKey(id, priv)
t.Cleanup(func() { ps.Close() })

tpt, err := websocket.New(nil, network.NullResourceManager)
tpt, err := websocket.New(nil, &network.NullResourceManager{})
require.NoError(t, err)
s, err := NewSwarm(id, ps, WithMultiaddrResolver(resolver))
require.NoError(t, err)
Expand Down Expand Up @@ -81,7 +81,7 @@ func newTestSwarmWithResolver(t *testing.T, resolver *madns.Resolver) *Swarm {
})

// Add a tcp transport so that we know we can dial a tcp multiaddr and we don't filter it out.
tpt, err := tcp.NewTCPTransport(nil, network.NullResourceManager)
tpt, err := tcp.NewTCPTransport(nil, &network.NullResourceManager{})
require.NoError(t, err)
err = s.AddTransport(tpt)
require.NoError(t, err)
Expand Down
30 changes: 15 additions & 15 deletions p2p/net/upgrader/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestAcceptSingleConn(t *testing.T) {
ln := createListener(t, u)
defer ln.Close()

cconn, err := dial(t, u, ln.Multiaddr(), id, network.NullScope)
cconn, err := dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)

sconn, err := ln.Accept()
Expand All @@ -80,7 +80,7 @@ func TestAcceptMultipleConns(t *testing.T) {
}()

for i := 0; i < 10; i++ {
cconn, err := dial(t, u, ln.Multiaddr(), id, network.NullScope)
cconn, err := dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)
toClose = append(toClose, cconn)

Expand All @@ -104,7 +104,7 @@ func TestConnectionsClosedIfNotAccepted(t *testing.T) {
ln := createListener(t, u)
defer ln.Close()

conn, err := dial(t, u, ln.Multiaddr(), id, network.NullScope)
conn, err := dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)

errCh := make(chan error)
Expand Down Expand Up @@ -143,7 +143,7 @@ func TestFailedUpgradeOnListen(t *testing.T) {
errCh <- err
}()

_, err := dial(t, u, ln.Multiaddr(), id, network.NullScope)
_, err := dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
require.Error(err)

// close the listener.
Expand Down Expand Up @@ -177,7 +177,7 @@ func TestListenerClose(t *testing.T) {
require.Contains(err.Error(), "use of closed network connection")

// doesn't accept new connections when it is closed
_, err = dial(t, u, ln.Multiaddr(), peer.ID("1"), network.NullScope)
_, err = dial(t, u, ln.Multiaddr(), peer.ID("1"), &network.NullScope{})
require.Error(err)
}

Expand All @@ -189,7 +189,7 @@ func TestListenerCloseClosesQueued(t *testing.T) {

var conns []transport.CapableConn
for i := 0; i < 10; i++ {
conn, err := dial(t, upgrader, ln.Multiaddr(), id, network.NullScope)
conn, err := dial(t, upgrader, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)
conns = append(conns, conn)
}
Expand Down Expand Up @@ -249,7 +249,7 @@ func TestConcurrentAccept(t *testing.T) {
go func() {
defer wg.Done()

conn, err := dial(t, u, ln.Multiaddr(), id, network.NullScope)
conn, err := dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
if err != nil {
errCh <- err
return
Expand Down Expand Up @@ -279,7 +279,7 @@ func TestAcceptQueueBacklogged(t *testing.T) {
// setup AcceptQueueLength connections, but don't accept any of them
var counter int32 // to be used atomically
doDial := func() {
conn, err := dial(t, u, ln.Multiaddr(), id, network.NullScope)
conn, err := dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)
atomic.AddInt32(&counter, 1)
t.Cleanup(func() { conn.Close() })
Expand Down Expand Up @@ -315,36 +315,36 @@ func TestListenerConnectionGater(t *testing.T) {
defer ln.Close()

// no gating.
conn, err := dial(t, u, ln.Multiaddr(), id, network.NullScope)
conn, err := dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)
require.False(conn.IsClosed())
_ = conn.Close()

// rejecting after handshake.
testGater.BlockSecured(true)
testGater.BlockAccept(false)
conn, err = dial(t, u, ln.Multiaddr(), "invalid", network.NullScope)
conn, err = dial(t, u, ln.Multiaddr(), "invalid", &network.NullScope{})
require.Error(err)
require.Nil(conn)

// rejecting on accept will trigger firupgrader.
testGater.BlockSecured(true)
testGater.BlockAccept(true)
conn, err = dial(t, u, ln.Multiaddr(), "invalid", network.NullScope)
conn, err = dial(t, u, ln.Multiaddr(), "invalid", &network.NullScope{})
require.Error(err)
require.Nil(conn)

// rejecting only on acceptance.
testGater.BlockSecured(false)
testGater.BlockAccept(true)
conn, err = dial(t, u, ln.Multiaddr(), "invalid", network.NullScope)
conn, err = dial(t, u, ln.Multiaddr(), "invalid", &network.NullScope{})
require.Error(err)
require.Nil(conn)

// back to normal
testGater.BlockSecured(false)
testGater.BlockAccept(false)
conn, err = dial(t, u, ln.Multiaddr(), id, network.NullScope)
conn, err = dial(t, u, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)
require.False(conn.IsClosed())
_ = conn.Close()
Expand All @@ -366,7 +366,7 @@ func TestListenerResourceManagement(t *testing.T) {
connScope.EXPECT().PeerScope(),
)

cconn, err := dial(t, upgrader, ln.Multiaddr(), id, network.NullScope)
cconn, err := dial(t, upgrader, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(t, err)
defer cconn.Close()

Expand All @@ -384,7 +384,7 @@ func TestListenerResourceManagementDenied(t *testing.T) {
ln := createListener(t, upgrader)

rcmgr.EXPECT().OpenConnection(network.DirInbound, true, gomock.Not(ln.Multiaddr())).Return(nil, errors.New("nope"))
_, err := dial(t, upgrader, ln.Multiaddr(), id, network.NullScope)
_, err := dial(t, upgrader, ln.Multiaddr(), id, &network.NullScope{})
require.Error(t, err)

done := make(chan struct{})
Expand Down
2 changes: 1 addition & 1 deletion p2p/net/upgrader/upgrader.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func New(secureMuxer sec.SecureMuxer, muxer network.Multiplexer, opts ...Option)
}
}
if u.rcmgr == nil {
u.rcmgr = network.NullResourceManager
u.rcmgr = &network.NullResourceManager{}
}
return u, nil
}
Expand Down
10 changes: 5 additions & 5 deletions p2p/net/upgrader/upgrader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,21 +121,21 @@ func TestOutboundConnectionGating(t *testing.T) {

testGater := &testGater{}
_, dialUpgrader := createUpgrader(t, upgrader.WithConnectionGater(testGater))
conn, err := dial(t, dialUpgrader, ln.Multiaddr(), id, network.NullScope)
conn, err := dial(t, dialUpgrader, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)
require.NotNil(conn)
_ = conn.Close()

// blocking accepts doesn't affect the dialling side, only the listener.
testGater.BlockAccept(true)
conn, err = dial(t, dialUpgrader, ln.Multiaddr(), id, network.NullScope)
conn, err = dial(t, dialUpgrader, ln.Multiaddr(), id, &network.NullScope{})
require.NoError(err)
require.NotNil(conn)
_ = conn.Close()

// now let's block all connections after being secured.
testGater.BlockSecured(true)
conn, err = dial(t, dialUpgrader, ln.Multiaddr(), id, network.NullScope)
conn, err = dial(t, dialUpgrader, ln.Multiaddr(), id, &network.NullScope{})
require.Error(err)
require.Contains(err.Error(), "gater rejected connection")
require.Nil(conn)
Expand All @@ -153,7 +153,7 @@ func TestOutboundResourceManagement(t *testing.T) {
gomock.InOrder(
connScope.EXPECT().PeerScope(),
connScope.EXPECT().SetPeer(id),
connScope.EXPECT().PeerScope().Return(network.NullScope),
connScope.EXPECT().PeerScope().Return(&network.NullScope{}),
)
_, dialUpgrader := createUpgrader(t)
conn, err := dial(t, dialUpgrader, ln.Multiaddr(), id, connScope)
Expand All @@ -174,7 +174,7 @@ func TestOutboundResourceManagement(t *testing.T) {
gomock.InOrder(
connScope.EXPECT().PeerScope(),
connScope.EXPECT().SetPeer(id),
connScope.EXPECT().PeerScope().Return(network.NullScope),
connScope.EXPECT().PeerScope().Return(&network.NullScope{}),
connScope.EXPECT().Done(),
)
_, dialUpgrader := createUpgrader(t)
Expand Down
2 changes: 1 addition & 1 deletion p2p/protocol/circuitv2/client/reservation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func TestReservationFailures(t *testing.T) {
host.SetStreamHandler(proto.ProtoIDv2Hop, tc.streamHandler)
}

cl, err := libp2p.New(libp2p.ResourceManager(network.NullResourceManager))
cl, err := libp2p.New(libp2p.ResourceManager(&network.NullResourceManager{}))
require.NoError(t, err)
defer cl.Close()
_, err = client.Reserve(context.Background(), cl, peer.AddrInfo{ID: host.ID(), Addrs: host.Addrs()})
Expand Down
2 changes: 1 addition & 1 deletion p2p/protocol/internal/circuitv1-deprecated/dial.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (d *RelayTransport) Dial(ctx context.Context, a ma.Multiaddr, p peer.ID) (t
return nil, err
}
c.tagHop()
scope, _ := network.NullResourceManager.OpenConnection(network.DirOutbound, false, a)
scope, _ := (&network.NullResourceManager{}).OpenConnection(network.DirOutbound, false, a)
return d.upgrader.Upgrade(ctx, d, c, network.DirOutbound, p, scope)
}

Expand Down
2 changes: 1 addition & 1 deletion p2p/transport/quic/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func NewTransport(key ic.PrivKey, psk pnet.PSK, gater connmgr.ConnectionGater, r
return nil, err
}
if rcmgr == nil {
rcmgr = network.NullResourceManager
rcmgr = &network.NullResourceManager{}
}
qconfig := quicConfig.Clone()
keyBytes, err := key.Raw()
Expand Down
2 changes: 1 addition & 1 deletion p2p/transport/tcp/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ var _ transport.Transport = &TcpTransport{}
// created. It represents an entire TCP stack (though it might not necessarily be).
func NewTCPTransport(upgrader transport.Upgrader, rcmgr network.ResourceManager, opts ...Option) (*TcpTransport, error) {
if rcmgr == nil {
rcmgr = network.NullResourceManager
rcmgr = &network.NullResourceManager{}
}
tr := &TcpTransport{
upgrader: upgrader,
Expand Down
Loading