Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

don't deliver events for unregistered fds #341

Merged
merged 1 commit into from
Apr 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 33 additions & 8 deletions Sources/NIO/BaseSocketChannel.swift
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ private struct SocketChannelLifecycleManager {
// this is queried from the Channel, ie. must be thread-safe
internal let isActiveAtomic: Atomic<Bool>
// these are only to be accessed on the EventLoop

// have we seen the `.readEOF` notification
// note: this can be `false` on a deactivated channel, we might just have torn it down.
var hasSeenEOFNotification: Bool = false

private var currentState: State = .fresh {
didSet {
assert(self.eventLoop.inEventLoop)
Expand Down Expand Up @@ -228,8 +233,8 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
/// Returned by the `private func readable0()` to inform the caller about the current state of the underlying read stream.
/// This is mostly useful when receiving `.readEOF` as we then need to drain the read stream fully (ie. until we receive EOF or error of course)
private enum ReadStreamState {
/// Everything seems normal.
case normal
/// Everything seems normal
case normal(ReadResult)

/// We saw EOF.
case eof
Expand Down Expand Up @@ -619,18 +624,23 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}
}

private func registerForReadable() {
private final func registerForReadable() {
assert(eventLoop.inEventLoop)
assert(self.lifecycleManager.isRegistered)

guard !self.lifecycleManager.hasSeenEOFNotification else {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a guard or an assert?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Lukasa this will be run if the user does ctx.read()

// we have seen an EOF notification before so there's no point in registering for reads
return
}

guard !self.interestedEvent.contains(.read) else {
return
}

self.safeReregister(interested: self.interestedEvent.union(.read))
}

func unregisterForReadable() {
internal final func unregisterForReadable() {
assert(eventLoop.inEventLoop)
assert(self.lifecycleManager.isRegistered)

Expand Down Expand Up @@ -776,6 +786,16 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

final func readEOF() {
assert(!self.lifecycleManager.hasSeenEOFNotification)
self.lifecycleManager.hasSeenEOFNotification = true

self.readEOF0()

assert(!self.interestedEvent.contains(.read))
assert(!self.interestedEvent.contains(.readEOF))
}

final func readEOF0() {
if self.lifecycleManager.isRegistered {
// we're unregistering from `readEOF` here as we want this to be one-shot. We're then synchronously
// reading all input until the EOF that we're guaranteed to see. After that `readEOF` becomes uninteresting
Expand All @@ -793,7 +813,9 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
assert(!self.lifecycleManager.isActive)
assert(!self.lifecycleManager.isRegistered)
break loop
case .normal:
case .normal(.none):
preconditionFailure("got .readEOF and read returned not reading any bytes, nor EOF.")
case .normal(.some):
// normal, note that there is no guarantee we're still active (as the user might have closed in callout)
continue loop
}
Expand All @@ -805,7 +827,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
// other words: Failing to unregister the whole selector will cause NIO to spin at 100% CPU constantly delivering
// the `reset` event.
final func reset() {
self.readEOF()
self.readEOF0()

if self.socket.isOpen {
assert(self.lifecycleManager.isRegistered)
Expand All @@ -831,6 +853,8 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}

public final func readable() {
assert(!self.lifecycleManager.hasSeenEOFNotification,
"got a read notification after having already seen .readEOF")
self.readable0()
}

Expand All @@ -845,8 +869,9 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
}
}

let readResult: ReadResult
do {
try readFromSocket()
readResult = try readFromSocket()
} catch let err {
let readStreamState: ReadStreamState
// ChannelError.eof is not something we want to fire through the pipeline as it just means the remote
Expand Down Expand Up @@ -885,7 +910,7 @@ class BaseSocketChannel<T: BaseSocket>: SelectableChannel, ChannelCore {
pipeline.fireChannelReadComplete0()
}
readIfNeeded0()
return .normal
return .normal(readResult)
}

/// Returns `true` if the `Channel` should be closed as result of the given `Error` which happened during `readFromSocket`.
Expand Down
14 changes: 12 additions & 2 deletions Sources/NIO/Selector.swift
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ final class Selector<R: Registration> {
private var eventsCapacity = 64
private var events: UnsafeMutablePointer<EventType>
private var registrations = [Int: R]()
// temporary workaround to stop us delivering outdated events; read in `whenReady`, set in `deregister`
private var deregistrationsHappened: Bool = false

private static func allocateEventsArray(capacity: Int) -> UnsafeMutablePointer<EventType> {
let events: UnsafeMutablePointer<EventType> = UnsafeMutablePointer.allocate(capacity: capacity)
Expand Down Expand Up @@ -453,6 +455,8 @@ final class Selector<R: Registration> {
guard self.lifecycleState == .open else {
throw IOError(errnoCode: EBADF, reason: "can't deregister from selector as it's \(self.lifecycleState).")
}
// temporary workaround to stop us delivering outdated events
self.deregistrationsHappened = true
try selectable.withUnsafeFileDescriptor { fd in
guard let reg = registrations.removeValue(forKey: Int(fd)) else {
return
Expand Down Expand Up @@ -500,7 +504,10 @@ final class Selector<R: Registration> {
ready = Int(try Epoll.epoll_wait(epfd: self.fd, events: events, maxevents: Int32(eventsCapacity), timeout: -1))
}

for i in 0..<ready {
// start with no deregistrations happened
self.deregistrationsHappened = false
// temporary workaround to stop us delivering outdated events; possibly set in `deregister`
for i in 0..<ready where !self.deregistrationsHappened {
let ev = events[i]
switch ev.data.fd {
case eventfd:
Expand Down Expand Up @@ -540,7 +547,10 @@ final class Selector<R: Registration> {
Int(try KQueue.kevent(kq: self.fd, changelist: nil, nchanges: 0, eventlist: events, nevents: Int32(eventsCapacity), timeout: ts))
}

for i in 0..<ready {
// start with no deregistrations happened
self.deregistrationsHappened = false
// temporary workaround to stop us delivering outdated events; possibly set in `deregister`
for i in 0..<ready where !self.deregistrationsHappened {
let ev = events[i]
let filter = Int32(ev.filter)
guard Int32(ev.flags) & EV_ERROR == 0 else {
Expand Down
1 change: 1 addition & 0 deletions Tests/NIOTests/EchoServerClientTest+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ extension EchoServerClientTest {
("testPendingReadProcessedAfterWriteError", testPendingReadProcessedAfterWriteError),
("testChannelErrorEOFNotFiredThroughPipeline", testChannelErrorEOFNotFiredThroughPipeline),
("testPortNumbers", testPortNumbers),
("testConnectingToIPv4And6ButServerOnlyWaitsOnIPv4", testConnectingToIPv4And6ButServerOnlyWaitsOnIPv4),
]
}
}
Expand Down
47 changes: 47 additions & 0 deletions Tests/NIOTests/EchoServerClientTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -779,4 +779,51 @@ class EchoServerClientTest : XCTestCase {
}
XCTAssertTrue(atLeastOneSucceeded)
}

func testConnectingToIPv4And6ButServerOnlyWaitsOnIPv4() throws {
let group = MultiThreadedEventLoopGroup(numThreads: 1)
defer {
XCTAssertNoThrow(try group.syncShutdownGracefully())
}

let numBytes = 16 * 1024
let promise: EventLoopPromise<ByteBuffer> = group.next().newPromise()
let countingHandler = ByteCountingHandler(numBytes: numBytes, promise: promise)

// we're binding to IPv4 only
let serverChannel = try ServerBootstrap(group: group)
.serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
.childChannelInitializer { channel in
channel.pipeline.add(handler: countingHandler)
}
.bind(host: "127.0.0.1", port: 0)
.wait()

defer {
XCTAssertNoThrow(try serverChannel.syncCloseAcceptingAlreadyClosed())
}

// but we're trying to connect to (depending on the system configuration and resolver) IPv4 and IPv6
let clientChannel = try ClientBootstrap(group: group)
.connect(host: "localhost", port: Int(serverChannel.localAddress!.port!))
.thenIfError {
promise.fail(error: $0)
return group.next().newFailedFuture(error: $0)
}
.wait()

defer {
XCTAssertNoThrow(try clientChannel.syncCloseAcceptingAlreadyClosed())
}

var buffer = clientChannel.allocator.buffer(capacity: numBytes)

for i in 0..<numBytes {
buffer.write(integer: UInt8(i % 256))
}

try clientChannel.writeAndFlush(NIOAny(buffer)).wait()

try countingHandler.assertReceived(buffer: buffer)
}
}
1 change: 1 addition & 0 deletions Tests/NIOTests/SelectorTest+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ extension SelectorTest {
return [
("testDeregisterWhileProcessingEvents", testDeregisterWhileProcessingEvents),
("testDeregisterAndCloseWhileProcessingEvents", testDeregisterAndCloseWhileProcessingEvents),
("testWeDoNotDeliverEventsForPreviouslyClosedChannels", testWeDoNotDeliverEventsForPreviouslyClosedChannels),
]
}
}
Expand Down
Loading