Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

fix EventLoopFuture.and's serious threading issues [alternative] #176

Merged
merged 1 commit into from
Mar 16, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions Sources/NIO/EventLoopFuture.swift
Original file line number Diff line number Diff line change
Expand Up @@ -635,45 +635,48 @@ extension EventLoopFuture {
/// of results. If either one fails, the combined `EventLoopFuture` will fail with
/// the first error encountered.
public func and<U>(_ other: EventLoopFuture<U>, file: StaticString = #file, line: UInt = #line) -> EventLoopFuture<(T,U)> {
let andlock = Lock()
let promise = EventLoopPromise<(T,U)>(eventLoop: eventLoop, file: file, line: line)
var tvalue: T?
var uvalue: U?


assert(self.eventLoop === promise.futureResult.eventLoop)
_whenComplete { () -> CallbackList in
switch self.value! {
case .failure(let error):
return promise._setValue(value: .failure(error))
case .success(let t):
andlock.lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

less locks FTW!

if let u = uvalue {
andlock.unlock()
return promise._setValue(value: .success((t, u)))
} else {
andlock.unlock()
tvalue = t
}
}
return CallbackList()
}

other._whenComplete { () -> CallbackList in

let hopOver: EventLoopFuture<U>
if self.eventLoop === other.eventLoop {
hopOver = other
} else {
let hopOverP = EventLoopPromise<U>(eventLoop: self.eventLoop, file: file, line: line)
other.cascade(promise: hopOverP)
hopOver = hopOverP.futureResult
}
hopOver._whenComplete { () -> CallbackList in
assert(self.eventLoop.inEventLoop)
switch other.value! {
case .failure(let error):
return promise._setValue(value: .failure(error))
case .success(let u):
andlock.lock()
if let t = tvalue {
andlock.unlock()
return promise._setValue(value: .success((t, u)))
} else {
andlock.unlock()
uvalue = u
}
}
return CallbackList()
}

return promise.futureResult
}

Expand Down
3 changes: 3 additions & 0 deletions Tests/NIOTests/EventLoopFutureTest+XCTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ extension EventLoopFutureTest {
("testOrderOfFutureCompletion", testOrderOfFutureCompletion),
("testEventLoopHoppingInThen", testEventLoopHoppingInThen),
("testEventLoopHoppingInThenWithFailures", testEventLoopHoppingInThenWithFailures),
("testEventLoopHoppingAndAll", testEventLoopHoppingAndAll),
("testEventLoopHoppingAndAllWithFailures", testEventLoopHoppingAndAllWithFailures),
("testFutureInVariousScenarios", testFutureInVariousScenarios),
]
}
}
Expand Down
142 changes: 141 additions & 1 deletion Tests/NIOTests/EventLoopFutureTest.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//===----------------------------------------------------------------------===//

import XCTest
import Dispatch
@testable import NIO

