From 23fcaa7cb887a072b1f169269979b2f50a6e9052 Mon Sep 17 00:00:00 2001
From: Ian Davis
Date: Tue, 4 May 2021 16:19:40 +0100
Subject: [PATCH] Fix hang in BackoffDiscovery.FindPeers when requesting limit
 lower than number of peers available

---
 backoffcache.go      | 21 ++++++++-----
 backoffcache_test.go | 71 ++++++++++++++++++++++++++++++++++++++++----
 2 files changed, 78 insertions(+), 14 deletions(-)

diff --git a/backoffcache.go b/backoffcache.go
index 0e255e2..1ec71b8 100644
--- a/backoffcache.go
+++ b/backoffcache.go
@@ -68,15 +68,15 @@ func WithBackoffDiscoveryReturnedChannelSize(size int) BackoffDiscoveryOption {
 }
 
 type backoffCache struct {
+	// strat is assigned on creation and not written to
+	strat BackoffStrategy
+
+	mux          sync.Mutex // guards writes to all following fields
 	nextDiscover time.Time
 	prevPeers    map[peer.ID]peer.AddrInfo
-
-	peers      map[peer.ID]peer.AddrInfo
-	sendingChs map[chan peer.AddrInfo]int
-
-	ongoing bool
-	strat   BackoffStrategy
-	mux     sync.Mutex
+	peers        map[peer.ID]peer.AddrInfo
+	sendingChs   map[chan peer.AddrInfo]int
+	ongoing      bool
 }
 
 func (d *BackoffDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
@@ -112,6 +112,7 @@ func (d *BackoffDiscovery) FindPeers(ctx context.Context, ns string, opts ...dis
 			sendingChs: make(map[chan peer.AddrInfo]int),
 			strat:      d.stratFactory(),
 		}
+
 		d.peerCacheMux.Lock()
 		c, ok = d.peerCache[ns]
@@ -139,7 +140,11 @@ func (d *BackoffDiscovery) FindPeers(ctx context.Context, ns string, opts ...dis
 		}
 		pch := make(chan peer.AddrInfo, chLen)
 		for _, ai := range c.prevPeers {
-			pch <- ai
+			select {
+			case pch <- ai:
+			default:
+				// skip if we have asked for a lower limit than the number of peers known
+			}
 		}
 		close(pch)
 		return pch, nil
diff --git a/backoffcache_test.go b/backoffcache_test.go
index 7b0621d..a9c6a39 100644
--- a/backoffcache_test.go
+++ b/backoffcache_test.go
@@ -41,7 +41,12 @@ func (d *delayedDiscovery) FindPeers(ctx context.Context, ns string, opts ...dis
 
 func assertNumPeers(t *testing.T, ctx context.Context, d discovery.Discovery, ns string, count int) {
 	t.Helper()
-	peerCh, err := d.FindPeers(ctx, ns, discovery.Limit(10))
+	assertNumPeersWithLimit(t, ctx, d, ns, 10, count)
+}
+
+func assertNumPeersWithLimit(t *testing.T, ctx context.Context, d discovery.Discovery, ns string, limit int, count int) {
+	t.Helper()
+	peerCh, err := d.FindPeers(ctx, ns, discovery.Limit(limit))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -115,7 +120,7 @@ func TestBackoffDiscoveryMultipleBackoff(t *testing.T) {
 	assertNumPeers(t, ctx, dCache, ns, 1)
 
 	// wait a little to make sure the extra request doesn't modify the backoff
-	time.Sleep(time.Millisecond * 50) //50 < 100
+	time.Sleep(time.Millisecond * 50) // 50 < 100
 	assertNumPeers(t, ctx, dCache, ns, 1)
 
 	// wait for backoff to expire and check if we increase it
@@ -124,15 +129,15 @@ func TestBackoffDiscoveryMultipleBackoff(t *testing.T) {
 
 	d2.Advertise(ctx, ns, discovery.TTL(time.Millisecond*400))
 
-	time.Sleep(time.Millisecond * 150) //150 < 250
+	time.Sleep(time.Millisecond * 150) // 150 < 250
 	assertNumPeers(t, ctx, dCache, ns, 1)
 
-	time.Sleep(time.Millisecond * 150) //150 + 150 > 250
+	time.Sleep(time.Millisecond * 150) // 150 + 150 > 250
 	assertNumPeers(t, ctx, dCache, ns, 2)
 
 	// check that the backoff has been reset
 	// also checks that we can decrease our peer count (i.e. not just growing a set)
-	time.Sleep(time.Millisecond * 110) //110 > 100, also 150+150+110>400
+	time.Sleep(time.Millisecond * 110) // 110 > 100, also 150+150+110>400
 	assertNumPeers(t, ctx, dCache, ns, 1)
 }
 
@@ -185,7 +190,7 @@ func TestBackoffDiscoverySimultaneousQuery(t *testing.T) {
 	}
 
 	szCh1 := 1
-	for _ = range ch1 {
+	for range ch1 {
 		szCh1++
 	}
 
@@ -193,3 +198,57 @@
 	if szCh1 != n && szCh2 != n {
 		t.Fatalf("Channels returned %d, %d elements instead of %d", szCh1, szCh2, n)
 	}
 }
+
+func TestBackoffDiscoveryCacheCapacity(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	discServer := newDiscoveryServer()
+
+	// Testing with n larger than most internal buffer sizes (32)
+	n := 40
+	advertisers := make([]discovery.Discovery, n)
+
+	for i := 0; i < n; i++ {
+		h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
+		advertisers[i] = &mockDiscoveryClient{h, discServer}
+	}
+
+	h1 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
+	d1 := &mockDiscoveryClient{h1, discServer}
+
+	const discoveryInterval = time.Millisecond * 100
+
+	bkf := NewFixedBackoff(discoveryInterval)
+	dCache, err := NewBackoffDiscovery(d1, bkf)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	const ns = "test"
+
+	// add peers
+	for i := 0; i < n; i++ {
+		advertisers[i].Advertise(ctx, ns, discovery.TTL(time.Hour))
+	}
+
+	// Request all peers; all will be present
+	assertNumPeersWithLimit(t, ctx, dCache, ns, n, n)
+
+	// Request peers with a lower limit
+	assertNumPeersWithLimit(t, ctx, dCache, ns, n-1, n-1)
+
+	// Wait a little, but don't allow the cache to expire
+	time.Sleep(discoveryInterval / 10)
+
+	// Request peers with a lower limit, this time served from the cache.
+	// Here we test that the cache logic does not block when more peers are known than the requested limit.
+	// See https://github.com/libp2p/go-libp2p-discovery/issues/67
+	assertNumPeersWithLimit(t, ctx, dCache, ns, n-1, n-1)
+
+	// Wait for the next discovery so the next request bypasses the cache
+	time.Sleep(discoveryInterval)
+
+	// Ask for all peers again
+	assertNumPeersWithLimit(t, ctx, dCache, ns, n, n)
+}
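
For context on the fix itself: on the cached path, FindPeers returns a channel whose buffer size (chLen above) comes from the requested limit, and the channel is closed before being handed back, so no reader exists while the cache is copied in. The old unconditional pch <- ai therefore blocked forever as soon as more cached peers existed than the limit allowed. The following is a minimal standalone sketch of the non-blocking-send pattern the patch adopts; it is not part of the patch, and the names (cached, limit, pch) are illustrative stand-ins, not the library's identifiers.

package main

import "fmt"

func main() {
	// Stand-ins for the cached peer set and the requested limit.
	cached := []string{"peerA", "peerB", "peerC", "peerD"}
	limit := 2

	// Buffer sized to the requested limit, as FindPeers does with chLen.
	pch := make(chan string, limit)
	for _, ai := range cached {
		select {
		case pch <- ai: // send succeeds while the buffer has room
		default:
			// Buffer full: skip instead of blocking. An unconditional
			// `pch <- ai` here would hang forever once `limit` sends
			// had succeeded, because nothing reads from the channel
			// until it is returned to the caller.
		}
	}
	close(pch)

	for ai := range pch {
		fmt.Println(ai) // prints at most `limit` peers
	}
}

Because the channel is closed before it is returned and never written to again, callers simply drain whatever fit in the buffer, which is why the hang reported in https://github.com/libp2p/go-libp2p-discovery/issues/67 cannot recur on this path.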