Fix loop of OffsetForLeaderEpoch requests on quick leader changes
Fixes #4425
milindl committed Sep 14, 2023
1 parent 49f180a commit e465fd9
Showing 3 changed files with 95 additions and 5 deletions.
1 change: 0 additions & 1 deletion src/rdkafka_offset.c
@@ -933,7 +933,6 @@ static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk,

err = rd_kafka_handle_OffsetForLeaderEpoch(rk, rkb, err, rkbuf, request,
&parts);

rd_kafka_toppar_lock(rktp);

if (rktp->rktp_fetch_state != RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT)
8 changes: 4 additions & 4 deletions src/rdkafka_topic.c
@@ -681,9 +681,7 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt,
}
}

- if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT)
- need_epoch_validation = rd_true;
- else if (leader_epoch > rktp->rktp_leader_epoch) {
+ if (leader_epoch > rktp->rktp_leader_epoch) {
rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER",
"%s [%" PRId32 "]: leader %" PRId32
" epoch %" PRId32 " -> leader %" PRId32
@@ -693,7 +691,9 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt,
rktp->rktp_leader_epoch, leader_id, leader_epoch);
rktp->rktp_leader_epoch = leader_epoch;
need_epoch_validation = rd_true;
- }
+ } else if (rktp->rktp_fetch_state ==
+ RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT)
+ need_epoch_validation = rd_true;

fetching_from_follower =
leader != NULL && rktp->rktp_broker != NULL &&
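For illustration, here is a minimal standalone sketch of why the reordering above matters. This is a hypothetical model, not librdkafka source: struct toppar, FETCH_VALIDATE_EPOCH_WAIT, old_logic() and new_logic() stand in for rd_kafka_toppar_t, RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT and the pre-/post-fix branches of rd_kafka_toppar_leader_update(). With the old ordering, a partition already waiting on epoch validation never records the newer leader epoch, so the retried OffsetForLeaderEpoch request keeps carrying the stale epoch; with the new ordering the epoch bump is checked first and always recorded.

/* Hypothetical model of the check ordering; not librdkafka source. */
#include <stdbool.h>
#include <stdio.h>

#define FETCH_VALIDATE_EPOCH_WAIT 1

struct toppar {
        int fetch_state;
        int leader_epoch;
};

/* Pre-fix ordering: the fetch-state check shadows the epoch update. */
static bool old_logic(struct toppar *p, int new_epoch) {
        if (p->fetch_state == FETCH_VALIDATE_EPOCH_WAIT)
                return true; /* validate again, but epoch NOT updated */
        else if (new_epoch > p->leader_epoch) {
                p->leader_epoch = new_epoch;
                return true;
        }
        return false;
}

/* Post-fix ordering: an epoch bump always wins and is recorded. */
static bool new_logic(struct toppar *p, int new_epoch) {
        if (new_epoch > p->leader_epoch) {
                p->leader_epoch = new_epoch;
                return true;
        } else if (p->fetch_state == FETCH_VALIDATE_EPOCH_WAIT)
                return true;
        return false;
}

int main(void) {
        /* Partition is mid-validation on epoch 1 when the leader changes
         * again and reports epoch 2. */
        struct toppar a = {FETCH_VALIDATE_EPOCH_WAIT, 1};
        struct toppar b = {FETCH_VALIDATE_EPOCH_WAIT, 1};

        old_logic(&a, 2);
        new_logic(&b, 2);

        printf("old ordering: cached epoch=%d (stale, request loops)\n",
               a.leader_epoch);
        printf("new ordering: cached epoch=%d (updated)\n", b.leader_epoch);
        return 0;
}

Run as-is, the old ordering leaves the cached epoch at 1 while the new ordering advances it to 2, which is the difference the mock test below checks for by flipping the leader twice around a delayed OffsetForLeaderEpoch response.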
91 changes: 91 additions & 0 deletions tests/0139-offset_validation_mock.c
@@ -212,6 +212,95 @@ static void do_test_ssl_error_retried(void) {
}


/**
 * @brief If an OffsetForLeaderEpoch request fails and the leader changes in the
 * meantime, we end up in an infinite loop of OffsetForLeaderEpoch
 * requests.
* Specifically:
* a. Leader Change - causes OffsetForLeaderEpoch
* request 'A'.
* b. Request 'A' fails with a retriable error, and we retry it.
* c. While waiting for Request 'A', the leader changes again, and we send a
* Request 'B', but the leader epoch is not updated correctly in this
* request, causing a loop.
*
* See #4425.
*/
static void do_test_two_leader_changes(void) {
const char *topic = test_mk_topic_name(__FUNCTION__, 1);
const char *c1_groupid = topic;
rd_kafka_t *c1;
const char *bootstraps;
rd_kafka_mock_cluster_t *mcluster;
int msg_cnt = 5;
uint64_t testid = test_id_generate();
rd_kafka_conf_t *conf;

SUB_TEST_QUICK();

mcluster = test_mock_cluster_new(2, &bootstraps);
rd_kafka_mock_topic_create(mcluster, topic, 1, 2);
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);

/* Seed the topic with messages */
test_produce_msgs_easy_v(topic, testid, 0, 0, msg_cnt, 10,
"bootstrap.servers", bootstraps,
"batch.num.messages", "1", NULL);

test_conf_init(&conf, NULL, 60);

test_conf_set(conf, "bootstrap.servers", bootstraps);
test_conf_set(conf, "auto.offset.reset", "earliest");
test_conf_set(conf, "debug", "protocol");

c1 = test_create_consumer(c1_groupid, NULL, conf, NULL);
test_consumer_subscribe(c1, topic);

/* Consume initial messages and join the group, etc. */
test_consumer_poll("MSG_INIT", c1, testid, 0, 0, msg_cnt, NULL);

/* The leader will change from 1->2, and the OffsetForLeaderEpoch will
* be sent to broker 2. We need to first fail it with
* an error, and then give enough time to change the leader before
* returning a success. */
rd_kafka_mock_broker_push_request_error_rtts(
mcluster, 2, RD_KAFKAP_OffsetForLeaderEpoch, 1,
RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR, 900);

rd_kafka_mock_broker_push_request_error_rtts(
mcluster, 2, RD_KAFKAP_OffsetForLeaderEpoch, 1,
RD_KAFKA_RESP_ERR_NO_ERROR, 1000);

rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2);
rd_kafka_poll(c1, 1000);
/* Enough time to make a request, fail with a retriable error, and
* retry. */
rd_sleep(1);

/* Reset leader. */
rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1);
rd_kafka_poll(c1, 1000);
rd_sleep(1);

/* There should be no infinite loop of OffsetForLeaderEpoch, and
* consequently, we should be able to consume these messages as a sign
* of success. */
test_produce_msgs_easy_v(topic, testid, 0, 0, msg_cnt, 10,
"bootstrap.servers", bootstraps,
"batch.num.messages", "1", NULL);

test_consumer_poll("MSG_INIT", c1, testid, 0, 0, msg_cnt, NULL);


rd_kafka_destroy(c1);

test_mock_cluster_destroy(mcluster);

TEST_LATER_CHECK();
SUB_TEST_PASS();
}


int main_0139_offset_validation_mock(int argc, char **argv) {

if (test_needs_auth()) {
@@ -223,5 +312,7 @@ int main_0139_offset_validation_mock(int argc, char **argv) {

do_test_ssl_error_retried();

do_test_two_leader_changes();

return 0;
}
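Assuming the standard librdkafka test harness in tests/ (as described in tests/README.md), the new case can presumably be run on its own with something like TESTS=0139 make from the tests/ directory; since it uses the mock cluster, no external brokers should be required.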
