Skip to content

Commit

Permalink
Extract ConfigFactory in a ParserProvider interface
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <bogdandrutu@gmail.com>
  • Loading branch information
bogdandrutu committed Apr 12, 2021
1 parent a31ad5b commit 3be3fa7
Show file tree
Hide file tree
Showing 15 changed files with 479 additions and 251 deletions.
2 changes: 1 addition & 1 deletion config/configparser/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Package configparser implements loading of configuration from Viper configuration.
// Package configparser implements configuration loading from a config.Parser.
// The implementation relies on registered factories that allow creating
// default configuration for each type of receiver/exporter/processor.
package configparser
Expand Down
75 changes: 28 additions & 47 deletions service/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package service

import (
"context"
"errors"
"flag"
"fmt"
"os"
Expand All @@ -30,13 +29,13 @@ import (
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/config/configparser"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/internal/collector/telemetry"
"go.opentelemetry.io/collector/service/internal/builder"
"go.opentelemetry.io/collector/service/parserprovider"
)

const (
Expand Down Expand Up @@ -66,7 +65,7 @@ type Application struct {

factories component.Factories

configFactory ConfigFactory
parserProvider parserprovider.ParserProvider

// stopTestChan is used to terminate the application in end to end tests.
stopTestChan chan struct{}
Expand All @@ -84,58 +83,25 @@ type Parameters struct {
Factories component.Factories
// ApplicationStartInfo provides application start information.
ApplicationStartInfo component.ApplicationStartInfo
// ConfigFactory that creates the configuration.
// If it is not provided the default factory (FileLoaderConfigFactory) is used.
// The default factory loads the configuration file and overrides component's configuration
// ParserProvider provides the configuration's Parser.
// If it is not provided a default provider is used. The default provider loads the configuration
// from a config file define by the --config command line flag and overrides component's configuration
// properties supplied via --set command line flag.
ConfigFactory ConfigFactory
ParserProvider parserprovider.ParserProvider
// LoggingOptions provides a way to change behavior of zap logging.
LoggingOptions []zap.Option
}

// ConfigFactory creates config.
// The ConfigFactory implementation should call AddSetFlagProperties to enable configuration passed via `--set` flag.
// Viper and command instances are passed from the Application.
// The factories also belong to the Application and are equal to the factories passed via Parameters.
type ConfigFactory func(cmd *cobra.Command, factories component.Factories) (*config.Config, error)

// FileLoaderConfigFactory implements ConfigFactory and it creates configuration from file
// and from --set command line flag (if the flag is present).
func FileLoaderConfigFactory(cmd *cobra.Command, factories component.Factories) (*config.Config, error) {
file := builder.GetConfigFile()
if file == "" {
return nil, errors.New("config file not specified")
}

cp, err := config.NewParserFromFile(file)
if err != nil {
return nil, fmt.Errorf("error loading config file %q: %v", file, err)
}

// next overlay the config file with --set flags
if err := AddSetFlagProperties(cp, cmd); err != nil {
return nil, fmt.Errorf("failed to process set flag: %v", err)
}
return configparser.Load(cp, factories)
}

// New creates and returns a new instance of Application.
func New(params Parameters) (*Application, error) {
if err := configcheck.ValidateConfigFromFactories(params.Factories); err != nil {
return nil, err
}

configFactory := params.ConfigFactory
if configFactory == nil {
// use default factory that loads the configuration file
configFactory = FileLoaderConfigFactory
}

app := &Application{
info: params.ApplicationStartInfo,
factories: params.Factories,
stateChannel: make(chan State, Closed+1),
configFactory: configFactory,
info: params.ApplicationStartInfo,
factories: params.Factories,
stateChannel: make(chan State, Closed+1),
}

rootCmd := &cobra.Command{
Expand All @@ -159,6 +125,7 @@ func New(params Parameters) (*Application, error) {
flagSet := new(flag.FlagSet)
addFlagsFns := []func(*flag.FlagSet){
configtelemetry.Flags,
parserprovider.Flags,
telemetry.Flags,
builder.Flags,
loggerFlags,
Expand All @@ -167,10 +134,15 @@ func New(params Parameters) (*Application, error) {
addFlags(flagSet)
}
rootCmd.Flags().AddGoFlagSet(flagSet)
addSetFlag(rootCmd.Flags())

app.rootCmd = rootCmd

parserProvider := params.ParserProvider
if parserProvider == nil {
// use default provider.
parserProvider = parserprovider.Default()
}
app.parserProvider = parserProvider

return app, nil
}

Expand Down Expand Up @@ -249,11 +221,20 @@ func (app *Application) runAndWaitForShutdownEvent() {
func (app *Application) setupConfigurationComponents(ctx context.Context) error {
app.logger.Info("Loading configuration...")

cfg, err := app.configFactory(app.rootCmd, app.factories)
cp, err := app.parserProvider.Get()
if err != nil {
return fmt.Errorf("cannot load configuration's parser: %w", err)
}

cfg, err := configparser.Load(cp, app.factories)
if err != nil {
return fmt.Errorf("cannot load configuration: %w", err)
}

if err = cfg.Validate(); err != nil {
return fmt.Errorf("invalid configuration: %w", err)
}

app.logger.Info("Applying configuration...")

service, err := newService(&settings{
Expand Down Expand Up @@ -339,7 +320,7 @@ func (app *Application) createMemoryBallast() ([]byte, uint64) {
}

// updateService shutdowns the current app.service and setups a new one according
// to the latest configuration. It requires that app.configFactory and app.factories
// to the latest configuration. It requires that app.parserProvider and app.factories
// are properly populated to finish successfully.
func (app *Application) updateService(ctx context.Context) error {
if app.service != nil {
Expand Down
162 changes: 28 additions & 134 deletions service/application_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,32 +19,25 @@ import (
"bufio"
"context"
"errors"
"flag"
"fmt"
"net/http"
"runtime"
"sort"
"strconv"
"strings"
"syscall"
"testing"
"time"

"github.com/prometheus/common/expfmt"
"github.com/spf13/cobra"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/config/configparser"
"go.opentelemetry.io/collector/processor/attributesprocessor"
"go.opentelemetry.io/collector/processor/batchprocessor"
"go.opentelemetry.io/collector/receiver/jaegerreceiver"
"go.opentelemetry.io/collector/service/defaultcomponents"
"go.opentelemetry.io/collector/service/internal/builder"
"go.opentelemetry.io/collector/service/parserprovider"
"go.opentelemetry.io/collector/testutil"
)

Expand Down Expand Up @@ -149,10 +142,8 @@ func TestApplication_StartAsGoRoutine(t *testing.T) {

params := Parameters{
ApplicationStartInfo: component.DefaultApplicationStartInfo(),
ConfigFactory: func(_ *cobra.Command, factories component.Factories) (*config.Config, error) {
return constructMimumalOpConfig(t, factories), nil
},
Factories: factories,
ParserProvider: new(minimalParserLoader),
Factories: factories,
}
app, err := New(params)
require.NoError(t, err)
Expand Down Expand Up @@ -225,105 +216,9 @@ func assertMetrics(t *testing.T, prefix string, metricsPort uint16, mandatoryLab
}
}

func TestSetFlag(t *testing.T) {
factories, err := defaultcomponents.Components()
require.NoError(t, err)
params := Parameters{
Factories: factories,
}
t.Run("unknown_component", func(t *testing.T) {
app, err := New(params)
require.NoError(t, err)
err = app.rootCmd.ParseFlags([]string{
"--config=testdata/otelcol-config.yaml",
"--set=processors.doesnotexist.timeout=2s",
})
require.NoError(t, err)
cfg, err := FileLoaderConfigFactory(app.rootCmd, factories)
require.Error(t, err)
require.Nil(t, cfg)

})
t.Run("component_not_added_to_pipeline", func(t *testing.T) {
app, err := New(params)
require.NoError(t, err)
err = app.rootCmd.ParseFlags([]string{
"--config=testdata/otelcol-config.yaml",
"--set=processors.batch/foo.timeout=2s",
})
require.NoError(t, err)
cfg, err := FileLoaderConfigFactory(app.rootCmd, factories)
require.NoError(t, err)
assert.NotNil(t, cfg)
err = cfg.Validate()
require.NoError(t, err)

var processors []string
for k := range cfg.Processors {
processors = append(processors, k)
}
sort.Strings(processors)
// batch/foo is not added to the pipeline
assert.Equal(t, []string{"attributes", "batch", "batch/foo"}, processors)
assert.Equal(t, []string{"attributes", "batch"}, cfg.Service.Pipelines["traces"].Processors)
})
t.Run("ok", func(t *testing.T) {
app, err := New(params)
require.NoError(t, err)

err = app.rootCmd.ParseFlags([]string{
"--config=testdata/otelcol-config.yaml",
"--set=processors.batch.timeout=2s",
// Arrays are overridden and object arrays cannot be indexed
// this creates actions array of size 1
"--set=processors.attributes.actions.key=foo",
"--set=processors.attributes.actions.value=bar",
"--set=receivers.jaeger.protocols.grpc.endpoint=localhost:12345",
"--set=extensions.health_check.endpoint=localhost:8080",
})
require.NoError(t, err)
cfg, err := FileLoaderConfigFactory(app.rootCmd, factories)
require.NoError(t, err)
require.NotNil(t, cfg)
err = cfg.Validate()
require.NoError(t, err)

assert.Equal(t, 2, len(cfg.Processors))
batch := cfg.Processors["batch"].(*batchprocessor.Config)
assert.Equal(t, time.Second*2, batch.Timeout)
jaeger := cfg.Receivers["jaeger"].(*jaegerreceiver.Config)
assert.Equal(t, "localhost:12345", jaeger.GRPC.NetAddr.Endpoint)
attributes := cfg.Processors["attributes"].(*attributesprocessor.Config)
require.Equal(t, 1, len(attributes.Actions))
assert.Equal(t, "foo", attributes.Actions[0].Key)
assert.Equal(t, "bar", attributes.Actions[0].Value)
})
}

func TestSetFlag_component_does_not_exist(t *testing.T) {
factories, err := defaultcomponents.Components()
require.NoError(t, err)

cmd := &cobra.Command{}
addSetFlag(cmd.Flags())
fs := &flag.FlagSet{}
builder.Flags(fs)
cmd.Flags().AddGoFlagSet(fs)
cmd.ParseFlags([]string{
"--config=testdata/otelcol-config.yaml",
"--set=processors.batch.timeout=2s",
// Arrays are overridden and object arrays cannot be indexed
// this creates actions array of size 1
"--set=processors.attributes.actions.key=foo",
"--set=processors.attributes.actions.value=bar",
"--set=receivers.jaeger.protocols.grpc.endpoint=localhost:12345",
})
cfg, err := FileLoaderConfigFactory(cmd, factories)
require.NoError(t, err)
require.NotNil(t, cfg)
}
type minimalParserLoader struct{}

func constructMimumalOpConfig(t *testing.T, factories component.Factories) *config.Config {
func (*minimalParserLoader) Get() (*config.Parser, error) {
configStr := `
receivers:
otlp:
Expand All @@ -347,37 +242,36 @@ service:
v := config.NewViper()
v.SetConfigType("yaml")
v.ReadConfig(strings.NewReader(configStr))
cfg, err := configparser.Load(config.ParserFromViper(v), factories)
assert.NoError(t, err)
err = cfg.Validate()
assert.NoError(t, err)
return cfg
return config.ParserFromViper(v), nil
}

type errParserLoader struct {
err error
}

func (epl *errParserLoader) Get() (*config.Parser, error) {
return nil, epl.err
}

func TestApplication_updateService(t *testing.T) {
factories, err := defaultcomponents.Components()
require.NoError(t, err)
ctx := context.Background()
sentinelError := errors.New("sentinel error")
returnConfigFactoryFn := func(cfg *config.Config, err error) ConfigFactory {
return func(*cobra.Command, component.Factories) (*config.Config, error) {
return cfg, err
}
}

tests := []struct {
name string
configFactory ConfigFactory
service *service
skip bool
name string
parserProvider parserprovider.ParserProvider
service *service
skip bool
}{
{
name: "first_load_err",
configFactory: returnConfigFactoryFn(nil, sentinelError),
name: "first_load_err",
parserProvider: &errParserLoader{err: sentinelError},
},
{
name: "retire_service_ok_load_err",
configFactory: returnConfigFactoryFn(nil, sentinelError),
name: "retire_service_ok_load_err",
parserProvider: &errParserLoader{err: sentinelError},
service: &service{
logger: zap.NewNop(),
builtExporters: builder.Exporters{},
Expand All @@ -387,8 +281,8 @@ func TestApplication_updateService(t *testing.T) {
},
},
{
name: "retire_service_ok_load_ok",
configFactory: returnConfigFactoryFn(constructMimumalOpConfig(t, factories), nil),
name: "retire_service_ok_load_ok",
parserProvider: new(minimalParserLoader),
service: &service{
logger: zap.NewNop(),
builtExporters: builder.Exporters{},
Expand All @@ -407,10 +301,10 @@ func TestApplication_updateService(t *testing.T) {
}

app := Application{
logger: zap.NewNop(),
configFactory: tt.configFactory,
factories: factories,
service: tt.service,
logger: zap.NewNop(),
parserProvider: tt.parserProvider,
factories: factories,
service: tt.service,
}

err := app.updateService(ctx)
Expand Down
Loading

0 comments on commit 3be3fa7

Please # to comment.