diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java index 6908edc10f..cc80a5b106 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java @@ -71,6 +71,7 @@ * @author Pawel Lozinski * @author Adrian Chlebosz * @author Soby Chacko + * @author Sanghyeok An * * @since 3.1 */ @@ -532,7 +533,7 @@ public void consumeFromEmbeddedTopics(Consumer consumer, boolean seekToEnd List notEmbedded = Arrays.stream(topicsToConsume) .filter(topic -> !this.topics.contains(topic)) .collect(Collectors.toList()); - if (notEmbedded.size() > 0) { + if (!notEmbedded.isEmpty()) { throw new IllegalStateException("topic(s):'" + notEmbedded + "' are not in embedded topic list"); } final AtomicReference> assigned = new AtomicReference<>(); diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java index 11836591c3..f50c9f214f 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java @@ -88,6 +88,7 @@ * @author Pawel Lozinski * @author Adrian Chlebosz * @author Soby Chacko + * @author Sanghyeok An * * @since 2.2 */ @@ -734,7 +735,7 @@ public void consumeFromEmbeddedTopics(Consumer consumer, boolean seekToEnd List notEmbedded = Arrays.stream(topicsToConsume) .filter(topic -> !this.topics.contains(topic)) .collect(Collectors.toList()); - if (notEmbedded.size() > 0) { + if (!notEmbedded.isEmpty()) { throw new IllegalStateException("topic(s):'" + notEmbedded + "' are not in embedded topic list"); } final AtomicReference> assigned = new AtomicReference<>(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java index 1876283145..634eb5c7bf 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java @@ -141,6 +141,7 @@ * @author Filip Halemba * @author Tomaz Fernandes * @author Wang Zhiyang + * @author Sanghyeok An * * @see KafkaListener * @see KafkaListenerErrorHandler @@ -349,7 +350,7 @@ private void buildEnhancer() { if (this.applicationContext != null) { Map enhancersMap = this.applicationContext.getBeansOfType(AnnotationEnhancer.class, false, false); - if (enhancersMap.size() > 0) { + if (!enhancersMap.isEmpty()) { List enhancers = enhancersMap.values() .stream() .sorted(new OrderComparator()) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/aot/KafkaAvroBeanRegistrationAotProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/aot/KafkaAvroBeanRegistrationAotProcessor.java index 7d816a1c64..fb4b727e94 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/aot/KafkaAvroBeanRegistrationAotProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/aot/KafkaAvroBeanRegistrationAotProcessor.java @@ -41,6 +41,7 @@ * Detect and register Avro types for Apache Kafka listeners. * * @author Gary Russell + * @author Sagnhyeok An * @since 3.0 * */ @@ -80,7 +81,7 @@ public BeanRegistrationAotContribution processAheadOfTime(RegisteredBean registe } }, method -> method.getName().equals("onMessage")); } - if (avroTypes.size() > 0) { + if (!avroTypes.isEmpty()) { return (generationContext, beanRegistrationCode) -> { ReflectionHints reflectionHints = generationContext.getRuntimeHints().reflection(); avroTypes.forEach(type -> reflectionHints.registerType(type, diff --git a/spring-kafka/src/main/java/org/springframework/kafka/config/TopicBuilder.java b/spring-kafka/src/main/java/org/springframework/kafka/config/TopicBuilder.java index af6c967d22..a979d01cfc 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/config/TopicBuilder.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/config/TopicBuilder.java @@ -30,6 +30,7 @@ * {@link Optional#empty()} indicating the broker defaults will be applied. * * @author Gary Russell + * @author Sanghyeok An * @since 2.3 * */ @@ -132,7 +133,7 @@ public NewTopic build() { NewTopic topic = this.replicasAssignments == null ? new NewTopic(this.name, this.partitions, this.replicas) : new NewTopic(this.name, this.replicasAssignments); - if (this.configs.size() > 0) { + if (!this.configs.isEmpty()) { topic.configs(this.configs); } return topic; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java index 7b332d1d44..532be3298d 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/DefaultKafkaConsumerFactory.java @@ -73,6 +73,7 @@ * @author Chris Gilbert * @author Adrian Gygax * @author Yaniv Nahoum + * @author Sanghyeok An */ public class DefaultKafkaConsumerFactory extends KafkaResourceFactory implements ConsumerFactory, BeanNameAware, ApplicationContextAware { @@ -398,7 +399,7 @@ protected Consumer createKafkaConsumer(@Nullable String groupId, @Nullable boolean shouldModifyClientId = (this.configs.containsKey(ConsumerConfig.CLIENT_ID_CONFIG) && StringUtils.hasText(clientIdSuffix)) || overrideClientIdPrefix; if (groupId == null - && (properties == null || properties.stringPropertyNames().size() == 0) + && (properties == null || properties.stringPropertyNames().isEmpty()) && !shouldModifyClientId) { return createKafkaConsumer(new HashMap<>(this.configs)); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java index 92150cccd7..f4fb291d0e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/core/KafkaAdmin.java @@ -74,6 +74,7 @@ * @author Gary Russell * @author Artem Bilan * @author Adrian Gygax + * @author Sanghyeok An * * @since 1.3 */ @@ -239,7 +240,7 @@ public void afterSingletonsInstantiated() { */ public final boolean initialize() { Collection newTopics = newTopics(); - if (newTopics.size() > 0) { + if (!newTopics.isEmpty()) { AdminClient adminClient = null; try { adminClient = createAdmin(); @@ -399,7 +400,7 @@ protected Map getAdminConfig() { } private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection topics) { - if (topics.size() > 0) { + if (!topics.isEmpty()) { Map topicNameToTopic = new HashMap<>(); topics.forEach(t -> topicNameToTopic.compute(t.name(), (k, v) -> t)); DescribeTopicsResult topicInfo = adminClient @@ -409,10 +410,10 @@ private void addOrModifyTopicsIfNeeded(AdminClient adminClient, Collection topicsToAdd = new ArrayList<>(); Map topicsWithPartitionMismatches = checkPartitions(topicNameToTopic, topicInfo, topicsToAdd); - if (topicsToAdd.size() > 0) { + if (!topicsToAdd.isEmpty()) { addTopics(adminClient, topicsToAdd); } - if (topicsWithPartitionMismatches.size() > 0) { + if (!topicsWithPartitionMismatches.isEmpty()) { createMissingPartitions(adminClient, topicsWithPartitionMismatches); } if (this.modifyTopicConfigs) { @@ -457,7 +458,7 @@ private Map> checkTopicsForConfigMismatches( configMismatchesEntries.add(actualConfigParameter); } } - if (configMismatchesEntries.size() > 0) { + if (!configMismatchesEntries.isEmpty()) { configMismatches.put(topicConfig.getKey(), configMismatchesEntries); } } @@ -491,7 +492,7 @@ private void adjustConfigMismatches(AdminClient adminClient, Collection 0) { + if (!alterConfigOperations.isEmpty()) { try { AlterConfigsResult alterConfigsResult = adminClient .incrementalAlterConfigs(Map.of(topicConfigResource, alterConfigOperations)); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java index 8ef258155b..4a9a6ba565 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractMessageListenerContainer.java @@ -70,6 +70,7 @@ * @author Tomaz Fernandes * @author Wang Zhiyang * @author Soby Chacko + * @author Sanghyeok An */ public abstract class AbstractMessageListenerContainer implements GenericMessageListenerContainer, BeanNameAware, ApplicationEventPublisherAware, @@ -570,7 +571,7 @@ protected void checkTopics() { catch (Exception e) { this.logger.error(e, "Failed to check topic existence"); } - if (missing != null && missing.size() > 0) { + if (missing != null && !missing.isEmpty()) { throw new IllegalStateException( "Topic(s) " + missing.toString() + " is/are not present and missingTopicsFatal is true"); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java index 097842cf4a..624a6127e3 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ConsumerProperties.java @@ -35,6 +35,7 @@ * Common consumer properties. * * @author Gary Russell + * @author Sagnhyeok An * @since 2.3 * */ @@ -520,7 +521,7 @@ protected final String renderProperties() { + (this.offsetAndMetadataProvider != null ? "\n offsetAndMetadataProvider=" + this.offsetAndMetadataProvider : "") + "\n syncCommits=" + this.syncCommits + (this.syncCommitTimeout != null ? "\n syncCommitTimeout=" + this.syncCommitTimeout : "") - + (this.kafkaConsumerProperties.size() > 0 ? "\n properties=" + this.kafkaConsumerProperties : "") + + (!this.kafkaConsumerProperties.isEmpty() ? "\n properties=" + this.kafkaConsumerProperties : "") + "\n authExceptionRetryInterval=" + this.authExceptionRetryInterval + "\n commitRetries=" + this.commitRetries + "\n fixTxOffsets" + this.fixTxOffsets; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerGroupSequencer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerGroupSequencer.java index fdb2ad1d75..1e02ebf844 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerGroupSequencer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerGroupSequencer.java @@ -37,6 +37,7 @@ * idle. * * @author Gary Russell + * @author Sanghyeok An * @since 2.7.3 * */ @@ -186,7 +187,7 @@ public void initialize() { for (String group : this.groupNames) { this.groups.add(this.applicationContext.getBean(group + ".group", ContainerGroup.class)); } - if (this.groups.size() > 0) { + if (!this.groups.isEmpty()) { this.iterator = this.groups.iterator(); this.currentGroup = this.iterator.next(); this.groups.forEach(grp -> { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java index f14114f0f8..7f422bde3e 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/DefaultAfterRollbackProcessor.java @@ -51,6 +51,7 @@ * @author Gary Russell * @author Francois Rosiere * @author Wang Zhiyang + * @author Sanghyeok An * * @since 1.3.5 * @@ -210,7 +211,7 @@ public void processBatch(ConsumerRecords records, List offsets = new HashMap<>(); records.forEach(rec -> offsets.put(new TopicPartition(rec.topic(), rec.partition()), ListenerUtils.createOffsetAndMetadata(container, rec.offset() + 1))); - if (offsets.size() > 0 && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) { + if (!offsets.isEmpty() && this.kafkaTemplate != null && this.kafkaTemplate.isTransactional()) { this.kafkaTemplate.sendOffsetsToTransaction(offsets, consumer.groupMetadata()); } clearThreadState(); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java index e9ad1ccbfc..3bc231f8fe 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/BatchMessagingMessageListenerAdapter.java @@ -54,6 +54,7 @@ * @author Artem Bilan * @author Venil Noronha * @author Wang ZhiYang + * @author Sanghyeok An * @since 1.1 */ public class BatchMessagingMessageListenerAdapter extends MessagingMessageListenerAdapter @@ -95,6 +96,9 @@ public void setBatchMessageConverter(BatchMessageConverter messageConverter) { if (recordMessageConverter != null) { setMessageConverter(recordMessageConverter); } + else { + logger.warn("No batch message converter is set. because record message converter is null."); + } } /** diff --git a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java index ba63cabc37..69d6a39301 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/requestreply/AggregatingReplyingKafkaTemplate.java @@ -50,6 +50,7 @@ * @param the reply data type. * * @author Gary Russell + * @author Sanghyeok An * @since 2.3 * */ @@ -162,7 +163,7 @@ public void onMessage(List>>> } } }); - if (completed.size() > 0) { + if (!completed.isEmpty()) { super.onMessage(completed); } } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java index 3725b2ff0a..f8676fe825 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java @@ -31,6 +31,7 @@ * @author Tomaz Fernandes * @author Gary Russell * @author Adrian Chlebosz + * @author Sanghyeok An * @since 2.7 * */ @@ -46,8 +47,7 @@ public DestinationTopic(String destinationName, Properties properties) { } public DestinationTopic(String destinationName, DestinationTopic sourceDestinationtopic, String suffix, Type type) { - this.destinationName = destinationName; - this.properties = new Properties(sourceDestinationtopic.properties, suffix, type); + this(destinationName, new Properties(sourceDestinationtopic.properties, suffix, type)); } public Long getDestinationDelay() { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java index d7ac37e200..8aa86ddc78 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/AbstractKafkaHeaderMapper.java @@ -43,6 +43,7 @@ * * @author Gary Russell * @author Artem Bilan + * @author Sanghyeok An * * @since 2.1.3 * @@ -216,7 +217,7 @@ protected boolean matchesForInbound(String header) { if (this.outbound) { return true; } - if (this.matchers.size() == 0) { + if (this.matchers.isEmpty()) { return true; } return doesMatch(header); diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java index 7afea7779e..9315ceac47 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/converter/BatchMessagingMessageConverter.java @@ -60,12 +60,14 @@ * @author Gary Russell * @author Dariusz Szablinski * @author Biju Kunjummen + * @author Sanghyeok An * @since 1.1 */ public class BatchMessagingMessageConverter implements BatchMessageConverter { protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); // NOSONAR + @Nullable private final RecordMessageConverter recordConverter; private boolean generateMessageId = false; @@ -123,6 +125,7 @@ public void setHeaderMapper(KafkaHeaderMapper headerMapper) { this.headerMapper = headerMapper; } + @Nullable @Override public RecordMessageConverter getRecordMessageConverter() { return this.recordConverter; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTopicSerialization.java b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTopicSerialization.java index 5cc0fe63e1..851a1e9067 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTopicSerialization.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/DelegatingByTopicSerialization.java @@ -39,6 +39,7 @@ * * @author Gary Russell * @author Wang Zhiyang + * @author Sanghyeok An * * @since 2.8 * @@ -108,7 +109,7 @@ public void setCaseSensitive(boolean caseSensitive) { @SuppressWarnings(UNCHECKED) protected void configure(Map configs, boolean isKey) { - if (this.delegates.size() > 0) { + if (!this.delegates.isEmpty()) { this.delegates.values().forEach(delegate -> configureDelegate(configs, isKey, delegate)); } this.forKeys = isKey; diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java index a1355692c4..a3521f6df4 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/KafkaMessageListenerContainerTests.java @@ -145,6 +145,7 @@ * @author Wang Zhiyang * @author Mikael Carlstedt * @author Borahm Lee + * @author Sanghyeok An */ @EmbeddedKafka(topics = { KafkaMessageListenerContainerTests.topic1, KafkaMessageListenerContainerTests.topic2, KafkaMessageListenerContainerTests.topic3, KafkaMessageListenerContainerTests.topic4, @@ -815,7 +816,7 @@ else if (polled.get() == 2) { latch1.countDown(); latch2.countDown(); acks.add(ack); - if (latch1.getCount() == 0 && records1.values().size() > 0 + if (latch1.getCount() == 0 && !records1.isEmpty() && records1.values().iterator().next().size() == 4) { acks.get(3).acknowledge(); acks.get(2).acknowledge();