Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[jaeger-v2] Consolidate v1 and v2 Configurations for GRPC Storage #6042

Merged
merged 7 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/jaeger/internal/extension/jaegerstorage/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type Config struct {
type Backend struct {
Memory *memory.Configuration `mapstructure:"memory"`
Badger *badger.Config `mapstructure:"badger"`
GRPC *grpc.ConfigV2 `mapstructure:"grpc"`
GRPC *grpc.Config `mapstructure:"grpc"`
Cassandra *cassandra.Options `mapstructure:"cassandra"`
Elasticsearch *esCfg.Configuration `mapstructure:"elasticsearch"`
Opensearch *esCfg.Configuration `mapstructure:"opensearch"`
Expand All @@ -66,7 +66,7 @@ func (cfg *Backend) Unmarshal(conf *confmap.Conf) error {
cfg.Badger = v
}
if conf.IsSet("grpc") {
v := grpc.DefaultConfigV2()
v := grpc.DefaultConfig()
cfg.GRPC = &v
}
if conf.IsSet("cassandra") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ func TestGRPC(t *testing.T) {
ext := makeStorageExtenion(t, &Config{
Backends: map[string]Backend{
"foo": {
GRPC: &grpc.ConfigV2{
GRPC: &grpc.Config{
ClientConfig: configgrpc.ClientConfig{
Endpoint: "localhost:12345",
},
Expand Down
31 changes: 4 additions & 27 deletions plugin/storage/grpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,51 +4,28 @@
package grpc

import (
"time"

"go.opentelemetry.io/collector/config/configgrpc"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/jaegertracing/jaeger/pkg/config/tlscfg"
"github.com/jaegertracing/jaeger/pkg/tenancy"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
)

// Configuration describes the options to customize the storage behavior.
type Configuration struct {
RemoteServerAddr string `yaml:"server" mapstructure:"server"`
RemoteTLS tlscfg.Options
RemoteConnectTimeout time.Duration `yaml:"connection-timeout" mapstructure:"connection-timeout"`
TenancyOpts tenancy.Options
}

type ConfigV2 struct {
// Config describes the options to customize the storage behavior
type Config struct {
Tenancy tenancy.Options `mapstructure:"multi_tenancy"`
configgrpc.ClientConfig `mapstructure:",squash"`
exporterhelper.TimeoutSettings `mapstructure:",squash"`
}

func DefaultConfigV2() ConfigV2 {
return ConfigV2{
func DefaultConfig() Config {
return Config{
TimeoutSettings: exporterhelper.TimeoutConfig{
Timeout: defaultConnectionTimeout,
},
}
}

func (c *Configuration) TranslateToConfigV2() *ConfigV2 {
return &ConfigV2{
Tenancy: c.TenancyOpts,
ClientConfig: configgrpc.ClientConfig{
Endpoint: c.RemoteServerAddr,
TLSSetting: c.RemoteTLS.ToOtelClientConfig(),
},
TimeoutSettings: exporterhelper.TimeoutConfig{
Timeout: c.RemoteConnectTimeout,
},
}
}

// ClientPluginServices defines services plugin can expose and its capabilities
type ClientPluginServices struct {
shared.PluginServices
Expand Down
4 changes: 2 additions & 2 deletions plugin/storage/grpc/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/stretchr/testify/assert"
)

func TestDefaultConfigV2(t *testing.T) {
cfg := DefaultConfigV2()
func TestDefaultConfig(t *testing.T) {
cfg := DefaultConfig()
assert.NotEmpty(t, cfg.Timeout)
}
30 changes: 10 additions & 20 deletions plugin/storage/grpc/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,9 @@ type Factory struct {
metricsFactory metrics.Factory
logger *zap.Logger
tracerProvider trace.TracerProvider

// configV1 is used for backward compatibility. it will be removed in v2.
// In the main initialization logic, only configV2 is used.
configV1 Configuration
configV2 *ConfigV2

services *ClientPluginServices
remoteConn *grpc.ClientConn
config Config
services *ClientPluginServices
remoteConn *grpc.ClientConn
}

// NewFactory creates a new Factory.
Expand All @@ -61,12 +56,12 @@ func NewFactory() *Factory {

// NewFactoryWithConfig is used from jaeger(v2).
func NewFactoryWithConfig(
cfg ConfigV2,
cfg Config,
metricsFactory metrics.Factory,
logger *zap.Logger,
) (*Factory, error) {
f := NewFactory()
f.configV2 = &cfg
f.config = cfg
if err := f.Initialize(metricsFactory, logger); err != nil {
return nil, err
}
Expand All @@ -75,12 +70,12 @@ func NewFactoryWithConfig(

// AddFlags implements plugin.Configurable
func (*Factory) AddFlags(flagSet *flag.FlagSet) {
v1AddFlags(flagSet)
addFlags(flagSet)
}

// InitFromViper implements plugin.Configurable
func (f *Factory) InitFromViper(v *viper.Viper, logger *zap.Logger) {
if err := v1InitFromViper(&f.configV1, v); err != nil {
if err := initFromViper(&f.config, v); err != nil {
logger.Fatal("unable to initialize gRPC storage factory", zap.Error(err))
}
}
Expand All @@ -90,10 +85,6 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
f.metricsFactory, f.logger = metricsFactory, logger
f.tracerProvider = otel.GetTracerProvider()

if f.configV2 == nil {
f.configV2 = f.configV1.TranslateToConfigV2()
}

telset := component.TelemetrySettings{
Logger: logger,
TracerProvider: f.tracerProvider,
Expand All @@ -107,22 +98,22 @@ func (f *Factory) Initialize(metricsFactory metrics.Factory, logger *zap.Logger)
for _, opt := range opts {
clientOpts = append(clientOpts, configgrpc.WithGrpcDialOption(opt))
}
return f.configV2.ToClientConnWithOptions(context.Background(), componenttest.NewNopHost(), telset, clientOpts...)
return f.config.ToClientConnWithOptions(context.Background(), componenttest.NewNopHost(), telset, clientOpts...)
}

var err error
f.services, err = f.newRemoteStorage(telset, newClientFn)
if err != nil {
return fmt.Errorf("grpc storage builder failed to create a store: %w", err)
}
logger.Info("Remote storage configuration", zap.Any("configuration", f.configV2))
logger.Info("Remote storage configuration", zap.Any("configuration", f.config))
return nil
}

type newClientFn func(opts ...grpc.DialOption) (*grpc.ClientConn, error)

func (f *Factory) newRemoteStorage(telset component.TelemetrySettings, newClient newClientFn) (*ClientPluginServices, error) {
c := f.configV2
c := f.config
opts := []grpc.DialOption{
grpc.WithStatsHandler(otelgrpc.NewClientHandler(otelgrpc.WithTracerProvider(telset.TracerProvider))),
}
Expand Down Expand Up @@ -208,6 +199,5 @@ func (f *Factory) Close() error {
if f.remoteConn != nil {
errs = append(errs, f.remoteConn.Close())
}
errs = append(errs, f.configV1.RemoteTLS.Close())
return errors.Join(errs...)
}
10 changes: 5 additions & 5 deletions plugin/storage/grpc/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func makeFactory(t *testing.T) *Factory {
}

func TestNewFactoryError(t *testing.T) {
cfg := &ConfigV2{
cfg := &Config{
ClientConfig: configgrpc.ClientConfig{
// non-empty Auth is currently not supported
Auth: &configauth.Authentication{},
Expand All @@ -113,15 +113,15 @@ func TestNewFactoryError(t *testing.T) {
t.Run("viper", func(t *testing.T) {
f := NewFactory()
f.InitFromViper(viper.New(), zap.NewNop())
f.configV2 = cfg
f.config = *cfg
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove & from declaration and * from here

err := f.Initialize(metrics.NullFactory, zap.NewNop())
require.Error(t, err)
assert.Contains(t, err.Error(), "authenticator")
})

t.Run("client", func(t *testing.T) {
// this is a silly test to verify handling of error from grpc.NewClient, which cannot be induced via params.
f, err := NewFactoryWithConfig(ConfigV2{}, metrics.NullFactory, zap.NewNop())
f, err := NewFactoryWithConfig(Config{}, metrics.NullFactory, zap.NewNop())
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, f.Close()) })
newClientFn := func(_ ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
Expand Down Expand Up @@ -162,7 +162,7 @@ func TestGRPCStorageFactoryWithConfig(t *testing.T) {
}()
defer s.Stop()

cfg := ConfigV2{
cfg := Config{
ClientConfig: configgrpc.ClientConfig{
Endpoint: lis.Addr().String(),
},
Expand Down Expand Up @@ -265,7 +265,7 @@ func TestWithCLIFlags(t *testing.T) {
})
require.NoError(t, err)
f.InitFromViper(v, zap.NewNop())
assert.Equal(t, "foo:1234", f.configV1.RemoteServerAddr)
assert.Equal(t, "foo:1234", f.config.ClientConfig.Endpoint)
require.NoError(t, f.Close())
}

Expand Down
16 changes: 8 additions & 8 deletions plugin/storage/grpc/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,22 +27,22 @@ func tlsFlagsConfig() tlscfg.ClientFlagsConfig {
}
}

// AddFlags adds flags for Options
func v1AddFlags(flagSet *flag.FlagSet) {
// addFlags adds flags for Options
func addFlags(flagSet *flag.FlagSet) {
tlsFlagsConfig().AddFlags(flagSet)

flagSet.String(remoteServer, "", "The remote storage gRPC server address as host:port")
flagSet.Duration(remoteConnectionTimeout, defaultConnectionTimeout, "The remote storage gRPC server connection timeout")
}

func v1InitFromViper(cfg *Configuration, v *viper.Viper) error {
cfg.RemoteServerAddr = v.GetString(remoteServer)
var err error
cfg.RemoteTLS, err = tlsFlagsConfig().InitFromViper(v)
func initFromViper(cfg *Config, v *viper.Viper) error {
cfg.ClientConfig.Endpoint = v.GetString(remoteServer)
remoteTLS, err := tlsFlagsConfig().InitFromViper(v)
if err != nil {
return fmt.Errorf("failed to parse gRPC storage TLS options: %w", err)
}
cfg.RemoteConnectTimeout = v.GetDuration(remoteConnectionTimeout)
cfg.TenancyOpts = tenancy.InitFromViper(v)
cfg.ClientConfig.TLSSetting = remoteTLS.ToOtelClientConfig()
cfg.TimeoutSettings.Timeout = v.GetDuration(remoteConnectionTimeout)
cfg.Tenancy = tenancy.InitFromViper(v)
return nil
}
39 changes: 19 additions & 20 deletions plugin/storage/grpc/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,61 +18,60 @@ import (
)

func TestOptionsWithFlags(t *testing.T) {
v, command := config.Viperize(v1AddFlags, tenancy.AddFlags)
v, command := config.Viperize(addFlags, tenancy.AddFlags)
err := command.ParseFlags([]string{
"--grpc-storage.server=foo:12345",
"--multi-tenancy.header=x-scope-orgid",
})
require.NoError(t, err)
var cfg Configuration
require.NoError(t, v1InitFromViper(&cfg, v))
var cfg Config
require.NoError(t, initFromViper(&cfg, v))

assert.Equal(t, "foo:12345", cfg.RemoteServerAddr)
assert.False(t, cfg.TenancyOpts.Enabled)
assert.Equal(t, "x-scope-orgid", cfg.TenancyOpts.Header)
assert.Equal(t, "foo:12345", cfg.ClientConfig.Endpoint)
assert.False(t, cfg.Tenancy.Enabled)
assert.Equal(t, "x-scope-orgid", cfg.Tenancy.Header)
}

func TestRemoteOptionsWithFlags(t *testing.T) {
v, command := config.Viperize(v1AddFlags)
v, command := config.Viperize(addFlags)
err := command.ParseFlags([]string{
"--grpc-storage.server=localhost:2001",
"--grpc-storage.tls.enabled=true",
"--grpc-storage.connection-timeout=60s",
})
require.NoError(t, err)
var cfg Configuration
require.NoError(t, v1InitFromViper(&cfg, v))
var cfg Config
require.NoError(t, initFromViper(&cfg, v))

assert.Equal(t, "localhost:2001", cfg.RemoteServerAddr)
assert.True(t, cfg.RemoteTLS.Enabled)
assert.Equal(t, 60*time.Second, cfg.RemoteConnectTimeout)
assert.Equal(t, "localhost:2001", cfg.ClientConfig.Endpoint)
assert.False(t, cfg.ClientConfig.TLSSetting.Insecure)
assert.Equal(t, 60*time.Second, cfg.TimeoutSettings.Timeout)
}

func TestRemoteOptionsNoTLSWithFlags(t *testing.T) {
v, command := config.Viperize(v1AddFlags)
v, command := config.Viperize(addFlags)
err := command.ParseFlags([]string{
"--grpc-storage.server=localhost:2001",
"--grpc-storage.tls.enabled=false",
"--grpc-storage.connection-timeout=60s",
})
require.NoError(t, err)
var cfg Configuration
require.NoError(t, v1InitFromViper(&cfg, v))
var cfg Config
require.NoError(t, initFromViper(&cfg, v))

assert.Equal(t, "localhost:2001", cfg.RemoteServerAddr)
assert.False(t, cfg.RemoteTLS.Enabled)
assert.Equal(t, 60*time.Second, cfg.RemoteConnectTimeout)
assert.Equal(t, "localhost:2001", cfg.ClientConfig.Endpoint)
assert.True(t, cfg.ClientConfig.TLSSetting.Insecure)
assert.Equal(t, 60*time.Second, cfg.TimeoutSettings.Timeout)
}

func TestFailedTLSFlags(t *testing.T) {
v, command := config.Viperize(v1AddFlags)
v, command := config.Viperize(addFlags)
err := command.ParseFlags([]string{
"--grpc-storage.tls.enabled=false",
"--grpc-storage.tls.cert=blah", // invalid unless tls.enabled=true
})
require.NoError(t, err)
f := NewFactory()
f.configV2 = nil
core, logs := observer.New(zap.NewAtomicLevelAt(zapcore.ErrorLevel))
logger := zap.New(core, zap.WithFatalHook(zapcore.WriteThenPanic))
require.Panics(t, func() { f.InitFromViper(v, logger) })
Expand Down
Loading