Skip to content

Commit

Permalink
[#noissue] Fixed redis pubsub test
Browse files Browse the repository at this point in the history
  • Loading branch information
smilu97 committed May 18, 2023
1 parent f99c438 commit c97943a
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,8 @@ public Flux<S> request(D demand) {

static class LongTermSubConsumer<S> implements SubConsumer<SupplyMessage<S>> {

private static final Comparator<SupplyMessage<?>> supplyComparator = new Comparator<>() {
@Override
public int compare(SupplyMessage<?> o1, SupplyMessage<?> o2) {
return o1.getSequence() - o2.getSequence();
}
};
private static final Comparator<SupplyMessage<?>> supplyComparator =
Comparator.comparing(el -> el.getSequence());

final Sinks.Many<S> sink;
final Identifier demandId;
Expand Down Expand Up @@ -94,15 +90,15 @@ public boolean consume(SupplyMessage<S> supply) {
if (supply.getSequence() == nextSequence) {
consume0(supply);
nextSequence += 1;
} else {
supplies.add(supply);
while (supplies.peek() != null && supplies.peek().getSequence() == nextSequence) {
final SupplyMessage<S> pended = supplies.poll();
if (pended != null) {
consume0(pended);
nextSequence += 1;
}
}
} else {
supplies.add(supply);
}
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.navercorp.pinpoint.pubsub.endpoint.PubSubServerFactory;
import com.navercorp.pinpoint.pubsub.endpoint.PubSubServiceDescriptor;
import com.navercorp.pinpoint.redis.stream.RedisStreamConfig;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.context.ApplicationContext;
Expand All @@ -41,6 +43,28 @@
@DisplayName("req/res based on redis")
public class RedisReqResTest {

private static GenericContainer<?> redisContainer;

@BeforeAll
@SuppressWarnings("resource")
public static void beforeAll() {
redisContainer = new GenericContainer<>(DockerImageName.parse("redis:7.0"))
.withExposedPorts(6379)
.withReuse(true);
redisContainer.start();

System.setProperty("spring.data.redis.host", redisContainer.getHost());
System.setProperty("spring.redis.host", redisContainer.getHost());
System.setProperty("spring.data.redis.port", redisContainer.getMappedPort(6379).toString());
System.setProperty("spring.redis.port", redisContainer.getMappedPort(6379).toString());
}

@AfterAll
public static void afterAll() {
redisContainer.stop();
redisContainer.close();
}

@DisplayName("req/res based on redis pubsub")
@Test
public void testRedisPubSub() {
Expand All @@ -54,31 +78,11 @@ public void testRedisStreamPubSub() {
}

private void testConfigClass(Class<?> configClass) {
runWithRedisContainer(() -> {
final ApplicationContext context = new AnnotationConfigApplicationContext(configClass);
testServerClientFactory(
context.getBean(PubSubServerFactory.class),
context.getBean(PubSubClientFactory.class)
);
});
}

@SuppressWarnings("resource")
private void runWithRedisContainer(Runnable r) {
try (final GenericContainer<?> redisContainer = new GenericContainer<>(DockerImageName.parse("redis:7.0"))
.withExposedPorts(6379)
.withReuse(true)
) {
redisContainer.start();
System.setProperty("spring.data.redis.host", redisContainer.getHost());
System.setProperty("spring.redis.host", redisContainer.getHost());
System.setProperty("spring.data.redis.port", redisContainer.getMappedPort(6379).toString());
System.setProperty("spring.redis.port", redisContainer.getMappedPort(6379).toString());

r.run();

redisContainer.stop();
}
final ApplicationContext context = new AnnotationConfigApplicationContext(configClass);
testServerClientFactory(
context.getBean(PubSubServerFactory.class),
context.getBean(PubSubClientFactory.class)
);
}

private void testServerClientFactory(
Expand All @@ -99,6 +103,7 @@ private void testServerClientFactory(
PubSubServiceDescriptor.flux("range", Integer.class, Integer.class);
serverFactory.build(el -> Flux.range(0, el), rangeService).afterPropertiesSet();
assertThat(syncRequestFlux(clientFactory, rangeService, 5)).isEqualTo(List.of(0, 1, 2, 3, 4));
assertThat(syncRequestFlux(clientFactory, rangeService, 3)).isEqualTo(List.of(0, 1, 2));
}

private <D, S> S syncRequestMono(
Expand Down

0 comments on commit c97943a

Please # to comment.