Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

p2p: http catchpoints support #5924

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions agreement/gossip/network_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package gossip

import (
"context"
"net"
"net/http"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -156,7 +155,7 @@ func (w *whiteholeNetwork) GetPeers(options ...network.PeerOption) []network.Pee
}
func (w *whiteholeNetwork) RegisterHTTPHandler(path string, handler http.Handler) {
}
func (w *whiteholeNetwork) GetHTTPRequestConnection(request *http.Request) (conn net.Conn) {
func (w *whiteholeNetwork) GetHTTPRequestConnection(request *http.Request) (conn network.DeadlineSettable) {
return nil
}

Expand Down
17 changes: 11 additions & 6 deletions catchup/ledgerFetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,18 @@ func makeLedgerFetcher(net network.GossipNode, accessor ledger.CatchpointCatchup
}

func (lf *ledgerFetcher) requestLedger(ctx context.Context, peer network.HTTPPeer, round basics.Round, method string) (*http.Response, error) {
parsedURL, err := network.ParseHostOrURL(peer.GetAddress())
if err != nil {
return nil, err
}
var ledgerURL string
if network.IsMultiaddr(peer.GetAddress()) {
ledgerURL = network.SubstituteGenesisID(lf.net, "/v1/{genesisID}/ledger/"+strconv.FormatUint(uint64(round), 36))
} else {

parsedURL.Path = network.SubstituteGenesisID(lf.net, path.Join(parsedURL.Path, "/v1/{genesisID}/ledger/"+strconv.FormatUint(uint64(round), 36)))
ledgerURL := parsedURL.String()
parsedURL, err := network.ParseHostOrURL(peer.GetAddress())
if err != nil {
return nil, err
}
parsedURL.Path = network.SubstituteGenesisID(lf.net, path.Join(parsedURL.Path, "/v1/{genesisID}/ledger/"+strconv.FormatUint(uint64(round), 36)))
ledgerURL = parsedURL.String()
}
lf.log.Debugf("ledger %s %#v peer %#v %T", method, ledgerURL, peer, peer)
request, err := http.NewRequestWithContext(ctx, method, ledgerURL, nil)
if err != nil {
Expand Down
54 changes: 49 additions & 5 deletions catchup/ledgerFetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package catchup

import (
"archive/tar"
"context"
"fmt"
"net"
Expand All @@ -30,6 +31,8 @@ import (
"github.com/algorand/go-algorand/data/basics"
"github.com/algorand/go-algorand/ledger"
"github.com/algorand/go-algorand/logging"
p2ptesting "github.com/algorand/go-algorand/network/p2p/testing"
"github.com/algorand/go-algorand/rpcs"
"github.com/algorand/go-algorand/test/partitiontest"
)

Expand Down Expand Up @@ -125,7 +128,7 @@ func TestLedgerFetcherErrorResponseHandling(t *testing.T) {
}
}

func TestLedgerFetcherHeadLedger(t *testing.T) {
func TestLedgerFetcher(t *testing.T) {
partitiontest.PartitionTest(t)

// create a dummy server.
Expand All @@ -136,16 +139,19 @@ func TestLedgerFetcherHeadLedger(t *testing.T) {
listener, err := net.Listen("tcp", "localhost:")

var httpServerResponse = 0
var contentTypes = make([]string, 0)
require.NoError(t, err)
go s.Serve(listener)
defer s.Close()
defer listener.Close()
mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
for _, contentType := range contentTypes {
w.Header().Add("Content-Type", contentType)
if req.Method == http.MethodHead {
w.WriteHeader(httpServerResponse)
} else {
w.Header().Add("Content-Type", rpcs.LedgerResponseContentType)
w.WriteHeader(httpServerResponse)
wtar := tar.NewWriter(w)
wtar.Close()
}
w.WriteHeader(httpServerResponse)
})
successPeer := testHTTPPeer(listener.Addr().String())
lf := makeLedgerFetcher(&mocks.MockNetwork{}, &mocks.MockCatchpointCatchupAccessor{}, logging.TestingLog(t), &dummyLedgerFetcherReporter{}, config.GetDefaultLocal())
Expand All @@ -169,8 +175,46 @@ func TestLedgerFetcherHeadLedger(t *testing.T) {
err = lf.headLedger(context.Background(), &successPeer, basics.Round(0))
require.NoError(t, err)

httpServerResponse = http.StatusOK
err = lf.downloadLedger(context.Background(), &successPeer, basics.Round(0))
require.NoError(t, err)

// headLedger 500 response
httpServerResponse = http.StatusInternalServerError
err = lf.headLedger(context.Background(), &successPeer, basics.Round(0))
require.Equal(t, fmt.Errorf("headLedger error response status code %d", http.StatusInternalServerError), err)
}

func TestLedgerFetcherP2P(t *testing.T) {
partitiontest.PartitionTest(t)

mux := http.NewServeMux()
nodeA := p2ptesting.MakeHTTPNode(t)
nodeA.RegisterHTTPHandler("/v1/ledger/0", mux)
var httpServerResponse = 0
mux.HandleFunc("/", func(w http.ResponseWriter, req *http.Request) {
if req.Method == http.MethodHead {
w.WriteHeader(httpServerResponse)
} else {
w.Header().Add("Content-Type", rpcs.LedgerResponseContentType)
w.WriteHeader(httpServerResponse)
wtar := tar.NewWriter(w)
wtar.Close()
}
})

nodeA.Start()
defer nodeA.Stop()

successPeer := nodeA.GetHTTPPeer()
lf := makeLedgerFetcher(nodeA, &mocks.MockCatchpointCatchupAccessor{}, logging.TestingLog(t), &dummyLedgerFetcherReporter{}, config.GetDefaultLocal())

// headLedger 200 response
httpServerResponse = http.StatusOK
err := lf.headLedger(context.Background(), successPeer, basics.Round(0))
require.NoError(t, err)

httpServerResponse = http.StatusOK
err = lf.downloadLedger(context.Background(), successPeer, basics.Round(0))
require.NoError(t, err)
}
3 changes: 1 addition & 2 deletions components/mocks/mockNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package mocks
import (
"context"
"errors"
"net"
"net/http"

"github.com/algorand/go-algorand/network"
Expand Down Expand Up @@ -100,7 +99,7 @@ func (network *MockNetwork) RegisterHTTPHandler(path string, handler http.Handle
func (network *MockNetwork) OnNetworkAdvance() {}

// GetHTTPRequestConnection - empty implementation
func (network *MockNetwork) GetHTTPRequestConnection(request *http.Request) (conn net.Conn) {
func (network *MockNetwork) GetHTTPRequestConnection(request *http.Request) (conn network.DeadlineSettable) {
return nil
}

Expand Down
11 changes: 9 additions & 2 deletions network/gossipNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ package network

import (
"context"
"net"
"net/http"
"strings"
"time"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/protocol"
Expand Down Expand Up @@ -52,6 +52,13 @@ const (
PeersPhonebookArchivers PeerOption = iota
)

// DeadlineSettable abstracts net.Conn and related types as deadline-settable
type DeadlineSettable interface {
SetDeadline(time.Time) error
SetReadDeadline(time.Time) error
SetWriteDeadline(time.Time) error
}

// GossipNode represents a node in the gossip network
type GossipNode interface {
Address() (string, bool)
Expand Down Expand Up @@ -95,7 +102,7 @@ type GossipNode interface {

// GetHTTPRequestConnection returns the underlying connection for the given request. Note that the request must be the same
// request that was provided to the http handler ( or provide a fallback Context() to that )
GetHTTPRequestConnection(request *http.Request) (conn net.Conn)
GetHTTPRequestConnection(request *http.Request) (conn DeadlineSettable)

// GetGenesisID returns the network-specific genesisID.
GetGenesisID() string
Expand Down
9 changes: 6 additions & 3 deletions network/hybridNetwork.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import (
"context"
"fmt"
"net"
"net/http"
"sync"

Expand Down Expand Up @@ -206,8 +205,12 @@

// GetHTTPRequestConnection returns the underlying connection for the given request. Note that the request must be the same
// request that was provided to the http handler ( or provide a fallback Context() to that )
func (n *HybridP2PNetwork) GetHTTPRequestConnection(request *http.Request) (conn net.Conn) {
return nil
func (n *HybridP2PNetwork) GetHTTPRequestConnection(request *http.Request) (conn DeadlineSettable) {
conn = n.wsNetwork.GetHTTPRequestConnection(request)
if conn != nil {
return conn

Check warning on line 211 in network/hybridNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/hybridNetwork.go#L208-L211

Added lines #L208 - L211 were not covered by tests
}
return n.p2pNetwork.GetHTTPRequestConnection(request)

Check warning on line 213 in network/hybridNetwork.go

View check run for this annotation

Codecov / codecov/patch

network/hybridNetwork.go#L213

Added line #L213 was not covered by tests
}

// GetGenesisID returns the network-specific genesisID.
Expand Down
51 changes: 50 additions & 1 deletion network/p2p/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
libp2phttp "github.com/libp2p/go-libp2p/p2p/http"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
"github.com/libp2p/go-libp2p/p2p/security/noise"
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
Expand All @@ -56,6 +58,8 @@
ListPeersForTopic(topic string) []peer.ID
Subscribe(topic string, val pubsub.ValidatorEx) (*pubsub.Subscription, error)
Publish(ctx context.Context, topic string, data []byte) error

GetStream(peer.ID) (network.Stream, bool)
}

// serviceImpl manages integration with libp2p and implements the Service interface
Expand Down Expand Up @@ -116,7 +120,47 @@
noListenAddrs,
libp2p.Security(noise.ID, noise.New),
)
return host, listenAddr, err
return &StreamChainingHost{
Host: host,
handlers: map[protocol.ID][]network.StreamHandler{},
}, listenAddr, err
}

// StreamChainingHost is a wrapper around host.Host that overrides SetStreamHandler
// to allow chaining multiple handlers for the same protocol.
// Note, there should be probably only single handler that writes/reads streams.
type StreamChainingHost struct {
host.Host
handlers map[protocol.ID][]network.StreamHandler
mutex deadlock.Mutex
}

// SetStreamHandler overrides the host.Host.SetStreamHandler method for chaining multiple handlers.
// Function objects are not comparable so theoretically it could have duplicates.
// The main use case is to track HTTP streams for ProtocolIDForMultistreamSelect = "/http/1.1"
// so it could just filter for such protocol if there any issues with other protocols like kad or mesh.
func (h *StreamChainingHost) SetStreamHandler(pid protocol.ID, handler network.StreamHandler) {
h.mutex.Lock()
defer h.mutex.Unlock()

handlers := h.handlers[pid]
if len(handlers) == 0 {
// no other handlers, do not set a proxy handler
h.Host.SetStreamHandler(pid, handler)
h.handlers[pid] = append(handlers, handler)
return
}
// otherwise chain the handlers with a copy of the existing handlers
handlers = append(handlers, handler)
// copy to save it in the closure and call lock free
currentHandlers := make([]network.StreamHandler, len(handlers))
copy(currentHandlers, handlers)
h.Host.SetStreamHandler(pid, func(s network.Stream) {
for _, h := range currentHandlers {
h(s)
}
})
h.handlers[pid] = handlers
}

// MakeService creates a P2P service instance
Expand All @@ -125,6 +169,7 @@
sm := makeStreamManager(ctx, log, h, wsStreamHandler)
h.Network().Notify(sm)
h.SetStreamHandler(AlgorandWsProtocol, sm.streamHandler)
h.SetStreamHandler(libp2phttp.ProtocolIDForMultistreamSelect, sm.streamHandlerHTTP)

Check warning on line 172 in network/p2p/p2p.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/p2p.go#L172

Added line #L172 was not covered by tests

ps, err := makePubSub(ctx, cfg, h)
if err != nil {
Expand Down Expand Up @@ -218,6 +263,10 @@
return s.host.Network().ClosePeer(peer)
}

func (s *serviceImpl) GetStream(peerID peer.ID) (network.Stream, bool) {
return s.streams.getStream(peerID)

Check warning on line 267 in network/p2p/p2p.go

View check run for this annotation

Codecov / codecov/patch

network/p2p/p2p.go#L266-L267

Added lines #L266 - L267 were not covered by tests
}

// netAddressToListenAddress converts a netAddress in "ip:port" format to a listen address
// that can be passed in to libp2p.ListenAddrStrings
func netAddressToListenAddress(netAddress string) (string, error) {
Expand Down
72 changes: 71 additions & 1 deletion network/p2p/p2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,20 @@
package p2p

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

"github.com/algorand/go-algorand/test/partitiontest"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"

"github.com/algorand/go-algorand/config"
"github.com/algorand/go-algorand/network/p2p/peerstore"
"github.com/algorand/go-algorand/test/partitiontest"
)

// Tests the helper function netAddressToListenAddress which converts
Expand Down Expand Up @@ -74,3 +83,64 @@ func TestNetAddressToListenAddress(t *testing.T) {
})
}
}

func TestP2PStreamingHost(t *testing.T) {
partitiontest.PartitionTest(t)

cfg := config.GetDefaultLocal()
dir := t.TempDir()
pstore, err := peerstore.NewPeerStore(nil)
require.NoError(t, err)
h, la, err := MakeHost(cfg, dir, pstore)
require.NoError(t, err)

var h1calls atomic.Int64
h1 := func(network.Stream) {
h1calls.Add(1)
}
var h2calls atomic.Int64
h2 := func(network.Stream) {
h2calls.Add(1)
}

ma, err := multiaddr.NewMultiaddr(la)
require.NoError(t, err)
h.Network().Listen(ma)
defer h.Close()

h.SetStreamHandler(AlgorandWsProtocol, h1)
h.SetStreamHandler(AlgorandWsProtocol, h2)

addrInfo := peer.AddrInfo{
ID: h.ID(),
Addrs: h.Addrs(),
}
cpstore, err := peerstore.NewPeerStore([]*peer.AddrInfo{&addrInfo})
require.NoError(t, err)
c, _, err := MakeHost(cfg, dir, cpstore)
require.NoError(t, err)
defer c.Close()

s1, err := c.NewStream(context.Background(), h.ID(), AlgorandWsProtocol)
require.NoError(t, err)
s1.Write([]byte("hello"))
defer s1.Close()

require.Eventually(t, func() bool {
return h1calls.Load() == 1 && h2calls.Load() == 1
}, 5*time.Second, 100*time.Millisecond)

// ensure a single handler also works as expected
h1calls.Store(0)
h.SetStreamHandler(algorandP2pHTTPProtocol, h1)

s2, err := c.NewStream(context.Background(), h.ID(), algorandP2pHTTPProtocol)
require.NoError(t, err)
s2.Write([]byte("hello"))
defer s2.Close()

require.Eventually(t, func() bool {
return h1calls.Load() == 1
}, 5*time.Second, 100*time.Millisecond)

}
Loading
Loading