From 217646f17fa9b413861c15b0aab3dd45eeeb5d36 Mon Sep 17 00:00:00 2001 From: Lakshay Kalbhor Date: Tue, 30 Apr 2024 15:54:05 +0530 Subject: [PATCH] fix: use sourcePool topics in consumer --- init.go | 12 +----------- internal/relay/config.go | 2 -- internal/relay/source_pool.go | 6 +++--- main.go | 2 +- 4 files changed, 5 insertions(+), 17 deletions(-) diff --git a/init.go b/init.go index f2b9995..dfe2197 100644 --- a/init.go +++ b/init.go @@ -175,7 +175,7 @@ func initTopicsMap(ko *koanf.Koanf) map[string]relay.Topic { } // initKafkaConfig reads the source(s)/target Kafka configuration. -func initKafkaConfig(ko *koanf.Koanf, topics map[string]relay.Topic) ([]relay.ConsumerGroupCfg, relay.ProducerCfg) { +func initKafkaConfig(ko *koanf.Koanf) ([]relay.ConsumerGroupCfg, relay.ProducerCfg) { // Read source Kafka config. src := struct { Sources []relay.ConsumerGroupCfg `koanf:"sources"` @@ -191,16 +191,6 @@ func initKafkaConfig(ko *koanf.Koanf, topics map[string]relay.Topic) ([]relay.Co log.Fatalf("error unmarshalling `sources` config: %v", err) } - // Pass all the source topics into cg config - var srcs = make([]string, 0, len(topics)) - for _, t := range topics { - srcs = append(srcs, t.SourceTopic) - } - - for i := range src.Sources { - src.Sources[i].Topics = srcs - } - return src.Sources, prod } diff --git a/internal/relay/config.go b/internal/relay/config.go index 120cce6..c2c7ebb 100644 --- a/internal/relay/config.go +++ b/internal/relay/config.go @@ -42,8 +42,6 @@ type KafkaCfg struct { // ConsumerGroupCfg is the consumer group specific config. type ConsumerGroupCfg struct { KafkaCfg `koanf:",squash"` - - Topics []string } // ProducerCfg is the Kafka producer config. diff --git a/internal/relay/source_pool.go b/internal/relay/source_pool.go index c11459d..42167fa 100644 --- a/internal/relay/source_pool.go +++ b/internal/relay/source_pool.go @@ -376,7 +376,7 @@ func (sp *SourcePool) initConsumerGroup(ctx context.Context, cfg ConsumerGroupCf opts := []kgo.Opt{ kgo.SeedBrokers(cfg.BootstrapBrokers...), kgo.FetchMaxWait(sp.cfg.ReqTimeout), - kgo.ConsumeTopics(cfg.Topics...), + kgo.ConsumeTopics(sp.cfg.Topics...), kgo.ConsumerGroup(sp.cfg.GroupID), kgo.InstanceID(sp.cfg.InstanceID), kgo.SessionTimeout(cfg.SessionTimeout), @@ -412,7 +412,7 @@ func (sp *SourcePool) initConsumerGroup(ctx context.Context, cfg ConsumerGroupCf return nil, err } - if err := testConnection(cl, cfg.SessionTimeout, cfg.Topics, nil); err != nil { + if err := testConnection(cl, cfg.SessionTimeout, sp.cfg.Topics, nil); err != nil { return nil, err } @@ -546,7 +546,7 @@ waitForTopicLag: } // Get end offsets of the topics - topicOffsets, err := admCl.ListEndOffsets(ctx, s.Config.Topics...) + topicOffsets, err := admCl.ListEndOffsets(ctx, sp.cfg.Topics...) if err != nil { sp.log.Error("error fetching offsets", "err", err) return err diff --git a/main.go b/main.go index cb875d3..98c6de9 100644 --- a/main.go +++ b/main.go @@ -50,7 +50,7 @@ func main() { topics := initTopicsMap(ko) // Initialize the source and target Kafka config. - consumerCfgs, prodConfig := initKafkaConfig(ko, topics) + consumerCfgs, prodConfig := initKafkaConfig(ko) // Initialize the target Kafka (producer) relay. target, err := relay.NewTarget(globalCtx, initTargetConfig(ko), prodConfig, topics, metr, lo)