Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Deserialization error for Acknowledgement in Spring Kafka Listener #3400

Closed
purbarunc opened this issue Aug 1, 2024 · 11 comments · Fixed by #3401
Closed

Deserialization error for Acknowledgement in Spring Kafka Listener #3400

purbarunc opened this issue Aug 1, 2024 · 11 comments · Fixed by #3401
Assignees
Milestone

Comments

@purbarunc
Copy link

I am getting message deserialization exception when I attempt to acknowledge the message using acknowledge(). If I remove this part of acknowledgement everything is working i.e. messages are getting consumed without any error.

Given below is the last exception in the stack trace.

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [org.purbarun.kafka.model.OrderMessage] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload=OrderMessage[orderRequest=OrderRequest[item=iphone, quantity=4, price=90000], messageId=238394fd-6c85-4463-a2ee-b777806d4188], headers={kafka_offset=19, kafka_consumer=org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer@71e72ebe, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=NEW_ORDER, kafka_receivedTimestamp=1722484594330, kafka_groupId=NEW_ORDER_group}]
	at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:151) ~[spring-messaging-6.1.11.jar:6.1.11]
	at org.springframework.kafka.listener.adapter.KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaNullAwarePayloadArgumentResolver.java:48) ~[spring-kafka-3.2.2.jar:3.2.2]
	at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:118) ~[spring-messaging-6.1.11.jar:6.1.11]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:147) ~[spring-messaging-6.1.11.jar:6.1.11]
	at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:115) ~[spring-messaging-6.1.11.jar:6.1.11]
	at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:70) ~[spring-kafka-3.2.2.jar:3.2.2]
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:420) ~[spring-kafka-3.2.2.jar:3.2.2]
	... 16 common frames omitted

Below is my Java based configuration where I have configured the listener and all that stuff:

@Bean
ConsumerFactory<String, OrderMessage> consumerFactory() {

// Creating a map of string-object type
Map<String, Object> config = new HashMap<>();

// Adding the Configuration
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "NEW_ORDER_group");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);

// Returning message in JSON format
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
		new JsonDeserializer<>(OrderMessage.class));
}

// Creating a Listener
@Bean
ConcurrentKafkaListenerContainerFactory<String, OrderMessage> orderListener() {
ConcurrentKafkaListenerContainerFactory<String, OrderMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}

I have tried JsonDeserializer of type GenericMessage<OrderMessage> but configuration seems to be not correct as per the requirement.

Can anyone help to fix the same.

I have referred the code from https://github.com/spring-tips/kafka/blob/4d1d09caed751f94419b3229a3a65bef9a57ec0b/producer/src/main/java/com/example/analytics/ProducerApplication.java#L71

My code for producer and consumer is present in the given link:
https://github.com/purbarunc/Spring-Boot-Kafka

There is no reproduction steps as such. Just need to create the containers using docker compose and once producer and consumer app is up use the below curl to publish message to the Kafka Topic.

curl --request POST \
  --url http://localhost:8080/order \
  --header 'Content-Type: application/json' \
  --header 'User-Agent: insomnia/8.6.1' \
  --cookie JSESSIONID=14321649AD00C61688D54D493B5F2E10 \
  --data '{
	"item": "iphone",
	"quantity": 4,
	"price": 90000
}'
@sobychacko
Copy link
Contributor

@purbarunc Thanks for the report. We will triage the issue soon and get back to you.

@sobychacko sobychacko self-assigned this Aug 1, 2024
@artembilan
Copy link
Member

The Acknowledgment argument makes sense (and resolved respectively) only if ContainerProperties.AckMode is MANUAL or MANUAL_IMMEDIATE.
I don't see anywhere in your configuration such an option applied.
See more info in docs:
https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/listener-annotation.html#manual-acknowledgment
https://docs.spring.io/spring-kafka/reference/kafka/receiving-messages/message-listener-container.html#committing-offsets

The issue has nothing to do with deserialziation. Just because the MessageListener is not an AcknowledgingMessageListener, there is no way to resolve this Acknowledgment argument from some other place, so it falls back to the mentioned PayloadMethodArgumentResolver.
You see, the mentioned project does configure respective property before using this feature: https://github.com/spring-tips/kafka/blob/main/producer/src/main/resources/application.properties#L3

Perhaps we can improve the logic somehow to indicate such an illegal state and reject such a configuration when no manual ack mode is configured.

@purbarunc
Copy link
Author

