diff --git a/redis/src/main/java/com/navercorp/pinpoint/pubsub/endpoint/PubSubFluxClientImpl.java b/redis/src/main/java/com/navercorp/pinpoint/pubsub/endpoint/PubSubFluxClientImpl.java index 24095aa7e4231..08bbf16c3bc94 100644 --- a/redis/src/main/java/com/navercorp/pinpoint/pubsub/endpoint/PubSubFluxClientImpl.java +++ b/redis/src/main/java/com/navercorp/pinpoint/pubsub/endpoint/PubSubFluxClientImpl.java @@ -60,12 +60,8 @@ public Flux request(D demand) { static class LongTermSubConsumer implements SubConsumer> { - private static final Comparator> supplyComparator = new Comparator<>() { - @Override - public int compare(SupplyMessage o1, SupplyMessage o2) { - return o1.getSequence() - o2.getSequence(); - } - }; + private static final Comparator> supplyComparator = + Comparator.comparing(el -> el.getSequence()); final Sinks.Many sink; final Identifier demandId; @@ -94,8 +90,6 @@ public boolean consume(SupplyMessage supply) { if (supply.getSequence() == nextSequence) { consume0(supply); nextSequence += 1; - } else { - supplies.add(supply); while (supplies.peek() != null && supplies.peek().getSequence() == nextSequence) { final SupplyMessage pended = supplies.poll(); if (pended != null) { @@ -103,6 +97,8 @@ public boolean consume(SupplyMessage supply) { nextSequence += 1; } } + } else { + supplies.add(supply); } } return true; diff --git a/redis/src/test/java/com/navercorp/pinpoint/redis/pubsub/RedisReqResTest.java b/redis/src/test/java/com/navercorp/pinpoint/redis/pubsub/RedisReqResTest.java index be18043ddf289..c659ebd08acc2 100644 --- a/redis/src/test/java/com/navercorp/pinpoint/redis/pubsub/RedisReqResTest.java +++ b/redis/src/test/java/com/navercorp/pinpoint/redis/pubsub/RedisReqResTest.java @@ -21,6 +21,8 @@ import com.navercorp.pinpoint.pubsub.endpoint.PubSubServerFactory; import com.navercorp.pinpoint.pubsub.endpoint.PubSubServiceDescriptor; import com.navercorp.pinpoint.redis.stream.RedisStreamConfig; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.context.ApplicationContext; @@ -41,6 +43,28 @@ @DisplayName("req/res based on redis") public class RedisReqResTest { + private static GenericContainer redisContainer; + + @BeforeAll + @SuppressWarnings("resource") + public static void beforeAll() { + redisContainer = new GenericContainer<>(DockerImageName.parse("redis:7.0")) + .withExposedPorts(6379) + .withReuse(true); + redisContainer.start(); + + System.setProperty("spring.data.redis.host", redisContainer.getHost()); + System.setProperty("spring.redis.host", redisContainer.getHost()); + System.setProperty("spring.data.redis.port", redisContainer.getMappedPort(6379).toString()); + System.setProperty("spring.redis.port", redisContainer.getMappedPort(6379).toString()); + } + + @AfterAll + public static void afterAll() { + redisContainer.stop(); + redisContainer.close(); + } + @DisplayName("req/res based on redis pubsub") @Test public void testRedisPubSub() { @@ -54,31 +78,11 @@ public void testRedisStreamPubSub() { } private void testConfigClass(Class configClass) { - runWithRedisContainer(() -> { - final ApplicationContext context = new AnnotationConfigApplicationContext(configClass); - testServerClientFactory( - context.getBean(PubSubServerFactory.class), - context.getBean(PubSubClientFactory.class) - ); - }); - } - - @SuppressWarnings("resource") - private void runWithRedisContainer(Runnable r) { - try (final GenericContainer redisContainer = new GenericContainer<>(DockerImageName.parse("redis:7.0")) - .withExposedPorts(6379) - .withReuse(true) - ) { - redisContainer.start(); - System.setProperty("spring.data.redis.host", redisContainer.getHost()); - System.setProperty("spring.redis.host", redisContainer.getHost()); - System.setProperty("spring.data.redis.port", redisContainer.getMappedPort(6379).toString()); - System.setProperty("spring.redis.port", redisContainer.getMappedPort(6379).toString()); - - r.run(); - - redisContainer.stop(); - } + final ApplicationContext context = new AnnotationConfigApplicationContext(configClass); + testServerClientFactory( + context.getBean(PubSubServerFactory.class), + context.getBean(PubSubClientFactory.class) + ); } private void testServerClientFactory( @@ -99,6 +103,7 @@ private void testServerClientFactory( PubSubServiceDescriptor.flux("range", Integer.class, Integer.class); serverFactory.build(el -> Flux.range(0, el), rangeService).afterPropertiesSet(); assertThat(syncRequestFlux(clientFactory, rangeService, 5)).isEqualTo(List.of(0, 1, 2, 3, 4)); + assertThat(syncRequestFlux(clientFactory, rangeService, 3)).isEqualTo(List.of(0, 1, 2)); } private S syncRequestMono(