Skip to content

Commit

Permalink
non-blocking versions of file open/stat
Browse files Browse the repository at this point in the history
Motivation:

Opening a file, seeking it or querying its size (or other information)
is blocking on UNIX so `NonBlockingFileIO` should support that too.

Modifications:

Added a method to `NonBlockingFileIO` which lets the user open a file
without blocking the calling thread.

Result:

Less blocking is good :)
  • Loading branch information
weissi committed Apr 3, 2018
1 parent 2af28a7 commit 60f6164
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 77 deletions.
27 changes: 27 additions & 0 deletions Sources/NIO/BlockingIOThreadPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public final class BlockingIOThreadPool {

/// Submit a `WorkItem` to process.
///
/// - note: This is a low-level method, in most cases the `runIfActive` method should be used.
///
/// - parameters:
/// - body: The `WorkItem` to process by the `BlockingIOThreadPool`.
public func submit(_ body: @escaping WorkItem) {
Expand Down Expand Up @@ -153,6 +155,31 @@ public final class BlockingIOThreadPool {
}
}

public extension BlockingIOThreadPool {
/// Runs the submitted closure if the thread pool is still active, otherwise fails the promise.
/// The closure will be run on the thread pool so can do blocking work.
///
/// - parameters:
/// - eventLoop: The `EventLoop` to create the returned `EventLoopFuture` on.
/// - body: The closure which performs some blocking work to be done on the thread pool.
/// - returns: An `EventLoopFuture` fulfilled with the result (or error) of the passed closure.
public func runIfActive<T>(eventLoop: EventLoop, _ body: @escaping () throws -> T) -> EventLoopFuture<T> {
let promise: EventLoopPromise<T> = eventLoop.newPromise()
self.submit { shouldRun in
guard case shouldRun = BlockingIOThreadPool.WorkItemState.active else {
promise.fail(error: ChannelError.ioOnClosedChannel)
return
}
do {
try promise.succeed(result: body())
} catch {
promise.fail(error: error)
}
}
return promise.futureResult
}
}

extension BlockingIOThreadPool {
public func shutdownGracefully(_ callback: @escaping (Error?) -> Void) {
self.shutdownGracefully(queue: .global(), callback)
Expand Down
75 changes: 44 additions & 31 deletions Sources/NIO/NonBlockingFileIO.swift
Original file line number Diff line number Diff line change
Expand Up @@ -185,44 +185,57 @@ public struct NonBlockingFileIO {
return eventLoop.newSucceededFuture(result: allocator.buffer(capacity: 0))
}

let p: EventLoopPromise<ByteBuffer> = eventLoop.newPromise()
var buf = allocator.buffer(capacity: byteCount)
self.threadPool.submit { shouldRun in
guard case shouldRun = BlockingIOThreadPool.WorkItemState.active else {
p.fail(error: ChannelError.ioOnClosedChannel)
return
}

return self.threadPool.runIfActive(eventLoop: eventLoop) { () -> ByteBuffer in
var bytesRead = 0
while bytesRead < byteCount {
do {
let n = try buf.writeWithUnsafeMutableBytes { ptr in
let res = try fileHandle.withUnsafeFileDescriptor { descriptor in
try Posix.read(descriptor: descriptor,
pointer: ptr.baseAddress!.assumingMemoryBound(to: UInt8.self),
size: byteCount - bytesRead)
}
switch res {
case .processed(let n):
assert(n >= 0, "read claims to have read a negative number of bytes \(n)")
return n
case .wouldBlock:
throw Error.descriptorSetToNonBlocking
}
let n = try buf.writeWithUnsafeMutableBytes { ptr in
let res = try fileHandle.withUnsafeFileDescriptor { descriptor in
try Posix.read(descriptor: descriptor,
pointer: ptr.baseAddress!.assumingMemoryBound(to: UInt8.self),
size: byteCount - bytesRead)
}
if n == 0 {
// EOF
break
} else {
bytesRead += n
switch res {
case .processed(let n):
assert(n >= 0, "read claims to have read a negative number of bytes \(n)")
return n
case .wouldBlock:
throw Error.descriptorSetToNonBlocking
}
} catch {
p.fail(error: error)
return
}
if n == 0 {
// EOF
break
} else {
bytesRead += n
}
}
return buf
}
}

/// Open the file at `path` on a private thread pool which is separate from any `EventLoop` thread.
///
/// This function will return (a future) of the `FileHandle` associated with the file opened and a `FileRegion`
/// comprising of the whole file. The caller must close the returned `FileHandle` when its no longer needed.
///
/// - note: The reason this returns the `FileHandle` and the `FileRegion` is that both the opening of a file as well as the querying of its size are blocking.
///
/// - parameters:
/// - path: The path of the file to be opened.
/// - eventLoop: The `EventLoop` on which the returned `EventLoopFuture` will fire.
/// - returns: An `EventLoopFuture` containing the `FileHandle` and the `FileRegion` comprising the whole file.
public func openFile(path: String, eventLoop: EventLoop) -> EventLoopFuture<(FileHandle, FileRegion)> {
return self.threadPool.runIfActive(eventLoop: eventLoop) {
let fh = try FileHandle(path: path)
do {
let fr = try FileRegion(fileHandle: fh)
return (fh, fr)
} catch {
_ = try? fh.close()
throw error
}
p.succeed(result: buf)
}
return p.futureResult
}

}
98 changes: 52 additions & 46 deletions Sources/NIOHTTP1Server/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,38 @@ private final class HTTPHandler: ChannelInboundHandler {
private func handleFile(ctx: ChannelHandlerContext, request: HTTPServerRequestPart, ioMethod: FileIOMethod, path: String) {
self.buffer.clear()

func sendErrorResponse(request: HTTPRequestHead, _ error: Error) {
var body = ctx.channel.allocator.buffer(capacity: 128)
let response = { () -> HTTPResponseHead in
switch error {
case let e as IOError where e.errnoCode == ENOENT:
body.write(staticString: "IOError (not found)\r\n")
return HTTPResponseHead(version: request.version, status: .notFound)
case let e as IOError:
body.write(staticString: "IOError (other)\r\n")
body.write(string: e.description)
body.write(staticString: "\r\n")
return HTTPResponseHead(version: request.version, status: .notFound)
default:
body.write(string: "\(type(of: error)) error\r\n")
return HTTPResponseHead(version: request.version, status: .internalServerError)
}
}()
body.write(string: "\(error)")
body.write(staticString: "\r\n")
ctx.write(self.wrapOutboundOut(.head(response)), promise: nil)
ctx.write(self.wrapOutboundOut(.body(.byteBuffer(body))), promise: nil)
ctx.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
ctx.channel.close(promise: nil)
}

func responseHead(request: HTTPRequestHead, fileRegion region: FileRegion) -> HTTPResponseHead {
var response = HTTPResponseHead(version: request.version, status: .ok)
response.headers.add(name: "Content-Length", value: "\(region.endIndex)")
response.headers.add(name: "Content-Type", value: "text/plain; charset=utf-8")
return response
}

switch request {
case .head(let request):
self.keepAlive = request.isKeepAlive
Expand All @@ -260,28 +292,25 @@ private final class HTTPHandler: ChannelInboundHandler {
return
}
let path = self.htdocsPath + "/" + path
do {
let file = try FileHandle(path: path)
let region = try FileRegion(fileHandle: file)
var response = HTTPResponseHead(version: request.version, status: .ok)

response.headers.add(name: "Content-Length", value: "\(region.endIndex)")
response.headers.add(name: "Content-Type", value: "text/plain; charset=utf-8")

let fileHandleAndRegion = self.fileIO.openFile(path: path, eventLoop: ctx.eventLoop)
fileHandleAndRegion.whenFailure {
sendErrorResponse(request: request, $0)
}
fileHandleAndRegion.whenSuccess { (file, region) in
switch ioMethod {
case .nonblockingFileIO:
var responseStarted = false
let f = self.fileIO.readChunked(fileRegion: region,
chunkSize: 32 * 1024,
allocator: ctx.channel.allocator,
eventLoop: ctx.eventLoop) { buffer in
if !responseStarted {
responseStarted = true
ctx.write(self.wrapOutboundOut(.head(response)), promise: nil)
}
return ctx.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(buffer))))
}
f.then { () -> EventLoopFuture<Void> in
let response = responseHead(request: request, fileRegion: region)
return self.fileIO.readChunked(fileRegion: region,
chunkSize: 32 * 1024,
allocator: ctx.channel.allocator,
eventLoop: ctx.eventLoop) { buffer in
if !responseStarted {
responseStarted = true
ctx.write(self.wrapOutboundOut(.head(response)), promise: nil)
}
return ctx.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(buffer))))
}.then { () -> EventLoopFuture<Void> in
let p: EventLoopPromise<Void> = ctx.eventLoop.newPromise()
self.completeResponse(ctx, trailers: nil, promise: p)
return p.futureResult
Expand All @@ -301,9 +330,9 @@ private final class HTTPHandler: ChannelInboundHandler {
_ = try? file.close()
}
case .sendfile:
ctx.write(self.wrapOutboundOut(.head(response))).then {
ctx.writeAndFlush(self.wrapOutboundOut(.body(.fileRegion(region))))
}.then {
let response = responseHead(request: request, fileRegion: region)
ctx.write(self.wrapOutboundOut(.head(response)), promise: nil)
ctx.writeAndFlush(self.wrapOutboundOut(.body(.fileRegion(region)))).then {
let p: EventLoopPromise<Void> = ctx.eventLoop.newPromise()
self.completeResponse(ctx, trailers: nil, promise: p)
return p.futureResult
Expand All @@ -313,30 +342,7 @@ private final class HTTPHandler: ChannelInboundHandler {
_ = try? file.close()
}
}
} catch {
var body = ctx.channel.allocator.buffer(capacity: 128)
let response = { () -> HTTPResponseHead in
switch error {
case let e as IOError where e.errnoCode == ENOENT:
body.write(staticString: "IOError (not found)\r\n")
return HTTPResponseHead(version: request.version, status: .notFound)
case let e as IOError:
body.write(staticString: "IOError (other)\r\n")
body.write(string: e.description)
body.write(staticString: "\r\n")
return HTTPResponseHead(version: request.version, status: .notFound)
default:
body.write(string: "\(type(of: error)) error\r\n")
return HTTPResponseHead(version: request.version, status: .internalServerError)
}
}()
body.write(string: "\(error)")
body.write(staticString: "\r\n")
ctx.write(self.wrapOutboundOut(.head(response)), promise: nil)
ctx.write(self.wrapOutboundOut(.body(.byteBuffer(body))), promise: nil)
ctx.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
ctx.channel.close(promise: nil)
}
}
case .end:
self.state.requestComplete()
default:
Expand Down
3 changes: 3 additions & 0 deletions Tests/NIOTests/NonBlockingFileIOTest+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ extension NonBlockingFileIOTest {
("testFileRegionReadFromPipeFails", testFileRegionReadFromPipeFails),
("testReadFromNonBlockingPipeFails", testReadFromNonBlockingPipeFails),
("testSeekPointerIsSetToFront", testSeekPointerIsSetToFront),
("testFileOpenWorks", testFileOpenWorks),
("testFileOpenWorksWithEmptyFile", testFileOpenWorksWithEmptyFile),
("testFileOpenFails", testFileOpenFails),
]
}
}
Expand Down
39 changes: 39 additions & 0 deletions Tests/NIOTests/NonBlockingFileIOTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -449,4 +449,43 @@ class NonBlockingFileIOTest: XCTestCase {
}
XCTAssertEqual(2, numCalls)
}

func testFileOpenWorks() throws {
let content = "123"
try withTemporaryFile(content: content) { (fileHandle, path) -> Void in
let (fh, fr) = try self.fileIO.openFile(path: path, eventLoop: self.eventLoop).wait()
try fh.withUnsafeFileDescriptor { fd in
XCTAssertGreaterThanOrEqual(fd, 0)
}
XCTAssertTrue(fh.isOpen)
XCTAssertEqual(0, fr.readerIndex)
XCTAssertEqual(3, fr.endIndex)
try fh.close()
}
}

func testFileOpenWorksWithEmptyFile() throws {
let content = ""
try withTemporaryFile(content: content) { (fileHandle, path) -> Void in
let (fh, fr) = try self.fileIO.openFile(path: path, eventLoop: self.eventLoop).wait()
try fh.withUnsafeFileDescriptor { fd in
XCTAssertGreaterThanOrEqual(fd, 0)
}
XCTAssertTrue(fh.isOpen)
XCTAssertEqual(0, fr.readerIndex)
XCTAssertEqual(0, fr.endIndex)
try fh.close()
}
}

func testFileOpenFails() throws {
do {
_ = try self.fileIO.openFile(path: "/dev/null/this/does/not/exist", eventLoop: self.eventLoop).wait()
XCTFail("should've thrown")
} catch let e as IOError where e.errnoCode == ENOTDIR {
// OK
} catch {
XCTFail("wrong error: \(error)")
}
}
}

0 comments on commit 60f6164

Please # to comment.