enum EventLoopFutureTestError : Error {
Expand Down Expand Up @@ -213,7 +214,7 @@ class EventLoopFutureTest : XCTestCase {
let n = 20
let elg = MultiThreadedEventLoopGroup(numThreads: n)
var prev: EventLoopFuture<Int> = elg.next().newSucceededFuture(result: 0)
(1..<20).forEach { (i: Int) in
(1..<n).forEach { (i: Int) in
let p: EventLoopPromise<Int> = elg.next().newPromise()
prev.then { (i2: Int) -> EventLoopFuture<Int> in
XCTAssertEqual(i - 1, i2)
Expand Down Expand Up @@ -241,4 +242,143 @@ class EventLoopFutureTest : XCTestCase {
}
XCTAssertNoThrow(try elg.syncShutdownGracefully())
}

func testEventLoopHoppingAndAll() throws {
let n = 20
let elg = MultiThreadedEventLoopGroup(numThreads: n)
let ps = (0..<n).map { (_: Int) -> EventLoopPromise<()> in
elg.next().newPromise()
}
let allOfEm = EventLoopFuture<()>.andAll(ps.map { $0.futureResult }, eventLoop: elg.next())
ps.reversed().forEach { p in
DispatchQueue.global().async {
p.succeed(result: ())
}
}
try allOfEm.wait()
XCTAssertNoThrow(try elg.syncShutdownGracefully())
}

func testEventLoopHoppingAndAllWithFailures() throws {
enum DummyError: Error { case dummy }
let n = 20
let fireBackEl = MultiThreadedEventLoopGroup(numThreads: 1)
let elg = MultiThreadedEventLoopGroup(numThreads: n)
let ps = (0..<n).map { (_: Int) -> EventLoopPromise<()> in
elg.next().newPromise()
}
let allOfEm = EventLoopFuture<()>.andAll(ps.map { $0.futureResult }, eventLoop: fireBackEl.next())
ps.reversed().enumerated().forEach { idx, p in
DispatchQueue.global().async {
if idx == n / 2 {
p.fail(error: DummyError.dummy)
} else {
p.succeed(result: ())
}
}
}
do {
try allOfEm.wait()
XCTFail("unexpected failure")
} catch _ as DummyError {
// ok
} catch {
XCTFail("unexpected error: \(error)")
}
XCTAssertNoThrow(try elg.syncShutdownGracefully())
XCTAssertNoThrow(try fireBackEl.syncShutdownGracefully())
}

func testFutureInVariousScenarios() throws {
enum DummyError: Error { case dummy0; case dummy1 }
let elg = MultiThreadedEventLoopGroup(numThreads: 2)
let el1 = elg.next()
let el2 = elg.next()
precondition(el1 !== el2)
let q1 = DispatchQueue(label: "q1")
let q2 = DispatchQueue(label: "q2")

// this determines which promise is fulfilled first (and (true, true) meaning they race)
for whoGoesFirst in [(false, true), (true, false), (true, true)] {
// this determines what EventLoops the Promises are created on
for eventLoops in [(el1, el1), (el1, el2), (el2, el1), (el2, el2)] {
// this determines if the promises fail or succeed
for whoSucceeds in [(false, false), (false, true), (true, false), (true, true)] {
let p0: EventLoopPromise<Int> = eventLoops.0.newPromise()
let p1: EventLoopPromise<String> = eventLoops.1.newPromise()
let fAll = p0.futureResult.and(p1.futureResult)

// preheat both queues so we have a better chance of racing
let sem1 = DispatchSemaphore(value: 0)
let sem2 = DispatchSemaphore(value: 0)
let g = DispatchGroup()
q1.async(group: g) {
sem2.signal()
sem1.wait()
}
q2.async(group: g) {
sem1.signal()
sem2.wait()
}
g.wait()

if whoGoesFirst.0 {
q1.async {
if whoSucceeds.0 {
p0.succeed(result: 7)
} else {
p0.fail(error: DummyError.dummy0)
}
if !whoGoesFirst.1 {
q2.asyncAfter(deadline: .now() + 0.1) {
if whoSucceeds.1 {
p1.succeed(result: "hello")
} else {
p1.fail(error: DummyError.dummy1)
}
}
}
}
}
if whoGoesFirst.1 {
q2.async {
if whoSucceeds.1 {
p1.succeed(result: "hello")
} else {
p1.fail(error: DummyError.dummy1)
}
if !whoGoesFirst.0 {
q1.asyncAfter(deadline: .now() + 0.1) {
if whoSucceeds.0 {
p0.succeed(result: 7)
} else {
p0.fail(error: DummyError.dummy0)
}
}
}
}
}
do {
let result = try fAll.wait()
if !whoSucceeds.0 || !whoSucceeds.1 {
XCTFail("unexpected success")
} else {
XCTAssert((7, "hello") == result)
}
} catch let e as DummyError {
switch e {
case .dummy0:
XCTAssertFalse(whoSucceeds.0)
case .dummy1:
XCTAssertFalse(whoSucceeds.1)
}
} catch {
XCTFail("unexpected error: \(error)")
}
}
}
}

XCTAssertNoThrow(try elg.syncShutdownGracefully())
}
}