We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
3.3-SNAPSHOT
From this issue, spring-kafka supports async retry with retry topic. However, IMHO, spring-kafka has a potential bug described below.
spring-kafka
spring-kafka/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
Lines 1467 to 1469 in 70a11a6
We can imagine this scenario. (Thread A is thread in executor for Mono or CompletableFuture)
Thread A
Mono
CompletableFuture
Main Thread
failedRecords
failedRecords.size()
failedRecords.clear()
In this scenario, Main thread has 100 failed records to retry. But, Main Thread removed 101 failed records. Therefore, 1 failed record will be missed.
Main thread
The KafkaMessageListenerContainer should not miss any failedRecords during handleAsyncFailure.
KafkaMessageListenerContainer
handleAsyncFailure
The text was updated successfully, but these errors were encountered:
spring-projectsGH-3638: Fixes bug caused by race condition during han…
aa24b47
…dleAsyncFailure().
c155336
Successfully merging a pull request may close this issue.
In what version(s) of Spring for Apache Kafka are you seeing this issue?
3.3-SNAPSHOT
Describe the bug
From this issue,
spring-kafka
supports async retry with retry topic.However, IMHO,
spring-kafka
has a potential bug described below.spring-kafka/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java
Lines 1467 to 1469 in 70a11a6
We can imagine this scenario. (
Thread A
is thread in executor forMono
orCompletableFuture
)Main Thread
: copy records fromfailedRecords
. In this time,failedRecords.size()
is 100. so, Main Thread has 100 failed records to retry.Thread A
: Oops! I encounter an exception during operation. Add this record tofailedRecords
. then,failedRecords.size()
is 101.Main Thread
: clearfailedRecords
by executingfailedRecords.clear()
.In this scenario,
Main thread
has 100 failed records to retry.But,
Main Thread
removed 101 failed records.Therefore, 1 failed record will be missed.
To Reproduce
Expected behavior
The
KafkaMessageListenerContainer
should not miss anyfailedRecords
duringhandleAsyncFailure
.Sample
The text was updated successfully, but these errors were encountered: