diff --git a/src/test/java/io/netty/incubator/codec/quic/QuicChannelDatagramTest.java b/src/test/java/io/netty/incubator/codec/quic/QuicChannelDatagramTest.java index 83ac0b4cb..28ed8808c 100644 --- a/src/test/java/io/netty/incubator/codec/quic/QuicChannelDatagramTest.java +++ b/src/test/java/io/netty/incubator/codec/quic/QuicChannelDatagramTest.java @@ -32,10 +32,12 @@ import java.net.InetSocketAddress; import java.util.Random; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; public class QuicChannelDatagramTest extends AbstractQuicTest { @@ -183,13 +185,13 @@ private void testDatagramNoAutoRead(int maxMessagesPerRead, boolean readLater) t Promise clientPromise = ImmediateEventExecutor.INSTANCE.newPromise(); int numDatagrams = 5; + AtomicInteger serverReadCount = new AtomicInteger(); CountDownLatch latch = new CountDownLatch(numDatagrams); Channel server = QuicTestUtils.newServer(QuicTestUtils.newQuicServerBuilder() .option(ChannelOption.AUTO_READ, false) .option(ChannelOption.MAX_MESSAGES_PER_READ, maxMessagesPerRead) .datagram(10, 10), InsecureQuicTokenHandler.INSTANCE, new ChannelInboundHandlerAdapter() { - private int readCount; private int readPerLoop; @Override @@ -200,7 +202,6 @@ public void channelActive(ChannelHandlerContext ctx) { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof ByteBuf) { - readCount++; readPerLoop++; ctx.writeAndFlush(msg).addListener(future -> { @@ -208,7 +209,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) { latch.countDown(); } }); - if (readCount == numDatagrams) { + if (serverReadCount.incrementAndGet() == numDatagrams) { serverPromise.trySuccess(null); } } else { @@ -225,7 +226,7 @@ public void channelReadComplete(ChannelHandlerContext ctx) { return; } readPerLoop = 0; - if (readCount < numDatagrams) { + if (serverReadCount.get() < numDatagrams) { if (readLater) { ctx.executor().execute(ctx::read); } else { @@ -238,16 +239,16 @@ public void channelReadComplete(ChannelHandlerContext ctx) { Channel channel = QuicTestUtils.newClient(QuicTestUtils.newQuicClientBuilder() .datagram(10, 10)); + AtomicInteger clientReadCount = new AtomicInteger(); try { QuicChannel quicChannel = QuicChannel.newBootstrap(channel) .handler(new ChannelInboundHandlerAdapter() { - int received; @Override public void channelRead(ChannelHandlerContext ctx, Object msg) { if (msg instanceof ByteBuf) { - received++; - if (received == numDatagrams) { + + if (clientReadCount.incrementAndGet() == numDatagrams) { if (!clientPromise.trySuccess((ByteBuf) msg)) { ReferenceCountUtil.release(msg); } @@ -269,9 +270,15 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { .get(); for (int i = 0; i < numDatagrams; i++) { quicChannel.writeAndFlush(Unpooled.copiedBuffer(data)).sync(); + // Let's add some sleep in between as this is UDP so we may loose some data otherwise. + Thread.sleep(50); } + assertTrue("Server received: " + serverReadCount.get() + + ", Client received: " + clientReadCount.get(), serverPromise.await(3000)); serverPromise.sync(); + assertTrue("Server received: " + serverReadCount.get() + + ", Client received: " + clientReadCount.get(), clientPromise.await(3000)); ByteBuf buffer = clientPromise.get(); ByteBuf expected = Unpooled.wrappedBuffer(data); assertEquals(expected, buffer);