diff --git a/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala b/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala index 6f0c0a1656..e3330e5ebc 100644 --- a/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala +++ b/io/js/src/main/scala/fs2/io/net/SocketGroupPlatform.scala @@ -33,6 +33,7 @@ import com.comcast.ip4s.Host import com.comcast.ip4s.IpAddress import com.comcast.ip4s.Port import com.comcast.ip4s.SocketAddress +import fs2.internal.jsdeps.node.eventsMod import fs2.internal.jsdeps.node.netMod import fs2.internal.jsdeps.node.nodeStrings import fs2.internal.jsdeps.std @@ -81,40 +82,38 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type => ): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] = (for { dispatcher <- Dispatcher[F] - queue <- Queue.unbounded[F, netMod.Socket].toResource - error <- F.deferred[Throwable].toResource + queue <- Queue.unbounded[F, Option[netMod.Socket]].toResource server <- Resource.make( F .delay( netMod.createServer( netMod.ServerOpts().setPauseOnConnect(true).setAllowHalfOpen(true), - sock => dispatcher.unsafeRunAndForget(queue.offer(sock)) + sock => dispatcher.unsafeRunAndForget(queue.offer(Some(sock))) ) ) )(server => - F.async_[Unit] { cb => + F.async[Unit] { cb => if (server.listening) - server.close(e => cb(e.toLeft(()).leftMap(js.JavaScriptException))) + F.delay(server.close(e => cb(e.toLeft(()).leftMap(js.JavaScriptException)))) >> queue + .offer(None) + .as(None) else - cb(Right(())) + F.delay(cb(Right(()))).as(None) } ) - _ <- registerListener[std.Error](server, nodeStrings.error)(_.once_error(_, _)) { e => - dispatcher.unsafeRunAndForget(error.complete(js.JavaScriptException(e))) - } - _ <- error.get - .race( - F - .async_[Unit] { cb => - server.listen( - address.foldLeft( - netMod.ListenOptions().setPort(port.fold(0.0)(_.value.toDouble)) - )((opts, host) => opts.setHost(host.toString)), - () => cb(Right(())) - ) + _ <- F + .async_[Unit] { cb => + server.once_error(nodeStrings.error, e => cb(Left(js.JavaScriptException(e)))) + server.listen( + address.foldLeft( + netMod.ListenOptions().setPort(port.fold(0.0)(_.value.toDouble)) + )((opts, host) => opts.setHost(host.toString)), + () => { + server.asInstanceOf[eventsMod.EventEmitter].removeAllListeners("error") + cb(Right(())) } - ) - .rethrow + ) + } .toResource ipAddress <- F .delay(server.address()) @@ -124,10 +123,9 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type => } .toResource sockets = Stream - .fromQueueUnterminated(queue) + .fromQueueNoneTerminated(queue) .evalTap(setSocketOptions(options)) .flatMap(sock => Stream.resource(Socket.forAsync(sock))) - .concurrently(Stream.eval(error.get.flatMap(F.raiseError[Unit]))) } yield (ipAddress, sockets)).adaptError { case IOException(ex) => ex } }