Skip to content

Commit

Permalink
[#noissue] Fixed redis pubsub test
Browse files Browse the repository at this point in the history
  • Loading branch information
smilu97 committed May 18, 2023
1 parent f99c438 commit a6de716
Show file tree
Hide file tree
Showing 4 changed files with 116 additions and 53 deletions.
10 changes: 8 additions & 2 deletions redis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,16 @@
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.17.6</version>
</dependency>

<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,8 @@ public Flux<S> request(D demand) {

static class LongTermSubConsumer<S> implements SubConsumer<SupplyMessage<S>> {

private static final Comparator<SupplyMessage<?>> supplyComparator = new Comparator<>() {
@Override
public int compare(SupplyMessage<?> o1, SupplyMessage<?> o2) {
return o1.getSequence() - o2.getSequence();
}
};
private static final Comparator<SupplyMessage<?>> supplyComparator =
Comparator.comparing(el -> el.getSequence());

final Sinks.Many<S> sink;
final Identifier demandId;
Expand Down Expand Up @@ -94,15 +90,15 @@ public boolean consume(SupplyMessage<S> supply) {
if (supply.getSequence() == nextSequence) {
consume0(supply);
nextSequence += 1;
} else {
supplies.add(supply);
while (supplies.peek() != null && supplies.peek().getSequence() == nextSequence) {
final SupplyMessage<S> pended = supplies.poll();
if (pended != null) {
consume0(pended);
nextSequence += 1;
}
}
} else {
supplies.add(supply);
}
}
return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright 2023 NAVER Corp.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.navercorp.pinpoint.redis.pubsub;

import com.navercorp.pinpoint.pubsub.endpoint.PubSubClientFactory;
import com.navercorp.pinpoint.pubsub.endpoint.PubSubServerFactory;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

import static com.navercorp.pinpoint.redis.pubsub.RedisStreamReqResTest.testPubSubServerClient;

/**
* @author youngjin.kim2
*/
@DisplayName("req/res based on redis pubsub")
@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = {RedisPubSubConfig.class})
@Testcontainers
public class RedisPubSubReqResTest {

@Container
@SuppressWarnings("resource")
private static final GenericContainer<?> redisContainer = new GenericContainer<>(DockerImageName.parse("redis:7.0"))
.withExposedPorts(6379)
.withReuse(true);

@Autowired
private PubSubServerFactory serverFactory;

@Autowired
private PubSubClientFactory clientFactory;

@BeforeAll
public static void beforeAll() {
System.setProperty("spring.data.redis.host", redisContainer.getHost());
System.setProperty("spring.redis.host", redisContainer.getHost());
System.setProperty("spring.data.redis.port", redisContainer.getMappedPort(6379).toString());
System.setProperty("spring.redis.port", redisContainer.getMappedPort(6379).toString());
}

@DisplayName("req/res based on redis pubsub")
@Test
public void testRedisPubSub() {
testPubSubServerClient(this.serverFactory, this.clientFactory);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,16 @@
import com.navercorp.pinpoint.pubsub.endpoint.PubSubServerFactory;
import com.navercorp.pinpoint.pubsub.endpoint.PubSubServiceDescriptor;
import com.navercorp.pinpoint.redis.stream.RedisStreamConfig;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -38,53 +43,39 @@
/**
* @author youngjin.kim2
*/
@DisplayName("req/res based on redis")
public class RedisReqResTest {
@DisplayName("req/res based on redis stream")
@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = {RedisStreamConfig.class})
@Testcontainers
public class RedisStreamReqResTest {

@DisplayName("req/res based on redis pubsub")
@Test
public void testRedisPubSub() {
testConfigClass(RedisPubSubConfig.class);
@Container
@SuppressWarnings("resource")
private static final GenericContainer<?> redisContainer = new GenericContainer<>(DockerImageName.parse("redis:7.0"))
.withExposedPorts(6379)
.withReuse(true);

@Autowired
private PubSubServerFactory serverFactory;

@Autowired
private PubSubClientFactory clientFactory;

@BeforeAll
public static void beforeAll() {
System.setProperty("spring.data.redis.host", redisContainer.getHost());
System.setProperty("spring.redis.host", redisContainer.getHost());
System.setProperty("spring.data.redis.port", redisContainer.getMappedPort(6379).toString());
System.setProperty("spring.redis.port", redisContainer.getMappedPort(6379).toString());
}

@DisplayName("req/res based on redis stream")
@Test
public void testRedisStreamPubSub() {
testConfigClass(RedisStreamConfig.class);
}

private void testConfigClass(Class<?> configClass) {
runWithRedisContainer(() -> {
final ApplicationContext context = new AnnotationConfigApplicationContext(configClass);
testServerClientFactory(
context.getBean(PubSubServerFactory.class),
context.getBean(PubSubClientFactory.class)
);
});
}

@SuppressWarnings("resource")
private void runWithRedisContainer(Runnable r) {
try (final GenericContainer<?> redisContainer = new GenericContainer<>(DockerImageName.parse("redis:7.0"))
.withExposedPorts(6379)
.withReuse(true)
) {
redisContainer.start();
System.setProperty("spring.data.redis.host", redisContainer.getHost());
System.setProperty("spring.redis.host", redisContainer.getHost());
System.setProperty("spring.data.redis.port", redisContainer.getMappedPort(6379).toString());
System.setProperty("spring.redis.port", redisContainer.getMappedPort(6379).toString());

r.run();

redisContainer.stop();
}
testPubSubServerClient(this.serverFactory, this.clientFactory);
}

private void testServerClientFactory(
PubSubServerFactory serverFactory,
PubSubClientFactory clientFactory
) {
static void testPubSubServerClient(PubSubServerFactory serverFactory, PubSubClientFactory clientFactory) {
final PubSubMonoServiceDescriptor<String, String> greeterService =
PubSubServiceDescriptor.mono("greeter", String.class, String.class);
serverFactory.build(name -> Mono.just("Hello, " + name), greeterService).afterPropertiesSet();
Expand All @@ -99,9 +90,10 @@ private void testServerClientFactory(
PubSubServiceDescriptor.flux("range", Integer.class, Integer.class);
serverFactory.build(el -> Flux.range(0, el), rangeService).afterPropertiesSet();
assertThat(syncRequestFlux(clientFactory, rangeService, 5)).isEqualTo(List.of(0, 1, 2, 3, 4));
assertThat(syncRequestFlux(clientFactory, rangeService, 3)).isEqualTo(List.of(0, 1, 2));
}

private <D, S> S syncRequestMono(
static <D, S> S syncRequestMono(
PubSubClientFactory clientFactory,
PubSubMonoServiceDescriptor<D, S> descriptor,
D demand
Expand All @@ -111,7 +103,7 @@ private <D, S> S syncRequestMono(
.block();
}

private <D, S> List<S> syncRequestFlux(
static <D, S> List<S> syncRequestFlux(
PubSubClientFactory clientFactory,
PubSubFluxServiceDescriptor<D, S> descriptor,
D demand
Expand Down

0 comments on commit a6de716

Please # to comment.