Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Optionally supply tenancy when reporting spans v2 #3750

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/agent/app/proxy_builders.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,6 @@ import (
// GRPCCollectorProxyBuilder creates CollectorProxyBuilder for GRPC reporter
func GRPCCollectorProxyBuilder(builder *grpc.ConnBuilder) CollectorProxyBuilder {
return func(opts ProxyBuilderOptions) (proxy CollectorProxy, err error) {
return grpc.NewCollectorProxy(builder, opts.AgentTags, opts.Metrics, opts.Logger)
return grpc.NewCollectorProxy(builder, opts.AgentTags, opts.Metrics, builder.CollectorTenancyHeader, opts.Logger)
}
}
3 changes: 3 additions & 0 deletions cmd/agent/app/reporter/grpc/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type ConnBuilder struct {
DiscoveryMinPeers int
Notifier discovery.Notifier
Discoverer discovery.Discoverer

// CollectorTenancyHeader is the header used for reporting to a multi-tenant Jaeger, typically x-tenant
CollectorTenancyHeader string
}

// NewConnBuilder creates a new grpc connection builder.
Expand Down
3 changes: 2 additions & 1 deletion cmd/agent/app/reporter/grpc/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func TestProxyBuilder(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
proxy, err := NewCollectorProxy(test.grpcBuilder, nil, metrics.NullFactory, zap.NewNop())
proxy, err := NewCollectorProxy(test.grpcBuilder, nil, metrics.NullFactory, "", zap.NewNop())
if test.expectError {
require.Error(t, err)
} else {
Expand Down Expand Up @@ -359,6 +359,7 @@ func TestProxyClientTLS(t *testing.T) {
grpcBuilder,
nil,
mFactory,
"",
zap.NewNop())

require.NoError(t, err)
Expand Down
4 changes: 2 additions & 2 deletions cmd/agent/app/reporter/grpc/collector_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,13 @@ type ProxyBuilder struct {
}

// NewCollectorProxy creates ProxyBuilder
func NewCollectorProxy(builder *ConnBuilder, agentTags map[string]string, mFactory metrics.Factory, logger *zap.Logger) (*ProxyBuilder, error) {
func NewCollectorProxy(builder *ConnBuilder, agentTags map[string]string, mFactory metrics.Factory, tenantHeader string, logger *zap.Logger) (*ProxyBuilder, error) {
conn, err := builder.CreateConnection(logger, mFactory)
if err != nil {
return nil, err
}
grpcMetrics := mFactory.Namespace(metrics.NSOptions{Name: "", Tags: map[string]string{"protocol": "grpc"}})
r1 := NewReporter(conn, agentTags, logger)
r1 := NewMultitenantReporter(conn, agentTags, tenantHeader, logger)
r2 := reporter.WrapWithMetrics(r1, grpcMetrics)
r3 := reporter.WrapWithClientMetrics(reporter.ClientMetricsReporterParams{
Reporter: r2,
Expand Down
2 changes: 1 addition & 1 deletion cmd/agent/app/reporter/grpc/collector_proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func TestMultipleCollectors(t *testing.T) {
defer s2.Stop()

mFactory := metricstest.NewFactory(time.Microsecond)
proxy, err := NewCollectorProxy(&ConnBuilder{CollectorHostPorts: []string{addr1.String(), addr2.String()}}, nil, mFactory, zap.NewNop())
proxy, err := NewCollectorProxy(&ConnBuilder{CollectorHostPorts: []string{addr1.String(), addr2.String()}}, nil, mFactory, "", zap.NewNop())
require.NoError(t, err)
require.NotNil(t, proxy)
assert.NotNil(t, proxy.GetReporter())
Expand Down
13 changes: 8 additions & 5 deletions cmd/agent/app/reporter/grpc/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ import (
)

const (
gRPCPrefix = "reporter.grpc"
collectorHostPort = gRPCPrefix + ".host-port"
retry = gRPCPrefix + ".retry.max"
defaultMaxRetry = 3
discoveryMinPeers = gRPCPrefix + ".discovery.min-peers"
gRPCPrefix = "reporter.grpc"
collectorHostPort = gRPCPrefix + ".host-port"
retry = gRPCPrefix + ".retry.max"
defaultMaxRetry = 3
discoveryMinPeers = gRPCPrefix + ".discovery.min-peers"
collectorTenancyHeader = gRPCPrefix + ".tenancy-header"
)

var tlsFlagsConfig = tlscfg.ClientFlagsConfig{
Expand All @@ -42,6 +43,7 @@ func AddFlags(flags *flag.FlagSet) {
flags.Int(discoveryMinPeers, 3, "Max number of collectors to which the agent will try to connect at any given time")
flags.String(collectorHostPort, "", "Comma-separated string representing host:port of a static list of collectors to connect to directly")
tlsFlagsConfig.AddFlags(flags)
flags.String(collectorTenancyHeader, "", "HTTP header carrying reporter tenant")
}

// InitFromViper initializes Options with properties retrieved from Viper.
Expand All @@ -57,5 +59,6 @@ func (b *ConnBuilder) InitFromViper(v *viper.Viper) (*ConnBuilder, error) {
return b, fmt.Errorf("failed to process TLS options: %w", err)
}
b.DiscoveryMinPeers = v.GetInt(discoveryMinPeers)
b.CollectorTenancyHeader = v.GetString(collectorTenancyHeader)
return b, nil
}
4 changes: 4 additions & 0 deletions cmd/agent/app/reporter/grpc/flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func TestBindFlags(t *testing.T) {
cOpts: []string{"--reporter.grpc.host-port=localhost:1111,localhost:2222", "--reporter.grpc.discovery.min-peers=5"},
expected: &ConnBuilder{CollectorHostPorts: []string{"localhost:1111", "localhost:2222"}, MaxRetry: defaultMaxRetry, DiscoveryMinPeers: 5},
},
{
cOpts: []string{"--reporter.grpc.tenancy-header=jaeger-tenant"},
expected: &ConnBuilder{MaxRetry: defaultMaxRetry, DiscoveryMinPeers: 3, CollectorTenancyHeader: "jaeger-tenant"},
},
}
for _, test := range tests {
v := viper.New()
Expand Down
66 changes: 61 additions & 5 deletions cmd/agent/app/reporter/grpc/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,17 @@ import (

"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

zipkin2 "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin"
"github.com/jaegertracing/jaeger/cmd/query/app"
"github.com/jaegertracing/jaeger/model"
jConverter "github.com/jaegertracing/jaeger/model/converter/thrift/jaeger"
"github.com/jaegertracing/jaeger/model/converter/thrift/zipkin"
"github.com/jaegertracing/jaeger/pkg/config/tenancy"
"github.com/jaegertracing/jaeger/pkg/multierror"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
"github.com/jaegertracing/jaeger/storage"
thrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)
Expand All @@ -36,21 +41,64 @@ type Reporter struct {
agentTags []model.KeyValue
logger *zap.Logger
sanitizer zipkin2.Sanitizer

tenantHeader string
}

// NewReporter creates gRPC reporter.
func NewReporter(conn *grpc.ClientConn, agentTags map[string]string, logger *zap.Logger) *Reporter {
return NewMultitenantReporter(conn, agentTags, "", logger)
}

func NewMultitenantReporter(conn *grpc.ClientConn, agentTags map[string]string, tenantHeader string, logger *zap.Logger) *Reporter {
return &Reporter{
collector: api_v2.NewCollectorServiceClient(conn),
agentTags: makeModelKeyValue(agentTags),
logger: logger,
sanitizer: zipkin2.NewChainedSanitizer(zipkin2.NewStandardSanitizers()...),
collector: api_v2.NewCollectorServiceClient(conn),
agentTags: makeModelKeyValue(agentTags),
logger: logger,
sanitizer: zipkin2.NewChainedSanitizer(zipkin2.NewStandardSanitizers()...),
tenantHeader: tenantHeader,
}
}

// EmitBatch implements EmitBatch() of Reporter
func (r *Reporter) EmitBatch(ctx context.Context, b *thrift.Batch) error {
return r.send(ctx, jConverter.ToDomain(b.Spans, nil), jConverter.ToDomainProcess(b.Process))
// If we aren't sending tenant headers, forward along the batch
if r.tenantHeader == "" {
return r.send(ctx, jConverter.ToDomain(b.Spans, nil), jConverter.ToDomainProcess(b.Process))
}

// Partition batch by tenant
batches := make(map[string]*[]*thrift.Span)
for _, span := range b.Spans {
tenant := ""
for _, tag := range span.Tags {
if tag.GetKey() == app.TenancyTag {
tenant = tag.GetVStr()
break
}
}
if tenant == "" {
tenant = tenancy.MissingTenant
}

batch, ok := batches[tenant]
if !ok {
batch = &[]*thrift.Span{}
batches[tenant] = batch
}
*batch = append(*batch, span)
}

// Send each tenant's spans
var errors []error
for tenant, partitionedBatch := range batches {
tenantedCtx := storage.WithTenant(ctx, tenant)
err := r.send(tenantedCtx, jConverter.ToDomain(*partitionedBatch, nil), jConverter.ToDomainProcess(b.Process))
if err != nil {
errors = append(errors, err)
}
}
return multierror.Wrap(errors)
}

// EmitZipkinBatch implements EmitZipkinBatch() of Reporter
Expand All @@ -66,6 +114,14 @@ func (r *Reporter) EmitZipkinBatch(ctx context.Context, zSpans []*zipkincore.Spa
}

func (r *Reporter) send(ctx context.Context, spans []*model.Span, process *model.Process) error {
if r.tenantHeader != "" {
tenant := storage.GetTenant(ctx)
md := metadata.New(map[string]string{
r.tenantHeader: tenant,
})
ctx = metadata.NewOutgoingContext(ctx, md)
}

spans, process = addProcessTags(spans, process, r.agentTags)
batch := model.Batch{Spans: spans, Process: process}
req := &api_v2.PostSpansRequest{Batch: batch}
Expand Down
83 changes: 83 additions & 0 deletions cmd/agent/app/reporter/grpc/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"

"github.com/jaegertracing/jaeger/cmd/query/app"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
jThrift "github.com/jaegertracing/jaeger/thrift-gen/jaeger"
Expand All @@ -35,6 +37,9 @@ import (
type mockSpanHandler struct {
mux sync.Mutex
requests []*api_v2.PostSpansRequest

tenantHeader string
tenants map[string]bool
}

func (h *mockSpanHandler) getRequests() []*api_v2.PostSpansRequest {
Expand All @@ -43,10 +48,28 @@ func (h *mockSpanHandler) getRequests() []*api_v2.PostSpansRequest {
return h.requests
}

func (h *mockSpanHandler) getTenants() map[string]bool {
h.mux.Lock()
defer h.mux.Unlock()
return h.tenants
}

func (h *mockSpanHandler) PostSpans(c context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
h.mux.Lock()
defer h.mux.Unlock()
h.requests = append(h.requests, r)
if h.tenants == nil {
h.tenants = make(map[string]bool)
}
if h.tenantHeader != "" {
md, ok := metadata.FromIncomingContext(c)
if ok {
tenants := md.Get(h.tenantHeader)
if len(tenants) > 0 {
h.tenants[tenants[0]] = true
}
}
}
return &api_v2.PostSpansResponse{}, nil
}

Expand Down Expand Up @@ -178,3 +201,63 @@ func TestReporter_MakeModelKeyValue(t *testing.T) {

assert.Equal(t, expectedTags, actualTags)
}

func TestReporter_EmitTenantedBatch(t *testing.T) {
handler := &mockSpanHandler{
tenantHeader: "x-tenant",
}
s, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer s.Stop()
conn, err := grpc.Dial(addr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
//nolint:staticcheck // don't care about errors
defer conn.Close()
require.NoError(t, err)
rep := NewMultitenantReporter(conn, nil, "x-tenant", zap.NewNop())

exampleTag := "acme"
tm := time.Unix(158, 0)
tests := []struct {
in *jThrift.Batch
expected model.Batch
err string

expectedTenants map[string]bool
}{
{
in: &jThrift.Batch{Process: &jThrift.Process{ServiceName: "node"}, Spans: []*jThrift.Span{{
OperationName: "foo",
StartTime: int64(model.TimeAsEpochMicroseconds(tm)),
Tags: []*jThrift.Tag{
{
Key: app.TenancyTag,
VStr: &exampleTag,
},
},
}}},
expected: model.Batch{Process: &model.Process{ServiceName: "node"}, Spans: []*model.Span{{
OperationName: "foo", StartTime: tm.UTC(),
Tags: []model.KeyValue{
{
Key: app.TenancyTag,
VStr: exampleTag,
},
},
}}},
expectedTenants: map[string]bool{
"acme": true,
},
},
}
for _, test := range tests {
err = rep.EmitBatch(context.Background(), test.in)
if test.err != "" {
assert.EqualError(t, err, test.err)
} else {
require.Equal(t, 1, len(handler.requests))
assert.Equal(t, test.expected, handler.requests[0].GetBatch())
assert.Equal(t, test.expectedTenants, handler.getTenants())
}
}
}
9 changes: 9 additions & 0 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/jaegertracing/jaeger/cmd/query/app/querysvc"
"github.com/jaegertracing/jaeger/cmd/status"
"github.com/jaegertracing/jaeger/pkg/config"
"github.com/jaegertracing/jaeger/pkg/config/tenancy"
"github.com/jaegertracing/jaeger/pkg/version"
metricsPlugin "github.com/jaegertracing/jaeger/plugin/metrics"
ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore"
Expand Down Expand Up @@ -154,6 +155,10 @@ by default uses only in-memory database.`,
if err != nil {
logger.Fatal("Failed to configure query service", zap.Error(err))
}
tOpts, err := tenancy.InitFromViper(v)
if err != nil {
logger.Fatal("Failed to initialize tenancy", zap.Error(err))
}

// collector
c := collectorApp.New(&collectorApp.CollectorParams{
Expand All @@ -174,6 +179,10 @@ by default uses only in-memory database.`,
if len(grpcBuilder.CollectorHostPorts) == 0 {
grpcBuilder.CollectorHostPorts = append(grpcBuilder.CollectorHostPorts, cOpts.GRPC.HostPort)
}
// if the agent reporter grpc tenant header was not explicitly set then use whatever the collector expects, if anything
if grpcBuilder.CollectorTenancyHeader == "" && tOpts.Enabled {
grpcBuilder.CollectorTenancyHeader = tOpts.Header
}
agentMetricsFactory := metricsFactory.Namespace(metrics.NSOptions{Name: "agent", Tags: nil})
builders := map[agentRep.Type]agentApp.CollectorProxyBuilder{
agentRep.GRPC: agentApp.GRPCCollectorProxyBuilder(grpcBuilder),
Expand Down
20 changes: 19 additions & 1 deletion cmd/query/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/jaegertracing/jaeger/model"
uiconv "github.com/jaegertracing/jaeger/model/converter/json"
ui "github.com/jaegertracing/jaeger/model/json"
"github.com/jaegertracing/jaeger/pkg/config/tenancy"
"github.com/jaegertracing/jaeger/pkg/multierror"
"github.com/jaegertracing/jaeger/plugin/metrics/disabled"
"github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics"
Expand Down Expand Up @@ -144,7 +145,24 @@ func (aH *APIHandler) handleFunc(
http.HandlerFunc(f),
nethttp.OperationNameFunc(func(r *http.Request) string {
return route
}))
}),
// @@@ ecs TODO only create this if tenancy is enabled
nethttp.MWSpanObserver(
func(span opentracing.Span, r *http.Request) {
// @@@ ecs TODO make header configurable
tenant := r.Header.Get("x-tenant")
if tenant == "" {
// @@@ ecs TODO Jaeger generates a span for
// rejected queries, re-order so that only valid
// queries generate spans.
// (There might also be a missing tenant if Jaeger
// traced internal "side work", such as syncing with a time
// server, but we aren't considering that for this POC.)
tenant = tenancy.MissingTenant
}
span.SetTag(TenancyTag, tenant)
}),
)
return router.HandleFunc(route, traceMiddleware.ServeHTTP)
}

Expand Down
Loading