From 43fbaabd4ae571f50af102be652b563cc09a56a7 Mon Sep 17 00:00:00 2001 From: fanliang11 <137629448@qq.com> Date: Sun, 23 Jan 2022 12:48:52 +0800 Subject: [PATCH] ChannelHandler adds EventLoop I/O thread for asynchronous processing --- .../Server/Implementation/DefaultServiceHost.cs | 14 ++++---------- .../Surging.Core.DNS/DnsServiceHost.cs | 12 +++--------- .../DotNettyServerMessageListener.cs | 13 +++++-------- .../DefaultHttpServiceHost.cs | 7 ++----- .../HttpServiceHost.cs | 7 ++----- .../Surging.Core.Protocol.Http/HttpServiceHost.cs | 10 ++-------- .../Surging.Core.Protocol.Udp/UdpServiceHost.cs | 12 +++--------- 7 files changed, 21 insertions(+), 54 deletions(-) diff --git a/src/Surging.Core/Surging.Core.CPlatform/Runtime/Server/Implementation/DefaultServiceHost.cs b/src/Surging.Core/Surging.Core.CPlatform/Runtime/Server/Implementation/DefaultServiceHost.cs index 5f0954003..2b723deeb 100644 --- a/src/Surging.Core/Surging.Core.CPlatform/Runtime/Server/Implementation/DefaultServiceHost.cs +++ b/src/Surging.Core/Surging.Core.CPlatform/Runtime/Server/Implementation/DefaultServiceHost.cs @@ -41,25 +41,19 @@ public override async Task StartAsync(EndPoint endPoint) return; _serverMessageListener = await _messageListenerFactory(endPoint); _serverMessageListener.Received += async (sender, message) => - { - await Task.Run(() => - { - MessageListener.OnReceived(sender, message); - }); + { + await MessageListener.OnReceived(sender, message); }; } - public override async Task StartAsync(string ip,int port) + public override async Task StartAsync(string ip, int port) { if (_serverMessageListener != null) return; _serverMessageListener = await _messageListenerFactory(new IPEndPoint(IPAddress.Parse(ip), port)); _serverMessageListener.Received += async (sender, message) => { - await Task.Run(() => - { - MessageListener.OnReceived(sender, message); - }); + await MessageListener.OnReceived(sender, message); }; } diff --git a/src/Surging.Core/Surging.Core.DNS/DnsServiceHost.cs b/src/Surging.Core/Surging.Core.DNS/DnsServiceHost.cs index e93753a57..3d81547af 100644 --- a/src/Surging.Core/Surging.Core.DNS/DnsServiceHost.cs +++ b/src/Surging.Core/Surging.Core.DNS/DnsServiceHost.cs @@ -43,11 +43,8 @@ public override async Task StartAsync(EndPoint endPoint) return; _serverMessageListener = await _messageListenerFactory(endPoint); _serverMessageListener.Received += async (sender, message) => - { - await Task.Run(() => - { - MessageListener.OnReceived(sender, message); - }); + { + await MessageListener.OnReceived(sender, message); }; } @@ -58,10 +55,7 @@ public override async Task StartAsync(string ip, int port) _serverMessageListener = await _messageListenerFactory(new IPEndPoint(IPAddress.Parse(ip),53)); _serverMessageListener.Received += async (sender, message) => { - await Task.Run(() => - { - MessageListener.OnReceived(sender, message); - }); + await MessageListener.OnReceived(sender, message); }; } diff --git a/src/Surging.Core/Surging.Core.DotNetty/DotNettyServerMessageListener.cs b/src/Surging.Core/Surging.Core.DotNetty/DotNettyServerMessageListener.cs index 6e3d94ef4..11c208211 100644 --- a/src/Surging.Core/Surging.Core.DotNetty/DotNettyServerMessageListener.cs +++ b/src/Surging.Core/Surging.Core.DotNetty/DotNettyServerMessageListener.cs @@ -88,8 +88,8 @@ public async Task StartAsync(EndPoint endPoint) var pipeline = channel.Pipeline; pipeline.AddLast(new LengthFieldPrepender(4)); pipeline.AddLast(new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4)); - pipeline.AddLast(new TransportMessageChannelHandlerAdapter(_transportMessageDecoder)); - pipeline.AddLast(new ServerHandler(async (contenxt, message) => + pipeline.AddLast(workerGroup, "HandlerAdapter", new TransportMessageChannelHandlerAdapter(_transportMessageDecoder)); + pipeline.AddLast(workerGroup, "ServerHandler", new ServerHandler(async (contenxt, message) => { var sender = new DotNettyServerMessageSender(_transportMessageEncoder, contenxt); await OnReceived(sender, message); @@ -145,12 +145,9 @@ public ServerHandler(Action readAction #region Overrides of ChannelHandlerAdapter public override void ChannelRead(IChannelHandlerContext context, object message) - { - Task.Run(() => - { - var transportMessage = (TransportMessage)message; - _readAction(context, transportMessage); - }); + { + var transportMessage = (TransportMessage)message; + _readAction(context, transportMessage); } public override void ChannelReadComplete(IChannelHandlerContext context) diff --git a/src/Surging.Core/Surging.Core.KestrelHttpServer/DefaultHttpServiceHost.cs b/src/Surging.Core/Surging.Core.KestrelHttpServer/DefaultHttpServiceHost.cs index a991e086f..d217453fc 100644 --- a/src/Surging.Core/Surging.Core.KestrelHttpServer/DefaultHttpServiceHost.cs +++ b/src/Surging.Core/Surging.Core.KestrelHttpServer/DefaultHttpServiceHost.cs @@ -23,11 +23,8 @@ public DefaultHttpServiceHost(Func> messageList _messageListenerFactory = messageListenerFactory; _serverMessageListener = httpMessageListener; _serverMessageListener.Received += async (sender, message) => - { - await Task.Run(() => - { - MessageListener.OnReceived(sender, message); - }); + { + await MessageListener.OnReceived(sender, message); }; } diff --git a/src/Surging.Core/Surging.Core.KestrelHttpServer/HttpServiceHost.cs b/src/Surging.Core/Surging.Core.KestrelHttpServer/HttpServiceHost.cs index 6836bd233..570833f75 100644 --- a/src/Surging.Core/Surging.Core.KestrelHttpServer/HttpServiceHost.cs +++ b/src/Surging.Core/Surging.Core.KestrelHttpServer/HttpServiceHost.cs @@ -25,11 +25,8 @@ public HttpServiceHost(Func> messageListenerFac _messageListenerFactory = messageListenerFactory; _serverMessageListener = httpMessageListener; _serverMessageListener.Received += async (sender, message) => - { - await Task.Run(async () => - { - await MessageListener.OnReceived(sender, message); - }); + { + await MessageListener.OnReceived(sender, message); }; } diff --git a/src/Surging.Core/Surging.Core.Protocol.Http/HttpServiceHost.cs b/src/Surging.Core/Surging.Core.Protocol.Http/HttpServiceHost.cs index 174e10278..7bf7906f1 100644 --- a/src/Surging.Core/Surging.Core.Protocol.Http/HttpServiceHost.cs +++ b/src/Surging.Core/Surging.Core.Protocol.Http/HttpServiceHost.cs @@ -47,10 +47,7 @@ public override async Task StartAsync(EndPoint endPoint) _serverMessageListener = await _messageListenerFactory(endPoint); _serverMessageListener.Received += async (sender, message) => { - await Task.Run(() => - { - MessageListener.OnReceived(sender, message); - }); + await MessageListener.OnReceived(sender, message); }; } @@ -61,10 +58,7 @@ public override async Task StartAsync(string ip,int port) _serverMessageListener = await _messageListenerFactory(new IPEndPoint(IPAddress.Parse(ip), AppConfig.ServerOptions.Ports.HttpPort??0)); _serverMessageListener.Received += async (sender, message) => { - await Task.Run(() => - { - MessageListener.OnReceived(sender, message); - }); + await MessageListener.OnReceived(sender, message); }; } diff --git a/src/Surging.Core/Surging.Core.Protocol.Udp/UdpServiceHost.cs b/src/Surging.Core/Surging.Core.Protocol.Udp/UdpServiceHost.cs index 91f80144b..d8c4c1a35 100644 --- a/src/Surging.Core/Surging.Core.Protocol.Udp/UdpServiceHost.cs +++ b/src/Surging.Core/Surging.Core.Protocol.Udp/UdpServiceHost.cs @@ -44,10 +44,7 @@ public override async Task StartAsync(EndPoint endPoint) _serverMessageListener = await _messageListenerFactory(endPoint); _serverMessageListener.Received += async (sender, message) => { - await Task.Run(() => - { - MessageListener.OnReceived(sender, message); - }); + await MessageListener.OnReceived(sender, message); }; } @@ -57,11 +54,8 @@ public override async Task StartAsync(string ip, int port) return; _serverMessageListener = await _messageListenerFactory(new IPEndPoint(IPAddress.Parse(ip), AppConfig.ServerOptions.Ports.UdpPort)); _serverMessageListener.Received += async (sender, message) => - { - await Task.Run(() => - { - MessageListener.OnReceived(sender, message); - }); + { + await MessageListener.OnReceived(sender, message); }; }