feat(testplan): add configuration & response timing
Add configuration for responder memory buffering and for disabling the unlimited-bandwidth and zero-latency control cases; also add timing of responses.
hannahhoward committed Mar 2, 2021
1 parent d604c78 commit 43ae998
Showing 2 changed files with 65 additions and 26 deletions.
87 changes: 61 additions & 26 deletions testplans/graphsync/main.go
@@ -14,6 +14,7 @@ import (
goruntime "runtime"
"runtime/pprof"
"strings"
gosync "sync"
"time"

dgbadger "github.com/dgraph-io/badger/v2"
@@ -147,6 +148,9 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
runenv.RecordMessage("datastore error: %s", err.Error())
return err
}

maxMemoryPerPeer := runenv.SizeParam("max_memory_per_peer")
maxMemoryTotal := runenv.SizeParam("max_memory_total")
var (
// make datastore, blockstore, dag service, graphsync
bs = blockstore.NewBlockstore(dss.MutexWrap(datastore))
@@ -155,6 +159,8 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
gsnet.NewFromLibp2pHost(host),
storeutil.LoaderForBlockstore(bs),
storeutil.StorerForBlockstore(bs),
gsi.MaxMemoryPerPeerResponder(maxMemoryPerPeer),
gsi.MaxMemoryResponder(maxMemoryTotal),
)
recorder = &runRecorder{memorySnapshots: memorySnapshots, runenv: runenv}
)
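
The two new options cap how much response data the responder side will queue in memory, per peer and in total, with the limits coming from the new size params. A condensed sketch of the same wiring — the `gsi.New` call itself is collapsed above this hunk, so its leading arguments and the import paths (roughly the go-graphsync v0.6 era) are assumptions:

```go
package stress

import (
	"context"

	graphsync "github.com/ipfs/go-graphsync"
	gsi "github.com/ipfs/go-graphsync/impl"
	gsnet "github.com/ipfs/go-graphsync/network"
	"github.com/ipfs/go-graphsync/storeutil"
	blockstore "github.com/ipfs/go-ipfs-blockstore"
	"github.com/libp2p/go-libp2p-core/host"
)

// newCappedGraphSync is a hypothetical helper mirroring the constructor call
// in the diff: the responder buffers at most maxMemoryPerPeer bytes of queued
// response data for any single peer and maxMemoryTotal bytes overall (per the
// option names; exact semantics are go-graphsync's).
func newCappedGraphSync(ctx context.Context, h host.Host, bs blockstore.Blockstore,
	maxMemoryPerPeer, maxMemoryTotal uint64) graphsync.GraphSync {
	return gsi.New(ctx,
		gsnet.NewFromLibp2pHost(h),
		storeutil.LoaderForBlockstore(bs),
		storeutil.StorerForBlockstore(bs),
		gsi.MaxMemoryPerPeerResponder(maxMemoryPerPeer),
		gsi.MaxMemoryResponder(maxMemoryTotal),
	)
}
```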
@@ -170,8 +176,31 @@ func runStress(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
runenv.RecordMessage("we are the provider")
defer runenv.RecordMessage("done provider")

startTimes := make(map[struct {
peer.ID
gs.RequestID
}]time.Time)
var startTimesLk gosync.Mutex
gsync.RegisterIncomingRequestHook(func(p peer.ID, request gs.RequestData, hookActions gs.IncomingRequestHookActions) {
hookActions.ValidateRequest()
startTimesLk.Lock()
startTimes[struct {
peer.ID
gs.RequestID
}{p, request.ID()}] = time.Now()
startTimesLk.Unlock()
})
gsync.RegisterCompletedResponseListener(func(p peer.ID, request gs.RequestData, status gs.ResponseStatusCode) {
startTimesLk.Lock()
startTime, ok := startTimes[struct {
peer.ID
gs.RequestID
}{p, request.ID()}]
startTimesLk.Unlock()
if ok && status == gs.RequestCompletedFull {
duration := time.Since(startTime)
recorder.recordRun(duration)
}
})
gsync.RegisterBlockSentListener(func(p peer.ID, request gs.RequestData, block gs.BlockData) {
recorder.recordBlock()
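
The provider now times each response by stamping time.Now() in the incoming-request hook and reading it back in the completed-response listener, keyed by (peer ID, request ID). A sketch of the same bookkeeping with a named key type instead of the anonymous struct — purely illustrative, and it additionally deletes entries, which the diff does not:

```go
package timing

import (
	gosync "sync"
	"time"

	gs "github.com/ipfs/go-graphsync"
	"github.com/libp2p/go-libp2p-core/peer"
)

// requestKey is a hypothetical named replacement for the anonymous
// struct{ peer.ID; gs.RequestID } key the diff uses.
type requestKey struct {
	p  peer.ID
	id gs.RequestID
}

// responseTimer keeps per-request start times behind a mutex, the same
// bookkeeping the incoming-request hook and completed-response listener do.
type responseTimer struct {
	mu     gosync.Mutex
	starts map[requestKey]time.Time
}

func newResponseTimer() *responseTimer {
	return &responseTimer{starts: make(map[requestKey]time.Time)}
}

// begin would be called from RegisterIncomingRequestHook.
func (t *responseTimer) begin(p peer.ID, id gs.RequestID) {
	t.mu.Lock()
	t.starts[requestKey{p, id}] = time.Now()
	t.mu.Unlock()
}

// finish would be called from RegisterCompletedResponseListener. Unlike the
// diff, it deletes the entry so the map does not grow across rounds.
func (t *responseTimer) finish(p peer.ID, id gs.RequestID) (time.Duration, bool) {
	k := requestKey{p, id}
	t.mu.Lock()
	start, ok := t.starts[k]
	delete(t.starts, k)
	t.mu.Unlock()
	if !ok {
		return 0, false
	}
	return time.Since(start), true
}
```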
@@ -218,8 +247,12 @@ func parseNetworkConfig(runenv *runtime.RunEnv) []networkParams {
// prepend bandwidth=0 and latency=0 zero values; the first iteration will
// be a control iteration. The sidecar interprets zero values as no
// limitation on that attribute.
bandwidths = append([]uint64{0}, bandwidths...)
latencies = append([]time.Duration{0}, latencies...)
if runenv.BooleanParam("unlimited_bandwidth_case") {
bandwidths = append([]uint64{0}, bandwidths...)
}
if runenv.BooleanParam("no_latency_case") {
latencies = append([]time.Duration{0}, latencies...)
}

var ret []networkParams
for _, bandwidth := range bandwidths {
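
The zero-bandwidth and zero-latency control values were previously always prepended; they are now opt-in via the two new boolean params. A small self-contained illustration of how the gating changes the set of rounds (the cross-product loop is truncated in this hunk, so its exact shape below is an assumption):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	bandwidths := []uint64{1 << 20, 10 << 20}            // configured cases
	latencies := []time.Duration{50 * time.Millisecond}  // configured cases

	unlimitedBandwidthCase := true // manifest: unlimited_bandwidth_case
	noLatencyCase := true          // manifest: no_latency_case

	// As in the diff: zero means "no limit" to the sidecar, and each control
	// value is only prepended when its boolean param is enabled.
	if unlimitedBandwidthCase {
		bandwidths = append([]uint64{0}, bandwidths...)
	}
	if noLatencyCase {
		latencies = append([]time.Duration{0}, latencies...)
	}

	// Assumed shape of the truncated cross-product loop that builds rounds.
	for _, bw := range bandwidths {
		for _, lat := range latencies {
			fmt.Printf("round: bandwidth=%d latency=%s\n", bw, lat)
		}
	}
}
```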
@@ -276,17 +309,14 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init
stateFinish = sync.State(fmt.Sprintf("finish-%d", round))
)

recorder.beginRun(np, size, concurrency)

// wait for all instances to be ready for the next state.
initCtx.SyncClient.MustSignalAndWait(ctx, stateNext, runenv.TestInstanceCount)
recorder.beginRun(np, size, concurrency, round)

// clean up previous CIDs to attempt to free memory
// TODO does this work?
_ = dagsrv.RemoveMany(ctx, cids)

runenv.RecordMessage("===== ROUND %d: latency=%s, bandwidth=%d =====", round, np.latency, np.bandwidth)

sctx, scancel := context.WithCancel(ctx)
cidCh := make(chan []cid.Cid, 1)
initCtx.SyncClient.MustSubscribe(sctx, topicCid, cidCh)
@@ -301,8 +331,7 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init

errgrp, grpctx := errgroup.WithContext(ctx)
for _, c := range cids {
c := c // capture
np := np // capture
c := c // capture

errgrp.Go(func() error {
// make a go-ipld-prime link for the root UnixFS node
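
The `np := np` capture disappears because the recorder now carries the network params, while `c := c` stays: with the Go toolchains of this era (pre-1.22), closures launched from a loop share one loop variable unless it is shadowed. A standalone illustration of the pitfall and the fix:

```go
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	items := []string{"a", "b", "c"}

	// Buggy before Go 1.22: the goroutines race with the loop and typically
	// all see the last element, because they share one loop variable.
	var g1 errgroup.Group
	for _, item := range items {
		g1.Go(func() error {
			fmt.Println("shared loop variable:", item)
			return nil
		})
	}
	_ = g1.Wait()

	// Fixed: shadowing gives each closure its own copy, which is what the
	// `c := c // capture` line in the diff does.
	var g2 errgroup.Group
	for _, item := range items {
		item := item // capture
		g2.Go(func() error {
			fmt.Println("captured copy:", item)
			return nil
		})
	}
	_ = g2.Wait()
}
```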
@@ -320,16 +349,7 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init
}
dur := time.Since(start)

runenv.RecordMessage("\t<<< graphsync request complete with no errors")
runenv.RecordMessage("***** ROUND %d observed duration (lat=%s,bw=%d): %s", round, np.latency, np.bandwidth, dur)

measurement := fmt.Sprintf("duration.sec,lat=%s,bw=%s,concurrency=%d,size=%s", np.latency, humanize.IBytes(np.bandwidth), concurrency, humanize.Bytes(size))
measurement = strings.Replace(measurement, " ", "", -1)
if runHTTPTest {
runenv.R().RecordPoint(measurement+",transport=graphsync", float64(dur)/float64(time.Second))
} else {
runenv.R().RecordPoint(measurement, float64(dur)/float64(time.Second))
}
recorder.recordRun(dur)
// verify that we have the CID now.
if node, err := dagsrv.Get(grpctx, c); err != nil {
return err
@@ -352,10 +372,7 @@ func runRequestor(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.Init
panic(err)
}
dur = time.Since(start)

runenv.RecordMessage(fmt.Sprintf("\t<<< http request complete with no errors, read %d bytes", bytesRead))
runenv.RecordMessage("***** ROUND %d observed http duration (lat=%s,bw=%d): %s", round, np.latency, np.bandwidth, dur)
runenv.R().RecordPoint(measurement+",transport=http", float64(dur)/float64(time.Second))
recorder.recordHTTPRun(dur, bytesRead)
}
return nil
})
@@ -403,10 +420,10 @@ func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitC
stateFinish = sync.State(fmt.Sprintf("finish-%d", round))
stateNet = sync.State(fmt.Sprintf("network-configured-%d", round))
)
recorder.beginRun(np, size, concurrency)

// wait for all instances to be ready for the next state.
initCtx.SyncClient.MustSignalAndWait(ctx, stateNext, runenv.TestInstanceCount)
recorder.beginRun(np, size, concurrency, round)

// remove the previous CIDs from the dag service; hopefully this
// will delete them from the store and free up memory.
@@ -415,8 +432,6 @@ func runProvider(ctx context.Context, runenv *runtime.RunEnv, initCtx *run.InitC
}
cids = cids[:0]

runenv.RecordMessage("===== ROUND %d: latency=%s, bandwidth=%d =====", round, np.latency, np.bandwidth)

// generate as many random files as the concurrency level.
for i := 0; i < concurrency; i++ {
// file with random data
@@ -651,7 +666,9 @@ type runRecorder struct {
np networkParams
size uint64
concurrency int
round int
runenv *runtime.RunEnv
measurement string
}

func (rr *runRecorder) recordBlock() {
@@ -663,9 +680,27 @@ func (rr *runRecorder) recordBlock() {
rr.index++
}

func (rr *runRecorder) beginRun(np networkParams, size uint64, concurrency int) {
func (rr *runRecorder) recordRun(duration time.Duration) {
rr.runenv.RecordMessage("\t<<< graphsync request complete with no errors")
rr.runenv.RecordMessage("***** ROUND %d observed duration (lat=%s,bw=%d): %s", rr.round, rr.np.latency, rr.np.bandwidth, duration)
rr.runenv.R().RecordPoint(rr.measurement+",transport=graphsync", float64(duration)/float64(time.Second))
}

func (rr *runRecorder) recordHTTPRun(duration time.Duration, bytesRead int64) {
rr.runenv.RecordMessage(fmt.Sprintf("\t<<< http request complete with no errors, read %d bytes", bytesRead))
rr.runenv.RecordMessage("***** ROUND %d observed http duration (lat=%s,bw=%d): %s", rr.round, rr.np.latency, rr.np.bandwidth, duration)
rr.runenv.R().RecordPoint(rr.measurement+",transport=http", float64(duration)/float64(time.Second))
}

func (rr *runRecorder) beginRun(np networkParams, size uint64, concurrency int, round int) {

rr.concurrency = concurrency
rr.np = np
rr.size = size
rr.index = 0
rr.round = round
rr.runenv.RecordMessage("===== ROUND %d: latency=%s, bandwidth=%d =====", rr.round, rr.np.latency, rr.np.bandwidth)
measurement := fmt.Sprintf("duration.sec,lat=%s,bw=%s,concurrency=%d,size=%s", rr.np.latency, humanize.IBytes(rr.np.bandwidth), rr.concurrency, humanize.Bytes(rr.size))
measurement = strings.Replace(measurement, " ", "", -1)
rr.measurement = measurement
}
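
beginRun now builds the measurement key once per round and stores it on the recorder; recordRun and recordHTTPRun reuse it with a transport tag appended. A small runnable illustration of what that key looks like for typical parameters, using the same humanize calls as the diff:

```go
package main

import (
	"fmt"
	"strings"
	"time"

	"github.com/dustin/go-humanize"
)

func main() {
	latency := 100 * time.Millisecond
	bandwidth := uint64(1 << 20) // 1 MiB/s
	concurrency := 4
	size := uint64(10 * 1000 * 1000)

	// Same formatting as runRecorder.beginRun in the diff.
	measurement := fmt.Sprintf("duration.sec,lat=%s,bw=%s,concurrency=%d,size=%s",
		latency, humanize.IBytes(bandwidth), concurrency, humanize.Bytes(size))
	measurement = strings.Replace(measurement, " ", "", -1)

	fmt.Println(measurement + ",transport=graphsync")
	// duration.sec,lat=100ms,bw=1.0MiB,concurrency=4,size=10MB,transport=graphsync
	fmt.Println(measurement + ",transport=http")
	// duration.sec,lat=100ms,bw=1.0MiB,concurrency=4,size=10MB,transport=http
}
```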
4 changes: 4 additions & 0 deletions testplans/graphsync/manifest.toml
@@ -28,3 +28,7 @@ raw_leaves = { type = "bool", desc = "should unixfs leaves be left unwrapped", d
disk_store = { type = "bool", desc = "should data be stored on disk (true) or memory (false)", default = "false"}
memory_snapshots = { type = "string", desc = "what kind of memory snapshots to take (none, simple, detailed)", default = "none" }
compare_http = { type = "bool", desc = "run a comparison against http", default = "true"}
max_memory_per_peer = { type = "int", desc = "max memory a responder can queue up per peer", default = "64MiB"}
max_memory_total = { type = "int", desc = "max memory a responder can queue up total", default = "512MiB"}
unlimited_bandwidth_case = { type = "bool", desc = "include a control round with unlimited bandwidth", default = "true"}
no_latency_case = { type = "bool", desc = "include a control round with no added latency", default = "true"}
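
The size params use human-readable defaults; on the Go side they are read with runenv.SizeParam and handed to the graphsync options as byte counts. A quick check of what the defaults work out to, assuming SizeParam parses them the way go-humanize does:

```go
package main

import (
	"fmt"

	"github.com/dustin/go-humanize"
)

func main() {
	// Assumption: runenv.SizeParam resolves these strings like humanize does.
	perPeer, err := humanize.ParseBytes("64MiB")
	if err != nil {
		panic(err)
	}
	total, err := humanize.ParseBytes("512MiB")
	if err != nil {
		panic(err)
	}
	fmt.Println(perPeer) // 67108864
	fmt.Println(total)   // 536870912
}
```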
