From 2db8f7a8f6c9f8be30a520be906bf52be43d75bf Mon Sep 17 00:00:00 2001
From: acruikshank
Date: Thu, 21 Jan 2021 12:57:05 -0500
Subject: [PATCH] feat(testplan): add http comparison

---
 testplans/graphsync/go.mod        |   1 +
 testplans/graphsync/main.go       | 168 ++++++++++++++++++++++++++----
 testplans/graphsync/manifest.toml |   1 +
 3 files changed, 151 insertions(+), 19 deletions(-)

diff --git a/testplans/graphsync/go.mod b/testplans/graphsync/go.mod
index 29fe70ee..c3f52c05 100644
--- a/testplans/graphsync/go.mod
+++ b/testplans/graphsync/go.mod
@@ -27,6 +27,7 @@ require (
 	github.com/libp2p/go-libp2p-secio v0.2.2
 	github.com/libp2p/go-libp2p-tls v0.1.3
 	github.com/libp2p/go-sockaddr v0.1.0 // indirect
+	github.com/multiformats/go-multiaddr v0.3.1
 	github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
 	github.com/testground/sdk-go v0.2.7-0.20201112151952-8ee00c80c3ec
 	golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
diff --git a/testplans/graphsync/main.go b/testplans/graphsync/main.go
index a1cbbd05..26e4834c 100644
--- a/testplans/graphsync/main.go
+++ b/testplans/graphsync/main.go
@@ -3,8 +3,12 @@ package main
 import (
 	"context"
 	"crypto/rand"
+	"encoding/json"
 	"fmt"
 	"io"
+	"io/ioutil"
+	"net"
+	"net/http"
 	"os"
 	"path/filepath"
 	goruntime "runtime"
@@ -36,6 +40,7 @@ import (
 	noise "github.com/libp2p/go-libp2p-noise"
 	secio "github.com/libp2p/go-libp2p-secio"
 	tls "github.com/libp2p/go-libp2p-tls"
+	ma "github.com/multiformats/go-multiaddr"
 	"github.com/testground/sdk-go/network"
 	"github.com/testground/sdk-go/run"
 	"github.com/testground/sdk-go/runtime"
@@ -48,6 +53,60 @@ import (
 	"github.com/ipfs/go-graphsync/storeutil"
 )
 
+type AddrInfo struct {
+	peerAddr *peer.AddrInfo
+	ip       net.IP
+}
+
+func (pi AddrInfo) MarshalJSON() ([]byte, error) {
+	out := make(map[string]interface{})
+	peerJSON, err := pi.peerAddr.MarshalJSON()
+	if err != nil {
+		panic(fmt.Sprintf("error marshaling: %v", err))
+	}
+	out["PEER"] = string(peerJSON)
+
+	ip, err := pi.ip.MarshalText()
+	if err != nil {
+		panic(fmt.Sprintf("error marshaling: %v", err))
+	}
+	out["IP"] = string(ip)
+	return json.Marshal(out)
+}
+
+func (pi *AddrInfo) UnmarshalJSON(b []byte) error {
+	var data map[string]interface{}
+	err := json.Unmarshal(b, &data)
+	if err != nil {
+		panic(fmt.Sprintf("error unmarshaling: %v", err))
+	}
+
+	var pa peer.AddrInfo
+	pi.peerAddr = &pa
+	peerAddrData := data["PEER"].(string)
+	var peerData map[string]interface{}
+	err = json.Unmarshal([]byte(peerAddrData), &peerData)
+	if err != nil {
+		panic(err)
+	}
+	pid, err := peer.Decode(peerData["ID"].(string))
+	if err != nil {
+		panic(err)
+	}
+	pi.peerAddr.ID = pid
+	addrs, ok := peerData["Addrs"].([]interface{})
+	if ok {
+		for _, a := range addrs {
+			pi.peerAddr.Addrs = append(pi.peerAddr.Addrs, ma.StringCast(a.(string)))
+		}
+	}
+
+	if err := pi.ip.UnmarshalText([]byte(data["IP"].(string))); err != nil {
+		panic(fmt.Sprintf("error unmarshaling: %v", err))
+	}
+	return nil
+}
+
 var testcases = map[string]interface{}{
 	"stress": run.InitializedTestCaseFn(runStress),
 }
@@ -80,7 +139,7 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
 
 	initCtx.MustWaitAllInstancesInitialized(ctx)
 
-	host, peers, _ := makeHost(ctx, runenv, initCtx)
+	host, ip, peers, _ := makeHost(ctx, runenv, initCtx)
 	defer host.Close()
 
 	datastore, err := createDatastore(runenv.BooleanParam("disk_store"))
@@ -117,7 +176,7 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
 		gsync.RegisterBlockSentListener(func(p peer.ID, request gs.RequestData, block gs.BlockData) {
 			recorder.recordBlock()
 		})
-		err := runProvider(ctx, runenv, initCtx, dagsrv, size, networkParams, concurrency, memorySnapshots, recorder)
+		err := runProvider(ctx, runenv, initCtx, dagsrv, size, ip, networkParams, concurrency, memorySnapshots, recorder)
 		if err != nil {
 			runenv.RecordMessage("Error running provider: %s", err.Error())
 		}
@@ -129,8 +188,8 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
 			recorder.recordBlock()
 		})
 
-		p := *peers[0]
-		if err := host.Connect(ctx, p); err != nil {
+		p := peers[0]
+		if err := host.Connect(ctx, *p.peerAddr); err != nil {
 			return err
 		}
 		runenv.RecordMessage("done dialling provider")
@@ -200,13 +259,15 @@ func parseMemorySnapshotsParam(runenv *runtime.RunEnv) snapshotMode {
 	}
 }
 
-func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, gsync gs.GraphExchange, p peer.AddrInfo, dagsrv format.DAGService, networkParams []networkParams, concurrency int, size uint64, memorySnapshots snapshotMode, recorder *runRecorder) error {
+func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, gsync gs.GraphExchange, p *AddrInfo, dagsrv format.DAGService, networkParams []networkParams, concurrency int, size uint64, memorySnapshots snapshotMode, recorder *runRecorder) error {
 	var (
 		cids []cid.Cid
 		// create a selector for the whole UnixFS dag
 		sel = allselector.AllSelector
 	)
 
+	runHTTPTest := runenv.BooleanParam("compare_http")
+
 	for round, np := range networkParams {
 		var (
 			topicCid    = sync.NewTopic(fmt.Sprintf("cid-%d", round), []cid.Cid{})
@@ -251,7 +312,7 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init
 			runenv.RecordMessage("\t>>> requesting CID %s", c)
 
 			start := time.Now()
-			respCh, errCh := gsync.Request(grpctx, p.ID, clink, sel)
+			respCh, errCh := gsync.Request(grpctx, p.peerAddr.ID, clink, sel)
 			for range respCh {
 			}
 			for err := range errCh {
@@ -259,20 +320,43 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init
 			}
 
 			dur := time.Since(start)
 
-			runenv.RecordMessage("\t<<< request complete with no errors")
+			runenv.RecordMessage("\t<<< graphsync request complete with no errors")
 			runenv.RecordMessage("***** ROUND %d observed duration (lat=%s,bw=%d): %s", round, np.latency, np.bandwidth, dur)
 			measurement := fmt.Sprintf("duration.sec,lat=%s,bw=%s,concurrency=%d,size=%s", np.latency, humanize.IBytes(np.bandwidth), concurrency, humanize.Bytes(size))
 			measurement = strings.Replace(measurement, " ", "", -1)
-			runenv.R().RecordPoint(measurement, float64(dur)/float64(time.Second))
-
+			if runHTTPTest {
+				runenv.R().RecordPoint(measurement+",transport=graphsync", float64(dur)/float64(time.Second))
+			} else {
+				runenv.R().RecordPoint(measurement, float64(dur)/float64(time.Second))
+			}
 			// verify that we have the CID now.
 			if node, err := dagsrv.Get(grpctx, c); err != nil {
 				return err
 			} else if node == nil {
 				return fmt.Errorf("finished graphsync request, but CID not in store")
 			}
-
+			if runHTTPTest {
+				// request file directly over http
+				start = time.Now()
+				file, err := ioutil.TempFile(os.TempDir(), fmt.Sprintf("%s-", c.String()))
+				if err != nil {
+					panic(err)
+				}
+				resp, err := http.Get(fmt.Sprintf("http://%s:8080/%s", p.ip.String(), c.String()))
+				if err != nil {
+					panic(err)
+				}
+				bytesRead, err := io.Copy(file, resp.Body)
+				if err != nil {
+					panic(err)
+				}
+				dur = time.Since(start)
+
+				runenv.RecordMessage(fmt.Sprintf("\t<<< http request complete with no errors, read %d bytes", bytesRead))
+				runenv.RecordMessage("***** ROUND %d observed http duration (lat=%s,bw=%d): %s", round, np.latency, np.bandwidth, dur)
+				runenv.R().RecordPoint(measurement+",transport=http", float64(dur)/float64(time.Second))
+			}
 			return nil
 		})
 	}
@@ -292,12 +376,26 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init
 	return nil
 }
 
-func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, dagsrv format.DAGService, size uint64, networkParams []networkParams, concurrency int, memorySnapshots snapshotMode, recorder *runRecorder) error {
+func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, dagsrv format.DAGService, size uint64, ip net.IP, networkParams []networkParams, concurrency int, memorySnapshots snapshotMode, recorder *runRecorder) error {
 	var (
 		cids       []cid.Cid
 		bufferedDS = format.NewBufferedDAG(ctx, dagsrv)
 	)
 
+	runHTTPTest := runenv.BooleanParam("compare_http")
+	var svr *http.Server
+	if runHTTPTest {
+		// start an http server on port 8080
+		runenv.RecordMessage("creating http server at http://%s:8080", ip.String())
+		svr = &http.Server{Addr: ":8080"}
+
+		go func() {
+			if err := svr.ListenAndServe(); err != nil {
+				runenv.RecordMessage("shutdown http server at http://%s:8080", ip.String())
+			}
+		}()
+	}
+
 	for round, np := range networkParams {
 		var (
 			topicCid    = sync.NewTopic(fmt.Sprintf("cid-%d", round), []cid.Cid{})
@@ -322,7 +420,14 @@ func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitC
 		// generate as many random files as the concurrency level.
 		for i := 0; i < concurrency; i++ {
 			// file with random data
-			file := files.NewReaderFile(io.LimitReader(rand.Reader, int64(size)))
+			data := files.NewReaderFile(io.LimitReader(rand.Reader, int64(size)))
+			file, err := ioutil.TempFile(os.TempDir(), "unixfs-")
+			if err != nil {
+				panic(err)
+			}
+			if _, err := io.Copy(file, data); err != nil {
+				panic(err)
+			}
 
 			unixfsChunkSize := uint64(1) << runenv.IntParam("chunk_size")
 			unixfsLinksPerLevel := runenv.IntParam("links_per_level")
@@ -334,6 +439,9 @@ func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitC
 				Dagserv: bufferedDS,
 			}
 
+			if _, err := file.Seek(0, 0); err != nil {
+				panic(err)
+			}
 			db, err := params.New(chunk.NewSizeSplitter(file, int64(unixfsChunkSize)))
 			if err != nil {
 				return fmt.Errorf("unable to setup dag builder: %w", err)
@@ -344,6 +452,20 @@ func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitC
 				return fmt.Errorf("unable to create unix fs node: %w", err)
 			}
 
+			if runHTTPTest {
+				// set up http server to send file
+				http.HandleFunc(fmt.Sprintf("/%s", node.Cid()), func(w http.ResponseWriter, r *http.Request) {
+					fileReader, err := os.Open(file.Name())
+					defer fileReader.Close()
+					if err != nil {
+						panic(err)
+					}
+					_, err = io.Copy(w, fileReader)
+					if err != nil {
+						panic(err)
+					}
+				})
+			}
 			cids = append(cids, node.Cid())
 		}
 
@@ -382,10 +504,15 @@ func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitC
 	}
 
+	if runHTTPTest {
+		if err := svr.Shutdown(ctx); err != nil {
+			panic(err)
+		}
+	}
 	return nil
 }
 
-func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext) (host.Host, []*peer.AddrInfo, *metrics.BandwidthCounter) {
+func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext) (host.Host, net.IP, []*AddrInfo, *metrics.BandwidthCounter) {
 	secureChannel := runenv.StringParam("secure_channel")
 
 	var security libp2p.Option
@@ -421,18 +548,21 @@ func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitCont
 		ai = &peer.AddrInfo{ID: id, Addrs: host.Addrs()}
 
 		// the peers topic where all instances will advertise their AddrInfo.
-		peersTopic = sync.NewTopic("peers", new(peer.AddrInfo))
+		peersTopic = sync.NewTopic("peers", new(AddrInfo))
 
 		// initialize a slice to store the AddrInfos of all other peers in the run.
-		peers = make([]*peer.AddrInfo, 0, runenv.TestInstanceCount-1)
+		peers = make([]*AddrInfo, 0, runenv.TestInstanceCount-1)
 	)
 
 	// Publish our own.
-	initCtx.SyncClient.MustPublish(ctx, peersTopic, ai)
+	initCtx.SyncClient.MustPublish(ctx, peersTopic, &AddrInfo{
+		peerAddr: ai,
+		ip:       ip,
+	})
 
 	// Now subscribe to the peers topic and consume all addresses, storing them
 	// in the peers slice.
-	peersCh := make(chan *peer.AddrInfo)
+	peersCh := make(chan *AddrInfo)
 	sctx, scancel := context.WithCancel(ctx)
 	defer scancel()
@@ -442,7 +572,7 @@ func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitCont
 	for len(peers) < cap(peers) {
 		select {
 		case ai := <-peersCh:
-			if ai.ID == id {
+			if ai.peerAddr.ID == id {
 				continue // skip over ourselves.
 			}
 			peers = append(peers, ai)
@@ -451,7 +581,7 @@ func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitCont
 		}
 	}
 
-	return host, peers, bwcounter
+	return host, ip, peers, bwcounter
 }
 
 func createDatastore(diskStore bool) (ds.Datastore, error) {
diff --git a/testplans/graphsync/manifest.toml b/testplans/graphsync/manifest.toml
index ba0c289e..b7929476 100644
--- a/testplans/graphsync/manifest.toml
+++ b/testplans/graphsync/manifest.toml
@@ -27,3 +27,4 @@ links_per_level = { type = "int", desc = "unixfs links per level", default = "10
 raw_leaves = { type = "bool", desc = "should unixfs leaves be left unwrapped", default = "true"}
 disk_store = { type = "bool", desc = "should data be stored on disk (true) or memory (false)", default = "false"}
 memory_snapshots = { type = "string", desc = "what kind of memory snapshots to take (none, simple, detailed)", default = "none" }
+compare_http = { type = "bool", desc = "run a comparison against http", default = "true"}
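Not part of the patch: the standalone, stdlib-only Go sketch below illustrates the measurement that the new compare_http option performs, independent of Testground — write random data to a temp file, serve it over HTTP, and time a plain GET of the same bytes. The httptest listener, the /file path, and the fileSize constant are stand-ins chosen for the example; the real plan serves on a fixed :8080 address keyed by CID.

// Sketch of the HTTP-side measurement: generate a file, serve it, time a GET.
package main

import (
	"crypto/rand"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/http/httptest"
	"os"
	"time"
)

func main() {
	const fileSize = 1 << 20 // 1 MiB of random data, standing in for the unixfs input

	// Write random data to a temp file, as the provider side of the patch now does.
	f, err := ioutil.TempFile(os.TempDir(), "http-compare-")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	if _, err := io.Copy(f, io.LimitReader(rand.Reader, fileSize)); err != nil {
		panic(err)
	}
	f.Close()

	// Serve the file at a fixed path; the test plan keys the path by CID instead.
	mux := http.NewServeMux()
	mux.HandleFunc("/file", func(w http.ResponseWriter, r *http.Request) {
		http.ServeFile(w, r, f.Name())
	})
	srv := httptest.NewServer(mux)
	defer srv.Close()

	// Time the GET and count the bytes read, mirroring the requestor side.
	start := time.Now()
	resp, err := http.Get(srv.URL + "/file")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	n, err := io.Copy(ioutil.Discard, resp.Body)
	if err != nil {
		panic(err)
	}
	fmt.Printf("read %d bytes over http in %s\n", n, time.Since(start))
}

Serving from disk rather than from memory mirrors the provider change above, which now writes the generated unixfs input to a temp file so that the identical bytes can be fetched over both graphsync and HTTP.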