You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
Originally posted by rbraeunlich February 13, 2025
Hi everyone,
I was wondering about an error in my application (not part of my question). While trying to simplyfy my code, e.g. by using a suspend function in my listener, which is now being supported, I tried to find out if I have to do the ack by myself. I checked the documentation and also found this earlier question: #3290
This indicates that I don't have to do the ack myself. Nevertheless, when debugging my integration tests I could see that the first message never was acknowledged. When doing it myself, all tests pass.
The critical part seems to be this line: https://github.com/spring-projects/spring-kafka/blob/main/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java#L553
My suspend function doesn't return anything and therefore the adapter doesn't do the ack for me. The documentation only states "return types include CompletableFuture, Mono and Kotlin suspend functions". Not that my suspend function has to return anything.
Additionally, if I was to return a kotlinx.coroutines.Job or kotlinx.coroutines.Deferred the helper in org.springframework.kafka.listener.adapter.AdapterUtils#isAsyncReply wouldn't recognize them because it only checks for Mono and CompletableFuture.
So, I wonder if the documentation here needs to be more specific or if the code is wrong.
Cheers,
Ronny
The text was updated successfully, but these errors were encountered:
… functions
Fixes: spring-projects#3740
Issue link: spring-projects#3740
Even if Kotlin `suspend` functions are called properly, the acknowledgement is not called
because this kind of method is not treated as an `asyncReplies` mode
* Fix `HandlerAdapter` to check for `KotlinDetector.isSuspendingFunction()`
in addition to `CompletableFuture` & `Mono`
* Adjust `EnableKafkaKotlinCoroutinesTests.kt` to verify that `acknowledgement` has been called
by the Framework
**Auto-cherry-pick to `3.3.x` & `3.2.x`**
Fixes: #3740
Issue link: #3740
Even if Kotlin `suspend` functions are called properly, the acknowledgement is not called
because this kind of method is not treated as an `asyncReplies` mode
* Fix `HandlerAdapter` to check for `KotlinDetector.isSuspendingFunction()`
in addition to `CompletableFuture` & `Mono`
* Adjust `EnableKafkaKotlinCoroutinesTests.kt` to verify that `acknowledgement` has been called
by the Framework
**Auto-cherry-pick to `3.2.x`**
# Conflicts:
# spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java
Fixes: #3740
Issue link: #3740
Even if Kotlin `suspend` functions are called properly, the acknowledgement is not called
because this kind of method is not treated as an `asyncReplies` mode
* Fix `HandlerAdapter` to check for `KotlinDetector.isSuspendingFunction()`
in addition to `CompletableFuture` & `Mono`
* Adjust `EnableKafkaKotlinCoroutinesTests.kt` to verify that `acknowledgement` has been called
by the Framework
# Conflicts:
# spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/HandlerAdapter.java
(cherry picked from commit 914cf62)
Discussed in #3739
Originally posted by rbraeunlich February 13, 2025
Hi everyone,
I was wondering about an error in my application (not part of my question). While trying to simplyfy my code, e.g. by using a
suspend
function in my listener, which is now being supported, I tried to find out if I have to do theack
by myself. I checked the documentation and also found this earlier question: #3290This indicates that I don't have to do the
ack
myself. Nevertheless, when debugging my integration tests I could see that the first message never was acknowledged. When doing it myself, all tests pass.The critical part seems to be this line:
https://github.com/spring-projects/spring-kafka/blob/main/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java#L553
My suspend function doesn't return anything and therefore the adapter doesn't do the ack for me. The documentation only states "return types include CompletableFuture, Mono and Kotlin suspend functions". Not that my suspend function has to return anything.
Additionally, if I was to return a
kotlinx.coroutines.Job
orkotlinx.coroutines.Deferred
the helper inorg.springframework.kafka.listener.adapter.AdapterUtils#isAsyncReply
wouldn't recognize them because it only checks forMono
andCompletableFuture
.So, I wonder if the documentation here needs to be more specific or if the code is wrong.
Cheers,
Ronny
The text was updated successfully, but these errors were encountered: