From 3935c789810a74dee1eb5a3f7ed09acb65bd6135 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Rodrigo=20Arg=C3=BCello?=
Date: Fri, 18 Oct 2024 17:23:32 +0200
Subject: [PATCH] contrib/confluentinc/confluent-kafka-go: fix goroutine leak in Produce (#2924)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Dario Castañé
---
 .../confluent-kafka-go/kafka.v2/kafka.go      |  34 ++-
 .../confluent-kafka-go/kafka.v2/kafka_test.go | 199 +++++++++++-------
 .../confluent-kafka-go/kafka/kafka.go         |  34 ++-
 .../confluent-kafka-go/kafka/kafka_test.go    | 198 ++++++++++-------
 4 files changed, 289 insertions(+), 176 deletions(-)

diff --git a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go
index 463ba2391c..f450c03f94 100644
--- a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go
+++ b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka.go
@@ -334,31 +334,43 @@ func (p *Producer) Close() {
 func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
 	span := p.startSpan(msg)
 
+	var errChan chan error
+
 	// if the user has selected a delivery channel, we will wrap it and
-	// wait for the delivery event to finish the span
+	// wait for the delivery event to finish the span.
+	// in case the Produce call returns an error, we won't receive anything
+	// in the deliveryChan, so we use errChan to make sure this goroutine exits.
 	if deliveryChan != nil {
+		errChan = make(chan error, 1)
 		oldDeliveryChan := deliveryChan
 		deliveryChan = make(chan kafka.Event)
 		go func() {
 			var err error
-			evt := <-deliveryChan
-			if msg, ok := evt.(*kafka.Message); ok {
-				// delivery errors are returned via TopicPartition.Error
-				err = msg.TopicPartition.Error
-				trackProduceOffsets(p.cfg.dataStreamsEnabled, msg, err)
+			select {
+			case evt := <-deliveryChan:
+				if msg, ok := evt.(*kafka.Message); ok {
+					// delivery errors are returned via TopicPartition.Error
+					err = msg.TopicPartition.Error
+					trackProduceOffsets(p.cfg.dataStreamsEnabled, msg, err)
+				}
+				oldDeliveryChan <- evt
+
+			case e := <-errChan:
+				err = e
 			}
 			span.Finish(tracer.WithError(err))
-			oldDeliveryChan <- evt
 		}()
 	}
 
 	setProduceCheckpoint(p.cfg.dataStreamsEnabled, p.libraryVersion, msg)
 	err := p.Producer.Produce(msg, deliveryChan)
-	// with no delivery channel or enqueue error, finish immediately
-	if err != nil || deliveryChan == nil {
-		span.Finish(tracer.WithError(err))
+	if err != nil {
+		if deliveryChan != nil {
+			errChan <- err
+		} else {
+			span.Finish(tracer.WithError(err))
+		}
 	}
-
 	return err
 }
diff --git a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go
index bc33c09ad9..6f3112a69a 100644
--- a/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go
+++ b/contrib/confluentinc/confluent-kafka-go/kafka.v2/kafka_test.go
@@ -23,6 +23,7 @@ import (
 	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"go.uber.org/goleak"
 )
 
 var (
@@ -30,82 +31,6 @@ var (
 	testTopic = "gotest"
 )
 
-type consumerActionFn func(c *Consumer) (*kafka.Message, error)
-
-func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerOpts []Option, consumerOpts []Option) ([]mocktracer.Span, *kafka.Message) {
-	if _, ok := os.LookupEnv("INTEGRATION"); !ok {
-		t.Skip("to enable integration test, set the INTEGRATION environment variable")
-	}
-	mt := mocktracer.Start()
-	defer mt.Stop()
-
-	// first write a message to the topic
-	p, err := NewProducer(&kafka.ConfigMap{
-		"bootstrap.servers":   "127.0.0.1:9092",
-		"go.delivery.reports": true,
-	}, producerOpts...)
-	require.NoError(t, err)
-
-	delivery := make(chan kafka.Event, 1)
-	err = p.Produce(&kafka.Message{
-		TopicPartition: kafka.TopicPartition{
-			Topic:     &testTopic,
-			Partition: 0,
-		},
-		Key:   []byte("key2"),
-		Value: []byte("value2"),
-	}, delivery)
-	require.NoError(t, err)
-
-	msg1, _ := (<-delivery).(*kafka.Message)
-	p.Close()
-
-	// next attempt to consume the message
-	c, err := NewConsumer(&kafka.ConfigMap{
-		"group.id":                 testGroupID,
-		"bootstrap.servers":        "127.0.0.1:9092",
-		"fetch.wait.max.ms":        500,
-		"socket.timeout.ms":        1500,
-		"session.timeout.ms":       1500,
-		"enable.auto.offset.store": false,
-	}, consumerOpts...)
-	require.NoError(t, err)
-
-	err = c.Assign([]kafka.TopicPartition{
-		{Topic: &testTopic, Partition: 0, Offset: msg1.TopicPartition.Offset},
-	})
-	require.NoError(t, err)
-
-	msg2, err := consumerAction(c)
-	require.NoError(t, err)
-	_, err = c.CommitMessage(msg2)
-	require.NoError(t, err)
-	assert.Equal(t, msg1.String(), msg2.String())
-	err = c.Close()
-	require.NoError(t, err)
-
-	spans := mt.FinishedSpans()
-	require.Len(t, spans, 2)
-	// they should be linked via headers
-	assert.Equal(t, spans[0].TraceID(), spans[1].TraceID())
-
-	if c.cfg.dataStreamsEnabled {
-		backlogs := mt.SentDSMBacklogs()
-		toMap := func(b []internaldsm.Backlog) map[string]struct{} {
-			m := make(map[string]struct{})
-			for _, b := range backlogs {
-				m[strings.Join(b.Tags, "")] = struct{}{}
-			}
-			return m
-		}
-		backlogsMap := toMap(backlogs)
-		require.Contains(t, backlogsMap, "consumer_group:"+testGroupID+"partition:0"+"topic:"+testTopic+"type:kafka_commit")
-		require.Contains(t, backlogsMap, "partition:0"+"topic:"+testTopic+"type:kafka_high_watermark")
-		require.Contains(t, backlogsMap, "partition:0"+"topic:"+testTopic+"type:kafka_produce")
-	}
-	return spans, msg2
-}
-
 func TestConsumerChannel(t *testing.T) {
 	// we can test consuming via the Events channel by artificially sending
 	// messages. Testing .Poll is done via an integration test.
@@ -320,7 +245,7 @@ func TestCustomTags(t *testing.T) {
 		"socket.timeout.ms":        10,
 		"session.timeout.ms":       10,
 		"enable.auto.offset.store": false,
-	}, WithCustomTag("foo", func(msg *kafka.Message) interface{} {
+	}, WithCustomTag("foo", func(_ *kafka.Message) interface{} {
 		return "bar"
 	}), WithCustomTag("key", func(msg *kafka.Message) interface{} {
 		return msg.Key
@@ -370,3 +295,123 @@ func TestNamingSchema(t *testing.T) {
 	}
 	namingschematest.NewKafkaTest(genSpans)(t)
 }
+
+// Test we don't leak goroutines and properly close the span when Produce returns an error.
+func TestProduceError(t *testing.T) {
+	defer func() {
+		err := goleak.Find()
+		if err != nil {
+			// if a goroutine is leaking, ensure it is not coming from this package
+			assert.NotContains(t, err.Error(), "contrib/confluentinc/confluent-kafka-go")
+		}
+	}()
+
+	mt := mocktracer.Start()
+	defer mt.Stop()
+
+	// first write a message to the topic
+	p, err := NewProducer(&kafka.ConfigMap{
+		"bootstrap.servers":   "127.0.0.1:9092",
+		"go.delivery.reports": true,
+	})
+	require.NoError(t, err)
+	defer p.Close()
+
+	// this empty message should cause an error in the Produce call.
+	topic := ""
+	msg := &kafka.Message{
+		TopicPartition: kafka.TopicPartition{
+			Topic: &topic,
+		},
+	}
+	deliveryChan := make(chan kafka.Event, 1)
+	err = p.Produce(msg, deliveryChan)
+	require.Error(t, err)
+	require.EqualError(t, err, "Local: Invalid argument or configuration")
+
+	select {
+	case <-deliveryChan:
+		assert.Fail(t, "there should be no events in the deliveryChan")
+	case <-time.After(1 * time.Second):
+		// assume there is no event
+	}
+
+	spans := mt.FinishedSpans()
+	assert.Len(t, spans, 1)
+}
+
+type consumerActionFn func(c *Consumer) (*kafka.Message, error)
+
+func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerOpts []Option, consumerOpts []Option) ([]mocktracer.Span, *kafka.Message) {
+	if _, ok := os.LookupEnv("INTEGRATION"); !ok {
+		t.Skip("to enable integration test, set the INTEGRATION environment variable")
+	}
+	mt := mocktracer.Start()
+	defer mt.Stop()
+
+	// first write a message to the topic
+	p, err := NewProducer(&kafka.ConfigMap{
+		"bootstrap.servers":   "127.0.0.1:9092",
+		"go.delivery.reports": true,
+	}, producerOpts...)
+	require.NoError(t, err)
+
+	delivery := make(chan kafka.Event, 1)
+	err = p.Produce(&kafka.Message{
+		TopicPartition: kafka.TopicPartition{
+			Topic:     &testTopic,
+			Partition: 0,
+		},
+		Key:   []byte("key2"),
+		Value: []byte("value2"),
+	}, delivery)
+	require.NoError(t, err)
+
+	msg1, _ := (<-delivery).(*kafka.Message)
+	p.Close()
+
+	// next attempt to consume the message
+	c, err := NewConsumer(&kafka.ConfigMap{
+		"group.id":                 testGroupID,
+		"bootstrap.servers":        "127.0.0.1:9092",
+		"fetch.wait.max.ms":        500,
+		"socket.timeout.ms":        1500,
+		"session.timeout.ms":       1500,
+		"enable.auto.offset.store": false,
+	}, consumerOpts...)
+	require.NoError(t, err)
+
+	err = c.Assign([]kafka.TopicPartition{
+		{Topic: &testTopic, Partition: 0, Offset: msg1.TopicPartition.Offset},
+	})
+	require.NoError(t, err)
+
+	msg2, err := consumerAction(c)
+	require.NoError(t, err)
+	_, err = c.CommitMessage(msg2)
+	require.NoError(t, err)
+	assert.Equal(t, msg1.String(), msg2.String())
+	err = c.Close()
+	require.NoError(t, err)
+
+	spans := mt.FinishedSpans()
+	require.Len(t, spans, 2)
+	// they should be linked via headers
+	assert.Equal(t, spans[0].TraceID(), spans[1].TraceID())
+
+	if c.cfg.dataStreamsEnabled {
+		backlogs := mt.SentDSMBacklogs()
+		toMap := func(_ []internaldsm.Backlog) map[string]struct{} {
+			m := make(map[string]struct{})
+			for _, b := range backlogs {
+				m[strings.Join(b.Tags, "")] = struct{}{}
+			}
+			return m
+		}
+		backlogsMap := toMap(backlogs)
+		require.Contains(t, backlogsMap, "consumer_group:"+testGroupID+"partition:0"+"topic:"+testTopic+"type:kafka_commit")
+		require.Contains(t, backlogsMap, "partition:0"+"topic:"+testTopic+"type:kafka_high_watermark")
+		require.Contains(t, backlogsMap, "partition:0"+"topic:"+testTopic+"type:kafka_produce")
+	}
+	return spans, msg2
+}
diff --git a/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go b/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go
index d78d4c8860..b61388d3bb 100644
--- a/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go
+++ b/contrib/confluentinc/confluent-kafka-go/kafka/kafka.go
@@ -334,31 +334,43 @@ func (p *Producer) Close() {
 func (p *Producer) Produce(msg *kafka.Message, deliveryChan chan kafka.Event) error {
 	span := p.startSpan(msg)
 
+	var errChan chan error
+
 	// if the user has selected a delivery channel, we will wrap it and
-	// wait for the delivery event to finish the span
+	// wait for the delivery event to finish the span.
+	// in case the Produce call returns an error, we won't receive anything
+	// in the deliveryChan, so we use errChan to make sure this goroutine exits.
 	if deliveryChan != nil {
+		errChan = make(chan error, 1)
 		oldDeliveryChan := deliveryChan
 		deliveryChan = make(chan kafka.Event)
 		go func() {
 			var err error
-			evt := <-deliveryChan
-			if msg, ok := evt.(*kafka.Message); ok {
-				// delivery errors are returned via TopicPartition.Error
-				err = msg.TopicPartition.Error
-				trackProduceOffsets(p.cfg.dataStreamsEnabled, msg, err)
+			select {
+			case evt := <-deliveryChan:
+				if msg, ok := evt.(*kafka.Message); ok {
+					// delivery errors are returned via TopicPartition.Error
+					err = msg.TopicPartition.Error
+					trackProduceOffsets(p.cfg.dataStreamsEnabled, msg, err)
+				}
+				oldDeliveryChan <- evt
+
+			case e := <-errChan:
+				err = e
 			}
 			span.Finish(tracer.WithError(err))
-			oldDeliveryChan <- evt
 		}()
 	}
 
 	setProduceCheckpoint(p.cfg.dataStreamsEnabled, p.libraryVersion, msg)
 	err := p.Producer.Produce(msg, deliveryChan)
-	// with no delivery channel or enqueue error, finish immediately
-	if err != nil || deliveryChan == nil {
-		span.Finish(tracer.WithError(err))
+	if err != nil {
+		if deliveryChan != nil {
+			errChan <- err
+		} else {
+			span.Finish(tracer.WithError(err))
+		}
 	}
-
 	return err
 }
diff --git a/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go b/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go
index d7cc103141..343b4c531b 100644
--- a/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go
+++ b/contrib/confluentinc/confluent-kafka-go/kafka/kafka_test.go
@@ -23,6 +23,7 @@ import (
 	"github.com/confluentinc/confluent-kafka-go/kafka"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"go.uber.org/goleak"
 )
 
 var (
@@ -30,82 +31,6 @@ var (
 	testTopic = "gotest"
 )
 
-type consumerActionFn func(c *Consumer) (*kafka.Message, error)
-
-func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerOpts []Option, consumerOpts []Option) ([]mocktracer.Span, *kafka.Message) {
-	if _, ok := os.LookupEnv("INTEGRATION"); !ok {
-		t.Skip("to enable integration test, set the INTEGRATION environment variable")
-	}
-	mt := mocktracer.Start()
-	defer mt.Stop()
-
-	// first write a message to the topic
-	p, err := NewProducer(&kafka.ConfigMap{
-		"bootstrap.servers":   "127.0.0.1:9092",
-		"go.delivery.reports": true,
-	}, producerOpts...)
-	require.NoError(t, err)
-
-	delivery := make(chan kafka.Event, 1)
-	err = p.Produce(&kafka.Message{
-		TopicPartition: kafka.TopicPartition{
-			Topic:     &testTopic,
-			Partition: 0,
-		},
-		Key:   []byte("key2"),
-		Value: []byte("value2"),
-	}, delivery)
-	require.NoError(t, err)
-
-	msg1, _ := (<-delivery).(*kafka.Message)
-	p.Close()
-
-	// next attempt to consume the message
-	c, err := NewConsumer(&kafka.ConfigMap{
-		"group.id":                 testGroupID,
-		"bootstrap.servers":        "127.0.0.1:9092",
-		"fetch.wait.max.ms":        500,
-		"socket.timeout.ms":        1500,
-		"session.timeout.ms":       1500,
-		"enable.auto.offset.store": false,
-	}, consumerOpts...)
-	require.NoError(t, err)
-
-	err = c.Assign([]kafka.TopicPartition{
-		{Topic: &testTopic, Partition: 0, Offset: msg1.TopicPartition.Offset},
-	})
-	require.NoError(t, err)
-
-	msg2, err := consumerAction(c)
-	require.NoError(t, err)
-	_, err = c.CommitMessage(msg2)
-	require.NoError(t, err)
-	assert.Equal(t, msg1.String(), msg2.String())
-	err = c.Close()
-	require.NoError(t, err)
-
-	spans := mt.FinishedSpans()
-	require.Len(t, spans, 2)
-	// they should be linked via headers
-	assert.Equal(t, spans[0].TraceID(), spans[1].TraceID())
-
-	if c.cfg.dataStreamsEnabled {
-		backlogs := mt.SentDSMBacklogs()
-		toMap := func(b []internaldsm.Backlog) map[string]struct{} {
-			m := make(map[string]struct{})
-			for _, b := range backlogs {
-				m[strings.Join(b.Tags, "")] = struct{}{}
-			}
-			return m
-		}
-		backlogsMap := toMap(backlogs)
-		require.Contains(t, backlogsMap, "consumer_group:"+testGroupID+"partition:0"+"topic:"+testTopic+"type:kafka_commit")
-		require.Contains(t, backlogsMap, "partition:0"+"topic:"+testTopic+"type:kafka_high_watermark")
-		require.Contains(t, backlogsMap, "partition:0"+"topic:"+testTopic+"type:kafka_produce")
-	}
-	return spans, msg2
-}
-
 func TestConsumerChannel(t *testing.T) {
 	// we can test consuming via the Events channel by artificially sending
 	// messages. Testing .Poll is done via an integration test.
@@ -320,7 +245,7 @@ func TestCustomTags(t *testing.T) {
 		"socket.timeout.ms":        10,
 		"session.timeout.ms":       10,
 		"enable.auto.offset.store": false,
-	}, WithCustomTag("foo", func(msg *kafka.Message) interface{} {
+	}, WithCustomTag("foo", func(_ *kafka.Message) interface{} {
 		return "bar"
 	}), WithCustomTag("key", func(msg *kafka.Message) interface{} {
 		return msg.Key
@@ -370,3 +295,122 @@ func TestNamingSchema(t *testing.T) {
 	}
 	namingschematest.NewKafkaTest(genSpans)(t)
 }
+
+// Test we don't leak goroutines and properly close the span when Produce returns an error
+func TestProduceError(t *testing.T) {
+	defer func() {
+		err := goleak.Find()
+		if err != nil {
+			// if a goroutine is leaking, ensure it is not coming from this package
+			assert.NotContains(t, err.Error(), "contrib/confluentinc/confluent-kafka-go")
+		}
+	}()
+
+	mt := mocktracer.Start()
+	defer mt.Stop()
+
+	// first write a message to the topic
+	p, err := NewProducer(&kafka.ConfigMap{
+		"bootstrap.servers":   "127.0.0.1:9092",
+		"go.delivery.reports": true,
+	})
+	require.NoError(t, err)
+	defer p.Close()
+
+	topic := ""
+	msg := &kafka.Message{
+		TopicPartition: kafka.TopicPartition{
+			Topic: &topic,
+		},
+	}
+	deliveryChan := make(chan kafka.Event, 1)
+	err = p.Produce(msg, deliveryChan)
+	require.Error(t, err)
+	require.EqualError(t, err, "Local: Invalid argument or configuration")
+
+	select {
+	case <-deliveryChan:
+		assert.Fail(t, "there should be no events in the deliveryChan")
+	case <-time.After(1 * time.Second):
+		// assume there is no event
+	}
+
+	spans := mt.FinishedSpans()
+	assert.Len(t, spans, 1)
+}
+
+type consumerActionFn func(c *Consumer) (*kafka.Message, error)
+
+func produceThenConsume(t *testing.T, consumerAction consumerActionFn, producerOpts []Option, consumerOpts []Option) ([]mocktracer.Span, *kafka.Message) {
+	if _, ok := os.LookupEnv("INTEGRATION"); !ok {
+		t.Skip("to enable integration test, set the INTEGRATION environment variable")
+	}
+	mt := mocktracer.Start()
+	defer mt.Stop()
+
+	// first write a message to the topic
+	p, err := NewProducer(&kafka.ConfigMap{
+		"bootstrap.servers":   "127.0.0.1:9092",
+		"go.delivery.reports": true,
+	}, producerOpts...)
+ require.NoError(t, err) + + delivery := make(chan kafka.Event, 1) + err = p.Produce(&kafka.Message{ + TopicPartition: kafka.TopicPartition{ + Topic: &testTopic, + Partition: 0, + }, + Key: []byte("key2"), + Value: []byte("value2"), + }, delivery) + require.NoError(t, err) + + msg1, _ := (<-delivery).(*kafka.Message) + p.Close() + + // next attempt to consume the message + c, err := NewConsumer(&kafka.ConfigMap{ + "group.id": testGroupID, + "bootstrap.servers": "127.0.0.1:9092", + "fetch.wait.max.ms": 500, + "socket.timeout.ms": 1500, + "session.timeout.ms": 1500, + "enable.auto.offset.store": false, + }, consumerOpts...) + require.NoError(t, err) + + err = c.Assign([]kafka.TopicPartition{ + {Topic: &testTopic, Partition: 0, Offset: msg1.TopicPartition.Offset}, + }) + require.NoError(t, err) + + msg2, err := consumerAction(c) + require.NoError(t, err) + _, err = c.CommitMessage(msg2) + require.NoError(t, err) + assert.Equal(t, msg1.String(), msg2.String()) + err = c.Close() + require.NoError(t, err) + + spans := mt.FinishedSpans() + require.Len(t, spans, 2) + // they should be linked via headers + assert.Equal(t, spans[0].TraceID(), spans[1].TraceID()) + + if c.cfg.dataStreamsEnabled { + backlogs := mt.SentDSMBacklogs() + toMap := func(_ []internaldsm.Backlog) map[string]struct{} { + m := make(map[string]struct{}) + for _, b := range backlogs { + m[strings.Join(b.Tags, "")] = struct{}{} + } + return m + } + backlogsMap := toMap(backlogs) + require.Contains(t, backlogsMap, "consumer_group:"+testGroupID+"partition:0"+"topic:"+testTopic+"type:kafka_commit") + require.Contains(t, backlogsMap, "partition:0"+"topic:"+testTopic+"type:kafka_high_watermark") + require.Contains(t, backlogsMap, "partition:0"+"topic:"+testTopic+"type:kafka_produce") + } + return spans, msg2 +}
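
Note on the fix: before this patch, when a caller passed a delivery channel and the underlying Produce call failed synchronously, the wrapping goroutine stayed blocked on <-deliveryChan forever, because no delivery event is ever emitted for a message that was never enqueued. The patch gives that goroutine a second exit path: a buffered errChan that Produce writes to when it returns an error. Below is a minimal, self-contained sketch of the pattern, not the tracer's actual code; finish stands in for span.Finish(tracer.WithError(err)) and chan string stands in for chan kafka.Event:

    package main

    import (
    	"errors"
    	"fmt"
    )

    func main() {
    	done := make(chan struct{})
    	// finish stands in for span.Finish(tracer.WithError(err)).
    	finish := func(err error) {
    		fmt.Println("span finished, err:", err)
    		close(done)
    	}

    	userChan := make(chan string, 1) // the delivery channel the caller passed in
    	inner := make(chan string)       // wrapped channel handed to the underlying producer
    	errChan := make(chan error, 1)   // buffered: the failing Produce path must not block

    	go func() {
    		var err error
    		select {
    		case evt := <-inner:
    			// normal path: a delivery event arrived; forward it to the caller
    			userChan <- evt
    		case e := <-errChan:
    			// failure path: Produce returned a synchronous error, so no
    			// event will ever arrive on inner
    			err = e
    		}
    		finish(err) // the span is finished exactly once on either path
    	}()

    	// Simulate the underlying Produce failing up front. Without errChan,
    	// the goroutine above would block on <-inner forever: the leak this
    	// patch fixes.
    	errChan <- errors.New("Local: Invalid argument or configuration")
    	<-done
    }

errChan has capacity 1 so the failing Produce path can hand off the error and return immediately, even if the goroutine has not yet reached its select.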
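The new TestProduceError relies on go.uber.org/goleak to catch regressions of this leak. A rough sketch of that pattern follows (hypothetical test name; the package-name filter mirrors the test above, since unrelated background goroutines, for example librdkafka's, may legitimately still be running):

    package kafka

    import (
    	"strings"
    	"testing"

    	"go.uber.org/goleak"
    )

    func TestNoLeakSketch(t *testing.T) {
    	defer func() {
    		// goleak.Find returns an error describing any goroutine still
    		// running when the deferred check executes (nil means none).
    		if err := goleak.Find(); err != nil {
    			// only fail when the leak report points into this package
    			if strings.Contains(err.Error(), "contrib/confluentinc/confluent-kafka-go") {
    				t.Errorf("goroutine leak: %v", err)
    			}
    		}
    	}()
    	// ... exercise Produce with an input that fails synchronously ...
    }

Suites that expect a completely quiet process can call goleak.VerifyNone(t) instead, which fails the test on any leftover goroutine.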