From c586da3a6588c0ee02cc267e9babc6b930d3161e Mon Sep 17 00:00:00 2001 From: Magnus Edenhill Date: Fri, 8 May 2020 10:51:33 +0200 Subject: [PATCH] Restart/abort req timeout scan if broker went down to avoid crash (#2326) --- CHANGELOG.md | 6 ++++++ src/rdkafka_broker.c | 27 +++++++++++++++++++++++++-- 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ea3688c55a..930169e943 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -43,6 +43,12 @@ librdkafka. ## Fixes +### General fixes + + * The client could crash in rare circumstances on ApiVersion or + SaslHandshake request timeouts (#2326) + + ### Consumer fixes * The roundrobin partition assignor could crash if subscriptions diff --git a/src/rdkafka_broker.c b/src/rdkafka_broker.c index ce38ca2988..3a4f986689 100644 --- a/src/rdkafka_broker.c +++ b/src/rdkafka_broker.c @@ -720,9 +720,14 @@ static int rd_kafka_broker_bufq_timeout_scan (rd_kafka_broker_t *rkb, rd_kafka_buf_t *rkbuf, *tmp; int cnt = 0; int idx = -1; - const rd_kafka_buf_t *holb = TAILQ_FIRST(&rkbq->rkbq_bufs); + const rd_kafka_buf_t *holb; + + restart: + holb = TAILQ_FIRST(&rkbq->rkbq_bufs); TAILQ_FOREACH_SAFE(rkbuf, &rkbq->rkbq_bufs, rkbuf_link, tmp) { + rd_kafka_broker_state_t pre_state, post_state; + idx++; if (likely(now && rkbuf->rkbuf_ts_timeout > now)) @@ -784,8 +789,26 @@ static int rd_kafka_broker_bufq_timeout_scan (rd_kafka_broker_t *rkb, && rd_atomic32_sub(&rkb->rkb_blocking_request_cnt, 1) == 0) rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk); + pre_state = rd_kafka_broker_get_state(rkb); + rd_kafka_buf_callback(rkb->rkb_rk, rkb, err, NULL, rkbuf); - cnt++; + cnt++; + + /* If the buf_callback() triggered a broker state change + * (typically through broker_fail()) we can't trust the + * queue we are scanning to not have been touched, so we + * either restart the scan or bail out (if broker is now down), + * depending on the new state. 
#2326 */ + post_state = rd_kafka_broker_get_state(rkb); + if (pre_state != post_state) { + /* If the new state is DOWN it means broker_fail() + * was called which may have modified the queues; + * to keep things safe we stop scanning this queue. */ + if (post_state == RD_KAFKA_BROKER_STATE_DOWN) + break; + /* Else start scanning the queue from the beginning. */ + goto restart; + } } return cnt;