Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Remove busywork left over from flush promises. #234

Merged
merged 2 commits into from
Mar 26, 2018
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Sources/NIO/BaseSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {

/// Mark a flush point. This is called when flush is received, and instructs
/// the implementation to record the flush.
func markFlushPoint(promise: EventLoopPromise<Void>?) {
func markFlushPoint() {
fatalError("this must be overridden by sub class")
}

Expand Down Expand Up @@ -408,7 +408,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
return
}

self.markFlushPoint(promise: nil)
self.markFlushPoint()

if !isWritePending() && flushNow() == .register {
registerForWritable()
Expand Down
76 changes: 36 additions & 40 deletions Sources/NIO/PendingDatagramWritesManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ private struct PendingDatagramWritesState {

private var pendingWrites = MarkedCircularBuffer<PendingDatagramWrite>(initialRingCapacity: 16)
private var chunks: Int = 0
private var toBeFlushedErrors: [Error] = []
public private(set) var bytes: Int = 0

public var nextWrite: PendingDatagramWrite? {
Expand All @@ -148,15 +147,13 @@ private struct PendingDatagramWritesState {
/// Indicates that the first outstanding write was written.
///
/// - returns: The promise that the caller must fire, along with an error to fire it with if it needs one.
///
private mutating func wroteFirst(error: Error? = nil) -> DatagramWritePromiseFiller? {
let first = self.pendingWrites.removeFirst()
self.chunks -= 1
self.subtractOutstanding(bytes: first.data.readableBytes)
if let promise = first.promise {
return (promise, error)
}

return nil
}

Expand Down Expand Up @@ -187,10 +184,6 @@ private struct PendingDatagramWritesState {
///
/// All writes before this checkpoint will eventually be written to the socket.
public mutating func markFlushCheckpoint() {
// No point marking a flush checkpoint if we have no writes!
guard self.pendingWrites.count > 0 else {
return
}
self.pendingWrites.mark()
}

Expand All @@ -201,8 +194,8 @@ private struct PendingDatagramWritesState {
/// - parameters:
/// - data: The result of the write operation: namely, for each datagram we attempted to write, the number of bytes we wrote.
/// - messages: The vector messages written, if any.
/// - returns: A list of promises and the error that should be sent to themn, if any, and a `WriteResult` which indicates if we could write everything or not.
public mutating func didWrite(_ data: IOResult<Int>, messages: UnsafeMutableBufferPointer<MMsgHdr>?) -> ([DatagramWritePromiseFiller], OneWriteOperationResult) {
/// - returns: A promise and the error that should be sent to it, if any, and a `WriteResult` which indicates if we could write everything or not.
public mutating func didWrite(_ data: IOResult<Int>, messages: UnsafeMutableBufferPointer<MMsgHdr>?) -> (DatagramWritePromiseFiller?, OneWriteOperationResult) {
switch data {
case .processed(let written):
if let messages = messages {
Expand All @@ -211,21 +204,17 @@ private struct PendingDatagramWritesState {
return didScalarWrite(written: written)
}
case .wouldBlock:
return ([], .wouldBlock)
return (nil, .wouldBlock)
}
}

public mutating func recoverableError(_ error: Error) -> ([DatagramWritePromiseFiller], OneWriteOperationResult) {
public mutating func recoverableError(_ error: Error) -> (DatagramWritePromiseFiller?, OneWriteOperationResult) {
// When we've hit an error we treat it like fully writing the first datagram. We aren't going to try to
// send it again.
var pendingPromises: [DatagramWritePromiseFiller] = []
self.toBeFlushedErrors.append(error)
if let promiseFiller = self.wroteFirst(error: error) {
pendingPromises.append(promiseFiller)
}
let promiseFiller = self.wroteFirst(error: error)
let result: OneWriteOperationResult = self.pendingWrites.hasMark() ? .writtenPartially : .writtenCompletely

return (pendingPromises, result)
return (promiseFiller, result)
}

/// Indicates that a vector write succeeded.
Expand All @@ -235,20 +224,33 @@ private struct PendingDatagramWritesState {
/// - messages: The list of message objects.
/// - returns: A closure that the caller _needs_ to run which will fulfill the promises of the writes, and a `WriteResult` that indicates if we could write
/// everything or not.
private mutating func didVectorWrite(written: Int, messages: UnsafeMutableBufferPointer<MMsgHdr>) -> ([DatagramWritePromiseFiller], OneWriteOperationResult) {
private mutating func didVectorWrite(written: Int, messages: UnsafeMutableBufferPointer<MMsgHdr>) -> (DatagramWritePromiseFiller?, OneWriteOperationResult) {
var fillers: [DatagramWritePromiseFiller] = []
fillers.reserveCapacity(written)

// This was a vector write. We wrote `written` number of messages.
let writes = messages[messages.startIndex...messages.index(messages.startIndex, offsetBy: written - 1)]
var promiseFiller: DatagramWritePromiseFiller?

for write in writes {
let written = write.msg_len
fillers.append(contentsOf: didScalarWrite(written: Int(written)).0)
let thisWriteFiller = didScalarWrite(written: Int(written)).0
assert(thisWriteFiller?.1 == nil, "didVectorWrite called with errors on single writes!")

switch (promiseFiller, thisWriteFiller) {
case (.some(let all), .some(let this)):
all.0.futureResult.cascade(promise: this.0)
case (.none, .some(let this)):
promiseFiller = this
case (.some, .none),
(.none, .none):
break
}
}

// If we no longer have a mark, we wrote everything.
let result: OneWriteOperationResult = self.pendingWrites.hasMark() ? .writtenPartially : .writtenCompletely
return (fillers, result)
return (promiseFiller, result)
}

/// Indicates that a scalar write succeeded.
Expand All @@ -257,17 +259,13 @@ private struct PendingDatagramWritesState {
/// - written: The number of bytes successfully written.
/// - returns: All the promises that must be fired, and a `WriteResult` that indicates if we could write
/// everything or not.
private mutating func didScalarWrite(written: Int) -> ([DatagramWritePromiseFiller], OneWriteOperationResult) {
private mutating func didScalarWrite(written: Int) -> (DatagramWritePromiseFiller?, OneWriteOperationResult) {
precondition(written <= self.pendingWrites[0].data.readableBytes,
"Appeared to write more bytes (\(written)) than the datagram contained (\(self.pendingWrites[0].data.readableBytes))")
var fillers: [DatagramWritePromiseFiller] = []
if let writeFiller = self.wroteFirst() {
fillers.append(writeFiller)
}

let writeFiller = self.wroteFirst()
// If we no longer have a mark, we wrote everything.
let result: OneWriteOperationResult = self.pendingWrites.hasMark() ? .writtenPartially : .writtenCompletely
return (fillers, result)
return (writeFiller, result)
}

/// Is there a pending flush?
Expand Down Expand Up @@ -461,13 +459,13 @@ final class PendingDatagramWritesManager: PendingWritesManager {
/// - parameters:
/// - data: The result of the write operation.
private func didWrite(_ data: IOResult<Int>, messages: UnsafeMutableBufferPointer<MMsgHdr>?) -> OneWriteOperationResult {
let (promises, result) = self.state.didWrite(data, messages: messages)
let (promise, result) = self.state.didWrite(data, messages: messages)

if self.state.bytes < waterMark.low {
channelWritabilityFlag.store(true)
}

self.fulfillPromises(promises)
self.fulfillPromise(promise)
return result
}

Expand All @@ -482,12 +480,12 @@ final class PendingDatagramWritesManager: PendingWritesManager {
private func handleError(_ error: Error) throws -> OneWriteOperationResult {
switch error {
case let e as IOError where e.errnoCode == EMSGSIZE:
let (promises, result) = self.state.recoverableError(ChannelError.writeMessageTooLarge)
self.fulfillPromises(promises)
let (promise, result) = self.state.recoverableError(ChannelError.writeMessageTooLarge)
self.fulfillPromise(promise)
return result
case let e as IOError where e.errnoCode == EHOSTUNREACH:
let (promises, result) = self.state.recoverableError(ChannelError.writeHostUnreachable)
self.fulfillPromises(promises)
let (promise, result) = self.state.recoverableError(ChannelError.writeHostUnreachable)
self.fulfillPromise(promise)
return result
default:
throw error
Expand Down Expand Up @@ -528,13 +526,11 @@ final class PendingDatagramWritesManager: PendingWritesManager {
messages: self.msgs)
}

private func fulfillPromises(_ results: [PendingDatagramWritesState.DatagramWritePromiseFiller]) {
for result in results {
if let error = result.1 {
result.0.fail(error: error)
} else {
result.0.succeed(result: ())
}
private func fulfillPromise(_ promise: PendingDatagramWritesState.DatagramWritePromiseFiller?) {
if let promise = promise, let error = promise.1 {
promise.0.fail(error: error)
} else if let promise = promise {
promise.0.succeed(result: ())
}
}

Expand Down
8 changes: 4 additions & 4 deletions Sources/NIO/SocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ final class SocketChannel: BaseSocketChannel<Socket> {
}
}

override func markFlushPoint(promise: EventLoopPromise<Void>?) {
override func markFlushPoint() {
// Even if writable() will be called later by the EventLoop we still need to mark the flush checkpoint so we are sure all the flushed messages
// are actually written once writable() is called.
self.pendingWrites.markFlushCheckpoint()
Expand Down Expand Up @@ -441,8 +441,8 @@ final class ServerSocketChannel: BaseSocketChannel<ServerSocket> {
promise?.fail(error: ChannelError.operationUnsupported)
}

override func markFlushPoint(promise: EventLoopPromise<Void>?) {
promise?.fail(error: ChannelError.operationUnsupported)
override func markFlushPoint() {
// We do nothing here: flushes are no-ops.
}

override func flushNow() -> IONotificationState {
Expand Down Expand Up @@ -616,7 +616,7 @@ final class DatagramChannel: BaseSocketChannel<Socket> {

/// Mark a flush point. This is called when flush is received, and instructs
/// the implementation to record the flush.
override func markFlushPoint(promise: EventLoopPromise<Void>?) {
override func markFlushPoint() {
// Even if writable() will be called later by the EventLoop we still need to mark the flush checkpoint so we are sure all the flushed messages
// are actually written once writable() is called.
self.pendingWrites.markFlushCheckpoint()
Expand Down