From ee3d243debc4370dc8ac8799923bc5390f28d274 Mon Sep 17 00:00:00 2001 From: Kirill Date: Fri, 23 Feb 2024 11:55:14 +0100 Subject: [PATCH] fix: fix non deterministic panic during TestStableNetworkRPC integration test (#3756) --- dot/network/connmgr.go | 9 +- dot/network/connmgr_integration_test.go | 2 +- dot/network/discovery.go | 28 +-- dot/network/host.go | 9 +- dot/network/interfaces.go | 7 +- {internal/mdns => dot/network}/notifee.go | 4 +- dot/network/notifications_test.go | 2 +- dot/network/service.go | 18 +- dot/network/utils_test.go | 4 +- dot/peerset/peerset.go | 18 +- dot/peerset/peerset_test.go | 10 +- dot/peerset/peerstate.go | 15 +- dot/peerset/peerstate_test.go | 36 ++-- go.mod | 2 +- go.sum | 9 +- internal/mdns/dialable.go | 60 ------ internal/mdns/interfaces.go | 32 ---- internal/mdns/mdns.go | 213 ---------------------- tests/rpc/system_integration_test.go | 4 +- 19 files changed, 104 insertions(+), 378 deletions(-) rename {internal/mdns => dot/network}/notifee.go (90%) delete mode 100644 internal/mdns/dialable.go delete mode 100644 internal/mdns/interfaces.go delete mode 100644 internal/mdns/mdns.go diff --git a/dot/network/connmgr.go b/dot/network/connmgr.go index 8700638e67..f99b87f6c7 100644 --- a/dot/network/connmgr.go +++ b/dot/network/connmgr.go @@ -19,7 +19,7 @@ import ( type ConnManager struct { sync.Mutex host *host - min, max int + maxPeers int connectHandler func(peer.ID) disconnectHandler func(peer.ID) @@ -33,15 +33,16 @@ type ConnManager struct { peerSetHandler PeerSetHandler } -func newConnManager(min, max int, peerSetCfg *peerset.ConfigSet) (*ConnManager, error) { +func newConnManager(max int, peerSetCfg *peerset.ConfigSet) (*ConnManager, error) { + // TODO: peerSetHandler never used from within connection manager and also referred outside through cm, + // so this should be refactored psh, err := peerset.NewPeerSetHandler(peerSetCfg) if err != nil { return nil, err } return &ConnManager{ - min: min, - max: max, + maxPeers: max, protectedPeers: new(sync.Map), persistentPeers: new(sync.Map), peerSetHandler: psh, diff --git a/dot/network/connmgr_integration_test.go b/dot/network/connmgr_integration_test.go index 63efe2ca47..c0ba3bd4c2 100644 --- a/dot/network/connmgr_integration_test.go +++ b/dot/network/connmgr_integration_test.go @@ -103,7 +103,7 @@ func TestProtectUnprotectPeer(t *testing.T) { ) peerCfgSet := peerset.NewConfigSet(uint32(max-min), uint32(max), false, slotAllocationTime) - cm, err := newConnManager(min, max, peerCfgSet) + cm, err := newConnManager(max, peerCfgSet) require.NoError(t, err) p1 := peer.ID("a") diff --git a/dot/network/discovery.go b/dot/network/discovery.go index 08a6b1be53..8bedf3e812 100644 --- a/dot/network/discovery.go +++ b/dot/network/discovery.go @@ -35,35 +35,35 @@ var ( // discovery handles discovery of new peers via the kademlia DHT type discovery struct { - ctx context.Context - dht *dual.DHT - rd *routing.RoutingDiscovery - h libp2phost.Host - bootnodes []peer.AddrInfo - ds *badger.Datastore - pid protocol.ID - minPeers, maxPeers int - handler PeerSetHandler + ctx context.Context + dht *dual.DHT + rd *routing.RoutingDiscovery + h libp2phost.Host + bootnodes []peer.AddrInfo + ds *badger.Datastore + pid protocol.ID + maxPeers int + handler PeerSetHandler } func newDiscovery(ctx context.Context, h libp2phost.Host, bootnodes []peer.AddrInfo, ds *badger.Datastore, - pid protocol.ID, min, max int, handler PeerSetHandler) *discovery { + pid protocol.ID, max int, handler PeerSetHandler) *discovery { return &discovery{ ctx: ctx, h: h, bootnodes: bootnodes, ds: ds, pid: pid, - minPeers: min, maxPeers: max, handler: handler, } } +// waitForPeers periodically checks kadDHT peers store for new peers and returns them, +// this function used for local environments to prepopulate bootnodes from mDNS func (d *discovery) waitForPeers() (peers []peer.AddrInfo, err error) { // get all currently connected peers and use them to bootstrap the DHT - currentPeers := d.h.Network().Peers() t := time.NewTicker(startDHTTimeout) @@ -92,6 +92,9 @@ func (d *discovery) waitForPeers() (peers []peer.AddrInfo, err error) { // start creates the DHT. func (d *discovery) start() error { + // this basically only works with enabled mDNS which is used only for local test setups. Without bootnodes kademilia + // would not bee able to connect to any peers and mDNS is used to find peers in local network. + // TODO: should be refactored because this if is basically used for local integration test purpose if len(d.bootnodes) == 0 { peers, err := d.waitForPeers() if err != nil { @@ -100,7 +103,6 @@ func (d *discovery) start() error { d.bootnodes = peers } - logger.Debugf("starting DHT with bootnodes %v...", d.bootnodes) logger.Debugf("V1ProtocolOverride %v...", d.pid+"/kad") diff --git a/dot/network/host.go b/dot/network/host.go index a7ef2c0096..ef0216f138 100644 --- a/dot/network/host.go +++ b/dot/network/host.go @@ -152,14 +152,19 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) { // connections remain between min peers and max peers const reservedOnly = false peerCfgSet := peerset.NewConfigSet( + //TODO: there is no any understanding of maxOutPeers and maxInPirs calculations. + // This needs to be explicitly mentioned + + // maxInPeers is later used in peerstate only and defines available Incoming connection slots uint32(cfg.MaxPeers-cfg.MinPeers), + // maxOutPeers is later used in peerstate only and defines available Outgoing connection slots uint32(cfg.MaxPeers/2), reservedOnly, peerSetSlotAllocTime, ) // create connection manager - cm, err := newConnManager(cfg.MinPeers, cfg.MaxPeers, peerCfgSet) + cm, err := newConnManager(cfg.MaxPeers, peerCfgSet) if err != nil { return nil, fmt.Errorf("failed to create connection manager: %w", err) } @@ -243,7 +248,7 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) { } bwc := metrics.NewBandwidthCounter() - discovery := newDiscovery(ctx, h, bns, ds, pid, cfg.MinPeers, cfg.MaxPeers, cm.peerSetHandler) + discovery := newDiscovery(ctx, h, bns, ds, pid, cfg.MaxPeers, cm.peerSetHandler) host := &host{ ctx: ctx, diff --git a/dot/network/interfaces.go b/dot/network/interfaces.go index 6f9145367f..977fa27f6e 100644 --- a/dot/network/interfaces.go +++ b/dot/network/interfaces.go @@ -3,7 +3,10 @@ package network -import "encoding/json" +import ( + "encoding/json" + "io" +) // Telemetry is the telemetry client to send telemetry messages. type Telemetry interface { @@ -22,5 +25,5 @@ type Logger interface { // MDNS is the mDNS service interface. type MDNS interface { Start() error - Stop() error + io.Closer } diff --git a/internal/mdns/notifee.go b/dot/network/notifee.go similarity index 90% rename from internal/mdns/notifee.go rename to dot/network/notifee.go index 0d1d3cdd17..4c61b4a9bc 100644 --- a/internal/mdns/notifee.go +++ b/dot/network/notifee.go @@ -1,7 +1,7 @@ // Copyright 2022 ChainSafe Systems (ON) // SPDX-License-Identifier: LGPL-3.0-only -package mdns +package network import ( "time" @@ -35,7 +35,7 @@ type NotifeeTracker struct { peerAdder PeerAdder } -// HandlePeerFound tracks the address info from the peer found. +// HandlePeerFound is a libp2p.mdns.Notifee interface implementation for mDNS in libp2p. func (n *NotifeeTracker) HandlePeerFound(p peer.AddrInfo) { n.addressAdder.AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL) n.peerAdder.AddPeer(0, p.ID) diff --git a/dot/network/notifications_test.go b/dot/network/notifications_test.go index 0a7d9d1044..21f0c5449e 100644 --- a/dot/network/notifications_test.go +++ b/dot/network/notifications_test.go @@ -303,7 +303,7 @@ func Test_HandshakeTimeout(t *testing.T) { // after the timeout time.Sleep(handshakeTimeout) - // handshake data shouldn't exist still + // handshake data still shouldn't exist data = info.peersData.getOutboundHandshakeData(nodeB.host.id()) require.Nil(t, data) diff --git a/dot/network/service.go b/dot/network/service.go index 6fc3445093..af995315c1 100644 --- a/dot/network/service.go +++ b/dot/network/service.go @@ -16,12 +16,12 @@ import ( "github.com/ChainSafe/gossamer/dot/telemetry" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/internal/log" - "github.com/ChainSafe/gossamer/internal/mdns" "github.com/ChainSafe/gossamer/internal/metrics" "github.com/ChainSafe/gossamer/lib/common" libp2pnetwork "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" + "github.com/libp2p/go-libp2p/p2p/discovery/mdns" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) @@ -194,12 +194,12 @@ func NewService(cfg *Config) (*Service, error) { } serviceTag := string(host.protocolID) - notifee := mdns.NewNotifeeTracker(host.p2pHost.Peerstore(), host.cm.peerSetHandler) + notifee := NewNotifeeTracker(host.p2pHost.Peerstore(), host.cm.peerSetHandler) mdnsLogger := log.NewFromGlobal(log.AddContext("module", "mdns")) mdnsLogger.Debugf( "Creating mDNS discovery service with host %s and protocol %s...", host.id(), host.protocolID) - mdnsService := mdns.NewService(host.p2pHost, serviceTag, mdnsLogger, notifee) + mdnsService := mdns.NewMdnsService(host.p2pHost, serviceTag, notifee) network := &Service{ ctx: ctx, @@ -291,6 +291,8 @@ func (s *Service) Start() error { // this handles all new connections (incoming and outgoing) // it creates a per-protocol mutex for sending outbound handshakes to the peer + // connectHandler is a part of libp2p.Notifiee interface implementation and getting called in the very end + //after or Incoming or Outgoing node is connected s.host.cm.connectHandler = func(peerID peer.ID) { for _, prtl := range s.notificationsProtocols { prtl.peersData.setMutex(peerID) @@ -322,7 +324,8 @@ func (s *Service) Start() error { return fmt.Errorf("starting mDNS service: %w", err) } } - + // TODO: this is basically a hack that is used only in unit tests to disable kademilia dht. + // Should be replaced with a mock instead. if !s.noDiscover { go func() { err = s.host.discovery.start() @@ -467,7 +470,7 @@ func (s *Service) Stop() error { s.cancel() // close mDNS discovery service - err := s.mdns.Stop() + err := s.mdns.Close() if err != nil { logger.Errorf("Failed to close mDNS discovery service: %s", err) } @@ -694,6 +697,9 @@ func (s *Service) startPeerSetHandler() { go s.startProcessingMsg() } +// processMessage process messages from PeerSetHandler. Responsible for Connecting and Drop connection with peers. +// When Connect message received function looking for a PeerAddr in Peerstore. +// If address is not found in peerstore we are looking for a peer with DHT func (s *Service) processMessage(msg peerset.Message) { peerID := msg.PeerID if peerID == "" { @@ -714,6 +720,7 @@ func (s *Service) processMessage(msg peerset.Message) { err := s.host.connect(addrInfo) if err != nil { + // TODO: if error happens here outgoing (?) slot is occupied but no peer is really connected logger.Warnf("failed to open connection for peer %s: %s", peerID, err) return } @@ -728,6 +735,7 @@ func (s *Service) processMessage(msg peerset.Message) { } } +// startProcessingMsg function that listens to messages from the channel that belongs to PeerSet PeerSetHandler. func (s *Service) startProcessingMsg() { msgCh := s.host.cm.peerSetHandler.Messages() for { diff --git a/dot/network/utils_test.go b/dot/network/utils_test.go index cf702cd996..235e35c108 100644 --- a/dot/network/utils_test.go +++ b/dot/network/utils_test.go @@ -65,7 +65,7 @@ func TestStringToAddrInfo(t *testing.T) { for _, str := range TestPeers { pi, err := stringToAddrInfo(str) require.NoError(t, err) - require.Equal(t, pi.ID.Pretty(), str[len(str)-46:]) + require.Equal(t, pi.ID.String(), str[len(str)-46:]) } } @@ -74,7 +74,7 @@ func TestStringsToAddrInfos(t *testing.T) { require.NoError(t, err) for k, pi := range pi { - require.Equal(t, pi.ID.Pretty(), TestPeers[k][len(TestPeers[k])-46:]) + require.Equal(t, pi.ID.String(), TestPeers[k][len(TestPeers[k])-46:]) } } diff --git a/dot/peerset/peerset.go b/dot/peerset/peerset.go index 4f6186b67e..57f7303568 100644 --- a/dot/peerset/peerset.go +++ b/dot/peerset/peerset.go @@ -178,7 +178,9 @@ type PeerSet struct { // TODO: this will be useful for reserved only mode // this is for future purpose if reserved-only flag is enabled (#1888). isReservedOnly bool - resultMsgCh chan Message + + // resultMsgCh is read by network.Service. + resultMsgCh chan Message // time when the PeerSet was created. created time.Time // last time when we updated the reputations of connected nodes. @@ -369,6 +371,7 @@ func (ps *PeerSet) reportPeer(change ReputationChange, peers ...peer.ID) error { } // allocSlots tries to fill available outgoing slots of nodes for the given set. +// By default this getting called every X seconds according to nextPeriodicAllocSlots ticker func (ps *PeerSet) allocSlots(setIdx int) error { err := ps.updateTime() if err != nil { @@ -382,7 +385,7 @@ func (ps *PeerSet) allocSlots(setIdx int) error { case connectedPeer: continue case unknownPeer: - peerState.discover(setIdx, reservePeer) + peerState.insertPeer(setIdx, reservePeer) } node, err := ps.peerState.getNode(reservePeer) @@ -425,7 +428,7 @@ func (ps *PeerSet) allocSlots(setIdx int) error { } if err = peerState.tryOutgoing(setIdx, peerID); err != nil { - logger.Errorf("could not set peer %s as outgoing connection: %s", peerID.Pretty(), err) + logger.Errorf("could not set peer %s as outgoing connection: %s", peerID.String(), err) break } @@ -450,7 +453,7 @@ func (ps *PeerSet) addReservedPeers(setID int, peers ...peer.ID) error { return nil } - ps.peerState.discover(setID, peerID) + ps.peerState.insertPeer(setID, peerID) ps.reservedNode[peerID] = struct{}{} if err := ps.peerState.addNoSlotNode(setID, peerID); err != nil { @@ -537,13 +540,16 @@ func (ps *PeerSet) setReservedPeer(setID int, peers ...peer.ID) error { return nil } +// addPeer checks peer existence in peerSet and if it does not insert the peer in to peerstate with +// default reputation and notConnected status. Afterwards runs allocSlots that checks availability of outgoing slots +// and put notConnected peers in to them func (ps *PeerSet) addPeer(setID int, peers peer.IDSlice) error { for _, pid := range peers { if ps.peerState.peerStatus(setID, pid) != unknownPeer { return nil } - ps.peerState.discover(setID, pid) + ps.peerState.insertPeer(setID, pid) if err := ps.allocSlots(setID); err != nil { return fmt.Errorf("could not allocate slots: %w", err) } @@ -612,7 +618,7 @@ func (ps *PeerSet) incoming(setID int, peers ...peer.ID) error { case notConnectedPeer: ps.peerState.nodes[pid].lastConnected[setID] = time.Now() case unknownPeer: - ps.peerState.discover(setID, pid) + ps.peerState.insertPeer(setID, pid) } state := ps.peerState diff --git a/dot/peerset/peerset_test.go b/dot/peerset/peerset_test.go index 4fb0d8a84a..6503f39b2f 100644 --- a/dot/peerset/peerset_test.go +++ b/dot/peerset/peerset_test.go @@ -24,7 +24,7 @@ func TestBanRejectAcceptPeer(t *testing.T) { peer1Status := ps.peerState.peerStatus(testSetID, peer1) require.Equal(t, unknownPeer, peer1Status) - ps.peerState.discover(testSetID, peer1) + ps.peerState.insertPeer(testSetID, peer1) // adding peer1 with incoming slot. err := ps.peerState.tryAcceptIncoming(testSetID, peer1) require.NoError(t, err) @@ -136,19 +136,19 @@ func TestPeerSetIncoming(t *testing.T) { pid: incomingPeer, expectedStatus: Accept, expectedNumIn: 1, - hasFreeIncomingSlot: false, + hasFreeIncomingSlot: true, }, { pid: incoming2, expectedStatus: Accept, expectedNumIn: 2, - hasFreeIncomingSlot: true, + hasFreeIncomingSlot: false, // since maxIn is 2, we will not have any free slots if 2 peers connected }, { pid: incoming3, expectedStatus: Reject, expectedNumIn: 2, - hasFreeIncomingSlot: true, + hasFreeIncomingSlot: false, // since maxIn is 2, we will not have any free slots if 2 peers connected }, } @@ -217,7 +217,7 @@ func TestReAllocAfterBanned(t *testing.T) { peer1Status := ps.peerState.peerStatus(testSetID, peer1) require.Equal(t, unknownPeer, peer1Status) - ps.peerState.discover(testSetID, peer1) + ps.peerState.insertPeer(testSetID, peer1) err := ps.peerState.tryAcceptIncoming(testSetID, peer1) require.NoError(t, err) diff --git a/dot/peerset/peerstate.go b/dot/peerset/peerstate.go index 6ddc840ca1..2ddf774567 100644 --- a/dot/peerset/peerstate.go +++ b/dot/peerset/peerstate.go @@ -270,14 +270,17 @@ func (ps *PeersState) highestNotConnectedPeer(set int) (highestPeerID peer.ID) { return highestPeerID } +// hasFreeOutgoingSlot check does number of connected out peers is less then max amount allowed connected peers. +// maxOut is defined as config param as. func (ps *PeersState) hasFreeOutgoingSlot(set int) bool { return ps.sets[set].numOut < ps.sets[set].maxOut } // Note: that it is possible for numIn to be strictly superior to the max, in case we were // connected to reserved node then marked them as not reserved. +// maxIn is defined as config param. func (ps *PeersState) hasFreeIncomingSlot(set int) bool { - return ps.sets[set].numIn >= ps.sets[set].maxIn + return ps.sets[set].numIn < ps.sets[set].maxIn } // addNoSlotNode adds a node to the list of nodes that don't occupy slots. @@ -367,17 +370,15 @@ func (ps *PeersState) disconnect(idx int, peerID peer.ID) error { return nil } -// discover takes input for set id and create a node and insert in the list. +// insertPeer takes input for set id and create a node and insert in the list. // the initial Reputation of the peer will be 0 and ingoing notMember state. -func (ps *PeersState) discover(set int, peerID peer.ID) { +func (ps *PeersState) insertPeer(set int, peerID peer.ID) { ps.Lock() defer ps.Unlock() - numSet := len(ps.sets) - _, has := ps.nodes[peerID] if !has { - n := newNode(numSet) + n := newNode(len(ps.sets)) n.state[set] = notConnected ps.nodes[peerID] = n } @@ -472,7 +473,7 @@ func (ps *PeersState) tryAcceptIncoming(setID int, peerID peer.ID) error { _, isNoSlotOccupied := ps.sets[setID].noSlotNodes[peerID] // if slot is not available and the node is not a reserved node then error - if ps.hasFreeIncomingSlot(setID) && !isNoSlotOccupied { + if !ps.hasFreeIncomingSlot(setID) && !isNoSlotOccupied { return ErrIncomingSlotsUnavailable } diff --git a/dot/peerset/peerstate_test.go b/dot/peerset/peerstate_test.go index 45c56e4d6c..7f4c59ec36 100644 --- a/dot/peerset/peerstate_test.go +++ b/dot/peerset/peerstate_test.go @@ -17,9 +17,9 @@ func TestFullSlotIn(t *testing.T) { // initially peer1 state will be unknownPeer. require.Equal(t, unknownPeer, state.peerStatus(0, peer1)) - // discover peer1 - state.discover(0, peer1) - // peer1 state will change from unknownPeer to notConnectedPeer, once we tried to discover it. + // insertPeer peer1 + state.insertPeer(0, peer1) + // peer1 state will change from unknownPeer to notConnectedPeer, once we tried to insertPeer it. require.Equal(t, notConnectedPeer, state.peerStatus(0, peer1)) // try to make peer1 as an incoming connection. err := state.tryAcceptIncoming(0, peer1) @@ -30,8 +30,8 @@ func TestFullSlotIn(t *testing.T) { // initially peer2 state will be unknownPeer. require.Equal(t, unknownPeer, state.peerStatus(0, peer2)) - // discover peer2 - state.discover(0, peer2) + // insertPeer peer2 + state.insertPeer(0, peer2) // try to make peer2 as an incoming connection. err = state.tryAcceptIncoming(0, peer2) // peer2 will not be accepted as incoming connection, as we only have one incoming connection slot ingoing peerState. @@ -50,8 +50,8 @@ func TestNoSlotNodeDoesntOccupySlot(t *testing.T) { // initially peer1 state will be unknownPeer. require.Equal(t, unknownPeer, state.peerStatus(0, peer1)) - // discover peer1 - state.discover(0, peer1) + // insertPeer peer1 + state.insertPeer(0, peer1) // peer1 will become an incoming connection. err = state.tryAcceptIncoming(0, peer1) require.NoError(t, err) @@ -63,9 +63,9 @@ func TestNoSlotNodeDoesntOccupySlot(t *testing.T) { // initially peer2 state will be unknownPeer. require.Equal(t, unknownPeer, state.peerStatus(0, peer2)) - // discover peer2 - state.discover(0, peer2) - // peer2 state will change from unknownPeer to notConnectedPeer, once we tried to discover it. + // insertPeer peer2 + state.insertPeer(0, peer2) + // peer2 state will change from unknownPeer to notConnectedPeer, once we tried to insertPeer it. require.Equal(t, notConnectedPeer, state.peerStatus(0, peer2)) // try to accept peer2 as an incoming connection. @@ -86,8 +86,8 @@ func TestDisconnectingFreeSlot(t *testing.T) { // initially peer1 state will be unknownPeer. require.Equal(t, unknownPeer, state.peerStatus(0, peer1)) - // discover peer1 - state.discover(0, peer1) + // insertPeer peer1 + state.insertPeer(0, peer1) err := state.tryAcceptIncoming(0, peer1) // try to make peer1 as an incoming connection. require.NoError(t, err) // peer1 is connected @@ -95,9 +95,9 @@ func TestDisconnectingFreeSlot(t *testing.T) { // initially peer2 state will be unknownPeer. require.Equal(t, unknownPeer, state.peerStatus(0, peer2)) - // discover peer2 - state.discover(0, peer2) - // peer2 state will change from unknownPeer to notConnectedPeer, once we tried to discover it. + // insertPeer peer2 + state.insertPeer(0, peer2) + // peer2 state will change from unknownPeer to notConnectedPeer, once we tried to insertPeer it. require.Equal(t, notConnectedPeer, state.peerStatus(0, peer2)) // try to make peer2 as an incoming connection. err = state.tryAcceptIncoming(0, peer2) @@ -125,7 +125,7 @@ func TestDisconnectNoSlotDoesntPanic(t *testing.T) { require.Equal(t, unknownPeer, state.peerStatus(0, peer1)) - state.discover(0, peer1) + state.insertPeer(0, peer1) err = state.tryOutgoing(0, peer1) require.NoError(t, err) @@ -147,7 +147,7 @@ func TestHighestNotConnectedPeer(t *testing.T) { require.Equal(t, unknownPeer, state.peerStatus(0, peer1)) - state.discover(0, peer1) + state.insertPeer(0, peer1) n, err := state.getNode(peer1) require.NoError(t, err) @@ -158,7 +158,7 @@ func TestHighestNotConnectedPeer(t *testing.T) { require.Equal(t, unknownPeer, state.peerStatus(0, peer2)) - state.discover(0, peer2) + state.insertPeer(0, peer2) n, err = state.getNode(peer2) require.NoError(t, err) n.reputation = 25 diff --git a/go.mod b/go.mod index 57ce7eb505..d3c5ffc2cc 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,6 @@ require ( github.com/spf13/viper v1.18.2 github.com/stretchr/testify v1.8.4 github.com/tetratelabs/wazero v1.1.0 - github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9 go.uber.org/mock v0.4.0 golang.org/x/crypto v0.19.0 golang.org/x/exp v0.0.0-20240110193028-0dcbfd608b1e @@ -124,6 +123,7 @@ require ( github.com/libp2p/go-netroute v0.2.1 // indirect github.com/libp2p/go-reuseport v0.4.0 // indirect github.com/libp2p/go-yamux/v4 v4.0.1 // indirect + github.com/libp2p/zeroconf/v2 v2.2.0 // indirect github.com/magiconair/properties v1.8.7 // indirect github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect github.com/mattn/go-colorable v0.1.13 // indirect diff --git a/go.sum b/go.sum index 54df804dcc..eaf0276ce3 100644 --- a/go.sum +++ b/go.sum @@ -385,6 +385,8 @@ github.com/libp2p/go-reuseport v0.4.0 h1:nR5KU7hD0WxXCJbmw7r2rhRYruNRl2koHw8fQsc github.com/libp2p/go-reuseport v0.4.0/go.mod h1:ZtI03j/wO5hZVDFo2jKywN6bYKWLOy8Se6DrI2E1cLU= github.com/libp2p/go-yamux/v4 v4.0.1 h1:FfDR4S1wj6Bw2Pqbc8Uz7pCxeRBPbwsBbEdfwiCypkQ= github.com/libp2p/go-yamux/v4 v4.0.1/go.mod h1:NWjl8ZTLOGlozrXSOZ/HlfG++39iKNnM5wwmtQP1YB4= +github.com/libp2p/zeroconf/v2 v2.2.0 h1:Cup06Jv6u81HLhIj1KasuNM/RHHrJ8T7wOTS4+Tv53Q= +github.com/libp2p/zeroconf/v2 v2.2.0/go.mod h1:fuJqLnUwZTshS3U/bMRJ3+ow/v9oid1n0DmyYyNO1Xs= github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm/+2c2E2WMI= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/magiconair/properties v1.8.7 h1:IeQXZAiQcpL9mgcAe1Nu6cX9LLw6ExEHKjN0VQdvPDY= @@ -403,6 +405,7 @@ github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvls github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= github.com/microcosm-cc/bluemonday v1.0.1/go.mod h1:hsXNsILzKxV+sX77C5b8FSuKF00vh2OMYv+xgHpAMF4= github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJysuI= +github.com/miekg/dns v1.1.43/go.mod h1:+evo5L0630/F6ca/Z9+GAqzhjGyn8/c+TBaOyfEl0V4= github.com/miekg/dns v1.1.55 h1:GoQ4hpsj0nFLYe+bWiCToyrBEJXkQfOOIvFGFy0lEgo= github.com/miekg/dns v1.1.55/go.mod h1:uInx36IzPl7FYnDcMeVWxj9byh7DutNykX4G9Sj60FY= github.com/mikioh/tcp v0.0.0-20190314235350-803a9b46060c h1:bzE/A84HN25pxAuk9Eej1Kz9OUelF97nAc82bDquQI8= @@ -624,8 +627,6 @@ github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSD github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 h1:EKhdznlJHPMoKr0XTrX+IlJs1LH3lyx2nfr1dOlZ79k= github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1/go.mod h1:8UvriyWtv5Q5EOgjHaSseUEdkQfvwFv1I/In/O2M9gc= -github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9 h1:Y1/FEOpaCpD21WxrmfeIYCFPuVPRCY2XZTWzTNHGw30= -github.com/whyrusleeping/mdns v0.0.0-20190826153040-b9b60ed33aa9/go.mod h1:j4l84WPFclQPj320J9gp0XwNKBb3U0zt5CBqjPp22G4= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -714,6 +715,7 @@ golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.21.0 h1:AQyQV4dYCvJ7vGmJyKki9+PBdyvhkSd8EIx/qb0AYv4= golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= @@ -754,6 +756,8 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210303074136-134d130e1a04/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210426080607-c94f62235c83/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -774,6 +778,7 @@ golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= diff --git a/internal/mdns/dialable.go b/internal/mdns/dialable.go deleted file mode 100644 index 1da61b7408..0000000000 --- a/internal/mdns/dialable.go +++ /dev/null @@ -1,60 +0,0 @@ -// Copyright 2022 ChainSafe Systems (ON) -// SPDX-License-Identifier: LGPL-3.0-only - -package mdns - -import ( - "errors" - "fmt" - "net" - - manet "github.com/multiformats/go-multiaddr/net" -) - -var ( - ErrTCPListenAddressNotFound = errors.New("TCP listen address not found") -) - -func getMDNSIPsAndPort(network interfaceListenAddressesGetter) (ips []net.IP, port uint16) { - tcpAddresses, err := getDialableListenAddrs(network) - if err != nil { - const defaultPort = 4001 - return nil, defaultPort - } - - ips = make([]net.IP, len(tcpAddresses)) - for i := range tcpAddresses { - ips[i] = tcpAddresses[i].IP - } - port = uint16(tcpAddresses[0].Port) - - return ips, port -} - -func getDialableListenAddrs(network interfaceListenAddressesGetter) (tcpAddresses []*net.TCPAddr, err error) { - multiAddresses, err := network.InterfaceListenAddresses() - if err != nil { - return nil, fmt.Errorf("listing host interface listen addresses: %w", err) - } - - tcpAddresses = make([]*net.TCPAddr, 0, len(multiAddresses)) - for _, multiAddress := range multiAddresses { - netAddress, err := manet.ToNetAddr(multiAddress) - if err != nil { - continue - } - - tcpAddress, ok := netAddress.(*net.TCPAddr) - if !ok { - continue - } - - tcpAddresses = append(tcpAddresses, tcpAddress) - } - - if len(tcpAddresses) == 0 { - return nil, fmt.Errorf("%w: in %d multiaddresses", ErrTCPListenAddressNotFound, len(multiAddresses)) - } - - return tcpAddresses, nil -} diff --git a/internal/mdns/interfaces.go b/internal/mdns/interfaces.go deleted file mode 100644 index e909e0380a..0000000000 --- a/internal/mdns/interfaces.go +++ /dev/null @@ -1,32 +0,0 @@ -// Copyright 2022 ChainSafe Systems (ON) -// SPDX-License-Identifier: LGPL-3.0-only - -package mdns - -import ( - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multiaddr" -) - -// Logger is a logger interface for the mDNS service. -type Logger interface { - Debugf(format string, args ...any) - Warnf(format string, args ...any) -} - -// IDNetworker can return the peer ID and a network interface. -type IDNetworker interface { - ID() peer.ID - Networker -} - -// Networker can return a network interface. -type Networker interface { - Network() network.Network -} - -// interfaceListenAddressesGetter returns the listen addresses of the interfaces. -type interfaceListenAddressesGetter interface { - InterfaceListenAddresses() ([]multiaddr.Multiaddr, error) -} diff --git a/internal/mdns/mdns.go b/internal/mdns/mdns.go deleted file mode 100644 index 63c2a13f1c..0000000000 --- a/internal/mdns/mdns.go +++ /dev/null @@ -1,213 +0,0 @@ -// Copyright 2022 ChainSafe Systems (ON) -// SPDX-License-Identifier: LGPL-3.0-only - -package mdns - -import ( - "errors" - "fmt" - "net" - "sync" - "time" - - "github.com/libp2p/go-libp2p/core/peer" - - "github.com/multiformats/go-multiaddr" - manet "github.com/multiformats/go-multiaddr/net" - "github.com/whyrusleeping/mdns" -) - -// Notifee is notified when a new peer is found. -type Notifee interface { - HandlePeerFound(peer.AddrInfo) -} - -// Service implements a mDNS service. -type Service struct { - // Dependencies and configuration injected - p2pHost IDNetworker - serviceTag string - logger Logger - notifee Notifee - - // Constant fields - pollPeriod time.Duration - - // Fields set by the Start method. - server *mdns.Server - - // Internal service management fields. - // startStopMutex is to prevent concurrent calls to Start and Stop. - startStopMutex sync.Mutex - started bool - stop chan struct{} - done chan struct{} -} - -// NewService creates and returns a new mDNS service. -func NewService(p2pHost IDNetworker, serviceTag string, - logger Logger, notifee Notifee) (service *Service) { - if serviceTag == "" { - serviceTag = "_ipfs-discovery._udp" - } - - return &Service{ - p2pHost: p2pHost, - serviceTag: serviceTag, - notifee: notifee, - logger: logger, - pollPeriod: time.Minute, - } -} - -// Start starts the mDNS service. -func (s *Service) Start() (err error) { - s.startStopMutex.Lock() - defer s.startStopMutex.Unlock() - - if s.started { - return nil - } - - ips, port := getMDNSIPsAndPort(s.p2pHost.Network()) - - hostID := s.p2pHost.ID() - - hostIDPretty := hostID.Pretty() - txt := []string{hostIDPretty} - - mdns.DisableLogging = true - mdnsService, err := mdns.NewMDNSService(hostIDPretty, s.serviceTag, "", "", int(port), ips, txt) - if err != nil { - return fmt.Errorf("creating mDNS service: %w", err) - } - - server, err := mdns.NewServer(&mdns.Config{Zone: mdnsService}) - if err != nil { - return fmt.Errorf("creating mDNS server: %w", err) - } - s.server = server - - s.stop = make(chan struct{}) - s.done = make(chan struct{}) - ready := make(chan struct{}) - - go s.run(ready) - // It takes a few milliseconds to launch a goroutine - // so we wait for the run goroutine to be ready. - <-ready - - s.started = true - - return nil -} - -// Stop stops the mDNS service and server. -func (s *Service) Stop() (err error) { - s.startStopMutex.Lock() - defer s.startStopMutex.Unlock() - - if !s.started { - return nil - } - - defer func() { - s.started = false - }() - close(s.stop) - <-s.done - return s.server.Shutdown() -} - -func (s *Service) run(ready chan<- struct{}) { - defer close(s.done) - - ticker := time.NewTicker(s.pollPeriod) - defer ticker.Stop() - - const queryTimeout = 5 * time.Second - params := &mdns.QueryParam{ - Domain: "local", - Service: s.serviceTag, - Timeout: queryTimeout, - } - - close(ready) - - for { - entriesListeningReady := make(chan struct{}) - entriesListeningDone := make(chan struct{}) - entriesCh := make(chan *mdns.ServiceEntry, 16) - go func() { - defer close(entriesListeningDone) - close(entriesListeningReady) - for entry := range entriesCh { - err := s.handleEntry(entry) - if err != nil { - s.logger.Warnf("handling mDNS entry: %s", err) - } - } - }() - <-entriesListeningReady - - params.Entries = entriesCh - err := mdns.Query(params) - if err != nil { - s.logger.Warnf("mdns query failed: %s", err) - } - - close(entriesCh) - <-entriesListeningDone - - select { - case <-ticker.C: - case <-s.stop: - return - } - } -} - -var ( - errEntryHasNoIP = errors.New("MDNS entry has no IP address") -) - -func (s *Service) handleEntry(entry *mdns.ServiceEntry) (err error) { - receivedPeerID, err := peer.Decode(entry.Info) - if err != nil { - return fmt.Errorf("parsing peer ID from mdns entry: %w", err) - } - - if receivedPeerID == s.p2pHost.ID() { - return nil - } - - var ip net.IP - switch { - case entry.AddrV4 != nil: - ip = entry.AddrV4 - case entry.AddrV6 != nil: - ip = entry.AddrV6 - default: - return fmt.Errorf("%w: from peer id %s", errEntryHasNoIP, receivedPeerID) - } - - tcpAddress := &net.TCPAddr{ - IP: ip, - Port: entry.Port, - } - - multiAddress, err := manet.FromNetAddr(tcpAddress) - if err != nil { - return fmt.Errorf("converting tcp address from peer id %s to multiaddress: %w", - receivedPeerID, err) - } - - addressInfo := peer.AddrInfo{ - ID: receivedPeerID, - Addrs: []multiaddr.Multiaddr{multiAddress}, - } - - s.logger.Debugf("Peer %s has addresses %s", receivedPeerID, addressInfo.Addrs) - go s.notifee.HandlePeerFound(addressInfo) - return nil -} diff --git a/tests/rpc/system_integration_test.go b/tests/rpc/system_integration_test.go index c2842736e2..3bc346597f 100644 --- a/tests/rpc/system_integration_test.go +++ b/tests/rpc/system_integration_test.go @@ -34,7 +34,7 @@ func TestStableNetworkRPC(t *testing.T) { //nolint:tparallel con.Core.Role = common.FullNodeRole con.RPC.Modules = []string{"system", "author", "chain"} con.Network.MinPeers = 1 - con.Network.MaxPeers = 2 + con.Network.MaxPeers = 20 con.Core.BabeAuthority = true con.Log.Sync = "trace" @@ -68,7 +68,7 @@ func TestStableNetworkRPC(t *testing.T) { //nolint:tparallel err := retry.UntilOK(peerTimeout, 10*time.Second, func() (bool, error) { for _, node := range nodes { endpoint := rpc.NewEndpoint(node.RPCPort()) - t.Logf("starting node %s with port %s", node.String(), endpoint) + t.Logf("requesting node %s with port %s", node.String(), endpoint) var response modules.SystemHealthResponse fetchWithTimeoutFromEndpoint(t, endpoint, "system_health", &response) t.Logf("Response: %+v", response)