diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index c0d0c43c119c..7bf7879b1456 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -157,7 +157,7 @@ by default uses only in-memory database.`, ServiceName: "jaeger-collector", Logger: logger, MetricsFactory: collectorMetricsFactory, - SpanWriter: spanWriter, + TraceWriter: v1adapter.NewTraceWriter(spanWriter), SamplingProvider: samplingProvider, SamplingAggregator: samplingAggregator, HealthCheck: svc.HC(), diff --git a/cmd/collector/app/collector.go b/cmd/collector/app/collector.go index a443f7e115e9..f7e783e06aa3 100644 --- a/cmd/collector/app/collector.go +++ b/cmd/collector/app/collector.go @@ -23,7 +23,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/healthcheck" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/tenancy" - "github.com/jaegertracing/jaeger/storage/spanstore" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) const ( @@ -37,7 +37,7 @@ type Collector struct { serviceName string logger *zap.Logger metricsFactory metrics.Factory - spanWriter spanstore.Writer + traceWriter tracestore.Writer samplingProvider samplingstrategy.Provider samplingAggregator samplingstrategy.Aggregator hCheck *healthcheck.HealthCheck @@ -57,7 +57,7 @@ type CollectorParams struct { ServiceName string Logger *zap.Logger MetricsFactory metrics.Factory - SpanWriter spanstore.Writer + TraceWriter tracestore.Writer SamplingProvider samplingstrategy.Provider SamplingAggregator samplingstrategy.Aggregator HealthCheck *healthcheck.HealthCheck @@ -70,7 +70,7 @@ func New(params *CollectorParams) *Collector { serviceName: params.ServiceName, logger: params.Logger, metricsFactory: params.MetricsFactory, - spanWriter: params.SpanWriter, + traceWriter: params.TraceWriter, samplingProvider: params.SamplingProvider, samplingAggregator: params.SamplingAggregator, hCheck: params.HealthCheck, @@ -81,7 +81,7 @@ func New(params *CollectorParams) *Collector { // Start the 
component and underlying dependencies func (c *Collector) Start(options *flags.CollectorOptions) error { handlerBuilder := &SpanHandlerBuilder{ - SpanWriter: c.spanWriter, + TraceWriter: c.traceWriter, CollectorOpts: options, Logger: c.logger, MetricsFactory: c.metricsFactory, diff --git a/cmd/collector/app/collector_test.go b/cmd/collector/app/collector_test.go index 4e42109a2e01..d83f322b9542 100644 --- a/cmd/collector/app/collector_test.go +++ b/cmd/collector/app/collector_test.go @@ -26,6 +26,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/healthcheck" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/proto-gen/api_v2" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) var _ (io.Closer) = (*Collector)(nil) @@ -116,7 +117,7 @@ func TestNewCollector(t *testing.T) { ServiceName: "collector", Logger: logger, MetricsFactory: baseMetrics, - SpanWriter: spanWriter, + TraceWriter: v1adapter.NewTraceWriter(spanWriter), SamplingProvider: samplingProvider, HealthCheck: hc, TenancyMgr: tm, @@ -143,7 +144,7 @@ func TestCollector_StartErrors(t *testing.T) { ServiceName: "collector", Logger: logger, MetricsFactory: baseMetrics, - SpanWriter: spanWriter, + TraceWriter: v1adapter.NewTraceWriter(spanWriter), SamplingProvider: samplingProvider, HealthCheck: hc, TenancyMgr: tm, @@ -201,7 +202,7 @@ func TestCollector_PublishOpts(t *testing.T) { ServiceName: "collector", Logger: logger, MetricsFactory: metricsFactory, - SpanWriter: spanWriter, + TraceWriter: v1adapter.NewTraceWriter(spanWriter), SamplingProvider: samplingProvider, HealthCheck: hc, TenancyMgr: tm, @@ -232,7 +233,7 @@ func TestAggregator(t *testing.T) { ServiceName: "collector", Logger: logger, MetricsFactory: baseMetrics, - SpanWriter: spanWriter, + TraceWriter: v1adapter.NewTraceWriter(spanWriter), SamplingProvider: samplingProvider, HealthCheck: hc, SamplingAggregator: agg, diff --git a/cmd/collector/app/metrics.go b/cmd/collector/app/metrics.go index 
455a577aa0ab..85b7912d3f4a 100644 --- a/cmd/collector/app/metrics.go +++ b/cmd/collector/app/metrics.go @@ -8,10 +8,14 @@ import ( "strings" "sync" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/ptrace" + "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/normalizer" + "github.com/jaegertracing/jaeger/pkg/otelsemconv" ) const ( @@ -24,10 +28,10 @@ const ( // samplerTypeKey is the name of the metric tag showing sampler type samplerTypeKey = "sampler_type" - // // types of samplers: const, probabilistic, ratelimiting, lowerbound - // numOfSamplerTypes = 4 - concatenation = "$_$" + + // unknownServiceName is used when a span has no service name + unknownServiceName = "__unknown" ) var otherServicesSamplers map[model.SamplerType]string = initOtherServicesSamplers() @@ -224,12 +228,12 @@ func (m *SpanProcessorMetrics) GetCountsForFormat(spanFormat processor.SpanForma return t } -// reportServiceNameForSpan determines the name of the service that emitted +// ForSpanV1 determines the name of the service that emitted // the span and reports a counter stat. -func (m metricsBySvc) ReportServiceNameForSpan(span *model.Span) { +func (m metricsBySvc) ForSpanV1(span *model.Span) { var serviceName string if nil == span.Process || len(span.Process.ServiceName) == 0 { - serviceName = "__unknown" + serviceName = unknownServiceName } else { serviceName = span.Process.ServiceName } @@ -241,6 +245,20 @@ func (m metricsBySvc) ReportServiceNameForSpan(span *model.Span) { } } +// ForSpanV2 determines the name of the service that emitted +// the span and reports a counter stat. 
+func (m metricsBySvc) ForSpanV2(resource pcommon.Resource, span ptrace.Span) { + serviceName := unknownServiceName + if v, ok := resource.Attributes().Get(string(otelsemconv.ServiceNameKey)); ok { + serviceName = v.AsString() + } + + m.countSpansByServiceName(serviceName, false) + if span.ParentSpanID().IsEmpty() { + m.countTracesByServiceName(serviceName, false, model.SamplerTypeUnrecognized) + } +} + // countSpansByServiceName counts how many spans are received per service. func (m metricsBySvc) countSpansByServiceName(serviceName string, isDebug bool) { m.spans.countByServiceName(serviceName, isDebug) diff --git a/cmd/collector/app/metrics_test.go b/cmd/collector/app/metrics_test.go index d0d23a5a7f2d..12c16dc76c34 100644 --- a/cmd/collector/app/metrics_test.go +++ b/cmd/collector/app/metrics_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "go.opentelemetry.io/collector/pdata/ptrace" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/internal/metricstest" @@ -29,7 +30,7 @@ func TestProcessorMetrics(t *testing.T) { grpcChannelFormat := spm.GetCountsForFormat(processor.JaegerSpanFormat, processor.GRPCTransport) assert.NotNil(t, grpcChannelFormat) - grpcChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&model.Span{ + grpcChannelFormat.ReceivedBySvc.ForSpanV1(&model.Span{ Process: &model.Process{}, }) mSpan := model.Span{ @@ -37,16 +38,26 @@ func TestProcessorMetrics(t *testing.T) { ServiceName: "fry", }, } - grpcChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan) + grpcChannelFormat.ReceivedBySvc.ForSpanV1(&mSpan) mSpan.Flags.SetDebug() - grpcChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan) + grpcChannelFormat.ReceivedBySvc.ForSpanV1(&mSpan) mSpan.ReplaceParentID(1234) - grpcChannelFormat.ReceivedBySvc.ReportServiceNameForSpan(&mSpan) + grpcChannelFormat.ReceivedBySvc.ForSpanV1(&mSpan) + + pd := ptrace.NewTraces() + rs := pd.ResourceSpans().AppendEmpty() + resource 
:= rs.Resource() + resource.Attributes().PutStr("service.name", "fry") + sp := rs.ScopeSpans().AppendEmpty().Spans().AppendEmpty() + grpcChannelFormat.ReceivedBySvc.ForSpanV2(resource, sp) + sp.SetParentSpanID([8]byte{1, 2, 3, 4, 5, 6, 7, 8}) + grpcChannelFormat.ReceivedBySvc.ForSpanV2(resource, sp) + counters, gauges := baseMetrics.Backend.Snapshot() - assert.EqualValues(t, 1, counters["service.spans.received|debug=false|format=jaeger|svc=fry|transport=grpc"]) + assert.EqualValues(t, 3, counters["service.spans.received|debug=false|format=jaeger|svc=fry|transport=grpc"]) assert.EqualValues(t, 2, counters["service.spans.received|debug=true|format=jaeger|svc=fry|transport=grpc"]) - assert.EqualValues(t, 1, counters["service.traces.received|debug=false|format=jaeger|sampler_type=unrecognized|svc=fry|transport=grpc"]) + assert.EqualValues(t, 2, counters["service.traces.received|debug=false|format=jaeger|sampler_type=unrecognized|svc=fry|transport=grpc"]) assert.EqualValues(t, 1, counters["service.traces.received|debug=true|format=jaeger|sampler_type=unrecognized|svc=fry|transport=grpc"]) assert.Empty(t, gauges) } diff --git a/cmd/collector/app/span_handler_builder.go b/cmd/collector/app/span_handler_builder.go index e7d690288aa6..9bcb9fc7d260 100644 --- a/cmd/collector/app/span_handler_builder.go +++ b/cmd/collector/app/span_handler_builder.go @@ -16,12 +16,12 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/tenancy" - "github.com/jaegertracing/jaeger/storage/spanstore" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) // SpanHandlerBuilder holds configuration required for handlers type SpanHandlerBuilder struct { - SpanWriter spanstore.Writer + TraceWriter tracestore.Writer CollectorOpts *flags.CollectorOptions Logger *zap.Logger MetricsFactory metrics.Factory @@ -42,7 +42,7 @@ func (b *SpanHandlerBuilder) BuildSpanProcessor(additional ...ProcessSpan) (proc hostMetrics := 
svcMetrics.Namespace(metrics.NSOptions{Tags: map[string]string{"host": hostname}}) return NewSpanProcessor( - b.SpanWriter, + b.TraceWriter, additional, Options.ServiceMetrics(svcMetrics), Options.HostMetrics(hostMetrics), diff --git a/cmd/collector/app/span_handler_builder_test.go b/cmd/collector/app/span_handler_builder_test.go index 70cdc08fd870..4cf0e1c4af01 100644 --- a/cmd/collector/app/span_handler_builder_test.go +++ b/cmd/collector/app/span_handler_builder_test.go @@ -17,6 +17,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/storage/memory" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) func TestNewSpanHandlerBuilder(t *testing.T) { @@ -29,7 +30,7 @@ func TestNewSpanHandlerBuilder(t *testing.T) { spanWriter := memory.NewStore() builder := &SpanHandlerBuilder{ - SpanWriter: spanWriter, + TraceWriter: v1adapter.NewTraceWriter(spanWriter), CollectorOpts: cOpts, TenancyMgr: &tenancy.Manager{}, } @@ -37,7 +38,7 @@ func TestNewSpanHandlerBuilder(t *testing.T) { assert.NotNil(t, builder.metricsFactory()) builder = &SpanHandlerBuilder{ - SpanWriter: spanWriter, + TraceWriter: v1adapter.NewTraceWriter(spanWriter), CollectorOpts: cOpts, Logger: zap.NewNop(), MetricsFactory: metrics.NullFactory, diff --git a/cmd/collector/app/span_processor.go b/cmd/collector/app/span_processor.go index fcf0e0830d2a..1f529bf85bec 100644 --- a/cmd/collector/app/span_processor.go +++ b/cmd/collector/app/span_processor.go @@ -6,20 +6,26 @@ package app import ( "context" + "fmt" "sync" "sync/atomic" "time" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/pdata/ptrace" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer" + sanitizerv2 "github.com/jaegertracing/jaeger/cmd/jaeger/sanitizer" 
"github.com/jaegertracing/jaeger/internal/jptrace" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/queue" + "github.com/jaegertracing/jaeger/pkg/telemetry" "github.com/jaegertracing/jaeger/pkg/tenancy" - "github.com/jaegertracing/jaeger/storage/spanstore" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) const ( @@ -32,14 +38,16 @@ const ( type spanProcessor struct { queue *queue.BoundedQueue[queueItem] + otelExporter exporter.Traces queueResizeMu sync.Mutex metrics *SpanProcessorMetrics + telset telemetry.Settings preProcessSpans ProcessSpans filterSpan FilterSpan // filter is called before the sanitizer but after preProcessSpans sanitizer sanitizer.SanitizeSpan // sanitizer is called before processSpan processSpan ProcessSpan logger *zap.Logger - spanWriter spanstore.Writer + traceWriter tracestore.Writer reportBusy bool numWorkers int collectorTags map[string]string @@ -58,19 +66,24 @@ type queueItem struct { // NewSpanProcessor returns a SpanProcessor that preProcesses, filters, queues, sanitizes, and processes spans. func NewSpanProcessor( - spanWriter spanstore.Writer, + traceWriter tracestore.Writer, additional []ProcessSpan, opts ...Option, ) (processor.SpanProcessor, error) { - sp, err := newSpanProcessor(spanWriter, additional, opts...) + sp, err := newSpanProcessor(traceWriter, additional, opts...) 
if err != nil { - return nil, err + return nil, fmt.Errorf("could not create span processor: %w", err) } sp.queue.StartConsumers(sp.numWorkers, func(item queueItem) { sp.processItemFromQueue(item) }) + err = sp.otelExporter.Start(context.Background(), sp.telset.Host) + if err != nil { + return nil, fmt.Errorf("could not start exporter: %w", err) + } + sp.background(1*time.Second, sp.updateGauges) if sp.dynQueueSizeMemory > 0 { @@ -80,7 +93,7 @@ func NewSpanProcessor( return sp, nil } -func newSpanProcessor(spanWriter spanstore.Writer, additional []ProcessSpan, opts ...Option) (*spanProcessor, error) { +func newSpanProcessor(traceWriter tracestore.Writer, additional []ProcessSpan, opts ...Option) (*spanProcessor, error) { options := Options.apply(opts...) handlerMetrics := NewSpanProcessorMetrics( options.serviceMetrics, @@ -102,13 +115,14 @@ func newSpanProcessor(spanWriter spanstore.Writer, additional []ProcessSpan, opt sp := spanProcessor{ queue: boundedQueue, metrics: handlerMetrics, + telset: telemetry.NoopSettings(), // TODO get real settings logger: options.logger, preProcessSpans: options.preProcessSpans, filterSpan: options.spanFilter, sanitizer: sanitizer.NewChainedSanitizer(sanitizers...), reportBusy: options.reportBusy, numWorkers: options.numWorkers, - spanWriter: spanWriter, + traceWriter: traceWriter, collectorTags: options.collectorTags, stopCh: make(chan struct{}), dynQueueSizeMemory: options.dynQueueSizeMemory, @@ -122,27 +136,75 @@ func newSpanProcessor(spanWriter spanstore.Writer, additional []ProcessSpan, opt zap.Uint("queue-size-warmup", options.dynQueueSizeWarmup)) } if options.dynQueueSizeMemory > 0 || options.spanSizeMetricsEnabled { - // add to processSpanFuncs - processSpanFuncs = append(processSpanFuncs, sp.countSpan) + processSpanFuncs = append(processSpanFuncs, sp.countSpansInQueue) } + sp.processSpan = ChainedProcessSpan(append(processSpanFuncs, additional...)...) 
+ + otelExporter, err := exporterhelper.NewTraces( + context.Background(), + exporter.Settings{ + TelemetrySettings: sp.telset.ToOtelComponent(), + }, + struct{}{}, // exporterhelper requires not-nil config, but then ignores it + sp.pushTraces, + exporterhelper.WithQueue(exporterhelper.NewDefaultQueueConfig()), + // exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}), + // exporterhelper.WithTimeout(oCfg.TimeoutConfig), + // exporterhelper.WithRetry(oCfg.RetryConfig), + // exporterhelper.WithBatcher(oCfg.BatcherConfig), + // exporterhelper.WithStart(oce.start), + // exporterhelper.WithShutdown(oce.shutdown), + ) + if err != nil { + return nil, fmt.Errorf("could not create exporterhelper: %w", err) + } + sp.otelExporter = otelExporter - processSpanFuncs = append(processSpanFuncs, additional...) - - sp.processSpan = ChainedProcessSpan(processSpanFuncs...) return &sp, nil } func (sp *spanProcessor) Close() error { close(sp.stopCh) sp.queue.Stop() - + sp.otelExporter.Shutdown(context.Background()) return nil } +// pushTraces is called by exporterhelper's concurrent queue consumers. 
+func (sp *spanProcessor) pushTraces(ctx context.Context, td ptrace.Traces) error { + td = sanitizerv2.Sanitize(td) + + if len(sp.collectorTags) > 0 { + for i := 0; i < td.ResourceSpans().Len(); i++ { + resource := td.ResourceSpans().At(i).Resource() + for k, v := range sp.collectorTags { + if _, ok := resource.Attributes().Get(k); ok { + continue // don't override existing keys + } + resource.Attributes().PutStr(k, v) + } + } + } + + err := sp.traceWriter.WriteTraces(ctx, td) + + sp.metrics.BatchSize.Update(int64(td.SpanCount())) + jptrace.SpanIter(td)(func(i jptrace.SpanIterPos, span ptrace.Span) bool { + if err != nil { + sp.metrics.SavedErrBySvc.ForSpanV2(i.Resource.Resource(), span) + } else { + sp.metrics.SavedOkBySvc.ForSpanV2(i.Resource.Resource(), span) + } + return true + }) + + return err +} + func (sp *spanProcessor) saveSpan(span *model.Span, tenant string) { - if nil == span.Process { + if span.Process == nil { sp.logger.Error("process is empty for the span") - sp.metrics.SavedErrBySvc.ReportServiceNameForSpan(span) + sp.metrics.SavedErrBySvc.ForSpanV1(span) return } @@ -151,31 +213,60 @@ func (sp *spanProcessor) saveSpan(span *model.Span, tenant string) { // the inbound Context, as it may be cancelled by the time we reach this point, // so we need to start a new Context. 
ctx := tenancy.WithTenant(context.Background(), tenant) - if err := sp.spanWriter.WriteSpan(ctx, span); err != nil { + if err := sp.writeSpan(ctx, span); err != nil { sp.logger.Error("Failed to save span", zap.Error(err)) - sp.metrics.SavedErrBySvc.ReportServiceNameForSpan(span) + sp.metrics.SavedErrBySvc.ForSpanV1(span) } else { sp.logger.Debug("Span written to the storage by the collector", zap.Stringer("trace-id", span.TraceID), zap.Stringer("span-id", span.SpanID)) - sp.metrics.SavedOkBySvc.ReportServiceNameForSpan(span) + sp.metrics.SavedOkBySvc.ForSpanV1(span) } sp.metrics.SaveLatency.Record(time.Since(startTime)) } -func (sp *spanProcessor) countSpan(span *model.Span, _ string /* tenant */) { +func (sp *spanProcessor) writeSpan(ctx context.Context, span *model.Span) error { + spanWriter, err := v1adapter.GetV1Writer(sp.traceWriter) + if err == nil { + return spanWriter.WriteSpan(ctx, span) + } + traces := v1adapter.V1BatchesToTraces([]*model.Batch{{Spans: []*model.Span{span}}}) + return sp.traceWriter.WriteTraces(ctx, traces) +} + +func (sp *spanProcessor) countSpansInQueue(span *model.Span, _ string /* tenant */) { //nolint: gosec // G115 sp.bytesProcessed.Add(uint64(span.Size())) sp.spansProcessed.Add(1) } +// TODO pass Context func (sp *spanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) { + // We call preProcessSpans on a batch, it's responsibility of implementation + // to understand v1/v2 distinction. Jaeger itself does not use pre-processors. sp.preProcessSpans(batch) - var spans []*model.Span - batch.GetSpans(func(spansV1 []*model.Span) { - spans = spansV1 - }, func(_ ptrace.Traces) { - panic("not implemented") + + var batchOks []bool + var batchErr error + batch.GetSpans(func(spans []*model.Span) { + batchOks, batchErr = sp.processSpans(batch, spans) + }, func(traces ptrace.Traces) { + // TODO verify if the context will survive all the way to the consumer threads. 
+ ctx := tenancy.WithTenant(context.Background(), batch.GetTenant()) + + // the exporter will eventually call pushTraces from consumer threads. + if err := sp.otelExporter.ConsumeTraces(ctx, traces); err != nil { + batchErr = err + } else { + batchOks = make([]bool, traces.SpanCount()) + for i := range batchOks { + batchOks[i] = true + } + } }) + return batchOks, batchErr +} + +func (sp *spanProcessor) processSpans(batch processor.Batch, spans []*model.Span) ([]bool, error) { sp.metrics.BatchSize.Update(int64(len(spans))) retMe := make([]bool, len(spans)) @@ -189,7 +280,6 @@ func (sp *spanProcessor) ProcessSpans(batch processor.Batch) ([]bool, error) { } for i, mSpan := range spans { - // TODO does this have to be one span at a time? ok := sp.enqueueSpan(mSpan, batch.GetSpanFormat(), batch.GetInboundTransport(), batch.GetTenant()) if !ok && sp.reportBusy { return nil, processor.ErrBusy @@ -230,10 +320,10 @@ func (sp *spanProcessor) addCollectorTags(span *model.Span) { // in this function as it may cause race conditions. 
func (sp *spanProcessor) enqueueSpan(span *model.Span, originalFormat processor.SpanFormat, transport processor.InboundTransport, tenant string) bool { spanCounts := sp.metrics.GetCountsForFormat(originalFormat, transport) - spanCounts.ReceivedBySvc.ReportServiceNameForSpan(span) + spanCounts.ReceivedBySvc.ForSpanV1(span) if !sp.filterSpan(span) { - spanCounts.RejectedBySvc.ReportServiceNameForSpan(span) + spanCounts.RejectedBySvc.ForSpanV1(span) return true // as in "not dropped", because it's actively rejected } diff --git a/cmd/collector/app/span_processor_test.go b/cmd/collector/app/span_processor_test.go index 87f991b6b071..9e0dcff69462 100644 --- a/cmd/collector/app/span_processor_test.go +++ b/cmd/collector/app/span_processor_test.go @@ -5,16 +5,19 @@ package app import ( + "bytes" "context" "errors" "fmt" "io" "reflect" + "slices" "sync" "sync/atomic" "testing" "time" + "github.com/gogo/protobuf/jsonpb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -27,6 +30,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/metrics" "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/pkg/testutils" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" "github.com/jaegertracing/jaeger/thrift-gen/jaeger" zc "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) @@ -78,7 +82,7 @@ func TestBySvcMetrics(t *testing.T) { serviceMetrics := mb.Namespace(metrics.NSOptions{Name: "service", Tags: nil}) hostMetrics := mb.Namespace(metrics.NSOptions{Name: "host", Tags: nil}) sp, err := newSpanProcessor( - &fakeSpanWriter{}, + v1adapter.NewTraceWriter(&fakeSpanWriter{}), nil, Options.ServiceMetrics(serviceMetrics), Options.HostMetrics(hostMetrics), @@ -167,6 +171,7 @@ func isSpanAllowed(span *model.Span) bool { } type fakeSpanWriter struct { + t *testing.T spansLock sync.Mutex spans []*model.Span err error @@ -174,6 +179,9 @@ type fakeSpanWriter struct { } func (n *fakeSpanWriter) WriteSpan(ctx 
context.Context, span *model.Span) error { + if n.t != nil { + n.t.Logf("Capturing span %+v", span) + } n.spansLock.Lock() defer n.spansLock.Unlock() n.spans = append(n.spans, span) @@ -236,7 +244,7 @@ func makeJaegerSpan(service string, rootSpan bool, debugEnabled bool) (*jaeger.S func TestSpanProcessor(t *testing.T) { w := &fakeSpanWriter{} - p, err := NewSpanProcessor(w, nil, Options.QueueSize(1)) + p, err := NewSpanProcessor(v1adapter.NewTraceWriter(w), nil, Options.QueueSize(1)) require.NoError(t, err) res, err := p.ProcessSpans( @@ -262,7 +270,8 @@ func TestSpanProcessorErrors(t *testing.T) { mb := metricstest.NewFactory(time.Hour) defer mb.Backend.Stop() serviceMetrics := mb.Namespace(metrics.NSOptions{Name: "service", Tags: nil}) - pp, err := NewSpanProcessor(w, + pp, err := NewSpanProcessor( + v1adapter.NewTraceWriter(w), nil, Options.Logger(logger), Options.ServiceMetrics(serviceMetrics), @@ -315,7 +324,8 @@ func (w *blockingWriter) WriteSpan(context.Context, *model.Span) error { func TestSpanProcessorBusy(t *testing.T) { w := &blockingWriter{} - pp, err := NewSpanProcessor(w, + pp, err := NewSpanProcessor( + v1adapter.NewTraceWriter(w), nil, Options.NumWorkers(1), Options.QueueSize(1), @@ -363,7 +373,7 @@ func TestSpanProcessorWithNilProcess(t *testing.T) { serviceMetrics := mb.Namespace(metrics.NSOptions{Name: "service", Tags: nil}) w := &fakeSpanWriter{} - pp, err := NewSpanProcessor(w, nil, Options.ServiceMetrics(serviceMetrics)) + pp, err := NewSpanProcessor(v1adapter.NewTraceWriter(w), nil, Options.ServiceMetrics(serviceMetrics)) require.NoError(t, err) p := pp.(*spanProcessor) defer require.NoError(t, p.Close()) @@ -377,54 +387,84 @@ func TestSpanProcessorWithNilProcess(t *testing.T) { } func TestSpanProcessorWithCollectorTags(t *testing.T) { - testCollectorTags := map[string]string{ - "extra": "tag", - "env": "prod", - "node": "172.22.18.161", - } + for _, modelVersion := range []string{"v1", "v2"} { + t.Run(modelVersion, func(t *testing.T) { + 
testCollectorTags := map[string]string{ + "extra": "tag", + "env": "prod", + "node": "172.22.18.161", + } - w := &fakeSpanWriter{} + w := &fakeSpanWriter{} - pp, err := NewSpanProcessor(w, nil, Options.CollectorTags(testCollectorTags)) - require.NoError(t, err) - p := pp.(*spanProcessor) + pp, err := NewSpanProcessor( + v1adapter.NewTraceWriter(w), + nil, + Options.CollectorTags(testCollectorTags), + Options.NumWorkers(1), + Options.QueueSize(1), + ) + require.NoError(t, err) + p := pp.(*spanProcessor) + t.Cleanup(func() { + require.NoError(t, p.Close()) + }) - defer require.NoError(t, p.Close()) - span := &model.Span{ - Process: model.NewProcess("unit-test-service", []model.KeyValue{ - { - Key: "env", - VStr: "prod", - }, - { - Key: "node", - VStr: "k8s-test-node-01", - }, - }), - } - p.addCollectorTags(span) - expected := &model.Span{ - Process: model.NewProcess("unit-test-service", []model.KeyValue{ - { - Key: "env", - VStr: "prod", - }, - { - Key: "extra", - VStr: "tag", - }, - { - Key: "node", - VStr: "172.22.18.161", - }, - { - Key: "node", - VStr: "k8s-test-node-01", - }, - }), - } + span := &model.Span{ + Process: model.NewProcess("unit-test-service", []model.KeyValue{ + model.String("env", "prod"), + model.String("node", "k8s-test-node-01"), + }), + } - assert.Equal(t, expected.Process, span.Process) + var batch processor.Batch + if modelVersion == "v2" { + batch = processor.SpansV2{ + Traces: v1adapter.V1BatchesToTraces([]*model.Batch{{Spans: []*model.Span{span}}}), + } + } else { + batch = processor.SpansV1{ + Spans: []*model.Span{span}, + } + } + _, err = p.ProcessSpans(batch) + require.NoError(t, err) + + require.Eventually(t, func() bool { + w.spansLock.Lock() + defer w.spansLock.Unlock() + return len(w.spans) > 0 + }, time.Second, time.Millisecond) + + w.spansLock.Lock() + defer w.spansLock.Unlock() + span = w.spans[0] + + expected := &model.Span{ + Process: model.NewProcess("unit-test-service", []model.KeyValue{ + model.String("env", "prod"), + 
model.String("extra", "tag"), + model.String("node", "172.22.18.161"), + model.String("node", "k8s-test-node-01"), + }), + } + if modelVersion == "v2" { + // ptrace.Resource.Attributes do not allow duplicate keys, + // so we only add non-conflicting tags, meaning the node IP + // tag from the collectorTags will not be added. + expected.Process.Tags = slices.Delete(expected.Process.Tags, 2, 3) + typedTags := model.KeyValues(span.Process.Tags) + typedTags.Sort() + } + + m := &jsonpb.Marshaler{Indent: " "} + jsonActual := new(bytes.Buffer) + m.Marshal(jsonActual, span.Process) + jsonExpected := new(bytes.Buffer) + m.Marshal(jsonExpected, expected.Process) + assert.Equal(t, jsonExpected.String(), jsonActual.String()) + }) + } } func TestSpanProcessorCountSpan(t *testing.T) { @@ -476,7 +516,7 @@ func TestSpanProcessorCountSpan(t *testing.T) { } else { opts = append(opts, Options.DynQueueSizeMemory(0)) } - pp, err := NewSpanProcessor(w, nil, opts...) + pp, err := NewSpanProcessor(v1adapter.NewTraceWriter(w), nil, opts...) 
require.NoError(t, err) p := pp.(*spanProcessor) defer func() { @@ -591,7 +631,7 @@ func TestUpdateDynQueueSize(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { w := &fakeSpanWriter{} - p, err := newSpanProcessor(w, nil, Options.QueueSize(tt.initialCapacity), Options.DynQueueSizeWarmup(tt.warmup), Options.DynQueueSizeMemory(tt.sizeInBytes)) + p, err := newSpanProcessor(v1adapter.NewTraceWriter(w), nil, Options.QueueSize(tt.initialCapacity), Options.DynQueueSizeWarmup(tt.warmup), Options.DynQueueSizeMemory(tt.sizeInBytes)) require.NoError(t, err) assert.EqualValues(t, tt.initialCapacity, p.queue.Capacity()) @@ -606,7 +646,7 @@ func TestUpdateDynQueueSize(t *testing.T) { func TestUpdateQueueSizeNoActivityYet(t *testing.T) { w := &fakeSpanWriter{} - p, err := newSpanProcessor(w, nil, Options.QueueSize(1), Options.DynQueueSizeWarmup(1), Options.DynQueueSizeMemory(1)) + p, err := newSpanProcessor(v1adapter.NewTraceWriter(w), nil, Options.QueueSize(1), Options.DynQueueSizeWarmup(1), Options.DynQueueSizeMemory(1)) require.NoError(t, err) assert.NotPanics(t, p.updateQueueSize) } @@ -614,7 +654,8 @@ func TestUpdateQueueSizeNoActivityYet(t *testing.T) { func TestStartDynQueueSizeUpdater(t *testing.T) { w := &fakeSpanWriter{} oneGiB := uint(1024 * 1024 * 1024) - p, err := newSpanProcessor(w, nil, Options.QueueSize(100), Options.DynQueueSizeWarmup(1000), Options.DynQueueSizeMemory(oneGiB)) + + p, err := newSpanProcessor(v1adapter.NewTraceWriter(w), nil, Options.QueueSize(100), Options.DynQueueSizeWarmup(1000), Options.DynQueueSizeMemory(oneGiB)) require.NoError(t, err) assert.EqualValues(t, 100, p.queue.Capacity()) @@ -641,7 +682,7 @@ func TestAdditionalProcessors(t *testing.T) { w := &fakeSpanWriter{} // nil doesn't fail - p, err := NewSpanProcessor(w, nil, Options.QueueSize(1)) + p, err := NewSpanProcessor(v1adapter.NewTraceWriter(w), nil, Options.QueueSize(1)) require.NoError(t, err) res, err := p.ProcessSpans(processor.SpansV1{ Spans: 
[]*model.Span{ @@ -664,7 +705,7 @@ func TestAdditionalProcessors(t *testing.T) { f := func(_ *model.Span, _ string) { count++ } - p, err = NewSpanProcessor(w, []ProcessSpan{f}, Options.QueueSize(1)) + p, err = NewSpanProcessor(v1adapter.NewTraceWriter(w), []ProcessSpan{f}, Options.QueueSize(1)) require.NoError(t, err) res, err = p.ProcessSpans(processor.SpansV1{ Spans: []*model.Span{ @@ -686,7 +727,7 @@ func TestAdditionalProcessors(t *testing.T) { func TestSpanProcessorContextPropagation(t *testing.T) { w := &fakeSpanWriter{} - p, err := NewSpanProcessor(w, nil, Options.QueueSize(1)) + p, err := NewSpanProcessor(v1adapter.NewTraceWriter(w), nil, Options.QueueSize(1)) require.NoError(t, err) dummyTenant := "context-prop-test-tenant" @@ -720,7 +761,8 @@ func TestSpanProcessorWithOnDroppedSpanOption(t *testing.T) { } w := &blockingWriter{} - pp, err := NewSpanProcessor(w, + pp, err := NewSpanProcessor( + v1adapter.NewTraceWriter(w), nil, Options.NumWorkers(1), Options.QueueSize(1), diff --git a/cmd/collector/main.go b/cmd/collector/main.go index 29303cecdbb1..d1a726cc41e9 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -30,6 +30,7 @@ import ( ss "github.com/jaegertracing/jaeger/plugin/sampling/strategyprovider" "github.com/jaegertracing/jaeger/plugin/storage" "github.com/jaegertracing/jaeger/ports" + "github.com/jaegertracing/jaeger/storage_v2/v1adapter" ) const serviceName = "jaeger-collector" @@ -101,7 +102,7 @@ func main() { ServiceName: serviceName, Logger: logger, MetricsFactory: metricsFactory, - SpanWriter: spanWriter, + TraceWriter: v1adapter.NewTraceWriter(spanWriter), SamplingProvider: samplingProvider, SamplingAggregator: samplingAggregator, HealthCheck: svc.HC(), diff --git a/cmd/jaeger/sanitizer/package_test.go b/cmd/jaeger/sanitizer/package_test.go new file mode 100644 index 000000000000..bae7fe4ba742 --- /dev/null +++ b/cmd/jaeger/sanitizer/package_test.go @@ -0,0 +1,14 @@ +// Copyright (c) 2025 The Jaeger Authors.
+// SPDX-License-Identifier: Apache-2.0 + +package sanitizer + +import ( + "testing" + + "github.com/jaegertracing/jaeger/pkg/testutils" +) + +func TestMain(m *testing.M) { + testutils.VerifyGoLeaks(m) +} diff --git a/cmd/jaeger/sanitizer/sanitizer.go b/cmd/jaeger/sanitizer/sanitizer.go new file mode 100644 index 000000000000..09fef4a3d310 --- /dev/null +++ b/cmd/jaeger/sanitizer/sanitizer.go @@ -0,0 +1,11 @@ +// Copyright (c) 2025 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +package sanitizer + +import ( + "github.com/jaegertracing/jaeger/cmd/jaeger/internal/sanitizer" +) + +// Sanitize is a function that applies all sanitizers to the given trace data. +var Sanitize = sanitizer.NewChainedSanitizer(sanitizer.NewStandardSanitizers()...) diff --git a/storage_v2/v1adapter/writer.go b/storage_v2/v1adapter/writer.go index bd37cad2718c..59b6a1b408c9 100644 --- a/storage_v2/v1adapter/writer.go +++ b/storage_v2/v1adapter/writer.go @@ -10,12 +10,22 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" "github.com/jaegertracing/jaeger/storage/spanstore" + "github.com/jaegertracing/jaeger/storage_v2/tracestore" ) +var ErrV1WriterNotAvailable = errors.New("spanstore.Writer is not a wrapper around v1 writer") + type TraceWriter struct { spanWriter spanstore.Writer } +func GetV1Writer(writer tracestore.Writer) (spanstore.Writer, error) { + if tr, ok := writer.(*TraceWriter); ok { + return tr.spanWriter, nil + } + return nil, ErrV1WriterNotAvailable +} + func NewTraceWriter(spanWriter spanstore.Writer) *TraceWriter { return &TraceWriter{ spanWriter: spanWriter, @@ -24,7 +34,7 @@ func NewTraceWriter(spanWriter spanstore.Writer) *TraceWriter { // WriteTraces implements tracestore.Writer. 
func (t *TraceWriter) WriteTraces(ctx context.Context, td ptrace.Traces) error { - batches := ProtoFromTraces(td) + batches := V1BatchesFromTraces(td) var errs []error for _, batch := range batches { for _, span := range batch.Spans { diff --git a/storage_v2/v1adapter/writer_test.go b/storage_v2/v1adapter/writer_test.go index bc00c7ae0bdd..af9858c667a2 100644 --- a/storage_v2/v1adapter/writer_test.go +++ b/storage_v2/v1adapter/writer_test.go @@ -18,6 +18,7 @@ import ( "github.com/jaegertracing/jaeger/plugin/storage/memory" "github.com/jaegertracing/jaeger/storage/spanstore" spanstoreMocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" + tracestoremocks "github.com/jaegertracing/jaeger/storage_v2/tracestore/mocks" ) func TestWriteTraces(t *testing.T) { @@ -56,6 +57,22 @@ func TestWriteTracesError(t *testing.T) { require.ErrorContains(t, err, "mocked error") } +func TestGetV1Writer_NoError(t *testing.T) { + memstore := memory.NewStore() + traceWriter := &TraceWriter{ + spanWriter: memstore, + } + v1Writer, err := GetV1Writer(traceWriter) + require.NoError(t, err) + require.Equal(t, memstore, v1Writer) +} + +func TestGetV1Writer_Error(t *testing.T) { + w := new(tracestoremocks.Writer) + _, err := GetV1Writer(w) + require.ErrorIs(t, err, ErrV1WriterNotAvailable) +} + func makeTraces() ptrace.Traces { traces := ptrace.NewTraces() rSpans := traces.ResourceSpans().AppendEmpty()