Skip to content

Commit

Permalink
jsm tracing propagation fixed release-1.3.x (#365)
Browse files Browse the repository at this point in the history
* jsm tracing propagation fixed

* refactoring, not functional changes: renamed arguments, inlined func call into if-err block

* refactored tracing.go to omit returning nil
added tests for tracing.go

* changed way how http header obtained in tests

* added nil check for msg.Data as well

* added nil check into dispatcher

* code style fix: ineffectual assignment to span

* tracing full test coverage

* improved testing, refactored jsm dispatcher according to review comments
  • Loading branch information
astelmashenko authored Sep 21, 2022
1 parent 2017237 commit 6a38645
Show file tree
Hide file tree
Showing 5 changed files with 248 additions and 23 deletions.
23 changes: 21 additions & 2 deletions pkg/dispatcher/jetstream_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"sync/atomic"
"time"

"go.opencensus.io/trace"
"knative.dev/eventing-natss/pkg/tracing"

"k8s.io/apimachinery/pkg/types"

jsmcloudevents "github.com/cloudevents/sdk-go/protocol/nats_jetstream/v2"
Expand All @@ -43,6 +46,7 @@ import (
const (
// maxJetElements defines a maximum number of outstanding re-connect requests
maxJetElements = 10
jsmChannel = "jsm-channel"
)

var (
Expand Down Expand Up @@ -146,7 +150,9 @@ func jetmessageReceiverFunc(s *jetSubscriptionsSupervisor) eventingchannels.Unbu
s.logger.Error("could not create nats jetstream sender", zap.Error(err))
return errors.Wrap(err, "could not create nats jetstream sender")
}
if err := sender.Send(ctx, message); err != nil {

tpTsTransformers := tracing.SerializeTraceTransformers(trace.FromContext(ctx).SpanContext())
if err := sender.Send(ctx, message, tpTsTransformers...); err != nil {
errMsg := "error during send"
if err.Error() == stan.ErrConnectionClosed.Error() {
errMsg += " - connection to NATSS has been lost, attempting to reconnect"
Expand Down Expand Up @@ -316,7 +322,20 @@ func (s *jetSubscriptionsSupervisor) subscribe(ctx context.Context, channel even
s.logger.Debug("dispatch message", zap.String("deadLetter", deadLetter.String()))
}

executionInfo, err := s.dispatcher.DispatchMessage(ctx, message, nil, destination, reply, deadLetter)
event := tracing.ConvertNatsMsgToEvent(s.logger, stanMsg)
additionalHeaders := tracing.ConvertEventToHttpHeader(event)

sc, ok := tracing.ParseSpanContext(event)
var span *trace.Span
if !ok {
s.logger.Warn("Cannot parse the spancontext, creating a new span")
ctx, span = trace.StartSpan(ctx, jsmChannel+"-"+channel.Name)
} else {
ctx, span = trace.StartSpanWithRemoteParent(ctx, jsmChannel+"-"+channel.Name, sc)
}
defer span.End()

executionInfo, err := s.dispatcher.DispatchMessage(ctx, message, additionalHeaders, destination, reply, deadLetter)
if err != nil {
s.logger.Error("Failed to dispatch message: ", zap.Error(err))
return
Expand Down
12 changes: 10 additions & 2 deletions pkg/dispatcher/natss_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ import (

const (
// maxElements defines a maximum number of outstanding re-connect requests
maxElements = 10
maxElements = 10
natssChannel = "natss-channel"
)

var (
Expand Down Expand Up @@ -328,7 +329,14 @@ func (s *subscriptionsSupervisor) subscribe(ctx context.Context, channel eventin
event := tracing.ConvertNatssMsgToEvent(s.logger, stanMsg)
additionalHeaders := tracing.ConvertEventToHttpHeader(event)

ctx, span := tracing.StartTraceFromMessage(s.logger, ctx, event, "natsschannel-"+channel.Name)
sc, ok := tracing.ParseSpanContext(event)
var span *trace.Span
if !ok {
s.logger.Warn("Cannot parse the spancontext, creating a new span")
ctx, span = trace.StartSpan(ctx, natssChannel+"-"+channel.Name)
} else {
ctx, span = trace.StartSpanWithRemoteParent(ctx, natssChannel+"-"+channel.Name, sc)
}
defer span.End()

executionInfo, err := s.dispatcher.DispatchMessage(ctx, message, additionalHeaders, destination, reply, deadLetter)
Expand Down
23 changes: 18 additions & 5 deletions pkg/reconciler/dispatcher/jetstream/nats_jetstreaming_schannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ import (
"fmt"
"strings"

configmapinformer "knative.dev/pkg/configmap/informer"
"knative.dev/pkg/tracing"
tracingconfig "knative.dev/pkg/tracing/config"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
Expand Down Expand Up @@ -54,11 +58,11 @@ import (
)

const (
// controllerAgentName is the string used by this controller to identify
// itself when creating events.
//controllerAgentName = "jetstream-ch-dispatcher"
// controllerAgentName is the string used by this controller to identify
// itself when creating events.
controllerAgentName = "jetstream-ch-dispatcher"

//finalizerName = controllerAgentName
//finalizerName = controllerAgentName
)

// Reconciler reconciles NATS JetStream Channels.
Expand All @@ -71,6 +75,9 @@ type Reconciler struct {
impl *controller.Impl
}

// take func ref, to be able to mock for tests
var setupDynamicPublishing = tracing.SetupDynamicPublishing

// Check that our Reconciler implements controller.Reconciler.
var _ jetstreamchannelreconciler.Interface = (*Reconciler)(nil)
var _ jetstreamchannelreconciler.Finalizer = (*Reconciler)(nil)
Expand All @@ -82,10 +89,16 @@ type envConfig struct {

// NewController initializes the controller and is called by the generated code.
// Registers event handlers to enqueue events.
func NewController(ctx context.Context, _ configmap.Watcher) *controller.Impl {
func NewController(ctx context.Context, watcher configmap.Watcher) *controller.Impl {

logger := logging.FromContext(ctx)

// Setup trace publishing.
iw := watcher.(*configmapinformer.InformedWatcher)
if err := setupDynamicPublishing(logger, iw, controllerAgentName, tracingconfig.ConfigName); err != nil {
logger.Fatalw("Error setting up trace publishing", zap.Error(err))
}

var env envConfig
if err := envconfig.Process("", &env); err != nil {
logger.Fatalw("Failed to process env var", zap.Error(err))
Expand Down
46 changes: 32 additions & 14 deletions pkg/tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"encoding/json"
"net/http"

"github.com/nats-io/nats.go"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/transformer"
Expand Down Expand Up @@ -39,7 +41,7 @@ func keyValTransformer(key string, value string) binding.TransformerFunc {
}

func StartTraceFromMessage(logger *zap.Logger, inCtx context.Context, message *event.Event, spanName string) (context.Context, *trace.Span) {
sc, ok := parseSpanContext(message)
sc, ok := ParseSpanContext(message)
if !ok {
logger.Warn("Cannot parse the spancontext, creating a new span")
return trace.StartSpan(inCtx, spanName)
Expand All @@ -48,38 +50,54 @@ func StartTraceFromMessage(logger *zap.Logger, inCtx context.Context, message *e
return trace.StartSpanWithRemoteParent(inCtx, spanName, sc)
}

func parseSpanContext(message *event.Event) (sc trace.SpanContext, ok bool) {
func ParseSpanContext(message *event.Event) (sc trace.SpanContext, ok bool) {
if message == nil {
return trace.SpanContext{}, false
}
tp, ok := message.Extensions()[traceParentHeader].(string)
if !ok {
return trace.SpanContext{}, false
}
ts := message.Extensions()[traceStateHeader].(string)
ts, _ := message.Extensions()[traceStateHeader].(string)

return format.SpanContextFromHeaders(tp, ts)
}

func ConvertEventToHttpHeader(message *event.Event) http.Header {
additionalHeaders := http.Header{}
if message == nil {
return additionalHeaders
tp, ok := message.Extensions()[traceParentHeader].(string)
if ok {
additionalHeaders.Add(traceParentHeader, tp)
}
ts, ok := message.Extensions()[traceStateHeader].(string)
if ok {
additionalHeaders.Add(traceStateHeader, ts)
}
tp := message.Extensions()[traceParentHeader].(string)
ts := message.Extensions()[traceStateHeader].(string)
additionalHeaders.Add(traceParentHeader, tp)
additionalHeaders.Add(traceStateHeader, ts)

return additionalHeaders
}

func ConvertNatssMsgToEvent(logger *zap.Logger, stanMsg *stan.Msg) *event.Event {
func ConvertNatssMsgToEvent(logger *zap.Logger, msg *stan.Msg) *event.Event {
message := cloudevents.NewEvent()
err := json.Unmarshal(stanMsg.Data, &message)
if err != nil {
if msg == nil || msg.Data == nil {
return &message
}
if err := json.Unmarshal(msg.Data, &message); err != nil {
logger.Error("could not create an event from stan msg", zap.Error(err))
return nil
return &message
}

return &message
}

func ConvertNatsMsgToEvent(logger *zap.Logger, msg *nats.Msg) *event.Event {
message := cloudevents.NewEvent()
if msg == nil || msg.Data == nil {
return &message
}
err := json.Unmarshal(msg.Data, &message)
if err != nil {
logger.Error("could not create an event from nats msg", zap.Error(err))
return &message
}

return &message
Expand Down
167 changes: 167 additions & 0 deletions pkg/tracing/tracing_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package tracing

import (
"context"
"testing"

"github.com/cloudevents/sdk-go/v2/binding"
"go.opencensus.io/trace"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/nats-io/nats.go"
"github.com/nats-io/stan.go"
"go.uber.org/zap"
)

const data = `{
"specversion":"1.0",
"type":"type",
"source":"source",
"id":"1234-1234-1234",
"traceparent": "00-8abe1a4854a9864ffa63046ef07b5dbe-8829876d85d5a76d-01",
"tracestate": "rojo=00f067aa0ba902b7",
"data":{"firstName":"John"} }`
const traceId = "8abe1a4854a9864ffa63046ef07b5dbe"
const tp = "00-" + traceId + "-8829876d85d5a76d-01"
const ts = "rojo=00f067aa0ba902b7"

func TestConvertEventToHttpHeader(t *testing.T) {
event := cloudevents.NewEvent()
event.SetExtension(traceParentHeader, tp)
event.SetExtension(traceStateHeader, ts)

headers := ConvertEventToHttpHeader(&event)
if headers.Get(traceParentHeader) != tp {
t.Fatalf("%s header mismatch", traceParentHeader)
}
if headers.Get(traceStateHeader) != ts {
t.Fatalf("%s header mismatch", traceStateHeader)
}
}

func TestConvertEventToHttpHeaderEmptyEvent(t *testing.T) {
event := cloudevents.NewEvent()
headers := ConvertEventToHttpHeader(&event)
if headers.Get(traceParentHeader) != "" {
t.Fatalf("%s header must be empty", traceParentHeader)
}
if headers.Get(traceStateHeader) != "" {
t.Fatalf("%s header must be empty", traceStateHeader)
}
}

func TestConvertNatsMsgToEventIsNotNullableIfNil(t *testing.T) {
message := ConvertNatsMsgToEvent(zap.NewNop(), nil)
if message == nil {
t.Fatalf("Message must be non-nil")
}
}

func TestConvertNatsMsgToEventIsNotNullableEmptyData(t *testing.T) {
msg := nats.NewMsg("subject")
msg.Data = []byte("{}")
message := ConvertNatsMsgToEvent(zap.NewNop(), msg)
if message == nil {
t.Fatalf("Message must be non-nil")
}
}

func TestConvertNatsMsgToEventIsNotNullableData(t *testing.T) {
msg := nats.Msg{}
msg.Data = []byte(data)
message := ConvertNatsMsgToEvent(zap.NewNop(), &msg)
if message == nil {
t.Fatalf("Message must be non-nil")
}
}

func TestConvertNatssMsgToEventIsNotNullableIfNil(t *testing.T) {
message := ConvertNatssMsgToEvent(zap.NewNop(), nil)
if message == nil {
t.Fatalf("Message must be non-nil")
}
}

func TestConvertNatssMsgToEventIsNotNullableEmptyData(t *testing.T) {
msg := stan.Msg{}
msg.Data = []byte("{}")
message := ConvertNatssMsgToEvent(zap.NewNop(), &msg)
if message == nil {
t.Fatalf("Message must be non-nil")
}
}

func TestConvertNatssMsgToEventIsNotNullableData(t *testing.T) {
msg := stan.Msg{}
msg.Data = []byte(data)
message := ConvertNatssMsgToEvent(zap.NewNop(), &msg)
if message == nil {
t.Fatalf("Message must be non-nil")
}
}

func TestStartTraceFromMessage(t *testing.T) {
natMsg := nats.Msg{}
natMsg.Data = []byte(data)
msg := ConvertNatsMsgToEvent(zap.NewNop(), &natMsg)
ctx, span := StartTraceFromMessage(zap.NewNop(), context.Background(), msg, "span-name")
tc := trace.FromContext(ctx)
if traceId != tc.SpanContext().TraceID.String() {
t.Fatalf("TraceId is incorrect, expected: %v, actual: %v", traceId, tc.SpanContext().TraceID)
}
if span == nil {
t.Fatalf("Span must be non-nil")
}
}

func TestStartTraceFromMessageIsNil(t *testing.T) {
ctx, span := StartTraceFromMessage(zap.NewNop(), context.Background(), nil, "span-name")
tc := trace.FromContext(ctx)
if traceId == tc.SpanContext().TraceID.String() {
t.Fatalf("TraceId must be new")
}
if span == nil {
t.Fatalf("Span must be non-nil")
}
}

func TestStartTraceFromMessageTraceParentIsNil(t *testing.T) {
msg := cloudevents.NewEvent()
ctx, span := StartTraceFromMessage(zap.NewNop(), context.Background(), &msg, "span-name")
tc := trace.FromContext(ctx)
if traceId == tc.SpanContext().TraceID.String() {
t.Fatalf("TraceId must be new")
}
if span == nil {
t.Fatalf("Span must be non-nil")
}
}

func TestStartTraceFromMessageTraceStateIsNil(t *testing.T) {
msg := cloudevents.NewEvent()
msg.SetExtension(traceParentHeader, tp)
ctx, span := StartTraceFromMessage(zap.NewNop(), context.Background(), &msg, "span-name")
tc := trace.FromContext(ctx)
if traceId != tc.SpanContext().TraceID.String() {
t.Fatalf("TraceId is incorrect, expected: %v, actual: %v", traceId, tc.SpanContext().TraceID)
}
if span == nil {
t.Fatalf("Span must be non-nil")
}
}

func TestSerializeTraceTransformers(t *testing.T) {
msg := cloudevents.NewEvent()
msg.SetExtension(traceParentHeader, tp)
msg.SetExtension(traceStateHeader, ts)
sc, _ := format.SpanContextFromHeaders(tp, ts)
transformers := SerializeTraceTransformers(sc)
message := binding.ToMessage(&msg)
event, _ := binding.ToEvent(context.Background(), message, transformers...)
if tp != event.Extensions()[traceParentHeader] {
t.Fatalf("Traceparent is incorrect, expected: %v, actual: %v", tp, event.Extensions()[traceParentHeader])
}
if ts != event.Extensions()[traceStateHeader] {
t.Fatalf("Tracestate is incorrect, expected: %v, actual: %v", tp, event.Extensions()[traceStateHeader])
}
}

0 comments on commit 6a38645

Please # to comment.