From 086d06ad1187ebc751b0ff382d93989778ceb69e Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Fri, 20 Dec 2024 15:56:02 -0700 Subject: [PATCH 1/3] WRITING-28094 Add connection churn benchmark --- internal/cmd/benchmark/benchmark_test.go | 3 +- .../cmd/benchmark/connection_churn_test.go | 532 ++++++++++++++++++ internal/cmd/benchmark/go.mod | 6 +- internal/cmd/benchmark/go.sum | 7 + 4 files changed, 546 insertions(+), 2 deletions(-) create mode 100644 internal/cmd/benchmark/connection_churn_test.go diff --git a/internal/cmd/benchmark/benchmark_test.go b/internal/cmd/benchmark/benchmark_test.go index 69e8d12d2d..56e7e76d23 100644 --- a/internal/cmd/benchmark/benchmark_test.go +++ b/internal/cmd/benchmark/benchmark_test.go @@ -4,7 +4,7 @@ // not use this file except in compliance with the License. You may obtain // a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 -package main +package benchmark import ( "archive/tar" @@ -583,6 +583,7 @@ func TestRunAllBenchmarks(t *testing.T) { {name: "BenchmarkMultiFindMany", benchmark: BenchmarkMultiFindMany}, {name: "BenchmarkMultiInsertSmallDocument", benchmark: BenchmarkMultiInsertSmallDocument}, {name: "BenchmarkMultiInsertLargeDocument", benchmark: BenchmarkMultiInsertLargeDocument}, + {name: "BenchmarkConnectionChurn", benchmark: BenchmarkConnectionChurn}, } results := make([]poplarTest, len(cases)) diff --git a/internal/cmd/benchmark/connection_churn_test.go b/internal/cmd/benchmark/connection_churn_test.go new file mode 100644 index 0000000000..deda50c83b --- /dev/null +++ b/internal/cmd/benchmark/connection_churn_test.go @@ -0,0 +1,532 @@ +package benchmark + +import ( + "context" + "fmt" + "math" + "os" + "runtime" + "sort" + "sync" + "sync/atomic" + "testing" + "time" + + "math/rand" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.mongodb.org/mongo-driver/v2/bson" + "go.mongodb.org/mongo-driver/v2/event" + "go.mongodb.org/mongo-driver/v2/mongo" + "go.mongodb.org/mongo-driver/v2/mongo/options" + "gonum.org/v1/gonum/integrate" + "gonum.org/v1/gonum/stat" +) + +const defaultConnectionChurnDB = "connectionChurnDB" +const defaultLargeCollectionSize = 1000 + +type ccCfg struct { + collName string // name of the large collection + ops uint // number of operations to run + goRoutines uint // number of goroutines to split ops over +} + +type ccResult struct { + RunID int64 // unifying ID for runs between 1 and 100 percentiles + ShortCircuitRate float64 + Throughput float64 // operations per second + ThroughputActual float64 // throughput * startedRate + Percentile float64 + TimeoutRate float64 // timeout errors per operation + StartedRate float64 +} + +func calculateThroughputPercentile(p float64, results []ccResult) float64 { + throughputs := make([]float64, len(results)) + for i := range throughputs { + throughputs[i] = results[i].Throughput + } + + sort.Float64s(throughputs) + + return stat.Quantile(p, stat.Empirical, throughputs, nil) +} + +// calculateSimpsonsE returns an approximation of the area under the curve of +// discrete data defined by [100 percentiles, short-circuit succeeded]. +func calculateSimpsonsE(results []ccResult) float64 { + ff64s := []float64{} + NaNSet := map[int]bool{} + + ff64sAllZero := true + for idx, result := range results { + if !math.IsNaN(result.ShortCircuitRate) { + if result.ShortCircuitRate != 0 { + ff64sAllZero = false + } + ff64s = append(ff64s, result.ShortCircuitRate) + } else { + NaNSet[idx] = true + } + } + + fx64s := []float64{} + fx64sAllZero := true + for idx, result := range results { + if NaNSet[idx] { + continue + } + + if result.Percentile != 0 { + fx64sAllZero = false + } + + fx64s = append(fx64s, result.Percentile) + } + + if fx64sAllZero && ff64sAllZero { + return 0 + } + + return integrate.Simpsons(fx64s, ff64s) +} + +func benchmarkConnectionChurnTO( + b *testing.B, + runID int64, + to time.Duration, + cfg ccCfg, + opts ...*options.ClientOptions, +) ccResult { + clientOpts := options.MergeClientOptions(opts...) + + var connectionsClosed atomic.Int64 + poolMonitor := &event.PoolMonitor{ + Event: func(pe *event.PoolEvent) { + if pe.Type == event.ConnectionClosed { + connectionsClosed.Add(1) + } + }, + } + + var commandFailed atomic.Int64 + var commandSucceeded atomic.Int64 + var commandStarted atomic.Int64 + + cmdMonitor := &event.CommandMonitor{ + Started: func(_ context.Context, cse *event.CommandStartedEvent) { + if cse.CommandName == "find" { + commandStarted.Add(1) + } + }, + Succeeded: func(_ context.Context, cse *event.CommandSucceededEvent) { + if cse.CommandName == "find" { + commandSucceeded.Add(1) + } + }, + Failed: func(_ context.Context, evt *event.CommandFailedEvent) { + if evt.CommandName == "find" { + commandFailed.Add(1) + } + }, + } + + clientOpts.SetTimeout(0).SetMonitor(cmdMonitor).SetPoolMonitor(poolMonitor) + + client, err := mongo.Connect(clientOpts) + require.NoError(b, err) + + defer func() { + err := client.Disconnect(context.Background()) + require.NoError(b, err) + }() + + perGoroutine := cfg.ops / cfg.goRoutines + + errs := make(chan error, cfg.goRoutines*perGoroutine) + done := make(chan struct{}, cfg.goRoutines) + + coll := client.Database(defaultConnectionChurnDB).Collection(cfg.collName) + + query := bson.D{{Key: "field1", Value: "doesntexist"}} + + startTime := time.Now() + + // Run the find query on an unindex collection in partitions upto the number + // of goroutines. + for i := 0; i < int(cfg.goRoutines); i++ { + go func(i int) { + for j := 0; j < int(perGoroutine); j++ { + ctx, cancel := context.WithTimeout(context.Background(), to) + + err := coll.FindOne(ctx, query).Err() + cancel() + + if err != nil && err != mongo.ErrNoDocuments { + errs <- err + } + } + + done <- struct{}{} + }(i) + } + + go func() { + defer close(errs) + defer close(done) + + for i := 0; i < int(cfg.goRoutines); i++ { + <-done + } + }() + + gotTimeoutErrCount := 0 + for err := range errs { + if mongo.IsTimeout(err) { + // We don't consider "ErrDeadlineWouldBeExceeded" errors, these would not + // result in a connection closing. + gotTimeoutErrCount++ + } + } + + elapsed := time.Since(startTime) + + timeoutRate := float64(gotTimeoutErrCount) / float64(cfg.ops) + startedRate := float64(commandStarted.Load()) / float64(cfg.ops) + + throughput := float64(cfg.ops) / elapsed.Seconds() + throughputActual := throughput * startedRate + + shortCircuitRate := 1.0 + if gotTimeoutErrCount != 0 { + shortCircuitRate = 1.0 - float64(connectionsClosed.Load())/float64(gotTimeoutErrCount) + } + + //b.ReportMetric(float64(cfg.ops), "ops") + //b.ReportMetric(float64(commandStarted.Load()), "round-trips") + //b.ReportMetric(float64(commandFailed.Load()), "failures") + //b.ReportMetric(float64(gotTimeoutErrCount), "t/o") + //b.ReportMetric(shortCircuitRate, "closure/t/o") + + //fmt.Println("data: ", cfg.ops, commandStarted, commandSucceeded, commandFailed, gotTimeoutErrCount) + return ccResult{ + RunID: runID, + ShortCircuitRate: shortCircuitRate, + Throughput: throughput, + ThroughputActual: throughputActual, + TimeoutRate: timeoutRate, + StartedRate: startedRate, + } +} + +// Record metrics to an Atlas server. +func recordCCThroughputResults(b *testing.B, results []ccResult) { + metricAtlasURI := os.Getenv("METRICS_ATLAS_URI") + if metricAtlasURI == "" { + return + } + + client, err := mongo.Connect(options.Client().ApplyURI(metricAtlasURI)) + require.NoError(b, err) + + defer func() { + err := client.Disconnect(context.Background()) + require.NoError(b, err) + }() + + coll := client.Database(defaultConnectionChurnDB).Collection("throughput") + + _, err = coll.InsertMany(context.Background(), results) + require.NoError(b, err) +} + +func BenchmarkConnectionChurn(b *testing.B) { + uri := os.Getenv("MONGODB_URI") + if uri == "" { + uri = "mongodb://localhost:27017" + } + + tests := []struct { + name string + goRoutines int + operations int + clientOpts *options.ClientOptions + }{ + { + name: "low volume and small pool", + goRoutines: runtime.NumCPU(), + operations: 100, + clientOpts: options.Client().SetMaxPoolSize(1).ApplyURI(uri), + }, + } + + setupContext, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + // Load the test data for the test case. + collName, err := loadLargeCollection(setupContext, defaultLargeCollectionSize, options.Client().ApplyURI(uri)) + require.NoError(b, err) + + // Teardown collection. + defer func() { + client, err := mongo.Connect(options.Client().ApplyURI(uri)) + require.NoError(b, err) + + coll := client.Database(defaultConnectionChurnDB).Collection(collName) + + err = coll.Drop(context.Background()) + require.NoError(b, err) + }() + + // Load the 1st through 100th percentiles from concurrently sampling op time. + latencyPercentiles, err := sampleLatency(setupContext, collName, options.Client().ApplyURI(uri)) + require.NoError(b, err) + + for _, tcase := range tests { + b.Run(tcase.name, func(b *testing.B) { + id := time.Now().Unix() + + results := make([]ccResult, len(latencyPercentiles)) + for i := range results { + cfg := ccCfg{ + collName: collName, + goRoutines: uint(tcase.goRoutines), + ops: uint(tcase.operations), + } + + results[i] = benchmarkConnectionChurnTO(b, id, latencyPercentiles[i], cfg, tcase.clientOpts) + results[i].Percentile = float64(i) / 100.0 // Normalize the percentile + } + + b.ReportMetric(calculateSimpsonsE(results), "E") + b.ReportMetric(calculateThroughputPercentile(0.5, results), "T(0.5)") + b.ReportMetric(calculateThroughputPercentile(0.01, results), "T(0.01)") + + recordCCThroughputResults(b, results) + }) + } +} + +// loadLargeCollection will dedicate a worker pool to inserting test data into +// an unindexed collection. Each record is 31 bytes in size. +func loadLargeCollection(ctx context.Context, size int, opts ...*options.ClientOptions) (string, error) { + client, err := mongo.Connect(opts...) + if err != nil { + return "", fmt.Errorf("failed to create client: %w", err) + } + + defer func() { + if err := client.Disconnect(ctx); err != nil { + panic(err) + } + }() + + // Initialize a collection with the name "large". + collName := fmt.Sprintf("large%s", uuid.NewString()) + + goRoutines := runtime.NumCPU() + + // Partition the volume into equal sizes per go routine. Use the floor if the + // volume is not divisible by the number of goroutines. + perGoroutine := size / goRoutines + + docs := make([]interface{}, perGoroutine) + for i := range docs { + docs[i] = bson.D{ + {Key: "field1", Value: rand.Int63()}, + {Key: "field2", Value: rand.Int31()}, + } + } + + errs := make(chan error, goRoutines) + done := make(chan struct{}, goRoutines) + + coll := client.Database(defaultConnectionChurnDB).Collection(collName) + + for i := 0; i < int(goRoutines); i++ { + go func(i int) { + _, err := coll.InsertMany(ctx, docs) + if err != nil { + errs <- fmt.Errorf("goroutine %v failed: %w", i, err) + } + + done <- struct{}{} + }(i) + } + + go func() { + defer close(errs) + defer close(done) + + for i := 0; i < int(goRoutines); i++ { + <-done + } + }() + + // Await errors and return the first error encountered. + for err := range errs { + if err != nil { + return "", err + } + } + + return collName, nil +} + +func TestLoadLargeCollection(t *testing.T) { + tests := []struct { + name string + size int + wantErr string + }{ + { + name: "size of 100", + size: 100, + wantErr: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + collName, err := loadLargeCollection(ctx, tt.size) + if err != nil { + require.Equal(t, tt.wantErr, err.Error()) + + return + } + + assert.NotEmpty(t, collName) + + // Lookup the collection and assert the size is as expected. + client, err := mongo.Connect() + require.NoError(t, err) + + defer func() { + err := client.Disconnect(context.Background()) + require.NoError(t, err) + }() + + coll := client.Database(defaultConnectionChurnDB).Collection(collName) + + defer func() { + err := coll.Drop(context.Background()) + require.NoError(t, err) + }() + + count, err := coll.CountDocuments(context.Background(), bson.D{}) + require.NoError(t, err) + + assert.Equal(t, int64(tt.size), count) + }) + } + +} + +// calculatePercentilesDuration calculates the 1st through 100th percentiles of a sample +func calculatePercentilesDuration(durations []time.Duration) []time.Duration { + if len(durations) == 0 { + return nil // Handle empty input + } + + // Convert durations to float64 for processing + sampleFloats := make([]float64, len(durations)) + for i, d := range durations { + sampleFloats[i] = float64(d) + } + + // Sort the sampleFloats array + sort.Float64s(sampleFloats) + + percentiles := make([]time.Duration, 100) + for i := 1; i <= 100; i++ { + // Calculate the percentile using gonum/stat + value := stat.Quantile(float64(i)/100, stat.Empirical, sampleFloats, nil) + percentiles[i-1] = time.Duration(value) + } + + return percentiles +} + +// sampleLatency will concurrently run a find query {field1: doesnotexist} a +// pre-defined number of times greater than 100, recording the time each +// operation takes and returning the percetiles 1-100 of the sample data. +func sampleLatency(ctx context.Context, collName string, opts ...*options.ClientOptions) ([]time.Duration, error) { + const sampleSize = 1000 + + client, err := mongo.Connect(opts...) + if err != nil { + return nil, fmt.Errorf("failed to create client: %w", err) + } + + defer func() { + if err := client.Disconnect(ctx); err != nil { + panic(err) + } + }() + + coll := client.Database(defaultConnectionChurnDB).Collection(collName) + + samples := make([]time.Duration, 0, sampleSize) + + goRoutines := runtime.NumCPU() + + errs := make(chan error, goRoutines) + done := make(chan struct{}, goRoutines) + + query := bson.D{{Key: "field1", Value: "doesntexist"}} + + // Warm up the server: + for i := 0; i < 100; i++ { + coll.FindOne(context.Background(), query) + } + + var samplesMu sync.Mutex + for i := 0; i < int(goRoutines); i++ { + go func() { + // Higher durations yield more accurate statistics. + durations := make([]time.Duration, 10) + for i := 0; i < len(durations); i++ { + start := time.Now() + err := coll.FindOne(context.Background(), query).Err() + durations[i] = time.Since(start) + + if err != nil && err != mongo.ErrNoDocuments { + errs <- fmt.Errorf("failed to collect query stats: %w", err) + + break + } + } + + samplesMu.Lock() + samples = append(samples, durations...) + samplesMu.Unlock() + + done <- struct{}{} + }() + } + + go func() { + defer close(errs) + + for i := 0; i < int(goRoutines); i++ { + <-done + } + }() + + for err := range errs { + if err != nil { + return nil, err + } + } + + samplesMu.Lock() + defer samplesMu.Unlock() + + return calculatePercentilesDuration(samples), nil +} diff --git a/internal/cmd/benchmark/go.mod b/internal/cmd/benchmark/go.mod index 6582dfd09c..abffa015c8 100644 --- a/internal/cmd/benchmark/go.mod +++ b/internal/cmd/benchmark/go.mod @@ -1,12 +1,16 @@ module go.mongodb.go/mongo-driver/internal/cmd/benchmark -go 1.18 +go 1.22 + +toolchain go1.23.1 replace go.mongodb.org/mongo-driver/v2 => ../../../ require ( + github.com/google/uuid v1.6.0 github.com/stretchr/testify v1.8.1 go.mongodb.org/mongo-driver/v2 v2.0.0-00010101000000-000000000000 + gonum.org/v1/gonum v0.15.1 ) require ( diff --git a/internal/cmd/benchmark/go.sum b/internal/cmd/benchmark/go.sum index aa205ecc72..e3a14ea3d2 100644 --- a/internal/cmd/benchmark/go.sum +++ b/internal/cmd/benchmark/go.sum @@ -4,6 +4,9 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= @@ -33,6 +36,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.29.0 h1:L5SG1JTTXupVV3n6sUqMTeWbjAyfPwoda2DLX8J8FrQ= golang.org/x/crypto v0.29.0/go.mod h1:+F4F4N5hv6v38hfeYwTdx20oUvLLc+QfrE9Ax9HtgRg= +golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa h1:FRnLl4eNAQl8hwxVVC17teOw8kdjVDVAiFMtgUdTSRQ= +golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa/go.mod h1:zk2irFbV9DP96SEBUUAy67IdHUaZuSnrz1n472HUCLE= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= @@ -58,6 +63,8 @@ golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGm golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gonum.org/v1/gonum v0.15.1 h1:FNy7N6OUZVUaWG9pTiD+jlhdQ3lMP+/LcTpJ6+a8sQ0= +gonum.org/v1/gonum v0.15.1/go.mod h1:eZTZuRFrzu5pcyjN5wJhcIhnUdNijYxX1T2IcrOGY0o= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= From 2c7ef39e7529b0d1f2abf7d3e60391acb3cecf3a Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Tue, 31 Dec 2024 10:35:30 -0700 Subject: [PATCH 2/3] Clean up code --- .../cmd/benchmark/connection_churn_test.go | 30 +++++++++++-------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/internal/cmd/benchmark/connection_churn_test.go b/internal/cmd/benchmark/connection_churn_test.go index deda50c83b..6becd16718 100644 --- a/internal/cmd/benchmark/connection_churn_test.go +++ b/internal/cmd/benchmark/connection_churn_test.go @@ -35,13 +35,14 @@ type ccCfg struct { } type ccResult struct { - RunID int64 // unifying ID for runs between 1 and 100 percentiles - ShortCircuitRate float64 - Throughput float64 // operations per second - ThroughputActual float64 // throughput * startedRate - Percentile float64 - TimeoutRate float64 // timeout errors per operation - StartedRate float64 + RunID int64 // unifying ID for runs between 1 and 100 percentiles + ShortCircuitRate float64 + Throughput float64 // operations per second + ThroughputActual float64 // throughput * startedRate + ThroughputSuccess float64 // throughput * successRate + Percentile float64 + TimeoutRate float64 // timeout errors per operation + StartedRate float64 } func calculateThroughputPercentile(p float64, results []ccResult) float64 { @@ -196,9 +197,11 @@ func benchmarkConnectionChurnTO( timeoutRate := float64(gotTimeoutErrCount) / float64(cfg.ops) startedRate := float64(commandStarted.Load()) / float64(cfg.ops) + successRate := float64(commandSucceeded.Load()) / float64(cfg.ops) throughput := float64(cfg.ops) / elapsed.Seconds() throughputActual := throughput * startedRate + throughputSuccess := throughput * successRate shortCircuitRate := 1.0 if gotTimeoutErrCount != 0 { @@ -213,12 +216,13 @@ func benchmarkConnectionChurnTO( //fmt.Println("data: ", cfg.ops, commandStarted, commandSucceeded, commandFailed, gotTimeoutErrCount) return ccResult{ - RunID: runID, - ShortCircuitRate: shortCircuitRate, - Throughput: throughput, - ThroughputActual: throughputActual, - TimeoutRate: timeoutRate, - StartedRate: startedRate, + RunID: runID, + ShortCircuitRate: shortCircuitRate, + Throughput: throughput, + ThroughputActual: throughputActual, + ThroughputSuccess: throughputSuccess, + TimeoutRate: timeoutRate, + StartedRate: startedRate, } } From fb1b98db96cedbb7ff5202e0a4421146843384bc Mon Sep 17 00:00:00 2001 From: Preston Vasquez Date: Tue, 21 Jan 2025 19:31:36 -0700 Subject: [PATCH 3/3] WRITING-28094 add connection ready metric --- .../cmd/benchmark/connection_churn_test.go | 73 ++++++++++++------- 1 file changed, 46 insertions(+), 27 deletions(-) diff --git a/internal/cmd/benchmark/connection_churn_test.go b/internal/cmd/benchmark/connection_churn_test.go index 6becd16718..c9494c43bc 100644 --- a/internal/cmd/benchmark/connection_churn_test.go +++ b/internal/cmd/benchmark/connection_churn_test.go @@ -35,14 +35,16 @@ type ccCfg struct { } type ccResult struct { - RunID int64 // unifying ID for runs between 1 and 100 percentiles - ShortCircuitRate float64 - Throughput float64 // operations per second - ThroughputActual float64 // throughput * startedRate - ThroughputSuccess float64 // throughput * successRate - Percentile float64 - TimeoutRate float64 // timeout errors per operation - StartedRate float64 + RunID int64 // unifying ID for runs between 1 and 100 percentiles + ShortCircuitRate float64 + Throughput float64 // operations per second + ThroughputActual float64 // throughput * startedRate + ThroughputSuccess float64 // throughput * successRate + Percentile float64 + TimeoutRate float64 // timeout errors per operation + StartedRate float64 + Elapsed float64 + ConnectionReadDurationsMS []float64 } func calculateThroughputPercentile(p float64, results []ccResult) float64 { @@ -56,6 +58,15 @@ func calculateThroughputPercentile(p float64, results []ccResult) float64 { return stat.Quantile(p, stat.Empirical, throughputs, nil) } +func calculateAverageElapsed(results []ccResult) float64 { + elapsed := make([]float64, len(results)) + for i := range elapsed { + elapsed[i] = results[i].Elapsed + } + + return stat.Mean(elapsed, nil) +} + // calculateSimpsonsE returns an approximation of the area under the curve of // discrete data defined by [100 percentiles, short-circuit succeeded]. func calculateSimpsonsE(results []ccResult) float64 { @@ -102,13 +113,24 @@ func benchmarkConnectionChurnTO( cfg ccCfg, opts ...*options.ClientOptions, ) ccResult { + b.Logf("starting benchmark") clientOpts := options.MergeClientOptions(opts...) var connectionsClosed atomic.Int64 + + connectionReadyDurationsMu := sync.Mutex{} + connectionReadyDurations := []float64{} + poolMonitor := &event.PoolMonitor{ Event: func(pe *event.PoolEvent) { - if pe.Type == event.ConnectionClosed { + switch pe.Type { + case event.ConnectionClosed: connectionsClosed.Add(1) + case event.ConnectionReady: + fmt.Println("READY!") + connectionReadyDurationsMu.Lock() + connectionReadyDurations = append(connectionReadyDurations, float64(pe.Duration)/float64(time.Millisecond)) + connectionReadyDurationsMu.Unlock() } }, } @@ -208,21 +230,17 @@ func benchmarkConnectionChurnTO( shortCircuitRate = 1.0 - float64(connectionsClosed.Load())/float64(gotTimeoutErrCount) } - //b.ReportMetric(float64(cfg.ops), "ops") - //b.ReportMetric(float64(commandStarted.Load()), "round-trips") - //b.ReportMetric(float64(commandFailed.Load()), "failures") - //b.ReportMetric(float64(gotTimeoutErrCount), "t/o") - //b.ReportMetric(shortCircuitRate, "closure/t/o") - - //fmt.Println("data: ", cfg.ops, commandStarted, commandSucceeded, commandFailed, gotTimeoutErrCount) + fmt.Println("data: ", cfg.ops, commandStarted.Load()) return ccResult{ - RunID: runID, - ShortCircuitRate: shortCircuitRate, - Throughput: throughput, - ThroughputActual: throughputActual, - ThroughputSuccess: throughputSuccess, - TimeoutRate: timeoutRate, - StartedRate: startedRate, + RunID: runID, + ShortCircuitRate: shortCircuitRate, + Throughput: throughput, + ThroughputActual: throughputActual, + ThroughputSuccess: throughputSuccess, + TimeoutRate: timeoutRate, + StartedRate: startedRate, + Elapsed: elapsed.Seconds(), + ConnectionReadDurationsMS: connectionReadyDurations, } } @@ -260,9 +278,10 @@ func BenchmarkConnectionChurn(b *testing.B) { clientOpts *options.ClientOptions }{ { - name: "low volume and small pool", - goRoutines: runtime.NumCPU(), - operations: 100, + name: "low volume and small pool", + //goRoutines: runtime.NumCPU(), + goRoutines: 50, + operations: 500, clientOpts: options.Client().SetMaxPoolSize(1).ApplyURI(uri), }, } @@ -301,7 +320,7 @@ func BenchmarkConnectionChurn(b *testing.B) { ops: uint(tcase.operations), } - results[i] = benchmarkConnectionChurnTO(b, id, latencyPercentiles[i], cfg, tcase.clientOpts) + results[i] = benchmarkConnectionChurnTO(b, id, latencyPercentiles[i]*5, cfg, tcase.clientOpts) results[i].Percentile = float64(i) / 100.0 // Normalize the percentile }