From 1d2cdb39d7d1a0b6108170bb270aae1019f2214e Mon Sep 17 00:00:00 2001 From: Aroooba Date: Sat, 15 Jul 2023 23:58:12 +0900 Subject: [PATCH] Move blocking handleMessageEvent to executer thread to keep pipeline reactive --- .../atmosphere/nettosphere/BridgeRuntime.java | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/atmosphere/nettosphere/BridgeRuntime.java b/server/src/main/java/org/atmosphere/nettosphere/BridgeRuntime.java index 1bf8b6b..a4618df 100644 --- a/server/src/main/java/org/atmosphere/nettosphere/BridgeRuntime.java +++ b/server/src/main/java/org/atmosphere/nettosphere/BridgeRuntime.java @@ -95,12 +95,7 @@ import java.util.List; import java.util.Map; import java.util.Set; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -123,6 +118,7 @@ @Sharable public class BridgeRuntime extends HttpStaticFileServerHandler { + private final ExecutorService executor = Executors.newSingleThreadExecutor(); public static boolean NETTY_41_PLUS; static { @@ -294,15 +290,21 @@ public AtmosphereFramework framework() { @Override public void channelRead(final ChannelHandlerContext ctx, final Object messageEvent) throws URISyntaxException, IOException { - try { - handleMessageEvent(ctx, messageEvent); - } finally { - if (messageEvent instanceof ReferenceCounted) { - ReferenceCounted refMsg = (ReferenceCounted) messageEvent; - if (refMsg.refCnt() > 0) - refMsg.release(); + executor.execute(() -> { + try { + handleMessageEvent(ctx, messageEvent); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + if (messageEvent instanceof ReferenceCounted) { + ReferenceCounted refMsg = (ReferenceCounted) messageEvent; + if (refMsg.refCnt() > 0) + refMsg.release(); + } } - } + }); } private void handleMessageEvent(final ChannelHandlerContext ctx, final Object messageEvent) throws URISyntaxException, IOException {