From 620fd55692dc7ca99c9949b16503bfd35f9c7ef2 Mon Sep 17 00:00:00 2001 From: Kailash Nadh Date: Fri, 3 May 2024 12:53:24 +0530 Subject: [PATCH] Fix filter initialization. --- config.sample.toml | 2 +- init.go | 20 +++++++++++--------- main.go | 2 +- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/config.sample.toml b/config.sample.toml index dabaaf7..2aa5301 100644 --- a/config.sample.toml +++ b/config.sample.toml @@ -116,7 +116,7 @@ request_timeout = "100ms" # Custom go-plugin filter to load to filter messages when relaying -[filter.testfilter] +[filters.test] config = ''' { "address": ["127.0.0.1:6379"], diff --git a/init.go b/init.go index d2a8e17..1f4a58e 100644 --- a/init.go +++ b/init.go @@ -194,15 +194,15 @@ func initKafkaConfig(ko *koanf.Koanf) ([]relay.ConsumerGroupCfg, relay.ProducerC return src.Sources, prod } -// initFilterProviders loads the go plugin, initializes it and return a map of filter plugins. -func initFilterProviders(names []string, ko *koanf.Koanf, log *slog.Logger) (map[string]filter.Provider, error) { +// initFilters loads the go plugin, initializes it and return a map of filter plugins. +func initFilters(ko *koanf.Koanf, lo *slog.Logger) (map[string]filter.Provider, error) { out := make(map[string]filter.Provider) - for _, fName := range names { - plg, err := plugin.Open(fName) + for _, name := range ko.MapKeys("filters") { + plg, err := plugin.Open(name) if err != nil { - return nil, fmt.Errorf("error loading provider plugin '%s': %v", fName, err) + return nil, fmt.Errorf("error loading provider plugin '%s': %v", name, err) } - id := strings.TrimSuffix(filepath.Base(fName), filepath.Ext(fName)) + id := strings.TrimSuffix(filepath.Base(name), filepath.Ext(name)) newFunc, err := plg.Lookup("New") if err != nil { @@ -214,9 +214,11 @@ func initFilterProviders(names []string, ko *koanf.Koanf, log *slog.Logger) (map } var cfg filter.Config - ko.Unmarshal("filter."+id, &cfg) + if err := ko.Unmarshal("filter."+id, &cfg); err != nil { + log.Fatalf("error unmarshalling filter config: %s: %v", name, err) + } if cfg.Config == "" { - log.Info(fmt.Sprintf("WARNING: No config 'filter.%s' for '%s' in config", id, id)) + lo.Info(fmt.Sprintf("WARNING: No config 'filter.%s' for '%s' in config", id, id)) } // Initialize the plugin. @@ -224,7 +226,7 @@ func initFilterProviders(names []string, ko *koanf.Koanf, log *slog.Logger) (map if err != nil { return nil, fmt.Errorf("error initializing filter provider plugin '%s': %v", id, err) } - log.Info(fmt.Sprintf("loaded filter provider plugin '%s' from %s", id, fName)) + lo.Info(fmt.Sprintf("loaded filter provider plugin '%s' from %s", id, name)) p, ok := prov.(filter.Provider) if !ok { diff --git a/main.go b/main.go index d94ac74..1e387c5 100644 --- a/main.go +++ b/main.go @@ -34,7 +34,7 @@ func main() { lo := initLog(ko) // Load the optional filter providers. - filters, err := initFilterProviders(ko.Strings("filter"), ko, lo) + filters, err := initFilters(ko, lo) if err != nil { log.Fatalf("error initializing filter provider: %v", err) }