Skip to content

Commit

Permalink
Allow Jaeger's GRPC handler to flow the tenant from an HTTP header
Browse files Browse the repository at this point in the history
Signed-off-by: Ed Snible <snible@us.ibm.com>
  • Loading branch information
esnible committed May 17, 2022
1 parent 4f9f7df commit cec2799
Show file tree
Hide file tree
Showing 8 changed files with 193 additions and 14 deletions.
5 changes: 5 additions & 0 deletions cmd/collector/app/builder_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
collectorGRPCMaxReceiveMessageLength = "collector.grpc-server.max-message-size"
collectorMaxConnectionAge = "collector.grpc-server.max-connection-age"
collectorMaxConnectionAgeGrace = "collector.grpc-server.max-connection-age-grace"
collectorTenantHeader = "collector.tenant-header"
)

var tlsGRPCFlagsConfig = tlscfg.ServerFlagsConfig{
Expand Down Expand Up @@ -88,6 +89,8 @@ type CollectorOptions struct {
// CollectorGRPCMaxConnectionAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed.
// See gRPC's keepalive.ServerParameters#MaxConnectionAgeGrace.
CollectorGRPCMaxConnectionAgeGrace time.Duration
// Optional tenant header. If specified, the name of the header that carries the tenant
CollectorTenantHeader string
}

// AddFlags adds flags for CollectorOptions
Expand All @@ -104,6 +107,7 @@ func AddFlags(flags *flag.FlagSet) {
flags.Uint(collectorDynQueueSizeMemory, 0, "(experimental) The max memory size in MiB to use for the dynamic queue.")
flags.Duration(collectorMaxConnectionAge, 0, "The maximum amount of time a connection may exist. Set this value to a few seconds or minutes on highly elastic environments, so that clients discover new collector nodes frequently. See https://pkg.go.dev/google.golang.org/grpc/keepalive#ServerParameters")
flags.Duration(collectorMaxConnectionAgeGrace, 0, "The additive period after MaxConnectionAge after which the connection will be forcibly closed. See https://pkg.go.dev/google.golang.org/grpc/keepalive#ServerParameters")
flags.String(collectorTenantHeader, "", "(experimental) tenant header")

tlsGRPCFlagsConfig.AddFlags(flags)
tlsHTTPFlagsConfig.AddFlags(flags)
Expand All @@ -124,6 +128,7 @@ func (cOpts *CollectorOptions) InitFromViper(v *viper.Viper) (*CollectorOptions,
cOpts.CollectorGRPCMaxReceiveMessageLength = v.GetInt(collectorGRPCMaxReceiveMessageLength)
cOpts.CollectorGRPCMaxConnectionAge = v.GetDuration(collectorMaxConnectionAge)
cOpts.CollectorGRPCMaxConnectionAgeGrace = v.GetDuration(collectorMaxConnectionAgeGrace)
cOpts.CollectorTenantHeader = v.GetString(collectorTenantHeader)
if tlsGrpc, err := tlsGRPCFlagsConfig.InitFromViper(v); err == nil {
cOpts.TLSGRPC = tlsGrpc
} else {
Expand Down
3 changes: 2 additions & 1 deletion cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error {
}

c.spanProcessor = handlerBuilder.BuildSpanProcessor(additionalProcessors...)
c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor)
c.spanHandlers = handlerBuilder.BuildHandlers(c.spanProcessor, builderOpts)

grpcServer, err := server.StartGRPCServer(&server.GRPCServerParams{
HostPort: builderOpts.CollectorGRPCHostPort,
Expand All @@ -104,6 +104,7 @@ func (c *Collector) Start(builderOpts *CollectorOptions) error {
MaxReceiveMessageLength: builderOpts.CollectorGRPCMaxReceiveMessageLength,
MaxConnectionAge: builderOpts.CollectorGRPCMaxConnectionAge,
MaxConnectionAgeGrace: builderOpts.CollectorGRPCMaxConnectionAgeGrace,
TenantHeaderName: builderOpts.CollectorTenantHeader,
})
if err != nil {
return fmt.Errorf("could not start gRPC collector %w", err)
Expand Down
33 changes: 31 additions & 2 deletions cmd/collector/app/handler/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
Expand All @@ -30,13 +31,15 @@ import (
type GRPCHandler struct {
logger *zap.Logger
spanProcessor processor.SpanProcessor
tenantHeader string
}

// NewGRPCHandler registers routes for this handler on the given router.
func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor) *GRPCHandler {
func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor, tenantHeader string) *GRPCHandler {
return &GRPCHandler{
logger: logger,
spanProcessor: spanProcessor,
tenantHeader: tenantHeader,
}
}

Expand All @@ -47,9 +50,15 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest)
span.Process = r.Batch.Process
}
}
_, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, processor.SpansOptions{
tenant, err := g.getTenant(ctx)
if err != nil {
g.logger.Error("rejecting spans (no tenant)", zap.Error(err))
return nil, err
}
_, err = g.spanProcessor.ProcessSpans(r.GetBatch().Spans, processor.SpansOptions{
InboundTransport: processor.GRPCTransport,
SpanFormat: processor.ProtoSpanFormat,
Tenant: tenant,
})
if err != nil {
if err == processor.ErrBusy {
Expand All @@ -60,3 +69,23 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest)
}
return &api_v2.PostSpansResponse{}, nil
}

// WithTenant creates a Context with a tenant association
func (g *GRPCHandler) getTenant(ctx context.Context) (string, error) {
// If the collector isn't looking for a tenant, no tenant is needed
if g.tenantHeader == "" {
return "", nil
}
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", status.Errorf(codes.Internal, "missing metadata")
}
tenants := md[g.tenantHeader]
if len(tenants) < 1 {
return "", status.Errorf(codes.PermissionDenied, "missing tenant header")
} else if len(tenants) > 1 {
return "", status.Errorf(codes.PermissionDenied, "extra tenant header")
}

return tenants[0], nil
}
149 changes: 146 additions & 3 deletions cmd/collector/app/handler/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
Expand All @@ -36,16 +37,29 @@ type mockSpanProcessor struct {
expectedError error
mux sync.Mutex
spans []*model.Span
tenants map[string]bool
}

func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, opts processor.SpansOptions) ([]bool, error) {
p.mux.Lock()
defer p.mux.Unlock()
p.spans = append(p.spans, spans...)
oks := make([]bool, len(spans))

if p.tenants == nil {
p.tenants = make(map[string]bool)
}
p.tenants[opts.Tenant] = true

return oks, p.expectedError
}

