Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Make ConcurrentKafkaListenerContainerFactoryConfigurer Generic #44638

Conversation

lipiridi
Copy link

@lipiridi lipiridi commented Mar 7, 2025

Summary

This PR updates ConcurrentKafkaListenerContainerFactoryConfigurer to be generic, allowing it to configure any container factory instead of being limited to <Object, Object>.

Motivation

  • Previously, the class was restricted to <Object, Object>, which prevented reuse for other container factories with different key/value types. Making it generic improves flexibility and reusability across various Kafka container configurations.
  • This change also enables configuring programmatically created container factories using Kafka listener properties from application.yml, providing greater control and consistency in Kafka consumer setup.

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Mar 7, 2025
…port configuring any container factory

Signed-off-by: Dimitrii Lipiridi <dimitrii.lipiridi@delasport.com>
@lipiridi lipiridi force-pushed the generic-kafka-container-configurer branch from 4ca1986 to 3e32cea Compare March 7, 2025 09:27
@wilkinsona wilkinsona changed the title Enhancement: Make ConcurrentKafkaListenerContainerFactoryConfigurer Generic Make ConcurrentKafkaListenerContainerFactoryConfigurer Generic Mar 7, 2025
@wilkinsona
Copy link
Member

When we're reviewing this we should keep in mind #40174 and #19221. Both issues are related to Kafka auto-configuration and generics.

@snicoll
Copy link
Member

snicoll commented Mar 7, 2025

Unfortunately, I am not sure that we should be making this change. The configurer is mostly for applying auto-configuration to custom container that are to be exposed for management of the ApplicationContext. I also wonder if the additional complexity is worth it given the primary goal of this class.

This PR doesn't introduce any tests and isn't actionable as such. Before you amend that, I'd like to better understand what you actually mean in the two motivations. Can you provide examples of both and show how the current class is relevant?

@snicoll snicoll added the status: waiting-for-feedback We need additional information before we can continue label Mar 7, 2025
@lipiridi
Copy link
Author

lipiridi commented Mar 7, 2025

@snicoll thanks for your response!
We are currently creating container factories manually, but we want to configure them using spring.kafka.listener properties in a more convenient way, instead of writing boilerplate code.
Another risk is that the listener properties might be extended in the future, and we could miss these changes, failing to update the property injection code accordingly.

Here is the a real example of usage I'm trying to achieve:

@Configuration
@RequiredArgsConstructor
public class KafkaConfiguration {

    private final KafkaProperties kafkaProperties;

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> buildFactory() {
        var buildProperties = this.kafkaProperties.buildConsumerProperties(null);

        DefaultKafkaConsumerFactory<String, Object> consumerFactory =
                new DefaultKafkaConsumerFactory<>(buildProperties, new StringDeserializer(), new ErrorHandlingDeserializer<>(new KafkaAvroDeserializer()));

        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
                new ConcurrentKafkaListenerContainerFactory<>();

        ConcurrentKafkaListenerContainerFactoryConfigurer<String, GenericRecord> configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer<>();
        configurer.setKafkaProperties(this.kafkaProperties);
        configurer.configure(factory, consumerFactory);

        return factory;
    }
}

@spring-projects-issues spring-projects-issues added status: feedback-provided Feedback has been provided and removed status: waiting-for-feedback We need additional information before we can continue labels Mar 7, 2025
Signed-off-by: Dimitrii Lipiridi <dimitrii.lipiridi@delasport.com>
@snicoll snicoll added the for: team-attention An issue we'd like other members of the team to review label Mar 12, 2025
@snicoll
Copy link
Member

snicoll commented Mar 17, 2025

@lipiridi we won't make the setters public, for sure as this is meant to be injected in user's code and they can't mutate what's been configured for them. Rather, they should apply the configurer and adapt whatever code is necessary.

I had a look to your sample, and we'll take it to the team. This part of the API always was a bit strange to me, perhaps @artembilan has an opinion about this change?

@artembilan
Copy link
Member

I think this is wrong:

 ConcurrentKafkaListenerContainerFactoryConfigurer<String, GenericRecord> configurer = new ConcurrentKafkaListenerContainerFactoryConfigurer<>();

