Skip to content

Commit

Permalink
Fix for an idempotent producer error, with a message batch not recons…
Browse files Browse the repository at this point in the history
…tructed identically when retried (#4750)

Issues: #4736
Fix for an idempotent producer error, with a message batch not reconstructed identically when retried. Caused the error message "Local: Inconsistent state: Unable to reconstruct MessageSet".
Happening on large batches. Solved by using the same backoff baseline for all messages in the batch.
Happens since 2.2.0
  • Loading branch information
emasab authored Jun 12, 2024
1 parent 47d7c01 commit e2265b6
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 2 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
librdkafka v2.5.0 is a feature release.

* Fix segfault when using long client id because of erased segment when using flexver. (#4689)
* Fix for an idempotent producer error, with a message batch not reconstructed
identically when retried (#4750)


## Enhancements
Expand All @@ -21,6 +23,15 @@ librdkafka v2.5.0 is a feature release.
Happens since 1.x when a portion of the buffer (segment) is erased for flexver or compression.
More likely to happen since 2.1.0, because of the upgrades to flexver, with certain string sizes like a long client id (#4689).

### Idempotent producer fixes

* Issues: #4736
Fix for an idempotent producer error, with a message batch not reconstructed
identically when retried. Caused the error message "Local: Inconsistent state: Unable to reconstruct MessageSet".
Happening on large batches. Solved by using the same backoff baseline for all messages
in the batch.
Happens since 2.2.0 (#4750).



# librdkafka v2.4.0
Expand Down
12 changes: 10 additions & 2 deletions src/rdkafka_partition.c
Original file line number Diff line number Diff line change
Expand Up @@ -896,14 +896,22 @@ int rd_kafka_retry_msgq(rd_kafka_msgq_t *destq,
int retry_max_ms) {
rd_kafka_msgq_t retryable = RD_KAFKA_MSGQ_INITIALIZER(retryable);
rd_kafka_msg_t *rkm, *tmp;
rd_ts_t now;
int64_t jitter = rd_jitter(100 - RD_KAFKA_RETRY_JITTER_PERCENT,
100 + RD_KAFKA_RETRY_JITTER_PERCENT);
/* Scan through messages to see which ones are eligible for retry,
* move the retryable ones to temporary queue and
* set backoff time for first message and optionally
* increase retry count for each message.
* Sorted insert is not necessary since the original order
* srcq order is maintained. */
* srcq order is maintained.
*
* Start timestamp for calculating backoff is common,
* to avoid that messages from the same batch
* have different backoff, as they need to be retried
* by reconstructing the same batch, when idempotency is
* enabled. */
now = rd_clock();
TAILQ_FOREACH_SAFE(rkm, &srcq->rkmq_msgs, rkm_link, tmp) {
if (rkm->rkm_u.producer.retries + incr_retry > max_retries)
continue;
Expand All @@ -927,7 +935,7 @@ int rd_kafka_retry_msgq(rd_kafka_msgq_t *destq,
backoff = jitter * backoff * 10;
if (backoff > retry_max_ms * 1000)
backoff = retry_max_ms * 1000;
backoff = rd_clock() + backoff;
backoff = now + backoff;
}
rkm->rkm_u.producer.ts_backoff = backoff;

Expand Down

0 comments on commit e2265b6

Please # to comment.