[consumer] Trigger a rejoin on partition racks' change [KIP-881] #4291
This change handles the final part of KIP-881: triggering a rejoin when we detect that one of the topics we are subscribed to has a change in the set of racks of one of its partitions.
More precisely,
trigger a rejoin, if
As for the implementation details:
- `rd_kafka_metadata_partition_internal_t` now contains the racks and their count.
- `rd_kafka_topic_info_t` now contains a list of `rd_kafka_metadata_partition_internal_t*`.
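To illustrate how per-partition rack information can drive the rejoin decision, here is a minimal sketch. The struct and function names (`example_partition_racks_t`, `example_partition_racks_changed`) are hypothetical stand-ins, not librdkafka's actual definitions, and the comparison assumes both rack arrays are already sorted and de-duplicated.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for a partition's rack information.
 * This is NOT librdkafka's actual struct layout. */
typedef struct example_partition_racks_s {
        int id;             /* Partition id */
        size_t racks_cnt;   /* Number of entries in racks */
        const char **racks; /* Assumed sorted and de-duplicated */
} example_partition_racks_t;

/* Returns 1 if the rack set of the partition differs between the
 * old and the new metadata, else 0. Because both arrays are assumed
 * sorted and de-duplicated, an element-wise comparison is enough. */
static int
example_partition_racks_changed(const example_partition_racks_t *a,
                                const example_partition_racks_t *b) {
        size_t i;

        assert(a != NULL && b != NULL);

        if (a->racks_cnt != b->racks_cnt)
                return 1;

        for (i = 0; i < a->racks_cnt; i++) {
                if (strcmp(a->racks[i], b->racks[i]) != 0)
                        return 1;
        }

        return 0;
}
```

In this sketch, a caller would run the comparison for each partition of each subscribed topic and request a rejoin as soon as any partition reports a change.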
For the non-regex case (where we look up the topic information from the topic metadata cache), the topic metadata cache entry internally uses `rd_kafka_metadata_topic_internal_t`. We just populate the racks inside the `rd_kafka_metadata_partition_internal_t` from the broker/rack mapping. This operation allocates strings, since the lifetime of the cache entry is not tied to the lifetime of the broker/rack mapping.
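The owning-copy approach described for the non-regex case can be sketched as follows. The helpers `example_racks_copy` and `example_racks_destroy` are hypothetical names used for illustration only. They show why a deep copy is needed when the destination may outlive the source mapping.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical helper: deep-copy cnt rack names so that the result
 * owns its strings and stays valid after the source mapping is freed.
 * Returns NULL if cnt is 0 or on allocation failure. */
static char **example_racks_copy(const char *const *racks, size_t cnt) {
        char **copy;
        size_t i;

        if (cnt == 0)
                return NULL;

        assert(racks != NULL);

        copy = calloc(cnt, sizeof(*copy));
        if (!copy)
                return NULL;

        for (i = 0; i < cnt; i++) {
                size_t len = strlen(racks[i]) + 1; /* include the NUL */

                copy[i] = malloc(len);
                if (!copy[i]) {
                        /* Roll back the strings copied so far. */
                        while (i > 0)
                                free(copy[--i]);
                        free(copy);
                        return NULL;
                }
                memcpy(copy[i], racks[i], len);
        }

        return copy;
}

/* Hypothetical helper: free an array returned by example_racks_copy(). */
static void example_racks_destroy(char **racks, size_t cnt) {
        size_t i;

        if (!racks)
                return;

        for (i = 0; i < cnt; i++)
                free(racks[i]);
        free(racks);
}
```

The cost of this approach is one allocation per rack string plus one for the array, which is the price of decoupling the two lifetimes.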
For the regex case (where we look up the topic information from the full cache), we use the `rd_kafka_metadata_partition_internal_t` inside the full cache. We just populate the racks inside the `rd_kafka_metadata_partition_internal_t` from the broker/rack mapping. However, since the full cache also contains the broker/rack mapping, we don't allocate extra space for the strings and just point into the broker/rack mapping.

All the allocation/deduplication/sorting costs are only paid if the client rack is set and the replica racks are set. Otherwise, they are avoided.
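A minimal sketch of the borrowed-pointer variant, together with the sorting/de-duplication step and the client-rack gate, might look like this. All names (`example_rack_cmp`, `example_racks_sort_dedup`, `example_racks_prepare`) are hypothetical, and the gate condition is an assumption based on the description above rather than the actual librdkafka logic.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* qsort() comparator for an array of C string pointers. */
static int example_rack_cmp(const void *a, const void *b) {
        const char *const *sa = a;
        const char *const *sb = b;
        return strcmp(*sa, *sb);
}

/* Sort the rack pointers in place and drop duplicates.
 * Only the pointers are moved: the strings stay owned by whatever
 * mapping they point into. Returns the new element count. */
static size_t example_racks_sort_dedup(const char **racks, size_t cnt) {
        size_t i, out;

        if (cnt == 0)
                return 0;

        assert(racks != NULL);

        qsort(racks, cnt, sizeof(*racks), example_rack_cmp);

        out = 1;
        for (i = 1; i < cnt; i++) {
                if (strcmp(racks[i], racks[out - 1]) != 0)
                        racks[out++] = racks[i];
        }

        return out;
}

/* Hypothetical gate: skip all work (and report zero racks) unless
 * the client rack is set and there are replica racks to process. */
static size_t example_racks_prepare(const char *client_rack,
                                    const char **racks,
                                    size_t cnt) {
        if (!client_rack || !*client_rack || cnt == 0)
                return 0;

        return example_racks_sort_dedup(racks, cnt);
}
```

Keeping the arrays sorted and de-duplicated makes a later comparison of two rack sets a simple linear scan, while the gate keeps clients without a configured rack from paying for any of it.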
A test is also added, along with some fixes/changes to the mock broker to facilitate it.