Spring Boot Kafka Issue With Transactionality When Using ReplyingKafkaTemplate #3560

Open
periplanomenos opened this issue Oct 16, 2024 · 0 comments


Version(s) of Spring for Apache Kafka

3.2.4 (as shown by the spring-kafka jar in the stack trace)

Description

I want to use ReplyingKafkaTemplate to implement the request-reply messaging pattern with Kafka, with end-to-end transactionality. After trying several approaches, I arrived at the code below, which fails with the following exception:

Caused by: java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record

With a plain KafkaTemplate I can send events transactionally to the target topic and to many others. The problem arises only with ReplyingKafkaTemplate.
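
For reference, the third option named in the exception message (a transaction started by the listener container) might be configured roughly as follows. This is a sketch, not a confirmed fix. It assumes spring-kafka 3.2.x, where ContainerProperties exposes setKafkaAwareTransactionManager, and it assumes that a KafkaTransactionManager bean and a ConsumerFactory bean already exist. The class name ContainerTransactionSketch is hypothetical, and Event is the payload type used in the code of this issue.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.transaction.KafkaTransactionManager;

// Hypothetical configuration class; Event is the application's own payload type.
@Configuration
public class ContainerTransactionSketch {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Event> kafkaListenerContainerFactory(
            ConsumerFactory<String, Event> consumerFactory,
            KafkaTemplate<String, Event> kafkaTemplate,
            KafkaTransactionManager<String, Event> kafkaTransactionManager) {
        ConcurrentKafkaListenerContainerFactory<String, Event> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Template used by the listener adapter to send @SendTo replies.
        factory.setReplyTemplate(kafkaTemplate);
        // Assumption: with this set, the container starts the Kafka transaction
        // before calling the listener, so the reply send should run inside it.
        factory.getContainerProperties().setKafkaAwareTransactionManager(kafkaTransactionManager);
        return factory;
    }
}
```

Note that the replies container in the configuration below is created from the same factory, so it would presumably inherit this setting. Whether that is desirable is not addressed by this sketch.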

Reproduce

Create an event and send it with ReplyingKafkaTemplate to the topic from which I expect a synchronous reply.

Expected behavior

I want the listener to process the event and send back the response. Since the proper @Transactional annotations are in place, I would expect no issue with transactionality.

Code

Configuration

@Bean
public ProducerFactory<String, Event> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    configProps.put(ProducerConfig.ACKS_CONFIG, acks);
    configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionId);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    configProps.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
    configProps.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);
    configProps.put(ProducerConfig.RETRIES_CONFIG, retries);
    configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
    configProps.put(ProducerConfig.LINGER_MS_CONFIG, lingerMs);
    configProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);
    configProps.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, requestTimeoutMs);
    configProps.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, deliveryTimeoutMs);

    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Event> kafkaTemplate(ProducerFactory<String, Event> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
}

@Bean
public ReplyingKafkaTemplate<String, Event, Event> replyingKafkaTemplate(ProducerFactory<String, Event> producerFactory,
                                                                         ConcurrentMessageListenerContainer<String, Event> repliesContainer) {
    ReplyingKafkaTemplate<String, Event, Event> replyingKafkaTemplate = new ReplyingKafkaTemplate<>(producerFactory, repliesContainer);
    replyingKafkaTemplate.setSharedReplyTopic(true);

    return replyingKafkaTemplate;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Event> kafkaListenerContainerFactory(
        KafkaTemplate<String, Event> kafkaTemplate) {
    Map<String, Object> consumerFactoryConfigProps = new HashMap<>();
    consumerFactoryConfigProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    consumerFactoryConfigProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerFactoryConfigProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    consumerFactoryConfigProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

    ConsumerFactory<String, Event> consumerFactory = new DefaultKafkaConsumerFactory<>(
            consumerFactoryConfigProps, new StringDeserializer(), new JsonDeserializer<>(Event.class));

    ConcurrentKafkaListenerContainerFactory<String, Event> kafkaListenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();
    kafkaListenerContainerFactory.setConsumerFactory(consumerFactory);
    kafkaListenerContainerFactory.setReplyTemplate(kafkaTemplate);

    return kafkaListenerContainerFactory;
}

@Bean
public ConcurrentMessageListenerContainer<String, Event> repliesContainer(
        ConcurrentKafkaListenerContainerFactory<String, Event> kafkaListenerContainerFactory,
        @Value("${application.kafka.replies.topicName}") String topicName) {
    Properties repliesContainerConfigProps = new Properties();
    repliesContainerConfigProps.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

    ConcurrentMessageListenerContainer<String, Event> repliesContainer =
            kafkaListenerContainerFactory.createContainer(topicName);
    repliesContainer.getContainerProperties().setGroupId(groupId);
    repliesContainer.getContainerProperties().setKafkaConsumerProperties(repliesContainerConfigProps);

    return repliesContainer;
}

KafkaListener

@Transactional(value = "kafkaTransactionManager")
@KafkaListener(topics = "${application.kafka.synchronousTest.topicName}",
        groupId = "${application.kafka.group-id}",
        concurrency = "${application.kafka.synchronousTest.concurrency}")
@SendTo
public SynchronousTestEvent receiveAndForwardMessageFromTopic(SynchronousTestEvent incomingSynchronousTestEvent) {
    log.debug("Received message: {}", incomingSynchronousTestEvent);

    SynchronousTestEvent outgoingSynchronousTestEvent = synchronousTestEventHandler.handleRequestReplyEvent(incomingSynchronousTestEvent);

    return outgoingSynchronousTestEvent;
}

SynchronousTestEventService processing event from KafkaListener

@Transactional(value = "kafkaTransactionManager")
@Override
public SynchronousTestEvent handleRequestReplyEvent(SynchronousTestEvent synchronousTestEvent) {
    try {
        log.debug("Received message: {}", synchronousTestEvent);

        SynchronousTestEvent outgoingSynchronousTestEvent = synchronousTestEventFactory
                .createEvent(synchronousTestEvent.getIncomingFileDto(), true, "Success");

        return outgoingSynchronousTestEvent;
    } catch (Exception ex) {
        log.error("Error", ex);

        synchronousTestEvent.setProcessed(false);
        synchronousTestEvent.setMessage(ex.getMessage());

        return synchronousTestEvent;
    }
}

Method generating an event to be sent using ReplyingKafkaTemplate

@Transactional("kafkaTransactionManager")
public void sendSynchronousTestEvent(SubmissionDto submissionDto) throws ExecutionException, InterruptedException, TimeoutException {
    SynchronousTestEvent outgoingSynchronousTestEvent = synchronousTestEventFactory
            .createEvent(submissionDto.getIncomingFileDto(), false, null);

    synchronousTestRequestReplyService.sendRequestReplyEvent(outgoingSynchronousTestEvent);
}

Method sending the event using ReplyingKafkaTemplate

private final ReplyingKafkaTemplate<String, Event, Event> replyingKafkaTemplate;

@Transactional(value = "kafkaTransactionManager")
@Override
public Event sendRequestReplyEvent(Event event) throws ExecutionException, InterruptedException, TimeoutException {
    ProducerRecord<String, Event> record = new ProducerRecord<>(synchronousTestΤopicName, event.key(), event);
    record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, repliesΤopicName.getBytes()));
    RequestReplyFuture<String, Event, Event> replyFuture = replyingKafkaTemplate.sendAndReceive(record);
    SendResult<String, Event> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
    ConsumerRecord<String, Event> consumerRecord = replyFuture.get(20, TimeUnit.SECONDS);

    return consumerRecord.value();
}

Logs

2024-10-16T11:00:58.402+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2024-10-16T11:00:58.430+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 1 records
2024-10-16T11:00:58.431+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] .a.RecordMessagingMessageListenerAdapter : Processing [GenericMessage [payload=SynchronousTestEvent(incomingFileDto=IncomingFileDto(id=null, uuid=181fe61d-ad5f-464b-a048-8919ce0eb116, externalEntityId=45, externalEntityInternalCode=011, subsystemCode=6, irisUserId=null, fileExchangeNumber=0, fileName=MFMC_T0_custom.xlsx, filePath=C:\Temp\elfund\incoming\, fileType=XLSX, messageId=null, messageTimeStamp=0, workbookDataAsCompressedData=null, jsonAsCompressedData=null, createdDate=2024-10-16T08:00:58.284047800Z, modifiedDate=2024-10-16T08:00:58.284047800Z), processed=false, message=null), headers={kafka_offset=4, kafka_consumer=org.springframework.kafka.core.DefaultKafkaConsumerFactory$ExtendedKafkaConsumer@50dbcb28, kafka_correlationId=[B@7f70fe7d, kafka_timestampType=CREATE_TIME, kafka_replyTopic=[B@4c1f883c, kafka_receivedPartitionId=0, kafka_receivedMessageKey=181fe61d-ad5f-464b-a048-8919ce0eb116, kafka_receivedTopic=elfund-synchronous-test, kafka_receivedTimestamp=1729065658399, kafka_groupId=elfund-group}]]
2024-10-16T11:00:58.431+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] o.s.k.t.KafkaTransactionManager          : Creating new transaction with name [gr.test.integration.kafka.events.synchronous_test.SynchronousTestEventListener.receiveAndForwardMessageFromTopic]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT; 'kafkaTransactionManager'
2024-10-16T11:00:58.431+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@635db03a] beginTransaction()
2024-10-16T11:00:58.431+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] o.s.k.t.KafkaTransactionManager          : Created Kafka transaction on producer [CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@635db03a]]
2024-10-16T11:00:58.431+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] s.e.i.k.e.s.SynchronousTestEventListener : Received message: SynchronousTestEvent(incomingFileDto=IncomingFileDto(id=null, uuid=181fe61d-ad5f-464b-a048-8919ce0eb116, externalEntityId=45, externalEntityInternalCode=011, subsystemCode=6, irisUserId=null, fileExchangeNumber=0, fileName=MFMC_T0_custom.xlsx, filePath=C:\Temp\elfund\incoming\, fileType=XLSX, messageId=null, messageTimeStamp=0, workbookDataAsCompressedData=null, jsonAsCompressedData=null, createdDate=2024-10-16T08:00:58.284047800Z, modifiedDate=2024-10-16T08:00:58.284047800Z), processed=false, message=null)
2024-10-16T11:00:58.431+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] o.s.k.t.KafkaTransactionManager          : Participating in existing transaction
2024-10-16T11:00:58.431+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] .b.i.s.e.p.r.SynchronousTestEventService : Received message: SynchronousTestEvent(incomingFileDto=IncomingFileDto(id=null, uuid=181fe61d-ad5f-464b-a048-8919ce0eb116, externalEntityId=45, externalEntityInternalCode=011, subsystemCode=6, irisUserId=null, fileExchangeNumber=0, fileName=MFMC_T0_custom.xlsx, filePath=C:\Temp\elfund\incoming\, fileType=XLSX, messageId=null, messageTimeStamp=0, workbookDataAsCompressedData=null, jsonAsCompressedData=null, createdDate=2024-10-16T08:00:58.284047800Z, modifiedDate=2024-10-16T08:00:58.284047800Z), processed=false, message=null)
2024-10-16T11:01:04.686+03:00 DEBUG 22368 --- [elfund-processing] [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2024-10-16T11:01:04.687+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Received: 0 records
2024-10-16T11:01:04.687+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2024-10-16T11:01:04.687+03:00 DEBUG 22368 --- [elfund-processing] [Container-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2024-10-16T11:01:04.689+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] o.s.k.t.KafkaTransactionManager          : Initiating transaction commit
2024-10-16T11:01:04.689+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] o.s.k.core.DefaultKafkaProducerFactory   : CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@635db03a] commitTransaction()
2024-10-16T11:01:04.689+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] .a.RecordMessagingMessageListenerAdapter : Listener method returned result [SynchronousTestEvent(incomingFileDto=IncomingFileDto(id=null, uuid=181fe61d-ad5f-464b-a048-8919ce0eb116, externalEntityId=45, externalEntityInternalCode=011, subsystemCode=6, irisUserId=null, fileExchangeNumber=0, fileName=MFMC_T0_custom.xlsx, filePath=C:\Temp\elfund\incoming\, fileType=XLSX, messageId=null, messageTimeStamp=0, workbookDataAsCompressedData=null, jsonAsCompressedData=null, createdDate=2024-10-16T08:00:58.284047800Z, modifiedDate=2024-10-16T08:00:58.284047800Z), processed=true, message=Success)] - generating response message for it
2024-10-16T11:01:04.689+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] erMapper$SimplePatternBasedHeaderMatcher : headerName=[kafka_correlationId] WILL be mapped, matched pattern=*
2024-10-16T11:01:04.690+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] erMapper$SimplePatternBasedHeaderMatcher : headerName=[id] WILL NOT be mapped, matched pattern=!id
2024-10-16T11:01:04.690+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] erMapper$SimplePatternBasedHeaderMatcher : headerName=[timestamp] WILL NOT be mapped, matched pattern=!timestamp
2024-10-16T11:01:04.690+03:00 DEBUG 22368 --- [elfund-processing] [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Commit list: {}
2024-10-16T11:01:04.707+03:00 ERROR 22368 --- [elfund-processing] [ntainer#1-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Error handler threw an exception

org.springframework.kafka.KafkaException: Seek to current after exception
	at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:227) ~[spring-kafka-3.2.4.jar:3.2.4]
	at org.springframework.kafka.listener.DefaultErrorHandler.handleRemaining(DefaultErrorHandler.java:168) ~[spring-kafka-3.2.4.jar:3.2.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2836) ~[spring-kafka-3.2.4.jar:3.2.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2713) ~[spring-kafka-3.2.4.jar:3.2.4]
	at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.13.4.jar:1.13.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2699) ~[spring-kafka-3.2.4.jar:3.2.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2541) ~[spring-kafka-3.2.4.jar:3.2.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2430) ~[spring-kafka-3.2.4.jar:3.2.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2085) ~[spring-kafka-3.2.4.jar:3.2.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1461) ~[spring-kafka-3.2.4.jar:3.2.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1426) ~[spring-kafka-3.2.4.jar:3.2.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296) ~[spring-kafka-3.2.4.jar:3.2.4]
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1804) ~[na:na]
	at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java) ~[na:na]
	at java.base/java.lang.Thread.run(Thread.java:842) ~[na:na]
Caused by: org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2873) ~[spring-kafka-3.2.4.jar:3.2.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2814) ~[spring-kafka-3.2.4.jar:3.2.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2778) ~[spring-kafka-3.2.4.jar:3.2.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2701) ~[spring-kafka-3.2.4.jar:3.2.4]
	... 11 common frames omitted
Caused by: java.lang.IllegalStateException: No transaction is in process; possible solutions: run the template operation within the scope of a template.executeInTransaction() operation, start a transaction with @Transactional before invoking the template method, run in a transaction started by a listener container when consuming a record
	at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-6.1.13.jar:6.1.13]
	at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:938) ~[spring-kafka-3.2.4.jar:3.2.4]
	at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:816) ~[spring-kafka-3.2.4.jar:3.2.4]
	at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:793) ~[spring-kafka-3.2.4.jar:3.2.4]
	at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:609) ~[spring-kafka-3.2.4.jar:3.2.4]
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.sendReplyForMessageSource(MessagingMessageListenerAdapter.java:641) ~[spring-kafka-3.2.4.jar:3.2.4]
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.sendSingleResult(MessagingMessageListenerAdapter.java:608) ~[spring-kafka-3.2.4.jar:3.2.4]
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.sendResponse(MessagingMessageListenerAdapter.java:572) ~[spring-kafka-3.2.4.jar:3.2.4]
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.handleResult(MessagingMessageListenerAdapter.java:503) ~[spring-kafka-3.2.4.jar:3.2.4]
	at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invoke(MessagingMessageListenerAdapter.java:386) ~[spring-kafka-3.2.4.jar:3.2.4]
	at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:85) ~[spring-kafka-3.2.4.jar:3.2.4]
	at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50) ~[spring-kafka-3.2.4.jar:3.2.4]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800) ~[spring-kafka-3.2.4.jar:3.2.4]
	... 13 common frames omitted
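
One possible reading of the log above is about ordering. The transaction created for receiveAndForwardMessageFromTopic is committed first, then the adapter logs "Listener method returned result ... generating response message", and only then does the reply send fail in sendReplyForMessageSource. The plain-Java model below illustrates that ordering. None of its types are Spring classes: TX_ACTIVE, inTransaction, proxiedListener and adapterInvoke are hypothetical stand-ins, and the model is an assumption about the cause, not a description of the library's internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

/** Plain-Java model of the call order seen in the log; not Spring code. */
public class ReplyOrderingModel {

    /** Stand-in for a thread-bound Kafka transaction. */
    static final ThreadLocal<Boolean> TX_ACTIVE = ThreadLocal.withInitial(() -> false);

    /** Stand-in for a transactional template: sending outside a transaction fails. */
    static void send(String reply, List<String> sent) {
        if (!TX_ACTIVE.get()) {
            throw new IllegalStateException("No transaction is in process");
        }
        sent.add(reply);
    }

    /** Runs the work inside a transaction, joining one that is already active. */
    static <T> T inTransaction(Supplier<T> work) {
        if (TX_ACTIVE.get()) {
            return work.get();
        }
        TX_ACTIVE.set(true);
        try {
            return work.get();
        } finally {
            TX_ACTIVE.set(false); // commit or rollback ends the transaction
        }
    }

    /** The listener method as wrapped by an @Transactional-style proxy. */
    static String proxiedListener(String request) {
        return inTransaction(() -> "reply-to-" + request);
    }

    /** Stand-in for the listener adapter: call the listener, then send its result. */
    static void adapterInvoke(String request, List<String> sent) {
        String reply = proxiedListener(request);
        send(reply, sent);
    }

    /** Case 1: only the proxy starts a transaction, so the send runs after it ended. */
    public static boolean proxyOnlyFails() {
        try {
            adapterInvoke("req", new ArrayList<>());
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    /** Case 2: a "container" starts the transaction around the whole adapter call. */
    public static List<String> containerManaged() {
        List<String> sent = new ArrayList<>();
        inTransaction(() -> {
            adapterInvoke("req", sent);
            return null;
        });
        return sent;
    }

    public static void main(String[] args) {
        System.out.println("proxy-only send fails: " + proxyOnlyFails());
        System.out.println("container-managed sends: " + containerManaged());
    }
}
```

In this model the reply is only sent successfully when the transaction is opened by the caller of the adapter rather than by a proxy around the listener method. This matches the "run in a transaction started by a listener container" hint in the exception message.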