Skip to content

Commit c25aa62

Browse files
committed
Add Kafka AutoConfiguration
Change Common Props to Hyphenated (asciidoc) Only Support HIGH and a Few Other Props
1 parent 6dd8415 commit c25aa62

File tree

13 files changed

+1138
-2
lines changed

13 files changed

+1138
-2
lines changed

spring-boot-autoconfigure/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,11 @@
472472
<artifactId>spring-rabbit</artifactId>
473473
<optional>true</optional>
474474
</dependency>
475+
<dependency>
476+
<groupId>org.springframework.kafka</groupId>
477+
<artifactId>spring-kafka</artifactId>
478+
<optional>true</optional>
479+
</dependency>
475480
<dependency>
476481
<groupId>org.springframework.cloud</groupId>
477482
<artifactId>spring-cloud-spring-service-connector</artifactId>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright 2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.kafka;
18+
19+
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
20+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
21+
import org.springframework.context.annotation.Bean;
22+
import org.springframework.context.annotation.Configuration;
23+
import org.springframework.kafka.annotation.EnableKafka;
24+
import org.springframework.kafka.config.KafkaListenerConfigUtils;
25+
import org.springframework.kafka.config.KafkaListenerContainerFactory;
26+
import org.springframework.kafka.config.SimpleKafkaListenerContainerFactory;
27+
import org.springframework.kafka.core.ConsumerFactory;
28+
29+
/**
30+
* Adds {@link EnableKafka} if present on the classpath.
31+
*
32+
* @author Gary Russell
33+
* @since 1.4
34+
*
35+
*/
36+
@Configuration
37+
@ConditionalOnClass(EnableKafka.class)
38+
class KafkaAnnotationDrivenConfiguration {
39+
40+
private final KafkaProperties properties;
41+
42+
KafkaAnnotationDrivenConfiguration(KafkaProperties properties) {
43+
this.properties = properties;
44+
}
45+
46+
@Bean
47+
@ConditionalOnMissingBean
48+
public SimpleKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
49+
SimpleKafkaListenerContainerFactoryConfigurer configurer = new SimpleKafkaListenerContainerFactoryConfigurer();
50+
configurer.setKafkaProperties(this.properties);
51+
return configurer;
52+
}
53+
54+
@Bean
55+
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
56+
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(
57+
SimpleKafkaListenerContainerFactoryConfigurer configurer,
58+
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
59+
SimpleKafkaListenerContainerFactory<Object, Object> factory =
60+
new SimpleKafkaListenerContainerFactory<Object, Object>();
61+
configurer.configure(factory, kafkaConsumerFactory);
62+
return factory;
63+
}
64+
65+
66+
@EnableKafka
67+
@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
68+
protected static class EnableKafkaConfiguration {
69+
70+
}
71+
72+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Copyright 2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.boot.autoconfigure.kafka;
18+
19+
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
20+
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
21+
import org.springframework.boot.context.properties.EnableConfigurationProperties;
22+
import org.springframework.context.annotation.Bean;
23+
import org.springframework.context.annotation.Configuration;
24+
import org.springframework.context.annotation.Import;
25+
import org.springframework.kafka.core.ConsumerFactory;
26+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
27+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
28+
import org.springframework.kafka.core.KafkaTemplate;
29+
import org.springframework.kafka.core.ProducerFactory;
30+
import org.springframework.kafka.support.LoggingProducerListener;
31+
import org.springframework.kafka.support.ProducerListener;
32+
33+
/**
34+
* Auto-configuration for Spring for Apache Kafka.
35+
*
36+
* @author Gary Russell
37+
* @since 1.4
38+
*
39+
*/
40+
@Configuration
41+
@ConditionalOnClass(KafkaTemplate.class)
42+
@EnableConfigurationProperties(KafkaProperties.class)
43+
@Import(KafkaAnnotationDrivenConfiguration.class)
44+
public class KafkaAutoConfiguration {
45+
46+
private final KafkaProperties properties;
47+
48+
public KafkaAutoConfiguration(KafkaProperties properties) {
49+
this.properties = properties;
50+
}
51+
52+
@Bean
53+
@ConditionalOnMissingBean(KafkaTemplate.class)
54+
public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory,
55+
ProducerListener<Object, Object> kafkaProducerListener) {
56+
KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate<Object, Object>(kafkaProducerFactory);
57+
kafkaTemplate.setProducerListener(kafkaProducerListener);
58+
kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
59+
return kafkaTemplate;
60+
}
61+
62+
@Bean
63+
@ConditionalOnMissingBean(ProducerListener.class)
64+
public ProducerListener<Object, Object> kafkaProducerListener() {
65+
return new LoggingProducerListener<Object, Object>();
66+
}
67+
68+
@Configuration
69+
protected static class ConnectionConfig {
70+
71+
@Bean
72+
@ConditionalOnMissingBean(ConsumerFactory.class)
73+
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
74+
return new DefaultKafkaConsumerFactory<Object, Object>(properties.buildConsumerProperties());
75+
}
76+
77+
@Bean
78+
@ConditionalOnMissingBean(ProducerFactory.class)
79+
public ProducerFactory<?, ?> kafkaProducerFactory(KafkaProperties properties) {
80+
return new DefaultKafkaProducerFactory<Object, Object>(properties.buildProducerProperties());
81+
}
82+
83+
}
84+
85+
}

0 commit comments

Comments
 (0)