diff --git a/src/rdkafka_topic.c b/src/rdkafka_topic.c index 4341637bc0..6ec9fb5fce 100644 --- a/src/rdkafka_topic.c +++ b/src/rdkafka_topic.c @@ -681,9 +681,7 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt, } } - if (rktp->rktp_fetch_state == RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) - need_epoch_validation = rd_true; - else if (leader_epoch > rktp->rktp_leader_epoch) { + if (leader_epoch > rktp->rktp_leader_epoch) { rd_kafka_dbg(rktp->rktp_rkt->rkt_rk, TOPIC, "BROKER", "%s [%" PRId32 "]: leader %" PRId32 " epoch %" PRId32 " -> leader %" PRId32 @@ -693,7 +691,9 @@ static int rd_kafka_toppar_leader_update(rd_kafka_topic_t *rkt, rktp->rktp_leader_epoch, leader_id, leader_epoch); rktp->rktp_leader_epoch = leader_epoch; need_epoch_validation = rd_true; - } + } else if (rktp->rktp_fetch_state == + RD_KAFKA_TOPPAR_FETCH_VALIDATE_EPOCH_WAIT) + need_epoch_validation = rd_true; fetching_from_follower = leader != NULL && rktp->rktp_broker != NULL && diff --git a/tests/0139-offset_validation_mock.c b/tests/0139-offset_validation_mock.c index d1619634b1..f81b9fb46d 100644 --- a/tests/0139-offset_validation_mock.c +++ b/tests/0139-offset_validation_mock.c @@ -212,6 +212,95 @@ static void do_test_ssl_error_retried(void) { } +/** + * @brief If there's an OffsetForLeaderEpoch request which fails, and the leader + * changes meanwhile, we end up in an infinite loop of OffsetForLeaderEpoch + * requests. + * Specifically: + * a. Leader Change - causes OffsetForLeaderEpoch + * request 'A'. + * b. Request 'A' fails with a retriable error, and we retry it. + * c. While waiting for Request 'A', the leader changes again, and we send a + * Request 'B', but the leader epoch is not updated correctly in this + * request, causing a loop. + * + * See #4425. + */ +static void do_test_two_leader_changes(void) { + const char *topic = test_mk_topic_name(__FUNCTION__, 1); + const char *c1_groupid = topic; + rd_kafka_t *c1; + const char *bootstraps; + rd_kafka_mock_cluster_t *mcluster; + int msg_cnt = 5; + uint64_t testid = test_id_generate(); + rd_kafka_conf_t *conf; + + SUB_TEST_QUICK(); + + mcluster = test_mock_cluster_new(2, &bootstraps); + rd_kafka_mock_topic_create(mcluster, topic, 1, 2); + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + + /* Seed the topic with messages */ + test_produce_msgs_easy_v(topic, testid, 0, 0, msg_cnt, 10, + "bootstrap.servers", bootstraps, + "batch.num.messages", "1", NULL); + + test_conf_init(&conf, NULL, 60); + + test_conf_set(conf, "bootstrap.servers", bootstraps); + test_conf_set(conf, "auto.offset.reset", "earliest"); + test_conf_set(conf, "debug", "protocol"); + + c1 = test_create_consumer(c1_groupid, NULL, conf, NULL); + test_consumer_subscribe(c1, topic); + + /* Consume initial messages and join the group, etc. */ + test_consumer_poll("MSG_INIT", c1, testid, 0, 0, msg_cnt, NULL); + + /* The leader will change from 1->2, and the OffsetForLeaderEpoch will + * be sent to broker 2. We need to first fail it with + * an error, and then give enough time to change the leader before + * returning a success. */ + rd_kafka_mock_broker_push_request_error_rtts( + mcluster, 2, RD_KAFKAP_OffsetForLeaderEpoch, 1, + RD_KAFKA_RESP_ERR_KAFKA_STORAGE_ERROR, 900); + + rd_kafka_mock_broker_push_request_error_rtts( + mcluster, 2, RD_KAFKAP_OffsetForLeaderEpoch, 1, + RD_KAFKA_RESP_ERR_NO_ERROR, 1000); + + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 2); + rd_kafka_poll(c1, 1000); + /* Enough time to make a request, fail with a retriable error, and + * retry. */ + rd_sleep(1); + + /* Reset leader. */ + rd_kafka_mock_partition_set_leader(mcluster, topic, 0, 1); + rd_kafka_poll(c1, 1000); + rd_sleep(1); + + /* There should be no infinite loop of OffsetForLeaderEpoch, and + * consequently, we should be able to consume these messages as a sign + * of success. */ + test_produce_msgs_easy_v(topic, testid, 0, 0, msg_cnt, 10, + "bootstrap.servers", bootstraps, + "batch.num.messages", "1", NULL); + + test_consumer_poll("MSG_INIT", c1, testid, 0, 0, msg_cnt, NULL); + + + rd_kafka_destroy(c1); + + test_mock_cluster_destroy(mcluster); + + TEST_LATER_CHECK(); + SUB_TEST_PASS(); +} + + int main_0139_offset_validation_mock(int argc, char **argv) { if (test_needs_auth()) { @@ -223,5 +312,7 @@ int main_0139_offset_validation_mock(int argc, char **argv) { do_test_ssl_error_retried(); + do_test_two_leader_changes(); + return 0; }