Skip to content

Commit

Permalink
fix: use sourcePool topics in consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
Lakshay Kalbhor committed Apr 30, 2024
1 parent 2c16961 commit 79c3626
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 15 deletions.
10 changes: 0 additions & 10 deletions init.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,16 +191,6 @@ func initKafkaConfig(ko *koanf.Koanf, topics map[string]relay.Topic) ([]relay.Co
log.Fatalf("error unmarshalling `sources` config: %v", err)
}

// Pass all the source topics into cg config
var srcs = make([]string, 0, len(topics))
for _, t := range topics {
srcs = append(srcs, t.SourceTopic)
}

for i := range src.Sources {
src.Sources[i].Topics = srcs
}

return src.Sources, prod
}

Expand Down
2 changes: 0 additions & 2 deletions internal/relay/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ type KafkaCfg struct {
// ConsumerGroupCfg is the consumer group specific config.
type ConsumerGroupCfg struct {
KafkaCfg `koanf:",squash"`

Topics []string
}

// ProducerCfg is the Kafka producer config.
Expand Down
6 changes: 3 additions & 3 deletions internal/relay/source_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func (sp *SourcePool) initConsumerGroup(ctx context.Context, cfg ConsumerGroupCf
opts := []kgo.Opt{
kgo.SeedBrokers(cfg.BootstrapBrokers...),
kgo.FetchMaxWait(sp.cfg.ReqTimeout),
kgo.ConsumeTopics(cfg.Topics...),
kgo.ConsumeTopics(sp.cfg.Topics...),
kgo.ConsumerGroup(sp.cfg.GroupID),
kgo.InstanceID(sp.cfg.InstanceID),
kgo.SessionTimeout(cfg.SessionTimeout),
Expand Down Expand Up @@ -412,7 +412,7 @@ func (sp *SourcePool) initConsumerGroup(ctx context.Context, cfg ConsumerGroupCf
return nil, err
}

if err := testConnection(cl, cfg.SessionTimeout, cfg.Topics, nil); err != nil {
if err := testConnection(cl, cfg.SessionTimeout, sp.cfg.Topics, nil); err != nil {
return nil, err
}

Expand Down Expand Up @@ -546,7 +546,7 @@ waitForTopicLag:
}

// Get end offsets of the topics
topicOffsets, err := admCl.ListEndOffsets(ctx, s.Config.Topics...)
topicOffsets, err := admCl.ListEndOffsets(ctx, sp.cfg.Topics...)
if err != nil {
sp.log.Error("error fetching offsets", "err", err)
return err
Expand Down

0 comments on commit 79c3626

Please # to comment.