There is no need in creating this object manually, rather do this:

 @Bean
    public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> buildFactory(ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
        var buildProperties = this.kafkaProperties.buildConsumerProperties(null);

        DefaultKafkaConsumerFactory<String, Object> consumerFactory =
                new DefaultKafkaConsumerFactory<>(buildProperties, new StringDeserializer(), new ErrorHandlingDeserializer<>(new KafkaAvroDeserializer()));

        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
                new ConcurrentKafkaListenerContainerFactory<>();

        configurer.configure(factory, consumerFactory);

        return factory;
    }

I agree with @snicoll , that those generics for key/value on all the Apache Kafka API is painful and useless.

Tell us, please, why current ConcurrentKafkaListenerContainerFactoryConfigurer API is out of use for you?

@lipiridi
Copy link
Author

@artembilan Hello!
I cannot use it because the configure method expects <Object, Object> listener and consumer factories, whereas my factory is typified with GenericRecord as the value.
image

@artembilan
Copy link
Member

Hm. OK. My bad.
So, you have to cast it into expectations of that method.
However, let's see now how those generics on your container factory bean are useful afterwards?
Cannot it just be <Object, Object> as well?

@lipiridi
Copy link
Author

@artembilan the question is how is it supposed to cast the typified StringDeserializer to Object?
Anyway, I believe erasing types is not the way we want to write code.
image

I'm okay with not making all methods public, especially since I customize the configurer only by setting my own error handler, and the setCommonErrorHandler method is already public. I just thought it could be useful for many others.

@snicoll
Copy link
Member

snicoll commented Mar 18, 2025

It's unfortunate that the Kafka API is done this way, but I don't think this is the right place to fix the problem. Introducing the generics would complicate the code for no good reason.

The current situation is far from ideal but I'd rather open this against the Kafka project.

@snicoll snicoll closed this Mar 18, 2025
@snicoll snicoll added status: declined A suggestion or change that we don't feel we should currently apply and removed for: team-attention An issue we'd like other members of the team to review status: waiting-for-triage An issue we've not yet triaged status: feedback-provided Feedback has been provided labels Mar 18, 2025
@nosan
Copy link
Contributor

nosan commented Mar 18, 2025

the question is how is it supposed to cast the typified StringDeserializer to Object?

as a workaround, you can use the following:

	@SuppressWarnings({ "unchecked", "rawtypes" })
	private ConcurrentKafkaListenerContainerFactory<Object, Object> buildFactory(
			ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
		Map<String, Object> kafkaProperties = this.properties.buildConsumerProperties();
		ErrorHandlingDeserializer<Object> avroDeserializer = new ErrorHandlingDeserializer<>(
				new KafkaAvroDeserializer());
		ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
		configurer.configure(factory,
				new DefaultKafkaConsumerFactory(kafkaProperties, new StringDeserializer(), avroDeserializer));
		return factory;
	}
	

@artembilan
Copy link
Member

You can do just like this instead:

	@Bean
	public ConcurrentKafkaListenerContainerFactory<Object, Object> buildFactory(
			ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
		
		var buildProperties = this.kafkaProperties.buildConsumerProperties(null);

		DefaultKafkaConsumerFactory<Object, Object> consumerFactory =
				new DefaultKafkaConsumerFactory<>(buildProperties, null, new JsonDeserializer<>());

		ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
				new ConcurrentKafkaListenerContainerFactory<>();

		configurer.configure(factory, consumerFactory);

		return factory;
	}

That StringDeserializer is a default value for the ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG.

I believe erasing types is not the way we want to write code.

I understand your pain and really don't see reason in those generics on the ListenerContainerFactory and ConsumerFactory.
Same applies for the producer side.
We really even don't need those generics in Kafka Client itself.
But the cat is out of the bag and we have to live with whatever is there so far.

Perhaps with the current 4.0.0 generation in the work we may reconsider these generics in Spring Kafka and remove them altogether to avoid confusion like this.
It would be huge effort for Spring Kafka itself.
Then this Spring Boot auto-configuration has to be adjusted as well.
And so on for all the projects using Spring Kafka.
I cannot predict all the impact such a breaking change would cause.
Plus right now I don't see an easy migration path from this generics-aware API.

# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
status: declined A suggestion or change that we don't feel we should currently apply
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants