Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Remove busywork left over from flush promises. #234

Merged
merged 2 commits into from
Mar 26, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Sources/NIO/BaseSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {

/// Mark a flush point. This is called when flush is received, and instructs
/// the implementation to record the flush.
func markFlushPoint(promise: EventLoopPromise<Void>?) {
func markFlushPoint() {
fatalError("this must be overridden by sub class")
}

Expand Down Expand Up @@ -408,7 +408,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
return
}

self.markFlushPoint(promise: nil)
self.markFlushPoint()

if !isWritePending() && flushNow() == .register {
registerForWritable()
Expand Down
76 changes: 36 additions & 40 deletions Sources/NIO/PendingDatagramWritesManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ private struct PendingDatagramWritesState {

private var pendingWrites = MarkedCircularBuffer<PendingDatagramWrite>(initialRingCapacity: 16)
private var chunks: Int = 0
private var toBeFlushedErrors: [Error] = []
public private(set) var bytes: Int = 0

public var nextWrite: PendingDatagramWrite? {
Expand All @@ -148,15 +147,13 @@ private struct PendingDatagramWritesState {
/// Indicates that the first outstanding write was written.
///
/// - returns: The promise that the caller must fire, along with an error to fire it with if it needs one.
///
private mutating func wroteFirst(error: Error? = nil) -> DatagramWritePromiseFiller? {
let first = self.pendingWrites.removeFirst()
self.chunks -= 1
self.subtractOutstanding(bytes: first.data.readableBytes)
if let promise = first.promise {
return (promise, error)
}

return nil
}

Expand Down Expand Up @@ -187,10 +184,6 @@ private struct PendingDatagramWritesState {
///
/// All writes before this checkpoint will eventually be written to the socket.
public mutating func markFlushCheckpoint() {
// No point marking a flush checkpoint if we have no writes!
guard self.pendingWrites.count > 0 else {
return
}
self.pendingWrites.mark()
}

Expand All @@ -201,8 +194,8 @@ private struct PendingDatagramWritesState {
/// - parameters:
/// - data: The result of the write operation: namely, for each datagram we attempted to write, the number of bytes we wrote.
/// - messages: The vector messages written, if any.
/// - returns: A list of promises and the error that should be sent to themn, if any, and a `WriteResult` which indicates if we could write everything or not.
public mutating func didWrite(_ data: IOResult<Int>, messages: UnsafeMutableBufferPointer<MMsgHdr>?) -> ([DatagramWritePromiseFiller], OneWriteOperationResult) {
/// - returns: A promise and the error that should be sent to it, if any, and a `WriteResult` which indicates if we could write everything or not.
public mutating func didWrite(_ data: IOResult<Int>, messages: UnsafeMutableBufferPointer<MMsgHdr>?) -> (DatagramWritePromiseFiller?, OneWriteOperationResult) {
switch data {
case .processed(let written):
if let messages = messages {
Expand All @@ -211,21 +204,17 @@ private struct PendingDatagramWritesState {
return didScalarWrite(written: written)
}
case .wouldBlock:
return ([], .wouldBlock)
return (nil, .wouldBlock)
}
}

public mutating func recoverableError(_ error: Error) -> ([DatagramWritePromiseFiller], OneWriteOperationResult) {
public mutating func recoverableError(_ error: Error) -> (DatagramWritePromiseFiller?, OneWriteOperationResult) {
// When we've hit an error we treat it like fully writing the first datagram. We aren't going to try to
// send it again.
var pendingPromises: [DatagramWritePromiseFiller] = []
self.toBeFlushedErrors.append(error)
if let promiseFiller = self.wroteFirst(error: error) {
pendingPromises.append(promiseFiller)
}
let promiseFiller = self.wroteFirst(error: error)
let result: OneWriteOperationResult = self.pendingWrites.hasMark() ? .writtenPartially : .writtenCompletely

return (pendingPromises, result)
return (promiseFiller, result)
}

/// Indicates that a vector write succeeded.
Expand All @@ -235,20 +224,33 @@ private struct PendingDatagramWritesState {
/// - messages: The list of message objects.
/// - returns: A closure that the caller _needs_ to run which will fulfill the promises of the writes, and a `WriteResult` that indicates if we could write
/// everything or not.
private mutating func didVectorWrite(written: Int, messages: UnsafeMutableBufferPointer<MMsgHdr>) -> ([DatagramWritePromiseFiller], OneWriteOperationResult) {
private mutating func didVectorWrite(written: Int, messages: UnsafeMutableBufferPointer<MMsgHdr>) -> (DatagramWritePromiseFiller?, OneWriteOperationResult) {
var fillers: [DatagramWritePromiseFiller] = []
fillers.reserveCapacity(written)

// This was a vector write. We wrote `written` number of messages.
let writes = messages[messages.startIndex...messages.index(messages.startIndex, offsetBy: written - 1)]
var promiseFiller: DatagramWritePromiseFiller?

for write in writes {
let written = write.msg_len
fillers.append(contentsOf: didScalarWrite(written: Int(written)).0)
let thisWriteFiller = didScalarWrite(written: Int(written)).0
assert(thisWriteFiller?.1 == nil, "didVectorWrite called with errors on single writes!")

switch (promiseFiller, thisWriteFiller) {
case (.some(let all), .some(let this)):
all.0.futureResult.cascade(promise: this.0)
case (.none, .some(let this)):
promiseFiller = this
case (.some, .none),
(.none, .none):
break
}
}

// If we no longer have a mark, we wrote everything.
let result: OneWriteOperationResult = self.pendingWrites.hasMark() ? .writtenPartially : .writtenCompletely
return (fillers, result)
return (promiseFiller, result)
}

/// Indicates that a scalar write succeeded.
Expand All @@ -257,17 +259,13 @@ private struct PendingDatagramWritesState {
/// - written: The number of bytes successfully written.
/// - returns: All the promises that must be fired, and a `WriteResult` that indicates if we could write
/// everything or not.
private mutating func didScalarWrite(written: Int) -> ([DatagramWritePromiseFiller], OneWriteOperationResult) {
private mutating func didScalarWrite(written: Int) -> (DatagramWritePromiseFiller?, OneWriteOperationResult) {
precondition(written <= self.pendingWrites[0].data.readableBytes,
"Appeared to write more bytes (\(written)) than the datagram contained (\(self.pendingWrites[0].data.readableBytes))")
var fillers: [DatagramWritePromiseFiller] = []
if let writeFiller = self.wroteFirst() {
fillers.append(writeFiller)
}

let writeFiller = self.wroteFirst()
// If we no longer have a mark, we wrote everything.
let result: OneWriteOperationResult = self.pendingWrites.hasMark() ? .writtenPartially : .writtenCompletely
return (fillers, result)
return (writeFiller, result)
}

/// Is there a pending flush?
Expand Down Expand Up @@ -461,13 +459,13 @@ final class PendingDatagramWritesManager: PendingWritesManager {
/// - parameters:
/// - data: The result of the write operation.
private func didWrite(_ data: IOResult<Int>, messages: UnsafeMutableBufferPointer<MMsgHdr>?) -> OneWriteOperationResult {
let (promises, result) = self.state.didWrite(data, messages: messages)
let (promise, result) = self.state.didWrite(data, messages: messages)

if self.state.bytes < waterMark.low {
channelWritabilityFlag.store(true)
}

self.fulfillPromises(promises)
self.fulfillPromise(promise)
return result
}

Expand All @@ -482,12 +480,12 @@ final class PendingDatagramWritesManager: PendingWritesManager {
private func handleError(_ error: Error) throws -> OneWriteOperationResult {
switch error {
case let e as IOError where e.errnoCode == EMSGSIZE:
let (promises, result) = self.state.recoverableError(ChannelError.writeMessageTooLarge)
self.fulfillPromises(promises)
let (promise, result) = self.state.recoverableError(ChannelError.writeMessageTooLarge)
self.fulfillPromise(promise)
return result
case let e as IOError where e.errnoCode == EHOSTUNREACH:
let (promises, result) = self.state.recoverableError(ChannelError.writeHostUnreachable)
self.fulfillPromises(promises)
let (promise, result) = self.state.recoverableError(ChannelError.writeHostUnreachable)
self.fulfillPromise(promise)
return result
default:
throw error
Expand Down Expand Up @@ -528,13 +526,11 @@ final class PendingDatagramWritesManager: PendingWritesManager {
messages: self.msgs)
}

private func fulfillPromises(_ results: [PendingDatagramWritesState.DatagramWritePromiseFiller]) {
for result in results {
if let error = result.1 {
result.0.fail(error: error)
} else {
result.0.succeed(result: ())
}
private func fulfillPromise(_ promise: PendingDatagramWritesState.DatagramWritePromiseFiller?) {
if let promise = promise, let error = promise.1 {
promise.0.fail(error: error)
} else if let promise = promise {
promise.0.succeed(result: ())
}
}

Expand Down
8 changes: 4 additions & 4 deletions Sources/NIO/SocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ final class SocketChannel: BaseSocketChannel<Socket> {
}
}

override func markFlushPoint(promise: EventLoopPromise<Void>?) {
override func markFlushPoint() {
// Even if writable() will be called later by the EventLoop we still need to mark the flush checkpoint so we are sure all the flushed messages
// are actually written once writable() is called.
self.pendingWrites.markFlushCheckpoint()
Expand Down Expand Up @@ -441,8 +441,8 @@ final class ServerSocketChannel: BaseSocketChannel<ServerSocket> {
promise?.fail(error: ChannelError.operationUnsupported)
}

override func markFlushPoint(promise: EventLoopPromise<Void>?) {
promise?.fail(error: ChannelError.operationUnsupported)
override func markFlushPoint() {
// We do nothing here: flushes are no-ops.
}

override func flushNow() -> IONotificationState {
Expand Down Expand Up @@ -616,7 +616,7 @@ final class DatagramChannel: BaseSocketChannel<Socket> {

/// Mark a flush point. This is called when flush is received, and instructs
/// the implementation to record the flush.
override func markFlushPoint(promise: EventLoopPromise<Void>?) {
override func markFlushPoint() {
// Even if writable() will be called later by the EventLoop we still need to mark the flush checkpoint so we are sure all the flushed messages
// are actually written once writable() is called.
self.pendingWrites.markFlushCheckpoint()
Expand Down