Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

non-blocking versions of file open/stat #256

Merged
merged 2 commits into from
Apr 18, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions Sources/NIO/BlockingIOThreadPool.swift
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public final class BlockingIOThreadPool {

/// Submit a `WorkItem` to process.
///
/// - note: This is a low-level method, in most cases the `runIfActive` method should be used.
///
/// - parameters:
/// - body: The `WorkItem` to process by the `BlockingIOThreadPool`.
public func submit(_ body: @escaping WorkItem) {
Expand Down Expand Up @@ -153,6 +155,31 @@ public final class BlockingIOThreadPool {
}
}

public extension BlockingIOThreadPool {
/// Runs the submitted closure if the thread pool is still active, otherwise fails the promise.
/// The closure will be run on the thread pool so can do blocking work.
///
/// - parameters:
/// - eventLoop: The `EventLoop` the returned `EventLoopFuture` will fire on.
/// - body: The closure which performs some blocking work to be done on the thread pool.
/// - returns: The `EventLoopFuture` of `promise` fulfilled with the result (or error) of the passed closure.
public func runIfActive<T>(eventLoop: EventLoop, _ body: @escaping () throws -> T) -> EventLoopFuture<T> {
let promise: EventLoopPromise<T> = eventLoop.newPromise()
self.submit { shouldRun in
guard case shouldRun = BlockingIOThreadPool.WorkItemState.active else {
promise.fail(error: ChannelError.ioOnClosedChannel)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO using an ChannelError here is a bit odd as the BlockingIOThreadPool has nothing to do with a Channel at all and we also have no reference to the Channel.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@normanmaurer yes, that is indeed very odd. However that is what the other methods do so I copied it. Happy to fix but then we're slightly inconsistent...

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm ok... We should have spotted this before :( I guess we need to fix this in a next major

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@normanmaurer well, we could just be different in this particular method. That wouldn't be breaking API as it's a new API. It would just behave slightly differently than the read apis.

return
}
do {
try promise.succeed(result: body())
} catch {
promise.fail(error: error)
}
}
return promise.futureResult
}
}

extension BlockingIOThreadPool {
public func shutdownGracefully(_ callback: @escaping (Error?) -> Void) {
self.shutdownGracefully(queue: .global(), callback)
Expand Down
75 changes: 44 additions & 31 deletions Sources/NIO/NonBlockingFileIO.swift
Original file line number Diff line number Diff line change
Expand Up @@ -185,44 +185,57 @@ public struct NonBlockingFileIO {
return eventLoop.newSucceededFuture(result: allocator.buffer(capacity: 0))
}

let p: EventLoopPromise<ByteBuffer> = eventLoop.newPromise()
var buf = allocator.buffer(capacity: byteCount)
self.threadPool.submit { shouldRun in
guard case shouldRun = BlockingIOThreadPool.WorkItemState.active else {
p.fail(error: ChannelError.ioOnClosedChannel)
return
}

return self.threadPool.runIfActive(eventLoop: eventLoop) { () -> ByteBuffer in
var bytesRead = 0
while bytesRead < byteCount {
do {
let n = try buf.writeWithUnsafeMutableBytes { ptr in
let res = try fileHandle.withUnsafeFileDescriptor { descriptor in
try Posix.read(descriptor: descriptor,
pointer: ptr.baseAddress!.assumingMemoryBound(to: UInt8.self),
size: byteCount - bytesRead)
}
switch res {
case .processed(let n):
assert(n >= 0, "read claims to have read a negative number of bytes \(n)")
return n
case .wouldBlock:
throw Error.descriptorSetToNonBlocking
}
let n = try buf.writeWithUnsafeMutableBytes { ptr in
let res = try fileHandle.withUnsafeFileDescriptor { descriptor in
try Posix.read(descriptor: descriptor,
pointer: ptr.baseAddress!.assumingMemoryBound(to: UInt8.self),
size: byteCount - bytesRead)
}
if n == 0 {
// EOF
break
} else {
bytesRead += n
switch res {
case .processed(let n):
assert(n >= 0, "read claims to have read a negative number of bytes \(n)")
return n
case .wouldBlock:
throw Error.descriptorSetToNonBlocking
}
} catch {
p.fail(error: error)
return
}
if n == 0 {
// EOF
break
} else {
bytesRead += n
}
}
return buf
}
}

/// Open the file at `path` on a private thread pool which is separate from any `EventLoop` thread.
///
/// This function will return (a future) of the `FileHandle` associated with the file opened and a `FileRegion`
/// comprising of the whole file. The caller must close the returned `FileHandle` when its no longer needed.
///
/// - note: The reason this returns the `FileHandle` and the `FileRegion` is that both the opening of a file as well as the querying of its size are blocking.
///
/// - parameters:
/// - path: The path of the file to be opened.
/// - eventLoop: The `EventLoop` on which the returned `EventLoopFuture` will fire.
/// - returns: An `EventLoopFuture` containing the `FileHandle` and the `FileRegion` comprising the whole file.
public func openFile(path: String, eventLoop: EventLoop) -> EventLoopFuture<(FileHandle, FileRegion)> {
return self.threadPool.runIfActive(eventLoop: eventLoop) {
let fh = try FileHandle(path: path)
do {
let fr = try FileRegion(fileHandle: fh)
return (fh, fr)
} catch {
_ = try? fh.close()
throw error
}
p.succeed(result: buf)
}
return p.futureResult
}

}
102 changes: 54 additions & 48 deletions Sources/NIOHTTP1Server/main.swift
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ private final class HTTPHandler: ChannelInboundHandler {
if balloonInMemory {
self.buffer.clear()
} else {
ctx.writeAndFlush(self.wrapOutboundOut(.head(.init(version: request.version, status: .ok))), promise: nil)
ctx.writeAndFlush(self.wrapOutboundOut(.head(httpResponseHead(request: request, status: .ok))), promise: nil)
}
case .body(buffer: var buf):
if balloonInMemory {
Expand All @@ -172,7 +172,7 @@ private final class HTTPHandler: ChannelInboundHandler {
case .head(let request):
self.keepAlive = request.isKeepAlive
self.state.requestReceived()
ctx.writeAndFlush(self.wrapOutboundOut(.head(.init(version: request.version, status: .ok))), promise: nil)
ctx.writeAndFlush(self.wrapOutboundOut(.head(httpResponseHead(request: request, status: .ok))), promise: nil)
case .body(buffer: _):
()
case .end:
Expand Down Expand Up @@ -280,6 +280,38 @@ private final class HTTPHandler: ChannelInboundHandler {
private func handleFile(ctx: ChannelHandlerContext, request: HTTPServerRequestPart, ioMethod: FileIOMethod, path: String) {
self.buffer.clear()

func sendErrorResponse(request: HTTPRequestHead, _ error: Error) {
var body = ctx.channel.allocator.buffer(capacity: 128)
let response = { () -> HTTPResponseHead in
switch error {
case let e as IOError where e.errnoCode == ENOENT:
body.write(staticString: "IOError (not found)\r\n")
return httpResponseHead(request: request, status: .notFound)
case let e as IOError:
body.write(staticString: "IOError (other)\r\n")
body.write(string: e.description)
body.write(staticString: "\r\n")
return httpResponseHead(request: request, status: .notFound)
default:
body.write(string: "\(type(of: error)) error\r\n")
return httpResponseHead(request: request, status: .internalServerError)
}
}()
body.write(string: "\(error)")
body.write(staticString: "\r\n")
ctx.write(self.wrapOutboundOut(.head(response)), promise: nil)
ctx.write(self.wrapOutboundOut(.body(.byteBuffer(body))), promise: nil)
ctx.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
ctx.channel.close(promise: nil)
}

func responseHead(request: HTTPRequestHead, fileRegion region: FileRegion) -> HTTPResponseHead {
var response = httpResponseHead(request: request, status: .ok)
response.headers.add(name: "Content-Length", value: "\(region.endIndex)")
response.headers.add(name: "Content-Type", value: "text/plain; charset=utf-8")
return response
}

switch request {
case .head(let request):
self.keepAlive = request.isKeepAlive
Expand All @@ -291,28 +323,25 @@ private final class HTTPHandler: ChannelInboundHandler {
return
}
let path = self.htdocsPath + "/" + path
do {
let file = try FileHandle(path: path)
let region = try FileRegion(fileHandle: file)
var response = httpResponseHead(request: request, status: .ok)

response.headers.add(name: "Content-Length", value: "\(region.endIndex)")
response.headers.add(name: "Content-Type", value: "text/plain; charset=utf-8")

let fileHandleAndRegion = self.fileIO.openFile(path: path, eventLoop: ctx.eventLoop)
fileHandleAndRegion.whenFailure {
sendErrorResponse(request: request, $0)
}
fileHandleAndRegion.whenSuccess { (file, region) in
switch ioMethod {
case .nonblockingFileIO:
var responseStarted = false
let f = self.fileIO.readChunked(fileRegion: region,
chunkSize: 32 * 1024,
allocator: ctx.channel.allocator,
eventLoop: ctx.eventLoop) { buffer in
if !responseStarted {
responseStarted = true
ctx.write(self.wrapOutboundOut(.head(response)), promise: nil)
}
return ctx.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(buffer))))
}
f.then { () -> EventLoopFuture<Void> in
let response = responseHead(request: request, fileRegion: region)
return self.fileIO.readChunked(fileRegion: region,
chunkSize: 32 * 1024,
allocator: ctx.channel.allocator,
eventLoop: ctx.eventLoop) { buffer in
if !responseStarted {
responseStarted = true
ctx.write(self.wrapOutboundOut(.head(response)), promise: nil)
}
return ctx.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(buffer))))
}.then { () -> EventLoopFuture<Void> in
let p: EventLoopPromise<Void> = ctx.eventLoop.newPromise()
self.completeResponse(ctx, trailers: nil, promise: p)
return p.futureResult
Expand All @@ -332,9 +361,9 @@ private final class HTTPHandler: ChannelInboundHandler {
_ = try? file.close()
}
case .sendfile:
ctx.write(self.wrapOutboundOut(.head(response))).then {
ctx.writeAndFlush(self.wrapOutboundOut(.body(.fileRegion(region))))
}.then {
let response = responseHead(request: request, fileRegion: region)
ctx.write(self.wrapOutboundOut(.head(response)), promise: nil)
ctx.writeAndFlush(self.wrapOutboundOut(.body(.fileRegion(region)))).then {
let p: EventLoopPromise<Void> = ctx.eventLoop.newPromise()
self.completeResponse(ctx, trailers: nil, promise: p)
return p.futureResult
Expand All @@ -344,30 +373,7 @@ private final class HTTPHandler: ChannelInboundHandler {
_ = try? file.close()
}
}
} catch {
var body = ctx.channel.allocator.buffer(capacity: 128)
let response = { () -> HTTPResponseHead in
switch error {
case let e as IOError where e.errnoCode == ENOENT:
body.write(staticString: "IOError (not found)\r\n")
return httpResponseHead(request: request, status: .notFound)
case let e as IOError:
body.write(staticString: "IOError (other)\r\n")
body.write(string: e.description)
body.write(staticString: "\r\n")
return httpResponseHead(request: request, status: .notFound)
default:
body.write(string: "\(type(of: error)) error\r\n")
return httpResponseHead(request: request, status: .internalServerError)
}
}()
body.write(string: "\(error)")
body.write(staticString: "\r\n")
ctx.write(self.wrapOutboundOut(.head(response)), promise: nil)
ctx.write(self.wrapOutboundOut(.body(.byteBuffer(body))), promise: nil)
ctx.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil)
ctx.channel.close(promise: nil)
}
}
case .end:
self.state.requestComplete()
default:
Expand Down
3 changes: 3 additions & 0 deletions Tests/NIOTests/NonBlockingFileIOTest+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ extension NonBlockingFileIOTest {
("testFileRegionReadFromPipeFails", testFileRegionReadFromPipeFails),
("testReadFromNonBlockingPipeFails", testReadFromNonBlockingPipeFails),
("testSeekPointerIsSetToFront", testSeekPointerIsSetToFront),
("testFileOpenWorks", testFileOpenWorks),
("testFileOpenWorksWithEmptyFile", testFileOpenWorksWithEmptyFile),
("testFileOpenFails", testFileOpenFails),
]
}
}
Expand Down
39 changes: 39 additions & 0 deletions Tests/NIOTests/NonBlockingFileIOTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -449,4 +449,43 @@ class NonBlockingFileIOTest: XCTestCase {
}
XCTAssertEqual(2, numCalls)
}

func testFileOpenWorks() throws {
let content = "123"
try withTemporaryFile(content: content) { (fileHandle, path) -> Void in
let (fh, fr) = try self.fileIO.openFile(path: path, eventLoop: self.eventLoop).wait()
try fh.withUnsafeFileDescriptor { fd in
XCTAssertGreaterThanOrEqual(fd, 0)
}
XCTAssertTrue(fh.isOpen)
XCTAssertEqual(0, fr.readerIndex)
XCTAssertEqual(3, fr.endIndex)
try fh.close()
}
}

func testFileOpenWorksWithEmptyFile() throws {
let content = ""
try withTemporaryFile(content: content) { (fileHandle, path) -> Void in
let (fh, fr) = try self.fileIO.openFile(path: path, eventLoop: self.eventLoop).wait()
try fh.withUnsafeFileDescriptor { fd in
XCTAssertGreaterThanOrEqual(fd, 0)
}
XCTAssertTrue(fh.isOpen)
XCTAssertEqual(0, fr.readerIndex)
XCTAssertEqual(0, fr.endIndex)
try fh.close()
}
}

func testFileOpenFails() throws {
do {
_ = try self.fileIO.openFile(path: "/dev/null/this/does/not/exist", eventLoop: self.eventLoop).wait()
XCTFail("should've thrown")
} catch let e as IOError where e.errnoCode == ENOTDIR {
// OK
} catch {
XCTFail("wrong error: \(error)")
}
}
}