Skip to content

Commit

Permalink
feat(testplan): add http comparison
Browse files Browse the repository at this point in the history
  • Loading branch information
acruikshank authored and hannahhoward committed Feb 26, 2021
1 parent f39ea50 commit 2db8f7a
Show file tree
Hide file tree
Showing 3 changed files with 151 additions and 19 deletions.
1 change: 1 addition & 0 deletions testplans/graphsync/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/libp2p/go-libp2p-secio v0.2.2
github.com/libp2p/go-libp2p-tls v0.1.3
github.com/libp2p/go-sockaddr v0.1.0 // indirect
github.com/multiformats/go-multiaddr v0.3.1
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/testground/sdk-go v0.2.7-0.20201112151952-8ee00c80c3ec
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
Expand Down
168 changes: 149 additions & 19 deletions testplans/graphsync/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package main
import (
"context"
"crypto/rand"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"os"
"path/filepath"
goruntime "runtime"
Expand Down Expand Up @@ -36,6 +40,7 @@ import (
noise "github.com/libp2p/go-libp2p-noise"
secio "github.com/libp2p/go-libp2p-secio"
tls "github.com/libp2p/go-libp2p-tls"
ma "github.com/multiformats/go-multiaddr"
"github.com/testground/sdk-go/network"
"github.com/testground/sdk-go/run"
"github.com/testground/sdk-go/runtime"
Expand All @@ -48,6 +53,60 @@ import (
"github.com/ipfs/go-graphsync/storeutil"
)

type AddrInfo struct {
peerAddr *peer.AddrInfo
ip net.IP
}

func (pi AddrInfo) MarshalJSON() ([]byte, error) {
out := make(map[string]interface{})
peerJSON, err := pi.peerAddr.MarshalJSON()
if err != nil {
panic(fmt.Sprintf("error marshaling: %v", err))
}
out["PEER"] = string(peerJSON)

ip, err := pi.ip.MarshalText()
if err != nil {
panic(fmt.Sprintf("error marshaling: %v", err))
}
out["IP"] = string(ip)
return json.Marshal(out)
}

func (pi *AddrInfo) UnmarshalJSON(b []byte) error {
var data map[string]interface{}
err := json.Unmarshal(b, &data)
if err != nil {
panic(fmt.Sprintf("error unmarshaling: %v", err))
}

var pa peer.AddrInfo
pi.peerAddr = &pa
peerAddrData := data["PEER"].(string)
var peerData map[string]interface{}
err = json.Unmarshal([]byte(peerAddrData), &peerData)
if err != nil {
panic(err)
}
pid, err := peer.Decode(peerData["ID"].(string))
if err != nil {
panic(err)
}
pi.peerAddr.ID = pid
addrs, ok := peerData["Addrs"].([]interface{})
if ok {
for _, a := range addrs {
pi.peerAddr.Addrs = append(pi.peerAddr.Addrs, ma.StringCast(a.(string)))
}
}

if err := pi.ip.UnmarshalText([]byte(data["IP"].(string))); err != nil {
panic(fmt.Sprintf("error unmarshaling: %v", err))
}
return nil
}

var testcases = map[string]interface{}{
"stress": run.InitializedTestCaseFn(runStress),
}
Expand Down Expand Up @@ -80,7 +139,7 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {

initCtx.MustWaitAllInstancesInitialized(ctx)

host, peers, _ := makeHost(ctx, runenv, initCtx)
host, ip, peers, _ := makeHost(ctx, runenv, initCtx)
defer host.Close()

datastore, err := createDatastore(runenv.BooleanParam("disk_store"))
Expand Down Expand Up @@ -117,7 +176,7 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
gsync.RegisterBlockSentListener(func(p peer.ID, request gs.RequestData, block gs.BlockData) {
recorder.recordBlock()
})
err := runProvider(ctx, runenv, initCtx, dagsrv, size, networkParams, concurrency, memorySnapshots, recorder)
err := runProvider(ctx, runenv, initCtx, dagsrv, size, ip, networkParams, concurrency, memorySnapshots, recorder)
if err != nil {
runenv.RecordMessage("Error running provider: %s", err.Error())
}
Expand All @@ -129,8 +188,8 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
recorder.recordBlock()
})

p := *peers[0]
if err := host.Connect(ctx, p); err != nil {
p := peers[0]
if err := host.Connect(ctx, *p.peerAddr); err != nil {
return err
}
runenv.RecordMessage("done dialling provider")
Expand Down Expand Up @@ -200,13 +259,15 @@ func parseMemorySnapshotsParam(runenv *runtime.RunEnv) snapshotMode {
}
}

func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, gsync gs.GraphExchange, p peer.AddrInfo, dagsrv format.DAGService, networkParams []networkParams, concurrency int, size uint64, memorySnapshots snapshotMode, recorder *runRecorder) error {
func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, gsync gs.GraphExchange, p *AddrInfo, dagsrv format.DAGService, networkParams []networkParams, concurrency int, size uint64, memorySnapshots snapshotMode, recorder *runRecorder) error {
var (
cids []cid.Cid
// create a selector for the whole UnixFS dag
sel = allselector.AllSelector
)

runHTTPTest := runenv.BooleanParam("compare_http")

for round, np := range networkParams {
var (
topicCid = sync.NewTopic(fmt.Sprintf("cid-%d", round), []cid.Cid{})
Expand Down Expand Up @@ -251,28 +312,51 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init
runenv.RecordMessage("\t>>> requesting CID %s", c)

start := time.Now()
respCh, errCh := gsync.Request(grpctx, p.ID, clink, sel)
respCh, errCh := gsync.Request(grpctx, p.peerAddr.ID, clink, sel)
for range respCh {
}
for err := range errCh {
return err
}
dur := time.Since(start)

runenv.RecordMessage("\t<<< request complete with no errors")
runenv.RecordMessage("\t<<< graphsync request complete with no errors")
runenv.RecordMessage("***** ROUND %d observed duration (lat=%s,bw=%d): %s", round, np.latency, np.bandwidth, dur)

measurement := fmt.Sprintf("duration.sec,lat=%s,bw=%s,concurrency=%d,size=%s", np.latency, humanize.IBytes(np.bandwidth), concurrency, humanize.Bytes(size))
measurement = strings.Replace(measurement, " ", "", -1)
runenv.R().RecordPoint(measurement, float64(dur)/float64(time.Second))

if runHTTPTest {
runenv.R().RecordPoint(measurement+",transport=graphsync", float64(dur)/float64(time.Second))
} else {
runenv.R().RecordPoint(measurement, float64(dur)/float64(time.Second))
}
// verify that we have the CID now.
if node, err := dagsrv.Get(grpctx, c); err != nil {
return err
} else if node == nil {
return fmt.Errorf("finished graphsync request, but CID not in store")
}

if runHTTPTest {
// request file directly over http
start = time.Now()
file, err := ioutil.TempFile(os.TempDir(), fmt.Sprintf("%s-", c.String()))
if err != nil {
panic(err)
}
resp, err := http.Get(fmt.Sprintf("http://%s:8080/%s", p.ip.String(), c.String()))
if err != nil {
panic(err)
}
bytesRead, err := io.Copy(file, resp.Body)
if err != nil {
panic(err)
}
dur = time.Since(start)

runenv.RecordMessage(fmt.Sprintf("\t<<< http request complete with no errors, read %d bytes", bytesRead))
runenv.RecordMessage("***** ROUND %d observed http duration (lat=%s,bw=%d): %s", round, np.latency, np.bandwidth, dur)
runenv.R().RecordPoint(measurement+",transport=http", float64(dur)/float64(time.Second))
}
return nil
})
}
Expand All @@ -292,12 +376,26 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init
return nil
}

