Skip to content

Commit

Permalink
Make the QuicChannelDatagramTest a bit more robust (java-native-acces…
Browse files Browse the repository at this point in the history
…s#109)

Motivation:

We should make the test a bit more robust by trying to minimize the chance of loosing datagrams when sending too fast.

Modifications:

- Add small sleep between writes
- Include some details when timeout

Result:

Fix CI flakiness
  • Loading branch information
normanmaurer authored Dec 18, 2020
1 parent e4f17ba commit 9e89497
Showing 1 changed file with 14 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@
import java.net.InetSocketAddress;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;

public class QuicChannelDatagramTest extends AbstractQuicTest {

Expand Down Expand Up @@ -183,13 +185,13 @@ private void testDatagramNoAutoRead(int maxMessagesPerRead, boolean readLater) t
Promise<ByteBuf> clientPromise = ImmediateEventExecutor.INSTANCE.newPromise();

int numDatagrams = 5;
AtomicInteger serverReadCount = new AtomicInteger();
CountDownLatch latch = new CountDownLatch(numDatagrams);
Channel server = QuicTestUtils.newServer(QuicTestUtils.newQuicServerBuilder()
.option(ChannelOption.AUTO_READ, false)
.option(ChannelOption.MAX_MESSAGES_PER_READ, maxMessagesPerRead)
.datagram(10, 10),
InsecureQuicTokenHandler.INSTANCE, new ChannelInboundHandlerAdapter() {
private int readCount;
private int readPerLoop;

@Override
Expand All @@ -200,15 +202,14 @@ public void channelActive(ChannelHandlerContext ctx) {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof ByteBuf) {
readCount++;
readPerLoop++;

ctx.writeAndFlush(msg).addListener(future -> {
if (future.isSuccess()) {
latch.countDown();
}
});
if (readCount == numDatagrams) {
if (serverReadCount.incrementAndGet() == numDatagrams) {
serverPromise.trySuccess(null);
}
} else {
Expand All @@ -225,7 +226,7 @@ public void channelReadComplete(ChannelHandlerContext ctx) {
return;
}
readPerLoop = 0;
if (readCount < numDatagrams) {
if (serverReadCount.get() < numDatagrams) {
if (readLater) {
ctx.executor().execute(ctx::read);
} else {
Expand All @@ -238,16 +239,16 @@ public void channelReadComplete(ChannelHandlerContext ctx) {

Channel channel = QuicTestUtils.newClient(QuicTestUtils.newQuicClientBuilder()
.datagram(10, 10));
AtomicInteger clientReadCount = new AtomicInteger();
try {
QuicChannel quicChannel = QuicChannel.newBootstrap(channel)
.handler(new ChannelInboundHandlerAdapter() {
int received;

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof ByteBuf) {
received++;
if (received == numDatagrams) {

if (clientReadCount.incrementAndGet() == numDatagrams) {
if (!clientPromise.trySuccess((ByteBuf) msg)) {
ReferenceCountUtil.release(msg);
}
Expand All @@ -269,9 +270,15 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
.get();
for (int i = 0; i < numDatagrams; i++) {
quicChannel.writeAndFlush(Unpooled.copiedBuffer(data)).sync();
// Let's add some sleep in between as this is UDP so we may loose some data otherwise.
Thread.sleep(50);
}
assertTrue("Server received: " + serverReadCount.get() +
", Client received: " + clientReadCount.get(), serverPromise.await(3000));
serverPromise.sync();

assertTrue("Server received: " + serverReadCount.get() +
", Client received: " + clientReadCount.get(), clientPromise.await(3000));
ByteBuf buffer = clientPromise.get();
ByteBuf expected = Unpooled.wrappedBuffer(data);
assertEquals(expected, buffer);
Expand Down

0 comments on commit 9e89497

Please # to comment.