Skip to content

Commit

Permalink
fix EventLoopFuture.and's serious threading issues
Browse files Browse the repository at this point in the history
Motivation:

EventLoopFuture.and had serious threading issues if the EventLoops
weren't the same.

Modifications:

Fixed the threading issues and tested them properly.

Result:

Hopefully `and` and `andAll` now don't crash if you use them across
EventLoops.
  • Loading branch information
Johannes Weiß committed Mar 16, 2018
1 parent ff8d5ed commit 10ba901
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 31 deletions.
92 changes: 62 additions & 30 deletions Sources/NIO/EventLoopFuture.swift
Original file line number Diff line number Diff line change
Expand Up @@ -635,43 +635,75 @@ extension EventLoopFuture {
/// of results. If either one fails, the combined `EventLoopFuture` will fail with
/// the first error encountered.
public func and<U>(_ other: EventLoopFuture<U>, file: StaticString = #file, line: UInt = #line) -> EventLoopFuture<(T,U)> {
let andlock = Lock()
let promise = EventLoopPromise<(T,U)>(eventLoop: eventLoop, file: file, line: line)
var tvalue: T?
var uvalue: U?

_whenComplete { () -> CallbackList in
switch self.value! {
case .failure(let error):
return promise._setValue(value: .failure(error))
case .success(let t):
andlock.lock()
if let u = uvalue {
andlock.unlock()
return promise._setValue(value: .success((t, u)))
} else {
andlock.unlock()
tvalue = t
if self.eventLoop === other.eventLoop {
assert(self.eventLoop === other.eventLoop)
_whenComplete { () -> CallbackList in
switch self.value! {
case .failure(let error):
return promise._setValue(value: .failure(error))
case .success(let t):
if let u = uvalue {
return promise._setValue(value: .success((t, u)))
} else {
tvalue = t
}
}
return CallbackList()
}
return CallbackList()
}

other._whenComplete { () -> CallbackList in
switch other.value! {
case .failure(let error):
return promise._setValue(value: .failure(error))
case .success(let u):
andlock.lock()
if let t = tvalue {
andlock.unlock()
return promise._setValue(value: .success((t, u)))
} else {
andlock.unlock()
uvalue = u

other._whenComplete { () -> CallbackList in
switch other.value! {
case .failure(let error):
return promise._setValue(value: .failure(error))
case .success(let u):
if let t = tvalue {
return promise._setValue(value: .success((t, u)))
} else {
uvalue = u
}
}
return CallbackList()
}
} else {
let andlock = Lock()

_whenComplete { () -> CallbackList in
switch self.value! {
case .failure:
self.cascadeFailure(promise: promise)
case .success(let t):
andlock.lock()
if let u = uvalue {
andlock.unlock()
promise.succeed(result: (t, u))
} else {
tvalue = t
andlock.unlock()
}
}
return CallbackList()
}

other._whenComplete { () -> CallbackList in
switch other.value! {
case .failure:
other.cascadeFailure(promise: promise)
case .success(let u):
andlock.lock()
if let t = tvalue {
andlock.unlock()
promise.succeed(result: (t, u))
} else {
uvalue = u
andlock.unlock()
}
}
return CallbackList()
}
return CallbackList()
}

return promise.futureResult
Expand Down
3 changes: 3 additions & 0 deletions Tests/NIOTests/EventLoopFutureTest+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ extension EventLoopFutureTest {
("testOrderOfFutureCompletion", testOrderOfFutureCompletion),
("testEventLoopHoppingInThen", testEventLoopHoppingInThen),
("testEventLoopHoppingInThenWithFailures", testEventLoopHoppingInThenWithFailures),
("testEventLoopHoppingAndAll", testEventLoopHoppingAndAll),
("testEventLoopHoppingAndAllWithFailures", testEventLoopHoppingAndAllWithFailures),
("testFutureInVariousScenarios", testFutureInVariousScenarios),
]
}
}
Expand Down
141 changes: 140 additions & 1 deletion Tests/NIOTests/EventLoopFutureTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ class EventLoopFutureTest : XCTestCase {
let n = 20
let elg = MultiThreadedEventLoopGroup(numThreads: n)
var prev: EventLoopFuture<Int> = elg.next().newSucceededFuture(result: 0)
(1..<20).forEach { (i: Int) in
(1..<n).forEach { (i: Int) in
let p: EventLoopPromise<Int> = elg.next().newPromise()
prev.then { (i2: Int) -> EventLoopFuture<Int> in
XCTAssertEqual(i - 1, i2)
Expand Down Expand Up @@ -241,4 +241,143 @@ class EventLoopFutureTest : XCTestCase {
}
XCTAssertNoThrow(try elg.syncShutdownGracefully())
}

func testEventLoopHoppingAndAll() throws {
let n = 20
let elg = MultiThreadedEventLoopGroup(numThreads: n)
let ps = (0..<n).map { (_: Int) -> EventLoopPromise<()> in
elg.next().newPromise()
}
let allOfEm = EventLoopFuture<()>.andAll(ps.map { $0.futureResult }, eventLoop: elg.next())
ps.reversed().forEach { p in
DispatchQueue.global().async {
p.succeed(result: ())
}
}
try allOfEm.wait()
XCTAssertNoThrow(try elg.syncShutdownGracefully())
}

func testEventLoopHoppingAndAllWithFailures() throws {
enum DummyError: Error { case dummy }
let n = 20
let fireBackEl = MultiThreadedEventLoopGroup(numThreads: 1)
let elg = MultiThreadedEventLoopGroup(numThreads: n)
let ps = (0..<n).map { (_: Int) -> EventLoopPromise<()> in
elg.next().newPromise()
}
let allOfEm = EventLoopFuture<()>.andAll(ps.map { $0.futureResult }, eventLoop: fireBackEl.next())
ps.reversed().enumerated().forEach { idx, p in
DispatchQueue.global().async {
if idx == n / 2 {
p.fail(error: DummyError.dummy)
} else {
p.succeed(result: ())
}
}
}
do {
try allOfEm.wait()
XCTFail("unexpected failure")
} catch _ as DummyError {
// ok
} catch {
XCTFail("unexpected error: \(error)")
}
XCTAssertNoThrow(try elg.syncShutdownGracefully())
XCTAssertNoThrow(try fireBackEl.syncShutdownGracefully())
}

func testFutureInVariousScenarios() throws {
enum DummyError: Error { case dummy0; case dummy1 }
let elg = MultiThreadedEventLoopGroup(numThreads: 2)
let el1 = elg.next()
let el2 = elg.next()
precondition(el1 !== el2)
let q1 = DispatchQueue(label: "q1")
let q2 = DispatchQueue(label: "q2")

// this determines which promise is fulfilled first (and (true, true) meaning they race)
for whoGoesFirst in [(false, true), (true, false), (true, true)] {
// this determines what EventLoops the Promises are created on
for eventLoops in [(el1, el1), (el1, el2), (el2, el1), (el2, el2)] {
// this determines if the promises fail or succeed
for whoSucceeds in [(false, false), (false, true), (true, false), (true, true)] {
let p0: EventLoopPromise<Int> = eventLoops.0.newPromise()
let p1: EventLoopPromise<String> = eventLoops.1.newPromise()
let fAll = p0.futureResult.and(p1.futureResult)

// preheat both queues so we have a better chance of racing
let sem1 = DispatchSemaphore(value: 0)
let sem2 = DispatchSemaphore(value: 0)
let g = DispatchGroup()
q1.async(group: g) {
sem2.signal()
sem1.wait()
}
q2.async(group: g) {
sem1.signal()
sem2.wait()
}
g.wait()

if whoGoesFirst.0 {
q1.async {
if whoSucceeds.0 {
p0.succeed(result: 7)
} else {
p0.fail(error: DummyError.dummy0)
}
if !whoGoesFirst.1 {
q2.asyncAfter(deadline: .now() + 0.1) {
if whoSucceeds.1 {
p1.succeed(result: "hello")
} else {
p1.fail(error: DummyError.dummy1)
}
}
}
}
}
if whoGoesFirst.1 {
q2.async {
if whoSucceeds.1 {
p1.succeed(result: "hello")
} else {
p1.fail(error: DummyError.dummy1)
}
if !whoGoesFirst.0 {
q1.asyncAfter(deadline: .now() + 0.1) {
if whoSucceeds.0 {
p0.succeed(result: 7)
} else {
p0.fail(error: DummyError.dummy0)
}
}
}
}
}
do {
let result = try fAll.wait()
if !whoSucceeds.0 || !whoSucceeds.1 {
XCTFail("unexpected success")
} else {
XCTAssert((7, "hello") == result)
}
} catch let e as DummyError {
switch e {
case .dummy0:
XCTAssertFalse(whoSucceeds.0)
case .dummy1:
XCTAssertFalse(whoSucceeds.1)
}
} catch {
XCTFail("unexpected error: \(error)")
}
}
}
}

XCTAssertNoThrow(try elg.syncShutdownGracefully())
}
}

0 comments on commit 10ba901

Please # to comment.