Skip to content

Commit

Permalink
Merge pull request #3299 from lhns/feature/file-handle-seekable-byte-…
Browse files Browse the repository at this point in the history
…channel

FileHandle from SeekableByteChannel
  • Loading branch information
mpilquist authored Oct 17, 2023
2 parents fbd0f25 + 5c23dab commit 1906b33
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 3 deletions.
3 changes: 3 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ ThisBuild / mimaBinaryIssueFilters ++= Seq(
),
ProblemFilters.exclude[ReversedMissingMethodProblem](
"fs2.concurrent.Channel.closeWithElement"
),
ProblemFilters.exclude[InheritedNewAbstractMethodProblem](
"fs2.io.file.Files.openSeekableByteChannel"
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ package io
package file

import java.nio.ByteBuffer
import java.nio.channels.{FileChannel, FileLock}
import java.nio.channels.{FileChannel, FileLock, SeekableByteChannel}
import java.nio.file.{OpenOption, Path => JPath}

import cats.effect.kernel.{Async, Resource, Sync}
Expand Down Expand Up @@ -119,4 +119,59 @@ private[file] trait FileHandleCompanionPlatform {
override def write(bytes: Chunk[Byte], offset: Long): F[Int] =
F.blocking(chan.write(bytes.toByteBuffer, offset))
}

/** Creates a `FileHandle[F]` from a `java.nio.channels.SeekableByteChannel`. Because a `SeekableByteChannel` doesn't provide all the functionalities required by `FileHandle` some features like locking will be unavailable. */
private[file] def makeFromSeekableByteChannel[F[_]](
chan: SeekableByteChannel,
unsupportedOperationException: => Throwable
)(implicit F: Sync[F]): FileHandle[F] =
new FileHandle[F] {
type Lock = Unit

override def force(metaData: Boolean): F[Unit] =
F.raiseError(unsupportedOperationException)

override def lock: F[Lock] =
F.raiseError(unsupportedOperationException)

override def lock(position: Long, size: Long, shared: Boolean): F[Lock] =
F.raiseError(unsupportedOperationException)

override def read(numBytes: Int, offset: Long): F[Option[Chunk[Byte]]] =
F.blocking {
val buf = ByteBuffer.allocate(numBytes)
val len = chan.synchronized {
// don't set position on sequential operations because not all file systems support setting the position
if (chan.position() != offset) { chan.position(offset); () }
chan.read(buf)
}
if (len < 0) None
else if (len == 0) Some(Chunk.empty)
else Some(Chunk.array(buf.array, 0, len))
}

override def size: F[Long] =
F.blocking(chan.size)

override def truncate(size: Long): F[Unit] =
F.blocking { chan.truncate(size); () }

override def tryLock: F[Option[Lock]] =
F.raiseError(unsupportedOperationException)

override def tryLock(position: Long, size: Long, shared: Boolean): F[Option[Lock]] =
F.raiseError(unsupportedOperationException)

override def unlock(f: Lock): F[Unit] =
F.unit

override def write(bytes: Chunk[Byte], offset: Long): F[Int] =
F.blocking {
chan.synchronized {
// don't set position on sequential operations because not all file systems support setting the position
if (chan.position() != offset) { chan.position(offset); () }
chan.write(bytes.toByteBuffer)
}
}
}
}
24 changes: 22 additions & 2 deletions io/jvm-native/src/main/scala/fs2/io/file/FilesPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ package file
import cats.effect.kernel.{Async, Resource, Sync}
import cats.syntax.all._

import java.nio.channels.FileChannel
import java.nio.channels.{FileChannel, SeekableByteChannel}
import java.nio.file.{Files => JFiles, Path => JPath, _}
import java.nio.file.attribute.{
BasicFileAttributeView,
Expand All @@ -47,6 +47,12 @@ private[file] trait FilesPlatform[F[_]] extends DeprecatedFilesApi[F] { self: Fi
/** Creates a `FileHandle` for the supplied NIO `FileChannel`. JVM only. */
def openFileChannel(channel: F[FileChannel]): Resource[F, FileHandle[F]]

/** Creates a `FileHandle` for the supplied NIO `SeekableByteChannel`. Because a `SeekableByteChannel` doesn't provide all the functionalities required by `FileHandle` some features like locking will be unavailable. JVM only. */
def openSeekableByteChannel(
channel: F[SeekableByteChannel],
unsupportedOperationException: => Throwable
): Resource[F, FileHandle[F]]

/** Gets the contents of the specified directory whose paths match the supplied glob pattern.
*
* Example glob patterns: `*.scala`, `*.{scala,java}`
Expand Down Expand Up @@ -314,11 +320,25 @@ private[file] trait FilesCompanionPlatform {
def open(path: Path, flags: Flags): Resource[F, FileHandle[F]] =
openFileChannel(
Sync[F].blocking(FileChannel.open(path.toNioPath, flags.value.map(_.option): _*))
)
).recoverWith { case unsupportedOperationException: UnsupportedOperationException =>
// not all file systems support file channels
openSeekableByteChannel(
Sync[F].blocking(JFiles.newByteChannel(path.toNioPath, flags.value.map(_.option): _*)),
unsupportedOperationException
)
}

def openFileChannel(channel: F[FileChannel]): Resource[F, FileHandle[F]] =
Resource.make(channel)(ch => Sync[F].blocking(ch.close())).map(ch => FileHandle.make(ch))

def openSeekableByteChannel(
channel: F[SeekableByteChannel],
unsupportedOperationException: => Throwable
): Resource[F, FileHandle[F]] =
Resource
.make(channel)(ch => Sync[F].blocking(ch.close()))
.map(ch => FileHandle.makeFromSeekableByteChannel(ch, unsupportedOperationException))

def realPath(path: Path): F[Path] =
Sync[F].blocking(Path.fromNioPath(path.toNioPath.toRealPath()))

Expand Down
19 changes: 19 additions & 0 deletions io/jvm/src/test/scala/fs2/io/file/JvmFilesSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,23 @@ class JvmFilesSuite extends Fs2Suite with BaseFileSuite {
}
}

test("read from SeekableByteChannel") {
Stream
.resource(
tempFile
.evalMap(modify)
.flatMap(path =>
Files[IO].openSeekableByteChannel(
IO.blocking(JFiles.newByteChannel(path.toNioPath, Flag.Read.option)),
new UnsupportedOperationException()
)
)
.map(new ReadCursor[IO](_, 0))
)
.flatMap(_.readAll(4096).void.stream)
.compile
.toList
.assertEquals(List[Byte](0, 1, 2, 3))
}

}

0 comments on commit 1906b33

Please # to comment.