diff --git a/Sources/NIO/BlockingIOThreadPool.swift b/Sources/NIO/BlockingIOThreadPool.swift index cfc6e13863..8f4e0bedcd 100644 --- a/Sources/NIO/BlockingIOThreadPool.swift +++ b/Sources/NIO/BlockingIOThreadPool.swift @@ -75,6 +75,8 @@ public final class BlockingIOThreadPool { /// Submit a `WorkItem` to process. /// + /// - note: This is a low-level method, in most cases the `runIfActive` method should be used. + /// /// - parameters: /// - body: The `WorkItem` to process by the `BlockingIOThreadPool`. public func submit(_ body: @escaping WorkItem) { @@ -153,6 +155,30 @@ public final class BlockingIOThreadPool { } } +public extension BlockingIOThreadPool { + /// Runs the submitted closure if the thread pool is still active, otherwise fails the promise. + /// The closure will be run on the thread pool so can do blocking work. + /// + /// - parameters: + /// - promise: The `EventLoopPromise` that will be fulfilled when the block finished running (or failed). + /// - body: The closure which performs some blocking work to be done on the thread pool. + /// - returns: The `EventLoopFuture` of `promise` fulfilled with the result (or error) of the passed closure. + public func runIfActive(promise: EventLoopPromise, _ body: @escaping () throws -> T) -> EventLoopFuture { + self.submit { shouldRun in + guard case shouldRun = BlockingIOThreadPool.WorkItemState.active else { + promise.fail(error: ChannelError.ioOnClosedChannel) + return + } + do { + try promise.succeed(result: body()) + } catch { + promise.fail(error: error) + } + } + return promise.futureResult + } +} + extension BlockingIOThreadPool { public func shutdownGracefully(_ callback: @escaping (Error?) -> Void) { self.shutdownGracefully(queue: .global(), callback) diff --git a/Sources/NIO/NonBlockingFileIO.swift b/Sources/NIO/NonBlockingFileIO.swift index 2c83c6367b..55817843d5 100644 --- a/Sources/NIO/NonBlockingFileIO.swift +++ b/Sources/NIO/NonBlockingFileIO.swift @@ -185,44 +185,57 @@ public struct NonBlockingFileIO { return eventLoop.newSucceededFuture(result: allocator.buffer(capacity: 0)) } - let p: EventLoopPromise = eventLoop.newPromise() var buf = allocator.buffer(capacity: byteCount) - self.threadPool.submit { shouldRun in - guard case shouldRun = BlockingIOThreadPool.WorkItemState.active else { - p.fail(error: ChannelError.ioOnClosedChannel) - return - } - + return self.threadPool.runIfActive(promise: eventLoop.newPromise()) { () -> ByteBuffer in var bytesRead = 0 while bytesRead < byteCount { - do { - let n = try buf.writeWithUnsafeMutableBytes { ptr in - let res = try fileHandle.withUnsafeFileDescriptor { descriptor in - try Posix.read(descriptor: descriptor, - pointer: ptr.baseAddress!.assumingMemoryBound(to: UInt8.self), - size: byteCount - bytesRead) - } - switch res { - case .processed(let n): - assert(n >= 0, "read claims to have read a negative number of bytes \(n)") - return n - case .wouldBlock: - throw Error.descriptorSetToNonBlocking - } + let n = try buf.writeWithUnsafeMutableBytes { ptr in + let res = try fileHandle.withUnsafeFileDescriptor { descriptor in + try Posix.read(descriptor: descriptor, + pointer: ptr.baseAddress!.assumingMemoryBound(to: UInt8.self), + size: byteCount - bytesRead) } - if n == 0 { - // EOF - break - } else { - bytesRead += n + switch res { + case .processed(let n): + assert(n >= 0, "read claims to have read a negative number of bytes \(n)") + return n + case .wouldBlock: + throw Error.descriptorSetToNonBlocking } - } catch { - p.fail(error: error) - return } + if n == 0 { + // EOF + break + } else { + bytesRead += n + } + } + return buf + } + } + + /// Open the file at `path` on a private thread pool which is separate from any `EventLoop` thread. + /// + /// This function will return (a future) of the `FileHandle` associated with the file opened and a `FileRegion` + /// comprising of the whole file. The caller must close the returned `FileHandle` when its no longer needed. + /// + /// - note: The reason this returns the `FileHandle` and the `FileRegion` is that both the opening of a file as well as the querying of its size are blocking. + /// + /// - parameters: + /// - path: The path of the file to be opened. + /// - eventLoop: The `EventLoop` on which the returned `EventLoopFuture` will fire. + /// - returns: An `EventLoopFuture` containing the `FileHandle` and the `FileRegion` comprising the whole file. + public func openFile(path: String, eventLoop: EventLoop) -> EventLoopFuture<(FileHandle, FileRegion)> { + return self.threadPool.runIfActive(promise: eventLoop.newPromise()) { + let fh = try FileHandle(path: path) + do { + let fr = try FileRegion(fileHandle: fh) + return (fh, fr) + } catch { + _ = try? fh.close() + throw error } - p.succeed(result: buf) } - return p.futureResult } + } diff --git a/Sources/NIOHTTP1Server/main.swift b/Sources/NIOHTTP1Server/main.swift index 82873327c7..59b20d6c42 100644 --- a/Sources/NIOHTTP1Server/main.swift +++ b/Sources/NIOHTTP1Server/main.swift @@ -249,6 +249,38 @@ private final class HTTPHandler: ChannelInboundHandler { private func handleFile(ctx: ChannelHandlerContext, request: HTTPServerRequestPart, ioMethod: FileIOMethod, path: String) { self.buffer.clear() + func sendErrorResponse(request: HTTPRequestHead, _ error: Error) { + var body = ctx.channel.allocator.buffer(capacity: 128) + let response = { () -> HTTPResponseHead in + switch error { + case let e as IOError where e.errnoCode == ENOENT: + body.write(staticString: "IOError (not found)\r\n") + return HTTPResponseHead(version: request.version, status: .notFound) + case let e as IOError: + body.write(staticString: "IOError (other)\r\n") + body.write(string: e.description) + body.write(staticString: "\r\n") + return HTTPResponseHead(version: request.version, status: .notFound) + default: + body.write(string: "\(type(of: error)) error\r\n") + return HTTPResponseHead(version: request.version, status: .internalServerError) + } + }() + body.write(string: "\(error)") + body.write(staticString: "\r\n") + ctx.write(self.wrapOutboundOut(.head(response)), promise: nil) + ctx.write(self.wrapOutboundOut(.body(.byteBuffer(body))), promise: nil) + ctx.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) + ctx.channel.close(promise: nil) + } + + func responseHead(request: HTTPRequestHead, fileRegion region: FileRegion) -> HTTPResponseHead { + var response = HTTPResponseHead(version: request.version, status: .ok) + response.headers.add(name: "Content-Length", value: "\(region.endIndex)") + response.headers.add(name: "Content-Type", value: "text/plain; charset=utf-8") + return response + } + switch request { case .head(let request): self.keepAlive = request.isKeepAlive @@ -260,28 +292,25 @@ private final class HTTPHandler: ChannelInboundHandler { return } let path = self.htdocsPath + "/" + path - do { - let file = try FileHandle(path: path) - let region = try FileRegion(fileHandle: file) - var response = HTTPResponseHead(version: request.version, status: .ok) - - response.headers.add(name: "Content-Length", value: "\(region.endIndex)") - response.headers.add(name: "Content-Type", value: "text/plain; charset=utf-8") - + let fileHandleAndRegion = self.fileIO.openFile(path: path, eventLoop: ctx.eventLoop) + fileHandleAndRegion.whenFailure { + sendErrorResponse(request: request, $0) + } + fileHandleAndRegion.whenSuccess { (file, region) in switch ioMethod { case .nonblockingFileIO: var responseStarted = false - let f = self.fileIO.readChunked(fileRegion: region, - chunkSize: 32 * 1024, - allocator: ctx.channel.allocator, - eventLoop: ctx.eventLoop) { buffer in - if !responseStarted { - responseStarted = true - ctx.write(self.wrapOutboundOut(.head(response)), promise: nil) - } - return ctx.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(buffer)))) - } - f.then { () -> EventLoopFuture in + let response = responseHead(request: request, fileRegion: region) + return self.fileIO.readChunked(fileRegion: region, + chunkSize: 32 * 1024, + allocator: ctx.channel.allocator, + eventLoop: ctx.eventLoop) { buffer in + if !responseStarted { + responseStarted = true + ctx.write(self.wrapOutboundOut(.head(response)), promise: nil) + } + return ctx.writeAndFlush(self.wrapOutboundOut(.body(.byteBuffer(buffer)))) + }.then { () -> EventLoopFuture in let p: EventLoopPromise = ctx.eventLoop.newPromise() self.completeResponse(ctx, trailers: nil, promise: p) return p.futureResult @@ -301,9 +330,9 @@ private final class HTTPHandler: ChannelInboundHandler { _ = try? file.close() } case .sendfile: - ctx.write(self.wrapOutboundOut(.head(response))).then { - ctx.writeAndFlush(self.wrapOutboundOut(.body(.fileRegion(region)))) - }.then { + let response = responseHead(request: request, fileRegion: region) + ctx.write(self.wrapOutboundOut(.head(response)), promise: nil) + ctx.writeAndFlush(self.wrapOutboundOut(.body(.fileRegion(region)))).then { let p: EventLoopPromise = ctx.eventLoop.newPromise() self.completeResponse(ctx, trailers: nil, promise: p) return p.futureResult @@ -313,30 +342,7 @@ private final class HTTPHandler: ChannelInboundHandler { _ = try? file.close() } } - } catch { - var body = ctx.channel.allocator.buffer(capacity: 128) - let response = { () -> HTTPResponseHead in - switch error { - case let e as IOError where e.errnoCode == ENOENT: - body.write(staticString: "IOError (not found)\r\n") - return HTTPResponseHead(version: request.version, status: .notFound) - case let e as IOError: - body.write(staticString: "IOError (other)\r\n") - body.write(string: e.description) - body.write(staticString: "\r\n") - return HTTPResponseHead(version: request.version, status: .notFound) - default: - body.write(string: "\(type(of: error)) error\r\n") - return HTTPResponseHead(version: request.version, status: .internalServerError) - } - }() - body.write(string: "\(error)") - body.write(staticString: "\r\n") - ctx.write(self.wrapOutboundOut(.head(response)), promise: nil) - ctx.write(self.wrapOutboundOut(.body(.byteBuffer(body))), promise: nil) - ctx.writeAndFlush(self.wrapOutboundOut(.end(nil)), promise: nil) - ctx.channel.close(promise: nil) - } + } case .end: self.state.requestComplete() default: diff --git a/Tests/NIOTests/NonBlockingFileIOTest+XCTest.swift b/Tests/NIOTests/NonBlockingFileIOTest+XCTest.swift index 974f08b85b..6a4c1bf9de 100644 --- a/Tests/NIOTests/NonBlockingFileIOTest+XCTest.swift +++ b/Tests/NIOTests/NonBlockingFileIOTest+XCTest.swift @@ -45,6 +45,9 @@ extension NonBlockingFileIOTest { ("testFileRegionReadFromPipeFails", testFileRegionReadFromPipeFails), ("testReadFromNonBlockingPipeFails", testReadFromNonBlockingPipeFails), ("testSeekPointerIsSetToFront", testSeekPointerIsSetToFront), + ("testFileOpenWorks", testFileOpenWorks), + ("testFileOpenWorksWithEmptyFile", testFileOpenWorksWithEmptyFile), + ("testFileOpenFails", testFileOpenFails), ] } } diff --git a/Tests/NIOTests/NonBlockingFileIOTest.swift b/Tests/NIOTests/NonBlockingFileIOTest.swift index 567b4c21a9..3dcdff1910 100644 --- a/Tests/NIOTests/NonBlockingFileIOTest.swift +++ b/Tests/NIOTests/NonBlockingFileIOTest.swift @@ -449,4 +449,43 @@ class NonBlockingFileIOTest: XCTestCase { } XCTAssertEqual(2, numCalls) } + + func testFileOpenWorks() throws { + let content = "123" + try withTemporaryFile(content: content) { (fileHandle, path) -> Void in + let (fh, fr) = try self.fileIO.openFile(path: path, eventLoop: self.eventLoop).wait() + try fh.withUnsafeFileDescriptor { fd in + XCTAssertGreaterThanOrEqual(fd, 0) + } + XCTAssertTrue(fh.isOpen) + XCTAssertEqual(0, fr.readerIndex) + XCTAssertEqual(3, fr.endIndex) + try fh.close() + } + } + + func testFileOpenWorksWithEmptyFile() throws { + let content = "" + try withTemporaryFile(content: content) { (fileHandle, path) -> Void in + let (fh, fr) = try self.fileIO.openFile(path: path, eventLoop: self.eventLoop).wait() + try fh.withUnsafeFileDescriptor { fd in + XCTAssertGreaterThanOrEqual(fd, 0) + } + XCTAssertTrue(fh.isOpen) + XCTAssertEqual(0, fr.readerIndex) + XCTAssertEqual(0, fr.endIndex) + try fh.close() + } + } + + func testFileOpenFails() throws { + do { + _ = try self.fileIO.openFile(path: "/dev/null/this/does/not/exist", eventLoop: self.eventLoop).wait() + XCTFail("should've thrown") + } catch let e as IOError where e.errnoCode == ENOTDIR { + // OK + } catch { + XCTFail("wrong error: \(error)") + } + } }