Skip to content

Commit

Permalink
GH-3695: Fix MessagingMessageListenerAdapter to not ack when sync (#…
Browse files Browse the repository at this point in the history
…3696)

Fixes: #3695

Even if th `@KafkaHandler` method is `void` the `DelegatingInvocableHandler` returns an empty `InvocationResult`.
That triggers a `MessagingMessageListenerAdapter.handleResult()` logic.
On the `completableFutureResult.whenComplete()` we call `acknowledge()` which is not expected for `void` POJO methods.

* Fix `MessagingMessageListenerAdapter` to check for `isAsyncReplies()` before calling `acknowledge()`

This is a regression after #3528
  • Loading branch information
artembilan authored Jan 6, 2025
1 parent c9e7edc commit 8d7ce48
Showing 1 changed file with 7 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@
import org.springframework.util.TypeUtils;

/**
* An abstract {@link org.springframework.kafka.listener.MessageListener} adapter
* An abstract {@link MessageListener} adapter
* providing the necessary infrastructure to extract the payload of a
* {@link org.springframework.messaging.Message}.
* {@link Message}.
*
* @param <K> the key type.
* @param <V> the value type.
Expand Down Expand Up @@ -205,9 +205,9 @@ public void setMessageConverter(RecordMessageConverter messageConverter) {

/**
* Return the {@link MessagingMessageConverter} for this listener,
* being able to convert {@link org.springframework.messaging.Message}.
* being able to convert {@link Message}.
* @return the {@link MessagingMessageConverter} for this listener,
* being able to convert {@link org.springframework.messaging.Message}.
* being able to convert {@link Message}.
*/
protected final RecordMessageConverter getMessageConverter() {
return this.messageConverter;
Expand Down Expand Up @@ -550,7 +550,9 @@ else if (!(result instanceof CompletableFuture<?>)) {
try {
if (t == null) {
asyncSuccess(r, replyTopic, source, messageReturnType);
acknowledge(acknowledgment);
if (isAsyncReplies()) {
acknowledge(acknowledgment);
}
}
else {
Throwable cause = t instanceof CompletionException ? t.getCause() : t;
Expand Down

0 comments on commit 8d7ce48

Please # to comment.