From 01b5e71fd691b063ca2889cc41d679688e373adf Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Johannes=20Wei=C3=9F?=
Date: Thu, 19 Apr 2018 19:45:32 +0100
Subject: [PATCH] don't deliver events for unregistered fds

Motivation:

Since forever we had a major bug in the Selector. Under these conditions:

- kqueue/epoll had many events
- in one of the earlier events we unregister a Channel whose fd is one of the later events
- we subsequently (still in the same event loop tick) register a new channel which gets the same fd as the previously closed one

we would deliver an event that was meant for the previous channel to the newly opened one.

Thanks to @mcdappdev for hitting this bug, helping us debug it and also providing a reliably working repro.

Modifications:

If during event delivery any fd gets unregistered, we stop delivering the remaining events and rely on the selector to redeliver them next time.

Result:

We don't deliver events for previously closed channels to new ones.
---
 Sources/NIO/BaseSocketChannel.swift        |  41 ++-
 Sources/NIO/Selector.swift                 |  14 +-
 .../EchoServerClientTest+XCTest.swift      |   1 +
 Tests/NIOTests/EchoServerClientTest.swift  |  47 +++
 Tests/NIOTests/SelectorTest+XCTest.swift   |   1 +
 Tests/NIOTests/SelectorTest.swift          | 267 +++++++++++++++++-
 Tests/NIOTests/TestUtils.swift             |  14 +
 7 files changed, 374 insertions(+), 11 deletions(-)

diff --git a/Sources/NIO/BaseSocketChannel.swift b/Sources/NIO/BaseSocketChannel.swift
index a2ecdd75a5..3f1f5a6492 100644
--- a/Sources/NIO/BaseSocketChannel.swift
+++ b/Sources/NIO/BaseSocketChannel.swift
@@ -34,6 +34,11 @@ private struct SocketChannelLifecycleManager {
     // this is queried from the Channel, ie. must be thread-safe
     internal let isActiveAtomic: Atomic<Bool>
     // these are only to be accessed on the EventLoop
+
+    // have we seen the `.readEOF` notification
+    // note: this can be `false` on a deactivated channel, we might just have torn it down.
+    var hasSeenEOFNotification: Bool = false
+
     private var currentState: State = .fresh {
         didSet {
             assert(self.eventLoop.inEventLoop)
@@ -228,8 +233,8 @@ class BaseSocketChannel: SelectableChannel, ChannelCore {
     /// Returned by the `private func readable0()` to inform the caller about the current state of the underlying read stream.
     /// This is mostly useful when receiving `.readEOF` as we then need to drain the read stream fully (ie. until we receive EOF or error of course)
     private enum ReadStreamState {
-        /// Everything seems normal.
-        case normal
+        /// Everything seems normal
+        case normal(ReadResult)

        /// We saw EOF.
        case eof
@@ -619,10 +624,15 @@ class BaseSocketChannel: SelectableChannel, ChannelCore {
         }
     }

-    private func registerForReadable() {
+    private final func registerForReadable() {
         assert(eventLoop.inEventLoop)
         assert(self.lifecycleManager.isRegistered)

+        guard !self.lifecycleManager.hasSeenEOFNotification else {
+            // we have seen an EOF notification before so there's no point in registering for reads
+            return
+        }
+
         guard !self.interestedEvent.contains(.read) else {
             return
         }
@@ -630,7 +640,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore {
         self.safeReregister(interested: self.interestedEvent.union(.read))
     }

-    func unregisterForReadable() {
+    internal final func unregisterForReadable() {
         assert(eventLoop.inEventLoop)
         assert(self.lifecycleManager.isRegistered)

@@ -776,6 +786,16 @@ class BaseSocketChannel: SelectableChannel, ChannelCore {
     }

     final func readEOF() {
+        assert(!self.lifecycleManager.hasSeenEOFNotification)
+        self.lifecycleManager.hasSeenEOFNotification = true
+
+        self.readEOF0()
+
+        assert(!self.interestedEvent.contains(.read))
+        assert(!self.interestedEvent.contains(.readEOF))
+    }
+
+    final func readEOF0() {
         if self.lifecycleManager.isRegistered {
             // we're unregistering from `readEOF` here as we want this to be one-shot. We're then synchronously
             // reading all input until the EOF that we're guaranteed to see. After that `readEOF` becomes uninteresting
@@ -793,7 +813,9 @@ class BaseSocketChannel: SelectableChannel, ChannelCore {
                     assert(!self.lifecycleManager.isActive)
                     assert(!self.lifecycleManager.isRegistered)
                     break loop
-                case .normal:
+                case .normal(.none):
+                    preconditionFailure("got .readEOF and read returned not reading any bytes, nor EOF.")
+                case .normal(.some):
                     // normal, note that there is no guarantee we're still active (as the user might have closed in callout)
                     continue loop
                 }
@@ -805,7 +827,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore {
    // other words: Failing to unregister the whole selector will cause NIO to spin at 100% CPU constantly delivering
    // the `reset` event.
    final func reset() {
-        self.readEOF()
+        self.readEOF0()

        if self.socket.isOpen {
            assert(self.lifecycleManager.isRegistered)
@@ -831,6 +853,8 @@ class BaseSocketChannel: SelectableChannel, ChannelCore {
    }

    public final func readable() {
+        assert(!self.lifecycleManager.hasSeenEOFNotification,
+               "got a read notification after having already seen .readEOF")
        self.readable0()
    }

@@ -845,8 +869,9 @@ class BaseSocketChannel: SelectableChannel, ChannelCore {
            }
        }

+        let readResult: ReadResult
        do {
-            try readFromSocket()
+            readResult = try readFromSocket()
        } catch let err {
            let readStreamState: ReadStreamState
            // ChannelError.eof is not something we want to fire through the pipeline as it just means the remote
@@ -885,7 +910,7 @@ class BaseSocketChannel: SelectableChannel, ChannelCore {
            pipeline.fireChannelReadComplete0()
        }
        readIfNeeded0()
-        return .normal
+        return .normal(readResult)
    }

    /// Returns `true` if the `Channel` should be closed as result of the given `Error` which happened during `readFromSocket`.
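Conceptually, the Selector side of this fix works like the following sketch. This is a minimal stand-alone illustration, not NIO's real `Selector`: `TinySelector`, `deliver(readyFDs:_:)` and the fd-keyed dictionary are invented for the example; only the `deregistrationsHappened` flag mirrors what the hunks below add.

final class TinySelector<Registration> {
    private var registrations: [Int: Registration] = [:]  // keyed by fd number
    private var deregistrationsHappened = false

    func register(fd: Int, _ registration: Registration) {
        self.registrations[fd] = registration
    }

    func deregister(fd: Int) {
        self.registrations.removeValue(forKey: fd)
        // the fd number may be handed out again within this very tick
        self.deregistrationsHappened = true
    }

    /// `readyFDs` stands in for whatever epoll_wait/kevent just returned.
    func deliver(readyFDs: [Int], _ body: (Registration) -> Void) {
        self.deregistrationsHappened = false
        for fd in readyFDs {
            guard !self.deregistrationsHappened else {
                // a later entry in `readyFDs` might already belong to a brand-new
                // channel that re-uses a closed fd, so stop here and let the next
                // selector pass redeliver anything that is still pending
                break
            }
            if let registration = self.registrations[fd] {
                body(registration)
            }
        }
    }
}

Dropping the tail of the ready list is safe because level-triggered epoll/kqueue will report still-pending readiness again on the next call, which is exactly what the Modifications section above relies on.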
diff --git a/Sources/NIO/Selector.swift b/Sources/NIO/Selector.swift
index 3f5ba789e4..26b7434712 100644
--- a/Sources/NIO/Selector.swift
+++ b/Sources/NIO/Selector.swift
@@ -259,6 +259,8 @@ final class Selector {
     private var eventsCapacity = 64
     private var events: UnsafeMutablePointer
     private var registrations = [Int: R]()
+    // temporary workaround to stop us delivering outdated events; read in `whenReady`, set in `deregister`
+    private var deregistrationsHappened: Bool = false

     private static func allocateEventsArray(capacity: Int) -> UnsafeMutablePointer {
         let events: UnsafeMutablePointer = UnsafeMutablePointer.allocate(capacity: capacity)
@@ -453,6 +455,8 @@ final class Selector {
         guard self.lifecycleState == .open else {
             throw IOError(errnoCode: EBADF, reason: "can't deregister from selector as it's \(self.lifecycleState).")
         }
+        // temporary workaround to stop us delivering outdated events
+        self.deregistrationsHappened = true
         try selectable.withUnsafeFileDescriptor { fd in
             guard let reg = registrations.removeValue(forKey: Int(fd)) else {
                 return
             }
@@ -500,7 +504,10 @@ final class Selector {
                 ready = Int(try Epoll.epoll_wait(epfd: self.fd, events: events, maxevents: Int32(eventsCapacity), timeout: -1))
             }
-            for i in 0.. {
                 Int(try KQueue.kevent(kq: self.fd, changelist: nil, nchanges: 0, eventlist: events, nevents: Int32(eventsCapacity), timeout: ts))
             }
-            for i in 0.. = group.next().newPromise()
+        let countingHandler = ByteCountingHandler(numBytes: numBytes, promise: promise)
+
+        // we're binding to IPv4 only
+        let serverChannel = try ServerBootstrap(group: group)
+            .serverChannelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
+            .childChannelInitializer { channel in
+                channel.pipeline.add(handler: countingHandler)
+            }
+            .bind(host: "127.0.0.1", port: 0)
+            .wait()
+
+        defer {
+            XCTAssertNoThrow(try serverChannel.syncCloseAcceptingAlreadyClosed())
+        }
+
+        // but we're trying to connect to (depending on the system configuration and resolver) IPv4 and IPv6
+        let clientChannel = try ClientBootstrap(group: group)
+            .connect(host: "localhost", port: Int(serverChannel.localAddress!.port!))
+            .thenIfError {
+                promise.fail(error: $0)
+                return group.next().newFailedFuture(error: $0)
+            }
+            .wait()
+
+        defer {
+            XCTAssertNoThrow(try clientChannel.syncCloseAcceptingAlreadyClosed())
+        }
+
+        var buffer = clientChannel.allocator.buffer(capacity: numBytes)
+
+        for i in 0.. {
+            init(_ value: T) {
+                self._value = value
+            }
+            private var _value: T
+            var value: T {
+                get {
+                    XCTAssertNotNil(MultiThreadedEventLoopGroup.currentEventLoop)
+                    return self._value
+                }
+                set {
+                    XCTAssertNotNil(MultiThreadedEventLoopGroup.currentEventLoop)
+                    self._value = newValue
+                }
+            }
+        }
+        enum DidNotReadError: Error {
+            case didNotReadGotInactive
+            case didNotReadGotReadComplete
+        }
+
+        /// This handler is inserted in the `ChannelPipeline` of the channels that get re-connected. So we're closing a bunch of
+        /// channels and (in the same event loop tick) we then connect the same number of new ones, for which I'm using the
+        /// terminology 're-connect' here.
+        /// These re-connected channels will re-use the fd numbers of the just closed channels. The interesting thing
+        /// is that the `Selector` will still have events buffered for the _closed fds_. Note: the re-connected ones
+        /// will end up using the _same_ fds and this test ensures that we're not getting the outdated events. In this
+        /// case the outdated events are all `.readEOF`s which manifest as `channelReadComplete`s.
+        /// If we're delivering outdated events, they will also happen in the _same event loop tick_ and therefore we do quite a few
+        /// assertions that we're either in or not in that interesting event loop tick.
+        class HappyWhenReadHandler: ChannelInboundHandler {
+            typealias InboundIn = ByteBuffer
+
+            private let didReadPromise: EventLoopPromise<Void>
+            private let hasReConnectEventLoopTickFinished: Box<Bool>
+            private var didRead: Bool = false
+
+            init(hasReConnectEventLoopTickFinished: Box<Bool>, didReadPromise: EventLoopPromise<Void>) {
+                self.didReadPromise = didReadPromise
+                self.hasReConnectEventLoopTickFinished = hasReConnectEventLoopTickFinished
+            }
+
+            func channelActive(ctx: ChannelHandlerContext) {
+                // we expect these channels to be connected within the re-connect event loop tick
+                XCTAssertFalse(self.hasReConnectEventLoopTickFinished.value)
+            }
+
+            func channelInactive(ctx: ChannelHandlerContext) {
+                // we expect these channels to be closed a while after the re-connect event loop tick
+                XCTAssertTrue(self.hasReConnectEventLoopTickFinished.value)
+                XCTAssertTrue(self.didRead)
+                if !self.didRead {
+                    self.didReadPromise.fail(error: DidNotReadError.didNotReadGotInactive)
+                    ctx.close(promise: nil)
+                }
+            }
+
+            func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
+                // we expect these channels to get data only a while after the re-connect event loop tick as it's
+                // impossible to get a read notification in the very same event loop tick that you got registered
+                XCTAssertTrue(self.hasReConnectEventLoopTickFinished.value)
+
+                XCTAssertFalse(self.didRead)
+                var buf = self.unwrapInboundIn(data)
+                XCTAssertEqual(1, buf.readableBytes)
+                XCTAssertEqual("H", buf.readString(length: 1)!)
+                self.didRead = true
+                self.didReadPromise.succeed(result: ())
+            }
+
+            func channelReadComplete(ctx: ChannelHandlerContext) {
+                // we expect these channels to get data only a while after the re-connect event loop tick as it's
+                // impossible to get a read notification in the very same event loop tick that you got registered
+                XCTAssertTrue(self.hasReConnectEventLoopTickFinished.value)
+                XCTAssertTrue(self.didRead)
+                if !self.didRead {
+                    self.didReadPromise.fail(error: DidNotReadError.didNotReadGotReadComplete)
+                    ctx.close(promise: nil)
+                }
+            }
+        }
+
+        /// This handler will wait for all client channels to have come up and for one of them to have received EOF.
+        /// (We will see the EOF as they're set to support half-closure). Then, it'll close half of those file
+        /// descriptors and open the same number of new ones. The new ones (called re-connected) will share the same
+        /// fd numbers as the recently closed ones. That brings us into an interesting situation: There will (very likely)
+        /// be `.readEOF` events enqueued for the just closed ones and because the re-connected channels share the same
+        /// fd numbers danger looms. The `HappyWhenReadHandler` above makes sure nothing bad happens.
+        class CloseEveryOtherAndOpenNewOnesHandler: ChannelInboundHandler {
+            typealias InboundIn = ByteBuffer
+
+            private let allChannels: Box<[Channel]>
+            private let serverAddress: SocketAddress
+            private let everythingWasReadPromise: EventLoopPromise<Void>
+            private let hasReConnectEventLoopTickFinished: Box<Bool>
+
+            init(allChannels: Box<[Channel]>,
+                 hasReConnectEventLoopTickFinished: Box<Bool>,
+                 serverAddress: SocketAddress,
+                 everythingWasReadPromise: EventLoopPromise<Void>) {
+                self.allChannels = allChannels
+                self.serverAddress = serverAddress
+                self.everythingWasReadPromise = everythingWasReadPromise
+                self.hasReConnectEventLoopTickFinished = hasReConnectEventLoopTickFinished
+            }
+
+            func channelActive(ctx: ChannelHandlerContext) {
+                // collect all the channels
+                ctx.channel.getOption(option: ChannelOptions.allowRemoteHalfClosure).whenSuccess { halfClosureAllowed in
+                    precondition(halfClosureAllowed,
+                                 "the test configuration is bogus: half-closure is dis-allowed which breaks the setup of this test")
+                }
+                self.allChannels.value.append(ctx.channel)
+            }
+
+            func userInboundEventTriggered(ctx: ChannelHandlerContext, event: Any) {
+                // this is the `.readEOF` that is triggered by the `ServerHandler`'s `close` calls because our channel
+                // supports half-closure
+                guard self.allChannels.value.count == SelectorTest.testWeDoNotDeliverEventsForPreviouslyClosedChannels_numberOfChannelsToUse else {
+                    return
+                }
+                // all channels are up, so let's construct the situation we want to be in:
+                // 1. let's close half the channels
+                // 2. then re-connect (must be synchronous) the same number of channels and we'll get fd number re-use
+
+                ctx.channel.eventLoop.execute {
+                    // this will be run immediately after we processed all `Selector` events so when
+                    // `self.hasReConnectEventLoopTickFinished.value` becomes true, we're out of the event loop
+                    // tick that is interesting.
+                    XCTAssertFalse(self.hasReConnectEventLoopTickFinished.value)
+                    self.hasReConnectEventLoopTickFinished.value = true
+                }
+                XCTAssertFalse(self.hasReConnectEventLoopTickFinished.value)
+
+                let everyOtherIndex = stride(from: 0, to: SelectorTest.testWeDoNotDeliverEventsForPreviouslyClosedChannels_numberOfChannelsToUse, by: 2)
+                for f in everyOtherIndex {
+                    XCTAssertTrue(self.allChannels.value[f].isActive)
+                    // close will succeed synchronously as we're on the right event loop.
+                    self.allChannels.value[f].close(promise: nil)
+                    XCTAssertFalse(self.allChannels.value[f].isActive)
+                }
+
+                // now we have completed stage 1: we freed up a bunch of file descriptor numbers, so let's open
+                // some new ones
+                var reconnectedChannelsHaveRead: [EventLoopFuture<Void>] = []
+                for _ in everyOtherIndex {
+                    var hasBeenAdded: Bool = false
+                    let p: EventLoopPromise<Void> = ctx.channel.eventLoop.newPromise()
+                    reconnectedChannelsHaveRead.append(p.futureResult)
+                    let newChannel = ClientBootstrap(group: ctx.eventLoop)
+                        .channelInitializer { channel in
+                            channel.pipeline.add(handler: HappyWhenReadHandler(hasReConnectEventLoopTickFinished: self.hasReConnectEventLoopTickFinished,
+                                                                               didReadPromise: p)).map {
+                                hasBeenAdded = true
+                            }
+                        }
+                        .connect(to: self.serverAddress)
+                        .map { (channel: Channel) -> Void in
+                            XCTAssertFalse(self.hasReConnectEventLoopTickFinished.value,
+                                           """
+                                           This is bad: the connect of the channels to be re-connected has not
+                                           completed synchronously.
+                                           We assumed that on all platforms a UNIX Domain Socket connect is
+                                           synchronous but we must be wrong :(.
+                                           The good news is: Not everything is lost, this test should also work
+                                           if you instead open a regular file (in O_RDWR) and just use this file's
+                                           fd with `ClientBootstrap(group: group).withConnectedSocket(fileFD)`.
+                                           Sure, a file is not a socket but it's always readable and writable and
+                                           that fulfills the requirements we have here.
+                                           I still hope this change will never have to be done.
+                                           Note: if you changed anything about the pipeline's handler adding/removal
+                                           you might also have a bug there.
+                                           """)
+                        }
+                    // just to make sure we got `newChannel` synchronously and we could add our handler to the
+                    // pipeline synchronously too.
+                    XCTAssertTrue(newChannel.isFulfilled)
+                    XCTAssertTrue(hasBeenAdded)
+                }
+
+                // if all the new re-connected channels have read, then we're happy here.
+                EventLoopFuture.andAll(reconnectedChannelsHaveRead,
+                                       eventLoop: ctx.eventLoop).cascade(promise: self.everythingWasReadPromise)
+                // let's also remove all the channels so this code will not be triggered again.
+                self.allChannels.value.removeAll()
+            }
+
+        }
+
+        // all of the following are boxed as we need mutable references to them, they can only be read/written on the
+        // event loop `el`.
+        let allServerChannels: Box<[Channel]> = Box([])
+        var allChannels: Box<[Channel]> = Box([])
+        let hasReConnectEventLoopTickFinished: Box<Bool> = Box(false)
+        let numberOfConnectedChannels: Box<Int> = Box(0)
+
+        /// This spawns a server that always sends a character immediately; after the first
+        /// `SelectorTest.numberOfChannelsToUse` channels have been established, we'll close them all. That will trigger
+        /// a `.readEOF` in the connected client channels which will then trigger other interesting things (see above).
+        class ServerHandler: ChannelInboundHandler {
+            typealias InboundIn = ByteBuffer
+
+            private var number: Int = 0
+            private let allServerChannels: Box<[Channel]>
+            private let numberOfConnectedChannels: Box<Int>
+
+            init(allServerChannels: Box<[Channel]>, numberOfConnectedChannels: Box<Int>) {
+                self.allServerChannels = allServerChannels
+                self.numberOfConnectedChannels = numberOfConnectedChannels
+            }
+
+            func channelActive(ctx: ChannelHandlerContext) {
+                var buf = ctx.channel.allocator.buffer(capacity: 1)
+                buf.write(string: "H")
+                ctx.channel.writeAndFlush(buf, promise: nil)
+                self.number += 1
+                self.allServerChannels.value.append(ctx.channel)
+                if self.allServerChannels.value.count == SelectorTest.testWeDoNotDeliverEventsForPreviouslyClosedChannels_numberOfChannelsToUse {
+                    // just to be sure all of the client channels have connected
+                    XCTAssertEqual(SelectorTest.testWeDoNotDeliverEventsForPreviouslyClosedChannels_numberOfChannelsToUse, numberOfConnectedChannels.value)
+                    self.allServerChannels.value.forEach { c in
+                        c.close(promise: nil)
+                    }
+                }
+            }
+        }
+        let el = MultiThreadedEventLoopGroup(numThreads: 1).next()
+        defer {
+            XCTAssertNoThrow(try el.syncShutdownGracefully())
+        }
+        let tempDir = createTemporaryDirectory()
+        let secondServerChannel = try! ServerBootstrap(group: el)
+            .childChannelInitializer { channel in
+                channel.pipeline.add(handler: ServerHandler(allServerChannels: allServerChannels,
+                                                            numberOfConnectedChannels: numberOfConnectedChannels))
+            }
+            .bind(to: SocketAddress(unixDomainSocketPath: "\(tempDir)/server-sock.uds"))
+            .wait()
+
+        let everythingWasReadPromise: EventLoopPromise<Void> = el.newPromise()
+        XCTAssertNoThrow(try el.submit { () -> [EventLoopFuture] in
+            (0..(content: String?
= nil, _ body: (NIO.FileHandle, Strin
     return try body(fileHandle, path)
 }
+func createTemporaryDirectory() -> String {
+    let template = "/tmp/.NIOTests-temp-dir_XXXXXX"
+    var templateBytes = template.utf8 + [0]
+    let templateBytesCount = templateBytes.count
+    templateBytes.withUnsafeMutableBufferPointer { ptr in
+        ptr.baseAddress!.withMemoryRebound(to: Int8.self, capacity: templateBytesCount) { (ptr: UnsafeMutablePointer<Int8>) in
+            let ret = mkdtemp(ptr)
+            XCTAssertNotNil(ret)
+        }
+    }
+    templateBytes.removeLast()
+    return String(decoding: templateBytes, as: UTF8.self)
+}
+
 func openTemporaryFile() -> (CInt, String) {
     let template = "/tmp/niotestXXXXXXX"
     var templateBytes = template.utf8 + [0]
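The tests above lean on the fact that the kernel hands out the lowest free file descriptor number, so a socket opened right after another one is closed typically receives the very same fd. A tiny stand-alone illustration of that behaviour (not part of the patch; it uses `open(2)` on `/dev/null` instead of sockets, which is sufficient to show the numbering):

#if canImport(Glibc)
import Glibc
#else
import Darwin
#endif

// Open and close a descriptor, then open another one: POSIX allocates the
// lowest-numbered free descriptor, so `second` will normally equal `first`.
let first = open("/dev/null", O_RDONLY)
precondition(first >= 0, "open failed")
close(first)

let second = open("/dev/null", O_RDONLY)
precondition(second >= 0, "open failed")

// Any event still queued for the old descriptor would now look like an event
// for whatever `second` refers to; that is the hazard this patch defends against.
print("first fd:", first, "second fd:", second)
close(second)

That immediate reuse is why closing half of the client channels and re-connecting the same number within one event loop tick reliably reproduces the bug.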