From ae569611b13623b751c8c5bab49afdd77f2bb850 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Tue, 26 Sep 2023 19:37:09 +0200 Subject: [PATCH 1/4] Failing test --- tests/0139-offset_validation_mock.c | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/tests/0139-offset_validation_mock.c b/tests/0139-offset_validation_mock.c index d1619634b1..9ab81300fc 100644 --- a/tests/0139-offset_validation_mock.c +++ b/tests/0139-offset_validation_mock.c @@ -140,10 +140,11 @@ static void do_test_no_duplicates_during_offset_validation(void) { /** - * @brief Test that an SSL error doesn't cause an offset reset. - * See issue #4293. + * @brief Test that a permanent error doesn't cause an offset reset. + * See issues #4293, #4427. + * @param err The error OffsetForLeaderEpoch fails with. */ -static void do_test_ssl_error_retried(void) { +static void do_test_permanent_error_retried(rd_kafka_resp_err_t err) { rd_kafka_mock_cluster_t *mcluster; rd_kafka_conf_t *conf; const char *bootstraps; @@ -155,7 +156,7 @@ static void do_test_ssl_error_retried(void) { int msg_count = 5; uint64_t testid = test_id_generate(); - SUB_TEST_QUICK(); + SUB_TEST_QUICK("err: %s", rd_kafka_err2name(err)); mcluster = test_mock_cluster_new(3, &bootstraps); rd_kafka_mock_topic_create(mcluster, topic, 1, 1); @@ -165,10 +166,9 @@ static void do_test_ssl_error_retried(void) { "bootstrap.servers", bootstraps, "batch.num.messages", "1", NULL); - /* Make OffsetForLeaderEpoch fail with the _SSL error */ - rd_kafka_mock_push_request_errors(mcluster, - RD_KAFKAP_OffsetForLeaderEpoch, 1, - RD_KAFKA_RESP_ERR__SSL); + /* Make OffsetForLeaderEpoch fail with the corresponding error code */ + rd_kafka_mock_push_request_errors( + mcluster, RD_KAFKAP_OffsetForLeaderEpoch, 1, err); test_conf_init(&conf, NULL, 60); @@ -221,7 +221,8 @@ int main_0139_offset_validation_mock(int argc, char **argv) { do_test_no_duplicates_during_offset_validation(); - do_test_ssl_error_retried(); + do_test_permanent_error_retried(RD_KAFKA_RESP_ERR__SSL); + do_test_permanent_error_retried(RD_KAFKA_RESP_ERR__RESOLVE); return 0; } From 232a00e3bff3d7ce0d34d01b6fc3cb32f32ab5f2 Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Tue, 26 Sep 2023 19:45:33 +0200 Subject: [PATCH 2/4] Fix for #4427 --- CHANGELOG.md | 12 ++++++++++++ src/rdkafka_offset.c | 28 +++++++++------------------- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d02d3dd944..5a2e22e33d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,18 @@ librdkafka v2.2.1 is a maintenance release: are partition leader changes and a stale leader epoch is received (#4429). * Fix a segmentation fault when closing a consumer using the cooperative-sticky assignor before the first assignment (#4381). + * Fix to ensure permanent errors during offset validation continue being retried and + don't cause an offset reset (#). + + +## Fixes + +### Consumer fixes + + * During offset validation a permanent error like host resolution failure + would cause an offset reset. + This isn't what's expected or what the Java implementation does. + Solved by retrying even in case of permanent errors. diff --git a/src/rdkafka_offset.c b/src/rdkafka_offset.c index 00cf8638f5..701a41613d 100644 --- a/src/rdkafka_offset.c +++ b/src/rdkafka_offset.c @@ -991,25 +991,15 @@ static void rd_kafka_toppar_handle_OffsetForLeaderEpoch(rd_kafka_t *rk, rd_kafka_topic_leader_query0(rk, rktp->rktp_rkt, 1, rd_true /* force */); - if (actions & RD_KAFKA_ERR_ACTION_RETRY) { - /* No need for refcnt on rktp for timer opaque - * since the timer resides on the rktp and will be - * stopped on toppar remove. */ - rd_kafka_timer_start_oneshot( - &rk->rk_timers, &rktp->rktp_validate_tmr, rd_false, - 500 * 1000 /* 500ms */, - rd_kafka_offset_validate_tmr_cb, rktp); - - } else if (actions & RD_KAFKA_ERR_ACTION_PERMANENT) { - /* Permanent error */ - rd_kafka_offset_reset( - rktp, rd_kafka_broker_id(rkb), - RD_KAFKA_FETCH_POS(RD_KAFKA_OFFSET_INVALID, - rktp->rktp_leader_epoch), - RD_KAFKA_RESP_ERR__LOG_TRUNCATION, - "Unable to validate offset and epoch: %s", - rd_kafka_err2str(err)); - } + /* No need for refcnt on rktp for timer opaque + * since the timer resides on the rktp and will be + * stopped on toppar remove. + * Retries the validation with a new call even in + * case of permanent error. */ + rd_kafka_timer_start_oneshot( + &rk->rk_timers, &rktp->rktp_validate_tmr, rd_false, + 500 * 1000 /* 500ms */, rd_kafka_offset_validate_tmr_cb, + rktp); goto done; } From 2036b729bdd5604a7c4c9fda82ed2efb8a756aca Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 27 Sep 2023 11:18:23 +0200 Subject: [PATCH 3/4] Add PR id --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 5a2e22e33d..6fd2218350 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,7 +9,7 @@ librdkafka v2.2.1 is a maintenance release: * Fix a segmentation fault when closing a consumer using the cooperative-sticky assignor before the first assignment (#4381). * Fix to ensure permanent errors during offset validation continue being retried and - don't cause an offset reset (#). + don't cause an offset reset (#4447). ## Fixes From 48856a7202808858a72e17e1d34b22784f21a6cd Mon Sep 17 00:00:00 2001 From: Emanuele Sabellico Date: Wed, 27 Sep 2023 12:36:24 +0200 Subject: [PATCH 4/4] PR id in description --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6fd2218350..040b348c79 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,7 +19,7 @@ librdkafka v2.2.1 is a maintenance release: * During offset validation a permanent error like host resolution failure would cause an offset reset. This isn't what's expected or what the Java implementation does. - Solved by retrying even in case of permanent errors. + Solved by retrying even in case of permanent errors (#4447).