From 8fcb586f69695b8fb6031893b35b28b5607847e3 Mon Sep 17 00:00:00 2001 From: Albert <26584478+albertteoh@users.noreply.github.com> Date: Thu, 14 Jul 2022 09:01:01 +1000 Subject: [PATCH 01/11] Add capability to build docker all-in-one (#3813) Add `all-in-one` component to the list of components to build docker images for. Signed-off-by: Albert Teoh Signed-off-by: Loc Mai --- Makefile | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/Makefile b/Makefile index 8683c079e02..1d0cc87a9ec 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,5 @@ -JAEGER_IMPORT_PATH=github.com/jaegertracing/jaeger +SHELL := /bin/bash +JAEGER_IMPORT_PATH = github.com/jaegertracing/jaeger STORAGE_PKGS = ./plugin/storage/integration/... include docker/Makefile @@ -306,15 +307,21 @@ docker-images-jaeger-backend-debug: SUFFIX = -debug .PHONY: docker-images-jaeger-backend docker-images-jaeger-backend-debug docker-images-jaeger-backend docker-images-jaeger-backend-debug: create-baseimg create-debugimg - for component in agent collector query ingester ; do \ - docker build --target $(TARGET) \ - --tag $(DOCKER_NAMESPACE)/jaeger-$$component$(SUFFIX):${DOCKER_TAG} \ + for component in "jaeger-agent" "jaeger-collector" "jaeger-query" "jaeger-ingester" "all-in-one" ; do \ + regex="jaeger-(.*)"; \ + component_suffix=$$component; \ + if [[ $$component =~ $$regex ]]; then \ + component_suffix="$${BASH_REMATCH[1]}"; \ + fi; \ + docker buildx build --target $(TARGET) \ + --tag $(DOCKER_NAMESPACE)/$$component$(SUFFIX):${DOCKER_TAG} \ --build-arg base_image=$(BASE_IMAGE) \ --build-arg debug_image=$(DEBUG_IMAGE) \ --build-arg TARGETARCH=$(GOARCH) \ - cmd/$$component ; \ + --load \ + cmd/$$component_suffix; \ echo "Finished building $$component ==============" ; \ - done + done; .PHONY: docker-images-tracegen docker-images-tracegen: From 702ea275dff0abb4b89dce7b16f753aa1f841890 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Fri, 15 Jul 2022 16:14:51 -0400 Subject: [PATCH 02/11] Bump github.com/prometheus/common from 0.36.0 to 0.37.0 (#3814) Bumps [github.com/prometheus/common](https://github.com/prometheus/common) from 0.36.0 to 0.37.0. - [Release notes](https://github.com/prometheus/common/releases) - [Commits](https://github.com/prometheus/common/compare/v0.36.0...v0.37.0) --- updated-dependencies: - dependency-name: github.com/prometheus/common dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Signed-off-by: Loc Mai --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index c44afd7db52..6dda7f24272 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require ( github.com/opentracing/opentracing-go v1.2.0 github.com/prometheus/client_golang v1.12.2 github.com/prometheus/client_model v0.2.0 - github.com/prometheus/common v0.36.0 + github.com/prometheus/common v0.37.0 github.com/rs/cors v1.8.2 github.com/soheilhy/cmux v0.1.5 github.com/spf13/cobra v1.5.0 diff --git a/go.sum b/go.sum index 92239f5805f..1f69839f178 100644 --- a/go.sum +++ b/go.sum @@ -555,8 +555,8 @@ github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8 github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc= github.com/prometheus/common v0.32.1/go.mod h1:vu+V0TpY+O6vW9J44gczi3Ap/oXXR10b+M/gUGO4Hls= -github.com/prometheus/common v0.36.0 h1:78hJTing+BLYLjhXE+Z2BubeEymH5Lr0/Mt8FKkxxYo= -github.com/prometheus/common v0.36.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA= +github.com/prometheus/common v0.37.0 h1:ccBbHCgIiT9uSoFY0vX8H3zsNR5eLt17/RQLUvn8pXE= +github.com/prometheus/common v0.37.0/go.mod h1:phzohg0JFMnBEFGxTDbfu3QyL5GI8gTQJFhYO5B3mfA= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= From d761686e185a389205b3fe46703967793c8ba997 Mon Sep 17 00:00:00 2001 From: Loc Mai Date: Wed, 20 Jul 2022 13:27:11 +0700 Subject: [PATCH 03/11] fix: check nil process to avoid nil pointer Signed-off-by: Loc Mai --- plugin/storage/es/spanstore/dbmodel/from_domain.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/plugin/storage/es/spanstore/dbmodel/from_domain.go b/plugin/storage/es/spanstore/dbmodel/from_domain.go index a3939d83cb5..8045f0656c0 100644 --- a/plugin/storage/es/spanstore/dbmodel/from_domain.go +++ b/plugin/storage/es/spanstore/dbmodel/from_domain.go @@ -120,6 +120,14 @@ func (fd FromDomain) convertLogs(logs []model.Log) []Log { } func (fd FromDomain) convertProcess(process *model.Process) Process { + if process == nil { + return Process{} + } + if process.Tags == nil { + return Process{ + ServiceName: process.ServiceName, + } + } tags, tagsMap := fd.convertKeyValuesString(process.Tags) return Process{ ServiceName: process.ServiceName, From 817cdf1e0b9238ff6719e55b092cc7a55c520c4e Mon Sep 17 00:00:00 2001 From: Loc Mai Date: Wed, 20 Jul 2022 14:13:45 +0700 Subject: [PATCH 04/11] fix: use GetTags instead Signed-off-by: Loc Mai --- plugin/storage/es/spanstore/dbmodel/from_domain.go | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/plugin/storage/es/spanstore/dbmodel/from_domain.go b/plugin/storage/es/spanstore/dbmodel/from_domain.go index 8045f0656c0..d9fbfb625aa 100644 --- a/plugin/storage/es/spanstore/dbmodel/from_domain.go +++ b/plugin/storage/es/spanstore/dbmodel/from_domain.go @@ -120,15 +120,7 @@ func (fd FromDomain) convertLogs(logs []model.Log) []Log { } func (fd FromDomain) convertProcess(process *model.Process) Process { - if process == nil { - return Process{} - } - if process.Tags == nil { - return Process{ - ServiceName: process.ServiceName, - } - } - tags, tagsMap := fd.convertKeyValuesString(process.Tags) + tags, tagsMap := fd.convertKeyValuesString(process.GetTags()) return Process{ ServiceName: process.ServiceName, Tags: tags, From c8a8458f71c8320f4ea22316ebf33857fd681e09 Mon Sep 17 00:00:00 2001 From: Loc Mai Date: Wed, 20 Jul 2022 15:05:27 +0700 Subject: [PATCH 05/11] chore: add test cases Signed-off-by: Loc Mai --- .../es/spanstore/dbmodel/from_domain.go | 2 +- .../es/spanstore/dbmodel/from_domain_test.go | 26 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/plugin/storage/es/spanstore/dbmodel/from_domain.go b/plugin/storage/es/spanstore/dbmodel/from_domain.go index d9fbfb625aa..afa8c6222bf 100644 --- a/plugin/storage/es/spanstore/dbmodel/from_domain.go +++ b/plugin/storage/es/spanstore/dbmodel/from_domain.go @@ -122,7 +122,7 @@ func (fd FromDomain) convertLogs(logs []model.Log) []Log { func (fd FromDomain) convertProcess(process *model.Process) Process { tags, tagsMap := fd.convertKeyValuesString(process.GetTags()) return Process{ - ServiceName: process.ServiceName, + ServiceName: process.GetServiceName(), Tags: tags, Tag: tagsMap, } diff --git a/plugin/storage/es/spanstore/dbmodel/from_domain_test.go b/plugin/storage/es/spanstore/dbmodel/from_domain_test.go index 8fc0e910cbf..5564dbfcefe 100644 --- a/plugin/storage/es/spanstore/dbmodel/from_domain_test.go +++ b/plugin/storage/es/spanstore/dbmodel/from_domain_test.go @@ -108,6 +108,32 @@ func TestTagMap(t *testing.T) { assert.Equal(t, tagsMap, dbSpan.Process.Tag) } +func TestConvertProcess(t *testing.T) { + tags := []model.KeyValue{ + model.String("foo", "foo"), + model.Bool("a", true), + model.Int64("b.b", 1), + } + + spanWithNilTags := model.Span{Tags: tags, Process: &model.Process{Tags: nil}} + spanWithNilProcess := model.Span{Tags: tags, Process: nil} + + converter := NewFromDomain(false, []string{}, ":") + dbSpanWithNilTags := converter.FromDomainEmbedProcess(&spanWithNilTags) + dbSpanWithNilProcess := converter.FromDomainEmbedProcess(&spanWithNilProcess) + + assert.Equal(t, 3, len(dbSpanWithNilTags.Tags)) + assert.Equal(t, 3, len(dbSpanWithNilProcess.Tags)) + assert.Equal(t, 0, len(dbSpanWithNilTags.Process.Tags)) + assert.Equal(t, 0, len(dbSpanWithNilProcess.Process.Tags)) + + tagsMap := map[string]interface{}(nil) + assert.Equal(t, tagsMap, dbSpanWithNilTags.Tag) + assert.Equal(t, tagsMap, dbSpanWithNilProcess.Tag) + assert.Equal(t, tagsMap, dbSpanWithNilTags.Process.Tag) + assert.Equal(t, tagsMap, dbSpanWithNilProcess.Process.Tag) +} + func TestConvertKeyValueValue(t *testing.T) { longString := `Bender Bending Rodrigues Bender Bending Rodrigues Bender Bending Rodrigues Bender Bending Rodrigues Bender Bending Rodrigues Bender Bending Rodrigues Bender Bending Rodrigues Bender Bending Rodrigues Bender Bending Rodrigues From 385f7927f8ff83f00ec47c5f49984a90eb03f03d Mon Sep 17 00:00:00 2001 From: Loc Mai Date: Mon, 25 Jul 2022 15:28:21 +0700 Subject: [PATCH 06/11] chore: add sanitizer on span processor Signed-off-by: Loc Mai --- cmd/ingester/app/builder/builder.go | 2 ++ cmd/ingester/app/processor/span_processor.go | 7 ++++++- .../app/processor/span_processor_test.go | 3 +++ .../es/spanstore/dbmodel/from_domain.go | 2 +- .../es/spanstore/dbmodel/from_domain_test.go | 20 +++++++------------ 5 files changed, 19 insertions(+), 15 deletions(-) diff --git a/cmd/ingester/app/builder/builder.go b/cmd/ingester/app/builder/builder.go index d984b5dcd03..9f6d9b7c73f 100644 --- a/cmd/ingester/app/builder/builder.go +++ b/cmd/ingester/app/builder/builder.go @@ -20,6 +20,7 @@ import ( "go.uber.org/zap" + "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer" "github.com/jaegertracing/jaeger/cmd/ingester/app" "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer" "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" @@ -47,6 +48,7 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit spParams := processor.SpanProcessorParams{ Writer: spanWriter, Unmarshaller: unmarshaller, + Sanitizers: sanitizer.NewStandardSanitizers(), } spanProcessor := processor.NewSpanProcessor(spParams) diff --git a/cmd/ingester/app/processor/span_processor.go b/cmd/ingester/app/processor/span_processor.go index ad1c8aa7479..8a059ac0aec 100644 --- a/cmd/ingester/app/processor/span_processor.go +++ b/cmd/ingester/app/processor/span_processor.go @@ -19,6 +19,7 @@ import ( "fmt" "io" + "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer" "github.com/jaegertracing/jaeger/plugin/storage/kafka" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -40,11 +41,13 @@ type Message interface { type SpanProcessorParams struct { Writer spanstore.Writer Unmarshaller kafka.Unmarshaller + Sanitizers []sanitizer.SanitizeSpan } // KafkaSpanProcessor implements SpanProcessor for Kafka messages type KafkaSpanProcessor struct { unmarshaller kafka.Unmarshaller + sanitizer sanitizer.SanitizeSpan writer spanstore.Writer io.Closer } @@ -54,6 +57,7 @@ func NewSpanProcessor(params SpanProcessorParams) *KafkaSpanProcessor { return &KafkaSpanProcessor{ unmarshaller: params.Unmarshaller, writer: params.Writer, + sanitizer: sanitizer.NewChainedSanitizer(params.Sanitizers...), } } @@ -63,6 +67,7 @@ func (s KafkaSpanProcessor) Process(message Message) error { if err != nil { return fmt.Errorf("cannot unmarshall byte array into span: %w", err) } + // TODO context should be propagated from upstream components - return s.writer.WriteSpan(context.TODO(), span) + return s.writer.WriteSpan(context.TODO(), s.sanitizer(span)) } diff --git a/cmd/ingester/app/processor/span_processor_test.go b/cmd/ingester/app/processor/span_processor_test.go index 0fae62e278f..d2f9ad66ecf 100644 --- a/cmd/ingester/app/processor/span_processor_test.go +++ b/cmd/ingester/app/processor/span_processor_test.go @@ -21,6 +21,7 @@ import ( "github.com/stretchr/testify/assert" + "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer" cmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks" "github.com/jaegertracing/jaeger/model" umocks "github.com/jaegertracing/jaeger/pkg/kafka/mocks" @@ -35,9 +36,11 @@ func TestNewSpanProcessor(t *testing.T) { func TestSpanProcessor_Process(t *testing.T) { writer := &smocks.Writer{} unmarshallerMock := &umocks.Unmarshaller{} + sanitizer := sanitizer.NewChainedSanitizer() processor := &KafkaSpanProcessor{ unmarshaller: unmarshallerMock, writer: writer, + sanitizer: sanitizer, } message := &cmocks.Message{} diff --git a/plugin/storage/es/spanstore/dbmodel/from_domain.go b/plugin/storage/es/spanstore/dbmodel/from_domain.go index afa8c6222bf..d9fbfb625aa 100644 --- a/plugin/storage/es/spanstore/dbmodel/from_domain.go +++ b/plugin/storage/es/spanstore/dbmodel/from_domain.go @@ -122,7 +122,7 @@ func (fd FromDomain) convertLogs(logs []model.Log) []Log { func (fd FromDomain) convertProcess(process *model.Process) Process { tags, tagsMap := fd.convertKeyValuesString(process.GetTags()) return Process{ - ServiceName: process.GetServiceName(), + ServiceName: process.ServiceName, Tags: tags, Tag: tagsMap, } diff --git a/plugin/storage/es/spanstore/dbmodel/from_domain_test.go b/plugin/storage/es/spanstore/dbmodel/from_domain_test.go index 5564dbfcefe..e56b19fcdf0 100644 --- a/plugin/storage/es/spanstore/dbmodel/from_domain_test.go +++ b/plugin/storage/es/spanstore/dbmodel/from_domain_test.go @@ -116,22 +116,16 @@ func TestConvertProcess(t *testing.T) { } spanWithNilTags := model.Span{Tags: tags, Process: &model.Process{Tags: nil}} - spanWithNilProcess := model.Span{Tags: tags, Process: nil} converter := NewFromDomain(false, []string{}, ":") dbSpanWithNilTags := converter.FromDomainEmbedProcess(&spanWithNilTags) - dbSpanWithNilProcess := converter.FromDomainEmbedProcess(&spanWithNilProcess) - - assert.Equal(t, 3, len(dbSpanWithNilTags.Tags)) - assert.Equal(t, 3, len(dbSpanWithNilProcess.Tags)) - assert.Equal(t, 0, len(dbSpanWithNilTags.Process.Tags)) - assert.Equal(t, 0, len(dbSpanWithNilProcess.Process.Tags)) - - tagsMap := map[string]interface{}(nil) - assert.Equal(t, tagsMap, dbSpanWithNilTags.Tag) - assert.Equal(t, tagsMap, dbSpanWithNilProcess.Tag) - assert.Equal(t, tagsMap, dbSpanWithNilTags.Process.Tag) - assert.Equal(t, tagsMap, dbSpanWithNilProcess.Process.Tag) + + assert.Len(t, dbSpanWithNilTags.Tags, 3) + assert.Len(t, dbSpanWithNilTags.Process.Tags, 0) + + nilMap := map[string]interface{}(nil) + assert.Equal(t, nilMap, dbSpanWithNilTags.Tag) + assert.Equal(t, nilMap, dbSpanWithNilTags.Process.Tag) } func TestConvertKeyValueValue(t *testing.T) { From bbcb3dff339ee7298227d5fcd641a4b798077b37 Mon Sep 17 00:00:00 2001 From: Loc Mai Date: Wed, 20 Jul 2022 13:27:11 +0700 Subject: [PATCH 07/11] fix: check nil process to avoid nil pointer Signed-off-by: Loc Mai --- .../es/spanstore/dbmodel/from_domain.go | 2 +- .../es/spanstore/dbmodel/from_domain_test.go | 24 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/plugin/storage/es/spanstore/dbmodel/from_domain.go b/plugin/storage/es/spanstore/dbmodel/from_domain.go index d9fbfb625aa..afa8c6222bf 100644 --- a/plugin/storage/es/spanstore/dbmodel/from_domain.go +++ b/plugin/storage/es/spanstore/dbmodel/from_domain.go @@ -122,7 +122,7 @@ func (fd FromDomain) convertLogs(logs []model.Log) []Log { func (fd FromDomain) convertProcess(process *model.Process) Process { tags, tagsMap := fd.convertKeyValuesString(process.GetTags()) return Process{ - ServiceName: process.ServiceName, + ServiceName: process.GetServiceName(), Tags: tags, Tag: tagsMap, } diff --git a/plugin/storage/es/spanstore/dbmodel/from_domain_test.go b/plugin/storage/es/spanstore/dbmodel/from_domain_test.go index e56b19fcdf0..db0b0e12dab 100644 --- a/plugin/storage/es/spanstore/dbmodel/from_domain_test.go +++ b/plugin/storage/es/spanstore/dbmodel/from_domain_test.go @@ -108,13 +108,18 @@ func TestTagMap(t *testing.T) { assert.Equal(t, tagsMap, dbSpan.Process.Tag) } +<<<<<<< HEAD func TestConvertProcess(t *testing.T) { +======= +func TestNilProcess(t *testing.T) { +>>>>>>> d21e22d0 (fix: check nil process to avoid nil pointer) tags := []model.KeyValue{ model.String("foo", "foo"), model.Bool("a", true), model.Int64("b.b", 1), } +<<<<<<< HEAD spanWithNilTags := model.Span{Tags: tags, Process: &model.Process{Tags: nil}} converter := NewFromDomain(false, []string{}, ":") @@ -126,6 +131,25 @@ func TestConvertProcess(t *testing.T) { nilMap := map[string]interface{}(nil) assert.Equal(t, nilMap, dbSpanWithNilTags.Tag) assert.Equal(t, nilMap, dbSpanWithNilTags.Process.Tag) +======= + spanWithNilProcessTags := model.Span{Tags: tags, Process: &model.Process{Tags: nil}} + spanWithNilProcess := model.Span{Tags: tags, Process: nil} + + converter := NewFromDomain(false, nil, ":") + dbSpanWithNilTags := converter.FromDomainEmbedProcess(&spanWithNilProcessTags) + dbSpanWithNilProcess := converter.FromDomainEmbedProcess(&spanWithNilProcess) + + assert.Equal(t, 3, len(dbSpanWithNilTags.Tags)) + assert.Equal(t, 3, len(dbSpanWithNilProcess.Tags)) + assert.Equal(t, 0, len(dbSpanWithNilTags.Process.Tags)) + assert.Equal(t, 0, len(dbSpanWithNilProcess.Process.Tags)) + + tagsMap := map[string]interface{}(nil) + assert.Equal(t, tagsMap, dbSpanWithNilTags.Tag) + assert.Equal(t, tagsMap, dbSpanWithNilProcess.Tag) + assert.Equal(t, tagsMap, dbSpanWithNilTags.Process.Tag) + assert.Equal(t, tagsMap, dbSpanWithNilProcess.Process.Tag) +>>>>>>> d21e22d0 (fix: check nil process to avoid nil pointer) } func TestConvertKeyValueValue(t *testing.T) { From 826c73a8f3290ade175e2bb7b55ef158a648bb3c Mon Sep 17 00:00:00 2001 From: Ed Snible Date: Wed, 20 Jul 2022 20:41:10 -0400 Subject: [PATCH 08/11] Tenancy for queries (#3791) * Tenancy for queries Signed-off-by: Ed Snible * New parameter for RegisterGRPCGateway() test function Signed-off-by: Ed Snible * More tests that are local to package itself Signed-off-by: Ed Snible * Additional test cases to raise test coverage Signed-off-by: Ed Snible * Fix test Signed-off-by: Ed Snible * spelling Signed-off-by: Ed Snible * Rename file Signed-off-by: Ed Snible * Refactor tenancy packages Signed-off-by: Ed Snible * restore empty_test.go as part of refactoring tenancy out of storage Signed-off-by: Ed Snible * lint/gofumpt Signed-off-by: Ed Snible * Enforce tenancy on non-streaming gRPC and add additional tests Signed-off-by: Ed Snible * Test for tenant flow to storage for both streaming and unary RPC Signed-off-by: Ed Snible * HTTP tenancy test Signed-off-by: Ed Snible * Unit test for unary tenancy handler Signed-off-by: Ed Snible * Factor out rendundent test function Signed-off-by: Ed Snible * Address review comments Signed-off-by: Ed Snible * Error name Signed-off-by: Ed Snible * Refactor TenancyConfig to TenancyManager Signed-off-by: Ed Snible * Address review comments Signed-off-by: Ed Snible * Refactor so that NewTenancyManager() only called from main and tests Signed-off-by: Ed Snible Signed-off-by: Loc Mai --- cmd/all-in-one/main.go | 9 +- cmd/collector/app/collector.go | 7 +- cmd/collector/app/collector_test.go | 9 + cmd/collector/app/flags/flags.go | 2 +- cmd/collector/app/handler/grpc_handler.go | 21 +- .../app/handler/grpc_handler_test.go | 12 +- cmd/collector/app/handler/otlp_receiver.go | 11 +- .../app/handler/otlp_receiver_test.go | 13 +- cmd/collector/app/server/grpc_test.go | 12 +- cmd/collector/app/span_handler_builder.go | 5 +- .../app/span_handler_builder_test.go | 3 + cmd/collector/app/span_processor.go | 4 +- cmd/collector/app/span_processor_test.go | 4 +- cmd/collector/main.go | 12 +- cmd/query/app/apiv3/grpc_gateway.go | 13 +- cmd/query/app/apiv3/grpc_gateway_test.go | 114 ++++++- cmd/query/app/flags.go | 8 + cmd/query/app/grpc_handler_test.go | 322 +++++++++++++++--- cmd/query/app/http_handler.go | 12 +- cmd/query/app/http_handler_test.go | 125 ++++++- cmd/query/app/server.go | 20 +- cmd/query/app/server_test.go | 95 +++++- cmd/query/main.go | 4 +- .../tenancy/tenancy.go => tenancy/config.go} | 19 +- .../config_test.go} | 2 +- storage/tenant.go => pkg/tenancy/context.go | 2 +- .../tenancy/context_test.go | 2 +- pkg/{config => }/tenancy/flags.go | 0 pkg/{config => }/tenancy/flags_test.go | 0 pkg/tenancy/grpc.go | 115 +++++++ pkg/tenancy/grpc_test.go | 113 ++++++ pkg/tenancy/http.go | 65 ++++ pkg/tenancy/http_test.go | 119 +++++++ storage/empty_test.go | 15 + 34 files changed, 1147 insertions(+), 142 deletions(-) rename pkg/{config/tenancy/tenancy.go => tenancy/config.go} (77%) rename pkg/{config/tenancy/tenancy_test.go => tenancy/config_test.go} (97%) rename storage/tenant.go => pkg/tenancy/context.go (98%) rename storage/tenant_test.go => pkg/tenancy/context_test.go (99%) rename pkg/{config => }/tenancy/flags.go (100%) rename pkg/{config => }/tenancy/flags_test.go (100%) create mode 100644 pkg/tenancy/grpc.go create mode 100644 pkg/tenancy/grpc_test.go create mode 100644 pkg/tenancy/http.go create mode 100644 pkg/tenancy/http_test.go create mode 100644 storage/empty_test.go diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index d6dcf9867bd..c3ceb111d77 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -46,6 +46,7 @@ import ( "github.com/jaegertracing/jaeger/internal/metrics/jlibadapter" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/pkg/version" metricsPlugin "github.com/jaegertracing/jaeger/plugin/metrics" ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore" @@ -155,6 +156,8 @@ by default uses only in-memory database.`, logger.Fatal("Failed to configure query service", zap.Error(err)) } + tm := tenancy.NewTenancyManager(&cOpts.GRPC.Tenancy) + // collector c := collectorApp.New(&collectorApp.CollectorParams{ ServiceName: "jaeger-collector", @@ -164,6 +167,7 @@ by default uses only in-memory database.`, StrategyStore: strategyStore, Aggregator: aggregator, HealthCheck: svc.HC(), + TenancyMgr: tm, }) if err := c.Start(cOpts); err != nil { log.Fatal(err) @@ -192,7 +196,7 @@ by default uses only in-memory database.`, querySrv := startQuery( svc, qOpts, qOpts.BuildQueryServiceOptions(storageFactory, logger), spanReader, dependencyReader, metricsQueryService, - metricsFactory, + metricsFactory, tm, ) svc.RunAndThen(func() { @@ -265,10 +269,11 @@ func startQuery( depReader dependencystore.Reader, metricsQueryService querysvc.MetricsQueryService, baseFactory metrics.Factory, + tm *tenancy.TenancyManager, ) *queryApp.Server { spanReader = storageMetrics.NewReadMetricsDecorator(spanReader, baseFactory.Namespace(metrics.NSOptions{Name: "query"})) qs := querysvc.NewQueryService(spanReader, depReader, *queryOpts) - server, err := queryApp.NewServer(svc.Logger, qs, metricsQueryService, qOpts, opentracing.GlobalTracer()) + server, err := queryApp.NewServer(svc.Logger, qs, metricsQueryService, qOpts, tm, opentracing.GlobalTracer()) if err != nil { svc.Logger.Fatal("Could not start jaeger-query service", zap.Error(err)) } diff --git a/cmd/collector/app/collector.go b/cmd/collector/app/collector.go index a81213d49d5..2beff366f01 100644 --- a/cmd/collector/app/collector.go +++ b/cmd/collector/app/collector.go @@ -32,6 +32,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/server" "github.com/jaegertracing/jaeger/pkg/healthcheck" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -52,6 +53,7 @@ type Collector struct { hCheck *healthcheck.HealthCheck spanProcessor processor.SpanProcessor spanHandlers *SpanHandlers + tenancyMgr *tenancy.TenancyManager // state, read only hServer *http.Server @@ -72,6 +74,7 @@ type CollectorParams struct { StrategyStore strategystore.StrategyStore Aggregator strategystore.Aggregator HealthCheck *healthcheck.HealthCheck + TenancyMgr *tenancy.TenancyManager } // New constructs a new collector component, ready to be started @@ -84,6 +87,7 @@ func New(params *CollectorParams) *Collector { strategyStore: params.StrategyStore, aggregator: params.Aggregator, hCheck: params.HealthCheck, + tenancyMgr: params.TenancyMgr, } } @@ -94,6 +98,7 @@ func (c *Collector) Start(options *flags.CollectorOptions) error { CollectorOpts: options, Logger: c.logger, MetricsFactory: c.metricsFactory, + TenancyMgr: c.tenancyMgr, } var additionalProcessors []ProcessSpan @@ -152,7 +157,7 @@ func (c *Collector) Start(options *flags.CollectorOptions) error { c.zkServer = zkServer if options.OTLP.Enabled { - otlpReceiver, err := handler.StartOTLPReceiver(options, c.logger, c.spanProcessor) + otlpReceiver, err := handler.StartOTLPReceiver(options, c.logger, c.spanProcessor, c.tenancyMgr) if err != nil { return fmt.Errorf("could not start OTLP receiver: %w", err) } diff --git a/cmd/collector/app/collector_test.go b/cmd/collector/app/collector_test.go index 06dc2fe70cd..cf951f22e23 100644 --- a/cmd/collector/app/collector_test.go +++ b/cmd/collector/app/collector_test.go @@ -30,6 +30,7 @@ import ( "github.com/jaegertracing/jaeger/internal/metricstest" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/healthcheck" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/thrift-gen/sampling" ) @@ -53,6 +54,7 @@ func TestNewCollector(t *testing.T) { baseMetrics := metricstest.NewFactory(time.Hour) spanWriter := &fakeSpanWriter{} strategyStore := &mockStrategyStore{} + tm := &tenancy.TenancyManager{} c := New(&CollectorParams{ ServiceName: "collector", @@ -61,6 +63,7 @@ func TestNewCollector(t *testing.T) { SpanWriter: spanWriter, StrategyStore: strategyStore, HealthCheck: hc, + TenancyMgr: tm, }) collectorOpts := optionsForEphemeralPorts() @@ -77,6 +80,7 @@ func TestCollector_StartErrors(t *testing.T) { baseMetrics := metricstest.NewFactory(time.Hour) spanWriter := &fakeSpanWriter{} strategyStore := &mockStrategyStore{} + tm := &tenancy.TenancyManager{} c := New(&CollectorParams{ ServiceName: "collector", @@ -85,6 +89,7 @@ func TestCollector_StartErrors(t *testing.T) { SpanWriter: spanWriter, StrategyStore: strategyStore, HealthCheck: hc, + TenancyMgr: tm, }) err := c.Start(options) require.Error(t, err) @@ -130,6 +135,7 @@ func TestCollector_PublishOpts(t *testing.T) { metricsFactory := fork.New("internal", forkFactory, baseMetrics) spanWriter := &fakeSpanWriter{} strategyStore := &mockStrategyStore{} + tm := &tenancy.TenancyManager{} c := New(&CollectorParams{ ServiceName: "collector", @@ -138,6 +144,7 @@ func TestCollector_PublishOpts(t *testing.T) { SpanWriter: spanWriter, StrategyStore: strategyStore, HealthCheck: hc, + TenancyMgr: tm, }) collectorOpts := optionsForEphemeralPorts() collectorOpts.NumWorkers = 24 @@ -164,6 +171,7 @@ func TestAggregator(t *testing.T) { spanWriter := &fakeSpanWriter{} strategyStore := &mockStrategyStore{} agg := &mockAggregator{} + tm := &tenancy.TenancyManager{} c := New(&CollectorParams{ ServiceName: "collector", @@ -173,6 +181,7 @@ func TestAggregator(t *testing.T) { StrategyStore: strategyStore, HealthCheck: hc, Aggregator: agg, + TenancyMgr: tm, }) collectorOpts := optionsForEphemeralPorts() collectorOpts.NumWorkers = 10 diff --git a/cmd/collector/app/flags/flags.go b/cmd/collector/app/flags/flags.go index 05f5a190151..bc614348fbe 100644 --- a/cmd/collector/app/flags/flags.go +++ b/cmd/collector/app/flags/flags.go @@ -24,8 +24,8 @@ import ( "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/flags" - "github.com/jaegertracing/jaeger/pkg/config/tenancy" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/ports" ) diff --git a/cmd/collector/app/handler/grpc_handler.go b/cmd/collector/app/handler/grpc_handler.go index 8028ea7bdd2..1c6d9e79f08 100644 --- a/cmd/collector/app/handler/grpc_handler.go +++ b/cmd/collector/app/handler/grpc_handler.go @@ -25,7 +25,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/model" - "github.com/jaegertracing/jaeger/pkg/config/tenancy" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/proto-gen/api_v2" ) @@ -36,14 +36,14 @@ type GRPCHandler struct { } // NewGRPCHandler registers routes for this handler on the given router. -func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor, tenancyConfig *tenancy.TenancyConfig) *GRPCHandler { +func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor, tenancyMgr *tenancy.TenancyManager) *GRPCHandler { return &GRPCHandler{ logger: logger, batchConsumer: newBatchConsumer(logger, spanProcessor, processor.GRPCTransport, processor.ProtoSpanFormat, - tenancyConfig), + tenancyMgr), } } @@ -58,13 +58,10 @@ type batchConsumer struct { logger *zap.Logger spanProcessor processor.SpanProcessor spanOptions processor.SpansOptions - tenancyConfig tenancy.TenancyConfig + tenancyMgr *tenancy.TenancyManager } -func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, transport processor.InboundTransport, spanFormat processor.SpanFormat, tenancyConfig *tenancy.TenancyConfig) batchConsumer { - if tenancyConfig == nil { - tenancyConfig = &tenancy.TenancyConfig{} - } +func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, transport processor.InboundTransport, spanFormat processor.SpanFormat, tenancyMgr *tenancy.TenancyManager) batchConsumer { return batchConsumer{ logger: logger, spanProcessor: spanProcessor, @@ -72,7 +69,7 @@ func newBatchConsumer(logger *zap.Logger, spanProcessor processor.SpanProcessor, InboundTransport: transport, SpanFormat: spanFormat, }, - tenancyConfig: *tenancyConfig, + tenancyMgr: tenancyMgr, } } @@ -104,7 +101,7 @@ func (c *batchConsumer) consume(ctx context.Context, batch *model.Batch) error { } func (c *batchConsumer) validateTenant(ctx context.Context) (string, error) { - if !c.tenancyConfig.Enabled { + if !c.tenancyMgr.Enabled { return "", nil } @@ -113,14 +110,14 @@ func (c *batchConsumer) validateTenant(ctx context.Context) (string, error) { return "", status.Errorf(codes.PermissionDenied, "missing tenant header") } - tenants := md[c.tenancyConfig.Header] + tenants := md.Get(c.tenancyMgr.Header) if len(tenants) < 1 { return "", status.Errorf(codes.PermissionDenied, "missing tenant header") } else if len(tenants) > 1 { return "", status.Errorf(codes.PermissionDenied, "extra tenant header") } - if !c.tenancyConfig.Valid(tenants[0]) { + if !c.tenancyMgr.Valid(tenants[0]) { return "", status.Errorf(codes.PermissionDenied, "unknown tenant") } diff --git a/cmd/collector/app/handler/grpc_handler_test.go b/cmd/collector/app/handler/grpc_handler_test.go index 9628aeba93b..171835e578c 100644 --- a/cmd/collector/app/handler/grpc_handler_test.go +++ b/cmd/collector/app/handler/grpc_handler_test.go @@ -30,7 +30,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/model" - "github.com/jaegertracing/jaeger/pkg/config/tenancy" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/proto-gen/api_v2" ) @@ -98,7 +98,7 @@ func newClient(t *testing.T, addr net.Addr) (api_v2.CollectorServiceClient, *grp func TestPostSpans(t *testing.T) { processor := &mockSpanProcessor{} server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) { - handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyConfig{}) + handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyManager{}) api_v2.RegisterCollectorServiceServer(s, handler) }) defer server.Stop() @@ -133,7 +133,7 @@ func TestPostSpans(t *testing.T) { func TestGRPCCompressionEnabled(t *testing.T) { processor := &mockSpanProcessor{} server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) { - handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyConfig{}) + handler := NewGRPCHandler(zap.NewNop(), processor, &tenancy.TenancyManager{}) api_v2.RegisterCollectorServiceServer(s, handler) }) defer server.Stop() @@ -171,7 +171,7 @@ func TestPostSpansWithError(t *testing.T) { processor := &mockSpanProcessor{expectedError: test.processorError} logger, logBuf := testutils.NewLogger() server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) { - handler := NewGRPCHandler(logger, processor, &tenancy.TenancyConfig{}) + handler := NewGRPCHandler(logger, processor, &tenancy.TenancyManager{}) api_v2.RegisterCollectorServiceServer(s, handler) }) defer server.Stop() @@ -210,7 +210,7 @@ func TestPostTenantedSpans(t *testing.T) { processor := &mockSpanProcessor{} server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) { handler := NewGRPCHandler(zap.NewNop(), processor, - tenancy.NewTenancyConfig(&tenancy.Options{ + tenancy.NewTenancyManager(&tenancy.Options{ Enabled: true, Header: tenantHeader, Tenants: []string{dummyTenant}, @@ -346,7 +346,7 @@ func TestGetTenant(t *testing.T) { processor := &mockSpanProcessor{} handler := NewGRPCHandler(zap.NewNop(), processor, - tenancy.NewTenancyConfig(&tenancy.Options{ + tenancy.NewTenancyManager(&tenancy.Options{ Enabled: true, Header: tenantHeader, Tenants: validTenants, diff --git a/cmd/collector/app/handler/otlp_receiver.go b/cmd/collector/app/handler/otlp_receiver.go index 8c714b52042..678fdd78b6d 100644 --- a/cmd/collector/app/handler/otlp_receiver.go +++ b/cmd/collector/app/handler/otlp_receiver.go @@ -34,17 +34,19 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/processor" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/pkg/tenancy" ) var _ component.Host = (*otelHost)(nil) // API check // StartOTLPReceiver starts OpenTelemetry OTLP receiver listening on gRPC and HTTP ports. -func StartOTLPReceiver(options *flags.CollectorOptions, logger *zap.Logger, spanProcessor processor.SpanProcessor) (component.TracesReceiver, error) { +func StartOTLPReceiver(options *flags.CollectorOptions, logger *zap.Logger, spanProcessor processor.SpanProcessor, tm *tenancy.TenancyManager) (component.TracesReceiver, error) { otlpFactory := otlpreceiver.NewFactory() return startOTLPReceiver( options, logger, spanProcessor, + tm, otlpFactory, consumer.NewTraces, otlpFactory.CreateTracesReceiver, @@ -58,6 +60,7 @@ func startOTLPReceiver( options *flags.CollectorOptions, logger *zap.Logger, spanProcessor processor.SpanProcessor, + tm *tenancy.TenancyManager, // from here: params that can be mocked in tests otlpFactory component.ReceiverFactory, newTraces func(consume consumer.ConsumeTracesFunc, options ...consumer.Option) (consumer.Traces, error), @@ -74,7 +77,7 @@ func startOTLPReceiver( }, } - otlpConsumer := newConsumerDelegate(logger, spanProcessor) + otlpConsumer := newConsumerDelegate(logger, spanProcessor, tm) // the following two constructors never return errors given non-nil arguments, so we ignore errors nextConsumer, err := newTraces(otlpConsumer.consume) if err != nil { @@ -137,13 +140,13 @@ func applyTLSSettings(opts *tlscfg.Options) *configtls.TLSServerSetting { } } -func newConsumerDelegate(logger *zap.Logger, spanProcessor processor.SpanProcessor) *consumerDelegate { +func newConsumerDelegate(logger *zap.Logger, spanProcessor processor.SpanProcessor, tm *tenancy.TenancyManager) *consumerDelegate { return &consumerDelegate{ batchConsumer: newBatchConsumer(logger, spanProcessor, processor.UnknownTransport, // could be gRPC or HTTP processor.OTLPSpanFormat, - nil), + tm), protoFromTraces: otlp2jaeger.ProtoFromTraces, } } diff --git a/cmd/collector/app/handler/otlp_receiver_test.go b/cmd/collector/app/handler/otlp_receiver_test.go index 83f17e412e1..b05b5ef5d0b 100644 --- a/cmd/collector/app/handler/otlp_receiver_test.go +++ b/cmd/collector/app/handler/otlp_receiver_test.go @@ -31,6 +31,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/flags" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/pkg/testutils" ) @@ -48,7 +49,8 @@ func optionsWithPorts(port string) *flags.CollectorOptions { func TestStartOtlpReceiver(t *testing.T) { spanProcessor := &mockSpanProcessor{} logger, _ := testutils.NewLogger() - rec, err := StartOTLPReceiver(optionsWithPorts(":0"), logger, spanProcessor) + tm := &tenancy.TenancyManager{} + rec, err := StartOTLPReceiver(optionsWithPorts(":0"), logger, spanProcessor, tm) require.NoError(t, err) defer func() { assert.NoError(t, rec.Shutdown(context.Background())) @@ -79,7 +81,7 @@ func TestConsumerDelegate(t *testing.T) { t.Run(test.expectLog, func(t *testing.T) { logger, logBuf := testutils.NewLogger() spanProcessor := &mockSpanProcessor{expectedError: test.expectErr} - consumer := newConsumerDelegate(logger, spanProcessor) + consumer := newConsumerDelegate(logger, spanProcessor, &tenancy.TenancyManager{}) err := consumer.consume(context.Background(), makeTracesOneSpan()) @@ -98,7 +100,8 @@ func TestStartOtlpReceiver_Error(t *testing.T) { spanProcessor := &mockSpanProcessor{} logger, _ := testutils.NewLogger() opts := optionsWithPorts(":-1") - _, err := StartOTLPReceiver(opts, logger, spanProcessor) + tm := &tenancy.TenancyManager{} + _, err := StartOTLPReceiver(opts, logger, spanProcessor, tm) require.Error(t, err) assert.Contains(t, err.Error(), "could not start the OTLP receiver") @@ -106,7 +109,7 @@ func TestStartOtlpReceiver_Error(t *testing.T) { return nil, errors.New("mock error") } f := otlpreceiver.NewFactory() - _, err = startOTLPReceiver(opts, logger, spanProcessor, f, newTraces, f.CreateTracesReceiver) + _, err = startOTLPReceiver(opts, logger, spanProcessor, &tenancy.TenancyManager{}, f, newTraces, f.CreateTracesReceiver) require.Error(t, err) assert.Contains(t, err.Error(), "could not create the OTLP consumer") @@ -115,7 +118,7 @@ func TestStartOtlpReceiver_Error(t *testing.T) { ) (component.TracesReceiver, error) { return nil, errors.New("mock error") } - _, err = startOTLPReceiver(opts, logger, spanProcessor, f, consumer.NewTraces, createTracesReceiver) + _, err = startOTLPReceiver(opts, logger, spanProcessor, &tenancy.TenancyManager{}, f, consumer.NewTraces, createTracesReceiver) require.Error(t, err) assert.Contains(t, err.Error(), "could not create the OTLP receiver") } diff --git a/cmd/collector/app/server/grpc_test.go b/cmd/collector/app/server/grpc_test.go index 34ab8d0e460..bdf722a8206 100644 --- a/cmd/collector/app/server/grpc_test.go +++ b/cmd/collector/app/server/grpc_test.go @@ -30,8 +30,8 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/handler" "github.com/jaegertracing/jaeger/internal/grpctest" - "github.com/jaegertracing/jaeger/pkg/config/tenancy" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/proto-gen/api_v2" ) @@ -40,7 +40,7 @@ func TestFailToListen(t *testing.T) { logger, _ := zap.NewDevelopment() server, err := StartGRPCServer(&GRPCServerParams{ HostPort: ":-1", - Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyConfig{}), + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyManager{}), SamplingStore: &mockSamplingStore{}, Logger: logger, }) @@ -57,7 +57,7 @@ func TestFailServe(t *testing.T) { logger := zap.New(core) serveGRPC(grpc.NewServer(), lis, &GRPCServerParams{ - Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyConfig{}), + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyManager{}), SamplingStore: &mockSamplingStore{}, Logger: logger, OnError: func(e error) { @@ -72,7 +72,7 @@ func TestFailServe(t *testing.T) { func TestSpanCollector(t *testing.T) { logger, _ := zap.NewDevelopment() params := &GRPCServerParams{ - Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyConfig{}), + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyManager{}), SamplingStore: &mockSamplingStore{}, Logger: logger, MaxReceiveMessageLength: 1024 * 1024, @@ -97,7 +97,7 @@ func TestSpanCollector(t *testing.T) { func TestCollectorStartWithTLS(t *testing.T) { logger, _ := zap.NewDevelopment() params := &GRPCServerParams{ - Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyConfig{}), + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyManager{}), SamplingStore: &mockSamplingStore{}, Logger: logger, TLSConfig: tlscfg.Options{ @@ -116,7 +116,7 @@ func TestCollectorStartWithTLS(t *testing.T) { func TestCollectorReflection(t *testing.T) { logger, _ := zap.NewDevelopment() params := &GRPCServerParams{ - Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyConfig{}), + Handler: handler.NewGRPCHandler(logger, &mockSpanProcessor{}, &tenancy.TenancyManager{}), SamplingStore: &mockSamplingStore{}, Logger: logger, } diff --git a/cmd/collector/app/span_handler_builder.go b/cmd/collector/app/span_handler_builder.go index 7a314e80b95..dd5dd311702 100644 --- a/cmd/collector/app/span_handler_builder.go +++ b/cmd/collector/app/span_handler_builder.go @@ -25,8 +25,8 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/processor" zs "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin" "github.com/jaegertracing/jaeger/model" - "github.com/jaegertracing/jaeger/pkg/config/tenancy" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -36,6 +36,7 @@ type SpanHandlerBuilder struct { CollectorOpts *flags.CollectorOptions Logger *zap.Logger MetricsFactory metrics.Factory + TenancyMgr *tenancy.TenancyManager } // SpanHandlers holds instances to the span handlers built by the SpanHandlerBuilder @@ -76,7 +77,7 @@ func (b *SpanHandlerBuilder) BuildHandlers(spanProcessor processor.SpanProcessor zs.NewChainedSanitizer(zs.NewStandardSanitizers()...), ), handler.NewJaegerSpanHandler(b.Logger, spanProcessor), - handler.NewGRPCHandler(b.Logger, spanProcessor, tenancy.NewTenancyConfig(&b.CollectorOpts.GRPC.Tenancy)), + handler.NewGRPCHandler(b.Logger, spanProcessor, b.TenancyMgr), } } diff --git a/cmd/collector/app/span_handler_builder_test.go b/cmd/collector/app/span_handler_builder_test.go index ec02e0e4503..3f747f83372 100644 --- a/cmd/collector/app/span_handler_builder_test.go +++ b/cmd/collector/app/span_handler_builder_test.go @@ -26,6 +26,7 @@ import ( cmdFlags "github.com/jaegertracing/jaeger/cmd/flags" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/storage/memory" ) @@ -41,6 +42,7 @@ func TestNewSpanHandlerBuilder(t *testing.T) { builder := &SpanHandlerBuilder{ SpanWriter: spanWriter, CollectorOpts: cOpts, + TenancyMgr: &tenancy.TenancyManager{}, } assert.NotNil(t, builder.logger()) assert.NotNil(t, builder.metricsFactory()) @@ -50,6 +52,7 @@ func TestNewSpanHandlerBuilder(t *testing.T) { CollectorOpts: cOpts, Logger: zap.NewNop(), MetricsFactory: metrics.NullFactory, + TenancyMgr: &tenancy.TenancyManager{}, } spanProcessor := builder.BuildSpanProcessor() diff --git a/cmd/collector/app/span_processor.go b/cmd/collector/app/span_processor.go index 00f73e44548..5e9ff4acafd 100644 --- a/cmd/collector/app/span_processor.go +++ b/cmd/collector/app/span_processor.go @@ -27,7 +27,7 @@ import ( "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/queue" - "github.com/jaegertracing/jaeger/storage" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/storage/spanstore" ) @@ -156,7 +156,7 @@ func (sp *spanProcessor) saveSpan(span *model.Span, tenant string) { // Since we save spans asynchronously from receiving them, we cannot reuse // the inbound Context, as it may be cancelled by the time we reach this point, // so we need to start a new Context. - ctx := storage.WithTenant(context.Background(), tenant) + ctx := tenancy.WithTenant(context.Background(), tenant) if err := sp.spanWriter.WriteSpan(ctx, span); err != nil { sp.logger.Error("Failed to save span", zap.Error(err)) sp.metrics.SavedErrBySvc.ReportServiceNameForSpan(span) diff --git a/cmd/collector/app/span_processor_test.go b/cmd/collector/app/span_processor_test.go index 7b1ff52c8a7..864baf47993 100644 --- a/cmd/collector/app/span_processor_test.go +++ b/cmd/collector/app/span_processor_test.go @@ -34,8 +34,8 @@ import ( "github.com/jaegertracing/jaeger/internal/metricstest" "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/pkg/testutils" - "github.com/jaegertracing/jaeger/storage" "github.com/jaegertracing/jaeger/thrift-gen/jaeger" zc "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) @@ -184,7 +184,7 @@ func (n *fakeSpanWriter) WriteSpan(ctx context.Context, span *model.Span) error n.tenants = make(map[string]bool) } - n.tenants[storage.GetTenant(ctx)] = true + n.tenants[tenancy.GetTenant(ctx)] = true return n.err } diff --git a/cmd/collector/main.go b/cmd/collector/main.go index c295fc57fde..f92c9482689 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -36,6 +36,7 @@ import ( "github.com/jaegertracing/jaeger/internal/metrics/fork" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/pkg/version" ss "github.com/jaegertracing/jaeger/plugin/sampling/strategystore" "github.com/jaegertracing/jaeger/plugin/storage" @@ -98,6 +99,12 @@ func main() { if err != nil { logger.Fatal("Failed to create sampling strategy store", zap.Error(err)) } + collectorOpts, err := new(flags.CollectorOptions).InitFromViper(v, logger) + if err != nil { + logger.Fatal("Failed to initialize collector", zap.Error(err)) + } + tm := tenancy.NewTenancyManager(&collectorOpts.GRPC.Tenancy) + collector := app.New(&app.CollectorParams{ ServiceName: serviceName, Logger: logger, @@ -106,11 +113,8 @@ func main() { StrategyStore: strategyStore, Aggregator: aggregator, HealthCheck: svc.HC(), + TenancyMgr: tm, }) - collectorOpts, err := new(flags.CollectorOptions).InitFromViper(v, logger) - if err != nil { - logger.Fatal("Failed to initialize collector", zap.Error(err)) - } // Start all Collector services if err := collector.Start(collectorOpts); err != nil { logger.Fatal("Failed to start collector", zap.Error(err)) diff --git a/cmd/query/app/apiv3/grpc_gateway.go b/cmd/query/app/apiv3/grpc_gateway.go index 1709ed626e3..f563f7158e9 100644 --- a/cmd/query/app/apiv3/grpc_gateway.go +++ b/cmd/query/app/apiv3/grpc_gateway.go @@ -26,15 +26,22 @@ import ( "google.golang.org/grpc/credentials/insecure" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/proto-gen/api_v3" ) // RegisterGRPCGateway registers api_v3 endpoints into provided mux. -func RegisterGRPCGateway(ctx context.Context, logger *zap.Logger, r *mux.Router, basePath string, grpcEndpoint string, grpcTLS tlscfg.Options) error { +func RegisterGRPCGateway(ctx context.Context, logger *zap.Logger, r *mux.Router, basePath string, grpcEndpoint string, grpcTLS tlscfg.Options, tm *tenancy.TenancyManager) error { jsonpb := &runtime.JSONPb{} - grpcGatewayMux := runtime.NewServeMux( + + muxOpts := []runtime.ServeMuxOption{ runtime.WithMarshalerOption(runtime.MIMEWildcard, jsonpb), - ) + } + if tm.Enabled { + muxOpts = append(muxOpts, runtime.WithMetadata(tm.MetadataAnnotator())) + } + + grpcGatewayMux := runtime.NewServeMux(muxOpts...) var handler http.Handler = grpcGatewayMux if basePath != "/" { handler = http.StripPrefix(basePath, grpcGatewayMux) diff --git a/cmd/query/app/apiv3/grpc_gateway_test.go b/cmd/query/app/apiv3/grpc_gateway_test.go index 0557ab40660..92b9ac5a4cf 100644 --- a/cmd/query/app/apiv3/grpc_gateway_test.go +++ b/cmd/query/app/apiv3/grpc_gateway_test.go @@ -37,6 +37,7 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" _ "github.com/jaegertracing/jaeger/pkg/gogocodec" // force gogo codec registration + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/proto-gen/api_v3" dependencyStoreMocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" spanstoremocks "github.com/jaegertracing/jaeger/storage/spanstore/mocks" @@ -45,21 +46,15 @@ import ( var testCertKeyLocation = "../../../../pkg/config/tlscfg/testdata/" func testGRPCGateway(t *testing.T, basePath string, serverTLS tlscfg.Options, clientTLS tlscfg.Options) { - defer serverTLS.Close() - defer clientTLS.Close() + testGRPCGatewayWithTenancy(t, basePath, serverTLS, clientTLS, + tenancy.Options{ + Enabled: false, + }, + func(*http.Request) {}) +} +func setupGRPCGateway(t *testing.T, basePath string, serverTLS tlscfg.Options, clientTLS tlscfg.Options, tenancyOptions tenancy.Options) (*spanstoremocks.Reader, net.Listener, *grpc.Server, context.CancelFunc, *http.Server) { r := &spanstoremocks.Reader{} - traceID := model.NewTraceID(150, 160) - r.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).Return( - &model.Trace{ - Spans: []*model.Span{ - { - TraceID: traceID, - SpanID: model.NewSpanID(180), - OperationName: "foobar", - }, - }, - }, nil).Once() q := querysvc.NewQueryService(r, &dependencyStoreMocks.Reader{}, querysvc.QueryServiceOptions{}) @@ -70,6 +65,13 @@ func testGRPCGateway(t *testing.T, basePath string, serverTLS tlscfg.Options, cl creds := credentials.NewTLS(config) serverGRPCOpts = append(serverGRPCOpts, grpc.Creds(creds)) } + if tenancyOptions.Enabled { + tm := tenancy.NewTenancyManager(&tenancyOptions) + serverGRPCOpts = append(serverGRPCOpts, + grpc.StreamInterceptor(tenancy.NewGuardingStreamInterceptor(tm)), + grpc.UnaryInterceptor(tenancy.NewGuardingUnaryInterceptor(tm)), + ) + } grpcServer := grpc.NewServer(serverGRPCOpts...) h := &Handler{ QueryService: q, @@ -80,13 +82,11 @@ func testGRPCGateway(t *testing.T, basePath string, serverTLS tlscfg.Options, cl err := grpcServer.Serve(lis) require.NoError(t, err) }() - defer grpcServer.Stop() router := &mux.Router{} router = router.PathPrefix(basePath).Subrouter() ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - err := RegisterGRPCGateway(ctx, zap.NewNop(), router, basePath, lis.Addr().String(), clientTLS) + err := RegisterGRPCGateway(ctx, zap.NewNop(), router, basePath, lis.Addr().String(), clientTLS, tenancy.NewTenancyManager(&tenancyOptions)) require.NoError(t, err) httpLis, err := net.Listen("tcp", ":0") @@ -98,10 +98,39 @@ func testGRPCGateway(t *testing.T, basePath string, serverTLS tlscfg.Options, cl err = httpServer.Serve(httpLis) require.Equal(t, http.ErrServerClosed, err) }() + return r, httpLis, grpcServer, cancel, httpServer +} + +func testGRPCGatewayWithTenancy(t *testing.T, basePath string, serverTLS tlscfg.Options, clientTLS tlscfg.Options, + tenancyOptions tenancy.Options, + setupRequest func(*http.Request), +) { + defer serverTLS.Close() + defer clientTLS.Close() + + reader, httpLis, grpcServer, cancel, httpServer := setupGRPCGateway(t, basePath, serverTLS, clientTLS, tenancyOptions) + defer grpcServer.Stop() + defer cancel() defer httpServer.Shutdown(context.Background()) + + traceID := model.NewTraceID(150, 160) + reader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).Return( + &model.Trace{ + Spans: []*model.Span{ + { + TraceID: traceID, + SpanID: model.NewSpanID(180), + OperationName: "foobar", + }, + }, + }, nil).Once() + req, err := http.NewRequest("GET", fmt.Sprintf("http://localhost%s%s/api/v3/traces/123", strings.Replace(httpLis.Addr().String(), "[::]", "", 1), basePath), nil) + require.NoError(t, err) req.Header.Set("Content-Type", "application/json") + setupRequest(req) response, err := http.DefaultClient.Do(req) + require.NoError(t, err) buf := bytes.Buffer{} _, err = buf.ReadFrom(response.Body) require.NoError(t, err) @@ -142,3 +171,56 @@ func TestGRPCGateway_TLS_with_base_path(t *testing.T) { type envelope struct { Result json.RawMessage `json:"result"` } + +func TestTenancyGRPCGateway(t *testing.T) { + tenancyOptions := tenancy.Options{ + Enabled: true, + } + tm := tenancy.NewTenancyManager(&tenancyOptions) + testGRPCGatewayWithTenancy(t, "/", tlscfg.Options{}, tlscfg.Options{}, + // Configure the gateway to forward tenancy header from HTTP to GRPC + tenancyOptions, + // Add a tenancy header on outbound requests + func(req *http.Request) { + req.Header.Add(tm.Header, "dummy") + }) +} + +func TestTenancyGRPCRejection(t *testing.T) { + basePath := "/" + tenancyOptions := tenancy.Options{Enabled: true} + reader, httpLis, grpcServer, cancel, httpServer := setupGRPCGateway(t, + basePath, tlscfg.Options{}, tlscfg.Options{}, + tenancyOptions) + defer grpcServer.Stop() + defer cancel() + defer httpServer.Shutdown(context.Background()) + + traceID := model.NewTraceID(150, 160) + reader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")).Return( + &model.Trace{ + Spans: []*model.Span{ + { + TraceID: traceID, + SpanID: model.NewSpanID(180), + OperationName: "foobar", + }, + }, + }, nil).Once() + + req, err := http.NewRequest("GET", fmt.Sprintf("http://localhost%s%s/api/v3/traces/123", strings.Replace(httpLis.Addr().String(), "[::]", "", 1), basePath), nil) + require.NoError(t, err) + req.Header.Set("Content-Type", "application/json") + // We don't set tenant header + response, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusForbidden, response.StatusCode) + + // Try again with tenant header set + tm := tenancy.NewTenancyManager(&tenancyOptions) + req.Header.Set(tm.Header, "acme") + response, err = http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusOK, response.StatusCode) + // Skip unmarshal of response; it is enough that it succeeded +} diff --git a/cmd/query/app/flags.go b/cmd/query/app/flags.go index f9daa5041e9..07dd9392a5d 100644 --- a/cmd/query/app/flags.go +++ b/cmd/query/app/flags.go @@ -32,6 +32,7 @@ import ( "github.com/jaegertracing/jaeger/model/adjuster" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/ports" "github.com/jaegertracing/jaeger/storage" ) @@ -79,6 +80,8 @@ type QueryOptions struct { AdditionalHeaders http.Header // MaxClockSkewAdjust is the maximum duration by which jaeger-query will adjust a span MaxClockSkewAdjust time.Duration + // Tenancy configures tenancy for query + Tenancy tenancy.Options } // AddFlags adds flags for QueryOptions @@ -122,6 +125,11 @@ func (qOpts *QueryOptions) InitFromViper(v *viper.Viper, logger *zap.Logger) (*Q } else { qOpts.AdditionalHeaders = headers } + if tenancy, err := tenancy.InitFromViper(v); err == nil { + qOpts.Tenancy = tenancy + } else { + return qOpts, fmt.Errorf("failed to parse Tenancy options: %w", err) + } return qOpts, nil } diff --git a/cmd/query/app/grpc_handler_test.go b/cmd/query/app/grpc_handler_test.go index 349e309ccff..c25b890ccf3 100644 --- a/cmd/query/app/grpc_handler_test.go +++ b/cmd/query/app/grpc_handler_test.go @@ -31,10 +31,12 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" "github.com/jaegertracing/jaeger/cmd/query/app/querysvc" "github.com/jaegertracing/jaeger/model" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/metrics/disabled" "github.com/jaegertracing/jaeger/proto-gen/api_v2" "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" @@ -143,9 +145,16 @@ type grpcClient struct { conn *grpc.ClientConn } -func newGRPCServer(t *testing.T, q *querysvc.QueryService, mq querysvc.MetricsQueryService, logger *zap.Logger, tracer opentracing.Tracer) (*grpc.Server, net.Addr) { +func newGRPCServer(t *testing.T, q *querysvc.QueryService, mq querysvc.MetricsQueryService, logger *zap.Logger, tracer opentracing.Tracer, tenancyMgr *tenancy.TenancyManager) (*grpc.Server, net.Addr) { lis, _ := net.Listen("tcp", ":0") - grpcServer := grpc.NewServer() + var grpcOpts []grpc.ServerOption + if tenancyMgr.Enabled { + grpcOpts = append(grpcOpts, + grpc.StreamInterceptor(tenancy.NewGuardingStreamInterceptor(tenancyMgr)), + grpc.UnaryInterceptor(tenancy.NewGuardingUnaryInterceptor(tenancyMgr)), + ) + } + grpcServer := grpc.NewServer(grpcOpts...) grpcHandler := &GRPCHandler{ queryService: q, metricsQueryService: mq, @@ -193,47 +202,8 @@ func withMetricsQuery() testOption { } } -func initializeTestServerGRPCWithOptions(t *testing.T, options ...testOption) *grpcServer { - archiveSpanReader := &spanstoremocks.Reader{} - archiveSpanWriter := &spanstoremocks.Writer{} - - spanReader := &spanstoremocks.Reader{} - dependencyReader := &depsmocks.Reader{} - disabledReader, err := disabled.NewMetricsReader() - require.NoError(t, err) - - q := querysvc.NewQueryService(spanReader, dependencyReader, - querysvc.QueryServiceOptions{ - ArchiveSpanReader: archiveSpanReader, - ArchiveSpanWriter: archiveSpanWriter, - }) - - tqs := &testQueryService{ - // Disable metrics query by default. - metricsQueryService: disabledReader, - } - for _, opt := range options { - opt(tqs) - } - - logger := zap.NewNop() - tracer := opentracing.NoopTracer{} - - server, addr := newGRPCServer(t, q, tqs.metricsQueryService, logger, tracer) - - return &grpcServer{ - server: server, - lisAddr: addr, - spanReader: spanReader, - depReader: dependencyReader, - metricsQueryService: tqs.metricsQueryService, - archiveSpanReader: archiveSpanReader, - archiveSpanWriter: archiveSpanWriter, - } -} - func withServerAndClient(t *testing.T, actualTest func(server *grpcServer, client *grpcClient), options ...testOption) { - server := initializeTestServerGRPCWithOptions(t, options...) + server := initializeTenantedTestServerGRPCWithOptions(t, &tenancy.TenancyManager{}, options...) client := newGRPCClient(t, server.lisAddr.String()) defer server.server.Stop() defer client.conn.Close() @@ -930,3 +900,271 @@ func TestMetricsQueryNilRequestGRPC(t *testing.T) { assert.Empty(t, bqp) assert.EqualError(t, err, errNilRequest.Error()) } + +func initializeTenantedTestServerGRPCWithOptions(t *testing.T, tm *tenancy.TenancyManager, options ...testOption) *grpcServer { + archiveSpanReader := &spanstoremocks.Reader{} + archiveSpanWriter := &spanstoremocks.Writer{} + + spanReader := &spanstoremocks.Reader{} + dependencyReader := &depsmocks.Reader{} + disabledReader, err := disabled.NewMetricsReader() + require.NoError(t, err) + + q := querysvc.NewQueryService( + spanReader, + dependencyReader, + querysvc.QueryServiceOptions{ + ArchiveSpanReader: archiveSpanReader, + ArchiveSpanWriter: archiveSpanWriter, + }) + + tqs := &testQueryService{ + // Disable metrics query by default. + metricsQueryService: disabledReader, + } + for _, opt := range options { + opt(tqs) + } + + logger := zap.NewNop() + tracer := opentracing.NoopTracer{} + + server, addr := newGRPCServer(t, q, tqs.metricsQueryService, logger, tracer, tm) + + return &grpcServer{ + server: server, + lisAddr: addr, + spanReader: spanReader, + depReader: dependencyReader, + metricsQueryService: tqs.metricsQueryService, + archiveSpanReader: archiveSpanReader, + archiveSpanWriter: archiveSpanWriter, + } +} + +func withTenantedServerAndClient(t *testing.T, tm *tenancy.TenancyManager, actualTest func(server *grpcServer, client *grpcClient), options ...testOption) { + server := initializeTenantedTestServerGRPCWithOptions(t, tm, options...) + client := newGRPCClient(t, server.lisAddr.String()) + defer server.server.Stop() + defer client.conn.Close() + + actualTest(server, client) +} + +// withOutgoingMetadata returns a Context with metadata for a server to receive +func withOutgoingMetadata(t *testing.T, ctx context.Context, headerName, headerValue string) context.Context { + t.Helper() + + md := metadata.New(map[string]string{headerName: headerValue}) + return metadata.NewOutgoingContext(ctx, md) +} + +func TestSearchTenancyGRPC(t *testing.T) { + tm := tenancy.NewTenancyManager(&tenancy.Options{ + Enabled: true, + }) + withTenantedServerAndClient(t, tm, func(server *grpcServer, client *grpcClient) { + server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + Return(mockTrace, nil).Once() + + // First try without tenancy header + res, err := client.GetTrace(context.Background(), &api_v2.GetTraceRequest{ + TraceID: mockTraceID, + }) + + require.NoError(t, err, "could not initiate GetTraceRequest") + + spanResChunk, err := res.Recv() + assertGRPCError(t, err, codes.PermissionDenied, "missing tenant header") + assert.Nil(t, spanResChunk) + + // Next try with tenancy + res, err = client.GetTrace( + withOutgoingMetadata(t, context.Background(), tm.Header, "acme"), + &api_v2.GetTraceRequest{ + TraceID: mockTraceID, + }) + + spanResChunk, _ = res.Recv() + + require.NoError(t, err, "expecting gRPC to succeed with any tenancy header") + require.NotNil(t, spanResChunk) + require.NotNil(t, spanResChunk.Spans) + require.Equal(t, len(mockTrace.Spans), len(spanResChunk.Spans)) + assert.Equal(t, mockTraceID, spanResChunk.Spans[0].TraceID) + }) +} + +func TestServicesTenancyGRPC(t *testing.T) { + tm := tenancy.NewTenancyManager(&tenancy.Options{ + Enabled: true, + }) + withTenantedServerAndClient(t, tm, func(server *grpcServer, client *grpcClient) { + expectedServices := []string{"trifle", "bling"} + server.spanReader.On("GetServices", mock.AnythingOfType("*context.valueCtx")).Return(expectedServices, nil).Once() + + // First try without tenancy header + _, err := client.GetServices(context.Background(), &api_v2.GetServicesRequest{}) + assertGRPCError(t, err, codes.PermissionDenied, "missing tenant header") + + // Next try with tenancy + res, err := client.GetServices(withOutgoingMetadata(t, context.Background(), tm.Header, "acme"), &api_v2.GetServicesRequest{}) + require.NoError(t, err, "expecting gRPC to succeed with any tenancy header") + assert.Equal(t, expectedServices, res.Services) + }) +} + +func TestSearchTenancyGRPCExplicitList(t *testing.T) { + tm := tenancy.NewTenancyManager(&tenancy.Options{ + Enabled: true, + Header: "non-standard-tenant-header", + Tenants: []string{"mercury", "venus", "mars"}, + }) + withTenantedServerAndClient(t, tm, func(server *grpcServer, client *grpcClient) { + server.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + Return(mockTrace, nil).Once() + + for _, tc := range []struct { + name string + tenancyHeader string + tenant string + wantErr bool + failureCode codes.Code + failureMessage string + }{ + { + name: "no header", + wantErr: true, + failureCode: codes.PermissionDenied, + failureMessage: "missing tenant header", + }, + { + name: "invalid header", + tenancyHeader: "not-the-correct-header", + tenant: "mercury", + wantErr: true, + failureCode: codes.PermissionDenied, + failureMessage: "missing tenant header", + }, + { + name: "missing tenant", + tenancyHeader: tm.Header, + tenant: "", + wantErr: true, + failureCode: codes.PermissionDenied, + failureMessage: "unknown tenant", + }, + { + name: "invalid tenant", + tenancyHeader: tm.Header, + tenant: "some-other-tenant-not-in-the-list", + wantErr: true, + failureCode: codes.PermissionDenied, + failureMessage: "unknown tenant", + }, + { + name: "valid tenant", + tenancyHeader: tm.Header, + tenant: "venus", + }, + } { + t.Run(tc.name, func(t *testing.T) { + ctx := context.Background() + if tc.tenancyHeader != "" { + ctx = withOutgoingMetadata(t, context.Background(), tc.tenancyHeader, tc.tenant) + } + res, err := client.GetTrace(ctx, &api_v2.GetTraceRequest{ + TraceID: mockTraceID, + }) + + require.NoError(t, err, "could not initiate GetTraceRequest") + + spanResChunk, err := res.Recv() + + if tc.wantErr { + assertGRPCError(t, err, tc.failureCode, tc.failureMessage) + assert.Nil(t, spanResChunk) + } else { + require.NoError(t, err, "expecting gRPC to succeed") + require.NotNil(t, spanResChunk) + require.NotNil(t, spanResChunk.Spans) + require.Equal(t, len(mockTrace.Spans), len(spanResChunk.Spans)) + assert.Equal(t, mockTraceID, spanResChunk.Spans[0].TraceID) + } + }) + } + }) +} + +func TestTenancyContextFlowGRPC(t *testing.T) { + tm := tenancy.NewTenancyManager(&tenancy.Options{ + Enabled: true, + }) + withTenantedServerAndClient(t, tm, func(server *grpcServer, client *grpcClient) { + // Mock a storage backend with tenant 'acme' and 'megacorp' + allExpectedResults := map[string]struct { + expectedServices []string + expectedTrace *model.Trace + expectedTraceErr error + }{ + "acme": {[]string{"trifle", "bling"}, mockTrace, nil}, + "megacorp": {[]string{"grapefruit"}, nil, errStorageGRPC}, + } + + addTenantedGetServices := func(mockReader *spanstoremocks.Reader, tenant string, expectedServices []string) { + mockReader.On("GetServices", mock.MatchedBy(func(v interface{}) bool { + ctx, ok := v.(context.Context) + if !ok { + return false + } + if tenancy.GetTenant(ctx) != tenant { + return false + } + return true + })).Return(expectedServices, nil).Once() + } + addTenantedGetTrace := func(mockReader *spanstoremocks.Reader, tenant string, trace *model.Trace, err error) { + mockReader.On("GetTrace", mock.MatchedBy(func(v interface{}) bool { + ctx, ok := v.(context.Context) + if !ok { + return false + } + if tenancy.GetTenant(ctx) != tenant { + return false + } + return true + }), mock.AnythingOfType("model.TraceID")).Return(trace, err).Once() + } + + for tenant, expected := range allExpectedResults { + addTenantedGetServices(server.spanReader, tenant, expected.expectedServices) + addTenantedGetTrace(server.spanReader, tenant, expected.expectedTrace, expected.expectedTraceErr) + } + + for tenant, expected := range allExpectedResults { + t.Run(tenant, func(t *testing.T) { + // Test context propagation to Unary method. + resGetServices, err := client.GetServices(withOutgoingMetadata(t, context.Background(), tm.Header, tenant), &api_v2.GetServicesRequest{}) + require.NoError(t, err, "expecting gRPC to succeed with %q tenancy header", tenant) + assert.Equal(t, expected.expectedServices, resGetServices.Services) + + // Test context propagation to Streaming method. + resGetTrace, err := client.GetTrace(withOutgoingMetadata(t, context.Background(), tm.Header, tenant), + &api_v2.GetTraceRequest{ + TraceID: mockTraceID, + }) + require.NoError(t, err) + spanResChunk, err := resGetTrace.Recv() + + if expected.expectedTrace != nil { + assert.Equal(t, expected.expectedTrace.Spans[0].TraceID, spanResChunk.Spans[0].TraceID) + } + if expected.expectedTraceErr != nil { + assert.Contains(t, err.Error(), expected.expectedTraceErr.Error()) + } + }) + } + + server.spanReader.AssertExpectations(t) + }) +} diff --git a/cmd/query/app/http_handler.go b/cmd/query/app/http_handler.go index 6b3f098a8cd..8abde1b9b74 100644 --- a/cmd/query/app/http_handler.go +++ b/cmd/query/app/http_handler.go @@ -36,6 +36,7 @@ import ( uiconv "github.com/jaegertracing/jaeger/model/converter/json" ui "github.com/jaegertracing/jaeger/model/json" "github.com/jaegertracing/jaeger/pkg/multierror" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/metrics/disabled" "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" "github.com/jaegertracing/jaeger/storage/metricsstore" @@ -84,6 +85,7 @@ type APIHandler struct { queryService *querysvc.QueryService metricsQueryService querysvc.MetricsQueryService queryParser queryParser + tenancyMgr *tenancy.TenancyManager basePath string apiPrefix string logger *zap.Logger @@ -91,13 +93,14 @@ type APIHandler struct { } // NewAPIHandler returns an APIHandler -func NewAPIHandler(queryService *querysvc.QueryService, options ...HandlerOption) *APIHandler { +func NewAPIHandler(queryService *querysvc.QueryService, tm *tenancy.TenancyManager, options ...HandlerOption) *APIHandler { aH := &APIHandler{ queryService: queryService, queryParser: queryParser{ traceQueryLookbackDuration: defaultTraceQueryLookbackDuration, timeNow: time.Now, }, + tenancyMgr: tm, } for _, option := range options { @@ -139,9 +142,14 @@ func (aH *APIHandler) handleFunc( args ...interface{}, ) *mux.Route { route = aH.route(route, args...) + var handler http.Handler + handler = http.HandlerFunc(f) + if aH.tenancyMgr.Enabled { + handler = tenancy.ExtractTenantHTTPHandler(aH.tenancyMgr, handler) + } traceMiddleware := nethttp.Middleware( aH.tracer, - http.HandlerFunc(f), + handler, nethttp.OperationNameFunc(func(r *http.Request) string { return route })) diff --git a/cmd/query/app/http_handler_test.go b/cmd/query/app/http_handler_test.go index afcff3d86c4..881b164cf4d 100644 --- a/cmd/query/app/http_handler_test.go +++ b/cmd/query/app/http_handler_test.go @@ -17,6 +17,7 @@ package app import ( "bytes" + "context" "encoding/json" "errors" "fmt" @@ -42,6 +43,7 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/model/adjuster" ui "github.com/jaegertracing/jaeger/model/json" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/plugin/metrics/disabled" "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" depsmocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" @@ -91,6 +93,7 @@ type structuredTraceResponse struct { func initializeTestServerWithHandler(queryOptions querysvc.QueryServiceOptions, options ...HandlerOption) *testServer { return initializeTestServerWithOptions( + &tenancy.TenancyManager{}, queryOptions, append( []HandlerOption{ @@ -105,15 +108,15 @@ func initializeTestServerWithHandler(queryOptions querysvc.QueryServiceOptions, ) } -func initializeTestServerWithOptions(queryOptions querysvc.QueryServiceOptions, options ...HandlerOption) *testServer { +func initializeTestServerWithOptions(tenancyMgr *tenancy.TenancyManager, queryOptions querysvc.QueryServiceOptions, options ...HandlerOption) *testServer { readStorage := &spanstoremocks.Reader{} dependencyStorage := &depsmocks.Reader{} qs := querysvc.NewQueryService(readStorage, dependencyStorage, queryOptions) r := NewRouter() - handler := NewAPIHandler(qs, options...) + handler := NewAPIHandler(qs, tenancyMgr, options...) handler.RegisterRoutes(r) return &testServer{ - server: httptest.NewServer(r), + server: httptest.NewServer(tenancy.ExtractTenantHTTPHandler(tenancyMgr, r)), spanReader: readStorage, dependencyReader: dependencyStorage, handler: handler, @@ -132,7 +135,7 @@ type testServer struct { } func withTestServer(doTest func(s *testServer), queryOptions querysvc.QueryServiceOptions, options ...HandlerOption) { - ts := initializeTestServerWithOptions(queryOptions, options...) + ts := initializeTestServerWithOptions(&tenancy.TenancyManager{}, queryOptions, options...) defer ts.server.Close() doTest(ts) } @@ -180,7 +183,7 @@ func TestLogOnServerError(t *testing.T) { apiHandlerOptions := []HandlerOption{ HandlerOptions.Logger(zap.New(l)), } - h := NewAPIHandler(qs, apiHandlerOptions...) + h := NewAPIHandler(qs, &tenancy.TenancyManager{}, apiHandlerOptions...) e := errors.New("test error") h.handleError(&testHttp.TestResponseWriter{}, e, http.StatusInternalServerError) require.Equal(t, 1, len(*l.logs)) @@ -401,7 +404,7 @@ func TestSearchByTraceIDSuccess(t *testing.T) { func TestSearchByTraceIDSuccessWithArchive(t *testing.T) { archiveReadMock := &spanstoremocks.Reader{} - ts := initializeTestServerWithOptions(querysvc.QueryServiceOptions{ + ts := initializeTestServerWithOptions(&tenancy.TenancyManager{}, querysvc.QueryServiceOptions{ ArchiveSpanReader: archiveReadMock, }) defer ts.server.Close() @@ -444,6 +447,7 @@ func TestSearchByTraceIDFailure(t *testing.T) { func TestSearchModelConversionFailure(t *testing.T) { ts := initializeTestServerWithOptions( + &tenancy.TenancyManager{}, querysvc.QueryServiceOptions{ Adjuster: adjuster.Func(func(trace *model.Trace) (*model.Trace, error) { return trace, errAdjustment @@ -812,11 +816,15 @@ func TestGetMinStep(t *testing.T) { // getJSON fetches a JSON document from a server via HTTP GET func getJSON(url string, out interface{}) error { + return getJSONCustomHeaders(url, make(map[string]string), out) +} + +func getJSONCustomHeaders(url string, additionalHeaders map[string]string, out interface{}) error { req, err := http.NewRequest("GET", url, nil) if err != nil { return err } - return execJSON(req, out) + return execJSON(req, additionalHeaders, out) } // postJSON submits a JSON document to a server via HTTP POST and parses response as JSON. @@ -830,12 +838,15 @@ func postJSON(url string, req interface{}, out interface{}) error { if err != nil { return err } - return execJSON(r, out) + return execJSON(r, make(map[string]string), out) } // execJSON executes an http request against a server and parses response as JSON -func execJSON(req *http.Request, out interface{}) error { +func execJSON(req *http.Request, additionalHeaders map[string]string, out interface{}) error { req.Header.Add("Accept", "application/json") + for k, v := range additionalHeaders { + req.Header.Add(k, v) + } resp, err := httpClient.Do(req) if err != nil { @@ -869,3 +880,99 @@ func execJSON(req *http.Request, out interface{}) error { func parsedError(code int, err string) string { return fmt.Sprintf(`%d error from server: {"data":null,"total":0,"limit":0,"offset":0,"errors":[{"code":%d,"msg":"%s"}]}`+"\n", code, code, err) } + +func TestSearchTenancyHTTP(t *testing.T) { + tenancyOptions := tenancy.Options{ + Enabled: true, + } + ts := initializeTestServerWithOptions( + tenancy.NewTenancyManager(&tenancyOptions), + querysvc.QueryServiceOptions{}) + defer ts.server.Close() + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + Return(mockTrace, nil).Twice() + + var response structuredResponse + err := getJSON(ts.server.URL+`/api/traces?traceID=1&traceID=2`, &response) + require.Error(t, err) + assert.Equal(t, "401 error from server: missing tenant header", err.Error()) + assert.Len(t, response.Errors, 0) + assert.Nil(t, response.Data) + + err = getJSONCustomHeaders( + ts.server.URL+`/api/traces?traceID=1&traceID=2`, + map[string]string{"x-tenant": "acme"}, + &response) + assert.NoError(t, err) + assert.Len(t, response.Errors, 0) + assert.Len(t, response.Data, 2) +} + +func TestSearchTenancyRejectionHTTP(t *testing.T) { + tenancyOptions := tenancy.Options{ + Enabled: true, + } + ts := initializeTestServerWithOptions( + tenancy.NewTenancyManager(&tenancyOptions), + querysvc.QueryServiceOptions{}) + defer ts.server.Close() + ts.spanReader.On("GetTrace", mock.AnythingOfType("*context.valueCtx"), mock.AnythingOfType("model.TraceID")). + Return(mockTrace, nil).Twice() + + req, err := http.NewRequest("GET", ts.server.URL+`/api/traces?traceID=1&traceID=2`, nil) + assert.NoError(t, err) + req.Header.Add("Accept", "application/json") + // We don't set tenant header + resp, err := httpClient.Do(req) + assert.NoError(t, err) + assert.Equal(t, http.StatusUnauthorized, resp.StatusCode) + + tm := tenancy.NewTenancyManager(&tenancyOptions) + req.Header.Set(tm.Header, "acme") + resp, err = http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + // Skip unmarshal of response; it is enough that it succeeded +} + +func TestSearchTenancyFlowTenantHTTP(t *testing.T) { + tenancyOptions := tenancy.Options{ + Enabled: true, + } + ts := initializeTestServerWithOptions( + tenancy.NewTenancyManager(&tenancyOptions), + querysvc.QueryServiceOptions{}) + defer ts.server.Close() + ts.spanReader.On("GetTrace", mock.MatchedBy(func(v interface{}) bool { + ctx, ok := v.(context.Context) + if !ok || tenancy.GetTenant(ctx) != "acme" { + return false + } + return true + }), mock.AnythingOfType("model.TraceID")).Return(mockTrace, nil).Twice() + ts.spanReader.On("GetTrace", mock.MatchedBy(func(v interface{}) bool { + ctx, ok := v.(context.Context) + if !ok || tenancy.GetTenant(ctx) != "megacorp" { + return false + } + return true + }), mock.AnythingOfType("model.TraceID")).Return(nil, errStorage).Once() + + var responseAcme structuredResponse + err := getJSONCustomHeaders( + ts.server.URL+`/api/traces?traceID=1&traceID=2`, + map[string]string{"x-tenant": "acme"}, + &responseAcme) + assert.NoError(t, err) + assert.Len(t, responseAcme.Errors, 0) + assert.Len(t, responseAcme.Data, 2) + + var responseMegacorp structuredResponse + err = getJSONCustomHeaders( + ts.server.URL+`/api/traces?traceID=1&traceID=2`, + map[string]string{"x-tenant": "megacorp"}, + &responseMegacorp) + assert.Contains(t, err.Error(), "storage error") + assert.Len(t, responseMegacorp.Errors, 0) + assert.Nil(t, responseMegacorp.Data) +} diff --git a/cmd/query/app/server.go b/cmd/query/app/server.go index c341af13e68..721a0491ead 100644 --- a/cmd/query/app/server.go +++ b/cmd/query/app/server.go @@ -39,6 +39,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/healthcheck" "github.com/jaegertracing/jaeger/pkg/netutils" "github.com/jaegertracing/jaeger/pkg/recoveryhandler" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/proto-gen/api_v2" "github.com/jaegertracing/jaeger/proto-gen/api_v2/metrics" "github.com/jaegertracing/jaeger/proto-gen/api_v3" @@ -64,7 +65,7 @@ type Server struct { } // NewServer creates and initializes Server -func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tracer opentracing.Tracer) (*Server, error) { +func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.TenancyManager, tracer opentracing.Tracer) (*Server, error) { _, httpPort, err := net.SplitHostPort(options.HTTPHostPort) if err != nil { return nil, err @@ -78,12 +79,12 @@ func NewServer(logger *zap.Logger, querySvc *querysvc.QueryService, metricsQuery return nil, errors.New("server with TLS enabled can not use same host ports for gRPC and HTTP. Use dedicated HTTP and gRPC host ports instead") } - grpcServer, err := createGRPCServer(querySvc, metricsQuerySvc, options, logger, tracer) + grpcServer, err := createGRPCServer(querySvc, metricsQuerySvc, options, tm, logger, tracer) if err != nil { return nil, err } - httpServer, closeGRPCGateway, err := createHTTPServer(querySvc, metricsQuerySvc, options, tracer, logger) + httpServer, closeGRPCGateway, err := createHTTPServer(querySvc, metricsQuerySvc, options, tm, tracer, logger) if err != nil { return nil, err } @@ -106,7 +107,7 @@ func (s Server) HealthCheckStatus() chan healthcheck.Status { return s.unavailableChannel } -func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, logger *zap.Logger, tracer opentracing.Tracer) (*grpc.Server, error) { +func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, options *QueryOptions, tm *tenancy.TenancyManager, logger *zap.Logger, tracer opentracing.Tracer) (*grpc.Server, error) { var grpcOpts []grpc.ServerOption if options.TLSGRPC.Enabled { @@ -119,6 +120,12 @@ func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc. grpcOpts = append(grpcOpts, grpc.Creds(creds)) } + if tm.Enabled { + grpcOpts = append(grpcOpts, + grpc.StreamInterceptor(tenancy.NewGuardingStreamInterceptor(tm)), + grpc.UnaryInterceptor(tenancy.NewGuardingUnaryInterceptor(tm)), + ) + } server := grpc.NewServer(grpcOpts...) reflection.Register(server) @@ -144,7 +151,7 @@ func createGRPCServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc. return server, nil } -func createHTTPServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, queryOpts *QueryOptions, tracer opentracing.Tracer, logger *zap.Logger) (*http.Server, context.CancelFunc, error) { +func createHTTPServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc.MetricsQueryService, queryOpts *QueryOptions, tm *tenancy.TenancyManager, tracer opentracing.Tracer, logger *zap.Logger) (*http.Server, context.CancelFunc, error) { apiHandlerOptions := []HandlerOption{ HandlerOptions.Logger(logger), HandlerOptions.Tracer(tracer), @@ -153,6 +160,7 @@ func createHTTPServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc. apiHandler := NewAPIHandler( querySvc, + tm, apiHandlerOptions...) r := NewRouter() if queryOpts.BasePath != "/" { @@ -160,7 +168,7 @@ func createHTTPServer(querySvc *querysvc.QueryService, metricsQuerySvc querysvc. } ctx, closeGRPCGateway := context.WithCancel(context.Background()) - if err := apiv3.RegisterGRPCGateway(ctx, logger, r, queryOpts.BasePath, queryOpts.GRPCHostPort, queryOpts.TLSGRPC); err != nil { + if err := apiv3.RegisterGRPCGateway(ctx, logger, r, queryOpts.BasePath, queryOpts.GRPCHostPort, queryOpts.TLSGRPC, tm); err != nil { closeGRPCGateway() return nil, nil, err } diff --git a/cmd/query/app/server_test.go b/cmd/query/app/server_test.go index d3d798cd2bc..f72373d3434 100644 --- a/cmd/query/app/server_test.go +++ b/cmd/query/app/server_test.go @@ -40,6 +40,7 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/config/tlscfg" "github.com/jaegertracing/jaeger/pkg/healthcheck" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/ports" "github.com/jaegertracing/jaeger/proto-gen/api_v2" depsmocks "github.com/jaegertracing/jaeger/storage/dependencystore/mocks" @@ -67,7 +68,8 @@ func TestCreateTLSServerSinglePortError(t *testing.T) { } _, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, nil, - &QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8080", TLSGRPC: tlsCfg, TLSHTTP: tlsCfg}, opentracing.NoopTracer{}) + &QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8080", TLSGRPC: tlsCfg, TLSHTTP: tlsCfg}, + tenancy.NewTenancyManager(&tenancy.Options{}), opentracing.NoopTracer{}) assert.NotNil(t, err) } @@ -80,7 +82,8 @@ func TestCreateTLSGrpcServerError(t *testing.T) { } _, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, nil, - &QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSGRPC: tlsCfg}, opentracing.NoopTracer{}) + &QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSGRPC: tlsCfg}, + tenancy.NewTenancyManager(&tenancy.Options{}), opentracing.NoopTracer{}) assert.NotNil(t, err) } @@ -93,7 +96,8 @@ func TestCreateTLSHttpServerError(t *testing.T) { } _, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, nil, - &QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSHTTP: tlsCfg}, opentracing.NoopTracer{}) + &QueryOptions{HTTPHostPort: ":8080", GRPCHostPort: ":8081", TLSHTTP: tlsCfg}, + tenancy.NewTenancyManager(&tenancy.Options{}), opentracing.NoopTracer{}) assert.NotNil(t, err) } @@ -335,7 +339,7 @@ func TestServerHTTPTLS(t *testing.T) { querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{}) server, err := NewServer(flagsSvc.Logger, querySvc, nil, - serverOptions, + serverOptions, tenancy.NewTenancyManager(&tenancy.Options{}), opentracing.NoopTracer{}) assert.Nil(t, err) assert.NoError(t, server.Start()) @@ -495,7 +499,7 @@ func TestServerGRPCTLS(t *testing.T) { querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{}) server, err := NewServer(flagsSvc.Logger, querySvc, nil, - serverOptions, + serverOptions, tenancy.NewTenancyManager(&tenancy.Options{}), opentracing.NoopTracer{}) assert.Nil(t, err) assert.NoError(t, server.Start()) @@ -550,11 +554,13 @@ func TestServerGRPCTLS(t *testing.T) { func TestServerBadHostPort(t *testing.T) { _, err := NewServer(zap.NewNop(), &querysvc.QueryService{}, nil, &QueryOptions{HTTPHostPort: "8080", GRPCHostPort: "127.0.0.1:8081", BearerTokenPropagation: true}, + tenancy.NewTenancyManager(&tenancy.Options{}), opentracing.NoopTracer{}) assert.NotNil(t, err) _, err = NewServer(zap.NewNop(), &querysvc.QueryService{}, nil, &QueryOptions{HTTPHostPort: "127.0.0.1:8081", GRPCHostPort: "9123", BearerTokenPropagation: true}, + tenancy.NewTenancyManager(&tenancy.Options{}), opentracing.NoopTracer{}) assert.NotNil(t, err) @@ -585,6 +591,7 @@ func TestServerInUseHostPort(t *testing.T) { GRPCHostPort: tc.grpcHostPort, BearerTokenPropagation: true, }, + tenancy.NewTenancyManager(&tenancy.Options{}), opentracing.NoopTracer{}, ) assert.NoError(t, err) @@ -614,6 +621,7 @@ func TestServerSinglePort(t *testing.T) { querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{}) server, err := NewServer(flagsSvc.Logger, querySvc, nil, &QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort, BearerTokenPropagation: true}, + tenancy.NewTenancyManager(&tenancy.Options{}), opentracing.NoopTracer{}) assert.Nil(t, err) assert.NoError(t, server.Start()) @@ -662,7 +670,8 @@ func TestServerGracefulExit(t *testing.T) { querySvc := &querysvc.QueryService{} tracer := opentracing.NoopTracer{} - server, err := NewServer(flagsSvc.Logger, querySvc, nil, &QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort}, tracer) + server, err := NewServer(flagsSvc.Logger, querySvc, nil, &QueryOptions{GRPCHostPort: hostPort, HTTPHostPort: hostPort}, + tenancy.NewTenancyManager(&tenancy.Options{}), tracer) assert.Nil(t, err) assert.NoError(t, server.Start()) go func() { @@ -691,6 +700,7 @@ func TestServerHandlesPortZero(t *testing.T) { tracer := opentracing.NoopTracer{} server, err := NewServer(flagsSvc.Logger, querySvc, nil, &QueryOptions{GRPCHostPort: ":0", HTTPHostPort: ":0"}, + tenancy.NewTenancyManager(&tenancy.Options{}), tracer) assert.Nil(t, err) assert.NoError(t, server.Start()) @@ -714,3 +724,76 @@ func TestServerHandlesPortZero(t *testing.T) { }, }.Execute(t) } + +func TestServerHTTPTenancy(t *testing.T) { + testCases := []struct { + name string + tenant string + errMsg string + status int + }{ + { + name: "no tenant", + // no value for tenant header + status: 401, + }, + { + name: "tenant", + tenant: "acme", + status: 200, + }, + } + + serverOptions := &QueryOptions{ + HTTPHostPort: ":8080", + GRPCHostPort: ":8080", + Tenancy: tenancy.Options{ + Enabled: true, + }, + } + tenancyMgr := tenancy.NewTenancyManager(&serverOptions.Tenancy) + + spanReader := &spanstoremocks.Reader{} + dependencyReader := &depsmocks.Reader{} + + querySvc := querysvc.NewQueryService(spanReader, dependencyReader, querysvc.QueryServiceOptions{}) + server, err := NewServer(zap.NewNop(), querySvc, nil, + serverOptions, tenancyMgr, + opentracing.NoopTracer{}) + require.Nil(t, err) + require.NoError(t, server.Start()) + + for _, test := range testCases { + t.Run(test.name, func(t *testing.T) { + conn, clientError := net.DialTimeout("tcp", "localhost:8080", 2*time.Second) + require.NoError(t, clientError) + + queryString := "/api/traces?service=service&start=0&end=0&operation=operation&limit=200&minDuration=20ms" + req, err := http.NewRequest("GET", "http://localhost:8080"+queryString, nil) + if test.tenant != "" { + req.Header.Add(tenancyMgr.Header, test.tenant) + } + assert.Nil(t, err) + req.Header.Add("Accept", "application/json") + + client := &http.Client{} + resp, err2 := client.Do(req) + if test.errMsg == "" { + require.NoError(t, err2) + } else { + assert.Error(t, err2) + if err != nil { + assert.Equal(t, test.errMsg, err2.Error()) + } + } + assert.Equal(t, test.status, resp.StatusCode) + if err2 == nil { + resp.Body.Close() + } + if conn != nil { + require.Nil(t, conn.Close()) + } + }) + } + server.Close() +} diff --git a/cmd/query/main.go b/cmd/query/main.go index a7cbe340e93..bd9b7e49530 100644 --- a/cmd/query/main.go +++ b/cmd/query/main.go @@ -38,6 +38,7 @@ import ( "github.com/jaegertracing/jaeger/pkg/bearertoken" "github.com/jaegertracing/jaeger/pkg/config" "github.com/jaegertracing/jaeger/pkg/metrics" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/pkg/version" metricsPlugin "github.com/jaegertracing/jaeger/plugin/metrics" "github.com/jaegertracing/jaeger/plugin/storage" @@ -124,7 +125,8 @@ func main() { spanReader, dependencyReader, *queryServiceOptions) - server, err := app.NewServer(svc.Logger, queryService, metricsQueryService, queryOpts, tracer) + tm := tenancy.NewTenancyManager(&queryOpts.Tenancy) + server, err := app.NewServer(svc.Logger, queryService, metricsQueryService, queryOpts, tm, tracer) if err != nil { logger.Fatal("Failed to create server", zap.Error(err)) } diff --git a/pkg/config/tenancy/tenancy.go b/pkg/tenancy/config.go similarity index 77% rename from pkg/config/tenancy/tenancy.go rename to pkg/tenancy/config.go index 58909b7af16..5277ead9b93 100644 --- a/pkg/config/tenancy/tenancy.go +++ b/pkg/tenancy/config.go @@ -14,8 +14,8 @@ package tenancy -// TenancyConfig holds the settings for multi-tenant Jaeger -type TenancyConfig struct { +// TenancyManager can check tenant usage for multi-tenant Jaeger configurations +type TenancyManager struct { Enabled bool Header string guard guard @@ -33,16 +33,21 @@ type Options struct { Tenants []string } -// NewTenancyConfig creates a tenancy configuration for tenancy Options -func NewTenancyConfig(options *Options) *TenancyConfig { - return &TenancyConfig{ +// NewTenancyManager creates a TenancyManager from tenancy Options +func NewTenancyManager(options *Options) *TenancyManager { + // Default header value (although set by CLI flags, this helps tests and API users) + header := options.Header + if header == "" && options.Enabled { + header = "x-tenant" + } + return &TenancyManager{ Enabled: options.Enabled, - Header: options.Header, + Header: header, guard: tenancyGuardFactory(options), } } -func (tc *TenancyConfig) Valid(tenant string) bool { +func (tc *TenancyManager) Valid(tenant string) bool { return tc.guard.Valid(tenant) } diff --git a/pkg/config/tenancy/tenancy_test.go b/pkg/tenancy/config_test.go similarity index 97% rename from pkg/config/tenancy/tenancy_test.go rename to pkg/tenancy/config_test.go index b84bd25d405..a38067dc39e 100644 --- a/pkg/config/tenancy/tenancy_test.go +++ b/pkg/tenancy/config_test.go @@ -84,7 +84,7 @@ func TestTenancyValidity(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { - tc := NewTenancyConfig(&test.options) + tc := NewTenancyManager(&test.options) assert.Equal(t, test.valid, tc.Valid(test.tenant)) }) } diff --git a/storage/tenant.go b/pkg/tenancy/context.go similarity index 98% rename from storage/tenant.go rename to pkg/tenancy/context.go index c00dfdd257d..9e0021e3659 100644 --- a/storage/tenant.go +++ b/pkg/tenancy/context.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package storage +package tenancy import "context" diff --git a/storage/tenant_test.go b/pkg/tenancy/context_test.go similarity index 99% rename from storage/tenant_test.go rename to pkg/tenancy/context_test.go index 563a511c6e3..d2d29b7ded1 100644 --- a/storage/tenant_test.go +++ b/pkg/tenancy/context_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package storage +package tenancy import ( "context" diff --git a/pkg/config/tenancy/flags.go b/pkg/tenancy/flags.go similarity index 100% rename from pkg/config/tenancy/flags.go rename to pkg/tenancy/flags.go diff --git a/pkg/config/tenancy/flags_test.go b/pkg/tenancy/flags_test.go similarity index 100% rename from pkg/config/tenancy/flags_test.go rename to pkg/tenancy/flags_test.go diff --git a/pkg/tenancy/grpc.go b/pkg/tenancy/grpc.go new file mode 100644 index 00000000000..0d269f59f78 --- /dev/null +++ b/pkg/tenancy/grpc.go @@ -0,0 +1,115 @@ +// Copyright (c) 2022 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tenancy + +import ( + "context" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" +) + +// tenantedServerStream is a wrapper for ServerStream providing settable context +type tenantedServerStream struct { + grpc.ServerStream + context context.Context +} + +func (tss *tenantedServerStream) Context() context.Context { + return tss.context +} + +func getValidTenant(ctx context.Context, tc *TenancyManager) (string, error) { + // Handle case where tenant is already directly in the context + tenant := GetTenant(ctx) + if tenant != "" { + if !tc.Valid(tenant) { + return tenant, status.Errorf(codes.PermissionDenied, "unknown tenant") + } + return tenant, nil + } + + // Handle case where tenant is in the context metadata + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + return "", status.Errorf(codes.PermissionDenied, "missing tenant header") + } + + var err error + tenant, err = tenantFromMetadata(md, tc.Header) + if err != nil { + return "", err + } + if !tc.Valid(tenant) { + return tenant, status.Errorf(codes.PermissionDenied, "unknown tenant") + } + + return tenant, nil +} + +func directlyAttachedTenant(ctx context.Context) bool { + return GetTenant(ctx) != "" +} + +// NewGuardingStreamInterceptor blocks handling of streams whose tenancy header doesn't meet tenancy requirements. +// It also ensures the tenant is directly in the context, rather than context metadata. +func NewGuardingStreamInterceptor(tc *TenancyManager) grpc.StreamServerInterceptor { + return func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + tenant, err := getValidTenant(ss.Context(), tc) + if err != nil { + return err + } + + if directlyAttachedTenant(ss.Context()) { + return handler(srv, ss) + } + + // "upgrade" the tenant to be part of the context, rather than just incoming metadata + return handler(srv, &tenantedServerStream{ + ServerStream: ss, + context: WithTenant(ss.Context(), tenant), + }) + } +} + +func tenantFromMetadata(md metadata.MD, tenancyHeader string) (string, error) { + tenants := md.Get(tenancyHeader) + if len(tenants) < 1 { + return "", status.Errorf(codes.PermissionDenied, "missing tenant header") + } else if len(tenants) > 1 { + return "", status.Errorf(codes.PermissionDenied, "extra tenant header") + } + + return tenants[0], nil +} + +// NewGuardingUnaryInterceptor blocks handling of RPCs whose tenancy header doesn't meet tenancy requirements. +// It also ensures the tenant is directly in the context, rather than context metadata. +func NewGuardingUnaryInterceptor(tc *TenancyManager) grpc.UnaryServerInterceptor { + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + tenant, err := getValidTenant(ctx, tc) + if err != nil { + return nil, err + } + + if directlyAttachedTenant(ctx) { + return handler(ctx, req) + } + + return handler(WithTenant(ctx, tenant), req) + } +} diff --git a/pkg/tenancy/grpc_test.go b/pkg/tenancy/grpc_test.go new file mode 100644 index 00000000000..1bdc898f945 --- /dev/null +++ b/pkg/tenancy/grpc_test.go @@ -0,0 +1,113 @@ +// Copyright (c) 2022 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tenancy + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +func TestTenancyInterceptors(t *testing.T) { + tests := []struct { + name string + tenancyMgr *TenancyManager + ctx context.Context + errMsg string + }{ + { + name: "missing tenant context", + tenancyMgr: NewTenancyManager(&Options{Enabled: true}), + ctx: context.Background(), + errMsg: "rpc error: code = PermissionDenied desc = missing tenant header", + }, + { + name: "invalid tenant context", + tenancyMgr: NewTenancyManager(&Options{Enabled: true, Tenants: []string{"megacorp"}}), + ctx: WithTenant(context.Background(), "acme"), + errMsg: "rpc error: code = PermissionDenied desc = unknown tenant", + }, + { + name: "valid tenant context", + tenancyMgr: NewTenancyManager(&Options{Enabled: true, Tenants: []string{"acme"}}), + ctx: WithTenant(context.Background(), "acme"), + errMsg: "", + }, + { + name: "invalid tenant header", + tenancyMgr: NewTenancyManager(&Options{Enabled: true, Tenants: []string{"megacorp"}}), + ctx: metadata.NewIncomingContext(context.Background(), map[string][]string{"x-tenant": {"acme"}}), + errMsg: "rpc error: code = PermissionDenied desc = unknown tenant", + }, + { + name: "missing tenant header", + tenancyMgr: NewTenancyManager(&Options{Enabled: true, Tenants: []string{"megacorp"}}), + ctx: metadata.NewIncomingContext(context.Background(), map[string][]string{}), + errMsg: "rpc error: code = PermissionDenied desc = missing tenant header", + }, + { + name: "valid tenant header", + tenancyMgr: NewTenancyManager(&Options{Enabled: true, Tenants: []string{"acme"}}), + ctx: metadata.NewIncomingContext(context.Background(), map[string][]string{"x-tenant": {"acme"}}), + errMsg: "", + }, + { + name: "extra tenant header", + tenancyMgr: NewTenancyManager(&Options{Enabled: true, Tenants: []string{"acme"}}), + ctx: metadata.NewIncomingContext(context.Background(), map[string][]string{"x-tenant": {"acme", "megacorp"}}), + errMsg: "rpc error: code = PermissionDenied desc = extra tenant header", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + interceptor := NewGuardingStreamInterceptor(test.tenancyMgr) + ss := tenantedServerStream{ + context: test.ctx, + } + ssi := grpc.StreamServerInfo{} + handler := func(interface{}, grpc.ServerStream) error { + // do nothing + return nil + } + err := interceptor(0, &ss, &ssi, handler) + if test.errMsg == "" { + assert.NoError(t, err) + } else { + require.Error(t, err) + assert.Equal(t, test.errMsg, err.Error()) + } + + uinterceptor := NewGuardingUnaryInterceptor(test.tenancyMgr) + usi := &grpc.UnaryServerInfo{} + iface := 0 + uhandler := func(ctx context.Context, req interface{}) (interface{}, error) { + // do nothing + return req, nil + } + _, err = uinterceptor(test.ctx, iface, usi, uhandler) + if test.errMsg == "" { + assert.NoError(t, err) + } else { + require.Error(t, err) + assert.Equal(t, test.errMsg, err.Error()) + } + }) + } +} diff --git a/pkg/tenancy/http.go b/pkg/tenancy/http.go new file mode 100644 index 00000000000..156fb45d400 --- /dev/null +++ b/pkg/tenancy/http.go @@ -0,0 +1,65 @@ +// Copyright (c) 2022 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tenancy + +import ( + "context" + "net/http" + + "google.golang.org/grpc/metadata" +) + +// PropagationHandler returns a http.Handler containing the logic to extract +// the tenancy header of the http.Request and insert the tenant into request.Context +// for propagation. The token can be accessed via tenancy.GetTenant(). +func ExtractTenantHTTPHandler(tc *TenancyManager, h http.Handler) http.Handler { + if !tc.Enabled { + return h + } + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + tenant := r.Header.Get(tc.Header) + if tenant == "" { + w.WriteHeader(http.StatusUnauthorized) + w.Write([]byte("missing tenant header")) + return + } + + if !tc.Valid(tenant) { + w.WriteHeader(http.StatusUnauthorized) + w.Write([]byte("unknown tenant")) + return + } + + ctx := WithTenant(r.Context(), tenant) + h.ServeHTTP(w, r.WithContext(ctx)) + }) +} + +// MetadataAnnotator returns a function suitable for propagating tenancy +// via github.com/grpc-ecosystem/grpc-gateway/runtime.NewServeMux +func (tc *TenancyManager) MetadataAnnotator() func(context.Context, *http.Request) metadata.MD { + return func(ctx context.Context, req *http.Request) metadata.MD { + tenant := req.Header.Get(tc.Header) + if tenant == "" { + // The HTTP request lacked the tenancy header. Pass along + // empty metadata -- the gRPC query service will reject later. + return metadata.Pairs() + } + return metadata.New(map[string]string{ + tc.Header: tenant, + }) + } +} diff --git a/pkg/tenancy/http_test.go b/pkg/tenancy/http_test.go new file mode 100644 index 00000000000..db0ba9191ca --- /dev/null +++ b/pkg/tenancy/http_test.go @@ -0,0 +1,119 @@ +// Copyright (c) 2022 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tenancy + +import ( + "context" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +type testHttpHandler struct { + reached bool +} + +func (thh *testHttpHandler) ServeHTTP(res http.ResponseWriter, req *http.Request) { + thh.reached = true +} + +func TestProgationHandler(t *testing.T) { + tests := []struct { + name string + tenancyMgr *TenancyManager + shouldReach bool + requestHeaders map[string][]string + }{ + { + name: "untenanted", + tenancyMgr: NewTenancyManager(&Options{}), + requestHeaders: map[string][]string{}, + shouldReach: true, + }, + { + name: "missing tenant header", + tenancyMgr: NewTenancyManager(&Options{Enabled: true}), + requestHeaders: map[string][]string{}, + shouldReach: false, + }, + { + name: "valid tenant header", + tenancyMgr: NewTenancyManager(&Options{Enabled: true}), + requestHeaders: map[string][]string{"x-tenant": {"acme"}}, + shouldReach: true, + }, + { + name: "unauthorized tenant", + tenancyMgr: NewTenancyManager(&Options{Enabled: true, Tenants: []string{"megacorp"}}), + requestHeaders: map[string][]string{"x-tenant": {"acme"}}, + shouldReach: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + handler := &testHttpHandler{} + propH := ExtractTenantHTTPHandler(test.tenancyMgr, handler) + req, err := http.NewRequest("GET", "/", strings.NewReader("")) + for k, vs := range test.requestHeaders { + for _, v := range vs { + req.Header.Add(k, v) + } + } + require.NoError(t, err) + writer := httptest.NewRecorder() + propH.ServeHTTP(writer, req) + assert.Equal(t, test.shouldReach, handler.reached) + }) + } +} + +func TestMetadataAnnotator(t *testing.T) { + tests := []struct { + name string + tenancyMgr *TenancyManager + requestHeaders map[string][]string + }{ + { + name: "missing tenant", + tenancyMgr: NewTenancyManager(&Options{Enabled: true}), + requestHeaders: map[string][]string{}, + }, + { + name: "tenanted", + tenancyMgr: NewTenancyManager(&Options{Enabled: true}), + requestHeaders: map[string][]string{"x-tenant": {"acme"}}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + req, err := http.NewRequest("GET", "/", strings.NewReader("")) + for k, vs := range test.requestHeaders { + for _, v := range vs { + req.Header.Add(k, v) + } + } + require.NoError(t, err) + annotator := test.tenancyMgr.MetadataAnnotator() + md := annotator(context.Background(), req) + assert.Equal(t, len(test.requestHeaders), len(md)) + }) + } +} diff --git a/storage/empty_test.go b/storage/empty_test.go new file mode 100644 index 00000000000..241676d44eb --- /dev/null +++ b/storage/empty_test.go @@ -0,0 +1,15 @@ +// Copyright (c) 2018 The Jaeger Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package storage From 26a439840fcaacd490393fb920e7b5f2606ace33 Mon Sep 17 00:00:00 2001 From: Ed Snible Date: Sat, 23 Jul 2022 15:30:59 -0400 Subject: [PATCH 09/11] Tenancy for memory storage (#3827) Signed-off-by: Loc Mai --- plugin/storage/memory/factory.go | 2 +- plugin/storage/memory/memory.go | 85 ++++++++++++++------ plugin/storage/memory/memory_test.go | 114 ++++++++++++++++++++------- 3 files changed, 151 insertions(+), 50 deletions(-) diff --git a/plugin/storage/memory/factory.go b/plugin/storage/memory/factory.go index aa0282cbd86..0663f962db1 100644 --- a/plugin/storage/memory/factory.go +++ b/plugin/storage/memory/factory.go @@ -63,7 +63,7 @@ func (f *Factory) InitFromOptions(opts Options) { func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger) error { f.metricsFactory, f.logger = metricsFactory, logger f.store = WithConfiguration(f.options.Configuration) - logger.Info("Memory storage initialized", zap.Any("configuration", f.store.config)) + logger.Info("Memory storage initialized", zap.Any("configuration", f.store.defaultConfig)) f.publishOpts() return nil diff --git a/plugin/storage/memory/memory.go b/plugin/storage/memory/memory.go index b3a1a0454ca..a11c55e5be7 100644 --- a/plugin/storage/memory/memory.go +++ b/plugin/storage/memory/memory.go @@ -27,11 +27,21 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/model/adjuster" "github.com/jaegertracing/jaeger/pkg/memory/config" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/storage/spanstore" ) // Store is an in-memory store of traces type Store struct { + sync.RWMutex + // Each tenant gets a copy of default config. + // In the future this can be extended to contain per-tenant configuration. + defaultConfig config.Configuration + perTenant map[string]*Tenant +} + +// Tenant is an in-memory store of traces for a single tenant +type Tenant struct { sync.RWMutex ids []*model.TraceID traces map[model.TraceID]*model.Trace @@ -50,17 +60,42 @@ func NewStore() *Store { // WithConfiguration creates a new in memory storage based on the given configuration func WithConfiguration(configuration config.Configuration) *Store { return &Store{ - ids: make([]*model.TraceID, configuration.MaxTraces), + defaultConfig: configuration, + perTenant: make(map[string]*Tenant), + } +} + +func newTenant(cfg config.Configuration) *Tenant { + return &Tenant{ + ids: make([]*model.TraceID, cfg.MaxTraces), traces: map[model.TraceID]*model.Trace{}, services: map[string]struct{}{}, operations: map[string]map[spanstore.Operation]struct{}{}, deduper: adjuster.SpanIDDeduper(), - config: configuration, + config: cfg, + } +} + +// getTenant returns the per-tenant storage. Note that tenantID has already been checked for by the collector or query +func (st *Store) getTenant(tenantID string) *Tenant { + st.RLock() + tenant, ok := st.perTenant[tenantID] + st.RUnlock() + if !ok { + st.Lock() + defer st.Unlock() + tenant, ok = st.perTenant[tenantID] + if !ok { + tenant = newTenant(st.defaultConfig) + st.perTenant[tenantID] = tenant + } } + return tenant } // GetDependencies returns dependencies between services -func (m *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { +func (st *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) { + m := st.getTenant(tenancy.GetTenant(ctx)) // deduper used below can modify the spans, so we take an exclusive lock m.Lock() defer m.Unlock() @@ -69,9 +104,9 @@ func (m *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback t for _, orig := range m.traces { // SpanIDDeduper never returns an err trace, _ := m.deduper.Adjust(orig) - if m.traceIsBetweenStartAndEnd(startTs, endTs, trace) { + if traceIsBetweenStartAndEnd(startTs, endTs, trace) { for _, s := range trace.Spans { - parentSpan := m.findSpan(trace, s.ParentSpanID()) + parentSpan := findSpan(trace, s.ParentSpanID()) if parentSpan != nil { if parentSpan.Process.ServiceName == s.Process.ServiceName { continue @@ -97,7 +132,7 @@ func (m *Store) GetDependencies(ctx context.Context, endTs time.Time, lookback t return retMe, nil } -func (m *Store) findSpan(trace *model.Trace, spanID model.SpanID) *model.Span { +func findSpan(trace *model.Trace, spanID model.SpanID) *model.Span { for _, s := range trace.Spans { if s.SpanID == spanID { return s @@ -106,7 +141,7 @@ func (m *Store) findSpan(trace *model.Trace, spanID model.SpanID) *model.Span { return nil } -func (m *Store) traceIsBetweenStartAndEnd(startTs, endTs time.Time, trace *model.Trace) bool { +func traceIsBetweenStartAndEnd(startTs, endTs time.Time, trace *model.Trace) bool { for _, s := range trace.Spans { if s.StartTime.After(startTs) && endTs.After(s.StartTime) { return true @@ -116,7 +151,8 @@ func (m *Store) traceIsBetweenStartAndEnd(startTs, endTs time.Time, trace *model } // WriteSpan writes the given span -func (m *Store) WriteSpan(ctx context.Context, span *model.Span) error { +func (st *Store) WriteSpan(ctx context.Context, span *model.Span) error { + m := st.getTenant(tenancy.GetTenant(ctx)) m.Lock() defer m.Unlock() if _, ok := m.operations[span.Process.ServiceName]; !ok { @@ -159,18 +195,19 @@ func (m *Store) WriteSpan(ctx context.Context, span *model.Span) error { } // GetTrace gets a trace -func (m *Store) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { +func (st *Store) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) { + m := st.getTenant(tenancy.GetTenant(ctx)) m.RLock() defer m.RUnlock() trace, ok := m.traces[traceID] if !ok { return nil, spanstore.ErrTraceNotFound } - return m.copyTrace(trace) + return copyTrace(trace) } // Spans may still be added to traces after they are returned to user code, so make copies. -func (m *Store) copyTrace(trace *model.Trace) (*model.Trace, error) { +func copyTrace(trace *model.Trace) (*model.Trace, error) { bytes, err := proto.Marshal(trace) if err != nil { return nil, err @@ -182,7 +219,8 @@ func (m *Store) copyTrace(trace *model.Trace) (*model.Trace, error) { } // GetServices returns a list of all known services -func (m *Store) GetServices(ctx context.Context) ([]string, error) { +func (st *Store) GetServices(ctx context.Context) ([]string, error) { + m := st.getTenant(tenancy.GetTenant(ctx)) m.RLock() defer m.RUnlock() var retMe []string @@ -193,10 +231,11 @@ func (m *Store) GetServices(ctx context.Context) ([]string, error) { } // GetOperations returns the operations of a given service -func (m *Store) GetOperations( +func (st *Store) GetOperations( ctx context.Context, query spanstore.OperationQueryParameters, ) ([]spanstore.Operation, error) { + m := st.getTenant(tenancy.GetTenant(ctx)) m.RLock() defer m.RUnlock() var retMe []spanstore.Operation @@ -211,13 +250,14 @@ func (m *Store) GetOperations( } // FindTraces returns all traces in the query parameters are satisfied by a trace's span -func (m *Store) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) { +func (st *Store) FindTraces(ctx context.Context, query *spanstore.TraceQueryParameters) ([]*model.Trace, error) { + m := st.getTenant(tenancy.GetTenant(ctx)) m.RLock() defer m.RUnlock() var retMe []*model.Trace for _, trace := range m.traces { - if m.validTrace(trace, query) { - copied, err := m.copyTrace(trace) + if validTrace(trace, query) { + copied, err := copyTrace(trace) if err != nil { return nil, err } @@ -243,9 +283,9 @@ func (m *Store) FindTraceIDs(ctx context.Context, query *spanstore.TraceQueryPar return nil, errors.New("not implemented") } -func (m *Store) validTrace(trace *model.Trace, query *spanstore.TraceQueryParameters) bool { +func validTrace(trace *model.Trace, query *spanstore.TraceQueryParameters) bool { for _, span := range trace.Spans { - if m.validSpan(span, query) { + if validSpan(span, query) { return true } } @@ -261,7 +301,7 @@ func findKeyValueMatch(kvs model.KeyValues, key, value string) (model.KeyValue, return model.KeyValue{}, false } -func (m *Store) validSpan(span *model.Span, query *spanstore.TraceQueryParameters) bool { +func validSpan(span *model.Span, query *spanstore.TraceQueryParameters) bool { if query.ServiceName != span.Process.ServiceName { return false } @@ -280,7 +320,7 @@ func (m *Store) validSpan(span *model.Span, query *spanstore.TraceQueryParameter if !query.StartTimeMax.IsZero() && span.StartTime.After(query.StartTimeMax) { return false } - spanKVs := m.flattenTags(span) + spanKVs := flattenTags(span) for queryK, queryV := range query.Tags { // (NB): we cannot use the KeyValues.FindKey function because there can be multiple tags with the same key if _, ok := findKeyValueMatch(spanKVs, queryK, queryV); !ok { @@ -290,8 +330,9 @@ func (m *Store) validSpan(span *model.Span, query *spanstore.TraceQueryParameter return true } -func (m *Store) flattenTags(span *model.Span) model.KeyValues { - retMe := span.Tags +func flattenTags(span *model.Span) model.KeyValues { + retMe := []model.KeyValue{} + retMe = append(retMe, span.Tags...) retMe = append(retMe, span.Process.Tags...) for _, l := range span.Logs { retMe = append(retMe, l.Fields...) diff --git a/plugin/storage/memory/memory_test.go b/plugin/storage/memory/memory_test.go index ff6a75e2dcb..21d92ec26f1 100644 --- a/plugin/storage/memory/memory_test.go +++ b/plugin/storage/memory/memory_test.go @@ -24,34 +24,19 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/jaegertracing/jaeger/pkg/memory/config" + "github.com/jaegertracing/jaeger/pkg/tenancy" "github.com/jaegertracing/jaeger/storage/spanstore" ) -var traceID = model.NewTraceID(1, 2) +var ( + traceID = model.NewTraceID(1, 2) + testingSpan = makeTestingSpan(traceID, "") +) -var testingSpan = &model.Span{ - TraceID: traceID, - SpanID: model.NewSpanID(1), - Process: &model.Process{ - ServiceName: "serviceName", - Tags: []model.KeyValue(nil), - }, - OperationName: "operationName", - Tags: model.KeyValues{ - model.String("tagKey", "tagValue"), - model.String("span.kind", "client"), - }, - Logs: []model.Log{ - { - Timestamp: time.Now().UTC(), - Fields: []model.KeyValue{ - model.String("logKey", "logValue"), - }, - }, - }, - Duration: time.Second * 5, - StartTime: time.Unix(300, 0).UTC(), -} +var ( + traceID2 = model.NewTraceID(2, 3) + testingSpan2 = makeTestingSpan(traceID2, "2") +) var childSpan1 = &model.Span{ TraceID: traceID, @@ -147,6 +132,7 @@ func withMemoryStore(f func(store *Store)) { } func TestStoreGetEmptyDependencies(t *testing.T) { + // assert.Equal(t, testingSpan, testingSpan1B) // @@@ withMemoryStore(func(store *Store) { links, err := store.GetDependencies(context.Background(), time.Now(), time.Hour) assert.NoError(t, err) @@ -206,8 +192,8 @@ func TestStoreWithLimit(t *testing.T) { assert.NoError(t, err) } - assert.Equal(t, maxTraces, len(store.traces)) - assert.Equal(t, maxTraces, len(store.ids)) + assert.Equal(t, maxTraces, len(store.getTenant("").traces)) + assert.Equal(t, maxTraces, len(store.getTenant("").ids)) } func TestStoreGetTraceSuccess(t *testing.T) { @@ -239,7 +225,7 @@ func TestStoreGetAndMutateTrace(t *testing.T) { func TestStoreGetTraceError(t *testing.T) { withPopulatedMemoryStore(func(store *Store) { - store.traces[testingSpan.TraceID] = &model.Trace{ + store.getTenant("").traces[testingSpan.TraceID] = &model.Trace{ Spans: []*model.Span{nonSerializableSpan}, } _, err := store.GetTrace(context.Background(), testingSpan.TraceID) @@ -463,3 +449,77 @@ func TestStore_FindTraceIDs(t *testing.T) { assert.EqualError(t, err, "not implemented") }) } + +func TestTenantStore(t *testing.T) { + withMemoryStore(func(store *Store) { + ctxAcme := tenancy.WithTenant(context.Background(), "acme") + ctxWonka := tenancy.WithTenant(context.Background(), "wonka") + + assert.NoError(t, store.WriteSpan(ctxAcme, testingSpan)) + assert.NoError(t, store.WriteSpan(ctxWonka, testingSpan2)) + + // Can we retrieve the spans with correct tenancy + trace1, err := store.GetTrace(ctxAcme, testingSpan.TraceID) + assert.NoError(t, err) + assert.Len(t, trace1.Spans, 1) + assert.Equal(t, testingSpan, trace1.Spans[0]) + + trace2, err := store.GetTrace(ctxWonka, testingSpan2.TraceID) + assert.NoError(t, err) + assert.Len(t, trace2.Spans, 1) + assert.Equal(t, testingSpan2, trace2.Spans[0]) + + // Can we query the spans with correct tenancy + traces1, err := store.FindTraces(ctxAcme, &spanstore.TraceQueryParameters{ + ServiceName: "serviceName", + }) + assert.NoError(t, err) + assert.Len(t, traces1, 1) + assert.Len(t, traces1[0].Spans, 1) + assert.Equal(t, testingSpan, traces1[0].Spans[0]) + + traces2, err := store.FindTraces(ctxWonka, &spanstore.TraceQueryParameters{ + ServiceName: "serviceName2", + }) + assert.NoError(t, err) + assert.Len(t, traces2, 1) + assert.Len(t, traces2[0].Spans, 1) + assert.Equal(t, testingSpan2, traces2[0].Spans[0]) + + // Do the spans fail with incorrect tenancy? + _, err = store.GetTrace(ctxAcme, testingSpan2.TraceID) + assert.Error(t, err) + + _, err = store.GetTrace(ctxWonka, testingSpan.TraceID) + assert.Error(t, err) + + _, err = store.GetTrace(context.Background(), testingSpan.TraceID) + assert.Error(t, err) + }) +} + +func makeTestingSpan(traceID model.TraceID, suffix string) *model.Span { + return &model.Span{ + TraceID: traceID, + SpanID: model.NewSpanID(1), + Process: &model.Process{ + ServiceName: "serviceName" + suffix, + Tags: []model.KeyValue(nil), + }, + OperationName: "operationName" + suffix, + Tags: model.KeyValues{ + model.String("tagKey", "tagValue"+suffix), + model.String("span.kind", "client"+suffix), + }, + Logs: []model.Log{ + { + Timestamp: time.Now().UTC(), + Fields: []model.KeyValue{ + model.String("logKey", "logValue"+suffix), + }, + }, + }, + Duration: time.Second * 5, + StartTime: time.Unix(300, 0).UTC(), + } +} From 94ca83fa24087f42b4c3db9ac151e66b58af2c64 Mon Sep 17 00:00:00 2001 From: Loc Mai Date: Mon, 25 Jul 2022 15:37:58 +0700 Subject: [PATCH 10/11] chore: add sanitizer on span processor Signed-off-by: Loc Mai --- .../es/spanstore/dbmodel/from_domain_test.go | 24 ------------------- 1 file changed, 24 deletions(-) diff --git a/plugin/storage/es/spanstore/dbmodel/from_domain_test.go b/plugin/storage/es/spanstore/dbmodel/from_domain_test.go index db0b0e12dab..b6d711403b7 100644 --- a/plugin/storage/es/spanstore/dbmodel/from_domain_test.go +++ b/plugin/storage/es/spanstore/dbmodel/from_domain_test.go @@ -108,18 +108,13 @@ func TestTagMap(t *testing.T) { assert.Equal(t, tagsMap, dbSpan.Process.Tag) } -<<<<<<< HEAD -func TestConvertProcess(t *testing.T) { -======= func TestNilProcess(t *testing.T) { ->>>>>>> d21e22d0 (fix: check nil process to avoid nil pointer) tags := []model.KeyValue{ model.String("foo", "foo"), model.Bool("a", true), model.Int64("b.b", 1), } -<<<<<<< HEAD spanWithNilTags := model.Span{Tags: tags, Process: &model.Process{Tags: nil}} converter := NewFromDomain(false, []string{}, ":") @@ -131,25 +126,6 @@ func TestNilProcess(t *testing.T) { nilMap := map[string]interface{}(nil) assert.Equal(t, nilMap, dbSpanWithNilTags.Tag) assert.Equal(t, nilMap, dbSpanWithNilTags.Process.Tag) -======= - spanWithNilProcessTags := model.Span{Tags: tags, Process: &model.Process{Tags: nil}} - spanWithNilProcess := model.Span{Tags: tags, Process: nil} - - converter := NewFromDomain(false, nil, ":") - dbSpanWithNilTags := converter.FromDomainEmbedProcess(&spanWithNilProcessTags) - dbSpanWithNilProcess := converter.FromDomainEmbedProcess(&spanWithNilProcess) - - assert.Equal(t, 3, len(dbSpanWithNilTags.Tags)) - assert.Equal(t, 3, len(dbSpanWithNilProcess.Tags)) - assert.Equal(t, 0, len(dbSpanWithNilTags.Process.Tags)) - assert.Equal(t, 0, len(dbSpanWithNilProcess.Process.Tags)) - - tagsMap := map[string]interface{}(nil) - assert.Equal(t, tagsMap, dbSpanWithNilTags.Tag) - assert.Equal(t, tagsMap, dbSpanWithNilProcess.Tag) - assert.Equal(t, tagsMap, dbSpanWithNilTags.Process.Tag) - assert.Equal(t, tagsMap, dbSpanWithNilProcess.Process.Tag) ->>>>>>> d21e22d0 (fix: check nil process to avoid nil pointer) } func TestConvertKeyValueValue(t *testing.T) { From 9d12426c3cb439151d6792ef86232b92f115b897 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Tue, 26 Jul 2022 13:00:17 -0400 Subject: [PATCH 11/11] Simplify the fix Signed-off-by: Yuri Shkuro --- cmd/ingester/app/builder/builder.go | 2 -- cmd/ingester/app/processor/span_processor.go | 3 +- .../app/processor/span_processor_test.go | 33 +++++++++++-------- .../es/spanstore/dbmodel/from_domain.go | 4 +-- .../es/spanstore/dbmodel/from_domain_test.go | 20 ----------- 5 files changed, 22 insertions(+), 40 deletions(-) diff --git a/cmd/ingester/app/builder/builder.go b/cmd/ingester/app/builder/builder.go index 9f6d9b7c73f..d984b5dcd03 100644 --- a/cmd/ingester/app/builder/builder.go +++ b/cmd/ingester/app/builder/builder.go @@ -20,7 +20,6 @@ import ( "go.uber.org/zap" - "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer" "github.com/jaegertracing/jaeger/cmd/ingester/app" "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer" "github.com/jaegertracing/jaeger/cmd/ingester/app/processor" @@ -48,7 +47,6 @@ func CreateConsumer(logger *zap.Logger, metricsFactory metrics.Factory, spanWrit spParams := processor.SpanProcessorParams{ Writer: spanWriter, Unmarshaller: unmarshaller, - Sanitizers: sanitizer.NewStandardSanitizers(), } spanProcessor := processor.NewSpanProcessor(spParams) diff --git a/cmd/ingester/app/processor/span_processor.go b/cmd/ingester/app/processor/span_processor.go index 8a059ac0aec..4817103384b 100644 --- a/cmd/ingester/app/processor/span_processor.go +++ b/cmd/ingester/app/processor/span_processor.go @@ -41,7 +41,6 @@ type Message interface { type SpanProcessorParams struct { Writer spanstore.Writer Unmarshaller kafka.Unmarshaller - Sanitizers []sanitizer.SanitizeSpan } // KafkaSpanProcessor implements SpanProcessor for Kafka messages @@ -57,7 +56,7 @@ func NewSpanProcessor(params SpanProcessorParams) *KafkaSpanProcessor { return &KafkaSpanProcessor{ unmarshaller: params.Unmarshaller, writer: params.Writer, - sanitizer: sanitizer.NewChainedSanitizer(params.Sanitizers...), + sanitizer: sanitizer.NewChainedSanitizer(sanitizer.NewStandardSanitizers()...), } } diff --git a/cmd/ingester/app/processor/span_processor_test.go b/cmd/ingester/app/processor/span_processor_test.go index d2f9ad66ecf..5df0159b7a6 100644 --- a/cmd/ingester/app/processor/span_processor_test.go +++ b/cmd/ingester/app/processor/span_processor_test.go @@ -20,8 +20,8 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" - "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer" cmocks "github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/mocks" "github.com/jaegertracing/jaeger/model" umocks "github.com/jaegertracing/jaeger/pkg/kafka/mocks" @@ -34,27 +34,32 @@ func TestNewSpanProcessor(t *testing.T) { } func TestSpanProcessor_Process(t *testing.T) { - writer := &smocks.Writer{} - unmarshallerMock := &umocks.Unmarshaller{} - sanitizer := sanitizer.NewChainedSanitizer() - processor := &KafkaSpanProcessor{ - unmarshaller: unmarshallerMock, - writer: writer, - sanitizer: sanitizer, - } + mockUnmarshaller := &umocks.Unmarshaller{} + mockWriter := &smocks.Writer{} + processor := NewSpanProcessor(SpanProcessorParams{ + Unmarshaller: mockUnmarshaller, + Writer: mockWriter, + }) message := &cmocks.Message{} - data := []byte("police") - span := &model.Span{} + data := []byte("irrelevant, mock unmarshaller should return the span") + span := &model.Span{ + Process: nil, // we want to make sure sanitizers will fix this data issue. + } message.On("Value").Return(data) - unmarshallerMock.On("Unmarshal", data).Return(span, nil) - writer.On("WriteSpan", context.Background(), span).Return(nil) + mockUnmarshaller.On("Unmarshal", data).Return(span, nil) + mockWriter.On("WriteSpan", context.Background(), span). + Return(nil). + Run(func(args mock.Arguments) { + span := args[1].(*model.Span) + assert.NotNil(t, span.Process, "sanitizer must fix Process=nil data issue") + }) assert.Nil(t, processor.Process(message)) message.AssertExpectations(t) - writer.AssertExpectations(t) + mockWriter.AssertExpectations(t) } func TestSpanProcessor_ProcessError(t *testing.T) { diff --git a/plugin/storage/es/spanstore/dbmodel/from_domain.go b/plugin/storage/es/spanstore/dbmodel/from_domain.go index afa8c6222bf..a3939d83cb5 100644 --- a/plugin/storage/es/spanstore/dbmodel/from_domain.go +++ b/plugin/storage/es/spanstore/dbmodel/from_domain.go @@ -120,9 +120,9 @@ func (fd FromDomain) convertLogs(logs []model.Log) []Log { } func (fd FromDomain) convertProcess(process *model.Process) Process { - tags, tagsMap := fd.convertKeyValuesString(process.GetTags()) + tags, tagsMap := fd.convertKeyValuesString(process.Tags) return Process{ - ServiceName: process.GetServiceName(), + ServiceName: process.ServiceName, Tags: tags, Tag: tagsMap, } diff --git a/plugin/storage/es/spanstore/dbmodel/from_domain_test.go b/plugin/storage/es/spanstore/dbmodel/from_domain_test.go index b6d711403b7..8fc0e910cbf 100644 --- a/plugin/storage/es/spanstore/dbmodel/from_domain_test.go +++ b/plugin/storage/es/spanstore/dbmodel/from_domain_test.go @@ -108,26 +108,6 @@ func TestTagMap(t *testing.T) { assert.Equal(t, tagsMap, dbSpan.Process.Tag) } -func TestNilProcess(t *testing.T) { - tags := []model.KeyValue{ - model.String("foo", "foo"), - model.Bool("a", true), - model.Int64("b.b", 1), - } - - spanWithNilTags := model.Span{Tags: tags, Process: &model.Process{Tags: nil}} - - converter := NewFromDomain(false, []string{}, ":") - dbSpanWithNilTags := converter.FromDomainEmbedProcess(&spanWithNilTags) - - assert.Len(t, dbSpanWithNilTags.Tags, 3) - assert.Len(t, dbSpanWithNilTags.Process.Tags, 0) - - nilMap := map[string]interface{}(nil) - assert.Equal(t, nilMap, dbSpanWithNilTags.Tag) - assert.Equal(t, nilMap, dbSpanWithNilTags.Process.Tag) -} - func TestConvertKeyValueValue(t *testing.T) { longString := `Bender Bending Rodrigues Bender Bending Rodrigues Bender Bending Rodrigues Bender Bending Rodrigues Bender Bending Rodrigues Bender Bending Rodrigues Bender Bending Rodrigues Bender Bending Rodrigues Bender Bending Rodrigues