Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[ingester/fix] Apply sanitizers to avoid panic on span.process=nil #3819

Merged
merged 13 commits into from
Jul 26, 2022
Merged
6 changes: 5 additions & 1 deletion cmd/ingester/app/processor/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"io"

"github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer"
"github.com/jaegertracing/jaeger/plugin/storage/kafka"
"github.com/jaegertracing/jaeger/storage/spanstore"
)
Expand All @@ -45,6 +46,7 @@ type SpanProcessorParams struct {
// KafkaSpanProcessor implements SpanProcessor for Kafka messages
type KafkaSpanProcessor struct {
unmarshaller kafka.Unmarshaller
sanitizer sanitizer.SanitizeSpan
writer spanstore.Writer
io.Closer
}
Expand All @@ -54,6 +56,7 @@ func NewSpanProcessor(params SpanProcessorParams) *KafkaSpanProcessor {
return &KafkaSpanProcessor{
unmarshaller: params.Unmarshaller,
writer: params.Writer,
sanitizer: sanitizer.NewChainedSanitizer(sanitizer.NewStandardSanitizers()...),
}
}

Expand All @@ -63,6 +66,7 @@ func (s KafkaSpanProcessor) Process(message Message) error {
if err != nil {
return fmt.Errorf("cannot unmarshall byte array into span: %w", err)
}

// TODO context should be propagated from upstream components
return s.writer.WriteSpan(context.TODO(), span)
return s.writer.WriteSpan(context.TODO(), s.sanitizer(span))
}
30 changes: 19 additions & 11 deletions cmd/ingester/app/processor/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"

cmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks"
"github.com/jaegertracing/jaeger/model"
Expand All @@ -33,25 +34,32 @@ func TestNewSpanProcessor(t *testing.T) {
}

func TestSpanProcessor_Process(t *testing.T) {
writer := &smocks.Writer{}
unmarshallerMock := &umocks.Unmarshaller{}
processor := &KafkaSpanProcessor{
unmarshaller: unmarshallerMock,
writer: writer,
}
mockUnmarshaller := &umocks.Unmarshaller{}
mockWriter := &smocks.Writer{}
processor := NewSpanProcessor(SpanProcessorParams{
Unmarshaller: mockUnmarshaller,
Writer: mockWriter,
})

message := &cmocks.Message{}
data := []byte("police")
span := &model.Span{}
data := []byte("irrelevant, mock unmarshaller should return the span")
span := &model.Span{
Process: nil, // we want to make sure sanitizers will fix this data issue.
}

message.On("Value").Return(data)
unmarshallerMock.On("Unmarshal", data).Return(span, nil)
writer.On("WriteSpan", context.Background(), span).Return(nil)
mockUnmarshaller.On("Unmarshal", data).Return(span, nil)
mockWriter.On("WriteSpan", context.Background(), span).
Return(nil).
Run(func(args mock.Arguments) {
span := args[1].(*model.Span)
assert.NotNil(t, span.Process, "sanitizer must fix Process=nil data issue")
})

assert.Nil(t, processor.Process(message))

message.AssertExpectations(t)
writer.AssertExpectations(t)
mockWriter.AssertExpectations(t)
}

func TestSpanProcessor_ProcessError(t *testing.T) {
Expand Down