diff --git a/cmd/collector/app/builder_flags.go b/cmd/collector/app/builder_flags.go index fc22d42db2ca..660eea3be35c 100644 --- a/cmd/collector/app/builder_flags.go +++ b/cmd/collector/app/builder_flags.go @@ -40,6 +40,7 @@ const ( collectorGRPCMaxReceiveMessageLength = "collector.grpc-server.max-message-size" collectorMaxConnectionAge = "collector.grpc-server.max-connection-age" collectorMaxConnectionAgeGrace = "collector.grpc-server.max-connection-age-grace" + collectorTenantHeader = "collector.tenant-header" ) var tlsGRPCFlagsConfig = tlscfg.ServerFlagsConfig{ @@ -88,6 +89,8 @@ type CollectorOptions struct { // CollectorGRPCMaxConnectionAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed. // See gRPC's keepalive.ServerParameters#MaxConnectionAgeGrace. CollectorGRPCMaxConnectionAgeGrace time.Duration + // Optional tenant header. If specified, the name of the header that carries the tenant + CollectorTenantHeader string } // AddFlags adds flags for CollectorOptions @@ -104,6 +107,7 @@ func AddFlags(flags *flag.FlagSet) { flags.Uint(collectorDynQueueSizeMemory, 0, "(experimental) The max memory size in MiB to use for the dynamic queue.") flags.Duration(collectorMaxConnectionAge, 0, "The maximum amount of time a connection may exist. Set this value to a few seconds or minutes on highly elastic environments, so that clients discover new collector nodes frequently. See https://pkg.go.dev/google.golang.org/grpc/keepalive#ServerParameters") flags.Duration(collectorMaxConnectionAgeGrace, 0, "The additive period after MaxConnectionAge after which the connection will be forcibly closed. See https://pkg.go.dev/google.golang.org/grpc/keepalive#ServerParameters") + flags.String(collectorTenantHeader, "", "(experimental) tenant header") tlsGRPCFlagsConfig.AddFlags(flags) tlsHTTPFlagsConfig.AddFlags(flags) @@ -124,6 +128,7 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper) (*CollectorOptions, cOpts.CollectorGRPCMaxReceiveMessageLength = v.GetInt(collectorGRPCMaxReceiveMessageLength) cOpts.CollectorGRPCMaxConnectionAge = v.GetDuration(collectorMaxConnectionAge) cOpts.CollectorGRPCMaxConnectionAgeGrace = v.GetDuration(collectorMaxConnectionAgeGrace) + cOpts.CollectorTenantHeader = v.GetString(collectorTenantHeader) if tlsGrpc, err := tlsGRPCFlagsConfig.InitFromViper(v); err == nil { cOpts.TLSGRPC = tlsGrpc } else { diff --git a/cmd/collector/app/collector.go b/cmd/collector/app/collector.go index 0f92a9965e7d..540ab2a1f7b0 100644 --- a/cmd/collector/app/collector.go +++ b/cmd/collector/app/collector.go @@ -93,7 +93,7 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error { } c.spanProcessor = handlerBuilder.BuildSpanProcessor(additionalProcessors...) - c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor) + c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor, builderOpts) grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{ HostPort: builderOpts.CollectorGRPCHostPort, @@ -104,6 +104,7 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error { MaxReceiveMessageLength: builderOpts.CollectorGRPCMaxReceiveMessageLength, MaxConnectionAge: builderOpts.CollectorGRPCMaxConnectionAge, MaxConnectionAgeGrace: builderOpts.CollectorGRPCMaxConnectionAgeGrace, + TenantHeaderName: builderOpts.CollectorTenantHeader, }) if err != nil { return fmt.Errorf("could not start gRPC collector %w", err) diff --git a/cmd/collector/app/handler/grpc_handler.go b/cmd/collector/app/handler/grpc_handler.go index 995fa141ed45..3b6acc4f5174 100644 --- a/cmd/collector/app/handler/grpc_handler.go +++ b/cmd/collector/app/handler/grpc_handler.go @@ -20,6 +20,7 @@ import ( "go.uber.org/zap" "google.golang.org/grpc/codes" _ "google.golang.org/grpc/encoding/gzip" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" @@ -30,13 +31,15 @@ import ( type GRPCHandler struct { logger *zap.Logger spanProcessor processor.SpanProcessor + tenantHeader string } // NewGRPCHandler registers routes for this handler on the given router. -func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor) *GRPCHandler { +func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor, tenantHeader string) *GRPCHandler { return &GRPCHandler{ logger: logger, spanProcessor: spanProcessor, + tenantHeader: tenantHeader, } } @@ -47,9 +50,15 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) span.Process = r.Batch.Process } } - _, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, processor.SpansOptions{ + tenant, err := g.getTenant(ctx) + if err != nil { + g.logger.Error("rejecting spans (no tenant)", zap.Error(err)) + return nil, err + } + _, err = g.spanProcessor.ProcessSpans(r.GetBatch().Spans, processor.SpansOptions{ InboundTransport: processor.GRPCTransport, SpanFormat: processor.ProtoSpanFormat, + Tenant: tenant, }) if err != nil { if err == processor.ErrBusy { @@ -60,3 +69,23 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) } return &api_v2.PostSpansResponse{}, nil } + +// WithTenant creates a Context with a tenant association +func (g *GRPCHandler) getTenant(ctx context.Context) (string, error) { + // If the collector isn't looking for a tenant, no tenant is needed + if g.tenantHeader == "" { + return "", nil + } + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return "", status.Errorf(codes.Internal, "missing metadata") + } + tenants := md[g.tenantHeader] + if len(tenants) < 1 { + return "", status.Errorf(codes.PermissionDenied, "missing tenant header") + } else if len(tenants) > 1 { + return "", status.Errorf(codes.PermissionDenied, "extra tenant header") + } + + return tenants[0], nil +} diff --git a/cmd/collector/app/handler/grpc_handler_test.go b/cmd/collector/app/handler/grpc_handler_test.go index 3b109af70f9a..35fac3cf766a 100644 --- a/cmd/collector/app/handler/grpc_handler_test.go +++ b/cmd/collector/app/handler/grpc_handler_test.go @@ -26,6 +26,7 @@ import ( "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/metadata" "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/model" @@ -36,6 +37,7 @@ type mockSpanProcessor struct { expectedError error mux sync.Mutex spans []*model.Span + tenants map[string]bool } func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, opts processor.SpansOptions) ([]bool, error) { @@ -43,9 +45,21 @@ func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, opts processor.Spa defer p.mux.Unlock() p.spans = append(p.spans, spans...) oks := make([]bool, len(spans)) + + if p.tenants == nil { + p.tenants = make(map[string]bool) + } + p.tenants[opts.Tenant] = true + return oks, p.expectedError } +func (p *mockSpanProcessor) getTenants() map[string]bool { + p.mux.Lock() + defer p.mux.Unlock() + return p.tenants +} + func (p *mockSpanProcessor) getSpans() []*model.Span { p.mux.Lock() defer p.mux.Unlock() @@ -56,6 +70,7 @@ func (p *mockSpanProcessor) reset() { p.mux.Lock() defer p.mux.Unlock() p.spans = nil + p.tenants = nil } func (p *mockSpanProcessor) Close() error { @@ -83,7 +98,7 @@ func newClient(t *testing.T, addr net.Addr) (api_v2.CollectorServiceClient, *grp func TestPostSpans(t *testing.T) { processor := &mockSpanProcessor{} server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) { - handler := NewGRPCHandler(zap.NewNop(), processor) + handler := NewGRPCHandler(zap.NewNop(), processor, "") api_v2.RegisterCollectorServiceServer(s, handler) }) defer server.Stop() @@ -114,7 +129,7 @@ func TestPostSpans(t *testing.T) { func TestGRPCCompressionEnabled(t *testing.T) { processor := &mockSpanProcessor{} server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) { - handler := NewGRPCHandler(zap.NewNop(), processor) + handler := NewGRPCHandler(zap.NewNop(), processor, "") api_v2.RegisterCollectorServiceServer(s, handler) }) defer server.Stop() @@ -132,7 +147,7 @@ func TestPostSpansWithError(t *testing.T) { expectedError := errors.New("test-error") processor := &mockSpanProcessor{expectedError: expectedError} server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) { - handler := NewGRPCHandler(zap.NewNop(), processor) + handler := NewGRPCHandler(zap.NewNop(), processor, "") api_v2.RegisterCollectorServiceServer(s, handler) }) defer server.Stop() @@ -152,3 +167,131 @@ func TestPostSpansWithError(t *testing.T) { require.Contains(t, err.Error(), expectedError.Error()) require.Len(t, processor.getSpans(), 1) } + +// withMetadata returns a Context with metadata for outbound (client) calls +func withMetadata(ctx context.Context, headerName, headerValue string, t *testing.T) context.Context { + t.Helper() + + md := metadata.New(map[string]string{headerName: headerValue}) + return metadata.NewOutgoingContext(ctx, md) +} + +func TestPostTenantedSpans(t *testing.T) { + tenantHeader := "x-tenant" + processor := &mockSpanProcessor{} + server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) { + handler := NewGRPCHandler(zap.NewNop(), processor, tenantHeader) + api_v2.RegisterCollectorServiceServer(s, handler) + }) + defer server.Stop() + client, conn := newClient(t, addr) + defer conn.Close() + + dummyTenant := "grpc-test-tenant" + ctxWithTenant := withMetadata(context.Background(), tenantHeader, dummyTenant, t) + ctxNoTenant := context.Background() + mdTwoTenants := metadata.Pairs() + mdTwoTenants.Set(tenantHeader, "a", "b") + ctxTwoTenants := metadata.NewOutgoingContext(context.Background(), mdTwoTenants) + + withMetadata(context.Background(), + tenantHeader, dummyTenant, t) + + tests := []struct { + ctx context.Context + batch model.Batch + mustFail bool + expected []*model.Span + expectedTenants map[string]bool + }{ + { + ctx: ctxWithTenant, + batch: model.Batch{Process: &model.Process{ServiceName: "batch-process"}, Spans: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "bar"}}}}, + + mustFail: false, + expected: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "bar"}}}, + expectedTenants: map[string]bool{dummyTenant: true}, + }, + { + ctx: ctxNoTenant, + batch: model.Batch{Process: &model.Process{ServiceName: "batch-process"}, Spans: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "bar"}}}}, + + // Because NewGRPCHandler expects a tenant header, it will reject spans without one + mustFail: true, + expected: nil, + expectedTenants: nil, + }, + { + ctx: ctxTwoTenants, + batch: model.Batch{Process: &model.Process{ServiceName: "batch-process"}, Spans: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "bar"}}}}, + + // NewGRPCHandler rejects spans with multiple values for tenant header + mustFail: true, + expected: nil, + expectedTenants: nil, + }, + } + for ncase, test := range tests { + _, err := client.PostSpans(test.ctx, &api_v2.PostSpansRequest{ + Batch: test.batch, + }) + if test.mustFail { + require.Error(t, err, "case %d", ncase) + } else { + require.NoError(t, err, "case %d", ncase) + } + assert.Equal(t, test.expected, processor.getSpans()) + assert.Equal(t, test.expectedTenants, processor.getTenants()) + processor.reset() + } +} + +// withIncomingMetadata returns a Context with metadata for a server to receive +func withIncomingMetadata(ctx context.Context, headerName, headerValue string, t *testing.T) context.Context { + t.Helper() + + md := metadata.New(map[string]string{headerName: headerValue}) + return metadata.NewIncomingContext(ctx, md) +} + +func TestGetTenant(t *testing.T) { + tenantHeader := "some-tenant-header" + + mdTwoTenants := metadata.Pairs() + mdTwoTenants.Set(tenantHeader, "a", "b") + ctxTwoTenants := metadata.NewOutgoingContext(context.Background(), mdTwoTenants) + + tests := []struct { + ctx context.Context + tenant string + mustFail bool + }{ + { + ctx: withIncomingMetadata(context.TODO(), tenantHeader, "acme", t), + mustFail: false, + tenant: "acme", + }, + { + ctx: context.TODO(), + mustFail: true, + tenant: "", + }, + { + ctx: ctxTwoTenants, + mustFail: true, + tenant: "", + }, + } + + processor := &mockSpanProcessor{} + handler := NewGRPCHandler(zap.NewNop(), processor, tenantHeader) + for ncase, test := range tests { + tenant, err := handler.getTenant(test.ctx) + if test.mustFail { + require.Error(t, err, "case %d", ncase) + } else { + require.NoError(t, err, "case %d", ncase) + } + assert.Equal(t, test.tenant, tenant) + } +} diff --git a/cmd/collector/app/server/grpc.go b/cmd/collector/app/server/grpc.go index e40655a08beb..80c4b115fbc0 100644 --- a/cmd/collector/app/server/grpc.go +++ b/cmd/collector/app/server/grpc.go @@ -43,6 +43,7 @@ type GRPCServerParams struct { MaxReceiveMessageLength int MaxConnectionAge time.Duration MaxConnectionAgeGrace time.Duration + TenantHeaderName string // Set by the server to indicate the actual host:port of the server. HostPortActual string diff --git a/cmd/collector/app/server/grpc_test.go b/cmd/collector/app/server/grpc_test.go index 71280e8cfe2d..5b1394e0bb6f 100644 --- a/cmd/collector/app/server/grpc_test.go +++ b/cmd/collector/app/server/grpc_test.go @@ -39,7 +39,7 @@ func TestFailToListen(t *testing.T) { logger, _ := zap.NewDevelopment() server, err := StartGRPCServer(&GRPCServerParams{ HostPort: ":-1", - Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}), + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, ""), SamplingStore: &mockSamplingStore{}, Logger: logger, }) @@ -56,7 +56,7 @@ func TestFailServe(t *testing.T) { logger := zap.New(core) serveGRPC(grpc.NewServer(), lis, &GRPCServerParams{ - Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}), + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, ""), SamplingStore: &mockSamplingStore{}, Logger: logger, OnError: func(e error) { @@ -71,7 +71,7 @@ func TestFailServe(t *testing.T) { func TestSpanCollector(t *testing.T) { logger, _ := zap.NewDevelopment() params := &GRPCServerParams{ - Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}), + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, ""), SamplingStore: &mockSamplingStore{}, Logger: logger, MaxReceiveMessageLength: 1024 * 1024, @@ -96,7 +96,7 @@ func TestSpanCollector(t *testing.T) { func TestCollectorStartWithTLS(t *testing.T) { logger, _ := zap.NewDevelopment() params := &GRPCServerParams{ - Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}), + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, ""), SamplingStore: &mockSamplingStore{}, Logger: logger, TLSConfig: tlscfg.Options{ @@ -115,7 +115,7 @@ func TestCollectorStartWithTLS(t *testing.T) { func TestCollectorReflection(t *testing.T) { logger, _ := zap.NewDevelopment() params := &GRPCServerParams{ - Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}), + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, ""), SamplingStore: &mockSamplingStore{}, Logger: logger, } diff --git a/cmd/collector/app/span_handler_builder.go b/cmd/collector/app/span_handler_builder.go index 69e5aac865e8..1b40a01d75c5 100644 --- a/cmd/collector/app/span_handler_builder.go +++ b/cmd/collector/app/span_handler_builder.go @@ -65,7 +65,7 @@ func (b *SpanHandlerBuilder) BuildSpanProcessor(additional ...ProcessSpan) proce } // BuildHandlers builds span handlers (Zipkin, Jaeger) -func (b *SpanHandlerBuilder) BuildHandlers(spanProcessor processor.SpanProcessor) *SpanHandlers { +func (b *SpanHandlerBuilder) BuildHandlers(spanProcessor processor.SpanProcessor, builderOpts *CollectorOptions) *SpanHandlers { return &SpanHandlers{ handler.NewZipkinSpanHandler( b.Logger, @@ -73,7 +73,7 @@ func (b *SpanHandlerBuilder) BuildHandlers(spanProcessor processor.SpanProcessor zs.NewChainedSanitizer(zs.NewStandardSanitizers()...), ), handler.NewJaegerSpanHandler(b.Logger, spanProcessor), - handler.NewGRPCHandler(b.Logger, spanProcessor), + handler.NewGRPCHandler(b.Logger, spanProcessor, builderOpts.CollectorTenantHeader), } } diff --git a/cmd/collector/app/span_handler_builder_test.go b/cmd/collector/app/span_handler_builder_test.go index cac63f818ec7..8c75e85e1908 100644 --- a/cmd/collector/app/span_handler_builder_test.go +++ b/cmd/collector/app/span_handler_builder_test.go @@ -52,7 +52,7 @@ func TestNewSpanHandlerBuilder(t *testing.T) { } spanProcessor := builder.BuildSpanProcessor() - spanHandlers := builder.BuildHandlers(spanProcessor) + spanHandlers := builder.BuildHandlers(spanProcessor, &CollectorOptions{}) assert.NotNil(t, spanHandlers.ZipkinSpansHandler) assert.NotNil(t, spanHandlers.JaegerBatchesHandler) assert.NotNil(t, spanHandlers.GRPCHandler)