Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

loopCheckPartitionNumbers function some problem #2460

Closed
cheaster062211 opened this issue Mar 30, 2023 · 2 comments · Fixed by #2563
Closed

loopCheckPartitionNumbers function some problem #2460

cheaster062211 opened this issue Mar 30, 2023 · 2 comments · Fixed by #2563

Comments

@cheaster062211
Copy link

cheaster062211 commented Mar 30, 2023

image
When loopCheckPartitionNumbers function go to line 627,the partition number has changed,so the consumer no longer rebalance,should oldTopicToPartitionNum get when the session was established instead of in the goroutinue?

@github-actions
Copy link

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur.
Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

@github-actions github-actions bot added the stale Issues and pull requests without any recent activity label Jul 17, 2023
@napallday
Copy link
Contributor

I believe this is a very specific edge case issue.

The problem can occur after the leader has already completed the topic-partition assignment during rebalance in c.newSession. Before oldTopicToPartitionNum, err = c.topicToPartitionNumbers(topics) is called in loopCheckPartitionNumbers, if new partitions are added for the topics, and another goroutine triggers RefreshMetadata to obtain the latest partitions, the loopCheckPartitionNumbers function will have oldTopicToPartitionNum with the latest partitions. Consequently, it will not trigger a rebalance in the future, even though Sarama is designed to have the automatic metadata refresh logic.

Plz CMIIW

 func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
	// Ensure group is not closed
	select {
	case <-c.closed:
		return ErrClosedConsumerGroup
	default:
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	// Quick exit when no topics are provided
	if len(topics) == 0 {
		return fmt.Errorf("no topics provided")
	}

	// Refresh metadata for requested topics
	if err := c.client.RefreshMetadata(topics...); err != nil {
		return err
	}

	// Init session
	sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
	if errors.Is(err, ErrClosedClient) {
		return ErrClosedConsumerGroup
	} else if err != nil {
		return err
	}

	// loop check topic partition numbers changed
	// will trigger rebalance when any topic partitions number had changed
	// avoid Consume function called again that will generate more than loopCheckPartitionNumbers coroutine
	go c.loopCheckPartitionNumbers(topics, sess)

	// Wait for session exit signal
	<-sess.ctx.Done()

	// Gracefully release session claims
	return sess.release(true)
}

func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *consumerGroupSession) {
	if c.config.Metadata.RefreshFrequency == time.Duration(0) {
		return
	}
	pause := time.NewTicker(c.config.Metadata.RefreshFrequency)
	defer session.cancel()
	defer pause.Stop()
	var oldTopicToPartitionNum map[string]int
	var err error
	if oldTopicToPartitionNum, err = c.topicToPartitionNumbers(topics); err != nil {
		return
	}
	for {
		if newTopicToPartitionNum, err := c.topicToPartitionNumbers(topics); err != nil {
			return
		} else {
			for topic, num := range oldTopicToPartitionNum {
				if newTopicToPartitionNum[topic] != num {
					return // trigger the end of the session on exit
				}
			}
		}
		select {
		case <-pause.C:
		case <-session.ctx.Done():
			Logger.Printf(
				"consumergroup/%s loop check partition number coroutine will exit, topics %s\n",
				c.groupID, topics)
			// if session closed by other, should be exited
			return
		case <-c.closed:
			return
		}
	}
}

# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
None yet
Projects
None yet
2 participants