Event-driven architecture is a software architecture paradigm promoting the production, detection, consumption of, and reaction to events. It was created to help developers have a decoupled and responsive application. Because of this, it has been widely used in applications that have been broken down from monoliths to microservices.
Spring Cloud Stream improves your productivity when working with Apache Kafka, RabbitMQ, Azure Event Hub, and more, providing three key abstractions to simplify your code.
Spring Cloud Function enables you to write functions once and run them anywhere (AWS, Azure, etc.), while continuing to use all the familiar and comprehensive Spring APIs. You can chain multiple functions together to create new capabilities
The main goal of this repository is to demonstrate how to create an event driven system which is simplified by using Spring Cloud Stream/Function. In particular, we will be using Spring Cloud Stream for Apache Kafka Binder. Please visit my previous tutorial to learn the basic of producer-consumer using Apache Kafka with Spring.
Note: We will be using Kafdrop which is a Web UI for viewing Kafka topics and browsing consumer groups. The tool displays information such as brokers, topics, partitions, consumers, and lets you view messages.
- Make sure to install Docker on your machine
- Go to the root directory of the project where docker-compose.yml is located.
- Run the docker compose by
docker-compose up
Note: Make sure no errors are present on the logs such connection refused etc. Now go to your browser and access http://localhost:9000/ to see an overview of your Kafka cluster, brokers and topics
To use Apache Kafka binder, we need to add below dependency to our application:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
The Apache Kafka Binder implementation maps each destination to an Apache Kafka topic. The consumer group maps directly to the same Apache Kafka concept. Partitioning also maps directly to Apache Kafka partitions as well.
If you are developing on your local setup, there will be no additional steps to setup the Kafka Configuration Options as long as your Apache Kafka properties are the defaults used by Spring Cloud Streams for Apache Kafka Binder.
Note: The docker-compose.yml ensures that the default configurations used by Spring Cloud Stream for Apache Kafka Binder are already configured such as port 9092 etc.
Our main class exposes three bean definitions:
@SpringBootApplication
@Log4j2
public class SpringcloudfuncApplication {
public static void main(String[] args) {
SpringApplication.run(SpringcloudfuncApplication.class, args);
}
@Bean
public Function<Message<String>, String> greeter() {
return (input) -> {
log.info("Hello {}", input.getPayload());
return "Hello " + input.getPayload();
};
}
@Bean
public CreateAccount createAccount() {
return new CreateAccount();
}
@Bean
public SendEmail sendEmail() {
return new SendEmail();
}
}
Hence, if we run the application - Spring must be able to identify what functions we intend to run and how do we want them to be composed of. For this tutorial, we will have the following configuration on our application.yml file:
spring:
cloud:
function:
definition: greeter;createAccount|sendEmail
Based from this, we have two callable functions:
- The greeter function which accepts a String message from Kafka topic greeter-in-0 and outputs the result to Kafka topic greeter-out-0.
- The createAccount|sendEmail function composition wherein the createAccount function accepts a JSON message from Kafka topic createAccountsendEmail-in-0 then provide its result as an input to sendEmail function which then outputs the result to Kafka topic createAccountsendEmail-out-0.
Note: The Kafka topics I have mentioned above will all be generated automatically by Spring during startup. It uses the function name with in-out tagging during discovery.
- Download STS version 3.4.* (or better) from the Spring website. STS is a free Eclipse bundle with many features useful for Spring developers.
- Right-click on the project or the main application class then select "Run As" > "Spring Boot App"
We will now test our application by attempting to produce messages on each topic and check the results via Kafdrop Web or via server console (since we enabled logging).
Note: We will use the docker interactive shell by connecting to the container running the Kafka image.
- Run docker ps command and take note of the container ID running the Apache Kafka image
- Run below command to connect to docker interactive shell.
winpty docker exec -it <containerId> bash
I have used winpty above since my local setup is running on Windows. If you are running on Linux or MacOS then remove the winpty and just run below command:
docker exec -it <containerId> bash
- Create/produce a String message to the target topic greeter-in-0
kafka-console-producer --broker-list localhost:19092 --topic greeter-in-0
- On the line > Enter your desired String message (e.g., Jun King Minon)
- Check your console, one should see someting like this:
2021-03-01 18:31:30.115 INFO 22048 --- [container-0-C-1] c.j.s.s.SpringcloudfuncApplication : Hello Jun King Minon
- Or you can go to Kafdrop URL (http://localhost:9000/) and click on its output topic greeter-out-0 > View Messages then on the filter section click View Messages. One should see something similar to below:
Offset: 1 Key: empty Timestamp: 2021-03-01 10:31:27.941 Headers: empty
Jun King Minon
- Run docker ps command and take note of the container ID running the Apache Kafka image
- Run below command to connect to docker interactive shell.
winpty docker exec -it <containerId> bash
I have used winpty above since my local setup is running on Windows. If you are running on Linux or MacOS then remove the winpty and just run below command:
docker exec -it <containerId> bash
- Create/produce a JSON message to the target topic createAccountsendEmail-in-0
kafka-console-producer --broker-list localhost:19092 --topic createAccountsendEmail-in-0 --property value.serializer=custom.class.serialization.JsonSerializer
- On the line > Enter your desired JSON message (e.g., { "name": "Jun King Minon", "balance": 12000} )
- Check your console, one should see someting like this:
2021-03-01 18:37:34.259 INFO 22048 --- [container-0-C-1] c.j.s.s.functions.CreateAccount : Creating account with payload: Account(id=null, name=Jun King Minon, balance=12000)
2021-03-01 18:37:34.260 INFO 22048 --- [container-0-C-1] c.j.s.s.functions.SendEmail : Sending email to account: Account(id=1, name=Jun King Minon, balance=12000)
- Or you can go to Kafdrop URL (http://localhost:9000/) and click on the topic createAccountsendEmail-out-0 > View Messages then on the filter section click View Messages. One should see something similar to below:
Offset: 1 Key: empty Timestamp: 2021-03-01 10:37:34.261 Headers: contentType: application/json, spring_json_header_types: {"contentType":"java.lang.String","target-protocol":"java.lang.String"}, target-protocol: kafka
Hi Jun King Minon! Thank you for opening a new account with us amounting to 12000. Please click below link to confirm your online banking account. Cheers!
Contact me at junbetterway
Happy coding!!!