Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dot/sync): execute p2p handshake when there is no target #3695

Merged
merged 17 commits into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
f244fcd
chore: execute a block announce if nothing arrives at time
EclesioMeloJunior Jan 15, 2024
4a59ca7
Merge branch 'development' into eclesio/start-sync-from-block-announce
EclesioMeloJunior Jan 15, 2024
46b3ceb
chore: update mocks and fix broken tests
EclesioMeloJunior Jan 16, 2024
746d6ad
chore: moving peerset view to its own file and introduce `getTarget` …
EclesioMeloJunior Jan 16, 2024
c28fb42
Merge branch 'eclesio/start-sync-from-block-announce' of github.com:C…
EclesioMeloJunior Jan 16, 2024
f43b9de
chore: make `peersetview.update` simpler
EclesioMeloJunior Jan 16, 2024
a409b03
Merge branch 'development' into eclesio/start-sync-from-block-announce
EclesioMeloJunior Jan 16, 2024
92cb283
chore: dont wait if there is bootnodes available
EclesioMeloJunior Jan 16, 2024
6b31457
Merge branch 'eclesio/start-sync-from-block-announce' of github.com:C…
EclesioMeloJunior Jan 16, 2024
5b3420d
Merge branch 'development' into eclesio/start-sync-from-block-announce
EclesioMeloJunior Jan 16, 2024
06b8aae
chore: fix broken tests
EclesioMeloJunior Jan 16, 2024
d972609
chore: fix broken tests, rename `isBootstrap` to `currentSyncInformat…
EclesioMeloJunior Jan 16, 2024
4d2458d
chore: fix bootnodes handshake
EclesioMeloJunior Jan 16, 2024
cd0a4e4
chore: uncomment test cases
EclesioMeloJunior Jan 17, 2024
4d596c3
chore: address reviews
EclesioMeloJunior Jan 17, 2024
a05a3bb
Merge branch 'development' into eclesio/start-sync-from-block-announce
EclesioMeloJunior Jan 17, 2024
44c08f0
Merge branch 'development' into eclesio/start-sync-from-block-announce
EclesioMeloJunior Jan 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dot/network/light.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (s *Service) decodeLightMessage(in []byte, peer peer.ID, _ bool) (Message,
func (s *Service) handleLightMsg(stream libp2pnetwork.Stream, msg Message) (err error) {
defer func() {
err := stream.Close()
if err != nil {
if err != nil && err.Error() != "stream reset" {
logger.Warnf("failed to close stream: %s", err)
}
}()
Expand Down
3 changes: 2 additions & 1 deletion dot/network/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,9 @@ func closeOutboundStream(info *notificationsProtocol, peerID peer.ID, stream net
)

info.peersData.deleteOutboundHandshakeData(peerID)

err := stream.Close()
if err != nil {
if err != nil && err.Error() != "stream reset" {
logger.Warnf("failed to close outbound stream: %s", err)
}
}
Expand Down
2 changes: 1 addition & 1 deletion dot/network/request_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (rrp *RequestResponseProtocol) Do(to peer.ID, req Message, res ResponseMess

defer func() {
err := stream.Close()
if err != nil {
if err != nil && err.Error() != "stream reset" {
logger.Warnf("failed to close stream: %s", err)
}
}()
Expand Down
38 changes: 38 additions & 0 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/ChainSafe/gossamer/dot/peerset"
"github.com/ChainSafe/gossamer/dot/telemetry"
"github.com/ChainSafe/gossamer/dot/types"
"github.com/ChainSafe/gossamer/internal/log"
"github.com/ChainSafe/gossamer/internal/mdns"
"github.com/ChainSafe/gossamer/internal/metrics"
Expand Down Expand Up @@ -742,3 +743,40 @@ func (s *Service) startProcessingMsg() {
}
}
}

// BlockAnnounceHandshake sends the block announce handshake to every peer
// currently reported by the host, one goroutine per peer, and blocks until
// all of those attempts have finished.
//
// The header argument is not read by this method.
//
// It panics if the block announce notifications protocol is not registered,
// and returns an error only when the handshake itself cannot be built.
// Failures while sending to an individual peer are logged at trace level and
// do not affect the returned error.
func (s *Service) BlockAnnounceHandshake(header *types.Header) error {
	blockAnnounce, found := s.notificationsProtocols[blockAnnounceMsgType]
	if !found {
		panic("block announce message type not found")
	}

	hs, err := blockAnnounce.getHandshake()
	if err != nil {
		return fmt.Errorf("getting handshake: %w", err)
	}

	var pending sync.WaitGroup
	for _, connected := range s.host.peers() {
		blockAnnounce.peersData.setMutex(connected)

		pending.Add(1)
		go func(target peer.ID) {
			defer pending.Done()

			outbound, sendErr := s.sendHandshake(target, hs, blockAnnounce)
			if sendErr != nil {
				logger.Tracef("while sending block announce handshake: %v", sendErr)
				return
			}

			// Only close the outbound stream once the peer's handshake
			// response has been both received and validated.
			data := blockAnnounce.peersData.getOutboundHandshakeData(target)
			if !data.received || !data.validated {
				return
			}
			closeOutboundStream(blockAnnounce, target, outbound)
		}(connected)
	}

	pending.Wait()
	return nil
}
2 changes: 1 addition & 1 deletion dot/network/stream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (sm *streamManager) cleanupStreams() {

if time.Since(lastReceived) > cleanupStreamInterval {
err := stream.Close()
if err != nil {
if err != nil && err.Error() != "stream reset" {
logger.Warnf("failed to close inactive stream: %s", err)
}
delete(sm.streamData, id)
Expand Down
2 changes: 1 addition & 1 deletion dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (s *Service) handleSyncMessage(stream libp2pnetwork.Stream, msg Message) er

defer func() {
err := stream.Close()
if err != nil {
if err != nil && err.Error() != "stream reset" {
logger.Warnf("failed to close stream: %s", err)
}
}()
Expand Down
100 changes: 74 additions & 26 deletions dot/sync/chain_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"

"github.com/ChainSafe/gossamer/dot/network"
Expand Down Expand Up @@ -99,6 +100,54 @@ type announcedBlock struct {
header *types.Header
}

// peerViewSet stores one peerView per peer, keyed by peer ID.
// Its methods acquire mtx before reading or writing view.
type peerViewSet struct {
// mtx guards view.
mtx sync.RWMutex
// view maps a peer ID to the peerView (who, hash, number) recorded for it.
view map[peer.ID]peerView
}

// size reports how many peers currently have a stored view.
func (p *peerViewSet) size() int {
	p.mtx.RLock()
	count := len(p.view)
	p.mtx.RUnlock()

	return count
}

// values returns a freshly allocated slice holding every stored peerView.
// The order of the elements is unspecified, as it follows map iteration.
func (p *peerViewSet) values() []peerView {
	p.mtx.RLock()
	snapshot := maps.Values(p.view)
	p.mtx.RUnlock()

	return snapshot
}

// update stores a view built from hash and number for peerID. An existing
// entry is kept untouched when its block number is greater than or equal to
// number; otherwise (or when the peer has no entry yet) it is replaced.
func (p *peerViewSet) update(peerID peer.ID, hash common.Hash, number uint) {
	p.mtx.Lock()
	defer p.mtx.Unlock()

	// Never move a peer's view backwards or sideways.
	if current, known := p.view[peerID]; known && current.number >= number {
		return
	}

	p.view[peerID] = peerView{who: peerID, hash: hash, number: number}
}

// newPeerViewSet returns an empty peerViewSet whose underlying map is
// created with cap as its initial size hint.
func newPeerViewSet(cap int) *peerViewSet {
	set := new(peerViewSet)
	set.view = make(map[peer.ID]peerView, cap)
	return set
}

type chainSync struct {
wg sync.WaitGroup
stopCh chan struct{}
Expand All @@ -110,8 +159,7 @@ type chainSync struct {

// tracks the latest state we know of from our peers,
// ie. their best block hash and number
peerViewLock sync.RWMutex
peerView map[peer.ID]peerView
peerViewSet *peerViewSet

// disjoint set of blocks which are known but not ready to be processed
// ie. we only know the hash, number, or the parent block is unknown, or the body is unknown
Expand Down Expand Up @@ -166,7 +214,7 @@ func newChainSync(cfg chainSyncConfig) *chainSync {
telemetry: cfg.telemetry,
blockState: cfg.bs,
network: cfg.net,
peerView: make(map[peer.ID]peerView),
peerViewSet: newPeerViewSet(cfg.maxPeers),
pendingBlocks: cfg.pendingBlocks,
syncMode: atomicState,
finalisedCh: cfg.bs.GetFinalisedNotifierChannel(),
Expand Down Expand Up @@ -194,6 +242,17 @@ func (cs *chainSync) waitEnoughPeersAndTarget() {
select {
case <-waitPeersTimer.C:
waitPeersTimer.Reset(cs.waitPeersDuration)
bestBlockHeader, err := cs.blockState.BestBlockHeader()
if err != nil {
logger.Errorf("failed to get best block header: %v", err)
continue
}

err = cs.network.BlockAnnounceHandshake(bestBlockHeader)
if err != nil {
logger.Errorf("failed to get handshake peer data: %v", err)
continue
}
case <-cs.stopCh:
return
}
Expand Down Expand Up @@ -310,13 +369,7 @@ func (cs *chainSync) getSyncMode() chainSyncState {
func (cs *chainSync) onBlockAnnounceHandshake(who peer.ID, bestHash common.Hash, bestNumber uint) error {
cs.workerPool.fromBlockAnnounce(who)

cs.peerViewLock.Lock()
cs.peerView[who] = peerView{
who: who,
hash: bestHash,
number: bestNumber,
}
cs.peerViewLock.Unlock()
cs.peerViewSet.update(who, bestHash, bestNumber)

if cs.getSyncMode() == bootstrap {
return nil
Expand Down Expand Up @@ -587,21 +640,19 @@ func (cs *chainSync) requestMaxBlocksFrom(bestBlockHeader *types.Header, origin
// head block number, it would leave us in bootstrap mode forever
// it would be better to have some sort of standard deviation calculation and discard any outliers (#1861)
func (cs *chainSync) getTarget() (uint, error) {
cs.peerViewLock.RLock()
defer cs.peerViewLock.RUnlock()

peerSetLen := cs.peerViewSet.size()
// in practice, this shouldn't happen, as we only start the module once we have some peer states
if len(cs.peerView) == 0 {
if peerSetLen == 0 {
return 0, errNoPeerViews
}

numbers := make([]uint, 0, peerSetLen)
// we are going to sort the data and remove the outliers then we will return the avg of all the valid elements
uintArr := make([]uint, 0, len(cs.peerView))
for _, ps := range cs.peerView {
uintArr = append(uintArr, ps.number)
for _, view := range cs.peerViewSet.values() {
numbers = append(numbers, view.number)
}

sum, count := nonOutliersSumCount(uintArr)
sum, count := nonOutliersSumCount(numbers)
quotientBigInt := big.NewInt(0).Div(sum, big.NewInt(int64(count)))
return uint(quotientBigInt.Uint64()), nil
}
Expand Down Expand Up @@ -655,10 +706,10 @@ taskResultLoop:
taskResult.who, taskResult.err != nil, taskResult.response != nil)

if taskResult.err != nil {
logger.Errorf("task result: peer(%s) error: %s",
taskResult.who, taskResult.err)

if !errors.Is(taskResult.err, network.ErrReceivedEmptyMessage) {
logger.Errorf("task result: peer(%s) error: %s",
taskResult.who, taskResult.err)

if strings.Contains(taskResult.err.Error(), "protocols not supported") {
cs.network.ReportPeer(peerset.ReputationChange{
Value: peerset.BadProtocolValue,
Expand Down Expand Up @@ -1049,14 +1100,11 @@ func doResponseGrowsTheChain(response, ongoingChain []*types.BlockData, startAtB
}

func (cs *chainSync) getHighestBlock() (highestBlock uint, err error) {
cs.peerViewLock.RLock()
defer cs.peerViewLock.RUnlock()

if len(cs.peerView) == 0 {
if cs.peerViewSet.size() == 0 {
return 0, errNoPeers
}

for _, ps := range cs.peerView {
for _, ps := range cs.peerViewSet.values() {
if ps.number < highestBlock {
continue
}
Expand Down
2 changes: 2 additions & 0 deletions dot/sync/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ type Network interface {
ReportPeer(change peerset.ReputationChange, p peer.ID)

AllConnectedPeersIDs() []peer.ID

BlockAnnounceHandshake(*types.Header) error
}

// Telemetry is the telemetry client to send telemetry messages.
Expand Down
2 changes: 1 addition & 1 deletion dot/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func NewService(cfg *Config) (*Service, error) {
telemetry: cfg.Telemetry,
badBlocks: cfg.BadBlocks,
requestMaker: cfg.RequestMaker,
waitPeersDuration: 100 * time.Millisecond,
waitPeersDuration: 7 * time.Second,
}
chainSync := newChainSync(csCfg)

Expand Down
Loading