diff --git a/build.sbt b/build.sbt index ee433263bb..8a265ac0eb 100644 --- a/build.sbt +++ b/build.sbt @@ -2,7 +2,7 @@ import com.typesafe.tools.mima.core._ Global / onChangedBuildSource := ReloadOnSourceChanges -ThisBuild / tlBaseVersion := "3.4" +ThisBuild / tlBaseVersion := "3.5" ThisBuild / organization := "co.fs2" ThisBuild / organizationName := "Functional Streams for Scala" @@ -208,9 +208,9 @@ lazy val core = crossProject(JVMPlatform, JSPlatform, NativePlatform) libraryDependencies ++= Seq( "org.typelevel" %%% "cats-core" % "2.9.0", "org.typelevel" %%% "cats-laws" % "2.9.0" % Test, - "org.typelevel" %%% "cats-effect" % "3.4.1", - "org.typelevel" %%% "cats-effect-laws" % "3.4.1" % Test, - "org.typelevel" %%% "cats-effect-testkit" % "3.4.1" % Test, + "org.typelevel" %%% "cats-effect" % "3.5-4b87497", + "org.typelevel" %%% "cats-effect-laws" % "3.5-4b87497" % Test, + "org.typelevel" %%% "cats-effect-testkit" % "3.5-4b87497" % Test, "org.scodec" %%% "scodec-bits" % "1.1.34", "org.typelevel" %%% "scalacheck-effect-munit" % "2.0.0-M2" % Test, "org.typelevel" %%% "munit-cats-effect" % "2.0.0-M3" % Test, diff --git a/io/js/src/main/scala/fs2/io/ioplatform.scala b/io/js/src/main/scala/fs2/io/ioplatform.scala index b2eb620aef..68e92f00fd 100644 --- a/io/js/src/main/scala/fs2/io/ioplatform.scala +++ b/io/js/src/main/scala/fs2/io/ioplatform.scala @@ -161,8 +161,9 @@ private[fs2] trait ioplatform { val end = if (endAfterUse) Stream.exec { - F.async_[Unit] { cb => - writable.end(e => cb(e.toLeft(()).leftMap(js.JavaScriptException))) + F.async[Unit] { cb => + F.delay(writable.end(e => cb(e.toLeft(()).leftMap(js.JavaScriptException)))) + .as(Some(F.unit)) } } else Stream.empty diff --git a/io/jvm-native/src/main/scala/fs2/io/net/SocketGroupPlatform.scala b/io/jvm-native/src/main/scala/fs2/io/net/SocketGroupPlatform.scala index c05ddb8017..d4e2b05ce8 100644 --- a/io/jvm-native/src/main/scala/fs2/io/net/SocketGroupPlatform.scala +++ b/io/jvm-native/src/main/scala/fs2/io/net/SocketGroupPlatform.scala @@ -32,8 +32,10 @@ import java.nio.channels.{ } import java.nio.channels.AsynchronousChannelGroup import cats.syntax.all._ +import cats.effect.syntax.all._ import cats.effect.kernel.{Async, Resource} import com.comcast.ip4s.{Host, IpAddress, Port, SocketAddress} +import fs2.concurrent.Channel private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type => private[fs2] def unsafe[F[_]: Async](channelGroup: AsynchronousChannelGroup): SocketGroup[F] = @@ -57,17 +59,21 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type => def connect(ch: AsynchronousSocketChannel): F[AsynchronousSocketChannel] = to.resolve[F].flatMap { ip => - Async[F].async_[AsynchronousSocketChannel] { cb => - ch.connect( - ip.toInetSocketAddress, - null, - new CompletionHandler[Void, Void] { - def completed(result: Void, attachment: Void): Unit = - cb(Right(ch)) - def failed(rsn: Throwable, attachment: Void): Unit = - cb(Left(rsn)) + Async[F].async[AsynchronousSocketChannel] { cb => + Async[F] + .delay { + ch.connect( + ip.toInetSocketAddress, + null, + new CompletionHandler[Void, Void] { + def completed(result: Void, attachment: Void): Unit = + cb(Right(ch)) + def failed(rsn: Throwable, attachment: Void): Unit = + cb(Left(rsn)) + } + ) } - ) + .as(Some(Async[F].delay(ch.close()))) } } @@ -80,7 +86,10 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type => options: List[SocketOption] ): Resource[F, (SocketAddress[IpAddress], Stream[F, Socket[F]])] = { - val setup: Resource[F, AsynchronousServerSocketChannel] = + val setup: Resource[ + F, + (AsynchronousServerSocketChannel, Channel[F, Either[Throwable, AsynchronousSocketChannel]]) + ] = Resource.eval(address.traverse(_.resolve[F])).flatMap { addr => Resource .make( @@ -88,61 +97,83 @@ private[net] trait SocketGroupCompanionPlatform { self: SocketGroup.type => AsynchronousServerSocketChannel.open(channelGroup) ) )(sch => Async[F].delay(if (sch.isOpen) sch.close())) - .evalTap(ch => + .evalTap { sch => Async[F].delay( - ch.bind( + sch.bind( new InetSocketAddress( addr.map(_.toInetAddress).orNull, port.map(_.value).getOrElse(0) ) ) ) - ) - } + } + .mproduct { sch => + def acceptChannel: F[AsynchronousSocketChannel] = + Async[F].async[AsynchronousSocketChannel] { cb => + Async[F] + .delay { + sch.accept( + null, + new CompletionHandler[AsynchronousSocketChannel, Void] { + def completed(ch: AsynchronousSocketChannel, attachment: Void): Unit = + cb(Right(ch)) + def failed(rsn: Throwable, attachment: Void): Unit = + cb(Left(rsn)) + } + ) + } + .as(Some(Async[F].delay(sch.close()))) + } - def acceptIncoming( - sch: AsynchronousServerSocketChannel - ): Stream[F, Socket[F]] = { - def go: Stream[F, Socket[F]] = { - def acceptChannel: F[AsynchronousSocketChannel] = - Async[F].async_[AsynchronousSocketChannel] { cb => - sch.accept( - null, - new CompletionHandler[AsynchronousSocketChannel, Void] { - def completed(ch: AsynchronousSocketChannel, attachment: Void): Unit = - cb(Right(ch)) - def failed(rsn: Throwable, attachment: Void): Unit = - cb(Left(rsn)) + Resource + .make(Channel.synchronous[F, Either[Throwable, AsynchronousSocketChannel]]) { + accepted => + accepted.close *> + accepted.stream + .foreach(_.traverse_(ch => Async[F].delay(ch.close()))) + .compile + .drain + } + .flatTap { accepted => + Stream + .repeatEval(acceptChannel.attempt) + .through(accepted.sendAll) + .compile + .drain + .background } - ) } - def setOpts(ch: AsynchronousSocketChannel) = - Async[F].delay { - options.foreach(o => ch.setOption(o.key, o.value)) - } + } - Stream.eval(acceptChannel.attempt).flatMap { + def acceptIncoming(sch: AsynchronousServerSocketChannel)( + incoming: Stream[F, Either[Throwable, AsynchronousSocketChannel]] + ): Stream[F, Socket[F]] = { + def setOpts(ch: AsynchronousSocketChannel) = + Async[F].delay { + options.foreach(o => ch.setOption(o.key, o.value)) + } + + incoming + .flatMap { case Left(_) => Stream.empty[F] case Right(accepted) => Stream.resource(Socket.forAsync(accepted).evalTap(_ => setOpts(accepted))) - } ++ go - } - - go.handleErrorWith { - case err: AsynchronousCloseException => - Stream.eval(Async[F].delay(sch.isOpen)).flatMap { isOpen => - if (isOpen) Stream.raiseError[F](err) - else Stream.empty - } - case err => Stream.raiseError[F](err) - } + } + .handleErrorWith { + case err: AsynchronousCloseException => + Stream.eval(Async[F].delay(sch.isOpen)).flatMap { isOpen => + if (isOpen) Stream.raiseError[F](err) + else Stream.empty + } + case err => Stream.raiseError[F](err) + } } - setup.map { sch => + setup.map { case (sch, incoming) => val jLocalAddress = sch.getLocalAddress.asInstanceOf[java.net.InetSocketAddress] val localAddress = SocketAddress.fromInetSocketAddress(jLocalAddress) - (localAddress, acceptIncoming(sch)) + (localAddress, incoming.stream.through(acceptIncoming(sch)(_))) } } }