From ab1dc81b1309fef8b06f987ce9d855e8d99337fd Mon Sep 17 00:00:00 2001 From: smallfish Date: Sat, 31 Dec 2022 23:31:47 +0800 Subject: [PATCH] fix kafka --- xkafka/kafka.go | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/xkafka/kafka.go b/xkafka/kafka.go index a0893b0..91dd142 100644 --- a/xkafka/kafka.go +++ b/xkafka/kafka.go @@ -171,13 +171,17 @@ type KafkaConsumerGroup struct { sarama.ConsumerGroup sarama.ConsumerGroupHandler Topics []string + context.Context + context.CancelFunc } func InitKafkaConsumer(conf *ConsumerGroupConf) *KafkaConsumerGroup { - return &KafkaConsumerGroup{ + k := &KafkaConsumerGroup{ ConsumerGroup: newConsumerGroup(conf), Topics: conf.Topics, } + k.Context, k.CancelFunc = context.WithCancel(context.TODO()) + return k } func newConsumerGroup(conf *ConsumerGroupConf) sarama.ConsumerGroup { @@ -199,9 +203,12 @@ func newConsumerGroup(conf *ConsumerGroupConf) sarama.ConsumerGroup { func (kc *KafkaConsumerGroup) Consume() { for { - err := kc.ConsumerGroup.Consume(context.TODO(), kc.Topics, kc.ConsumerGroupHandler) + err := kc.ConsumerGroup.Consume(kc.Context, kc.Topics, kc.ConsumerGroupHandler) if err != nil { - logrus.Warnf("consumer group consume fail, err:%+v\n", err) + logrus.WithContext(kc.Context).WithError(err).Warn("consumer group consume fail") + } + if kc.Context.Err() != nil { + return } time.Sleep(time.Second) } @@ -213,6 +220,7 @@ func (kc *KafkaConsumerGroup) Start() error { } func (kc *KafkaConsumerGroup) Stop(_ context.Context) error { + kc.CancelFunc() return kc.Close() }