Skip to content

Commit

Permalink
Fix to metadata refresh interruption (#4679)
Browse files Browse the repository at this point in the history
Metadata refreshes without partition
leader change could lead to a loop of
metadata calls at fixed intervals.
Solved by stopping metadata refresh
when all existing metadata is non-stale.
Happening since 2.3.0
  • Loading branch information
emasab authored and anchitj committed Jun 10, 2024
1 parent c0434f7 commit 355c06d
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 22 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ librdkafka v2.4.0 is a feature release:
* Fix to metadata cache expiration on full metadata refresh (#4677).
* Fix for a wrong error returned on full metadata refresh before joining
a consumer group (#4678).
* Fix to metadata refresh interruption (#4679).


## Upgrade considerations
Expand Down Expand Up @@ -58,6 +59,10 @@ librdkafka v2.4.0 is a feature release:
could lead to an `UNKNOWN_TOPIC_OR_PART` error. Solved by updating
the consumer group following a metadata refresh only in safe states.
Happening since 2.1.0 (#4678).
* Issues: #4577.
Metadata refreshes without partition leader change could lead to a loop of
metadata calls at fixed intervals. Solved by stopping metadata refresh when
all existing metadata is non-stale. Happening since 2.3.0 (#4679).

### Consumer fixes

Expand Down
12 changes: 10 additions & 2 deletions src/rdkafka_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -2673,8 +2673,16 @@ rd_kafka_mock_request_copy(rd_kafka_mock_request_t *mrequest) {
return request;
}

void rd_kafka_mock_request_destroy(rd_kafka_mock_request_t *element) {
rd_free(element);
/* Free \p mrequest with rd_free(). */
void rd_kafka_mock_request_destroy(rd_kafka_mock_request_t *mrequest) {
rd_free(mrequest);
}

/**
 * @brief Destroy each of the \p mrequest_cnt requests stored in
 *        \p mrequests (in array order), then free the array itself.
 */
void rd_kafka_mock_request_destroy_array(rd_kafka_mock_request_t **mrequests,
                                         size_t mrequest_cnt) {
        size_t idx = 0;

        while (idx < mrequest_cnt) {
                rd_kafka_mock_request_destroy(mrequests[idx]);
                idx++;
        }

        /* The elements are gone; release the pointer array last. */
        rd_free(mrequests);
}

static void rd_kafka_mock_request_free(void *element) {
Expand Down
8 changes: 8 additions & 0 deletions src/rdkafka_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
* librdkafka - Apache Kafka C library
*
* Copyright (c) 2019-2022, Magnus Edenhill
* 2023, Confluent Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
Expand Down Expand Up @@ -388,6 +389,13 @@ typedef struct rd_kafka_mock_request_s rd_kafka_mock_request_t;
*/
RD_EXPORT void rd_kafka_mock_request_destroy(rd_kafka_mock_request_t *mreq);

/**
* @brief Destroy every request in the \p mreqs array of \p mreq_cnt elements, then free the array itself.
*/
RD_EXPORT void
rd_kafka_mock_request_destroy_array(rd_kafka_mock_request_t **mreqs,
size_t mreq_cnt);

/**
* @brief Get the broker id to which \p mreq was sent.
*/
Expand Down
12 changes: 6 additions & 6 deletions src/rdkafka_topic.c
Original file line number Diff line number Diff line change
Expand Up @@ -1278,8 +1278,8 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
rd_kafka_broker_t **partbrokers;
int leader_cnt = 0;
int old_state;
rd_bool_t partition_exists_with_no_leader_epoch = rd_false;
rd_bool_t partition_exists_with_updated_leader_epoch = rd_false;
rd_bool_t partition_exists_with_no_leader_epoch = rd_false;
rd_bool_t partition_exists_with_stale_leader_epoch = rd_false;

if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR)
rd_kafka_dbg(rk, TOPIC | RD_KAFKA_DBG_METADATA, "METADATA",
Expand Down Expand Up @@ -1353,7 +1353,7 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,

/* Update leader for each partition */
for (j = 0; j < mdt->partition_cnt; j++) {
int r;
int r = 0;
rd_kafka_broker_t *leader;
int32_t leader_epoch = mdit->partitions[j].leader_epoch;
rd_kafka_toppar_t *rktp =
Expand All @@ -1372,8 +1372,8 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
* set to -1, we assume that metadata is not stale. */
if (leader_epoch == -1)
partition_exists_with_no_leader_epoch = rd_true;
else if (rktp->rktp_leader_epoch < leader_epoch)
partition_exists_with_updated_leader_epoch = rd_true;
else if (leader_epoch < rktp->rktp_leader_epoch)
partition_exists_with_stale_leader_epoch = rd_true;


/* Update leader for partition */
Expand All @@ -1396,7 +1396,7 @@ rd_kafka_topic_metadata_update(rd_kafka_topic_t *rkt,
* stale, we can turn off fast leader query. */
if (mdt->partition_cnt > 0 && leader_cnt == mdt->partition_cnt &&
(partition_exists_with_no_leader_epoch ||
partition_exists_with_updated_leader_epoch))
!partition_exists_with_stale_leader_epoch))
rkt->rkt_flags &= ~RD_KAFKA_TOPIC_F_LEADER_UNAVAIL;

if (mdt->err != RD_KAFKA_RESP_ERR_NO_ERROR && rkt->rkt_partition_cnt) {
Expand Down
17 changes: 5 additions & 12 deletions tests/0143-exponential_backoff_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,6 @@
const int32_t retry_ms = 100;
const int32_t retry_max_ms = 1000;

/**
 * @brief Destroy the \p request_cnt mock requests held in \p requests,
 *        front to back, and then free the \p requests array.
 */
static void free_mock_requests(rd_kafka_mock_request_t **requests,
                               size_t request_cnt) {
        size_t pos;

        for (pos = 0; pos != request_cnt; ++pos) {
                rd_kafka_mock_request_t *current = requests[pos];
                rd_kafka_mock_request_destroy(current);
        }

        rd_free(requests);
}
/**
* @brief find_coordinator test
* We fail the request with RD_KAFKA_RESP_ERR_GROUP_COORDINATOR_NOT_AVAILABLE,
Expand Down Expand Up @@ -112,7 +105,7 @@ static void test_find_coordinator(rd_kafka_mock_cluster_t *mcluster,
rd_kafka_mock_request_timestamp(requests[i]);
}
rd_kafka_destroy(consumer);
free_mock_requests(requests, request_cnt);
rd_kafka_mock_request_destroy_array(requests, request_cnt);
rd_kafka_mock_clear_requests(mcluster);
SUB_TEST_PASS();
}
Expand Down Expand Up @@ -166,7 +159,7 @@ static void helper_exponential_backoff(rd_kafka_mock_cluster_t *mcluster,
previous_request_ts =
rd_kafka_mock_request_timestamp(requests[i]);
}
free_mock_requests(requests, request_cnt);
rd_kafka_mock_request_destroy_array(requests, request_cnt);
}
/**
* @brief offset_commit test
Expand Down Expand Up @@ -297,7 +290,7 @@ static void helper_find_coordinator_trigger(rd_kafka_mock_cluster_t *mcluster,
}
}
}
free_mock_requests(requests, request_cnt);
rd_kafka_mock_request_destroy_array(requests, request_cnt);
if (num_request != 1)
TEST_FAIL("No request was made.");
}
Expand Down Expand Up @@ -451,7 +444,7 @@ static void test_produce_fast_leader_query(rd_kafka_mock_cluster_t *mcluster,
}
rd_kafka_topic_destroy(rkt);
rd_kafka_destroy(producer);
free_mock_requests(requests, request_cnt);
rd_kafka_mock_request_destroy_array(requests, request_cnt);
rd_kafka_mock_clear_requests(mcluster);
SUB_TEST_PASS();
}
Expand Down Expand Up @@ -511,7 +504,7 @@ static void test_fetch_fast_leader_query(rd_kafka_mock_cluster_t *mcluster,
previous_request_was_Fetch = rd_false;
}
rd_kafka_destroy(consumer);
free_mock_requests(requests, request_cnt);
rd_kafka_mock_request_destroy_array(requests, request_cnt);
rd_kafka_mock_clear_requests(mcluster);
TEST_ASSERT(
Metadata_after_Fetch,
Expand Down
57 changes: 57 additions & 0 deletions tests/0146-metadata_mock.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@

#include "test.h"

#include "../src/rdkafka_proto.h"

/**
 * @brief Request matcher: tells whether \p request carries the
 *        Metadata API key. \p opaque is not used.
 */
static rd_bool_t is_metadata_request(rd_kafka_mock_request_t *request,
                                     void *opaque) {
        const rd_bool_t matches =
            rd_kafka_mock_request_api_key(request) == RD_KAFKAP_Metadata;

        return matches;
}

/**
* @brief Metadata should persist in cache after
Expand Down Expand Up @@ -86,6 +92,55 @@ static void do_test_metadata_persists_in_cache(const char *assignor) {

SUB_TEST_PASS();
}

/**
* @brief No loop of metadata requests should be started
* when a metadata request is made without leader epoch change.
* See issue #4577
*
* Scenario: a producer is created against a mock cluster, one error is
* pushed for Produce requests, and the Metadata requests recorded by the
* mock cluster are then counted. Exactly two are expected.
*/
static void do_test_fast_metadata_refresh_stops(void) {
rd_kafka_t *rk;
const char *bootstraps;
rd_kafka_mock_cluster_t *mcluster;
const char *topic = test_mk_topic_name(__FUNCTION__, 1);
rd_kafka_conf_t *conf;
/* Holds the size_t result of test_mock_wait_maching_requests(),
* narrowed to int so it can be printed with %d below. */
int metadata_requests;

SUB_TEST_QUICK();

/* Mock cluster with 3 brokers, plus the test topic.
* NOTE(review): the trailing "1, 1" are presumably partition count and
* replication factor - confirm against rd_kafka_mock_topic_create(). */
mcluster = test_mock_cluster_new(3, &bootstraps);
rd_kafka_mock_topic_create(mcluster, topic, 1, 1);

/* Producer configuration pointing at the mock cluster. */
test_conf_init(&conf, NULL, 10);
test_conf_set(conf, "bootstrap.servers", bootstraps);
rd_kafka_conf_set_dr_msg_cb(conf, test_dr_msg_cb);

rk = test_create_handle(RD_KAFKA_PRODUCER, conf);

/* This error triggers a metadata refresh but no leader change
* happened */
rd_kafka_mock_push_request_errors(
mcluster, RD_KAFKAP_Produce, 1,
RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR);

/* Tracking is started before producing so that the requests counted
* below are the ones issued from this point on.
* NOTE(review): presumably produces a single small message to
* partition 0 - confirm against test_produce_msgs2(). */
rd_kafka_mock_start_request_tracking(mcluster);
test_produce_msgs2(rk, topic, 0, 0, 0, 1, NULL, 5);

/* First call is for getting initial metadata,
* second one happens after the error,
* it should stop refreshing metadata after that. */
/* Waits for at least 2 Metadata requests, then a further 500 ms;
* the returned count includes any extra request seen in that window,
* so a refresh loop would make it exceed 2. */
metadata_requests = test_mock_wait_maching_requests(
mcluster, 2, 500, is_metadata_request, NULL);
TEST_ASSERT(metadata_requests == 2,
"Expected 2 metadata request, got %d", metadata_requests);
rd_kafka_mock_stop_request_tracking(mcluster);

rd_kafka_destroy(rk);
test_mock_cluster_destroy(mcluster);

SUB_TEST_PASS();
}

/**
* @brief A metadata call for an existing topic, just after subscription,
* must not cause a UNKNOWN_TOPIC_OR_PART error.
Expand Down Expand Up @@ -134,5 +189,7 @@ int main_0146_metadata_mock(int argc, char **argv) {

do_test_metadata_call_before_join();

do_test_fast_metadata_refresh_stops();

return 0;
}
56 changes: 56 additions & 0 deletions tests/test.c
Original file line number Diff line number Diff line change
Expand Up @@ -7137,7 +7137,63 @@ rd_kafka_mock_cluster_t *test_mock_cluster_new(int broker_cnt,
return mcluster;
}

/**
 * @brief Count the requests currently returned by
 *        rd_kafka_mock_get_requests() for \p mcluster that are
 *        accepted by \p match , which is called with each request
 *        and \p opaque .
 *
 * The request array obtained from the mock cluster is destroyed
 * before returning.
 *
 * @return Number of requests for which \p match returned true.
 */
static size_t test_mock_get_matching_request_cnt(
    rd_kafka_mock_cluster_t *mcluster,
    rd_bool_t (*match)(rd_kafka_mock_request_t *request, void *opaque),
    void *opaque) {
        size_t total;
        size_t hits = 0;
        size_t idx;
        rd_kafka_mock_request_t **snapshot =
            rd_kafka_mock_get_requests(mcluster, &total);

        for (idx = 0; idx < total; idx++)
                hits += match(snapshot[idx], opaque) ? 1 : 0;

        rd_kafka_mock_request_destroy_array(snapshot, total);

        return hits;
}

/**
 * @brief Block until the mock cluster \p mcluster has received at least
 *        \p expected_cnt requests accepted by \p match , then sleep for
 *        \p confidence_interval_ms and count the matching requests again.
 *
 * The count is polled every 100 ms. There is no upper bound on the wait:
 * the call only returns once \p expected_cnt has been reached.
 *
 * @param mcluster Mock cluster whose received requests are inspected
 * @param expected_cnt Minimum number of matching requests to wait for
 * @param confidence_interval_ms Extra time to wait once \p expected_cnt
 *                               matching requests have been seen
 * @param match Match function that takes a request and \p opaque
 * @param opaque Opaque value passed through to \p match
 *
 * @return Number of matching requests seen after the confidence interval.
 */
size_t test_mock_wait_maching_requests(
    rd_kafka_mock_cluster_t *mcluster,
    size_t expected_cnt,
    int confidence_interval_ms,
    rd_bool_t (*match)(rd_kafka_mock_request_t *request, void *opaque),
    void *opaque) {
        size_t seen = 0;

        while (seen < expected_cnt) {
                seen =
                    test_mock_get_matching_request_cnt(mcluster, match, opaque);
                if (seen >= expected_cnt)
                        break;

                /* Not there yet: poll again in 100 ms. */
                rd_usleep(100 * 1000, 0);
        }

        /* Leave time for any unexpected extra request to show up. */
        rd_usleep(confidence_interval_ms * 1000, 0);

        return test_mock_get_matching_request_cnt(mcluster, match, opaque);
}

/**
* @name Sub-tests
Expand Down
8 changes: 6 additions & 2 deletions tests/test.h
Original file line number Diff line number Diff line change
Expand Up @@ -859,8 +859,12 @@ rd_kafka_resp_err_t test_delete_all_test_topics(int timeout_ms);
void test_mock_cluster_destroy(rd_kafka_mock_cluster_t *mcluster);
rd_kafka_mock_cluster_t *test_mock_cluster_new(int broker_cnt,
const char **bootstraps);


/**
 * @brief Wait until at least \p expected_cnt requests accepted by
 *        \p match have been received by \p mcluster , then wait a
 *        further \p confidence_interval_ms .
 *
 * @param mcluster Mock cluster whose received requests are inspected
 * @param expected_cnt Number of expected matching requests
 * @param confidence_interval_ms Time to wait after \p expected_cnt
 *                               matching requests have been seen
 * @param match Match function that takes a request and \p opaque
 * @param opaque Opaque value passed to \p match
 *
 * @return Number of matching requests received.
 */
size_t test_mock_wait_maching_requests(
    rd_kafka_mock_cluster_t *mcluster,
    size_t expected_cnt,
    int confidence_interval_ms,
    rd_bool_t (*match)(rd_kafka_mock_request_t *request, void *opaque),
    void *opaque);

int test_error_is_not_fatal_cb(rd_kafka_t *rk,
rd_kafka_resp_err_t err,
Expand Down

0 comments on commit 355c06d

Please # to comment.