Skip to content

Commit

Permalink
Support hopping event loops with EventLoopFuture
Browse files Browse the repository at this point in the history
Motivation:

A somewhat common requirement when working with chains of futures
is needing to hop from one event loop to another. This is particularly
common when relying on the fact that EventLoopFuture will synchronize
with the event loop that created it: you often want to rely on that
implicit locking, rather than use Dispatch or Lock to achieve the
same effect.

While doing this hop requires relatively little code, it's not
necessarily totally apparent to new users how they would do it.
Additionally, the most naive implementation incurs the overhead of
allocations and reference counting in cases where it's not necessary
(e.g. when you have only one event loop, or when both work items are
being scheduled on the same event loop).

For this reason, we should have a nice concise way for a user to
request this behaviour and get a relatively performant implementation
of the behaviour.

Modifications:

Added EventLoopFuture<T>.on(eventLoop:).

Changed AcceptHandler to use the new method rather than its (slower)
alternative.

Result:

Users will have an easier time working with EventLoopFutures.
  • Loading branch information
Lukasa committed Mar 16, 2018
1 parent ff8d5ed commit 62c746b
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 3 deletions.
4 changes: 1 addition & 3 deletions Sources/NIO/Bootstrap.swift
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,9 @@ public final class ServerBootstrap {

func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
let accepted = self.unwrapInboundIn(data)
let hopEventLoopPromise: EventLoopPromise<()> = ctx.eventLoop.newPromise()
self.childChannelOptions.applyAll(channel: accepted).cascade(promise: hopEventLoopPromise)
let childChannelInit = self.childChannelInit ?? { (_: Channel) in ctx.eventLoop.newSucceededFuture(result: ()) }

hopEventLoopPromise.futureResult.then {
self.childChannelOptions.applyAll(channel: accepted).on(eventLoop: ctx.eventLoop).then {
assert(ctx.eventLoop.inEventLoop)
return childChannelInit(accepted)
}.then { () -> EventLoopFuture<()> in
Expand Down
22 changes: 22 additions & 0 deletions Sources/NIO/EventLoopFuture.swift
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,29 @@ extension EventLoopFuture {
p0.succeed(result: ())
return body
}
}

extension EventLoopFuture {
/// Returns an `EventLoopFuture` that fires when this future completes, but executes its callbacks on the
/// target event loop instead of the original one.
///
/// It is common to want to "hop" event loops when you arrange some work: for example, you're closing one channel
/// from another, and want to hop back when the close completes. This method lets you spell that requirement
/// succinctly. It also contains an optimisation for the case when the loop you're hopping *from* is the same as
/// the one you're hopping *to*, allowing you to avoid doing allocations in that case.
///
/// - parameters:
/// - target: The `EventLoop` that the returned `EventLoopFuture` will run on.
/// - returns: An `EventLoopFuture` whose callbacks run on `target` instead of the original loop.
func on(eventLoop target: EventLoop) -> EventLoopFuture<T> {
if target === self.eventLoop {
// We're already on that event loop, nothing to do here. Save an allocation.
return self
}
let hoppingPromise: EventLoopPromise<T> = target.newPromise()
self.cascade(promise: hoppingPromise)
return hoppingPromise.futureResult
}
}

/// Execute the given function and synchronously complete the given `EventLoopPromise` (if not `nil`).
Expand Down
3 changes: 3 additions & 0 deletions Tests/NIOTests/EventLoopFutureTest+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ extension EventLoopFutureTest {
("testOrderOfFutureCompletion", testOrderOfFutureCompletion),
("testEventLoopHoppingInThen", testEventLoopHoppingInThen),
("testEventLoopHoppingInThenWithFailures", testEventLoopHoppingInThenWithFailures),
("testLoopHoppingHelperSuccess", testLoopHoppingHelperSuccess),
("testLoopHoppingHelperFailure", testLoopHoppingHelperFailure),
("testLoopHoppingHelperNoHopping", testLoopHoppingHelperNoHopping),
]
}
}
Expand Down
48 changes: 48 additions & 0 deletions Tests/NIOTests/EventLoopFutureTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -241,4 +241,52 @@ class EventLoopFutureTest : XCTestCase {
}
XCTAssertNoThrow(try elg.syncShutdownGracefully())
}

func testLoopHoppingHelperSuccess() throws {
let group = MultiThreadedEventLoopGroup(numThreads: 2)
let loop1 = group.next()
let loop2 = group.next()
XCTAssertFalse(loop1 === loop2)

let succeedingPromise: EventLoopPromise<Void> = loop1.newPromise()
let succeedingFuture = succeedingPromise.futureResult.map {
XCTAssertTrue(loop1.inEventLoop)
}.on(eventLoop: loop2).map {
XCTAssertTrue(loop2.inEventLoop)
}
succeedingPromise.succeed(result: ())
XCTAssertNoThrow(try succeedingFuture.wait())
}

func testLoopHoppingHelperFailure() throws {
let group = MultiThreadedEventLoopGroup(numThreads: 2)
let loop1 = group.next()
let loop2 = group.next()
XCTAssertFalse(loop1 === loop2)

let failingPromise: EventLoopPromise<Void> = loop2.newPromise()
let failingFuture = failingPromise.futureResult.thenIfErrorThrowing { error in
XCTAssertEqual(error as? EventLoopFutureTestError, EventLoopFutureTestError.example)
XCTAssertTrue(loop2.inEventLoop)
throw error
}.on(eventLoop: loop1).mapIfError { error in
XCTAssertEqual(error as? EventLoopFutureTestError, EventLoopFutureTestError.example)
XCTAssertTrue(loop1.inEventLoop)
}

failingPromise.fail(error: EventLoopFutureTestError.example)
XCTAssertNoThrow(try failingFuture.wait())
}

func testLoopHoppingHelperNoHopping() throws {
let group = MultiThreadedEventLoopGroup(numThreads: 2)
let loop1 = group.next()
let loop2 = group.next()
XCTAssertFalse(loop1 === loop2)

let noHoppingPromise: EventLoopPromise<Void> = loop1.newPromise()
let noHoppingFuture = noHoppingPromise.futureResult.on(eventLoop: loop1)
XCTAssertTrue(noHoppingFuture === noHoppingPromise.futureResult)
noHoppingPromise.succeed(result: ())
}
}

0 comments on commit 62c746b

Please # to comment.