Skip to content

Commit

Permalink
fix: use sourcePool topics in consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
Lakshay Kalbhor committed Apr 30, 2024
1 parent 2c16961 commit 217646f
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 17 deletions.
12 changes: 1 addition & 11 deletions init.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func initTopicsMap(ko *koanf.Koanf) map[string]relay.Topic {
}

// initKafkaConfig reads the source(s)/target Kafka configuration.
func initKafkaConfig(ko *koanf.Koanf, topics map[string]relay.Topic) ([]relay.ConsumerGroupCfg, relay.ProducerCfg) {
func initKafkaConfig(ko *koanf.Koanf) ([]relay.ConsumerGroupCfg, relay.ProducerCfg) {
// Read source Kafka config.
src := struct {
Sources []relay.ConsumerGroupCfg `koanf:"sources"`
Expand All @@ -191,16 +191,6 @@ func initKafkaConfig(ko *koanf.Koanf, topics map[string]relay.Topic) ([]relay.Co
log.Fatalf("error unmarshalling `sources` config: %v", err)
}

// Pass all the source topics into cg config
var srcs = make([]string, 0, len(topics))
for _, t := range topics {
srcs = append(srcs, t.SourceTopic)
}

for i := range src.Sources {
src.Sources[i].Topics = srcs
}

return src.Sources, prod
}

Expand Down
2 changes: 0 additions & 2 deletions internal/relay/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,6 @@ type KafkaCfg struct {
// ConsumerGroupCfg is the consumer group specific config.
type ConsumerGroupCfg struct {
KafkaCfg `koanf:",squash"`

Topics []string
}

// ProducerCfg is the Kafka producer config.
Expand Down
6 changes: 3 additions & 3 deletions internal/relay/source_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ func (sp *SourcePool) initConsumerGroup(ctx context.Context, cfg ConsumerGroupCf
opts := []kgo.Opt{
kgo.SeedBrokers(cfg.BootstrapBrokers...),
kgo.FetchMaxWait(sp.cfg.ReqTimeout),
kgo.ConsumeTopics(cfg.Topics...),
kgo.ConsumeTopics(sp.cfg.Topics...),
kgo.ConsumerGroup(sp.cfg.GroupID),
kgo.InstanceID(sp.cfg.InstanceID),
kgo.SessionTimeout(cfg.SessionTimeout),
Expand Down Expand Up @@ -412,7 +412,7 @@ func (sp *SourcePool) initConsumerGroup(ctx context.Context, cfg ConsumerGroupCf
return nil, err
}

if err := testConnection(cl, cfg.SessionTimeout, cfg.Topics, nil); err != nil {
if err := testConnection(cl, cfg.SessionTimeout, sp.cfg.Topics, nil); err != nil {
return nil, err
}

Expand Down Expand Up @@ -546,7 +546,7 @@ waitForTopicLag:
}

// Get end offsets of the topics
topicOffsets, err := admCl.ListEndOffsets(ctx, s.Config.Topics...)
topicOffsets, err := admCl.ListEndOffsets(ctx, sp.cfg.Topics...)
if err != nil {
sp.log.Error("error fetching offsets", "err", err)
return err
Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func main() {
topics := initTopicsMap(ko)

// Initialize the source and target Kafka config.
consumerCfgs, prodConfig := initKafkaConfig(ko, topics)
consumerCfgs, prodConfig := initKafkaConfig(ko)

// Initialize the target Kafka (producer) relay.
target, err := relay.NewTarget(globalCtx, initTargetConfig(ko), prodConfig, topics, metr, lo)
Expand Down

0 comments on commit 217646f

Please # to comment.