func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, dagsrv format.DAGService, size uint64, networkParams []networkParams, concurrency int, memorySnapshots snapshotMode, recorder *runRecorder) error {
func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext, dagsrv format.DAGService, size uint64, ip net.IP, networkParams []networkParams, concurrency int, memorySnapshots snapshotMode, recorder *runRecorder) error {
var (
cids []cid.Cid
bufferedDS = format.NewBufferedDAG(ctx, dagsrv)
)

runHTTPTest := runenv.BooleanParam("compare_http")
var svr *http.Server
if runHTTPTest {
// start an http server on port 8080
runenv.RecordMessage("creating http server at http://%s:8080", ip.String())
svr = &http.Server{Addr: ":8080"}

go func() {
if err := svr.ListenAndServe(); err != nil {
runenv.RecordMessage("shutdown http server at http://%s:8080", ip.String())
}
}()
}

for round, np := range networkParams {
var (
topicCid = sync.NewTopic(fmt.Sprintf("cid-%d", round), []cid.Cid{})
Expand All @@ -322,7 +420,14 @@ func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitC
// generate as many random files as the concurrency level.
for i := 0; i < concurrency; i++ {
// file with random data
file := files.NewReaderFile(io.LimitReader(rand.Reader, int64(size)))
data := files.NewReaderFile(io.LimitReader(rand.Reader, int64(size)))
file, err := ioutil.TempFile(os.TempDir(), "unixfs-")
if err != nil {
panic(err)
}
if _, err := io.Copy(file, data); err != nil {
panic(err)
}

unixfsChunkSize := uint64(1) << runenv.IntParam("chunk_size")
unixfsLinksPerLevel := runenv.IntParam("links_per_level")
Expand All @@ -334,6 +439,9 @@ func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitC
Dagserv: bufferedDS,
}

if _, err := file.Seek(0, 0); err != nil {
panic(err)
}
db, err := params.New(chunk.NewSizeSplitter(file, int64(unixfsChunkSize)))
if err != nil {
return fmt.Errorf("unable to setup dag builder: %w", err)
Expand All @@ -344,6 +452,20 @@ func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitC
return fmt.Errorf("unable to create unix fs node: %w", err)
}

if runHTTPTest {
// set up http server to send file
http.HandleFunc(fmt.Sprintf("/%s", node.Cid()), func(w http.ResponseWriter, r *http.Request) {
fileReader, err := os.Open(file.Name())
defer fileReader.Close()
if err != nil {
panic(err)
}
_, err = io.Copy(w, fileReader)
if err != nil {
panic(err)
}
})
}
cids = append(cids, node.Cid())
}

Expand Down Expand Up @@ -382,10 +504,15 @@ func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitC

}

if runHTTPTest {
if err := svr.Shutdown(ctx); err != nil {
panic(err)
}
}
return nil
}

