Skip to content

Commit d2ca6ab

Browse files
authored
Fix for value leaks its continuation (#2427)
1 parent a5fb580 commit d2ca6ab

File tree

2 files changed

+138
-5
lines changed

2 files changed

+138
-5
lines changed

RxSwift/Traits/PrimitiveSequence/PrimitiveSequence+Concurrency.swift

+37-5
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,21 @@ public extension PrimitiveSequenceType where Trait == SingleTrait {
2929
return try await withTaskCancellationHandler(
3030
operation: {
3131
try await withCheckedThrowingContinuation { continuation in
32+
var didResume = false
3233
disposable.setDisposable(
3334
self.subscribe(
34-
onSuccess: { continuation.resume(returning: $0) },
35-
onFailure: { continuation.resume(throwing: $0) }
35+
onSuccess: {
36+
didResume = true
37+
continuation.resume(returning: $0)
38+
},
39+
onFailure: {
40+
didResume = true
41+
continuation.resume(throwing: $0)
42+
},
43+
onDisposed: {
44+
guard !didResume else { return }
45+
continuation.resume(throwing: CancellationError())
46+
}
3647
)
3748
)
3849
}
@@ -69,16 +80,26 @@ public extension PrimitiveSequenceType where Trait == MaybeTrait {
6980
operation: {
7081
try await withCheckedThrowingContinuation { continuation in
7182
var didEmit = false
83+
var didResume = false
7284
disposable.setDisposable(
7385
self.subscribe(
7486
onSuccess: { value in
7587
didEmit = true
88+
didResume = true
7689
continuation.resume(returning: value)
7790
},
78-
onError: { error in continuation.resume(throwing: error) },
91+
onError: { error in
92+
didResume = true
93+
continuation.resume(throwing: error)
94+
},
7995
onCompleted: {
8096
guard !didEmit else { return }
97+
didResume = true
8198
continuation.resume(returning: nil)
99+
},
100+
onDisposed: {
101+
guard !didResume else { return }
102+
continuation.resume(throwing: CancellationError())
82103
}
83104
)
84105
)
@@ -114,10 +135,21 @@ public extension PrimitiveSequenceType where Trait == CompletableTrait, Element
114135
return try await withTaskCancellationHandler(
115136
operation: {
116137
try await withCheckedThrowingContinuation { continuation in
138+
var didResume = false
117139
disposable.setDisposable(
118140
self.subscribe(
119-
onCompleted: { continuation.resume() },
120-
onError: { error in continuation.resume(throwing: error) }
141+
onCompleted: {
142+
didResume = true
143+
continuation.resume()
144+
},
145+
onError: { error in
146+
didResume = true
147+
continuation.resume(throwing: error)
148+
},
149+
onDisposed: {
150+
guard !didResume else { return }
151+
continuation.resume(throwing: CancellationError())
152+
}
121153
)
122154
)
123155
}

Tests/RxSwiftTests/PrimitiveSequence+ConcurrencyTests.swift

+101
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,37 @@ extension PrimitiveSequenceConcurrencyTests {
4141
XCTAssertTrue(true)
4242
}
4343
}
44+
45+
func testSingleThrowsCancellationWithoutEvents() async throws {
46+
let single = Single<Void>.never()
47+
48+
Task {
49+
do {
50+
try await single.value
51+
XCTFail("Should not proceed beyond try")
52+
} catch {
53+
XCTAssertTrue(Task.isCancelled)
54+
XCTAssertTrue(error is CancellationError)
55+
}
56+
}.cancel()
57+
}
58+
59+
func testSingleNotThrowingCancellation() async throws {
60+
let single = Single.just(())
61+
62+
let task = Task {
63+
do {
64+
try await single.value
65+
XCTAssertTrue(true)
66+
} catch {
67+
XCTFail()
68+
}
69+
}
70+
71+
try await Task.sleep(nanoseconds: 1_000_000)
72+
task.cancel()
73+
}
74+
4475
}
4576

4677
// MARK: - Maybe
@@ -79,6 +110,49 @@ extension PrimitiveSequenceConcurrencyTests {
79110
XCTAssertTrue(true)
80111
}
81112
}
113+
114+
func testMaybeThrowsCancellationWithoutEvents() async throws {
115+
let maybe = Maybe<Void>.never()
116+
117+
Task {
118+
do {
119+
try await maybe.value
120+
XCTFail("Should not proceed beyond try")
121+
} catch {
122+
XCTAssertTrue(Task.isCancelled)
123+
XCTAssertTrue(error is CancellationError)
124+
}
125+
}.cancel()
126+
}
127+
128+
func testMaybeNotThrowingCancellationWhenCompleted() async throws {
129+
let maybe = Maybe<Int>.empty()
130+
131+
Task {
132+
do {
133+
let value = try await maybe.value
134+
XCTAssertNil(value)
135+
} catch {
136+
XCTFail("Should not throw an error")
137+
}
138+
}.cancel()
139+
}
140+
141+
func testMaybeNotThrowingCancellation() async throws {
142+
let maybe = Maybe.just(())
143+
144+
let task = Task {
145+
do {
146+
try await maybe.value
147+
XCTAssertTrue(true)
148+
} catch {
149+
XCTFail("Should not throw an error")
150+
}
151+
}
152+
153+
try await Task.sleep(nanoseconds: 1_000_000)
154+
task.cancel()
155+
}
82156
}
83157

84158
// MARK: - Completable
@@ -105,6 +179,33 @@ extension PrimitiveSequenceConcurrencyTests {
105179
XCTAssertTrue(true)
106180
}
107181
}
182+
183+
func testCompletableThrowsCancellationWithoutEvents() async throws {
184+
let completable = Completable.never()
185+
186+
Task {
187+
do {
188+
try await completable.value
189+
XCTFail()
190+
} catch {
191+
XCTAssertTrue(Task.isCancelled)
192+
XCTAssertTrue(error is CancellationError)
193+
}
194+
}.cancel()
195+
}
196+
197+
func testCompletableNotThrowingCancellation() async throws {
198+
let completable = Completable.empty()
199+
200+
Task {
201+
do {
202+
try await completable.value
203+
XCTAssertTrue(true)
204+
} catch {
205+
XCTFail("Should not throw an error")
206+
}
207+
}.cancel()
208+
}
108209
}
109210
#endif
110211

0 commit comments

Comments
 (0)