diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java index ac49dd7af4a6..51fff672c263 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java @@ -44,9 +44,10 @@ * @author Eddú Meléndez * @author Thomas Kåsene * @author Moritz Halbritter + * @author Dimitrii Lipiridi * @since 1.5.0 */ -public class ConcurrentKafkaListenerContainerFactoryConfigurer { +public class ConcurrentKafkaListenerContainerFactoryConfigurer { private KafkaProperties properties; @@ -54,21 +55,21 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { private RecordMessageConverter recordMessageConverter; - private RecordFilterStrategy recordFilterStrategy; + private RecordFilterStrategy recordFilterStrategy; - private KafkaTemplate replyTemplate; + private KafkaTemplate replyTemplate; - private KafkaAwareTransactionManager transactionManager; + private KafkaAwareTransactionManager transactionManager; private ConsumerAwareRebalanceListener rebalanceListener; private CommonErrorHandler commonErrorHandler; - private AfterRollbackProcessor afterRollbackProcessor; + private AfterRollbackProcessor afterRollbackProcessor; - private RecordInterceptor recordInterceptor; + private RecordInterceptor recordInterceptor; - private BatchInterceptor batchInterceptor; + private BatchInterceptor batchInterceptor; private Function threadNameSupplier; @@ -78,7 +79,7 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer { * Set the {@link KafkaProperties} to use. * @param properties the properties */ - void setKafkaProperties(KafkaProperties properties) { + public void setKafkaProperties(KafkaProperties properties) { this.properties = properties; } @@ -86,7 +87,7 @@ void setKafkaProperties(KafkaProperties properties) { * Set the {@link BatchMessageConverter} to use. * @param batchMessageConverter the message converter */ - void setBatchMessageConverter(BatchMessageConverter batchMessageConverter) { + public void setBatchMessageConverter(BatchMessageConverter batchMessageConverter) { this.batchMessageConverter = batchMessageConverter; } @@ -94,7 +95,7 @@ void setBatchMessageConverter(BatchMessageConverter batchMessageConverter) { * Set the {@link RecordMessageConverter} to use. * @param recordMessageConverter the message converter */ - void setRecordMessageConverter(RecordMessageConverter recordMessageConverter) { + public void setRecordMessageConverter(RecordMessageConverter recordMessageConverter) { this.recordMessageConverter = recordMessageConverter; } @@ -102,7 +103,7 @@ void setRecordMessageConverter(RecordMessageConverter recordMessageConverter) { * Set the {@link RecordFilterStrategy} to use to filter incoming records. * @param recordFilterStrategy the record filter strategy */ - void setRecordFilterStrategy(RecordFilterStrategy recordFilterStrategy) { + public void setRecordFilterStrategy(RecordFilterStrategy recordFilterStrategy) { this.recordFilterStrategy = recordFilterStrategy; } @@ -110,7 +111,7 @@ void setRecordFilterStrategy(RecordFilterStrategy recordFilterSt * Set the {@link KafkaTemplate} to use to send replies. * @param replyTemplate the reply template */ - void setReplyTemplate(KafkaTemplate replyTemplate) { + public void setReplyTemplate(KafkaTemplate replyTemplate) { this.replyTemplate = replyTemplate; } @@ -118,7 +119,7 @@ void setReplyTemplate(KafkaTemplate replyTemplate) { * Set the {@link KafkaAwareTransactionManager} to use. * @param transactionManager the transaction manager */ - void setTransactionManager(KafkaAwareTransactionManager transactionManager) { + public void setTransactionManager(KafkaAwareTransactionManager transactionManager) { this.transactionManager = transactionManager; } @@ -127,7 +128,7 @@ void setTransactionManager(KafkaAwareTransactionManager transact * @param rebalanceListener the rebalance listener. * @since 2.2 */ - void setRebalanceListener(ConsumerAwareRebalanceListener rebalanceListener) { + public void setRebalanceListener(ConsumerAwareRebalanceListener rebalanceListener) { this.rebalanceListener = rebalanceListener; } @@ -144,7 +145,7 @@ public void setCommonErrorHandler(CommonErrorHandler commonErrorHandler) { * Set the {@link AfterRollbackProcessor} to use. * @param afterRollbackProcessor the after rollback processor */ - void setAfterRollbackProcessor(AfterRollbackProcessor afterRollbackProcessor) { + public void setAfterRollbackProcessor(AfterRollbackProcessor afterRollbackProcessor) { this.afterRollbackProcessor = afterRollbackProcessor; } @@ -152,7 +153,7 @@ void setAfterRollbackProcessor(AfterRollbackProcessor afterRollb * Set the {@link RecordInterceptor} to use. * @param recordInterceptor the record interceptor. */ - void setRecordInterceptor(RecordInterceptor recordInterceptor) { + public void setRecordInterceptor(RecordInterceptor recordInterceptor) { this.recordInterceptor = recordInterceptor; } @@ -160,7 +161,7 @@ void setRecordInterceptor(RecordInterceptor recordInterceptor) { * Set the {@link BatchInterceptor} to use. * @param batchInterceptor the batch interceptor. */ - void setBatchInterceptor(BatchInterceptor batchInterceptor) { + public void setBatchInterceptor(BatchInterceptor batchInterceptor) { this.batchInterceptor = batchInterceptor; } @@ -168,7 +169,7 @@ void setBatchInterceptor(BatchInterceptor batchInterceptor) { * Set the thread name supplier to use. * @param threadNameSupplier the thread name supplier to use */ - void setThreadNameSupplier(Function threadNameSupplier) { + public void setThreadNameSupplier(Function threadNameSupplier) { this.threadNameSupplier = threadNameSupplier; } @@ -176,7 +177,7 @@ void setThreadNameSupplier(Function threadName * Set the executor for threads that poll the consumer. * @param listenerTaskExecutor task executor */ - void setListenerTaskExecutor(SimpleAsyncTaskExecutor listenerTaskExecutor) { + public void setListenerTaskExecutor(SimpleAsyncTaskExecutor listenerTaskExecutor) { this.listenerTaskExecutor = listenerTaskExecutor; } @@ -187,14 +188,14 @@ void setListenerTaskExecutor(SimpleAsyncTaskExecutor listenerTaskExecutor) { * to configure * @param consumerFactory the {@link ConsumerFactory} to use */ - public void configure(ConcurrentKafkaListenerContainerFactory listenerFactory, - ConsumerFactory consumerFactory) { + public void configure(ConcurrentKafkaListenerContainerFactory listenerFactory, + ConsumerFactory consumerFactory) { listenerFactory.setConsumerFactory(consumerFactory); configureListenerFactory(listenerFactory); configureContainer(listenerFactory.getContainerProperties()); } - private void configureListenerFactory(ConcurrentKafkaListenerContainerFactory factory) { + private void configureListenerFactory(ConcurrentKafkaListenerContainerFactory factory) { PropertyMapper map = PropertyMapper.get().alwaysApplyingWhenNonNull(); Listener properties = this.properties.getListener(); map.from(properties::getConcurrency).to(factory::setConcurrency); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java index 94a746901d35..33e19a55915e 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java @@ -114,23 +114,23 @@ class KafkaAnnotationDrivenConfiguration { @Bean @ConditionalOnMissingBean @ConditionalOnThreading(Threading.PLATFORM) - ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() { + ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() { return configurer(); } @Bean(name = "kafkaListenerContainerFactoryConfigurer") @ConditionalOnMissingBean @ConditionalOnThreading(Threading.VIRTUAL) - ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurerVirtualThreads() { - ConcurrentKafkaListenerContainerFactoryConfigurer configurer = configurer(); + ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurerVirtualThreads() { + ConcurrentKafkaListenerContainerFactoryConfigurer configurer = configurer(); SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor("kafka-"); executor.setVirtualThreads(true); configurer.setListenerTaskExecutor(executor); return configurer; } - private ConcurrentKafkaListenerContainerFactoryConfigurer configurer() { - ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer(); + private ConcurrentKafkaListenerContainerFactoryConfigurer configurer() { + ConcurrentKafkaListenerContainerFactoryConfigurer configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer<>(); configurer.setKafkaProperties(this.properties); configurer.setBatchMessageConverter(this.batchMessageConverter); configurer.setRecordMessageConverter(this.recordMessageConverter); @@ -149,7 +149,7 @@ private ConcurrentKafkaListenerContainerFactoryConfigurer configurer() { @Bean @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory") ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory( - ConcurrentKafkaListenerContainerFactoryConfigurer configurer, + ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ObjectProvider> kafkaConsumerFactory, ObjectProvider>> kafkaContainerCustomizer) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); diff --git a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurerTests.java b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurerTests.java index 42d9c1a90370..8e559b192d28 100644 --- a/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurerTests.java +++ b/spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurerTests.java @@ -39,7 +39,7 @@ */ class ConcurrentKafkaListenerContainerFactoryConfigurerTests { - private ConcurrentKafkaListenerContainerFactoryConfigurer configurer; + private ConcurrentKafkaListenerContainerFactoryConfigurer configurer; private ConcurrentKafkaListenerContainerFactory factory; @@ -50,7 +50,7 @@ class ConcurrentKafkaListenerContainerFactoryConfigurerTests { @BeforeEach @SuppressWarnings("unchecked") void setUp() { - this.configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer(); + this.configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer<>(); this.properties = new KafkaProperties(); this.configurer.setKafkaProperties(this.properties); this.factory = spy(new ConcurrentKafkaListenerContainerFactory<>());