diff --git a/pkg/dispatcher/jetstream_dispatcher.go b/pkg/dispatcher/jetstream_dispatcher.go index 5d08da92a..49d5b736a 100644 --- a/pkg/dispatcher/jetstream_dispatcher.go +++ b/pkg/dispatcher/jetstream_dispatcher.go @@ -25,6 +25,9 @@ import ( "sync/atomic" "time" + "go.opencensus.io/trace" + "knative.dev/eventing-natss/pkg/tracing" + "k8s.io/apimachinery/pkg/types" jsmcloudevents "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2" @@ -43,6 +46,7 @@ import ( const ( // maxJetElements defines a maximum number of outstanding re-connect requests maxJetElements = 10 + jsmChannel = "jsm-channel" ) var ( @@ -146,7 +150,9 @@ func jetmessageReceiverFunc(s *jetSubscriptionsSupervisor) eventingchannels.Unbu s.logger.Error("could not create nats jetstream sender", zap.Error(err)) return errors.Wrap(err, "could not create nats jetstream sender") } - if err := sender.Send(ctx, message); err != nil { + + tpTsTransformers := tracing.SerializeTraceTransformers(trace.FromContext(ctx).SpanContext()) + if err := sender.Send(ctx, message, tpTsTransformers...); err != nil { errMsg := "error during send" if err.Error() == stan.ErrConnectionClosed.Error() { errMsg += " - connection to NATSS has been lost, attempting to reconnect" @@ -316,7 +322,20 @@ func (s *jetSubscriptionsSupervisor) subscribe(ctx context.Context, channel even s.logger.Debug("dispatch message", zap.String("deadLetter", deadLetter.String())) } - executionInfo, err := s.dispatcher.DispatchMessage(ctx, message, nil, destination, reply, deadLetter) + event := tracing.ConvertNatsMsgToEvent(s.logger, stanMsg) + additionalHeaders := tracing.ConvertEventToHttpHeader(event) + + sc, ok := tracing.ParseSpanContext(event) + var span *trace.Span + if !ok { + s.logger.Warn("Cannot parse the spancontext, creating a new span") + ctx, span = trace.StartSpan(ctx, jsmChannel+"-"+channel.Name) + } else { + ctx, span = trace.StartSpanWithRemoteParent(ctx, jsmChannel+"-"+channel.Name, sc) + } + defer span.End() + + executionInfo, err := s.dispatcher.DispatchMessage(ctx, message, additionalHeaders, destination, reply, deadLetter) if err != nil { s.logger.Error("Failed to dispatch message: ", zap.Error(err)) return diff --git a/pkg/dispatcher/natss_dispatcher.go b/pkg/dispatcher/natss_dispatcher.go index 952eaa8c0..e5acd1e82 100644 --- a/pkg/dispatcher/natss_dispatcher.go +++ b/pkg/dispatcher/natss_dispatcher.go @@ -46,7 +46,8 @@ import ( const ( // maxElements defines a maximum number of outstanding re-connect requests - maxElements = 10 + maxElements = 10 + natssChannel = "natss-channel" ) var ( @@ -328,7 +329,14 @@ func (s *subscriptionsSupervisor) subscribe(ctx context.Context, channel eventin event := tracing.ConvertNatssMsgToEvent(s.logger, stanMsg) additionalHeaders := tracing.ConvertEventToHttpHeader(event) - ctx, span := tracing.StartTraceFromMessage(s.logger, ctx, event, "natsschannel-"+channel.Name) + sc, ok := tracing.ParseSpanContext(event) + var span *trace.Span + if !ok { + s.logger.Warn("Cannot parse the spancontext, creating a new span") + ctx, span = trace.StartSpan(ctx, natssChannel+"-"+channel.Name) + } else { + ctx, span = trace.StartSpanWithRemoteParent(ctx, natssChannel+"-"+channel.Name, sc) + } defer span.End() executionInfo, err := s.dispatcher.DispatchMessage(ctx, message, additionalHeaders, destination, reply, deadLetter) diff --git a/pkg/reconciler/dispatcher/jetstream/nats_jetstreaming_schannel.go b/pkg/reconciler/dispatcher/jetstream/nats_jetstreaming_schannel.go index 8de6a9641..a179d8928 100644 --- a/pkg/reconciler/dispatcher/jetstream/nats_jetstreaming_schannel.go +++ b/pkg/reconciler/dispatcher/jetstream/nats_jetstreaming_schannel.go @@ -21,6 +21,10 @@ import ( "fmt" "strings" + configmapinformer "knative.dev/pkg/configmap/informer" + "knative.dev/pkg/tracing" + tracingconfig "knative.dev/pkg/tracing/config" + corev1 "k8s.io/api/core/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -54,11 +58,11 @@ import ( ) const ( -// controllerAgentName is the string used by this controller to identify -// itself when creating events. -//controllerAgentName = "jetstream-ch-dispatcher" + // controllerAgentName is the string used by this controller to identify + // itself when creating events. + controllerAgentName = "jetstream-ch-dispatcher" -//finalizerName = controllerAgentName + //finalizerName = controllerAgentName ) // Reconciler reconciles NATS JetStream Channels. @@ -71,6 +75,9 @@ type Reconciler struct { impl *controller.Impl } +// take func ref, to be able to mock for tests +var setupDynamicPublishing = tracing.SetupDynamicPublishing + // Check that our Reconciler implements controller.Reconciler. var _ jetstreamchannelreconciler.Interface = (*Reconciler)(nil) var _ jetstreamchannelreconciler.Finalizer = (*Reconciler)(nil) @@ -82,10 +89,16 @@ type envConfig struct { // NewController initializes the controller and is called by the generated code. // Registers event handlers to enqueue events. -func NewController(ctx context.Context, _ configmap.Watcher) *controller.Impl { +func NewController(ctx context.Context, watcher configmap.Watcher) *controller.Impl { logger := logging.FromContext(ctx) + // Setup trace publishing. + iw := watcher.(*configmapinformer.InformedWatcher) + if err := setupDynamicPublishing(logger, iw, controllerAgentName, tracingconfig.ConfigName); err != nil { + logger.Fatalw("Error setting up trace publishing", zap.Error(err)) + } + var env envConfig if err := envconfig.Process("", &env); err != nil { logger.Fatalw("Failed to process env var", zap.Error(err)) diff --git a/pkg/tracing/tracing.go b/pkg/tracing/tracing.go index b08667842..fb5cfede8 100644 --- a/pkg/tracing/tracing.go +++ b/pkg/tracing/tracing.go @@ -5,6 +5,8 @@ import ( "encoding/json" "net/http" + "github.com/nats-io/nats.go" + cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/binding/transformer" @@ -39,7 +41,7 @@ func keyValTransformer(key string, value string) binding.TransformerFunc { } func StartTraceFromMessage(logger *zap.Logger, inCtx context.Context, message *event.Event, spanName string) (context.Context, *trace.Span) { - sc, ok := parseSpanContext(message) + sc, ok := ParseSpanContext(message) if !ok { logger.Warn("Cannot parse the spancontext, creating a new span") return trace.StartSpan(inCtx, spanName) @@ -48,7 +50,7 @@ func StartTraceFromMessage(logger *zap.Logger, inCtx context.Context, message *e return trace.StartSpanWithRemoteParent(inCtx, spanName, sc) } -func parseSpanContext(message *event.Event) (sc trace.SpanContext, ok bool) { +func ParseSpanContext(message *event.Event) (sc trace.SpanContext, ok bool) { if message == nil { return trace.SpanContext{}, false } @@ -56,30 +58,46 @@ func parseSpanContext(message *event.Event) (sc trace.SpanContext, ok bool) { if !ok { return trace.SpanContext{}, false } - ts := message.Extensions()[traceStateHeader].(string) + ts, _ := message.Extensions()[traceStateHeader].(string) return format.SpanContextFromHeaders(tp, ts) } func ConvertEventToHttpHeader(message *event.Event) http.Header { additionalHeaders := http.Header{} - if message == nil { - return additionalHeaders + tp, ok := message.Extensions()[traceParentHeader].(string) + if ok { + additionalHeaders.Add(traceParentHeader, tp) + } + ts, ok := message.Extensions()[traceStateHeader].(string) + if ok { + additionalHeaders.Add(traceStateHeader, ts) } - tp := message.Extensions()[traceParentHeader].(string) - ts := message.Extensions()[traceStateHeader].(string) - additionalHeaders.Add(traceParentHeader, tp) - additionalHeaders.Add(traceStateHeader, ts) - return additionalHeaders } -func ConvertNatssMsgToEvent(logger *zap.Logger, stanMsg *stan.Msg) *event.Event { +func ConvertNatssMsgToEvent(logger *zap.Logger, msg *stan.Msg) *event.Event { message := cloudevents.NewEvent() - err := json.Unmarshal(stanMsg.Data, &message) - if err != nil { + if msg == nil || msg.Data == nil { + return &message + } + if err := json.Unmarshal(msg.Data, &message); err != nil { logger.Error("could not create an event from stan msg", zap.Error(err)) - return nil + return &message + } + + return &message +} + +func ConvertNatsMsgToEvent(logger *zap.Logger, msg *nats.Msg) *event.Event { + message := cloudevents.NewEvent() + if msg == nil || msg.Data == nil { + return &message + } + err := json.Unmarshal(msg.Data, &message) + if err != nil { + logger.Error("could not create an event from nats msg", zap.Error(err)) + return &message } return &message diff --git a/pkg/tracing/tracing_test.go b/pkg/tracing/tracing_test.go new file mode 100644 index 000000000..e652b1237 --- /dev/null +++ b/pkg/tracing/tracing_test.go @@ -0,0 +1,167 @@ +package tracing + +import ( + "context" + "testing" + + "github.com/cloudevents/sdk-go/v2/binding" + "go.opencensus.io/trace" + + cloudevents "github.com/cloudevents/sdk-go/v2" + "github.com/nats-io/nats.go" + "github.com/nats-io/stan.go" + "go.uber.org/zap" +) + +const data = `{ + "specversion":"1.0", + "type":"type", + "source":"source", + "id":"1234-1234-1234", + "traceparent": "00-8abe1a4854a9864ffa63046ef07b5dbe-8829876d85d5a76d-01", + "tracestate": "rojo=00f067aa0ba902b7", + "data":{"firstName":"John"} }` +const traceId = "8abe1a4854a9864ffa63046ef07b5dbe" +const tp = "00-" + traceId + "-8829876d85d5a76d-01" +const ts = "rojo=00f067aa0ba902b7" + +func TestConvertEventToHttpHeader(t *testing.T) { + event := cloudevents.NewEvent() + event.SetExtension(traceParentHeader, tp) + event.SetExtension(traceStateHeader, ts) + + headers := ConvertEventToHttpHeader(&event) + if headers.Get(traceParentHeader) != tp { + t.Fatalf("%s header mismatch", traceParentHeader) + } + if headers.Get(traceStateHeader) != ts { + t.Fatalf("%s header mismatch", traceStateHeader) + } +} + +func TestConvertEventToHttpHeaderEmptyEvent(t *testing.T) { + event := cloudevents.NewEvent() + headers := ConvertEventToHttpHeader(&event) + if headers.Get(traceParentHeader) != "" { + t.Fatalf("%s header must be empty", traceParentHeader) + } + if headers.Get(traceStateHeader) != "" { + t.Fatalf("%s header must be empty", traceStateHeader) + } +} + +func TestConvertNatsMsgToEventIsNotNullableIfNil(t *testing.T) { + message := ConvertNatsMsgToEvent(zap.NewNop(), nil) + if message == nil { + t.Fatalf("Message must be non-nil") + } +} + +func TestConvertNatsMsgToEventIsNotNullableEmptyData(t *testing.T) { + msg := nats.NewMsg("subject") + msg.Data = []byte("{}") + message := ConvertNatsMsgToEvent(zap.NewNop(), msg) + if message == nil { + t.Fatalf("Message must be non-nil") + } +} + +func TestConvertNatsMsgToEventIsNotNullableData(t *testing.T) { + msg := nats.Msg{} + msg.Data = []byte(data) + message := ConvertNatsMsgToEvent(zap.NewNop(), &msg) + if message == nil { + t.Fatalf("Message must be non-nil") + } +} + +func TestConvertNatssMsgToEventIsNotNullableIfNil(t *testing.T) { + message := ConvertNatssMsgToEvent(zap.NewNop(), nil) + if message == nil { + t.Fatalf("Message must be non-nil") + } +} + +func TestConvertNatssMsgToEventIsNotNullableEmptyData(t *testing.T) { + msg := stan.Msg{} + msg.Data = []byte("{}") + message := ConvertNatssMsgToEvent(zap.NewNop(), &msg) + if message == nil { + t.Fatalf("Message must be non-nil") + } +} + +func TestConvertNatssMsgToEventIsNotNullableData(t *testing.T) { + msg := stan.Msg{} + msg.Data = []byte(data) + message := ConvertNatssMsgToEvent(zap.NewNop(), &msg) + if message == nil { + t.Fatalf("Message must be non-nil") + } +} + +func TestStartTraceFromMessage(t *testing.T) { + natMsg := nats.Msg{} + natMsg.Data = []byte(data) + msg := ConvertNatsMsgToEvent(zap.NewNop(), &natMsg) + ctx, span := StartTraceFromMessage(zap.NewNop(), context.Background(), msg, "span-name") + tc := trace.FromContext(ctx) + if traceId != tc.SpanContext().TraceID.String() { + t.Fatalf("TraceId is incorrect, expected: %v, actual: %v", traceId, tc.SpanContext().TraceID) + } + if span == nil { + t.Fatalf("Span must be non-nil") + } +} + +func TestStartTraceFromMessageIsNil(t *testing.T) { + ctx, span := StartTraceFromMessage(zap.NewNop(), context.Background(), nil, "span-name") + tc := trace.FromContext(ctx) + if traceId == tc.SpanContext().TraceID.String() { + t.Fatalf("TraceId must be new") + } + if span == nil { + t.Fatalf("Span must be non-nil") + } +} + +func TestStartTraceFromMessageTraceParentIsNil(t *testing.T) { + msg := cloudevents.NewEvent() + ctx, span := StartTraceFromMessage(zap.NewNop(), context.Background(), &msg, "span-name") + tc := trace.FromContext(ctx) + if traceId == tc.SpanContext().TraceID.String() { + t.Fatalf("TraceId must be new") + } + if span == nil { + t.Fatalf("Span must be non-nil") + } +} + +func TestStartTraceFromMessageTraceStateIsNil(t *testing.T) { + msg := cloudevents.NewEvent() + msg.SetExtension(traceParentHeader, tp) + ctx, span := StartTraceFromMessage(zap.NewNop(), context.Background(), &msg, "span-name") + tc := trace.FromContext(ctx) + if traceId != tc.SpanContext().TraceID.String() { + t.Fatalf("TraceId is incorrect, expected: %v, actual: %v", traceId, tc.SpanContext().TraceID) + } + if span == nil { + t.Fatalf("Span must be non-nil") + } +} + +func TestSerializeTraceTransformers(t *testing.T) { + msg := cloudevents.NewEvent() + msg.SetExtension(traceParentHeader, tp) + msg.SetExtension(traceStateHeader, ts) + sc, _ := format.SpanContextFromHeaders(tp, ts) + transformers := SerializeTraceTransformers(sc) + message := binding.ToMessage(&msg) + event, _ := binding.ToEvent(context.Background(), message, transformers...) + if tp != event.Extensions()[traceParentHeader] { + t.Fatalf("Traceparent is incorrect, expected: %v, actual: %v", tp, event.Extensions()[traceParentHeader]) + } + if ts != event.Extensions()[traceStateHeader] { + t.Fatalf("Tracestate is incorrect, expected: %v, actual: %v", tp, event.Extensions()[traceStateHeader]) + } +}