Skip to content

Commit

Permalink
Merge pull request #24 from flazarus1A/feature/Expose-config-data
Browse files Browse the repository at this point in the history
feat(config): open topology config capabilities
  • Loading branch information
flazarus1A authored Jan 31, 2024
2 parents f195229 + 0e6a743 commit b6d5ae2
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,30 @@ public interface Configuration {
*/
void setSourceValueSerde(Serde<?> serde);

/**
* @return The serde for values fed to the {@link Processor}
*/
Serde<?> getSourceValueSerde();

/**
* Serializer for values forwarded by the {@link Processor}. Use the setter if you want
* to override it.
*/
void setSinkValueSerializer(Serializer<?> serializer);

/**
* @return The serializer for values forwarded by the {@link Processor}
*/
Serializer<?> getSinkValueSerializer();

/**
* Store configuration to be used by the {@link Processor}. Use the setter if you want
* to override it.
*
* @param storeConfigurations
*/
void setStoreConfigurations(List<StoreConfiguration> storeConfigurations);

/**
* @return The state store configuration to be used by the {@link Processor}
*/
List<StoreConfiguration> getStoreConfigurations();
}
4 changes: 4 additions & 0 deletions docs/modules/ROOT/pages/index.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,8 @@ It is available through the following dependency:
</dependency>
----

NOTE: Multiple customizers can be defined, and their execution order controlled through `@Priority` annotations.

===== Example

In this example, we implement a processor working with a POJO as value type.
Expand Down Expand Up @@ -601,6 +603,8 @@ It is available through the following dependency:
</dependency>
----

NOTE: Multiple customizers can be defined, and their execution order controlled through `@Priority` annotations.

=== Example

In this example, we implement a processor which is using a State-Store.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public class TopologyProducer {
* If not defined the {@link DefaultConfigurationCustomizer} is injected.
* </p>
*/
private final ConfigurationCustomizer configCustomizer;
private final Instance<ConfigurationCustomizer> configCustomizers;

/**
* The source configuration bean which produces the mapping between source and their respective topics
Expand Down Expand Up @@ -101,10 +101,10 @@ public class TopologyProducer {
*/
@Inject
public TopologyProducer(KStreamsProcessorConfig kStreamsProcessorConfig,
ConfigurationCustomizer configCustomizer, SourceToTopicsMappingBuilder sourceToTopicsMappingBuilder,
Instance<ConfigurationCustomizer> configCustomizer, SourceToTopicsMappingBuilder sourceToTopicsMappingBuilder,
SinkToTopicMappingBuilder sinkToTopicMappingBuilder, Instance<ProducerOnSendInterceptor> interceptors) {
this.kStreamsProcessorConfig = kStreamsProcessorConfig;
this.configCustomizer = configCustomizer;
this.configCustomizers = configCustomizer;
this.sourceToTopicsMappingBuilder = sourceToTopicsMappingBuilder;
this.sinkToTopicMappingBuilder = sinkToTopicMappingBuilder;
this.interceptors = interceptors;
Expand Down Expand Up @@ -136,8 +136,8 @@ public TopologyConfigurationImpl configuration(BeanManager beanManager,
TopologyConfigurationImpl configuration = initializeConfiguration(beanManager);
// Step 2: apply default configuration
defaultConfiguration.apply(configuration);
// Step 3: if an alternative customizer is provided, then apply it (default does nothing)
configCustomizer.fillConfiguration(configuration);
// Step 3: if alternative customizers are provided, then apply them (default does nothing)
configCustomizers.forEach(customizer -> customizer.fillConfiguration(configuration));
return configuration;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -71,7 +70,7 @@ class TopologyProducerTest {
SinkToTopicMappingBuilder sinkToTopicMappingBuilder;

@Mock
ConfigurationCustomizer configCustomizer;
Instance<ConfigurationCustomizer> configCustomizer;

@Mock
Instance<ProducerOnSendInterceptor> interceptors;
Expand Down Expand Up @@ -109,7 +108,7 @@ public void setUp() {
private TopologyProducer newTopologyProducer(
Map<String, String[]> sourceToTopicMapping,
Map<String, String> sinkToTopicMapping,
String dlq, List<String> stores) {
String dlq) {
when(kStreamsProcessorConfig.dlq()).thenReturn(dlqConfig);
when(dlqConfig.topic()).thenReturn(Optional.ofNullable(dlq));
when(sourceToTopicsMappingBuilder.sourceToTopicsMapping()).thenReturn(sourceToTopicMapping);
Expand Down Expand Up @@ -186,7 +185,7 @@ void topology_whenMultipleOutputTopics_shouldGenerateTopology() {
TopologyProducer topologyProducer = newTopologyProducer(
Map.of("source", new String[] { "ping-topic" }),
Map.of("pong", "pong-topic", "pang", "pang-topic"),
null, Collections.emptyList());
null);

TopologyDescription topology = topologyProducer.topology(configuration, processorSupplier).describe();

Expand All @@ -201,7 +200,7 @@ void topology_whenMultipleSources_shouldGenerateTopology() {
TopologyProducer topologyProducer = newTopologyProducer(
Map.of("ping", new String[] { "ping-topic", "ping-topic2" }, "pang", new String[] { "pang-topic" }),
Map.of("pong", "pong-topic"),
null, Collections.emptyList());
null);

TopologyDescription topology = topologyProducer.topology(configuration, processorSupplier).describe();

Expand All @@ -217,7 +216,7 @@ void topology_whenMultipleOutputTopicsAndDLQ_shouldGenerateTopology() {
TopologyProducer topologyProducer = newTopologyProducer(
Map.of("source", new String[] { "ping-topic" }),
Map.of("pong", "pong-topic", "pang", "pang-topic"),
"local-dlq", Collections.emptyList());
"local-dlq");

TopologyDescription topology = topologyProducer.topology(configuration, processorSupplier).describe();

Expand All @@ -232,7 +231,7 @@ void topology_whenMultipleOutputTopicsAndDLQAndStores_shouldGenerateTopology() {
TopologyProducer topologyProducer = newTopologyProducer(
Map.of("source", new String[] { "ping-topic" }),
Map.of("pong", "pong-topic", "pang", "pang-topic"),
null, Arrays.asList("ping-indexes", "ping-data"));
null);

List<StoreConfiguration> storeConfigurations = buildStoreConfiguration();

Expand Down

0 comments on commit b6d5ae2

Please # to comment.