Skip to content

Commit

Permalink
fix kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
plum330 committed Dec 31, 2022
1 parent 23e1cd7 commit ab1dc81
Showing 1 changed file with 11 additions and 3 deletions.
14 changes: 11 additions & 3 deletions xkafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,13 +171,17 @@ type KafkaConsumerGroup struct {
sarama.ConsumerGroup
sarama.ConsumerGroupHandler
Topics []string
context.Context
context.CancelFunc
}

func InitKafkaConsumer(conf *ConsumerGroupConf) *KafkaConsumerGroup {
return &KafkaConsumerGroup{
k := &KafkaConsumerGroup{
ConsumerGroup: newConsumerGroup(conf),
Topics: conf.Topics,
}
k.Context, k.CancelFunc = context.WithCancel(context.TODO())
return k
}

func newConsumerGroup(conf *ConsumerGroupConf) sarama.ConsumerGroup {
Expand All @@ -199,9 +203,12 @@ func newConsumerGroup(conf *ConsumerGroupConf) sarama.ConsumerGroup {

func (kc *KafkaConsumerGroup) Consume() {
for {
err := kc.ConsumerGroup.Consume(context.TODO(), kc.Topics, kc.ConsumerGroupHandler)
err := kc.ConsumerGroup.Consume(kc.Context, kc.Topics, kc.ConsumerGroupHandler)
if err != nil {
logrus.Warnf("consumer group consume fail, err:%+v\n", err)
logrus.WithContext(kc.Context).WithError(err).Warn("consumer group consume fail")
}
if kc.Context.Err() != nil {
return
}
time.Sleep(time.Second)
}
Expand All @@ -213,6 +220,7 @@ func (kc *KafkaConsumerGroup) Start() error {
}

func (kc *KafkaConsumerGroup) Stop(_ context.Context) error {
kc.CancelFunc()
return kc.Close()
}

Expand Down

0 comments on commit ab1dc81

Please # to comment.