Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[refactor] move root span handler into aggregator #5478

Merged
merged 6 commits into from
May 26, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/collector/app/root_span_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ type mockAggregator struct {
func (t *mockAggregator) RecordThroughput(service, operation string, samplerType model.SamplerType, probability float64) {
t.callCount.Add(1)
}

func (t *mockAggregator) HandleRootSpan(span *model.Span, logger *zap.Logger) {
handleRootSpan(t, logger)(span, "")
}
func (t *mockAggregator) Start() {}
func (t *mockAggregator) Close() error {
t.closeCount.Add(1)
Expand Down
6 changes: 6 additions & 0 deletions cmd/collector/app/sampling/strategystore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"io"

"go.uber.org/zap"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)
Expand All @@ -36,6 +38,10 @@ type Aggregator interface {
// Close() from io.Closer stops the aggregator from aggregating throughput.
io.Closer

// The HandleRootSpan function processes a span, checking if it's a root span.
// If it is, it extracts sampler parameters, then calls RecordThroughput.
HandleRootSpan(span *model.Span, logger *zap.Logger)

// RecordThroughput records throughput for an operation for aggregation.
RecordThroughput(service, operation string, samplerType model.SamplerType, probability float64)

Expand Down
17 changes: 17 additions & 0 deletions plugin/sampling/strategystore/adaptive/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,20 @@ func (a *aggregator) Close() error {
a.bgFinished.Wait()
return nil
}

func (a *aggregator) HandleRootSpan(span *span_model.Span, logger *zap.Logger) {
// simply checking parentId to determine if a span is a root span is not sufficient. However,
// we can be sure that only a root span will have sampler tags.
if span.ParentSpanID() != span_model.NewSpanID(0) {
return
}
service := span.Process.ServiceName
if service == "" || span.OperationName == "" {
return
}
samplerType, samplerParam := span.GetSamplerParams(logger)
if samplerType == span_model.SamplerTypeUnrecognized {
return
}
a.RecordThroughput(service, span.OperationName, samplerType, samplerParam)
}
40 changes: 40 additions & 0 deletions plugin/sampling/strategystore/adaptive/aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,3 +115,43 @@ func TestLowerboundThroughput(t *testing.T) {
assert.EqualValues(t, 0, a.(*aggregator).currentThroughput["A"]["GET"].Count)
assert.Empty(t, a.(*aggregator).currentThroughput["A"]["GET"].Probabilities["0.001000"])
}

func TestRecordThroughput(t *testing.T) {
metricsFactory := metricstest.NewFactory(0)
mockStorage := &mocks.Store{}
mockEP := &epmocks.ElectionParticipant{}
testOpts := Options{
CalculationInterval: 1 * time.Second,
AggregationBuckets: 1,
BucketsForCalculation: 1,
}
logger := zap.NewNop()
a, err := NewAggregator(testOpts, logger, metricsFactory, mockEP, mockStorage)
require.NoError(t, err)

// Testing non-root span
span := &model.Span{References: []model.SpanRef{{SpanID: model.NewSpanID(1), RefType: model.ChildOf}}}
a.HandleRootSpan(span, logger)
require.Empty(t, a.(*aggregator).currentThroughput)

// Testing span with service name but no operation
span.References = []model.SpanRef{}
span.Process = &model.Process{
ServiceName: "A",
}
a.HandleRootSpan(span, logger)
require.Nil(t, a.(*aggregator).currentThroughput["A"]["GET"])

// Testing span with service name and operation but no probabilistic sampling tags
span.OperationName = "GET"
a.HandleRootSpan(span, logger)
require.Nil(t, a.(*aggregator).currentThroughput["A"]["GET"])

// Testing span with service name, operation, and probabilistic sampling tags
span.Tags = model.KeyValues{
model.String("sampler.type", "probabilistic"),
model.String("sampler.param", "0.001"),
}
a.HandleRootSpan(span, logger)
assert.EqualValues(t, 1, a.(*aggregator).currentThroughput["A"]["GET"].Count)
}
Loading