-
Notifications
You must be signed in to change notification settings - Fork 152
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add Spring Cloud Event Externalization support
Signed-off-by: Ivan Garcia Sainz-Aja <ivangsa@gmail.com>
- Loading branch information
Showing
27 changed files
with
3,959 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
132 changes: 132 additions & 0 deletions
132
spring-modulith-events/spring-modulith-events-scs/README.md
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,132 @@ | ||
# Spring-Modulith Events Externalizer for Spring Cloud Stream | ||
|
||
[](https://search.maven.org/artifact/io.zenwave360.sdk/spring-modulith-events-scs) | ||
[](https://github.com/ZenWave360/spring-modulith-events-spring-cloud-stream/actions/workflows/build.yml) | ||
[](https://github.com/ZenWave360/spring-modulith-events-spring-cloud-stream/actions/workflows/build.yml) | ||
[](https://github.com/ZenWave360/spring-modulith-events-spring-cloud-stream/actions/workflows/build.yml) | ||
[](https://github.com/ZenWave360/spring-modulith-events-spring-cloud-stream/blob/main/LICENSE) | ||
|
||
Spring-Modulith Events Externalizer that uses Spring Cloud Stream supporting both JSON and Avro serialization formats. | ||
|
||
## Getting Started | ||
|
||
### Dependency | ||
Add the following Maven dependency to your project: | ||
|
||
```xml | ||
<dependency> | ||
<groupId>io.zenwave360.sdk</groupId> | ||
<artifactId>spring-modulith-events-scs</artifactId> | ||
<version>${spring-modulith-events-scs.version}</version> | ||
</dependency> | ||
``` | ||
|
||
### Configuration | ||
Use `@EnableSpringCloudStreamEventExternalization` annotation to enable Spring Cloud Stream event externalization in your Spring configuration: | ||
|
||
```java | ||
@Configuration | ||
@EnableSpringCloudStreamEventExternalization | ||
public class SpringCloudStreamEventsConfig { | ||
// Additional configurations (if needed) | ||
} | ||
``` | ||
|
||
This configuration ensures that, in addition to events annotated with `@Externalized`, all events of type `org.springframework.messaging.Message` with a header named `SpringCloudStreamEventExternalizer.SPRING_CLOUD_STREAM_EVENT_HEADER` will be externalized and routed to their specified destination using the value of this header as the routing target. | ||
|
||
--- | ||
|
||
## Event Serialization | ||
|
||
Using the transactional event publication log requires serializing events to a format that can be stored in a database. Since the generic type of `Message<?>` payload is lost when using the default `JacksonEventSerializer`, this library adds an extra `_class` field to preserve payload type information, allowing for complete deserialization to its original type. | ||
|
||
This library provides support for POJO (JSON) and Avro serialization formats for `Message<?>` payloads. | ||
|
||
### Avro Serialization | ||
|
||
Avro serialization needs `com.fasterxml.jackson.dataformat.avro.AvroMapper` class present in the classpath. In order to use Avro serialization, you need to add the following dependency to your project: | ||
|
||
```xml | ||
<dependency> | ||
<groupId>com.fasterxml.jackson.dataformat</groupId> | ||
<artifactId>jackson-dataformat-avro</artifactId> | ||
</dependency> | ||
``` | ||
|
||
--- | ||
|
||
## Routing Events | ||
|
||
### Programmatic Routing for `Message<?`> events | ||
|
||
You can define routing targets programmatically using a Message header: | ||
|
||
```java | ||
public class CustomerEventsProducer implements ICustomerEventsProducer { | ||
|
||
private final ApplicationEventPublisher applicationEventPublisher; | ||
|
||
public void onCustomerCreated(CustomerCreated event) { | ||
Message<CustomerCreated> message = MessageBuilder.withPayload(event) | ||
.setHeader( | ||
SpringCloudStreamEventExternalizer.SPRING_CLOUD_STREAM_SENDTO_DESTINATION_HEADER, | ||
"customer-created") // <- target binding name | ||
.build(); | ||
applicationEventPublisher.publishEvent(message); | ||
} | ||
} | ||
``` | ||
|
||
### Annotation-Based Routing for POJO Events | ||
|
||
Leverage the `@Externalized` annotation to define the target binding name and routing key: | ||
|
||
```java | ||
@Externalized("customer-created::#{#this.getLastname()}") | ||
class CustomerCreated { | ||
|
||
public String getLastname() { | ||
// Return the customer's last name | ||
} | ||
} | ||
``` | ||
|
||
### Configure Spring Cloud Stream destination | ||
|
||
Configure Spring Cloud Stream destination for your bindings as usual in `application.yml`: | ||
|
||
```yaml | ||
spring: | ||
cloud: | ||
stream: | ||
bindings: | ||
customer-created: | ||
destination: customer-created-topic | ||
``` | ||
### Routing Key | ||
`SpringCloudStreamEventExternalizer` dynamically sets the appropriate Message header (e.g., `kafka_messageKey` or `rabbit_routingKey`) from your routing key based on the channel binder type, if the routing header is not already present. | ||
|
||
- KafkaMessageChannelBinder: `kafka_messageKey` | ||
- RabbitMessageChannelBinder: `rabbit_routingKey` | ||
- KinesisMessageChannelBinder: `partitionKey` | ||
- PubSubMessageChannelBinder: `pubsub_orderingKey` | ||
- EventHubsMessageChannelBinder: `partitionKey` | ||
- SolaceMessageChannelBinder: `solace_messageKey` | ||
- PulsarMessageChannelBinder: `pulsar_key` | ||
|
||
--- | ||
|
||
## Using Snapshot Versions | ||
In order to test snapshot versions of this library, add the following repository to your Maven configuration: | ||
|
||
```xml | ||
<repository> | ||
<id>gh</id> | ||
<url>https://raw.githubusercontent.com/ZenWave360/maven-snapshots/refs/heads/main</url> | ||
<snapshots> | ||
<enabled>true</enabled> | ||
</snapshots> | ||
</repository> | ||
``` |
143 changes: 143 additions & 0 deletions
143
spring-modulith-events/spring-modulith-events-scs/pom.xml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,143 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
|
||
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<parent> | ||
<groupId>org.springframework.modulith</groupId> | ||
<artifactId>spring-modulith-events</artifactId> | ||
<version>1.4.0-SNAPSHOT</version> | ||
</parent> | ||
|
||
<name>Spring Modulith - Events - Spring Cloud Stream support</name> | ||
<artifactId>spring-modulith-events-scs</artifactId> | ||
|
||
<properties> | ||
<module.name>org.springframework.modulith.events.scs</module.name> | ||
|
||
<!-- integration testing versions --> | ||
<spring-boot.version>3.4.0</spring-boot.version> | ||
<spring-cloud.version>2024.0.0</spring-cloud.version> | ||
<spring-cloud-stream-schema.version>2.2.1.RELEASE</spring-cloud-stream-schema.version> | ||
<avro.version>1.11.4</avro.version> | ||
|
||
<maven-surefire-plugin.version>3.0.0-M5</maven-surefire-plugin.version> | ||
</properties> | ||
|
||
<developers> | ||
<developer> | ||
<name>Ivan Garcia Sainz-Aja</name> | ||
<email>ivangsa@gmail.com</email> | ||
<organization>ZenWave360</organization> | ||
<organizationUrl>https://github.com/ZenWave360</organizationUrl> | ||
</developer> | ||
</developers> | ||
|
||
<dependencyManagement> | ||
<dependencies> | ||
<dependency> | ||
<groupId>org.springframework.boot</groupId> | ||
<artifactId>spring-boot-dependencies</artifactId> | ||
<version>${spring-boot.version}</version> | ||
<type>pom</type> | ||
<scope>import</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.springframework.cloud</groupId> | ||
<artifactId>spring-cloud-dependencies</artifactId> | ||
<version>${spring-cloud.version}</version> | ||
<type>pom</type> | ||
<scope>import</scope> | ||
</dependency> | ||
</dependencies> | ||
</dependencyManagement> | ||
|
||
<dependencies> | ||
<dependency> | ||
<groupId>org.springframework.modulith</groupId> | ||
<artifactId>spring-modulith-api</artifactId> | ||
<version>${project.parent.version}</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.springframework.modulith</groupId> | ||
<artifactId>spring-modulith-events-core</artifactId> | ||
<version>${project.parent.version}</version> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.springframework.cloud</groupId> | ||
<artifactId>spring-cloud-stream</artifactId> | ||
</dependency> | ||
|
||
<!-- optional --> | ||
<dependency> | ||
<groupId>org.apache.avro</groupId> | ||
<artifactId>avro</artifactId> | ||
<version>${avro.version}</version> | ||
<optional>true</optional> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.fasterxml.jackson.dataformat</groupId> | ||
<artifactId>jackson-dataformat-avro</artifactId> | ||
<optional>true</optional> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.fasterxml.jackson.core</groupId> | ||
<artifactId>jackson-databind</artifactId> | ||
<optional>true</optional> | ||
</dependency> | ||
|
||
<!-- Testing --> | ||
<dependency> | ||
<groupId>org.springframework.boot</groupId> | ||
<artifactId>spring-boot-starter-test</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.springframework.boot</groupId> | ||
<artifactId>spring-boot-starter-jdbc</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>com.h2database</groupId> | ||
<artifactId>h2</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.springframework.cloud</groupId> | ||
<artifactId>spring-cloud-starter-stream-kafka</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.springframework.kafka</groupId> | ||
<artifactId>spring-kafka-test</artifactId> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.springframework.modulith</groupId> | ||
<artifactId>spring-modulith-starter-jdbc</artifactId> | ||
<version>${parent.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
</dependencies> | ||
|
||
<build> | ||
<plugins> | ||
<plugin> | ||
<groupId>org.apache.maven.plugins</groupId> | ||
<artifactId>maven-surefire-plugin</artifactId> | ||
<version>${maven-surefire-plugin.version}</version> | ||
<configuration> | ||
<!-- Force alphabetical order to have a reproducible build --> | ||
<runOrder>alphabetical</runOrder> | ||
<excludes> | ||
<exclude>**/*IT*</exclude> | ||
<exclude>**/*IntTest*</exclude> | ||
</excludes> | ||
</configuration> | ||
</plugin> | ||
</plugins> | ||
</build> | ||
|
||
</project> |
30 changes: 30 additions & 0 deletions
30
...events-scs/src/main/java/org/springframework/modulith/events/scs/AvroEventSerializer.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
package org.springframework.modulith.events.scs; | ||
|
||
import com.fasterxml.jackson.databind.ObjectMapper; | ||
import com.fasterxml.jackson.databind.node.ObjectNode; | ||
import com.fasterxml.jackson.dataformat.avro.AvroMapper; | ||
import org.springframework.modulith.events.core.EventSerializer; | ||
|
||
import java.util.Map; | ||
|
||
public class AvroEventSerializer extends MessageEventSerializer implements EventSerializer { | ||
|
||
private AvroMapper avroMapper; | ||
|
||
public AvroEventSerializer(ObjectMapper jacksonMapper) { | ||
super(jacksonMapper); | ||
this.avroMapper = AvroMapper.builder().build(); | ||
} | ||
|
||
public AvroEventSerializer(AvroMapper avroMapper, ObjectMapper jacksonMapper) { | ||
super(jacksonMapper); | ||
this.avroMapper = avroMapper; | ||
} | ||
|
||
protected Map<String, Object> serializeToMap(Object payload) { | ||
ObjectNode objectNode = avroMapper.valueToTree(payload); | ||
objectNode.remove("specificData"); // TODO: remove this recursively | ||
return avroMapper.convertValue(objectNode, Map.class); | ||
} | ||
|
||
} |
Oops, something went wrong.