Configuring Sarama's initial offset to OffsetOldest leads the existing group to reprocess topic messages from start #441

Closed
homier opened this issue Nov 14, 2023 · 3 comments

Comments

homier commented Nov 14, 2023

Hi everyone,

We started seeing topic messages reprocessed from the beginning after we changed Sarama's initial offset policy from OffsetNewest to OffsetOldest. At first we thought the switch from one policy to the other was the cause, but the problem appears to be specific to the OffsetOldest policy.

I wrote a simple script (attached to the issue) that consumes messages from a topic with the OffsetOldest policy and prints them to stdout. I produced 10 messages and started the worker. Once all the messages had been consumed, the consumer committed the latest message offset to the consumer group, which is the behavior I expected. If you stop the consumer with Ctrl-C and restart it immediately, it resumes from the group's latest committed offset, and everything is fine.

But if you wait several minutes before starting the consumer again, the offset is reset to the initial one (-2 in Sarama's case, i.e. OffsetOldest) and all the messages are reprocessed.
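The behavior described above can be sketched as a small decision function. This is a simplified model and not Sarama's actual code: `startOffset` and `noCommittedOffset` are hypothetical names, and the only values taken from Sarama are the two policy constants (OffsetNewest is -1, OffsetOldest is -2). It assumes the initial policy is consulted only when the group has no committed offset.

```go
package main

import "fmt"

// These values mirror sarama.OffsetNewest and sarama.OffsetOldest.
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// noCommittedOffset is a hypothetical marker meaning "the group has no
// stored offset for this partition".
const noCommittedOffset int64 = -1

// startOffset returns the committed offset when the group has one and
// falls back to the configured initial policy otherwise.
func startOffset(committed, initial int64) int64 {
	if committed > noCommittedOffset {
		return committed
	}
	return initial
}

func main() {
	// Immediate restart: the committed offset is still there.
	fmt.Println(startOffset(10, offsetOldest))
	// Restart after the committed offset has disappeared.
	fmt.Println(startOffset(noCommittedOffset, offsetOldest))
	fmt.Println(startOffset(noCommittedOffset, offsetNewest))
}
```

Under this model, losing the committed offset with OffsetNewest would send the consumer to the end of the topic rather than the start, which may be why the problem only became visible with OffsetOldest.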

For us this is a critical issue: reprocessing means consuming several million messages again, and it takes several hours to finish because of the computations we have to run for every single message.

This seems related to IBM/sarama#2036.
One more question: should I also open this issue in the Sarama repository?

Golang version: go version go1.21.1 linux/amd64
Sarama's version: github.com/IBM/sarama v1.41.2
Goka's version: github.com/lovoo/goka v1.1.9

package main

import (
	"context"
	"fmt"

	"github.com/IBM/sarama"
	"github.com/lovoo/goka"
)

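// Codec is a pass-through string codec for the test topic.
type Codec struct{}

// Decode returns the raw message bytes as a string.
func (Codec) Decode(data []byte) (any, error) {
	return string(data), nil
}

// Encode expects a string and returns its bytes; any other type is an error.
func (Codec) Encode(data any) ([]byte, error) {
	s, ok := data.(string)
	if !ok {
		return nil, fmt.Errorf("codec: expected string, got %T", data)
	}
	return []byte(s), nil
}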

func main() {
	var codec Codec

	group := goka.DefineGroup("goka-test", goka.Input(goka.Stream("goka-test"), codec, handle))

	conf := goka.DefaultConfig()

	// Everything is fine when using `sarama.OffsetNewest`
	conf.Consumer.Offsets.Initial = sarama.OffsetOldest

	proc, err := goka.NewProcessor(
		[]string{"localhost:9092"}, group,
		goka.WithConsumerGroupBuilder(goka.ConsumerGroupBuilderWithConfig(conf)),
		goka.WithLogger(goka.DefaultLogger()),
	)

	if err != nil {
		panic(err)
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	errCh := make(chan error, 1)
	go func() {
		defer close(errCh)
		errCh <- proc.Run(ctx)
	}()

	select {
	case err := <-errCh:
		if err != nil {
			panic(err)
		}
	case <-ctx.Done():
	}
}

func handle(ctx goka.Context, data any) {
	fmt.Println(data)
}

frairon (Contributor) commented Nov 18, 2023

Hi @homier,

To be honest, I can't reproduce the behavior at all. The messages were never reconsumed, however long I waited.
My guess is that it's a configuration issue with Kafka's __consumer_offsets topic, so not related to goka or Sarama.
I tried it with the Kafka cluster set up by docker-compose in the examples folder, and everything worked as expected.

Did you use the same cluster? If not, and you have access to the Kafka tools in your cluster, maybe check the stored offsets for the test group. Perhaps they get reset for some reason after a while?

kafka-consumer-groups --bootstrap-server <broker> --group goka-test --describe --offsets

homier (Author) commented Nov 19, 2023

Hi @frairon,

Thanks for the answer! Over the last week we tried to figure out what happens in Kafka when this issue occurs.

It turned out that Kafka deleted the consumer group about 10 minutes after the processor was shut down. We tried different Kafka versions until we realized the problem was in Sarama's group configuration.

The issue was fixed by upgrading Sarama to v1.42.1.

frairon (Contributor) commented Nov 19, 2023

Alright, that's odd :)
But good to know, thanks for the feedback!

frairon closed this as completed Nov 19, 2023