Skip to content

Commit

Permalink
KAFKA-3949: Fix race condition when metadata update arrives during re…
Browse files Browse the repository at this point in the history
…balance

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Vahid Hashemian, Guozhang Wang

Closes #1762 from hachikuji/KAFKA-3949
  • Loading branch information
hachikuji authored and guozhangwang committed Aug 20, 2016
1 parent c5d26c4 commit 317c4fd
Show file tree
Hide file tree
Showing 11 changed files with 354 additions and 308 deletions.
13 changes: 10 additions & 3 deletions clients/src/main/java/org/apache/kafka/clients/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,15 @@ else if (expireMs <= now) {
for (Listener listener: listeners)
listener.onMetadataUpdate(cluster);

// Do this after notifying listeners as subscribed topics' list can be changed by listeners
this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;

if (this.needMetadataForAllTopics) {
// the listener may change the interested topics, which could cause another metadata refresh.
// If we have already fetched all topics, however, another fetch should be unnecessary.
this.needUpdate = false;
this.cluster = getClusterForCurrentTopics(cluster);
} else {
this.cluster = cluster;
}

notifyAll();
log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
Expand Down Expand Up @@ -287,7 +294,7 @@ private Cluster getClusterForCurrentTopics(Cluster cluster) {
Set<String> unauthorizedTopics = new HashSet<>();
Collection<PartitionInfo> partitionInfos = new ArrayList<>();
List<Node> nodes = Collections.emptyList();
Set<String> internalTopics = Collections.<String>emptySet();
Set<String> internalTopics = Collections.emptySet();
if (cluster != null) {
internalTopics = cluster.internalTopics();
unauthorizedTopics.addAll(cluster.unauthorizedTopics());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener liste
throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
}
log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
this.subscriptions.subscribe(topics, listener);
this.subscriptions.subscribe(new HashSet<>(topics), listener);
metadata.setTopics(subscriptions.groupSubscription());
}
} finally {
Expand Down Expand Up @@ -914,7 +914,7 @@ public void assign(Collection<TopicPartition> partitions) {
}

log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
this.subscriptions.assignFromUser(partitions);
this.subscriptions.assignFromUser(new HashSet<>(partitions));
metadata.setTopics(topics);
}
} finally {
Expand Down Expand Up @@ -1007,6 +1007,12 @@ private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {

long now = time.milliseconds();
client.poll(Math.min(coordinator.timeToNextPoll(now), timeout), now);

// after the long poll, we should check whether the group needs to rebalance
// prior to returning data so that the group can stabilize faster
if (coordinator.needRejoin())
return Collections.emptyMap();

return fetcher.fetchedRecords();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,26 +94,26 @@ public void subscribe(Collection<String> topics) {
public void subscribe(Pattern pattern, final ConsumerRebalanceListener listener) {
ensureNotClosed();
this.subscriptions.subscribe(pattern, listener);
List<String> topicsToSubscribe = new ArrayList<>();
Set<String> topicsToSubscribe = new HashSet<>();
for (String topic: partitions.keySet()) {
if (pattern.matcher(topic).matches() &&
!subscriptions.subscription().contains(topic))
topicsToSubscribe.add(topic);
}
ensureNotClosed();
this.subscriptions.changeSubscription(topicsToSubscribe);
this.subscriptions.subscribeFromPattern(topicsToSubscribe);
}

@Override
public void subscribe(Collection<String> topics, final ConsumerRebalanceListener listener) {
ensureNotClosed();
this.subscriptions.subscribe(topics, listener);
this.subscriptions.subscribe(new HashSet<>(topics), listener);
}

@Override
public void assign(Collection<TopicPartition> partitions) {
ensureNotClosed();
this.subscriptions.assignFromUser(partitions);
this.subscriptions.assignFromUser(new HashSet<>(partitions));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,16 +270,6 @@ public synchronized void ensureActiveGroup() {
// when sending heartbeats and does not necessarily require us to rejoin the group.
ensureCoordinatorReady();

if (!needRejoin())
return;

// call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
// time if the client is woken up before a pending rebalance completes.
if (needsJoinPrepare) {
onJoinPrepare(generation.generationId, generation.memberId);
needsJoinPrepare = false;
}

if (heartbeatThread == null) {
heartbeatThread = new HeartbeatThread();
heartbeatThread.start();
Expand All @@ -288,6 +278,16 @@ public synchronized void ensureActiveGroup() {
while (needRejoin()) {
ensureCoordinatorReady();

// call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
// time if the client is woken up before a pending rebalance completes. This must be called
// on each iteration of the loop because an event requiring a rebalance (such as a metadata
// refresh which changes the matched subscription set) can occur while another rebalance is
// still in progress.
if (needsJoinPrepare) {
onJoinPrepare(generation.generationId, generation.memberId);
needsJoinPrepare = false;
}

// ensure that there are no pending requests to the coordinator. This is important
// in particular to avoid resending a pending JoinGroup request.
if (client.pendingRequestCount(this.coordinator) > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// of offset commit requests, which may be invoked from the heartbeat thread
private final ConcurrentLinkedQueue<OffsetCommitCompletion> completedOffsetCommits;

private boolean isLeader = false;
private Set<String> joinedSubscription;
private MetadataSnapshot metadataSnapshot;
private MetadataSnapshot assignmentSnapshot;
private long nextAutoCommitDeadline;
Expand Down Expand Up @@ -137,9 +139,10 @@ public String protocolType() {

@Override
public List<ProtocolMetadata> metadata() {
this.joinedSubscription = subscriptions.subscription();
List<ProtocolMetadata> metadataList = new ArrayList<>();
for (PartitionAssignor assignor : assignors) {
Subscription subscription = assignor.subscription(subscriptions.subscription());
Subscription subscription = assignor.subscription(joinedSubscription);
ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);
metadataList.add(new ProtocolMetadata(assignor.name(), metadata));
}
Expand All @@ -155,26 +158,26 @@ public void onMetadataUpdate(Cluster cluster) {
throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));

if (subscriptions.hasPatternSubscription()) {
final List<String> topicsToSubscribe = new ArrayList<>();
final Set<String> topicsToSubscribe = new HashSet<>();

for (String topic : cluster.topics())
if (subscriptions.getSubscribedPattern().matcher(topic).matches() &&
!(excludeInternalTopics && cluster.internalTopics().contains(topic)))
topicsToSubscribe.add(topic);

subscriptions.changeSubscription(topicsToSubscribe);
subscriptions.subscribeFromPattern(topicsToSubscribe);

// note we still need to update the topics contained in the metadata. Although we have
// specified that all topics should be fetched, only those set explicitly will be retained
metadata.setTopics(subscriptions.groupSubscription());
}

// check if there are any changes to the metadata which should trigger a rebalance
if (subscriptions.partitionsAutoAssigned()) {
MetadataSnapshot snapshot = new MetadataSnapshot(subscriptions, cluster);
if (!snapshot.equals(metadataSnapshot)) {
if (!snapshot.equals(metadataSnapshot))
metadataSnapshot = snapshot;
subscriptions.needReassignment();
}
}

}
});
}
Expand All @@ -192,12 +195,9 @@ protected void onJoinComplete(int generation,
String memberId,
String assignmentStrategy,
ByteBuffer assignmentBuffer) {
// if we were the assignor, then we need to make sure that there have been no metadata updates
// since the rebalance begin. Otherwise, we won't rebalance again until the next metadata change
if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
subscriptions.needReassignment();
return;
}
// only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
if (!isLeader)
assignmentSnapshot = null;

PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
if (assignor == null)
Expand Down Expand Up @@ -246,13 +246,10 @@ public void poll(long now) {
now = time.milliseconds();
}

if (subscriptions.partitionsAutoAssigned() && needRejoin()) {
// due to a race condition between the initial metadata fetch and the initial rebalance, we need to ensure that
// the metadata is fresh before joining initially, and then request the metadata update. If metadata update arrives
// while the rebalance is still pending (for example, when the join group is still inflight), then we will lose
// track of the fact that we need to rebalance again to reflect the change to the topic subscription. Without
// ensuring that the metadata is fresh, any metadata update that changes the topic subscriptions and arrives with a
// rebalance in progress will essentially be ignored. See KAFKA-3949 for the complete description of the problem.
if (needRejoin()) {
// due to a race condition between the initial metadata fetch and the initial rebalance,
// we need to ensure that the metadata is fresh before joining initially. This ensures
// that we have matched the pattern against the cluster's topics at least once before joining.
if (subscriptions.hasPatternSubscription())
client.ensureFreshMetadata();

Expand Down Expand Up @@ -303,6 +300,8 @@ protected Map<String, ByteBuffer> performAssignment(String leaderId,
// update metadata (if needed) and keep track of the metadata used for assignment so that
// we can check after rebalance completion whether anything has changed
client.ensureFreshMetadata();

isLeader = true;
assignmentSnapshot = metadataSnapshot;

log.debug("Performing assignment for group {} using strategy {} with subscriptions {}",
Expand Down Expand Up @@ -339,14 +338,24 @@ protected void onJoinPrepare(int generation, String memberId) {
listener.getClass().getName(), groupId, e);
}

assignmentSnapshot = null;
subscriptions.needReassignment();
isLeader = false;
subscriptions.resetGroupSubscription();
}

@Override
protected boolean needRejoin() {
return subscriptions.partitionsAutoAssigned() &&
(super.needRejoin() || subscriptions.partitionAssignmentNeeded());
public boolean needRejoin() {
if (!subscriptions.partitionsAutoAssigned())
return false;

// we need to rejoin if we performed the assignment and metadata has changed
if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot))
return true;

// we need to join if our subscription has changed since the last join
if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription()))
return true;

return super.needRejoin();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,26 +350,22 @@ private long listOffset(TopicPartition partition, long timestamp) {
* the defaultResetPolicy is NONE
*/
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
if (this.subscriptions.partitionAssignmentNeeded()) {
return Collections.emptyMap();
} else {
Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
int recordsRemaining = maxPollRecords;
Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
int recordsRemaining = maxPollRecords;

while (recordsRemaining > 0) {
if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
CompletedFetch completedFetch = completedFetches.poll();
if (completedFetch == null)
break;
while (recordsRemaining > 0) {
if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
CompletedFetch completedFetch = completedFetches.poll();
if (completedFetch == null)
break;

nextInLineRecords = parseFetchedData(completedFetch);
} else {
recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
}
nextInLineRecords = parseFetchedData(completedFetch);
} else {
recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
}

return drained;
}

return drained;
}

private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained,
Expand Down
Loading

0 comments on commit 317c4fd

Please # to comment.