From 395039d40a4332c69d946922d374cc5df007c7dd Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Thu, 12 Aug 2021 00:13:28 +0000 Subject: [PATCH 1/2] Tweak Node.js server to be more idiomatic --- .../fs2/io/net/SocketGroupPlatform.scala | 38 +++++++++---------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala b/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala index c2d1c0593f..c9257a71b2 100644 --- a/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala +++ b/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala @@ -33,6 +33,7 @@ import com.comcast.ip4s.Host import com.comcast.ip4s.IpAddress import com.comcast.ip4s.Port import com.comcast.ip4s.SocketAddress +import fs2.internal.jsdeps.node.eventsMod import fs2.internal.jsdeps.node.netMod import fs2.internal.jsdeps.node.nodeStrings import fs2.io.internal.EventEmitterOps._ @@ -80,14 +81,13 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type => ): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] = (for { dispatcher <- Dispatcher[F] - queue <- Queue.unbounded[F, netMod.Socket].toResource - error <- F.deferred[Throwable].toResource + queue <- Queue.unbounded[F, Option[netMod.Socket]].toResource server <- Resource.make( F .delay( netMod.createServer( netMod.ServerOpts().setPauseOnConnect(true).setAllowHalfOpen(true), - sock => dispatcher.unsafeRunAndForget(queue.offer(sock)) + sock => dispatcher.unsafeRunAndForget(queue.offer(Some(sock))) ) ) )(server => @@ -96,24 +96,21 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type => server.close(e => cb(e.toLeft(()).leftMap(js.JavaScriptException))) else cb(Right(())) - } + } >> queue.offer(None) ) - _ <- registerListener[js.Error](server, nodeStrings.error)(_.once_error(_, _)) { e => - dispatcher.unsafeRunAndForget(error.complete(js.JavaScriptException(e))) - } - _ <- error.get - .race( - F - .async_[Unit] { cb => - server.listen( - address.foldLeft( - netMod.ListenOptions().setPort(port.fold(0.0)(_.value.toDouble)) - )((opts, host) => opts.setHost(host.toString)), - () => cb(Right(())) - ) + _ <- F + .async_[Unit] { cb => + server.once_error(nodeStrings.error, e => cb(Left(js.JavaScriptException(e)))) + server.listen( + address.foldLeft( + netMod.ListenOptions().setPort(port.fold(0.0)(_.value.toDouble)) + )((opts, host) => opts.setHost(host.toString)), + () => { + server.asInstanceOf[eventsMod.EventEmitter].removeAllListeners("error") + cb(Right(())) } - ) - .rethrow + ) + } .toResource ipAddress <- F .delay(server.address()) @@ -123,10 +120,9 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type => } .toResource sockets = Stream - .fromQueueUnterminated(queue) + .fromQueueNoneTerminated(queue) .evalTap(setSocketOptions(options)) .flatMap(sock => Stream.resource(Socket.forAsync(sock))) - .concurrently(Stream.eval(error.get.flatMap(F.raiseError[Unit]))) } yield (ipAddress, sockets)).adaptError { case IOException(ex) => ex } } From ea9f6cd77c5f355a5927b340924c1168a41943b2 Mon Sep 17 00:00:00 2001 From: Arman Bilge Date: Fri, 10 Sep 2021 17:10:42 +0000 Subject: [PATCH 2/2] Close server, terminate stream, then wait for callback --- .../main/scala/fs2/io/net/SocketGroupPlatform.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala b/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala index c9257a71b2..140b9ce983 100644 --- a/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala +++ b/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala @@ -91,12 +91,14 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type => ) ) )(server => - F.async_[Unit] { cb => + F.async[Unit] { cb => if (server.listening) - server.close(e => cb(e.toLeft(()).leftMap(js.JavaScriptException))) + F.delay(server.close(e => cb(e.toLeft(()).leftMap(js.JavaScriptException)))) >> queue + .offer(None) + .as(None) else - cb(Right(())) - } >> queue.offer(None) + F.delay(cb(Right(()))).as(None) + } ) _ <- F .async_[Unit] { cb =>