func (p *mockSpanProcessor) getTenants() map[string]bool {
p.mux.Lock()
defer p.mux.Unlock()
return p.tenants
}

func (p *mockSpanProcessor) getSpans() []*model.Span {
p.mux.Lock()
defer p.mux.Unlock()
Expand All @@ -56,6 +70,7 @@ func (p *mockSpanProcessor) reset() {
p.mux.Lock()
defer p.mux.Unlock()
p.spans = nil
p.tenants = nil
}

func (p *mockSpanProcessor) Close() error {
Expand Down Expand Up @@ -83,7 +98,7 @@ func newClient(t *testing.T, addr net.Addr) (api_v2.CollectorServiceClient, *grp
func TestPostSpans(t *testing.T) {
processor := &mockSpanProcessor{}
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(zap.NewNop(), processor)
handler := NewGRPCHandler(zap.NewNop(), processor, "")
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer server.Stop()
Expand Down Expand Up @@ -114,7 +129,7 @@ func TestPostSpans(t *testing.T) {
func TestGRPCCompressionEnabled(t *testing.T) {
processor := &mockSpanProcessor{}
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(zap.NewNop(), processor)
handler := NewGRPCHandler(zap.NewNop(), processor, "")
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer server.Stop()
Expand All @@ -132,7 +147,7 @@ func TestPostSpansWithError(t *testing.T) {
expectedError := errors.New("test-error")
processor := &mockSpanProcessor{expectedError: expectedError}
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(zap.NewNop(), processor)
handler := NewGRPCHandler(zap.NewNop(), processor, "")
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer server.Stop()
Expand All @@ -152,3 +167,131 @@ func TestPostSpansWithError(t *testing.T) {
require.Contains(t, err.Error(), expectedError.Error())
require.Len(t, processor.getSpans(), 1)
}

// withMetadata returns a Context with metadata for outbound (client) calls
func withMetadata(ctx context.Context, headerName, headerValue string, t *testing.T) context.Context {
t.Helper()

md := metadata.New(map[string]string{headerName: headerValue})
return metadata.NewOutgoingContext(ctx, md)
}

func TestPostTenantedSpans(t *testing.T) {
tenantHeader := "x-tenant"
processor := &mockSpanProcessor{}
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(zap.NewNop(), processor, tenantHeader)
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer server.Stop()
client, conn := newClient(t, addr)
defer conn.Close()

dummyTenant := "grpc-test-tenant"
ctxWithTenant := withMetadata(context.Background(), tenantHeader, dummyTenant, t)
ctxNoTenant := context.Background()
mdTwoTenants := metadata.Pairs()
mdTwoTenants.Set(tenantHeader, "a", "b")
ctxTwoTenants := metadata.NewOutgoingContext(context.Background(), mdTwoTenants)

withMetadata(context.Background(),
tenantHeader, dummyTenant, t)

tests := []struct {
ctx context.Context
batch model.Batch
mustFail bool
expected []*model.Span
expectedTenants map[string]bool
}{
{
ctx: ctxWithTenant,
batch: model.Batch{Process: &model.Process{ServiceName: "batch-process"}, Spans: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "bar"}}}},

mustFail: false,
expected: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "bar"}}},
expectedTenants: map[string]bool{dummyTenant: true},
},
{
ctx: ctxNoTenant,
batch: model.Batch{Process: &model.Process{ServiceName: "batch-process"}, Spans: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "bar"}}}},

// Because NewGRPCHandler expects a tenant header, it will reject spans without one
mustFail: true,
expected: nil,
expectedTenants: nil,
},
{
ctx: ctxTwoTenants,
batch: model.Batch{Process: &model.Process{ServiceName: "batch-process"}, Spans: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "bar"}}}},

