KAFKA-3949: Fix race condition when metadata update arrives during rebalance #1762
Conversation
Ping @guozhangwang and @vahidhashemian for review. Fixing this was a bit more work than expected. I felt that depending on the flag inside

Vahid, hopefully this doesn't screw up your work on KIP-70 too much, but let me know.

@ijuma @hachikuji FYI: I re-triggered Jenkins jobs 5144-5150 on https://builds.apache.org/job/kafka-trunk-git-pr-jdk7/ to test.
```java
} else {
    Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
    int recordsRemaining = maxPollRecords;
```
Why is it safe to remove this check now?
I had to move the check from here into KafkaConsumer.pollOnce(). It sort of seems better that way anyway, because we check additional causes of rebalance there.
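The idea of draining fetched records up to the `max.poll.records` cap can be sketched as below. This is a simplified illustration, not the actual Kafka consumer code: the class and method names (`DrainSketch`, `drain`) are made up, and plain `String` keys stand in for `TopicPartition`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hedged sketch: drain fetched records across partitions, stopping once the
// max.poll.records budget is exhausted. Names are illustrative only.
public class DrainSketch {
    static <T> Map<String, List<T>> drain(Map<String, List<T>> fetched, int maxPollRecords) {
        Map<String, List<T>> drained = new HashMap<>();
        int recordsRemaining = maxPollRecords;
        for (Map.Entry<String, List<T>> e : fetched.entrySet()) {
            if (recordsRemaining <= 0)
                break; // budget spent; leave the rest for the next poll
            List<T> records = e.getValue();
            List<T> taken = records.subList(0, Math.min(records.size(), recordsRemaining));
            drained.put(e.getKey(), new ArrayList<>(taken));
            recordsRemaining -= taken.size();
        }
        return drained;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> fetched = new LinkedHashMap<>();
        fetched.put("topic-0", Arrays.asList(1, 2, 3));
        fetched.put("topic-1", Arrays.asList(4, 5, 6));
        Map<String, List<Integer>> out = drain(fetched, 4);
        // 3 records from topic-0, then only 1 from topic-1
        System.out.println(out.get("topic-0").size() + "," + out.get("topic-1").size());
    }
}
```

Because the budget check happens before each partition is drained, moving the equivalent check up into the poll loop (as the comment describes) changes where it runs, not what it guarantees.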
LGTM overall. Just one clarification question.
```java
@@ -137,12 +139,14 @@ public String protocolType() {

@Override
public List<ProtocolMetadata> metadata() {
    Set<String> subscribedTopics = new HashSet<>(subscriptions.subscription());
```
Can `joinedSubscription` be initialized here (i.e. `this.joinedSubscription = new HashSet<>(subscriptions.subscription());`) and used instead of the locally defined variable `subscribedTopics`?
Yeah, good catch. My intent was actually to use the same object returned from `subscriptions.subscription()`, which is why I modified the code to make that object immutable after creation.
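The "make the object immutable after creation" idea can be sketched as below. This is a hedged illustration, not the PR's actual code: the class and method names (`SubscriptionSnapshot`, `changeSubscription`, `metadataSnapshot`) are made up, and `String` topics stand in for the real subscription state.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hedged sketch: freeze the subscription set at assignment time so the same
// object can be handed out (e.g. to the group protocol) without a defensive
// copy and without risk of concurrent mutation by a metadata refresh.
public class SubscriptionSnapshot {
    private Set<String> subscription = Collections.emptySet();
    private Set<String> joinedSubscription;

    public void changeSubscription(Set<String> topics) {
        // copy once, then freeze; later pattern matches build a brand-new set
        this.subscription = Collections.unmodifiableSet(new HashSet<>(topics));
    }

    public Set<String> metadataSnapshot() {
        // reuse the same immutable object instead of a locally defined copy
        this.joinedSubscription = subscription;
        return joinedSubscription;
    }

    public static void main(String[] args) {
        SubscriptionSnapshot s = new SubscriptionSnapshot();
        s.changeSubscription(new HashSet<>(Arrays.asList("a", "b")));
        Set<String> snap = s.metadataSnapshot();
        try {
            snap.add("c"); // mutation of the frozen snapshot must fail
            System.out.println("mutable");
        } catch (UnsupportedOperationException e) {
            System.out.println("immutable");
        }
    }
}
```

With this pattern, holding a reference to the returned set is safe: any later subscription change replaces the field with a new frozen set rather than mutating the old one.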
@hachikuji Thanks for fixing the problem. I'll update the KIP-70 code changes based on this PR. I am also wondering whether this update is something that can be verified in a unit test: for example, one that tries the scenario of KAFKA-3949, perhaps a number of times, by validating the generated assignments. Or, if the temporary fix that we put in a while ago can be removed (as I asked inline), then the unit tests for consecutive pattern subscriptions should not intermittently fail anymore, and they could also be considered unit tests for the removal of the race condition.

@vahidhashemian Thanks for the review. I'll check whether there's a reliable way to unit test this.
Six consecutive Jenkins builds did not reproduce the failure in
```java
// refresh which changes the matched subscription set) can occur while another rebalance is
// still in progress.
if (needsJoinPrepare) {
    onJoinPrepare(generation.generationId, generation.memberId);
```
I'm a bit concerned about the overhead here, since most implementations of `onJoinPrepare` may involve committing offsets etc., which is latency sensitive, while most (if not all?) of those functions only need to be triggered once, since consumers will pause fetching during rebalance anyway.
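The once-per-rebalance behavior the `needsJoinPrepare` flag provides can be sketched as below. This is a hedged, simplified model, not the real `AbstractCoordinator`: the method names `ensureActiveGroup`, `onJoinPrepare`, and `onJoinComplete` mirror the discussion, but the class and counter are invented for illustration.

```java
// Hedged sketch: a guard flag ensures expensive rebalance preparation
// (e.g. committing offsets, revoking partitions) runs once per rebalance,
// even if the join loop is re-entered multiple times.
public class JoinGuard {
    private boolean needsJoinPrepare = true;
    private int prepareCalls = 0;

    void onJoinPrepare() {
        prepareCalls++; // stands in for commit/revoke work
    }

    // invoked on every poll while a rebalance is in progress
    void ensureActiveGroup() {
        if (needsJoinPrepare) {
            onJoinPrepare();
            needsJoinPrepare = false; // don't repeat expensive prep on re-joins
        }
    }

    void onJoinComplete() {
        needsJoinPrepare = true; // reset for the next rebalance
    }

    public static void main(String[] args) {
        JoinGuard g = new JoinGuard();
        g.ensureActiveGroup();
        g.ensureActiveGroup(); // join retried mid-rebalance; prep is skipped
        g.onJoinComplete();
        g.ensureActiveGroup(); // a new rebalance preps again
        System.out.println(g.prepareCalls);
    }
}
```

This is why the latency concern above is bounded: even if the join loop iterates many times within a single rebalance, the costly preparation runs only once.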
EDIT: nvm.
The latest Jenkins run still fails on the streams integration test, so I think #1746 is still worth investigating. I will merge this PR as is.

Merged to trunk.
```java
public void subscribeFromPattern(Set<String> topics) {
    if (subscriptionType != SubscriptionType.AUTO_PATTERN)
        throw new IllegalArgumentException("Attempt to subscribe from pattern while subscription type set to " +
                subscriptionType);
```
@hachikuji Does this error indicate a different issue than what we would normally report using the defined `SUBSCRIPTION_EXCEPTION_MESSAGE` in other methods? In the `subscribe(pattern, listener)` method we already make sure that `subscriptionType` is properly set, so I'm guessing the check you added above helps with the race condition fix. It would be great if you could confirm. Thanks.
@vahidhashemian It could use the same message, I guess. This is more of a state assertion, which we don't expect to reach the user because we control when it's called. I would have used an `assert` if that were more commonly used in the codebase.
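The "state assertion" pattern being discussed can be sketched as below. This is a hedged, minimal model, not the PR's actual code: `subscribePattern` is an invented stand-in for the real `subscribe(pattern, listener)` path, and the enum is reduced to the values needed for the illustration.

```java
import java.util.Collections;
import java.util.Set;

// Simplified subscription-type enum for illustration.
enum SubscriptionType { NONE, AUTO_TOPICS, AUTO_PATTERN }

// Hedged sketch: an internal invariant check that should be unreachable in
// practice, because the library controls when subscribeFromPattern is called.
public class StateAssertion {
    private SubscriptionType subscriptionType = SubscriptionType.NONE;

    void subscribePattern() {
        subscriptionType = SubscriptionType.AUTO_PATTERN;
    }

    void subscribeFromPattern(Set<String> topics) {
        if (subscriptionType != SubscriptionType.AUTO_PATTERN)
            throw new IllegalArgumentException(
                    "Attempt to subscribe from pattern while subscription type set to " + subscriptionType);
        // ... update the subscription with the matched topics ...
    }

    public static void main(String[] args) {
        StateAssertion s = new StateAssertion();
        try {
            // calling out of order trips the assertion
            s.subscribeFromPattern(Collections.singleton("t"));
            System.out.println("no check");
        } catch (IllegalArgumentException e) {
            System.out.println("caught");
        }
        s.subscribePattern();
        s.subscribeFromPattern(Collections.singleton("t"));
        System.out.println("ok");
    }
}
```

Unlike a Java `assert` (which is disabled unless the JVM runs with `-ea`), an explicit throw always fires, which is one reason codebases that don't enable assertions prefer this style for invariant checks.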
@hachikuji Thanks for clarifying. It seems this method is called only if pattern subscription is used, and the state is initially verified in the corresponding `subscribe` method here. Will there be a scenario in which that initial verification passes and this state check fails?
Actually, the important check is here: https://github.com/hachikuji/kafka/blob/7971c2f45fc399ec8c5d2b628fdd6c575b1b2745/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java#L160. There shouldn't be any reason for this exception to be thrown, assuming we don't break the code in the future.
No description provided.