From 79c3626c151edcdb2551520551da3336ccf4a2a6 Mon Sep 17 00:00:00 2001 From: Lakshay Kalbhor Date: Tue, 30 Apr 2024 15:54:05 +0530 Subject: [PATCH] fix: use sourcePool topics in consumer --- init.go | 10 ---------- internal/relay/config.go | 2 -- internal/relay/source_pool.go | 6 +++--- 3 files changed, 3 insertions(+), 15 deletions(-) diff --git a/init.go b/init.go index f2b9995..6de0bed 100644 --- a/init.go +++ b/init.go @@ -191,16 +191,6 @@ func initKafkaConfig(ko *koanf.Koanf, topics map[string]relay.Topic) ([]relay.Co log.Fatalf("error unmarshalling `sources` config: %v", err) } - // Pass all the source topics into cg config - var srcs = make([]string, 0, len(topics)) - for _, t := range topics { - srcs = append(srcs, t.SourceTopic) - } - - for i := range src.Sources { - src.Sources[i].Topics = srcs - } - return src.Sources, prod } diff --git a/internal/relay/config.go b/internal/relay/config.go index 120cce6..c2c7ebb 100644 --- a/internal/relay/config.go +++ b/internal/relay/config.go @@ -42,8 +42,6 @@ type KafkaCfg struct { // ConsumerGroupCfg is the consumer group specific config. type ConsumerGroupCfg struct { KafkaCfg `koanf:",squash"` - - Topics []string } // ProducerCfg is the Kafka producer config. diff --git a/internal/relay/source_pool.go b/internal/relay/source_pool.go index c11459d..42167fa 100644 --- a/internal/relay/source_pool.go +++ b/internal/relay/source_pool.go @@ -376,7 +376,7 @@ func (sp *SourcePool) initConsumerGroup(ctx context.Context, cfg ConsumerGroupCf opts := []kgo.Opt{ kgo.SeedBrokers(cfg.BootstrapBrokers...), kgo.FetchMaxWait(sp.cfg.ReqTimeout), - kgo.ConsumeTopics(cfg.Topics...), + kgo.ConsumeTopics(sp.cfg.Topics...), kgo.ConsumerGroup(sp.cfg.GroupID), kgo.InstanceID(sp.cfg.InstanceID), kgo.SessionTimeout(cfg.SessionTimeout), @@ -412,7 +412,7 @@ func (sp *SourcePool) initConsumerGroup(ctx context.Context, cfg ConsumerGroupCf return nil, err } - if err := testConnection(cl, cfg.SessionTimeout, cfg.Topics, nil); err != nil { + if err := testConnection(cl, cfg.SessionTimeout, sp.cfg.Topics, nil); err != nil { return nil, err } @@ -546,7 +546,7 @@ waitForTopicLag: } // Get end offsets of the topics - topicOffsets, err := admCl.ListEndOffsets(ctx, s.Config.Topics...) + topicOffsets, err := admCl.ListEndOffsets(ctx, sp.cfg.Topics...) if err != nil { sp.log.Error("error fetching offsets", "err", err) return err