propagate scope in async failures #3950
acknowledge(acknowledgment);
if (canAsyncRetry(request, ex) && this.asyncRetryCallback != null) {
    @SuppressWarnings("unchecked")
    ConsumerRecord<K, V> record = (ConsumerRecord<K, V>) request;
    this.asyncRetryCallback.accept(record, (RuntimeException) ex);
} else {
It is annoying to receive this error when we can recover from it.
Not sure what your comment means, but that is indeed not our code style.
See the formatting config provided for IntelliJ IDEA: https://github.com/spring-projects/spring-kafka/blob/main/src/idea/spring-framework.xml
Just that we are receiving these errors in the log even though we can still recover from them, so I moved this log here!
The else statement has to start on a new line.
I'd like to see the fix issued against main.
And please follow the DCO requirements.
bfb8f6d to b4be8a3
b4be8a3 to 73aeaaf
Done!
// copyFailedRecord.observation.scoped(() -> invokeErrorHandlerBySingleRecord(copyFailedRecord));
// } else {
invokeErrorHandlerBySingleRecord(copyFailedRecord);
// }
What happened to the scoped() wrapper?
Why is all of this commented out?
And what is the point of the Observation property in FailedRecordTuple if it is not used?
Sorry, I accidentally submitted code from when I was trying to fix the test. Sorry @artembilan
spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java
@@ -72,6 +86,9 @@
 import org.springframework.kafka.KafkaException;
 import org.springframework.kafka.annotation.EnableKafka;
 import org.springframework.kafka.annotation.KafkaListener;
+
I don't think all of these blank lines in the imports are correct.
Please run gradlew check and fix all the Checkstyle violations before pushing to the PR.
...rc/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java
@@ -3433,7 +3438,7 @@ private Collection<ConsumerRecord<K, V>> getHighestOffsetRecords(ConsumerRecords
 }

 private void callbackForAsyncFailure(ConsumerRecord<K, V> cRecord, RuntimeException ex) {
-    this.failedRecords.addLast(new FailedRecordTuple<>(cRecord, ex));
+    this.failedRecords.addLast(new FailedRecordTuple<>(cRecord, ex, this.observationRegistry.getCurrentObservation()));
This is not correct.
this.observationRegistry.getCurrentObservation() is going to be called when this whole method is invoked from completableFutureResult.whenComplet, which in turn might run on a totally different thread.
So there is no guarantee that the observation is there.
I'm not sure what the real problem is, so I cannot think of a way to help you propagate it down to the exception handler for async callbacks.
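The threading concern raised here can be reproduced with plain JDK classes. The sketch below is only an illustration under stated assumptions: the class name, the `CURRENT` thread-local, and the "trace-1" value are hypothetical stand-ins for a thread-bound current observation, not spring-kafka or Micrometer code. It shows that a completion callback registered on one thread reads an empty thread-local when the future is completed from another thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadLocalLossSketch {

    // Hypothetical stand-in for the thread-bound "current observation".
    static final ThreadLocal<String> CURRENT = new ThreadLocal<>();

    // Returns the value the completion callback reads from the thread-local.
    static String seenInCallback() throws Exception {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            // The scope is opened on the calling (listener) thread only.
            CURRENT.set("trace-1");

            CompletableFuture<String> result = new CompletableFuture<>();
            CompletableFuture<String> seen = new CompletableFuture<>();

            // Registered before completion, so the callback runs on whichever
            // thread completes 'result', here the worker thread.
            result.whenComplete((value, ex) -> seen.complete(CURRENT.get()));

            // The listener fails asynchronously on a different thread.
            worker.execute(() ->
                    result.completeExceptionally(new IllegalStateException("listener failed")));

            return seen.get(5, TimeUnit.SECONDS);
        }
        finally {
            CURRENT.remove();
            worker.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("callback saw: " + seenInCallback());
    }
}
```

Because the worker thread never had the value set, the callback reads `null`, which mirrors why looking up the current observation inside the async callback cannot be relied on.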
Hey @artembilan, I realised that while doing the tests.
I refactored everything; if you can have another look, I would really appreciate it. I will update the description of the PR to clearly exemplify the problem. Also, the tests that I added make the expectations pretty clear!
afc0a09 to 00190ef
Signed-off-by: Igor Macedo Quintanilha <igor.quintanilha@teya.com>
00190ef to 2fea4fd
Fix trace context loss in async Kafka error handling
This PR addresses an issue where the trace context is lost when handling Kafka message failures asynchronously.
Problem
When async returns are enabled and a consumer failure occurs, the trace context from the original message is not propagated. This leads to each step of the retry/DLT flow starting a new trace instead of continuing the original one.
Example (current behavior):
• Producer → trace 1
• Consumer → trace 1, fails → message goes to retry topic
• Retry listener → trace 2, fails → message goes to DLT topic
• DLT listener → trace 3
This breaks end-to-end traceability, as each listener receives a new trace ID.
Root cause
The issue stems from the handleAsyncFailure method, which runs in a different thread and does not propagate the original Observation (trace) context associated with the failed record.
Fix
Ensure that the observation context is correctly propagated when handling async failures. This preserves the trace ID across retry and DLT flows.
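A minimal sketch of the general pattern behind this fix, using only JDK classes: capture the context next to the failed record while the scope is still open, then reopen that scope on whichever thread later handles the failure. Everything here (`ScopedFailureSketch`, `FailedRecord`, `scoped`, `handleError`, the "trace-1" and "order-42" values) is hypothetical and stands in for the real Observation and error-handler types. In Micrometer terms, the reopening step would roughly correspond to a call such as `observation.scoped(...)`, as in the commented-out line discussed in the review above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class ScopedFailureSketch {

    // Hypothetical stand-in for a thread-bound trace/observation scope.
    static final ThreadLocal<String> CURRENT_TRACE = new ThreadLocal<>();

    // Hypothetical tuple: the failed payload, its cause, and the captured trace.
    static final class FailedRecord {
        final String payload;
        final RuntimeException cause;
        final String trace;

        FailedRecord(String payload, RuntimeException cause, String trace) {
            this.payload = payload;
            this.cause = cause;
            this.trace = trace;
        }
    }

    // Runs the action with the given trace bound to the current thread,
    // restoring whatever was bound before once the action finishes.
    static <T> T scoped(String trace, Supplier<T> action) {
        String previous = CURRENT_TRACE.get();
        CURRENT_TRACE.set(trace);
        try {
            return action.get();
        }
        finally {
            if (previous == null) {
                CURRENT_TRACE.remove();
            }
            else {
                CURRENT_TRACE.set(previous);
            }
        }
    }

    // Stand-in error handler: reports which trace it runs under.
    static String handleError(FailedRecord failed) {
        return failed.payload + " handled under " + CURRENT_TRACE.get();
    }

    static String demo() throws Exception {
        // Listener thread: capture the trace while its scope is still open.
        FailedRecord failed = scoped("trace-1", () ->
                new FailedRecord("order-42", new IllegalStateException("boom"), CURRENT_TRACE.get()));

        // A different thread handles the failure and reopens the captured scope.
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            return CompletableFuture
                    .supplyAsync(() -> scoped(failed.trace, () -> handleError(failed)), pool)
                    .get();
        }
        finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(demo()); // prints "order-42 handled under trace-1"
    }
}
```

The design choice is that the context travels with the record as data rather than being looked up from the handling thread, so it does not matter which thread ends up invoking the error handler.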
🔧 Tested using version 3.3.6 so I could build and validate the JAR in a real-world project.