Skip to content

Commit

Permalink
Switch v1 collector pipeline to v2 Writer (jaegertracing#6491)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?
- Part of jaegertracing#6487
- Part of jaegertracing#6474

## Description of the changes
- Swap v1 spanWriter for v2 traceWriter in collector pipeline
- Currently the traceWriter is provided via v1 adapter, so it's always
v1 writer underneath
- And since only v1 spans entry point is currently implemented, there is
no performance impact from additional data transformations
- However, as soon as OTLP entry point is utilized (e.g. via OTLP
receiver), the `ptrace.Traces` batch will be handled via exporterhelp
queue as a single item (not broken into individual spans) and then
passed directly to the writer as a batch. Since the writer is
implemented via adapter the batch will be converted to spans and written
one span at a time. There will be no additional data transformations on
this path either.

## How was this change tested?
- CI

## Outstanding
- [x] Invoking proper preprocessing, like sanitizers and collector tags,
on the OTLP path
- [x] Adequate metrics parity, ideally same as v1 collector
- [ ] Test coverage, including passing a v2-like (mock) writer that
cannot be downgraded to v1
- Idea: parameterize some tests (ideally those that also validate
pre-processing) to execute both v1 and v2 write paths

## Follow-up PRs
* Enable v2 write path from OTLP and Zipkin receivers (they currently
explicitly downgrade to v1). This will also allow adding better unit
tests.

---------

Signed-off-by: Yuri Shkuro <github@ysh.us>
Signed-off-by: Yuri Shkuro <yurishkuro@users.noreply.github.com>
  • Loading branch information
yurishkuro authored and ekefan committed Jan 13, 2025
1 parent ad70e64 commit f6dc762
Show file tree
Hide file tree
Showing 14 changed files with 329 additions and 113 deletions.
2 changes: 1 addition & 1 deletion cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ by default uses only in-memory database.`,
ServiceName: "jaeger-collector",
Logger: logger,
MetricsFactory: collectorMetricsFactory,
SpanWriter: spanWriter,
TraceWriter: v1adapter.NewTraceWriter(spanWriter),
SamplingProvider: samplingProvider,
SamplingAggregator: samplingAggregator,
HealthCheck: svc.HC(),
Expand Down
10 changes: 5 additions & 5 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)

const (
Expand All @@ -37,7 +37,7 @@ type Collector struct {
serviceName string
logger *zap.Logger
metricsFactory metrics.Factory
spanWriter spanstore.Writer
traceWriter tracestore.Writer
samplingProvider samplingstrategy.Provider
samplingAggregator samplingstrategy.Aggregator
hCheck *healthcheck.HealthCheck
Expand All @@ -57,7 +57,7 @@ type CollectorParams struct {
ServiceName string
Logger *zap.Logger
MetricsFactory metrics.Factory
SpanWriter spanstore.Writer
TraceWriter tracestore.Writer
SamplingProvider samplingstrategy.Provider
SamplingAggregator samplingstrategy.Aggregator
HealthCheck *healthcheck.HealthCheck
Expand All @@ -70,7 +70,7 @@ func New(params *CollectorParams) *Collector {
serviceName: params.ServiceName,
logger: params.Logger,
metricsFactory: params.MetricsFactory,
spanWriter: params.SpanWriter,
traceWriter: params.TraceWriter,
samplingProvider: params.SamplingProvider,
samplingAggregator: params.SamplingAggregator,
hCheck: params.HealthCheck,
Expand All @@ -81,7 +81,7 @@ func New(params *CollectorParams) *Collector {
// Start the component and underlying dependencies
func (c *Collector) Start(options *flags.CollectorOptions) error {
handlerBuilder := &SpanHandlerBuilder{
SpanWriter: c.spanWriter,
TraceWriter: c.traceWriter,
CollectorOpts: options,
Logger: c.logger,
MetricsFactory: c.metricsFactory,
Expand Down
9 changes: 5 additions & 4 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/healthcheck"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

var _ (io.Closer) = (*Collector)(nil)
Expand Down Expand Up @@ -116,7 +117,7 @@ func TestNewCollector(t *testing.T) {
ServiceName: "collector",
Logger: logger,
MetricsFactory: baseMetrics,
SpanWriter: spanWriter,
TraceWriter: v1adapter.NewTraceWriter(spanWriter),
SamplingProvider: samplingProvider,
HealthCheck: hc,
TenancyMgr: tm,
Expand All @@ -143,7 +144,7 @@ func TestCollector_StartErrors(t *testing.T) {
ServiceName: "collector",
Logger: logger,
MetricsFactory: baseMetrics,
SpanWriter: spanWriter,
TraceWriter: v1adapter.NewTraceWriter(spanWriter),
SamplingProvider: samplingProvider,
HealthCheck: hc,
TenancyMgr: tm,
Expand Down Expand Up @@ -201,7 +202,7 @@ func TestCollector_PublishOpts(t *testing.T) {
ServiceName: "collector",
Logger: logger,
MetricsFactory: metricsFactory,
SpanWriter: spanWriter,
TraceWriter: v1adapter.NewTraceWriter(spanWriter),
SamplingProvider: samplingProvider,
HealthCheck: hc,
TenancyMgr: tm,
Expand Down Expand Up @@ -232,7 +233,7 @@ func TestAggregator(t *testing.T) {
ServiceName: "collector",
Logger: logger,
MetricsFactory: baseMetrics,
SpanWriter: spanWriter,
TraceWriter: v1adapter.NewTraceWriter(spanWriter),
SamplingProvider: samplingProvider,
HealthCheck: hc,
SamplingAggregator: agg,
Expand Down
30 changes: 24 additions & 6 deletions cmd/collector/app/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ import (
"strings"
"sync"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/normalizer"
"github.com/jaegertracing/jaeger/pkg/otelsemconv"
)

const (
Expand All @@ -24,10 +28,10 @@ const (
// samplerTypeKey is the name of the metric tag showing sampler type
samplerTypeKey = "sampler_type"

// // types of samplers: const, probabilistic, ratelimiting, lowerbound
// numOfSamplerTypes = 4

concatenation = "$_$"

// unknownServiceName is used when a span has no service name
unknownServiceName = "__unknown"
)

var otherServicesSamplers map[model.SamplerType]string = initOtherServicesSamplers()
Expand Down Expand Up @@ -224,12 +228,12 @@ func (m *SpanProcessorMetrics) GetCountsForFormat(spanFormat processor.SpanForma
return t
}

// reportServiceNameForSpan determines the name of the service that emitted
// ForSpanV1 determines the name of the service that emitted
// the span and reports a counter stat.
func (m metricsBySvc) ReportServiceNameForSpan(span *model.Span) {
func (m metricsBySvc) ForSpanV1(span *model.Span) {
var serviceName string
if nil == span.Process || len(span.Process.ServiceName) == 0 {
serviceName = "__unknown"
serviceName = unknownServiceName
} else {
serviceName = span.Process.ServiceName
}
Expand All @@ -241,6 +245,20 @@ func (m metricsBySvc) ReportServiceNameForSpan(span *model.Span) {
}
}

// ForSpanV2 determines the name of the service that emitted
// the span and reports a counter stat.
func (m metricsBySvc) ForSpanV2(resource pcommon.Resource, span ptrace.Span) {
serviceName := unknownServiceName
if v, ok := resource.Attributes().Get(string(otelsemconv.ServiceNameKey)); ok {
serviceName = v.AsString()
}

m.countSpansByServiceName(serviceName, false)
if span.ParentSpanID().IsEmpty() {
m.countTracesByServiceName(serviceName, false, model.SamplerTypeUnrecognized)
}
}

// countSpansByServiceName counts how many spans are received per service.
func (m metricsBySvc) countSpansByServiceName(serviceName string, isDebug bool) {
m.spans.countByServiceName(serviceName, isDebug)
Expand Down
23 changes: 17 additions & 6 deletions cmd/collector/app/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"go.opentelemetry.io/collector/pdata/ptrace"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/internal/metricstest"
Expand All @@ -29,24 +30,34 @@ func TestProcessorMetrics(t *testing.T) {

grpcChannelFormat := spm.GetCountsForFormat(processor.JaegerSpanFormat, processor.GRPCTransport)
assert.NotNil(t, grpcChannelFormat)
grpcChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&model.Span{
grpcChannelFormat.ReceivedBySvc.ForSpanV1(&model.Span{
Process: &model.Process{},
})
mSpan := model.Span{
Process: &model.Process{
ServiceName: "fry",
},
}
grpcChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan)
grpcChannelFormat.ReceivedBySvc.ForSpanV1(&mSpan)
mSpan.Flags.SetDebug()
grpcChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan)
grpcChannelFormat.ReceivedBySvc.ForSpanV1(&mSpan)
mSpan.ReplaceParentID(1234)
grpcChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan)
grpcChannelFormat.ReceivedBySvc.ForSpanV1(&mSpan)

pd := ptrace.NewTraces()
rs := pd.ResourceSpans().AppendEmpty()
resource := rs.Resource()
resource.Attributes().PutStr("service.name", "fry")
sp := rs.ScopeSpans().AppendEmpty().Spans().AppendEmpty()
grpcChannelFormat.ReceivedBySvc.ForSpanV2(resource, sp)
sp.SetParentSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8})
grpcChannelFormat.ReceivedBySvc.ForSpanV2(resource, sp)

counters, gauges := baseMetrics.Backend.Snapshot()

assert.EqualValues(t, 1, counters["service.spans.received|debug=false|format=jaeger|svc=fry|transport=grpc"])
assert.EqualValues(t, 3, counters["service.spans.received|debug=false|format=jaeger|svc=fry|transport=grpc"])
assert.EqualValues(t, 2, counters["service.spans.received|debug=true|format=jaeger|svc=fry|transport=grpc"])
assert.EqualValues(t, 1, counters["service.traces.received|debug=false|format=jaeger|sampler_type=unrecognized|svc=fry|transport=grpc"])
assert.EqualValues(t, 2, counters["service.traces.received|debug=false|format=jaeger|sampler_type=unrecognized|svc=fry|transport=grpc"])
assert.EqualValues(t, 1, counters["service.traces.received|debug=true|format=jaeger|sampler_type=unrecognized|svc=fry|transport=grpc"])
assert.Empty(t, gauges)
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/collector/app/span_handler_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ import (
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/storage/spanstore"
"github.com/jaegertracing/jaeger/storage_v2/tracestore"
)

// SpanHandlerBuilder holds configuration required for handlers
type SpanHandlerBuilder struct {
SpanWriter spanstore.Writer
TraceWriter tracestore.Writer
CollectorOpts *flags.CollectorOptions
Logger *zap.Logger
MetricsFactory metrics.Factory
Expand All @@ -42,7 +42,7 @@ func (b *SpanHandlerBuilder) BuildSpanProcessor(additional ...ProcessSpan) (proc
hostMetrics := svcMetrics.Namespace(metrics.NSOptions{Tags: map[string]string{"host": hostname}})

return NewSpanProcessor(
b.SpanWriter,
b.TraceWriter,
additional,
Options.ServiceMetrics(svcMetrics),
Options.HostMetrics(hostMetrics),
Expand Down
5 changes: 3 additions & 2 deletions cmd/collector/app/span_handler_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/jaegertracing/jaeger/pkg/metrics"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/storage/memory"
"github.com/jaegertracing/jaeger/storage_v2/v1adapter"
)

func TestNewSpanHandlerBuilder(t *testing.T) {
Expand All @@ -29,15 +30,15 @@ func TestNewSpanHandlerBuilder(t *testing.T) {
spanWriter := memory.NewStore()

builder := &SpanHandlerBuilder{
SpanWriter: spanWriter,
TraceWriter: v1adapter.NewTraceWriter(spanWriter),
CollectorOpts: cOpts,
TenancyMgr: &tenancy.Manager{},
}
assert.NotNil(t, builder.logger())
assert.NotNil(t, builder.metricsFactory())

builder = &SpanHandlerBuilder{
SpanWriter: spanWriter,
TraceWriter: v1adapter.NewTraceWriter(spanWriter),
CollectorOpts: cOpts,
Logger: zap.NewNop(),
MetricsFactory: metrics.NullFactory,
Expand Down
Loading

0 comments on commit f6dc762

Please # to comment.