diff --git a/redis/pom.xml b/redis/pom.xml index 68507c54ed88..a08e230b144e 100644 --- a/redis/pom.xml +++ b/redis/pom.xml @@ -62,10 +62,16 @@ - junit - junit + org.junit.jupiter + junit-jupiter-api test + + org.testcontainers + junit-jupiter + 1.17.6 + + org.testcontainers testcontainers diff --git a/redis/src/main/java/com/navercorp/pinpoint/pubsub/endpoint/PubSubFluxClientImpl.java b/redis/src/main/java/com/navercorp/pinpoint/pubsub/endpoint/PubSubFluxClientImpl.java index 24095aa7e423..08bbf16c3bc9 100644 --- a/redis/src/main/java/com/navercorp/pinpoint/pubsub/endpoint/PubSubFluxClientImpl.java +++ b/redis/src/main/java/com/navercorp/pinpoint/pubsub/endpoint/PubSubFluxClientImpl.java @@ -60,12 +60,8 @@ public Flux request(D demand) { static class LongTermSubConsumer implements SubConsumer> { - private static final Comparator> supplyComparator = new Comparator<>() { - @Override - public int compare(SupplyMessage o1, SupplyMessage o2) { - return o1.getSequence() - o2.getSequence(); - } - }; + private static final Comparator> supplyComparator = + Comparator.comparing(el -> el.getSequence()); final Sinks.Many sink; final Identifier demandId; @@ -94,8 +90,6 @@ public boolean consume(SupplyMessage supply) { if (supply.getSequence() == nextSequence) { consume0(supply); nextSequence += 1; - } else { - supplies.add(supply); while (supplies.peek() != null && supplies.peek().getSequence() == nextSequence) { final SupplyMessage pended = supplies.poll(); if (pended != null) { @@ -103,6 +97,8 @@ public boolean consume(SupplyMessage supply) { nextSequence += 1; } } + } else { + supplies.add(supply); } } return true; diff --git a/redis/src/test/java/com/navercorp/pinpoint/redis/pubsub/RedisPubSubReqResTest.java b/redis/src/test/java/com/navercorp/pinpoint/redis/pubsub/RedisPubSubReqResTest.java new file mode 100644 index 000000000000..03abf83f778e --- /dev/null +++ b/redis/src/test/java/com/navercorp/pinpoint/redis/pubsub/RedisPubSubReqResTest.java @@ -0,0 +1,69 @@ +/* + * Copyright 2023 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.redis.pubsub; + +import com.navercorp.pinpoint.pubsub.endpoint.PubSubClientFactory; +import com.navercorp.pinpoint.pubsub.endpoint.PubSubServerFactory; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit.jupiter.SpringExtension; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import static com.navercorp.pinpoint.redis.pubsub.RedisStreamReqResTest.testPubSubServerClient; + +/** + * @author youngjin.kim2 + */ +@DisplayName("req/res based on redis pubsub") +@ExtendWith(SpringExtension.class) +@ContextConfiguration(classes = {RedisPubSubConfig.class}) +@Testcontainers +public class RedisPubSubReqResTest { + + @Container + @SuppressWarnings("resource") + private static final GenericContainer redisContainer = new GenericContainer<>(DockerImageName.parse("redis:7.0")) + .withExposedPorts(6379) + .withReuse(true); + + @Autowired + private PubSubServerFactory serverFactory; + + @Autowired + private PubSubClientFactory clientFactory; + + @BeforeAll + public static void beforeAll() { + System.setProperty("spring.data.redis.host", redisContainer.getHost()); + System.setProperty("spring.redis.host", redisContainer.getHost()); + System.setProperty("spring.data.redis.port", redisContainer.getMappedPort(6379).toString()); + System.setProperty("spring.redis.port", redisContainer.getMappedPort(6379).toString()); + } + + @DisplayName("req/res based on redis pubsub") + @Test + public void testRedisPubSub() { + testPubSubServerClient(this.serverFactory, this.clientFactory); + } + +} diff --git a/redis/src/test/java/com/navercorp/pinpoint/redis/pubsub/RedisReqResTest.java b/redis/src/test/java/com/navercorp/pinpoint/redis/pubsub/RedisStreamReqResTest.java similarity index 64% rename from redis/src/test/java/com/navercorp/pinpoint/redis/pubsub/RedisReqResTest.java rename to redis/src/test/java/com/navercorp/pinpoint/redis/pubsub/RedisStreamReqResTest.java index be18043ddf28..b2039ba54d0e 100644 --- a/redis/src/test/java/com/navercorp/pinpoint/redis/pubsub/RedisReqResTest.java +++ b/redis/src/test/java/com/navercorp/pinpoint/redis/pubsub/RedisStreamReqResTest.java @@ -21,11 +21,16 @@ import com.navercorp.pinpoint.pubsub.endpoint.PubSubServerFactory; import com.navercorp.pinpoint.pubsub.endpoint.PubSubServiceDescriptor; import com.navercorp.pinpoint.redis.stream.RedisStreamConfig; +import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; -import org.springframework.context.ApplicationContext; -import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit.jupiter.SpringExtension; import org.testcontainers.containers.GenericContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; import org.testcontainers.utility.DockerImageName; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -38,53 +43,39 @@ /** * @author youngjin.kim2 */ -@DisplayName("req/res based on redis") -public class RedisReqResTest { +@DisplayName("req/res based on redis stream") +@ExtendWith(SpringExtension.class) +@ContextConfiguration(classes = {RedisStreamConfig.class}) +@Testcontainers +public class RedisStreamReqResTest { - @DisplayName("req/res based on redis pubsub") - @Test - public void testRedisPubSub() { - testConfigClass(RedisPubSubConfig.class); + @Container + @SuppressWarnings("resource") + private static final GenericContainer redisContainer = new GenericContainer<>(DockerImageName.parse("redis:7.0")) + .withExposedPorts(6379) + .withReuse(true); + + @Autowired + private PubSubServerFactory serverFactory; + + @Autowired + private PubSubClientFactory clientFactory; + + @BeforeAll + public static void beforeAll() { + System.setProperty("spring.data.redis.host", redisContainer.getHost()); + System.setProperty("spring.redis.host", redisContainer.getHost()); + System.setProperty("spring.data.redis.port", redisContainer.getMappedPort(6379).toString()); + System.setProperty("spring.redis.port", redisContainer.getMappedPort(6379).toString()); } @DisplayName("req/res based on redis stream") @Test public void testRedisStreamPubSub() { - testConfigClass(RedisStreamConfig.class); - } - - private void testConfigClass(Class configClass) { - runWithRedisContainer(() -> { - final ApplicationContext context = new AnnotationConfigApplicationContext(configClass); - testServerClientFactory( - context.getBean(PubSubServerFactory.class), - context.getBean(PubSubClientFactory.class) - ); - }); - } - - @SuppressWarnings("resource") - private void runWithRedisContainer(Runnable r) { - try (final GenericContainer redisContainer = new GenericContainer<>(DockerImageName.parse("redis:7.0")) - .withExposedPorts(6379) - .withReuse(true) - ) { - redisContainer.start(); - System.setProperty("spring.data.redis.host", redisContainer.getHost()); - System.setProperty("spring.redis.host", redisContainer.getHost()); - System.setProperty("spring.data.redis.port", redisContainer.getMappedPort(6379).toString()); - System.setProperty("spring.redis.port", redisContainer.getMappedPort(6379).toString()); - - r.run(); - - redisContainer.stop(); - } + testPubSubServerClient(this.serverFactory, this.clientFactory); } - private void testServerClientFactory( - PubSubServerFactory serverFactory, - PubSubClientFactory clientFactory - ) { + static void testPubSubServerClient(PubSubServerFactory serverFactory, PubSubClientFactory clientFactory) { final PubSubMonoServiceDescriptor greeterService = PubSubServiceDescriptor.mono("greeter", String.class, String.class); serverFactory.build(name -> Mono.just("Hello, " + name), greeterService).afterPropertiesSet(); @@ -99,9 +90,10 @@ private void testServerClientFactory( PubSubServiceDescriptor.flux("range", Integer.class, Integer.class); serverFactory.build(el -> Flux.range(0, el), rangeService).afterPropertiesSet(); assertThat(syncRequestFlux(clientFactory, rangeService, 5)).isEqualTo(List.of(0, 1, 2, 3, 4)); + assertThat(syncRequestFlux(clientFactory, rangeService, 3)).isEqualTo(List.of(0, 1, 2)); } - private S syncRequestMono( + static S syncRequestMono( PubSubClientFactory clientFactory, PubSubMonoServiceDescriptor descriptor, D demand @@ -111,7 +103,7 @@ private S syncRequestMono( .block(); } - private List syncRequestFlux( + static List syncRequestFlux( PubSubClientFactory clientFactory, PubSubFluxServiceDescriptor descriptor, D demand