Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Restart/abort req timeout scan if broker went down to avoid crash (#2326) #2877

Merged
merged 1 commit into from
May 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ librdkafka.

## Fixes

### General fixes

* The client could crash in rare circumstances on ApiVersion or
SaslHandshake request timeouts (#2326)


### Consumer fixes

* The roundrobin partition assignor could crash if subscriptions
Expand Down
27 changes: 25 additions & 2 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -720,9 +720,14 @@ static int rd_kafka_broker_bufq_timeout_scan (rd_kafka_broker_t *rkb,
rd_kafka_buf_t *rkbuf, *tmp;
int cnt = 0;
int idx = -1;
const rd_kafka_buf_t *holb = TAILQ_FIRST(&rkbq->rkbq_bufs);
const rd_kafka_buf_t *holb;

restart:
holb = TAILQ_FIRST(&rkbq->rkbq_bufs);

TAILQ_FOREACH_SAFE(rkbuf, &rkbq->rkbq_bufs, rkbuf_link, tmp) {
rd_kafka_broker_state_t pre_state, post_state;

idx++;

if (likely(now && rkbuf->rkbuf_ts_timeout > now))
Expand Down Expand Up @@ -784,8 +789,26 @@ static int rd_kafka_broker_bufq_timeout_scan (rd_kafka_broker_t *rkb,
&& rd_atomic32_sub(&rkb->rkb_blocking_request_cnt, 1) == 0)
rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);

pre_state = rd_kafka_broker_get_state(rkb);

rd_kafka_buf_callback(rkb->rkb_rk, rkb, err, NULL, rkbuf);
cnt++;
cnt++;

/* If the buf_callback() triggered a broker state change
* (typically through broker_fail()) we can't trust the
* queue we are scanning to not have been touched, so we
* either restart the scan or bail out (if broker is now down),
* depending on the new state. #2326 */
post_state = rd_kafka_broker_get_state(rkb);
if (pre_state != post_state) {
/* If the new state is DOWN it means broker_fail()
* was called which may have modified the queues,
* to keep things safe we stop scanning this queue. */
if (post_state == RD_KAFKA_BROKER_STATE_DOWN)
break;
/* Else start scanning the queue from the beginning. */
goto restart;
}
}

return cnt;
Expand Down