Skip to content

Commit 8ea3d8a

Browse files
janos and skylenet
authored and committed
swarm: fix network/stream data races (ethereum#19051)
* swarm/network/stream: newStreamerTester cleanup only if err is nil
* swarm/network/stream: raise newStreamerTester waitForPeers timeout
* swarm/network/stream: fix data races in GetPeerSubscriptions
* swarm/storage: prevent data race on LDBStore.batchesC ethersphere/swarm#1198 (comment)
* swarm/network/stream: fix TestGetSubscriptionsRPC data race ethersphere/swarm#1198 (comment)
* swarm/network/stream: correctly use Simulation.Run callback ethersphere/swarm#1198 (comment)
* swarm/network: protect addrCountC in Kademlia.AddrCountC function ethersphere/swarm#1198 (comment)
* p2p/simulations: fix a deadlock calling getRandomNode with lock ethersphere/swarm#1198 (comment)
* swarm/network/stream: terminate disconnect goroutines in tests
* swarm/network/stream: reduce memory consumption when testing data races
* swarm/network/stream: add watchDisconnections helper function
* swarm/network/stream: add concurrent counter for tests
* swarm/network/stream: rename race/norace test files and use const
* swarm/network/stream: remove watchSim and its panic
* swarm/network/stream: pass context in watchDisconnections
* swarm/network/stream: add concurrent safe bool for watchDisconnections
* swarm/storage: fix LDBStore.batchesC data race by not closing it
(cherry picked from commit 3fd6db2)
1 parent a012701 commit 8ea3d8a

14 files changed

+274
-197
lines changed

p2p/simulations/network.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ func (net *Network) getRandomNode(ids []enode.ID, excludeIDs []enode.ID) *Node {
460460
if l == 0 {
461461
return nil
462462
}
463-
return net.GetNode(filtered[rand.Intn(l)])
463+
return net.getNode(filtered[rand.Intn(l)])
464464
}
465465

466466
func filterIDs(ids []enode.ID, excludeIDs []enode.ID) []enode.ID {

swarm/network/kademlia.go

+3
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,9 @@ func (k *Kademlia) sendNeighbourhoodDepthChange() {
353353
// Not receiving from the returned channel will block Register function
354354
// when address count value changes.
355355
func (k *Kademlia) AddrCountC() <-chan int {
356+
k.lock.Lock()
357+
defer k.lock.Unlock()
358+
356359
if k.addrCountC == nil {
357360
k.addrCountC = make(chan int)
358361
}

swarm/network/stream/common_test.go

+58-4
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,7 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste
151151
// temp datadir
152152
datadir, err := ioutil.TempDir("", "streamer")
153153
if err != nil {
154-
return nil, nil, nil, func() {}, err
154+
return nil, nil, nil, nil, err
155155
}
156156
removeDataDir := func() {
157157
os.RemoveAll(datadir)
@@ -163,12 +163,14 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste
163163

164164
localStore, err := storage.NewTestLocalStoreForAddr(params)
165165
if err != nil {
166-
return nil, nil, nil, removeDataDir, err
166+
removeDataDir()
167+
return nil, nil, nil, nil, err
167168
}
168169

169170
netStore, err := storage.NewNetStore(localStore, nil)
170171
if err != nil {
171-
return nil, nil, nil, removeDataDir, err
172+
removeDataDir()
173+
return nil, nil, nil, nil, err
172174
}
173175

174176
delivery := NewDelivery(to, netStore)
@@ -180,8 +182,9 @@ func newStreamerTester(registryOptions *RegistryOptions) (*p2ptest.ProtocolTeste
180182
}
181183
protocolTester := p2ptest.NewProtocolTester(addr.ID(), 1, streamer.runProtocol)
182184

183-
err = waitForPeers(streamer, 1*time.Second, 1)
185+
err = waitForPeers(streamer, 10*time.Second, 1)
184186
if err != nil {
187+
teardown()
185188
return nil, nil, nil, nil, errors.New("timeout: peer is not created")
186189
}
187190

@@ -317,3 +320,54 @@ func createTestLocalStorageForID(id enode.ID, addr *network.BzzAddr) (storage.Ch
317320
}
318321
return store, datadir, nil
319322
}
323+
324+
// watchDisconnections subscribes to peer events for the nodes returned by sim.NodeIDs(), using the Drop filter, and starts a goroutine that records in the returned *boolean whether any such event arrived.
325+
// The goroutine returns once ctx is done. The flag is a mutex-guarded boolean (not sync/atomic) and is set on every received event, whether or not the event carries an error.
326+
func watchDisconnections(ctx context.Context, sim *simulation.Simulation) (disconnected *boolean) {
327+
log.Debug("Watching for disconnections")
328+
disconnections := sim.PeerEvents(
329+
ctx,
330+
sim.NodeIDs(),
331+
simulation.NewPeerEventsFilter().Drop(), // presumably limits the subscription to peer drop events; confirm against the simulation package
332+
)
333+
disconnected = new(boolean) // zero value, so the flag starts out false
334+
go func() {
335+
for {
336+
select {
337+
case <-ctx.Done(): // stop watching when the caller's context is cancelled or expires
338+
return
339+
case d := <-disconnections:
340+
if d.Error != nil {
341+
log.Error("peer drop event error", "node", d.NodeID, "peer", d.PeerID, "err", d.Error)
342+
} else {
343+
log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
344+
}
345+
disconnected.set(true) // reached on both branches above: any received event marks the run as disconnected
346+
}
347+
}
348+
}()
349+
return disconnected
350+
}
351+
352+
// boolean holds a bool that can be written and read from multiple goroutines;
353+
// access goes through the set and bool methods, which lock mu around v.
354+
type boolean struct {
355+
v bool // current value; read and written only while mu is held
356+
mu sync.RWMutex // guards v: write lock in set, read lock in bool
357+
}
358+
359+
// set stores v as the current value while holding the write lock on b.mu.
360+
func (b *boolean) set(v bool) {
361+
b.mu.Lock()
362+
defer b.mu.Unlock()
363+
364+
b.v = v
365+
}
366+
367+
// bool returns the current value while holding the read lock on b.mu.
368+
func (b *boolean) bool() bool {
369+
b.mu.RLock()
370+
defer b.mu.RUnlock()
371+
372+
return b.v
373+
}

swarm/network/stream/delivery_test.go

+20-55
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"errors"
2323
"fmt"
2424
"sync"
25-
"sync/atomic"
2625
"testing"
2726
"time"
2827

@@ -48,10 +47,10 @@ func TestStreamerRetrieveRequest(t *testing.T) {
4847
Syncing: SyncingDisabled,
4948
}
5049
tester, streamer, _, teardown, err := newStreamerTester(regOpts)
51-
defer teardown()
5250
if err != nil {
5351
t.Fatal(err)
5452
}
53+
defer teardown()
5554

5655
node := tester.Nodes[0]
5756

@@ -100,10 +99,10 @@ func TestStreamerUpstreamRetrieveRequestMsgExchangeWithoutStore(t *testing.T) {
10099
Retrieval: RetrievalEnabled,
101100
Syncing: SyncingDisabled, //do no syncing
102101
})
103-
defer teardown()
104102
if err != nil {
105103
t.Fatal(err)
106104
}
105+
defer teardown()
107106

108107
node := tester.Nodes[0]
109108

@@ -172,10 +171,10 @@ func TestStreamerUpstreamRetrieveRequestMsgExchange(t *testing.T) {
172171
Retrieval: RetrievalEnabled,
173172
Syncing: SyncingDisabled,
174173
})
175-
defer teardown()
176174
if err != nil {
177175
t.Fatal(err)
178176
}
177+
defer teardown()
179178

180179
node := tester.Nodes[0]
181180

@@ -362,10 +361,10 @@ func TestStreamerDownstreamChunkDeliveryMsgExchange(t *testing.T) {
362361
Retrieval: RetrievalDisabled,
363362
Syncing: SyncingDisabled,
364363
})
365-
defer teardown()
366364
if err != nil {
367365
t.Fatal(err)
368366
}
367+
defer teardown()
369368

370369
streamer.RegisterClientFunc("foo", func(p *Peer, t string, live bool) (Client, error) {
371370
return &testClient{
@@ -485,7 +484,8 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
485484
}
486485

487486
log.Info("Starting simulation")
488-
ctx := context.Background()
487+
ctx, cancel := context.WithCancel(context.Background())
488+
defer cancel()
489489
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
490490
nodeIDs := sim.UpNodeIDs()
491491
//determine the pivot node to be the first node of the simulation
@@ -548,27 +548,10 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
548548
retErrC <- err
549549
}()
550550

551-
log.Debug("Watching for disconnections")
552-
disconnections := sim.PeerEvents(
553-
context.Background(),
554-
sim.NodeIDs(),
555-
simulation.NewPeerEventsFilter().Drop(),
556-
)
557-
558-
var disconnected atomic.Value
559-
go func() {
560-
for d := range disconnections {
561-
if d.Error != nil {
562-
log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
563-
disconnected.Store(true)
564-
}
565-
}
566-
}()
551+
disconnected := watchDisconnections(ctx, sim)
567552
defer func() {
568-
if err != nil {
569-
if yes, ok := disconnected.Load().(bool); ok && yes {
570-
err = errors.New("disconnect events received")
571-
}
553+
if err != nil && disconnected.bool() {
554+
err = errors.New("disconnect events received")
572555
}
573556
}()
574557

@@ -589,7 +572,7 @@ func testDeliveryFromNodes(t *testing.T, nodes, chunkCount int, skipCheck bool)
589572
return fmt.Errorf("Test failed, chunks not available on all nodes")
590573
}
591574
if err := <-retErrC; err != nil {
592-
t.Fatalf("requesting chunks: %v", err)
575+
return fmt.Errorf("requesting chunks: %v", err)
593576
}
594577
log.Debug("Test terminated successfully")
595578
return nil
@@ -657,48 +640,33 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
657640
b.Fatal(err)
658641
}
659642

660-
ctx := context.Background()
643+
ctx, cancel := context.WithCancel(context.Background())
644+
defer cancel()
661645
result := sim.Run(ctx, func(ctx context.Context, sim *simulation.Simulation) (err error) {
662646
nodeIDs := sim.UpNodeIDs()
663647
node := nodeIDs[len(nodeIDs)-1]
664648

665649
item, ok := sim.NodeItem(node, bucketKeyFileStore)
666650
if !ok {
667-
b.Fatal("No filestore")
651+
return errors.New("No filestore")
668652
}
669653
remoteFileStore := item.(*storage.FileStore)
670654

671655
pivotNode := nodeIDs[0]
672656
item, ok = sim.NodeItem(pivotNode, bucketKeyNetStore)
673657
if !ok {
674-
b.Fatal("No filestore")
658+
return errors.New("No filestore")
675659
}
676660
netStore := item.(*storage.NetStore)
677661

678662
if _, err := sim.WaitTillHealthy(ctx); err != nil {
679663
return err
680664
}
681665

682-
disconnections := sim.PeerEvents(
683-
context.Background(),
684-
sim.NodeIDs(),
685-
simulation.NewPeerEventsFilter().Drop(),
686-
)
687-
688-
var disconnected atomic.Value
689-
go func() {
690-
for d := range disconnections {
691-
if d.Error != nil {
692-
log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
693-
disconnected.Store(true)
694-
}
695-
}
696-
}()
666+
disconnected := watchDisconnections(ctx, sim)
697667
defer func() {
698-
if err != nil {
699-
if yes, ok := disconnected.Load().(bool); ok && yes {
700-
err = errors.New("disconnect events received")
701-
}
668+
if err != nil && disconnected.bool() {
669+
err = errors.New("disconnect events received")
702670
}
703671
}()
704672
// benchmark loop
@@ -713,12 +681,12 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
713681
ctx := context.TODO()
714682
hash, wait, err := remoteFileStore.Store(ctx, testutil.RandomReader(i, chunkSize), int64(chunkSize), false)
715683
if err != nil {
716-
b.Fatalf("expected no error. got %v", err)
684+
return fmt.Errorf("store: %v", err)
717685
}
718686
// wait until all chunks stored
719687
err = wait(ctx)
720688
if err != nil {
721-
b.Fatalf("expected no error. got %v", err)
689+
return fmt.Errorf("wait store: %v", err)
722690
}
723691
// collect the hashes
724692
hashes[i] = hash
@@ -754,10 +722,7 @@ func benchmarkDeliveryFromNodes(b *testing.B, nodes, chunkCount int, skipCheck b
754722
break Loop
755723
}
756724
}
757-
if err != nil {
758-
b.Fatal(err)
759-
}
760-
return nil
725+
return err
761726
})
762727
if result.Error != nil {
763728
b.Fatal(result.Error)

swarm/network/stream/intervals_test.go

+5-25
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"errors"
2323
"fmt"
2424
"sync"
25-
"sync/atomic"
2625
"testing"
2726
"time"
2827

@@ -118,13 +117,11 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
118117

119118
_, wait, err := fileStore.Store(ctx, testutil.RandomReader(1, size), int64(size), false)
120119
if err != nil {
121-
log.Error("Store error: %v", "err", err)
122-
t.Fatal(err)
120+
return fmt.Errorf("store: %v", err)
123121
}
124122
err = wait(ctx)
125123
if err != nil {
126-
log.Error("Wait error: %v", "err", err)
127-
t.Fatal(err)
124+
return fmt.Errorf("wait store: %v", err)
128125
}
129126

130127
item, ok = sim.NodeItem(checker, bucketKeyRegistry)
@@ -136,32 +133,15 @@ func testIntervals(t *testing.T, live bool, history *Range, skipCheck bool) {
136133
liveErrC := make(chan error)
137134
historyErrC := make(chan error)
138135

139-
log.Debug("Watching for disconnections")
140-
disconnections := sim.PeerEvents(
141-
context.Background(),
142-
sim.NodeIDs(),
143-
simulation.NewPeerEventsFilter().Drop(),
144-
)
145-
146136
err = registry.Subscribe(storer, NewStream(externalStreamName, "", live), history, Top)
147137
if err != nil {
148138
return err
149139
}
150140

151-
var disconnected atomic.Value
152-
go func() {
153-
for d := range disconnections {
154-
if d.Error != nil {
155-
log.Error("peer drop", "node", d.NodeID, "peer", d.PeerID)
156-
disconnected.Store(true)
157-
}
158-
}
159-
}()
141+
disconnected := watchDisconnections(ctx, sim)
160142
defer func() {
161-
if err != nil {
162-
if yes, ok := disconnected.Load().(bool); ok && yes {
163-
err = errors.New("disconnect events received")
164-
}
143+
if err != nil && disconnected.bool() {
144+
err = errors.New("disconnect events received")
165145
}
166146
}()
167147

0 commit comments

Comments
 (0)