From 33bba9ef4aac055005b0d927d1cc478dc71ade0c Mon Sep 17 00:00:00 2001 From: "Matthias J. Sax" Date: Fri, 7 Feb 2025 14:41:03 -0800 Subject: [PATCH] MINOR: cleanup KStream JavaDocs (9/N) - flatMap[Values] (#18805) Reviewers: Lucas Brutschy --- .../apache/kafka/streams/kstream/KStream.java | 275 ++++-------------- .../kstream/internals/KStreamImpl.java | 66 ++--- .../kstream/internals/KStreamImplTest.java | 66 ++--- 3 files changed, 127 insertions(+), 280 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 6212e39e0da9e..d3929d72529e8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -124,7 +124,7 @@ public interface KStream { * } * }); * } - * Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation + * Setting a new key might result in an internal data redistribution if a key-based operator (like an aggregation * or join) is applied to the result {@code KStream}. * * @param mapper @@ -169,7 +169,7 @@ KStream selectKey(final KeyValueMapper * * Setting a new value preserves data co-location with respect to the key. - * Thus, no internal data redistribution is required if a key based operator (like an aggregation + * Thus, no internal data redistribution is required if a key-based operator (like an aggregation * or join) is applied to the result {@code KStream} (cf. {@link #map(KeyValueMapper)}). * * @param mapper @@ -256,14 +256,14 @@ KStream map(final KeyValueMapper} can be transformed into output records {@code , , ...}. + * Create a new {@code KStream} that consists of zero or more records for each record in this stream. + * The provided {@link KeyValueMapper} is applied to each input record and computes zero or more output records + * (possibly of a different key and/or value type) for it. + * Thus, an input record {@code } can be transformed into output records {@code , , ...}. * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for * stateful record processing). - *

- * The example below splits input records {@code } containing sentences as values into their words + * + *

The example below splits input records {@code } containing sentences as values into their words * and emit a record {@code } for each word. *

{@code
      * KStream inputStream = builder.stream("topic");
@@ -281,89 +281,47 @@  KStream map(final KeyValueMapper
-     * The provided {@link KeyValueMapper} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
-     * and the return value must not be {@code null}.
-     * 

- * Flat-mapping records might result in an internal data redistribution if a key based operator (like an aggregation - * or join) is applied to the result {@code KStream}. (cf. {@link #flatMapValues(ValueMapper)}) + * The provided {@link KeyValueMapper} must return an {@link Iterable} (e.g., any {@link java.util.Collection} + * type) and the return value must not be {@code null}. + * + *

Flat-mapping records might result in an internal data redistribution if a key-based operator (like an + * aggregation or join) is applied to the result {@code KStream}. (cf. {@link #flatMapValues(ValueMapper)}) + * + * @param mapper + * a {@link KeyValueMapper KeyValueMapper<K, V, Iterable<KeyValue<K', V'>>>} that + * computes zero of more new {@link KeyValue} pairs for each input record + * + * @param the key type of the result stream + * @param the value type of the result stream + * + * @return A {@code KStream} that contains more or fewer records with new keys and values (possibly of different types). * - * @param mapper a {@link KeyValueMapper} that computes the new output records - * @param the key type of the result stream - * @param the value type of the result stream - * @return a {@code KStream} that contains more or less records with new key and value (possibly of different type) * @see #selectKey(KeyValueMapper) * @see #map(KeyValueMapper) * @see #mapValues(ValueMapper) - * @see #mapValues(ValueMapperWithKey) * @see #flatMapValues(ValueMapper) - * @see #flatMapValues(ValueMapperWithKey) - * @see #process(ProcessorSupplier, String...) - * @see #processValues(FixedKeyProcessorSupplier, String...) - * @see #processValues(FixedKeyProcessorSupplier, Named, String...) */ - KStream flatMap(final KeyValueMapper>> mapper); + KStream flatMap(final KeyValueMapper>> mapper); /** - * Transform each record of the input stream into zero or more records in the output stream (both key and value type - * can be altered arbitrarily). - * The provided {@link KeyValueMapper} is applied to each input record and computes zero or more output records. - * Thus, an input record {@code } can be transformed into output records {@code , , ...}. - * This is a stateless record-by-record operation (cf. {@link #process(ProcessorSupplier, String...)} for - * stateful record transformation). - *

- * The example below splits input records {@code } containing sentences as values into their words - * and emit a record {@code } for each word. - *

{@code
-     * KStream inputStream = builder.stream("topic");
-     * KStream outputStream = inputStream.flatMap(
-     *     new KeyValueMapper>> {
-     *         Iterable> apply(byte[] key, String value) {
-     *             String[] tokens = value.split(" ");
-     *             List> result = new ArrayList<>(tokens.length);
+     * See {@link #flatMap(KeyValueMapper)}.
      *
-     *             for(String token : tokens) {
-     *                 result.add(new KeyValue<>(token, 1));
-     *             }
-     *
-     *             return result;
-     *         }
-     *     });
-     * }
- * The provided {@link KeyValueMapper} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type) - * and the return value must not be {@code null}. - *

- * Flat-mapping records might result in an internal data redistribution if a key based operator (like an aggregation - * or join) is applied to the result {@code KStream}. (cf. {@link #flatMapValues(ValueMapper)}) - * - * @param mapper a {@link KeyValueMapper} that computes the new output records - * @param named a {@link Named} config used to name the processor in the topology - * @param the key type of the result stream - * @param the value type of the result stream - * @return a {@code KStream} that contains more or less records with new key and value (possibly of different type) - * @see #selectKey(KeyValueMapper) - * @see #map(KeyValueMapper) - * @see #mapValues(ValueMapper) - * @see #mapValues(ValueMapperWithKey) - * @see #flatMapValues(ValueMapper) - * @see #flatMapValues(ValueMapperWithKey) - * @see #process(ProcessorSupplier, String...) - * @see #processValues(FixedKeyProcessorSupplier, String...) - * @see #processValues(FixedKeyProcessorSupplier, Named, String...) + *

Takes an additional {@link Named} parameter that is used to name the processor in the topology. */ - KStream flatMap(final KeyValueMapper>> mapper, - final Named named); + KStream flatMap(final KeyValueMapper>> mapper, + final Named named); /** - * Create a new {@code KStream} by transforming the value of each record in this stream into zero or more values - * with the same key in the new stream. - * Transform the value of each input record into zero or more records with the same (unmodified) key in the output - * stream (value type can be altered arbitrarily). - * The provided {@link ValueMapper} is applied to each input record and computes zero or more output values. - * Thus, an input record {@code } can be transformed into output records {@code , , ...}. + * Create a new {@code KStream} that consists of zero or more records with modified value for each record + * in this stream. + * The provided {@link ValueMapper} is applied to each input record value and computes zero or more output values + * (possibly of a different type) for it. + * Thus, an input record {@code } can be transformed into output records {@code , , ...}. + * If you need read access to the input record key, use {@link #flatMapValues(ValueMapperWithKey)}. * This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)} * for stateful value processing). - *

- * The example below splits input records {@code } containing sentences as values into their words. + * + *

The example below splits input records {@code } containing sentences as values into their words. *

{@code
      * KStream inputStream = builder.stream("topic");
      * KStream outputStream = inputStream.flatMapValues(new ValueMapper> {
@@ -374,159 +332,48 @@  KStream flatMap(final KeyValueMapper
      * The provided {@link ValueMapper} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type)
      * and the return value must not be {@code null}.
-     * 

- * Splitting a record into multiple records with the same key preserves data co-location with respect to the key. - * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) - * is applied to the result {@code KStream}. (cf. {@link #flatMap(KeyValueMapper)}) * - * @param mapper a {@link ValueMapper} the computes the new output values - * @param the value type of the result stream - * @return a {@code KStream} that contains more or less records with unmodified keys and new values of different type + *

Splitting a record into multiple records with the same key preserves data co-location with respect to the key. + * Thus, no internal data redistribution is required if a key-based operator (like an aggregation or join) + * is applied to the result {@code KStream} (cf. {@link #flatMap(KeyValueMapper)}). + * + * @param mapper + * a {@link ValueMapper ValueMapper<V, Iterable<V>>} that computes zero or more new values + * for each input record + * + * @param the value type of the result stream + * + * @return A {@code KStream} that contains more or fewer records with unmodified keys but new values (possibly of a different type). + * * @see #selectKey(KeyValueMapper) * @see #map(KeyValueMapper) * @see #flatMap(KeyValueMapper) * @see #mapValues(ValueMapper) - * @see #mapValues(ValueMapperWithKey) - * @see #process(ProcessorSupplier, String...) - * @see #processValues(FixedKeyProcessorSupplier, String...) - * @see #processValues(FixedKeyProcessorSupplier, Named, String...) */ - KStream flatMapValues(final ValueMapper> mapper); + KStream flatMapValues(final ValueMapper> mapper); /** - * Create a new {@code KStream} by transforming the value of each record in this stream into zero or more values - * with the same key in the new stream. - * Transform the value of each input record into zero or more records with the same (unmodified) key in the output - * stream (value type can be altered arbitrarily). - * The provided {@link ValueMapper} is applied to each input record and computes zero or more output values. - * Thus, an input record {@code } can be transformed into output records {@code , , ...}. - * This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)} - * for stateful value processing). - *

- * The example below splits input records {@code } containing sentences as values into their words. - *

{@code
-     * KStream inputStream = builder.stream("topic");
-     * KStream outputStream = inputStream.flatMapValues(new ValueMapper> {
-     *     Iterable apply(String value) {
-     *         return Arrays.asList(value.split(" "));
-     *     }
-     * });
-     * }
- * The provided {@link ValueMapper} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type) - * and the return value must not be {@code null}. - *

- * Splitting a record into multiple records with the same key preserves data co-location with respect to the key. - * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) - * is applied to the result {@code KStream}. (cf. {@link #flatMap(KeyValueMapper)}) + * See {@link #flatMapValues(ValueMapper)}. * - * @param mapper a {@link ValueMapper} the computes the new output values - * @param named a {@link Named} config used to name the processor in the topology - * @param the value type of the result stream - * @return a {@code KStream} that contains more or less records with unmodified keys and new values of different type - * @see #selectKey(KeyValueMapper) - * @see #map(KeyValueMapper) - * @see #flatMap(KeyValueMapper) - * @see #mapValues(ValueMapper) - * @see #mapValues(ValueMapperWithKey) - * @see #process(ProcessorSupplier, String...) - * @see #processValues(FixedKeyProcessorSupplier, String...) - * @see #processValues(FixedKeyProcessorSupplier, Named, String...) + *

Takes an additional {@link Named} parameter that is used to name the processor in the topology. */ - KStream flatMapValues(final ValueMapper> mapper, - final Named named); + KStream flatMapValues(final ValueMapper> mapper, + final Named named); + /** - * Create a new {@code KStream} by transforming the value of each record in this stream into zero or more values - * with the same key in the new stream. - * Transform the value of each input record into zero or more records with the same (unmodified) key in the output - * stream (value type can be altered arbitrarily). - * The provided {@link ValueMapperWithKey} is applied to each input record and computes zero or more output values. - * Thus, an input record {@code } can be transformed into output records {@code , , ...}. - * This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)} - * for stateful value processing). - *

- * The example below splits input records {@code }, with key=1, containing sentences as values - * into their words. - *

{@code
-     * KStream inputStream = builder.stream("topic");
-     * KStream outputStream = inputStream.flatMapValues(new ValueMapper> {
-     *     Iterable apply(Integer readOnlyKey, String value) {
-     *         if(readOnlyKey == 1) {
-     *             return Arrays.asList(value.split(" "));
-     *         } else {
-     *             return Arrays.asList(value);
-     *         }
-     *     }
-     * });
-     * }
- * The provided {@link ValueMapperWithKey} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type) - * and the return value must not be {@code null}. - *

- * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. - * So, splitting a record into multiple records with the same key preserves data co-location with respect to the key. - * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) - * is applied to the result {@code KStream}. (cf. {@link #flatMap(KeyValueMapper)}) + * See {@link #flatMapValues(ValueMapper)}. * - * @param mapper a {@link ValueMapperWithKey} the computes the new output values - * @param the value type of the result stream - * @return a {@code KStream} that contains more or less records with unmodified keys and new values of different type - * @see #selectKey(KeyValueMapper) - * @see #map(KeyValueMapper) - * @see #flatMap(KeyValueMapper) - * @see #mapValues(ValueMapper) - * @see #mapValues(ValueMapperWithKey) - * @see #process(ProcessorSupplier, String...) - * @see #processValues(FixedKeyProcessorSupplier, String...) - * @see #processValues(FixedKeyProcessorSupplier, Named, String...) + *

Note that the key is read-only and must not be modified, as this can lead to corrupt partitioning. */ - KStream flatMapValues(final ValueMapperWithKey> mapper); + KStream flatMapValues(final ValueMapperWithKey> mapper); /** - * Create a new {@code KStream} by transforming the value of each record in this stream into zero or more values - * with the same key in the new stream. - * Transform the value of each input record into zero or more records with the same (unmodified) key in the output - * stream (value type can be altered arbitrarily). - * The provided {@link ValueMapperWithKey} is applied to each input record and computes zero or more output values. - * Thus, an input record {@code } can be transformed into output records {@code , , ...}. - * This is a stateless record-by-record operation (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)} - * for stateful value processing). - *

- * The example below splits input records {@code }, with key=1, containing sentences as values - * into their words. - *

{@code
-     * KStream inputStream = builder.stream("topic");
-     * KStream outputStream = inputStream.flatMapValues(new ValueMapper> {
-     *     Iterable apply(Integer readOnlyKey, String value) {
-     *         if(readOnlyKey == 1) {
-     *             return Arrays.asList(value.split(" "));
-     *         } else {
-     *             return Arrays.asList(value);
-     *         }
-     *     }
-     * });
-     * }
- * The provided {@link ValueMapperWithKey} must return an {@link Iterable} (e.g., any {@link java.util.Collection} type) - * and the return value must not be {@code null}. - *

- * Note that the key is read-only and should not be modified, as this can lead to corrupt partitioning. - * So, splitting a record into multiple records with the same key preserves data co-location with respect to the key. - * Thus, no internal data redistribution is required if a key based operator (like an aggregation or join) - * is applied to the result {@code KStream}. (cf. {@link #flatMap(KeyValueMapper)}) + * See {@link #flatMapValues(ValueMapperWithKey)}. * - * @param mapper a {@link ValueMapperWithKey} the computes the new output values - * @param named a {@link Named} config used to name the processor in the topology - * @param the value type of the result stream - * @return a {@code KStream} that contains more or less records with unmodified keys and new values of different type - * @see #selectKey(KeyValueMapper) - * @see #map(KeyValueMapper) - * @see #flatMap(KeyValueMapper) - * @see #mapValues(ValueMapper) - * @see #mapValues(ValueMapperWithKey) - * @see #process(ProcessorSupplier, String...) - * @see #processValues(FixedKeyProcessorSupplier, String...) - * @see #processValues(FixedKeyProcessorSupplier, Named, String...) + *

Takes an additional {@link Named} parameter that is used to name the processor in the topology. */ - KStream flatMapValues(final ValueMapperWithKey> mapper, - final Named named); + KStream flatMapValues(final ValueMapperWithKey> mapper, + final Named named); /** * Print the records of this {@code KStream} using the options provided by {@link Printed}. @@ -2427,7 +2274,7 @@ KStream leftJoin(final GlobalKTable globalTable, * Even if any upstream operation was key-changing, no auto-repartition is triggered. * If repartitioning is required, a call to {@link #repartition()} should be performed before {@code process()}. *

- * Processing records might result in an internal data redistribution if a key based operator (like an aggregation + * Processing records might result in an internal data redistribution if a key-based operator (like an aggregation * or join) is applied to the result {@code KStream}. * (cf. {@link #processValues(FixedKeyProcessorSupplier, String...)}) * diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 27ab0f9a7d6bb..b5afacc698853 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -149,8 +149,8 @@ public KStream filter(final Predicate predicate) { @Override public KStream filter(final Predicate predicate, final Named named) { - Objects.requireNonNull(predicate, "predicate can't be null"); - Objects.requireNonNull(named, "named can't be null"); + Objects.requireNonNull(predicate, "predicate cannot be null"); + Objects.requireNonNull(named, "named cannot be null"); final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME); final ProcessorParameters processorParameters = @@ -178,8 +178,8 @@ public KStream filterNot(final Predicate predicate) @Override public KStream filterNot(final Predicate predicate, final Named named) { - Objects.requireNonNull(predicate, "predicate can't be null"); - Objects.requireNonNull(named, "named can't be null"); + Objects.requireNonNull(predicate, "predicate cannot be null"); + Objects.requireNonNull(named, "named cannot be null"); final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME); final ProcessorParameters processorParameters = @@ -207,8 +207,8 @@ public KStream selectKey(final KeyValueMapper KStream selectKey(final KeyValueMapper mapper, final Named named) { - Objects.requireNonNull(mapper, "mapper can't be null"); - Objects.requireNonNull(named, "named can't be null"); + Objects.requireNonNull(mapper, "mapper cannot be null"); + Objects.requireNonNull(named, "named cannot be null"); final ProcessorGraphNode selectKeyProcessorNode = internalSelectKey(mapper, new NamedInternal(named)); selectKeyProcessorNode.setKeyChangingOperation(true); @@ -255,8 +255,8 @@ public KStream mapValues(final ValueMapperWithKey KStream mapValues(final ValueMapperWithKey valueMapperWithKey, final Named named) { - Objects.requireNonNull(valueMapperWithKey, "valueMapperWithKey can't be null"); - Objects.requireNonNull(named, "named can't be null"); + Objects.requireNonNull(valueMapperWithKey, "valueMapperWithKey cannot be null"); + Objects.requireNonNull(named, "named cannot be null"); final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, MAPVALUES_NAME); final ProcessorParameters processorParameters = @@ -286,8 +286,8 @@ public KStream map(final KeyValueMapper KStream map(final KeyValueMapper> mapper, final Named named) { - Objects.requireNonNull(mapper, "mapper can't be null"); - Objects.requireNonNull(named, "named can't be null"); + Objects.requireNonNull(mapper, "mapper cannot be null"); + Objects.requireNonNull(named, "named cannot be null"); final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, MAP_NAME); final ProcessorParameters processorParameters = @@ -310,18 +310,18 @@ public KStream map(final KeyValueMapper KStream flatMap(final KeyValueMapper>> mapper) { + public KStream flatMap(final KeyValueMapper>> mapper) { return flatMap(mapper, NamedInternal.empty()); } @Override - public KStream flatMap(final KeyValueMapper>> mapper, - final Named named) { - Objects.requireNonNull(mapper, "mapper can't be null"); - Objects.requireNonNull(named, "named can't be null"); + public KStream flatMap(final KeyValueMapper>> mapper, + final Named named) { + Objects.requireNonNull(mapper, "mapper cannot be null"); + Objects.requireNonNull(named, "named cannot be null"); final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FLATMAP_NAME); - final ProcessorParameters processorParameters = + final ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamFlatMap<>(mapper), name); final ProcessorGraphNode flatMapNode = new ProcessorGraphNode<>(name, processorParameters); @@ -334,29 +334,29 @@ public KStream flatMap(final KeyValueMapper KStream flatMapValues(final ValueMapper> mapper) { + public KStream flatMapValues(final ValueMapper> mapper) { return flatMapValues(withKey(mapper)); } @Override - public KStream flatMapValues(final ValueMapper> mapper, - final Named named) { + public KStream flatMapValues(final ValueMapper> mapper, + final Named named) { return flatMapValues(withKey(mapper), named); } @Override - public KStream flatMapValues(final ValueMapperWithKey> mapper) { + public KStream flatMapValues(final ValueMapperWithKey> mapper) { return flatMapValues(mapper, NamedInternal.empty()); } @Override - public KStream flatMapValues(final ValueMapperWithKey> valueMapper, - final Named named) { - Objects.requireNonNull(valueMapper, "valueMapper can't be null"); - Objects.requireNonNull(named, "named can't be null"); + public KStream flatMapValues(final ValueMapperWithKey> valueMapper, + final Named named) { + Objects.requireNonNull(valueMapper, "valueMapper cannot be null"); + Objects.requireNonNull(named, "named cannot be null"); final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FLATMAPVALUES_NAME); - final ProcessorParameters processorParameters = + final ProcessorParameters processorParameters = new ProcessorParameters<>(new KStreamFlatMapValues<>(valueMapper), name); final ProcessorGraphNode flatMapValuesNode = new ProcessorGraphNode<>(name, processorParameters); @@ -377,7 +377,7 @@ public KStream flatMapValues(final ValueMapperWithKey printed) { - Objects.requireNonNull(printed, "printed can't be null"); + Objects.requireNonNull(printed, "printed cannot be null"); final PrintedInternal printedInternal = new PrintedInternal<>(printed); final String name = new NamedInternal(printedInternal.name()).orElseGenerateWithPrefix(builder, PRINTING_NAME); @@ -397,8 +397,8 @@ public void foreach(final ForeachAction action) { @Override public void foreach(final ForeachAction action, final Named named) { - Objects.requireNonNull(action, "action can't be null"); - Objects.requireNonNull(named, "named can't be null"); + Objects.requireNonNull(action, "action cannot be null"); + Objects.requireNonNull(named, "named cannot be null"); final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FOREACH_NAME); final ProcessorParameters processorParameters = @@ -417,8 +417,8 @@ public KStream peek(final ForeachAction action) { @Override public KStream peek(final ForeachAction action, final Named named) { - Objects.requireNonNull(action, "action can't be null"); - Objects.requireNonNull(named, "named can't be null"); + Objects.requireNonNull(action, "action cannot be null"); + Objects.requireNonNull(named, "named cannot be null"); final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, PEEK_NAME); final ProcessorParameters processorParameters = @@ -445,7 +445,7 @@ public BranchedKStream split() { @Override public BranchedKStream split(final Named named) { - Objects.requireNonNull(named, "named can't be null"); + Objects.requireNonNull(named, "named cannot be null"); return new BranchedKStreamImpl<>(this, repartitionRequired, new NamedInternal(named)); } @@ -457,8 +457,8 @@ public KStream merge(final KStream stream) { @Override public KStream merge(final KStream stream, final Named named) { - Objects.requireNonNull(stream, "stream can't be null"); - Objects.requireNonNull(named, "named can't be null"); + Objects.requireNonNull(stream, "stream cannot be null"); + Objects.requireNonNull(named, "named cannot be null"); return merge(builder, stream, new NamedInternal(named)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index a351a6a812c39..b5645a02d4a36 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -134,7 +134,7 @@ public void shouldNotAllowNullPredicateOnFilter() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.filter(null)); - assertThat(exception.getMessage(), equalTo("predicate can't be null")); + assertThat(exception.getMessage(), equalTo("predicate cannot be null")); } @Test @@ -142,7 +142,7 @@ public void shouldNotAllowNullPredicateOnFilterWithNamed() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.filter(null, Named.as("filter"))); - assertThat(exception.getMessage(), equalTo("predicate can't be null")); + assertThat(exception.getMessage(), equalTo("predicate cannot be null")); } @Test @@ -150,7 +150,7 @@ public void shouldNotAllowNullNamedOnFilter() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.filter((k, v) -> true, null)); - assertThat(exception.getMessage(), equalTo("named can't be null")); + assertThat(exception.getMessage(), equalTo("named cannot be null")); } @Test @@ -158,7 +158,7 @@ public void shouldNotAllowNullPredicateOnFilterNot() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.filterNot(null)); - assertThat(exception.getMessage(), equalTo("predicate can't be null")); + assertThat(exception.getMessage(), equalTo("predicate cannot be null")); } @Test @@ -166,7 +166,7 @@ public void shouldNotAllowNullPredicateOnFilterNotWithName() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.filterNot(null, Named.as("filter"))); - assertThat(exception.getMessage(), equalTo("predicate can't be null")); + assertThat(exception.getMessage(), equalTo("predicate cannot be null")); } @Test @@ -174,7 +174,7 @@ public void shouldNotAllowNullNamedOnFilterNot() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.filterNot((k, v) -> true, null)); - assertThat(exception.getMessage(), equalTo("named can't be null")); + assertThat(exception.getMessage(), equalTo("named cannot be null")); } @Test @@ -182,7 +182,7 @@ public void shouldNotAllowNullMapperOnSelectKey() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.selectKey(null)); - assertThat(exception.getMessage(), equalTo("mapper can't be null")); + assertThat(exception.getMessage(), equalTo("mapper cannot be null")); } @Test @@ -190,7 +190,7 @@ public void shouldNotAllowNullMapperOnSelectKeyWithNamed() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.selectKey(null, Named.as("keySelector"))); - assertThat(exception.getMessage(), equalTo("mapper can't be null")); + assertThat(exception.getMessage(), equalTo("mapper cannot be null")); } @Test @@ -198,7 +198,7 @@ public void shouldNotAllowNullNamedOnSelectKey() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.selectKey((k, v) -> k, null)); - assertThat(exception.getMessage(), equalTo("named can't be null")); + assertThat(exception.getMessage(), equalTo("named cannot be null")); } @Test @@ -206,7 +206,7 @@ public void shouldNotAllowNullMapperOnMap() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.map(null)); - assertThat(exception.getMessage(), equalTo("mapper can't be null")); + assertThat(exception.getMessage(), equalTo("mapper cannot be null")); } @Test @@ -214,7 +214,7 @@ public void shouldNotAllowNullMapperOnMapWithNamed() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.map(null, Named.as("map"))); - assertThat(exception.getMessage(), equalTo("mapper can't be null")); + assertThat(exception.getMessage(), equalTo("mapper cannot be null")); } @Test @@ -222,7 +222,7 @@ public void shouldNotAllowNullNamedOnMap() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.map(KeyValue::pair, null)); - assertThat(exception.getMessage(), equalTo("named can't be null")); + assertThat(exception.getMessage(), equalTo("named cannot be null")); } @Test @@ -238,7 +238,7 @@ public void shouldNotAllowNullMapperOnMapValuesWithKey() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.mapValues((ValueMapperWithKey) null)); - assertThat(exception.getMessage(), equalTo("valueMapperWithKey can't be null")); + assertThat(exception.getMessage(), equalTo("valueMapperWithKey cannot be null")); } @Test @@ -256,7 +256,7 @@ public void shouldNotAllowNullMapperOnMapValuesWithKeyWithNamed() { () -> testStream.mapValues( (ValueMapperWithKey) null, Named.as("valueMapperWithKey"))); - assertThat(exception.getMessage(), equalTo("valueMapperWithKey can't be null")); + assertThat(exception.getMessage(), equalTo("valueMapperWithKey cannot be null")); } @Test @@ -264,7 +264,7 @@ public void shouldNotAllowNullNamedOnMapValues() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.mapValues(v -> v, null)); - assertThat(exception.getMessage(), equalTo("named can't be null")); + assertThat(exception.getMessage(), equalTo("named cannot be null")); } @Test @@ -272,7 +272,7 @@ public void shouldNotAllowNullNamedOnMapValuesWithKey() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.mapValues((k, v) -> v, null)); - assertThat(exception.getMessage(), equalTo("named can't be null")); + assertThat(exception.getMessage(), equalTo("named cannot be null")); } @Test @@ -280,7 +280,7 @@ public void shouldNotAllowNullMapperOnFlatMap() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.flatMap(null)); - assertThat(exception.getMessage(), equalTo("mapper can't be null")); + assertThat(exception.getMessage(), equalTo("mapper cannot be null")); } @Test @@ -288,7 +288,7 @@ public void shouldNotAllowNullMapperOnFlatMapWithNamed() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.flatMap(null, Named.as("flatMapper"))); - assertThat(exception.getMessage(), equalTo("mapper can't be null")); + assertThat(exception.getMessage(), equalTo("mapper cannot be null")); } @Test @@ -296,7 +296,7 @@ public void shouldNotAllowNullNamedOnFlatMap() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.flatMap((k, v) -> Collections.singleton(new KeyValue<>(k, v)), null)); - assertThat(exception.getMessage(), equalTo("named can't be null")); + assertThat(exception.getMessage(), equalTo("named cannot be null")); } @Test @@ -312,7 +312,7 @@ public void shouldNotAllowNullMapperOnFlatMapValuesWithKey() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.flatMapValues((ValueMapperWithKey>) null)); - assertThat(exception.getMessage(), equalTo("valueMapper can't be null")); + assertThat(exception.getMessage(), equalTo("valueMapper cannot be null")); } @Test @@ -332,7 +332,7 @@ public void shouldNotAllowNullMapperOnFlatMapValuesWithKeyWithNamed() { () -> testStream.flatMapValues( (ValueMapperWithKey>) null, Named.as("flatValueMapperWithKey"))); - assertThat(exception.getMessage(), equalTo("valueMapper can't be null")); + assertThat(exception.getMessage(), equalTo("valueMapper cannot be null")); } @Test @@ -340,7 +340,7 @@ public void shouldNotAllowNullNameOnFlatMapValues() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.flatMapValues(v -> Collections.emptyList(), null)); - assertThat(exception.getMessage(), equalTo("named can't be null")); + assertThat(exception.getMessage(), equalTo("named cannot be null")); } @Test @@ -348,7 +348,7 @@ public void shouldNotAllowNullNameOnFlatMapValuesWithKey() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.flatMapValues((k, v) -> Collections.emptyList(), null)); - assertThat(exception.getMessage(), equalTo("named can't be null")); + assertThat(exception.getMessage(), equalTo("named cannot be null")); } @Test @@ -356,7 +356,7 @@ public void shouldNotAllowNullPrintedOnPrint() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.print(null)); - assertThat(exception.getMessage(), equalTo("printed can't be null")); + assertThat(exception.getMessage(), equalTo("printed cannot be null")); } @Test @@ -364,7 +364,7 @@ public void shouldNotAllowNullActionOnForEach() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.foreach(null)); - assertThat(exception.getMessage(), equalTo("action can't be null")); + assertThat(exception.getMessage(), equalTo("action cannot be null")); } @Test @@ -372,7 +372,7 @@ public void shouldNotAllowNullActionOnForEachWithName() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.foreach(null, Named.as("foreach"))); - assertThat(exception.getMessage(), equalTo("action can't be null")); + assertThat(exception.getMessage(), equalTo("action cannot be null")); } @Test @@ -380,7 +380,7 @@ public void shouldNotAllowNullNamedOnForEach() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.foreach((k, v) -> { }, null)); - assertThat(exception.getMessage(), equalTo("named can't be null")); + assertThat(exception.getMessage(), equalTo("named cannot be null")); } @Test @@ -388,7 +388,7 @@ public void shouldNotAllowNullActionOnPeek() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.peek(null)); - assertThat(exception.getMessage(), equalTo("action can't be null")); + assertThat(exception.getMessage(), equalTo("action cannot be null")); } @Test @@ -396,7 +396,7 @@ public void shouldNotAllowNullActionOnPeekWithName() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.peek(null, Named.as("peek"))); - assertThat(exception.getMessage(), equalTo("action can't be null")); + assertThat(exception.getMessage(), equalTo("action cannot be null")); } @Test @@ -404,7 +404,7 @@ public void shouldNotAllowNullNamedOnPeek() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.peek((k, v) -> { }, null)); - assertThat(exception.getMessage(), equalTo("named can't be null")); + assertThat(exception.getMessage(), equalTo("named cannot be null")); } @Test @@ -412,7 +412,7 @@ public void shouldNotAllowNullKStreamOnMerge() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.merge(null)); - assertThat(exception.getMessage(), equalTo("stream can't be null")); + assertThat(exception.getMessage(), equalTo("stream cannot be null")); } @Test @@ -420,7 +420,7 @@ public void shouldNotAllowNullKStreamOnMergeWithNamed() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.merge(null, Named.as("merge"))); - assertThat(exception.getMessage(), equalTo("stream can't be null")); + assertThat(exception.getMessage(), equalTo("stream cannot be null")); } @Test @@ -428,7 +428,7 @@ public void shouldNotAllowNullNamedOnMerge() { final NullPointerException exception = assertThrows( NullPointerException.class, () -> testStream.merge(testStream, null)); - assertThat(exception.getMessage(), equalTo("named can't be null")); + assertThat(exception.getMessage(), equalTo("named cannot be null")); } @Test