Skip to content

Commit

Permalink
Restart/abort req timeout scan if broker went down to avoid crash (#2326)
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed May 11, 2020
1 parent 2a5aabf commit c586da3
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 2 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ librdkafka.

## Fixes

### General fixes

* The client could crash in rare circumstances on ApiVersion or
SaslHandshake request timeouts (#2326)


### Consumer fixes

* The roundrobin partition assignor could crash if subscriptions
Expand Down
27 changes: 25 additions & 2 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -720,9 +720,14 @@ static int rd_kafka_broker_bufq_timeout_scan (rd_kafka_broker_t *rkb,
rd_kafka_buf_t *rkbuf, *tmp;
int cnt = 0;
int idx = -1;
const rd_kafka_buf_t *holb = TAILQ_FIRST(&rkbq->rkbq_bufs);
const rd_kafka_buf_t *holb;

restart:
holb = TAILQ_FIRST(&rkbq->rkbq_bufs);

TAILQ_FOREACH_SAFE(rkbuf, &rkbq->rkbq_bufs, rkbuf_link, tmp) {
rd_kafka_broker_state_t pre_state, post_state;

idx++;

if (likely(now && rkbuf->rkbuf_ts_timeout > now))
Expand Down Expand Up @@ -784,8 +789,26 @@ static int rd_kafka_broker_bufq_timeout_scan (rd_kafka_broker_t *rkb,
&& rd_atomic32_sub(&rkb->rkb_blocking_request_cnt, 1) == 0)
rd_kafka_brokers_broadcast_state_change(rkb->rkb_rk);

pre_state = rd_kafka_broker_get_state(rkb);

rd_kafka_buf_callback(rkb->rkb_rk, rkb, err, NULL, rkbuf);
cnt++;
cnt++;

/* If the buf_callback() triggered a broker state change
* (typically through broker_fail()) we can't trust the
* queue we are scanning to not have been touched, so we
* either restart the scan or bail out (if broker is now down),
* depending on the new state. #2326 */
post_state = rd_kafka_broker_get_state(rkb);
if (pre_state != post_state) {
/* If the new state is DOWN it means broker_fail()
* was called which may have modified the queues,
* to keep things safe we stop scanning this queue. */
if (post_state == RD_KAFKA_BROKER_STATE_DOWN)
break;
/* Else start scanning the queue from the beginning. */
goto restart;
}
}

return cnt;
Expand Down

0 comments on commit c586da3

Please sign in to comment.