Skip to content

Commit

Permalink
FileHandle from SeekableByteChannel
Browse files Browse the repository at this point in the history
  • Loading branch information
lhns committed Oct 4, 2023
1 parent 4152142 commit a727bf4
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ package io
package file

import java.nio.ByteBuffer
import java.nio.channels.{FileChannel, FileLock}
import java.nio.channels.{FileChannel, FileLock, SeekableByteChannel}
import java.nio.file.{OpenOption, Path => JPath}

import cats.effect.kernel.{Async, Resource, Sync}
Expand Down Expand Up @@ -119,4 +119,59 @@ private[file] trait FileHandleCompanionPlatform {
override def write(bytes: Chunk[Byte], offset: Long): F[Int] =
F.blocking(chan.write(bytes.toByteBuffer, offset))
}

/** Creates a `FileHandle[F]` from a `java.nio.channels.SeekableByteChannel`. Because a `SeekableByteChannel` doesn't provide all the functionalities required by `FileHandle` some features like locking will be unavailable. */
private[file] def makeFromSeekableByteChannel[F[_]](
chan: SeekableByteChannel,
unsupportedOperationException: => Throwable
)(implicit F: Sync[F]): FileHandle[F] =
new FileHandle[F] {
type Lock = Unit

override def force(metaData: Boolean): F[Unit] =
F.raiseError(unsupportedOperationException)

override def lock: F[Lock] =
F.raiseError(unsupportedOperationException)

override def lock(position: Long, size: Long, shared: Boolean): F[Lock] =
F.raiseError(unsupportedOperationException)

override def read(numBytes: Int, offset: Long): F[Option[Chunk[Byte]]] =
F.blocking {
val buf = ByteBuffer.allocate(numBytes)
val len = chan.synchronized {
// don't set position on sequential operations because not all file systems support setting the position
if (chan.position() != offset) { chan.position(offset); () }
chan.read(buf)
}
if (len < 0) None
else if (len == 0) Some(Chunk.empty)
else Some(Chunk.array(buf.array, 0, len))
}

override def size: F[Long] =
F.blocking(chan.size)

override def truncate(size: Long): F[Unit] =
F.blocking { chan.truncate(size); () }

override def tryLock: F[Option[Lock]] =
F.raiseError(unsupportedOperationException)

override def tryLock(position: Long, size: Long, shared: Boolean): F[Option[Lock]] =
F.raiseError(unsupportedOperationException)

override def unlock(f: Lock): F[Unit] =
F.unit

override def write(bytes: Chunk[Byte], offset: Long): F[Int] =
F.blocking {
chan.synchronized {
// don't set position on sequential operations because not all file systems support setting the position
if (chan.position() != offset) { chan.position(offset); () }
chan.write(bytes.toByteBuffer)
}
}
}
}
24 changes: 22 additions & 2 deletions io/jvm-native/src/main/scala/fs2/io/file/FilesPlatform.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ package file
import cats.effect.kernel.{Async, Resource, Sync}
import cats.syntax.all._

import java.nio.channels.FileChannel
import java.nio.channels.{FileChannel, SeekableByteChannel}
import java.nio.file.{Files => JFiles, Path => JPath, _}
import java.nio.file.attribute.{
BasicFileAttributeView,
Expand All @@ -47,6 +47,12 @@ private[file] trait FilesPlatform[F[_]] extends DeprecatedFilesApi[F] { self: Fi
/** Creates a `FileHandle` for the supplied NIO `FileChannel`. JVM only. */
def openFileChannel(channel: F[FileChannel]): Resource[F, FileHandle[F]]

/** Creates a `FileHandle` for the supplied NIO `SeekableByteChannel`. Because a `SeekableByteChannel` doesn't provide all the functionalities required by `FileHandle` some features like locking will be unavailable. JVM only. */
def openSeekableByteChannel(
channel: F[SeekableByteChannel],
unsupportedOperationException: => Throwable
): Resource[F, FileHandle[F]]

/** Gets the contents of the specified directory whose paths match the supplied glob pattern.
*
* Example glob patterns: `*.scala`, `*.{scala,java}`
Expand Down Expand Up @@ -314,11 +320,25 @@ private[file] trait FilesCompanionPlatform {
def open(path: Path, flags: Flags): Resource[F, FileHandle[F]] =
openFileChannel(
Sync[F].blocking(FileChannel.open(path.toNioPath, flags.value.map(_.option): _*))
)
).recoverWith { case unsupportedOperationException: UnsupportedOperationException =>
// not all file systems support file channels
openSeekableByteChannel(
Sync[F].blocking(JFiles.newByteChannel(path.toNioPath, flags.value.map(_.option): _*)),
unsupportedOperationException
)
}

def openFileChannel(channel: F[FileChannel]): Resource[F, FileHandle[F]] =
Resource.make(channel)(ch => Sync[F].blocking(ch.close())).map(ch => FileHandle.make(ch))

def openSeekableByteChannel(
channel: F[SeekableByteChannel],
unsupportedOperationException: => Throwable
): Resource[F, FileHandle[F]] =
Resource
.make(channel)(ch => Sync[F].blocking(ch.close()))
.map(ch => FileHandle.makeFromSeekableByteChannel(ch, unsupportedOperationException))

def realPath(path: Path): F[Path] =
Sync[F].blocking(Path.fromNioPath(path.toNioPath.toRealPath()))

Expand Down

0 comments on commit a727bf4

Please # to comment.