diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java index 41bd32b196..e9ee234488 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaKraftBroker.java @@ -560,6 +560,8 @@ public void onPartitionsAssigned(Collection partitions) { + (seekToEnd ? "end; " : "beginning")); if (seekToEnd) { consumer.seekToEnd(assigned.get()); + // seekToEnd is asynchronous. query the position to force the seek to happen now. + assigned.get().forEach(consumer::position); } else { consumer.seekToBeginning(assigned.get()); diff --git a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java index a4e3761acd..47244f1d87 100644 --- a/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java +++ b/spring-kafka-test/src/main/java/org/springframework/kafka/test/EmbeddedKafkaZKBroker.java @@ -763,6 +763,8 @@ public void onPartitionsAssigned(Collection partitions) { + (seekToEnd ? "end; " : "beginning")); if (seekToEnd) { consumer.seekToEnd(assigned.get()); + // seekToEnd is asynchronous. query the position to force the seek to happen now. + assigned.get().forEach(consumer::position); } else { consumer.seekToBeginning(assigned.get()); diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java index 50fe009357..ddc39b6052 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaKraftBrokerTests.java @@ -16,8 +16,13 @@ package org.springframework.kafka.test; +import java.util.Map; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.Test; +import org.springframework.kafka.test.utils.KafkaTestUtils; import org.springframework.util.StringUtils; import static org.assertj.core.api.Assertions.assertThat; @@ -37,4 +42,21 @@ void testUpDown() { kafka.destroy(); } + @Test + void testConsumeFromEmbeddedWithSeekToEnd() { + EmbeddedKafkaKraftBroker kafka = new EmbeddedKafkaKraftBroker(1, 1, "seekTestTopic"); + kafka.afterPropertiesSet(); + Map producerProps = KafkaTestUtils.producerProps(kafka); + KafkaProducer producer = new KafkaProducer<>(producerProps); + producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "beforeSeekToEnd")); + Map consumerProps = KafkaTestUtils.consumerProps("seekTest", "false", kafka); + KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); + kafka.consumeFromAnEmbeddedTopic(consumer, true /* seekToEnd */, "seekTestTopic"); + producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "afterSeekToEnd")); + producer.close(); + assertThat(KafkaTestUtils.getSingleRecord(consumer, "seekTestTopic").value()) + .isEqualTo("afterSeekToEnd"); + consumer.close(); + } + } diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java index 4688e38e2d..32a77b43e9 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/EmbeddedKafkaZKBrokerTests.java @@ -16,7 +16,14 @@ package org.springframework.kafka.test; +import java.util.Map; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.TopicPartition; import org.junit.jupiter.api.Test; +import org.springframework.kafka.test.utils.KafkaTestUtils; import static org.assertj.core.api.Assertions.assertThat; @@ -42,4 +49,22 @@ void testUpDown() { assertThat(System.getProperty(EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS)).isNull(); } + @Test + void testConsumeFromEmbeddedWithSeekToEnd() { + EmbeddedKafkaZKBroker kafka = new EmbeddedKafkaZKBroker(1); + kafka.afterPropertiesSet(); + kafka.addTopics("seekTestTopic"); + Map producerProps = KafkaTestUtils.producerProps(kafka); + KafkaProducer producer = new KafkaProducer<>(producerProps); + producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "beforeSeekToEnd")); + Map consumerProps = KafkaTestUtils.consumerProps("seekTest", "false", kafka); + KafkaConsumer consumer = new KafkaConsumer<>(consumerProps); + kafka.consumeFromAnEmbeddedTopic(consumer, true /* seekToEnd */, "seekTestTopic"); + producer.send(new ProducerRecord<>("seekTestTopic", 0, 1, "afterSeekToEnd")); + producer.close(); + assertThat(KafkaTestUtils.getSingleRecord(consumer, "seekTestTopic").value()) + .isEqualTo("afterSeekToEnd"); + consumer.close(); + } + } diff --git a/spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java b/spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java index e5be343800..ac89494b8f 100644 --- a/spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java +++ b/spring-kafka-test/src/test/java/org/springframework/kafka/test/utils/KafkaTestUtilsTests.java @@ -45,7 +45,7 @@ * */ @EmbeddedKafka(topics = {"singleTopic1", "singleTopic2", "singleTopic3", "singleTopic4", "singleTopic5", - "multiTopic1"}) + "multiTopic1", "seekToEndTopic1"}) public class KafkaTestUtilsTests { @Test