Add a way to pass objects as values for @KafkaListener properties #3580

Open
geertvb opened this issue Oct 21, 2024 · 3 comments

Comments

@geertvb
geertvb commented Oct 21, 2024

Expected Behavior

Have a way to pass objects or bean references as values for @KafkaListener properties (KafkaConsumer properties).

For example to use the bean reference #{__listener.filterPredicate} as a configuration property for a kafka-client ConsumerInterceptor. See also Wiring Spring Beans into Producer/Consumer Interceptors
https://docs.spring.io/spring-kafka/reference/kafka/interceptors.html

@Slf4j
@Component
@RequiredArgsConstructor
public class MessageListener {

    @Getter
    private final Predicate<ConsumerRecord<?, ?>> filterPredicate;

    @KafkaListener(
        topics = "events",
        properties = {
            "interceptor.classes=org.example.FilterInterceptor",
            "filter.predicate=#{__listener.filterPredicate}"})
    public void handleEvent(ConsumerRecord<String, String> record) {
        log.info("Handling record: {}", record.value());
    }

}

Current Behavior

@KafkaListener properties only accept string values. See the Javadoc: "SpEL expressions must resolve to a String, a String[] or a Collection<String>."

Attempting to use a bean reference results in an exception with the message:
No converter found capable of converting from type [eu.europa.ec.test.kafka.FilterRecordPredicate] to type [java.lang.String]

Context

Currently we use either a consumer factory customizer (if it applies to all consumers) or custom consumer factory beans (if it is consumer-specific). Especially in the latter case, we would prefer to use the default Spring Boot consumer factory and annotations on the Kafka listener.

    @Override
    public void customize(DefaultKafkaConsumerFactory<?, ?> consumerFactory) {
        consumerFactory.updateConfigs(Map.of(
                    INTERCEPTOR_CLASSES_CONFIG, FilterRecordInterceptor.class.getName(),
                    "filter.predicate", filterPredicate));
    }
@Ravikiran26

Instead of directly passing the bean reference in the @KafkaListener properties, you can define a custom property resolver that maps a string property to the desired bean.
@Slf4j
@Component
@RequiredArgsConstructor
public class MessageListener {

    private final Predicate<ConsumerRecord<?, ?>> filterPredicate;

    @KafkaListener(
        topics = "events",
        properties = {
            "interceptor.classes=org.example.FilterInterceptor",
            "filter.predicate=customFilterPredicate"})
    public void handleEvent(ConsumerRecord<String, String> record) {
        log.info("Handling record: {}", record.value());
    }

    @Bean
    public KafkaListenerConfigurer kafkaListenerConfigurer() {
        return registrar -> {
            registrar.setPropertyMapper(propertyKey -> {
                if ("filter.predicate".equals(propertyKey)) {
                    // Resolve the property to the bean reference or object
                    return filterPredicate;
                }
                return null; // Let default behavior handle other properties
            });
        };
    }

}

@geertvb
Author

geertvb commented Jan 21, 2025

Hi @Ravikiran26, thank you for your answer. I am aware that there are workarounds like this.
I was looking for a simple, generic solution that allows passing any bean for any property without having to add code.

@artembilan
Member

Let me try to summarize the concern!
You have a custom ConsumerInterceptor which is supposed to resolve that custom filter.predicate property into a bean from the application context.
That interceptor.classes is resolved by KafkaConsumer without any knowledge about Spring dependency injection.
So, whatever could be reached from its configure(Map<String, ?> configs) method implementation has to be already resolved as an object.
Therefore your "filter.predicate", filterPredicate approach as a customize() implementation is the way to go. At least for now.
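
To illustrate the point about configure(): a minimal sketch of an interceptor-like class that expects filter.predicate to already be an object in the config map. It assumes a local Configurable interface as a stand-in for org.apache.kafka.common.Configurable, so that it compiles without kafka-clients. FilterInterceptorSketch and its accepts() method are hypothetical names, not the reporter's actual interceptor.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

public class InterceptorConfigureSketch {

    // Stand-in for org.apache.kafka.common.Configurable (assumption: same method shape).
    interface Configurable {
        void configure(Map<String, ?> configs);
    }

    // Hypothetical interceptor-like class; a real one would implement ConsumerInterceptor.
    static class FilterInterceptorSketch implements Configurable {

        // Accept everything until a predicate is supplied through the configs.
        private Predicate<String> filterPredicate = value -> true;

        @Override
        @SuppressWarnings("unchecked")
        public void configure(Map<String, ?> configs) {
            // No Spring context is available here: the value must already be an object.
            Object candidate = configs.get("filter.predicate");
            if (candidate instanceof Predicate) {
                this.filterPredicate = (Predicate<String>) candidate;
            } else if (candidate != null) {
                throw new IllegalArgumentException(
                        "filter.predicate must be a Predicate but was " + candidate.getClass().getName());
            }
        }

        boolean accepts(String value) {
            return filterPredicate.test(value);
        }
    }

    public static void main(String[] args) {
        Predicate<String> keepNonEmpty = value -> !value.isEmpty();

        Map<String, Object> configs = new HashMap<>();
        configs.put("filter.predicate", keepNonEmpty);

        FilterInterceptorSketch interceptor = new FilterInterceptorSketch();
        interceptor.configure(configs);

        System.out.println(interceptor.accepts("payload")); // true
        System.out.println(interceptor.accepts(""));        // false
    }
}
```

If the value arrives as the string "customFilterPredicate" instead of the object, this sketch fails fast in configure(), which is roughly the situation the String-only annotation properties lead to.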

From here I believe your request is to improve that KafkaListenerAnnotationBeanPostProcessor.resolveKafkaProperties() algorithm to deal with any object types for the target ConsumerConfig.
Unfortunately, that is not possible, since we deal there with a Properties contract where the key-value pairs are strings.

And even if we can use a ContainerCustomizer to mutate some specific container instance (especially its getContainerProperties()), there is still a Properties contract for its getKafkaConsumerProperties():

	/**
	 * Get the consumer properties that will be merged with the consumer properties
	 * provided by the consumer factory; properties here will supersede any with the same
	 * name(s) in the consumer factory. You can add non-String-valued properties, but the
	 * property name (hashtable key) must be String; all others will be ignored.
	 * {@code group.id} and {@code client.id} are ignored.
	 * @return the properties.
	 * @see org.apache.kafka.clients.consumer.ConsumerConfig
	 * @see #setGroupId(String)
	 * @see #setClientId(String)
	 */
	public Properties getKafkaConsumerProperties() {

Pay attention to the "all others will be ignored" sentence.
And that is indeed the case in our current logic:

		Set<String> stringPropertyNames = propertyOverrides.stringPropertyNames();
		// User might have provided properties as defaults
		stringPropertyNames.forEach((name) -> {
			if (!props.contains(name)) {
				props.setProperty(name, propertyOverrides.getProperty(name));
			}
		});

where:

    public String getProperty(String key) {
        Object oval = map.get(key);
        String sval = (oval instanceof String) ? (String)oval : null;
        Properties defaults;
        return ((sval == null) && ((defaults = this.defaults) != null)) ? defaults.getProperty(key) : sval;
    }

So, if the value is not a String, it is returned as null.
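
The behavior can be reproduced with plain java.util.Properties. This is a minimal sketch, not the Spring Kafka source: mergeStringOnly is a hypothetical helper that only mirrors the shape of the loop quoted above, and the predicate is an arbitrary example value.

```java
import java.util.Properties;
import java.util.function.Predicate;

public class PropertiesStringContractDemo {

    // Hypothetical helper: copies overrides using only the String-oriented accessors,
    // in the same spirit as the stringPropertyNames()/getProperty() loop quoted above.
    static Properties mergeStringOnly(Properties overrides) {
        Properties merged = new Properties();
        for (String name : overrides.stringPropertyNames()) {
            merged.setProperty(name, overrides.getProperty(name));
        }
        return merged;
    }

    public static void main(String[] args) {
        Predicate<String> filterPredicate = value -> !value.isEmpty();

        Properties overrides = new Properties();
        overrides.setProperty("interceptor.classes", "org.example.FilterInterceptor");
        // Properties extends Hashtable<Object, Object>, so put() accepts any value type.
        overrides.put("filter.predicate", filterPredicate);

        // The raw Hashtable view still holds the object...
        System.out.println(overrides.get("filter.predicate") == filterPredicate); // true
        // ...but the String-oriented accessors do not see it.
        System.out.println(overrides.getProperty("filter.predicate"));            // null
        System.out.println(overrides.stringPropertyNames()
                .contains("filter.predicate"));                                   // false

        // After a String-only merge the entry is gone entirely.
        Properties merged = mergeStringOnly(overrides);
        System.out.println(merged.containsKey("filter.predicate"));               // false
        System.out.println(merged.getProperty("interceptor.classes"));            // org.example.FilterInterceptor
    }
}
```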

It might be the case that we have to change the contract from Properties to just Map<String, Object>, as it is for all the Apache Kafka client configs.
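
For comparison, a rough sketch of what a Map<String, Object> based merge could look like. This is only an illustration of the idea, not a proposed implementation: merge is a hypothetical helper, and it deliberately ignores details such as the special handling of group.id and client.id mentioned in the Javadoc above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

public class MapOverridesDemo {

    // Hypothetical helper: overrides supersede the factory configs and keep their value types.
    static Map<String, Object> merge(Map<String, Object> factoryConfigs, Map<String, Object> overrides) {
        Map<String, Object> merged = new HashMap<>(factoryConfigs);
        merged.putAll(overrides);
        return merged;
    }

    public static void main(String[] args) {
        Predicate<String> filterPredicate = value -> !value.isEmpty();

        Map<String, Object> factoryConfigs = new HashMap<>();
        factoryConfigs.put("max.poll.records", 100);

        Map<String, Object> overrides = new HashMap<>();
        overrides.put("interceptor.classes", "org.example.FilterInterceptor");
        overrides.put("filter.predicate", filterPredicate);

        Map<String, Object> merged = merge(factoryConfigs, overrides);

        // The object value survives the merge untouched.
        System.out.println(merged.get("filter.predicate") == filterPredicate); // true
        System.out.println(merged.get("max.poll.records"));                    // 100
    }
}
```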

Please, confirm that we are on the same page.
