Skip to content

Commit

Permalink
Fix filter initialization.
Browse files Browse the repository at this point in the history
  • Loading branch information
knadh committed May 3, 2024
1 parent a0f9070 commit 620fd55
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 11 deletions.
2 changes: 1 addition & 1 deletion config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ request_timeout = "100ms"


# Custom go-plugin filter to load to filter messages when relaying
[filter.testfilter]
[filters.test]
config = '''
{
"address": ["127.0.0.1:6379"],
Expand Down
20 changes: 11 additions & 9 deletions init.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,15 +194,15 @@ func initKafkaConfig(ko *koanf.Koanf) ([]relay.ConsumerGroupCfg, relay.ProducerC
return src.Sources, prod
}

// initFilterProviders loads the go plugin, initializes it and return a map of filter plugins.
func initFilterProviders(names []string, ko *koanf.Koanf, log *slog.Logger) (map[string]filter.Provider, error) {
// initFilters loads the go plugin, initializes it and return a map of filter plugins.
func initFilters(ko *koanf.Koanf, lo *slog.Logger) (map[string]filter.Provider, error) {
out := make(map[string]filter.Provider)
for _, fName := range names {
plg, err := plugin.Open(fName)
for _, name := range ko.MapKeys("filters") {
plg, err := plugin.Open(name)
if err != nil {
return nil, fmt.Errorf("error loading provider plugin '%s': %v", fName, err)
return nil, fmt.Errorf("error loading provider plugin '%s': %v", name, err)
}
id := strings.TrimSuffix(filepath.Base(fName), filepath.Ext(fName))
id := strings.TrimSuffix(filepath.Base(name), filepath.Ext(name))

newFunc, err := plg.Lookup("New")
if err != nil {
Expand All @@ -214,17 +214,19 @@ func initFilterProviders(names []string, ko *koanf.Koanf, log *slog.Logger) (map
}

var cfg filter.Config
ko.Unmarshal("filter."+id, &cfg)
if err := ko.Unmarshal("filter."+id, &cfg); err != nil {
log.Fatalf("error unmarshalling filter config: %s: %v", name, err)
}
if cfg.Config == "" {
log.Info(fmt.Sprintf("WARNING: No config 'filter.%s' for '%s' in config", id, id))
lo.Info(fmt.Sprintf("WARNING: No config 'filter.%s' for '%s' in config", id, id))
}

// Initialize the plugin.
prov, err := f([]byte(cfg.Config))
if err != nil {
return nil, fmt.Errorf("error initializing filter provider plugin '%s': %v", id, err)
}
log.Info(fmt.Sprintf("loaded filter provider plugin '%s' from %s", id, fName))
lo.Info(fmt.Sprintf("loaded filter provider plugin '%s' from %s", id, name))

p, ok := prov.(filter.Provider)
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func main() {
lo := initLog(ko)

// Load the optional filter providers.
filters, err := initFilterProviders(ko.Strings("filter"), ko, lo)
filters, err := initFilters(ko, lo)
if err != nil {
log.Fatalf("error initializing filter provider: %v", err)
}
Expand Down

0 comments on commit 620fd55

Please # to comment.