From 4c0145b04cd5a392dc3d7fb31c9b5245918c7a5d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ecl=C3=A9sio=20Junior?= Date: Thu, 18 Jan 2024 08:06:20 -0400 Subject: [PATCH] fix(dot/sync): execute p2p handshake when there is no target (#3695) --- dot/network/errors.go | 2 + dot/network/light.go | 2 +- dot/network/notifications.go | 3 +- dot/network/request_response.go | 2 +- dot/network/service.go | 41 ++++++++++ dot/network/stream_manager.go | 2 +- dot/network/sync.go | 2 +- dot/sync/chain_sync.go | 128 ++++++++++---------------------- dot/sync/chain_sync_test.go | 76 +++++++++---------- dot/sync/errors.go | 1 - dot/sync/interfaces.go | 2 + dot/sync/mocks_test.go | 14 ++++ dot/sync/peer_view.go | 98 ++++++++++++++++++++++++ 13 files changed, 241 insertions(+), 132 deletions(-) create mode 100644 dot/sync/peer_view.go diff --git a/dot/network/errors.go b/dot/network/errors.go index 640f15c4ba..b3121c69a6 100644 --- a/dot/network/errors.go +++ b/dot/network/errors.go @@ -8,6 +8,7 @@ import ( ) var ( + ErrNoPeersConnected = errors.New("no peers connected") ErrReceivedEmptyMessage = errors.New("received empty message") errCannotValidateHandshake = errors.New("failed to validate handshake") @@ -22,4 +23,5 @@ var ( ErrNilStream = errors.New("nil stream") ErrInvalidLEB128EncodedData = errors.New("invalid LEB128 encoded data") ErrGreaterThanMaxSize = errors.New("greater than maximum size") + ErrStreamReset = errors.New("stream reset") ) diff --git a/dot/network/light.go b/dot/network/light.go index 28d0fb3b38..0943cb823d 100644 --- a/dot/network/light.go +++ b/dot/network/light.go @@ -36,7 +36,7 @@ func (s *Service) decodeLightMessage(in []byte, peer peer.ID, _ bool) (Message, func (s *Service) handleLightMsg(stream libp2pnetwork.Stream, msg Message) (err error) { defer func() { err := stream.Close() - if err != nil { + if err != nil && err.Error() != ErrStreamReset.Error() { logger.Warnf("failed to close stream: %s", err) } }() diff --git a/dot/network/notifications.go b/dot/network/notifications.go index 1dcab79157..dac2970be6 100644 --- a/dot/network/notifications.go +++ b/dot/network/notifications.go @@ -242,8 +242,9 @@ func closeOutboundStream(info *notificationsProtocol, peerID peer.ID, stream net ) info.peersData.deleteOutboundHandshakeData(peerID) + err := stream.Close() - if err != nil { + if err != nil && err.Error() != ErrStreamReset.Error() { logger.Warnf("failed to close outbound stream: %s", err) } } diff --git a/dot/network/request_response.go b/dot/network/request_response.go index 09956a6d27..3a3b732cf1 100644 --- a/dot/network/request_response.go +++ b/dot/network/request_response.go @@ -43,7 +43,7 @@ func (rrp *RequestResponseProtocol) Do(to peer.ID, req Message, res ResponseMess defer func() { err := stream.Close() - if err != nil { + if err != nil && err.Error() != ErrStreamReset.Error() { logger.Warnf("failed to close stream: %s", err) } }() diff --git a/dot/network/service.go b/dot/network/service.go index 42395d98cd..6fc3445093 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -14,6 +14,7 @@ import ( "github.com/ChainSafe/gossamer/dot/peerset" "github.com/ChainSafe/gossamer/dot/telemetry" + "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/internal/log" "github.com/ChainSafe/gossamer/internal/mdns" "github.com/ChainSafe/gossamer/internal/metrics" @@ -742,3 +743,43 @@ func (s *Service) startProcessingMsg() { } } } + +func (s *Service) BlockAnnounceHandshake(header *types.Header) error { + peers := s.host.peers() + if len(peers) == 0 { + return ErrNoPeersConnected + } + + protocol, ok := s.notificationsProtocols[blockAnnounceMsgType] + if !ok { + panic("block announce message type not found") + } + + handshake, err := protocol.getHandshake() + if err != nil { + return fmt.Errorf("getting handshake: %w", err) + } + + wg := sync.WaitGroup{} + wg.Add(len(peers)) + for _, p := range peers { + protocol.peersData.setMutex(p) + + go func(p peer.ID) { + defer wg.Done() + stream, err := s.sendHandshake(p, handshake, protocol) + if err != nil { + logger.Tracef("sending block announce handshake: %s", err) + return + } + + response := protocol.peersData.getOutboundHandshakeData(p) + if response.received && response.validated { + closeOutboundStream(protocol, p, stream) + } + }(p) + } + + wg.Wait() + return nil +} diff --git a/dot/network/stream_manager.go b/dot/network/stream_manager.go index b47518c374..c4554c10d6 100644 --- a/dot/network/stream_manager.go +++ b/dot/network/stream_manager.go @@ -61,7 +61,7 @@ func (sm *streamManager) cleanupStreams() { if time.Since(lastReceived) > cleanupStreamInterval { err := stream.Close() - if err != nil { + if err != nil && err.Error() != ErrStreamReset.Error() { logger.Warnf("failed to close inactive stream: %s", err) } delete(sm.streamData, id) diff --git a/dot/network/sync.go b/dot/network/sync.go index ce96ae9d70..c8d3e7f432 100644 --- a/dot/network/sync.go +++ b/dot/network/sync.go @@ -32,7 +32,7 @@ func (s *Service) handleSyncMessage(stream libp2pnetwork.Stream, msg Message) er defer func() { err := stream.Close() - if err != nil { + if err != nil && err.Error() != ErrStreamReset.Error() { logger.Warnf("failed to close stream: %s", err) } }() diff --git a/dot/sync/chain_sync.go b/dot/sync/chain_sync.go index eea6301a89..17a6a587ad 100644 --- a/dot/sync/chain_sync.go +++ b/dot/sync/chain_sync.go @@ -7,7 +7,6 @@ import ( "bytes" "errors" "fmt" - "math/big" "strings" "sync" "sync/atomic" @@ -70,13 +69,6 @@ var ( }) ) -// peerView tracks our peers's best reported blocks -type peerView struct { - who peer.ID - hash common.Hash - number uint -} - // ChainSync contains the methods used by the high-level service into the `chainSync` module type ChainSync interface { start() @@ -98,7 +90,6 @@ type announcedBlock struct { who peer.ID header *types.Header } - type chainSync struct { wg sync.WaitGroup stopCh chan struct{} @@ -110,8 +101,7 @@ type chainSync struct { // tracks the latest state we know of from our peers, // ie. their best block hash and number - peerViewLock sync.RWMutex - peerView map[peer.ID]peerView + peerViewSet *peerViewSet // disjoint set of blocks which are known but not ready to be processed // ie. we only know the hash, number, or the parent block is unknown, or the body is unknown @@ -166,7 +156,7 @@ func newChainSync(cfg chainSyncConfig) *chainSync { telemetry: cfg.telemetry, blockState: cfg.bs, network: cfg.net, - peerView: make(map[peer.ID]peerView), + peerViewSet: newPeerViewSet(cfg.maxPeers), pendingBlocks: cfg.pendingBlocks, syncMode: atomicState, finalisedCh: cfg.bs.GetFinalisedNotifierChannel(), @@ -179,21 +169,32 @@ func newChainSync(cfg chainSyncConfig) *chainSync { } } -func (cs *chainSync) waitEnoughPeersAndTarget() { +func (cs *chainSync) waitWorkersAndTarget() { waitPeersTimer := time.NewTimer(cs.waitPeersDuration) + highestFinalizedHeader, err := cs.blockState.GetHighestFinalisedHeader() + if err != nil { + panic(fmt.Sprintf("failed to get highest finalised header: %v", err)) + } + for { cs.workerPool.useConnectedPeers() - _, err := cs.getTarget() - totalAvailable := cs.workerPool.totalWorkers() - if totalAvailable >= uint(cs.minPeers) && err == nil { + + if totalAvailable >= uint(cs.minPeers) && + cs.peerViewSet.getTarget() > 0 { return } + err := cs.network.BlockAnnounceHandshake(highestFinalizedHeader) + if err != nil && !errors.Is(err, network.ErrNoPeersConnected) { + logger.Errorf("retrieving target info from peers: %v", err) + } + select { case <-waitPeersTimer.C: waitPeersTimer.Reset(cs.waitPeersDuration) + case <-cs.stopCh: return } @@ -208,7 +209,7 @@ func (cs *chainSync) start() { go cs.pendingBlocks.run(cs.finalisedCh, cs.stopCh, &cs.wg) // wait until we have a minimal workers in the sync worker pool - cs.waitEnoughPeersAndTarget() + cs.waitWorkersAndTarget() } func (cs *chainSync) stop() error { @@ -237,18 +238,14 @@ func (cs *chainSync) stop() error { } } -func (cs *chainSync) isBootstrap() (bestBlockHeader *types.Header, syncTarget uint, +func (cs *chainSync) currentSyncInformations() (bestBlockHeader *types.Header, syncTarget uint, isBootstrap bool, err error) { - syncTarget, err = cs.getTarget() - if err != nil { - return nil, syncTarget, false, fmt.Errorf("getting target: %w", err) - } - bestBlockHeader, err = cs.blockState.BestBlockHeader() if err != nil { return nil, syncTarget, false, fmt.Errorf("getting best block header: %w", err) } + syncTarget = cs.peerViewSet.getTarget() bestBlockNumber := bestBlockHeader.Number isBootstrap = bestBlockNumber+network.MaxBlocksInResponse < syncTarget return bestBlockHeader, syncTarget, isBootstrap, nil @@ -265,9 +262,9 @@ func (cs *chainSync) bootstrapSync() { default: } - bestBlockHeader, syncTarget, isFarFromTarget, err := cs.isBootstrap() - if err != nil && !errors.Is(err, errNoPeerViews) { - logger.Criticalf("ending bootstrap sync, checking target distance: %s", err) + bestBlockHeader, syncTarget, isBootstrap, err := cs.currentSyncInformations() + if err != nil { + logger.Criticalf("ending bootstrap sync, getting current sync info: %s", err) return } @@ -286,7 +283,7 @@ func (cs *chainSync) bootstrapSync() { cs.workerPool.totalWorkers(), syncTarget, finalisedHeader.Number, finalisedHeader.Hash()) - if isFarFromTarget { + if isBootstrap { cs.workerPool.useConnectedPeers() err = cs.requestMaxBlocksFrom(bestBlockHeader, networkInitialSync) if err != nil { @@ -309,25 +306,18 @@ func (cs *chainSync) getSyncMode() chainSyncState { // onBlockAnnounceHandshake sets a peer's best known block func (cs *chainSync) onBlockAnnounceHandshake(who peer.ID, bestHash common.Hash, bestNumber uint) error { cs.workerPool.fromBlockAnnounce(who) - - cs.peerViewLock.Lock() - cs.peerView[who] = peerView{ - who: who, - hash: bestHash, - number: bestNumber, - } - cs.peerViewLock.Unlock() + cs.peerViewSet.update(who, bestHash, bestNumber) if cs.getSyncMode() == bootstrap { return nil } - _, _, isFarFromTarget, err := cs.isBootstrap() - if err != nil && !errors.Is(err, errNoPeerViews) { - return fmt.Errorf("checking target distance: %w", err) + _, _, isBootstrap, err := cs.currentSyncInformations() + if err != nil { + return fmt.Errorf("getting current sync info: %w", err) } - if !isFarFromTarget { + if !isBootstrap { return nil } @@ -344,7 +334,6 @@ func (cs *chainSync) onBlockAnnounceHandshake(who peer.ID, bestHash common.Hash, func (cs *chainSync) onBlockAnnounce(announced announcedBlock) error { // TODO: https://github.com/ChainSafe/gossamer/issues/3432 cs.workerPool.fromBlockAnnounce(announced.who) - if cs.pendingBlocks.hasBlock(announced.header.Hash()) { return fmt.Errorf("%w: block %s (#%d)", errAlreadyInDisjointSet, announced.header.Hash(), announced.header.Number) @@ -359,19 +348,19 @@ func (cs *chainSync) onBlockAnnounce(announced announcedBlock) error { return nil } - _, _, isFarFromTarget, err := cs.isBootstrap() - if err != nil && !errors.Is(err, errNoPeerViews) { - return fmt.Errorf("checking target distance: %w", err) + bestBlockHeader, _, isFarFromTarget, err := cs.currentSyncInformations() + if err != nil { + return fmt.Errorf("getting current sync info: %w", err) } if !isFarFromTarget { - return cs.requestAnnouncedBlock(announced) + return cs.requestAnnouncedBlock(bestBlockHeader, announced) } return nil } -func (cs *chainSync) requestAnnouncedBlock(announce announcedBlock) error { +func (cs *chainSync) requestAnnouncedBlock(bestBlockHeader *types.Header, announce announcedBlock) error { peerWhoAnnounced := announce.who announcedHash := announce.header.Hash() announcedNumber := announce.header.Number @@ -385,11 +374,6 @@ func (cs *chainSync) requestAnnouncedBlock(announce announcedBlock) error { return nil } - bestBlockHeader, err := cs.blockState.BestBlockHeader() - if err != nil { - return fmt.Errorf("getting best block header: %w", err) - } - highestFinalizedHeader, err := cs.blockState.GetHighestFinalisedHeader() if err != nil { return fmt.Errorf("getting highest finalized header") @@ -554,10 +538,7 @@ func (cs *chainSync) requestMaxBlocksFrom(bestBlockHeader *types.Header, origin // we should bound it to the real target which is collected through // block announces received from other peers targetBlockNumber := startRequestAt + maxRequestsAllowed*128 - realTarget, err := cs.getTarget() - if err != nil { - return fmt.Errorf("while getting target: %w", err) - } + realTarget := cs.peerViewSet.getTarget() if targetBlockNumber > realTarget { targetBlockNumber = realTarget @@ -574,7 +555,7 @@ func (cs *chainSync) requestMaxBlocksFrom(bestBlockHeader *types.Header, origin } resultsQueue := cs.workerPool.submitRequests(requests) - err = cs.handleWorkersResults(resultsQueue, origin, startRequestAt, expectedAmountOfBlocks) + err := cs.handleWorkersResults(resultsQueue, origin, startRequestAt, expectedAmountOfBlocks) if err != nil { return fmt.Errorf("while handling workers results: %w", err) } @@ -582,30 +563,6 @@ func (cs *chainSync) requestMaxBlocksFrom(bestBlockHeader *types.Header, origin return nil } -// getTarget takes the average of all peer heads -// TODO: should we just return the highest? could be an attack vector potentially, if a peer reports some very large -// head block number, it would leave us in bootstrap mode forever -// it would be better to have some sort of standard deviation calculation and discard any outliers (#1861) -func (cs *chainSync) getTarget() (uint, error) { - cs.peerViewLock.RLock() - defer cs.peerViewLock.RUnlock() - - // in practice, this shouldn't happen, as we only start the module once we have some peer states - if len(cs.peerView) == 0 { - return 0, errNoPeerViews - } - - // we are going to sort the data and remove the outliers then we will return the avg of all the valid elements - uintArr := make([]uint, 0, len(cs.peerView)) - for _, ps := range cs.peerView { - uintArr = append(uintArr, ps.number) - } - - sum, count := nonOutliersSumCount(uintArr) - quotientBigInt := big.NewInt(0).Div(sum, big.NewInt(int64(count))) - return uint(quotientBigInt.Uint64()), nil -} - // handleWorkersResults, every time we submit requests to workers they results should be computed here // and every cicle we should endup with a complete chain, whenever we identify // any error from a worker we should evaluate the error and re-insert the request @@ -655,10 +612,10 @@ taskResultLoop: taskResult.who, taskResult.err != nil, taskResult.response != nil) if taskResult.err != nil { - logger.Errorf("task result: peer(%s) error: %s", - taskResult.who, taskResult.err) - if !errors.Is(taskResult.err, network.ErrReceivedEmptyMessage) { + logger.Errorf("task result: peer(%s) error: %s", + taskResult.who, taskResult.err) + if strings.Contains(taskResult.err.Error(), "protocols not supported") { cs.network.ReportPeer(peerset.ReputationChange{ Value: peerset.BadProtocolValue, @@ -1049,14 +1006,11 @@ func doResponseGrowsTheChain(response, ongoingChain []*types.BlockData, startAtB } func (cs *chainSync) getHighestBlock() (highestBlock uint, err error) { - cs.peerViewLock.RLock() - defer cs.peerViewLock.RUnlock() - - if len(cs.peerView) == 0 { + if cs.peerViewSet.size() == 0 { return 0, errNoPeers } - for _, ps := range cs.peerView { + for _, ps := range cs.peerViewSet.values() { if ps.number < highestBlock { continue } diff --git a/dot/sync/chain_sync_test.go b/dot/sync/chain_sync_test.go index 1964e0dc67..d99afe8db6 100644 --- a/dot/sync/chain_sync_test.go +++ b/dot/sync/chain_sync_test.go @@ -87,6 +87,7 @@ func Test_chainSync_onBlockAnnounce(t *testing.T) { return &chainSync{ stopCh: make(chan struct{}), pendingBlocks: pendingBlocks, + peerViewSet: newPeerViewSet(0), workerPool: newSyncWorkerPool(NewMockNetwork(nil), NewMockRequestMaker(nil)), } }, @@ -105,6 +106,7 @@ func Test_chainSync_onBlockAnnounce(t *testing.T) { return &chainSync{ stopCh: make(chan struct{}), pendingBlocks: pendingBlocks, + peerViewSet: newPeerViewSet(0), workerPool: newSyncWorkerPool(NewMockNetwork(nil), NewMockRequestMaker(nil)), } }, @@ -126,6 +128,7 @@ func Test_chainSync_onBlockAnnounce(t *testing.T) { stopCh: make(chan struct{}), pendingBlocks: pendingBlocks, syncMode: state, + peerViewSet: newPeerViewSet(0), workerPool: newSyncWorkerPool(NewMockNetwork(nil), NewMockRequestMaker(nil)), } }, @@ -205,6 +208,7 @@ func Test_chainSync_onBlockAnnounce(t *testing.T) { telemetry: telemetryMock, storageState: storageStateMock, blockImportHandler: importHandlerMock, + peerViewSet: newPeerViewSet(0), } }, peerID: somePeer, @@ -319,7 +323,7 @@ func Test_chainSync_onBlockAnnounceHandshake_tipModeNeedToCatchup(t *testing.T) chainSync := &chainSync{ stopCh: stopCh, - peerView: make(map[peer.ID]peerView), + peerViewSet: newPeerViewSet(10), syncMode: state, pendingBlocks: newDisjointBlockSet(0), workerPool: newSyncWorkerPool(networkMock, requestMaker), @@ -417,7 +421,7 @@ func TestChainSync_onBlockAnnounceHandshake_onBootstrapMode(t *testing.T) { cs := tt.newChainSync(t, ctrl) cs.onBlockAnnounceHandshake(tt.peerID, tt.bestHash, tt.bestNumber) - view, exists := cs.peerView[tt.peerID] + view, exists := cs.peerViewSet.find(tt.peerID) require.True(t, exists) require.Equal(t, tt.peerID, view.who) require.Equal(t, tt.bestHash, view.hash) @@ -486,7 +490,7 @@ func setupChainSyncToBootstrapMode(t *testing.T, blocksAhead uint, } chainSync := newChainSync(cfg) - chainSync.peerView = peerViewMap + chainSync.peerViewSet = &peerViewSet{view: peerViewMap} chainSync.syncMode.Store(bootstrap) return chainSync @@ -546,8 +550,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithOneWorker(t *testing.T) { mockedBlockState, mockedNetwork, mockedRequestMaker, mockBabeVerifier, mockStorageState, mockImportHandler, mockTelemetry) - target, err := cs.getTarget() - require.NoError(t, err) + target := cs.peerViewSet.getTarget() require.Equal(t, uint(128), target) // include a new worker in the worker pool set, this worker @@ -555,7 +558,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithOneWorker(t *testing.T) { // the worker pool executes the workers management cs.workerPool.fromBlockAnnounce(peer.ID("noot")) - err = cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync) + err := cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync) require.NoError(t, err) err = cs.workerPool.stop() @@ -631,8 +634,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithTwoWorkers(t *testing.T) { mockBlockState, mockNetwork, mockRequestMaker, mockBabeVerifier, mockStorageState, mockImportHandler, mockTelemetry) - target, err := cs.getTarget() - require.NoError(t, err) + target := cs.peerViewSet.getTarget() require.Equal(t, uint(blocksAhead), target) // include a new worker in the worker pool set, this worker @@ -641,7 +643,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithTwoWorkers(t *testing.T) { cs.workerPool.fromBlockAnnounce(peer.ID("noot")) cs.workerPool.fromBlockAnnounce(peer.ID("noot2")) - err = cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync) + err := cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync) require.NoError(t, err) err = cs.workerPool.stop() @@ -725,8 +727,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithOneWorkerFailing(t *testing. mockBlockState, mockNetwork, mockRequestMaker, mockBabeVerifier, mockStorageState, mockImportHandler, mockTelemetry) - target, err := cs.getTarget() - require.NoError(t, err) + target := cs.peerViewSet.getTarget() require.Equal(t, uint(blocksAhead), target) // include a new worker in the worker pool set, this worker @@ -735,7 +736,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithOneWorkerFailing(t *testing. cs.workerPool.fromBlockAnnounce(peer.ID("alice")) cs.workerPool.fromBlockAnnounce(peer.ID("bob")) - err = cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync) + err := cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync) require.NoError(t, err) err = cs.workerPool.stop() @@ -825,8 +826,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithProtocolNotSupported(t *test mockBlockState, mockNetwork, mockRequestMaker, mockBabeVerifier, mockStorageState, mockImportHandler, mockTelemetry) - target, err := cs.getTarget() - require.NoError(t, err) + target := cs.peerViewSet.getTarget() require.Equal(t, uint(blocksAhead), target) // include a new worker in the worker pool set, this worker @@ -835,7 +835,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithProtocolNotSupported(t *test cs.workerPool.fromBlockAnnounce(peer.ID("alice")) cs.workerPool.fromBlockAnnounce(peer.ID("bob")) - err = cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync) + err := cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync) require.NoError(t, err) err = cs.workerPool.stop() @@ -927,8 +927,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithNilHeaderInResponse(t *testi mockBlockState, mockNetwork, mockRequestMaker, mockBabeVerifier, mockStorageState, mockImportHandler, mockTelemetry) - target, err := cs.getTarget() - require.NoError(t, err) + target := cs.peerViewSet.getTarget() require.Equal(t, uint(blocksAhead), target) // include a new worker in the worker pool set, this worker @@ -937,7 +936,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithNilHeaderInResponse(t *testi cs.workerPool.fromBlockAnnounce(peer.ID("alice")) cs.workerPool.fromBlockAnnounce(peer.ID("bob")) - err = cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync) + err := cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync) require.NoError(t, err) err = cs.workerPool.stop() @@ -1025,8 +1024,8 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithResponseIsNotAChain(t *testi mockBlockState, mockNetwork, mockRequestMaker, mockBabeVerifier, mockStorageState, mockImportHandler, mockTelemetry) - target, err := cs.getTarget() - require.NoError(t, err) + target := cs.peerViewSet.getTarget() + require.Equal(t, uint(blocksAhead), target) // include a new worker in the worker pool set, this worker @@ -1035,7 +1034,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithResponseIsNotAChain(t *testi cs.workerPool.fromBlockAnnounce(peer.ID("alice")) cs.workerPool.fromBlockAnnounce(peer.ID("bob")) - err = cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync) + err := cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync) require.NoError(t, err) err = cs.workerPool.stop() @@ -1139,8 +1138,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithReceivedBadBlock(t *testing. cs.badBlocks = []string{fakeBadBlockHash.String()} - target, err := cs.getTarget() - require.NoError(t, err) + target := cs.peerViewSet.getTarget() require.Equal(t, uint(blocksAhead), target) // include a new worker in the worker pool set, this worker @@ -1149,7 +1147,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithReceivedBadBlock(t *testing. cs.workerPool.fromBlockAnnounce(peer.ID("alice")) cs.workerPool.fromBlockAnnounce(peer.ID("bob")) - err = cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync) + err := cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync) require.NoError(t, err) err = cs.workerPool.stop() @@ -1225,13 +1223,12 @@ func TestChainSync_BootstrapSync_SucessfulSync_ReceivedPartialBlockData(t *testi mockBlockState, mockNetwork, mockRequestMaker, mockBabeVerifier, mockStorageState, mockImportHandler, mockTelemetry) - target, err := cs.getTarget() - require.NoError(t, err) + target := cs.peerViewSet.getTarget() require.Equal(t, uint(blocksAhead), target) cs.workerPool.fromBlockAnnounce(peer.ID("alice")) - err = cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync) + err := cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync) require.NoError(t, err) err = cs.workerPool.stop() @@ -1641,21 +1638,23 @@ func TestChainSync_getHighestBlock(t *testing.T) { cases := map[string]struct { expectedHighestBlock uint wantErr error - chainSyncPeerView map[peer.ID]peerView + chainSyncPeerViewSet *peerViewSet }{ "no_peer_view": { wantErr: errNoPeers, expectedHighestBlock: 0, - chainSyncPeerView: make(map[peer.ID]peerView), + chainSyncPeerViewSet: newPeerViewSet(10), }, "highest_block": { expectedHighestBlock: 500, - chainSyncPeerView: map[peer.ID]peerView{ - peer.ID("peer-A"): { - number: 100, - }, - peer.ID("peer-B"): { - number: 500, + chainSyncPeerViewSet: &peerViewSet{ + view: map[peer.ID]peerView{ + peer.ID("peer-A"): { + number: 100, + }, + peer.ID("peer-B"): { + number: 500, + }, }, }, }, @@ -1667,7 +1666,7 @@ func TestChainSync_getHighestBlock(t *testing.T) { t.Parallel() chainSync := &chainSync{ - peerView: tt.chainSyncPeerView, + peerViewSet: tt.chainSyncPeerViewSet, } highestBlock, err := chainSync.getHighestBlock() @@ -1747,8 +1746,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithInvalidJusticationBlock(t *t cs.finalityGadget = mockFinalityGadget - target, err := cs.getTarget() - require.NoError(t, err) + target := cs.peerViewSet.getTarget() require.Equal(t, uint(blocksAhead), target) // include a new worker in the worker pool set, this worker @@ -1757,7 +1755,7 @@ func TestChainSync_BootstrapSync_SuccessfulSync_WithInvalidJusticationBlock(t *t cs.workerPool.fromBlockAnnounce(peer.ID("alice")) //cs.workerPool.fromBlockAnnounce(peer.ID("bob")) - err = cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync) + err := cs.requestMaxBlocksFrom(mockedGenesisHeader, networkInitialSync) require.ErrorIs(t, err, errVerifyBlockJustification) err = cs.workerPool.stop() diff --git a/dot/sync/errors.go b/dot/sync/errors.go index 564c878422..08f89cacba 100644 --- a/dot/sync/errors.go +++ b/dot/sync/errors.go @@ -20,7 +20,6 @@ var ( errRequestStartTooHigh = errors.New("request start number is higher than our best block") // chainSync errors - errNoPeerViews = errors.New("unable to get target") errNilBlockData = errors.New("block data is nil") errNilHeaderInResponse = errors.New("expected header, received none") errNilBodyInResponse = errors.New("expected body, received none") diff --git a/dot/sync/interfaces.go b/dot/sync/interfaces.go index bfc0575d3f..89336bf46b 100644 --- a/dot/sync/interfaces.go +++ b/dot/sync/interfaces.go @@ -75,6 +75,8 @@ type Network interface { ReportPeer(change peerset.ReputationChange, p peer.ID) AllConnectedPeersIDs() []peer.ID + + BlockAnnounceHandshake(*types.Header) error } // Telemetry is the telemetry client to send telemetry messages. diff --git a/dot/sync/mocks_test.go b/dot/sync/mocks_test.go index 4336307a34..5e1b70bb8f 100644 --- a/dot/sync/mocks_test.go +++ b/dot/sync/mocks_test.go @@ -598,6 +598,20 @@ func (mr *MockNetworkMockRecorder) AllConnectedPeersIDs() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AllConnectedPeersIDs", reflect.TypeOf((*MockNetwork)(nil).AllConnectedPeersIDs)) } +// BlockAnnounceHandshake mocks base method. +func (m *MockNetwork) BlockAnnounceHandshake(arg0 *types.Header) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "BlockAnnounceHandshake", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// BlockAnnounceHandshake indicates an expected call of BlockAnnounceHandshake. +func (mr *MockNetworkMockRecorder) BlockAnnounceHandshake(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BlockAnnounceHandshake", reflect.TypeOf((*MockNetwork)(nil).BlockAnnounceHandshake), arg0) +} + // Peers mocks base method. func (m *MockNetwork) Peers() []common.PeerInfo { m.ctrl.T.Helper() diff --git a/dot/sync/peer_view.go b/dot/sync/peer_view.go new file mode 100644 index 0000000000..3a06122555 --- /dev/null +++ b/dot/sync/peer_view.go @@ -0,0 +1,98 @@ +// Copyright 2024 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only + +package sync + +import ( + "math/big" + "sync" + + "github.com/ChainSafe/gossamer/lib/common" + "github.com/libp2p/go-libp2p/core/peer" + "golang.org/x/exp/maps" +) + +// peerView tracks our peers's best reported blocks +type peerView struct { + who peer.ID + hash common.Hash + number uint +} + +type peerViewSet struct { + mtx sync.RWMutex + view map[peer.ID]peerView + target uint +} + +// getTarget takes the average of all peer views best number +func (p *peerViewSet) getTarget() uint { + p.mtx.RLock() + defer p.mtx.RUnlock() + + if len(p.view) == 0 { + return p.target + } + + numbers := make([]uint, 0, len(p.view)) + // we are going to sort the data and remove the outliers then we will return the avg of all the valid elements + for _, view := range maps.Values(p.view) { + numbers = append(numbers, view.number) + } + + sum, count := nonOutliersSumCount(numbers) + quotientBigInt := uint(big.NewInt(0).Div(sum, big.NewInt(int64(count))).Uint64()) + + if p.target >= quotientBigInt { + return p.target + } + + p.target = quotientBigInt // cache latest calculated target + return p.target +} + +func (p *peerViewSet) find(pID peer.ID) (view peerView, ok bool) { + p.mtx.RLock() + defer p.mtx.RUnlock() + + view, ok = p.view[pID] + return view, ok +} + +func (p *peerViewSet) size() int { + p.mtx.RLock() + defer p.mtx.RUnlock() + + return len(p.view) +} + +func (p *peerViewSet) values() []peerView { + p.mtx.RLock() + defer p.mtx.RUnlock() + + return maps.Values(p.view) +} + +func (p *peerViewSet) update(peerID peer.ID, hash common.Hash, number uint) { + p.mtx.Lock() + defer p.mtx.Unlock() + + newView := peerView{ + who: peerID, + hash: hash, + number: number, + } + + view, ok := p.view[peerID] + if ok && view.number >= newView.number { + return + } + + p.view[peerID] = newView +} + +func newPeerViewSet(cap int) *peerViewSet { + return &peerViewSet{ + view: make(map[peer.ID]peerView, cap), + } +}