func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext) (host.Host, []*peer.AddrInfo, *metrics.BandwidthCounter) {
func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitContext) (host.Host, net.IP, []*AddrInfo, *metrics.BandwidthCounter) {
secureChannel := runenv.StringParam("secure_channel")

var security libp2p.Option
Expand Down Expand Up @@ -421,18 +548,21 @@ func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitCont
ai = &peer.AddrInfo{ID: id, Addrs: host.Addrs()}

// the peers topic where all instances will advertise their AddrInfo.
peersTopic = sync.NewTopic("peers", new(peer.AddrInfo))
peersTopic = sync.NewTopic("peers", new(AddrInfo))

// initialize a slice to store the AddrInfos of all other peers in the run.
peers = make([]*peer.AddrInfo, 0, runenv.TestInstanceCount-1)
peers = make([]*AddrInfo, 0, runenv.TestInstanceCount-1)
)

// Publish our own.
initCtx.SyncClient.MustPublish(ctx, peersTopic, ai)
initCtx.SyncClient.MustPublish(ctx, peersTopic, &AddrInfo{
peerAddr: ai,
ip: ip,
})

// Now subscribe to the peers topic and consume all addresses, storing them
// in the peers slice.
peersCh := make(chan *peer.AddrInfo)
peersCh := make(chan *AddrInfo)
sctx, scancel := context.WithCancel(ctx)
defer scancel()

Expand All @@ -442,7 +572,7 @@ func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitCont
for len(peers) < cap(peers) {
select {
case ai := <-peersCh:
if ai.ID == id {
if ai.peerAddr.ID == id {
continue // skip over ourselves.
}
peers = append(peers, ai)
Expand All @@ -451,7 +581,7 @@ func makeHost(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitCont
}
}

return host, peers, bwcounter
return host, ip, peers, bwcounter
}

func createDatastore(diskStore bool) (ds.Datastore, error) {
Expand Down
1 change: 1 addition & 0 deletions testplans/graphsync/manifest.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ links_per_level = { type = "int", desc = "unixfs links per level", default = "10
raw_leaves = { type = "bool", desc = "should unixfs leaves be left unwrapped", default = "true"}
disk_store = { type = "bool", desc = "should data be stored on disk (true) or memory (false)", default = "false"}
memory_snapshots = { type = "string", desc = "what kind of memory snapshots to take (none, simple, detailed)", default = "none" }
compare_http = { type = "bool", desc = "run a comparison against http", default = "true"}

0 comments on commit 2db8f7a

Please # to comment.