// NewGRPCHandler rejects spans with multiple values for tenant header
mustFail: true,
expected: nil,
expectedTenants: nil,
},
}
for ncase, test := range tests {
_, err := client.PostSpans(test.ctx, &api_v2.PostSpansRequest{
Batch: test.batch,
})
if test.mustFail {
require.Error(t, err, "case %d", ncase)
} else {
require.NoError(t, err, "case %d", ncase)
}
assert.Equal(t, test.expected, processor.getSpans())
assert.Equal(t, test.expectedTenants, processor.getTenants())
processor.reset()
}
}

// withIncomingMetadata returns a Context with metadata for a server to receive
func withIncomingMetadata(ctx context.Context, headerName, headerValue string, t *testing.T) context.Context {
t.Helper()

md := metadata.New(map[string]string{headerName: headerValue})
return metadata.NewIncomingContext(ctx, md)
}

func TestGetTenant(t *testing.T) {
tenantHeader := "some-tenant-header"

mdTwoTenants := metadata.Pairs()
mdTwoTenants.Set(tenantHeader, "a", "b")
ctxTwoTenants := metadata.NewOutgoingContext(context.Background(), mdTwoTenants)

tests := []struct {
ctx context.Context
tenant string
mustFail bool
}{
{
ctx: withIncomingMetadata(context.TODO(), tenantHeader, "acme", t),
mustFail: false,
tenant: "acme",
},
{
ctx: context.TODO(),
mustFail: true,
tenant: "",
},
{
ctx: ctxTwoTenants,
mustFail: true,
tenant: "",
},
}

processor := &mockSpanProcessor{}
handler := NewGRPCHandler(zap.NewNop(), processor, tenantHeader)
for ncase, test := range tests {
tenant, err := handler.getTenant(test.ctx)
if test.mustFail {
require.Error(t, err, "case %d", ncase)
} else {
require.NoError(t, err, "case %d", ncase)
}
assert.Equal(t, test.tenant, tenant)
}
}
1 change: 1 addition & 0 deletions cmd/collector/app/server/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type GRPCServerParams struct {
MaxReceiveMessageLength int
MaxConnectionAge time.Duration
MaxConnectionAgeGrace time.Duration
TenantHeaderName string

// Set by the server to indicate the actual host:port of the server.
HostPortActual string
Expand Down
10 changes: 5 additions & 5 deletions cmd/collector/app/server/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestFailToListen(t *testing.T) {
logger, _ := zap.NewDevelopment()
server, err := StartGRPCServer(&GRPCServerParams{
HostPort: ":-1",
Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}),
Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, ""),
SamplingStore: &mockSamplingStore{},
Logger: logger,
})
Expand All @@ -56,7 +56,7 @@ func TestFailServe(t *testing.T) {

logger := zap.New(core)
serveGRPC(grpc.NewServer(), lis, &GRPCServerParams{
Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}),
Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, ""),
SamplingStore: &mockSamplingStore{},
Logger: logger,
OnError: func(e error) {
Expand All @@ -71,7 +71,7 @@ func TestFailServe(t *testing.T) {
func TestSpanCollector(t *testing.T) {
logger, _ := zap.NewDevelopment()
params := &GRPCServerParams{
Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}),
Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, ""),
SamplingStore: &mockSamplingStore{},
Logger: logger,
MaxReceiveMessageLength: 1024 * 1024,
Expand All @@ -96,7 +96,7 @@ func TestSpanCollector(t *testing.T) {
func TestCollectorStartWithTLS(t *testing.T) {
logger, _ := zap.NewDevelopment()
params := &GRPCServerParams{
Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}),
Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, ""),
SamplingStore: &mockSamplingStore{},
Logger: logger,
TLSConfig: tlscfg.Options{
Expand All @@ -115,7 +115,7 @@ func TestCollectorStartWithTLS(t *testing.T) {
func TestCollectorReflection(t *testing.T) {
logger, _ := zap.NewDevelopment()
params := &GRPCServerParams{
Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}),
Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, ""),
SamplingStore: &mockSamplingStore{},
Logger: logger,
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/collector/app/span_handler_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,15 +65,15 @@ func (b *SpanHandlerBuilder) BuildSpanProcessor(additional ...ProcessSpan) proce
}

// BuildHandlers builds span handlers (Zipkin, Jaeger)
func (b *SpanHandlerBuilder) BuildHandlers(spanProcessor processor.SpanProcessor) *SpanHandlers {
func (b *SpanHandlerBuilder) BuildHandlers(spanProcessor processor.SpanProcessor, builderOpts *CollectorOptions) *SpanHandlers {
return &SpanHandlers{
handler.NewZipkinSpanHandler(
b.Logger,
spanProcessor,
zs.NewChainedSanitizer(zs.NewStandardSanitizers()...),
),
handler.NewJaegerSpanHandler(b.Logger, spanProcessor),
handler.NewGRPCHandler(b.Logger, spanProcessor),
handler.NewGRPCHandler(b.Logger, spanProcessor, builderOpts.CollectorTenantHeader),
}
}

Expand Down
Loading

0 comments on commit cec2799

Please # to comment.