Skip to content

Commit ba942c1

Browse files
committed
Replace session context with main context
Had to go around this problem using mainContext as sessions get closed quite often when the total topics is huge. Using context in struct instead of golang suggested approach of passing context around. Since sarama does not support passing context around. IBM/sarama#1776 golang/go#22602
1 parent d799fff commit ba942c1

File tree

3 files changed

+13
-5
lines changed

3 files changed

+13
-5
lines changed

cmd/redshiftloader/main.go

+1
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ func run(cmd *cobra.Command, args []string) {
8989
groupConfig,
9090
redshiftloader.NewConsumer(
9191
ready,
92+
ctx,
9293
groupConfig.Sarama,
9394
redshifter,
9495
),

pkg/redshiftloader/consumer.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,17 @@
11
package redshiftloader
22

33
import (
4+
"context"
45
"github.com/Shopify/sarama"
56
"github.com/practo/klog/v2"
67
"github.com/practo/tipoca-stream/redshiftsink/pkg/kafka"
78
"github.com/practo/tipoca-stream/redshiftsink/pkg/redshift"
89
)
910

10-
func NewConsumer(ready chan bool, saramaConfig kafka.SaramaConfig, redshifter *redshift.Redshift) consumer {
11+
func NewConsumer(ready chan bool, mainContext context.Context, saramaConfig kafka.SaramaConfig, redshifter *redshift.Redshift) consumer {
1112
return consumer{
1213
ready: ready,
14+
mainContext: mainContext,
1315
saramaConfig: saramaConfig,
1416
redshifter: redshifter,
1517

@@ -23,6 +25,7 @@ func NewConsumer(ready chan bool, saramaConfig kafka.SaramaConfig, redshifter *r
2325
type consumer struct {
2426
// Ready is used to signal the main thread about the readiness
2527
ready chan bool
28+
mainContext context.Context
2629
loader *loader
2730
saramaConfig kafka.SaramaConfig
2831
redshifter *redshift.Redshift
@@ -57,14 +60,14 @@ func (c consumer) processMessage(
5760
if c.loader.processor == nil {
5861
c.loader.processor = newLoadProcessor(
5962
message.Topic, message.Partition,
60-
session, c.saramaConfig, c.redshifter,
63+
session, c.mainContext, c.saramaConfig, c.redshifter,
6164
)
6265
}
6366
// TODO: not sure added below for safety, it may not be required
6467
c.loader.processor.session = session
6568

6669
select {
67-
case <-c.loader.processor.session.Context().Done():
70+
case <-c.mainContext.Done():
6871
klog.Info("Graceful shutdown requested, not inserting in batch")
6972
return nil
7073
default:
@@ -93,7 +96,7 @@ func (c consumer) ConsumeClaim(session sarama.ConsumerGroupSession,
9396
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
9497
for message := range claim.Messages() {
9598
select {
96-
case <-session.Context().Done():
99+
case <-c.mainContext.Done():
97100
klog.Infof(
98101
"%s: Gracefully shutdown. Stopped taking new messages.",
99102
claim.Topic(),

pkg/redshiftloader/load_processor.go

+5-1
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ type loadProcessor struct {
2828
// session is required to commit the offsets on succesfull processing
2929
session sarama.ConsumerGroupSession
3030

31+
mainContext context.Context
32+
3133
// s3Sink
3234
s3sink *s3sink.S3Sink
3335

@@ -82,6 +84,7 @@ type loadProcessor struct {
8284

8385
func newLoadProcessor(
8486
topic string, partition int32, session sarama.ConsumerGroupSession,
87+
mainContext context.Context,
8588
saramaConfig kafka.SaramaConfig,
8689
redshifter *redshift.Redshift) *loadProcessor {
8790

@@ -102,6 +105,7 @@ func newLoadProcessor(
102105
partition: partition,
103106
autoCommit: saramaConfig.AutoCommit,
104107
session: session,
108+
mainContext: mainContext,
105109
s3sink: sink,
106110
messageTransformer: debezium.NewMessageTransformer(),
107111
schemaTransformer: debezium.NewSchemaTransformer(
@@ -118,7 +122,7 @@ func newLoadProcessor(
118122
// TODO: get rid of this https://github.com/herryg91/gobatch/issues/2
119123
func (b *loadProcessor) ctxCancelled() bool {
120124
select {
121-
case <-b.session.Context().Done():
125+
case <-b.mainContext.Done():
122126
klog.Infof(
123127
"topic:%s, batchId:%d, lastCommittedOffset:%d: Cancelled.\n",
124128
b.topic, b.batchId, b.lastCommittedOffset,

0 commit comments

Comments
 (0)