LeaveGroupRequest still use the old version request #2486

Closed
ai-zelenin opened this issue Jul 6, 2023 · 0 comments · Fixed by #2544
ai-zelenin commented Jul 6, 2023

Versions

| Sarama  | Kafka | Go   |
|---------|-------|------|
| v1.38.1 | 3.4.0 | 1.20 |

Configuration
func NewSaramaConfig(cfg *Config) *sarama.Config {
	saramaConfig := sarama.NewConfig()

	saramaConfig.Metadata.Full = false
	saramaConfig.Metadata.AllowAutoTopicCreation = true

	saramaConfig.Net.MaxOpenRequests = cfg.MaxOpenRequests
	saramaConfig.Net.SASL.Enable = cfg.User != "" && cfg.Password != ""
	saramaConfig.Net.SASL.Mechanism = sarama.SASLTypePlaintext
	saramaConfig.Net.SASL.User = cfg.User
	saramaConfig.Net.SASL.Password = cfg.Password

	saramaConfig.Consumer.Return.Errors = true
	saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
	saramaConfig.Consumer.Offsets.AutoCommit.Enable = false

	saramaConfig.Producer.Retry.Max = 3
	if cfg.ProducerTimeout > 0 {
		saramaConfig.Producer.Timeout = cfg.ProducerTimeout
	}
	saramaConfig.Producer.Partitioner = sarama.NewRoundRobinPartitioner
	saramaConfig.Producer.RequiredAcks = sarama.WaitForLocal
	saramaConfig.Producer.Return.Errors = true

	return saramaConfig
}
Logs
Kafka logs:

[2023-07-05 14:13:46,099] ERROR [KafkaApi-2] Unexpected error handling request RequestHeader(apiKey=LEAVE_GROUP, apiVersion=0, clientId=sarama, correlationId=657, headerVersion=1) -- LeaveGroupRequestData(groupId='CG1, memberId='sarama-514ac294-383f-4a37-a5d9-aabcbb6acafa', members=[]) with context RequestContext(header=RequestHeader(apiKey=LEAVE_GROUP, apiVersion=0, clientId=sarama, correlationId=657, headerVersion=1), connectionId='172.29.1.60:9092-172.29.0.40:11430-1081', clientAddress=/172.29.0.40, principal=User:ANONYMOUS, listenerName=ListenerName(PLAINTEXT), securityProtocol=PLAINTEXT, clientInformation=ClientInformation(softwareName=unknown, softwareVersion=unknown), fromPrivilegedListener=true, principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@4fea8385]) (kafka.server.KafkaApis)
java.util.concurrent.CompletionException: org.apache.kafka.common.errors.UnsupportedVersionException: LeaveGroup response version 0 can only contain one member, got 0 members.
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:838)
	at java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:848)
	at java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2168)
	at kafka.server.KafkaApis.handleLeaveGroupRequest(KafkaApis.scala:1794)
	at kafka.server.KafkaApis.handle(KafkaApis.scala:196)
	at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:75)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.common.errors.UnsupportedVersionException: LeaveGroup response version 0 can only contain one member, got 0 members.

Problem Description

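LeaveGroupRequest still uses the old request version. In the broker log above, the LEAVE_GROUP request arrives with apiVersion=0 and Kafka rejects it with an UnsupportedVersionException.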

https://github.com/Shopify/sarama/blob/v1.38.1/leave_group_request.go

// Leaves the cluster, called by Close.
func (c *consumerGroup) leave() error {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.memberID == "" {
		return nil
	}

	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		return err
	}

	// KIP-345 if groupInstanceId is set, don not leave group when consumer closed.
	// Since we do not discover ApiVersion for brokers, LeaveGroupRequest still use the old version request for now
	if c.groupInstanceId == nil {
		resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{
			GroupId:  c.groupID,
			MemberId: c.memberID,
		})
		if err != nil {
			_ = coordinator.Close()
			return err
		}

		// Unset memberID
		c.memberID = ""

		// Check response
		switch resp.Err {
		case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
			return nil
		default:
			return resp.Err
		}
	} else {
		c.memberID = ""
	}
	return nil
}
type LeaveGroupRequest struct {
	Version  int16
	GroupId  string
	MemberId string           // Removed in Version 3
	Members  []MemberIdentity // Added in Version 3
}
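
For illustration, here is a minimal sketch of how the request version and format could be chosen from the configured Kafka version instead of always sending version 0. It is not the code from the fix: the types are local stand-ins that mirror the fields quoted above, `kafkaVersion` and `newLeaveGroupRequest` are hypothetical names, and the version thresholds (0.11, 2.0, 2.4) are assumptions for the sake of the example.

```go
package main

import "fmt"

// MemberIdentity and LeaveGroupRequest are local stand-ins mirroring the
// fields quoted above; they are not imported from Sarama.
type MemberIdentity struct {
	MemberId string
}

type LeaveGroupRequest struct {
	Version  int16
	GroupId  string
	MemberId string           // used by versions 0-2
	Members  []MemberIdentity // used by version 3
}

// kafkaVersion is a hypothetical (major, minor) pair standing in for the
// client's configured Kafka version.
type kafkaVersion struct{ major, minor int }

// atLeast reports whether v is the given version or newer.
func (v kafkaVersion) atLeast(major, minor int) bool {
	if v.major != major {
		return v.major > major
	}
	return v.minor >= minor
}

// newLeaveGroupRequest picks a request version from the configured Kafka
// version and fills in the field that matches that version's format.
// The thresholds below are assumptions made for this sketch.
func newLeaveGroupRequest(v kafkaVersion, groupID, memberID string) *LeaveGroupRequest {
	req := &LeaveGroupRequest{GroupId: groupID}
	switch {
	case v.atLeast(2, 4):
		req.Version = 3
	case v.atLeast(2, 0):
		req.Version = 2
	case v.atLeast(0, 11):
		req.Version = 1
	}
	if req.Version >= 3 {
		// Version 3 carries a list of members instead of a single MemberId.
		req.Members = []MemberIdentity{{MemberId: memberID}}
	} else {
		req.MemberId = memberID
	}
	return req
}

func main() {
	for _, v := range []kafkaVersion{{0, 10}, {2, 0}, {3, 4}} {
		req := newLeaveGroupRequest(v, "CG1", "member-1")
		fmt.Printf("kafka %d.%d -> version=%d memberId=%q members=%d\n",
			v.major, v.minor, req.Version, req.MemberId, len(req.Members))
	}
}
```

The point of the sketch is only that the version number and the populated field have to agree: a request that uses the single `MemberId` field should go out as versions 0-2, and one that uses `Members` as version 3.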
@dnwe dnwe added the bug label Aug 2, 2023
dnwe added a commit that referenced this issue Aug 2, 2023
Use the correct version and format of LeaveGroup as determined by the
configured Version field.

Fixes #2486

Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
@dnwe dnwe closed this as completed in #2544 Aug 3, 2023
dnwe added a commit that referenced this issue Aug 3, 2023
Use the correct version and format of LeaveGroup as determined by the
configured Version field.

Fixes #2486

Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>