@artembilan Is this the only change that is needed purbarunc/Spring-Boot-Kafka@e0ef274 or there should also be some change in Java Configuration since I am still getting MessageConversionException even after adding the property. Please let me know if any more changes are needed as from the docs it seems this is the only missing piece.

@artembilan
Copy link
Member

That change would make sense if you rely on the Spring Boot auto-configuration, but apparently you create ConcurrentKafkaListenerContainerFactory manually, so those properties won't have any effect on your application.
Therefore you need to do like this:

factory.getContainerProperties().setAckMode(AckMode.MANUAL);

Not sure, though, why would one use Spring Boot, but still creates all the beans manually.
Your ConsumerFactory also does not look like require any manual creation.

Please, also take a look closer to that stack trace. It has a message like this:

No Acknowledgment available as an argument, the listener container must have a MANUAL AckMode to populate the Acknowledgment.

@purbarunc
Copy link
Author

@artembilan the issue solved by adding this factory.getContainerProperties().setAckMode(AckMode.MANUAL);

But I agree with you that I am not using the power of Spring Boot to any extent and hence I tried eliminating all the Java Config code purbarunc/Spring-Boot-Kafka@f2b5222 I see the below same error over again:

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.purbarun.kafka.model.OrderMessage] for GenericMessage [payload={"orderRequest":{"item":"iphone","quantity":4,"price":90000},"messageId":"5fd21348-d6a4-4788-8c2e-b635162fed7a"}, headers={kafka_offset=24, kafka_consumer=org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer@57b346f2, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=NEW_ORDER, kafka_receivedTimestamp=1722527413774, kafka_acknowledgment=Acknowledgment for NEW_ORDER-0@24, kafka_groupId=NEW_ORDER_group}]

even after adding the below property:

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer

Do you have any solution for the same.

@artembilan
Copy link
Member

I'm not sure what for the same phrase means, but I hear it a lot from the community.
Would you mind to re-phrase it some how else, so I would get a gist of its meaning 😄

Even if exception is the same but the reason is slightly different:

Cannot convert from [org.purbarun.kafka.model.OrderMessage] to [org.springframework.kafka.support.Acknowledgment]

from original one in opposed to your new one:

Cannot convert from [java.lang.String] to [org.purbarun.kafka.model.OrderMessage]

Looks like that spring.kafka.consumer.value-deserializer does not have an effect somehow and default StringDeserializer one is in action.
Or... it ignores all the type info and just returns String for you.

Try to add this property then spring.kafka.consumer.properties[spring.json.use.type.headers]=false.
This way the target type to convert JSON into is going to be inferred from your @KafkaListener method parameters.

@purbarunc
Copy link
Author

@artembilan for the same here was the above mentioned error instead of repeating the same thing over again just like a address/tag 😄

But do you mean this spring.kafka.producer.properties.spring.json.add.type.headers=false ?? Even this doesnt seem to be working neither spring.kafka.consumer.properties[spring.json.use.type.headers]=false

purbarunc/Spring-Boot-Kafka@32051d6

@artembilan
Copy link
Member

artembilan commented Aug 1, 2024

Right, but for the same is not a correct English phrase in this context. It feels like not finished sentence. Something else has to go after same.
That's why I'm always confused what does that mean.
We can ask @sobychacko to judge us, but in my opinionon the matter is the right choice for this kind of situation.

I cannot play with your application right now, but I think this is what you need over there:

spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.use.type.headers]=false

See more info in docs: https://docs.spring.io/spring-boot/reference/messaging/kafka.html#messaging.kafka.additional-properties

Not sure why you have commented out a JsonDeserializer in your last commit.

Either way, this does not look like we use GitHub issues feature properly.
This is more like a support ticket discussion which is out of scope of issue purpose.

I prefer to do support on StackOverflow. GH issues are for bugs and project improvement requests.
We have already realized for this issue that we need to improve the stack trace for this MANUAL ack mode and respective docs.

Everything else sounds like issue hijacking.

Thanks for understanding!

@purbarunc
Copy link
Author

Okay..We can probably close this thread right here.

@sobychacko
Copy link
Contributor

We will still use this issue to improve the exception.

@sobychacko
Copy link
Contributor

FWIW, this the correct configuration you need to run your app:

spring.kafka.listener.ack-mode=manual
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties[spring.json.value.default.type]=org.purbarun.kafka.model.OrderMessage
spring.kafka.consumer.properties[spring.json.trusted.packages]=org.purbarun.kafka.model

# for free to join this conversation on GitHub. Already have an account? # to comment
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants