Skip to content

Commit 5da1fd9

Browse files
committed
Introduce non-suspendable version of onClose
Fixes #341
1 parent e27723f commit 5da1fd9

File tree

8 files changed

+268
-2
lines changed

8 files changed

+268
-2
lines changed

Diff for: binary-compatibility-validator/reference-public-api/kotlinx-coroutines-core.txt

+1
Original file line numberDiff line numberDiff line change
@@ -661,6 +661,7 @@ public final class kotlinx/coroutines/experimental/channels/ChannelsKt {
661661
public static final fun maxWith (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;Ljava/util/Comparator;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
662662
public static final fun minWith (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;Ljava/util/Comparator;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
663663
public static final fun none (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
664+
public static final fun onClose (Lkotlinx/coroutines/experimental/channels/Channel;Lkotlin/jvm/functions/Function1;)V
664665
public static final fun requireNoNulls (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;)Lkotlinx/coroutines/experimental/channels/ReceiveChannel;
665666
public static final fun single (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;
666667
public static final fun singleOrNull (Lkotlinx/coroutines/experimental/channels/ReceiveChannel;Lkotlin/coroutines/experimental/Continuation;)Ljava/lang/Object;

Diff for: common/kotlinx-coroutines-core-common/src/main/kotlin/kotlinx/coroutines/experimental/channels/Channels.kt

+14
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,20 @@ public suspend inline fun <E> ReceiveChannel<E>.consumeEach(action: (E) -> Unit)
149149
for (e in this) action(e)
150150
}
151151

152+
/**
153+
* Invokes the given block **synchronously** in the context of cancellation when channel or its job is cancelled.
154+
* If channel is already closed, block is invoked immediately.
155+
* [block] is invoked only once even if channel is cancelled or closed more than once
156+
*
157+
* @param block non-blocking callback which should be invoked on cancellation
158+
*/
159+
public inline fun Channel<*>.onClose(crossinline block: (Throwable?) -> Unit) {
160+
job.invokeOnCompletion { cause ->
161+
if (cause !is JobCancellationException || cause.cause != null) block(cause)
162+
else block(null)
163+
}
164+
}
165+
152166
/**
153167
* @suppress: **Deprecated**: binary compatibility with old code
154168
*/

Diff for: common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/BasicOperationsTest.kt

+53
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,59 @@ class BasicOperationsTest : TestBase() {
9191
testJobCancellation(it, IllegalStateException())
9292
}
9393

94+
@Test
95+
fun testOnClose() = testChannel {
96+
val channel = it.create()
97+
var invoked = false
98+
channel.onClose {
99+
assertNull(it)
100+
invoked = true
101+
}
102+
103+
channel.close()
104+
assertTrue(invoked, "Kind $it")
105+
}
106+
107+
@Test
108+
fun testOnCloseWithException() = testChannel {
109+
val channel = it.create()
110+
var invoked = false
111+
channel.onClose {
112+
assertTrue(it is IllegalStateException)
113+
invoked = true
114+
}
115+
116+
channel.close(IllegalStateException())
117+
assertTrue(invoked, "Kind $it")
118+
}
119+
120+
@Test
121+
fun testOnCloseWithJob() = testChannel {
122+
val channel = it.create()
123+
var invoked = false
124+
channel.onClose {
125+
assertTrue(it is IllegalArgumentException)
126+
invoked = true
127+
}
128+
129+
channel.job.cancel(IllegalArgumentException())
130+
assertTrue(invoked, "Kind $it")
131+
}
132+
133+
@Test
134+
fun testOnCloseMultipleTimes() = testChannel {
135+
val channel = it.create()
136+
var invoked = false
137+
channel.onClose {
138+
assertFalse(invoked)
139+
invoked = true
140+
}
141+
142+
assertTrue(channel.job.cancel())
143+
assertFalse(channel.job.cancel())
144+
assertTrue(invoked, "Kind $it")
145+
}
146+
94147
private suspend inline fun <reified T : Exception> testJobCancellation(kind: TestChannelKind, exception: T? = null) {
95148
var channel: Channel<Int>? = null
96149
val producer = async(coroutineContext) {

Diff for: common/kotlinx-coroutines-core-common/src/test/kotlin/kotlinx/coroutines/experimental/channels/TestChannelKind.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ private class ChannelViaBroadcast<E>(
6464
private val broadcast: BroadcastChannel<E>
6565
): Channel<E>, SendChannel<E> by broadcast {
6666
val sub = broadcast.openSubscription()
67-
67+
override val job: Job = sub.job
6868
override val isClosedForReceive: Boolean get() = sub.isClosedForReceive
6969
override val isEmpty: Boolean get() = sub.isEmpty
7070

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
18+
package guide.channel.example11
19+
20+
import kotlinx.coroutines.experimental.*
21+
import kotlinx.coroutines.experimental.channels.*
22+
23+
fun main(args: Array<String>) = runBlocking<Unit> {
24+
val channel = Channel<Int>()
25+
println("Registering onClose in thread ${Thread.currentThread().name}")
26+
channel.onClose {
27+
println("Invoking onClose in thread ${Thread.currentThread().name}")
28+
}
29+
30+
val job = launch(newSingleThreadContext("ClosingThread")) {
31+
channel.close()
32+
}
33+
34+
job.join()
35+
}

Diff for: core/kotlinx-coroutines-core/src/test/kotlin/guide/test/GuideTest.kt

+9-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
// This file was automatically generated from coroutines-guide.md by Knit tool. Do not edit.
22
package guide.test
33

4-
import org.junit.Test
4+
import org.junit.*
55

66
class GuideTest {
77

@@ -291,6 +291,14 @@ class GuideTest {
291291
)
292292
}
293293

294+
@Test
295+
fun testGuideChannelExample11() {
296+
test("GuideChannelExample11") { guide.channel.example11.main(emptyArray()) }.verifyLinesStart(
297+
"Registering onClose in thread main @coroutine#1",
298+
"Invoking onClose in thread ClosingThread @coroutine#2"
299+
)
300+
}
301+
294302
@Test
295303
fun testGuideChannelExample03() {
296304
test("GuideChannelExample03") { guide.channel.example03.main(emptyArray()) }.verifyLines(
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Copyright 2016-2017 JetBrains s.r.o.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package kotlinx.coroutines.experimental.channels
18+
19+
import com.devexperts.dxlab.lincheck.*
20+
import com.devexperts.dxlab.lincheck.annotations.*
21+
import com.devexperts.dxlab.lincheck.paramgen.*
22+
import com.devexperts.dxlab.lincheck.stress.*
23+
import kotlinx.coroutines.experimental.*
24+
import kotlinx.coroutines.experimental.selects.*
25+
import org.junit.*
26+
import java.io.*
27+
28+
@Param(name = "value", gen = IntGen::class, conf = "1:3")
29+
class SelectChannelLinearizabilityTest : TestBase() {
30+
31+
private val lt = LinTesting()
32+
private var capacity = 0
33+
private lateinit var channel: Channel<Int>
34+
35+
@Reset
36+
fun reset() {
37+
channel = Channel(capacity)
38+
}
39+
40+
@Operation(runOnce = true)
41+
fun onSend1(@Param(name = "value") value: Int) = onSend(value, "onSend1")
42+
43+
@Operation(runOnce = true)
44+
fun onSend2(@Param(name = "value") value: Int) = onSend(value, "onSend2")
45+
46+
47+
private fun onSend(value: Int, name: String): List<OpResult> {
48+
return lt.run(name) {
49+
select<Unit> {
50+
channel.onSend(value) {}
51+
}
52+
}
53+
}
54+
55+
@Operation(runOnce = true)
56+
fun close1() = lt.run("close1") { channel.close(IOException("close1")) }
57+
58+
@Operation(runOnce = true)
59+
fun close2() = lt.run("close2") { channel.close(IOException("close2")) }
60+
61+
@Operation(runOnce = true)
62+
fun onReceive1() = onReceive("onReceive1")
63+
64+
@Operation(runOnce = true)
65+
fun onReceive2() = onReceive("onReceive2")
66+
67+
private fun onReceive(name: String): List<OpResult> {
68+
return lt.run(name) {
69+
select<Int> {
70+
channel.onReceive {
71+
it
72+
}
73+
}
74+
}
75+
}
76+
77+
@Operation(runOnce = true)
78+
fun onClose1() = onClose("onClose1")
79+
80+
@Operation(runOnce = true)
81+
fun onClose2() = onClose("onClose2")
82+
83+
private fun onClose(name: String): List<OpResult> {
84+
return lt.run(name) {
85+
select<Unit> {
86+
channel.onClose {
87+
try {
88+
val result = channel.offer(42)
89+
throw AssertionError("Offer in 'onClose' should throw, but instead returned $result")
90+
} catch (e: Exception) {
91+
// Should happen
92+
}
93+
}
94+
}
95+
}
96+
}
97+
98+
@Test
99+
fun testRendezvousChannelLinearizability() {
100+
runTest(0)
101+
}
102+
103+
@Test
104+
fun testArrayChannelLinearizability() {
105+
for (i in listOf(1, 2, 16)) {
106+
runTest(i)
107+
}
108+
}
109+
110+
@Test
111+
fun testConflatedChannelLinearizability() = runTest(Channel.CONFLATED)
112+
113+
@Test
114+
fun testUnlimitedChannelLinearizability() = runTest(Channel.UNLIMITED)
115+
116+
private fun runTest(capacity: Int) {
117+
this.capacity = capacity
118+
val options = StressOptions()
119+
.iterations(100)
120+
.invocationsPerIteration(200 * stressTestMultiplier)
121+
.addThread(1, 3)
122+
.addThread(1, 3)
123+
.addThread(1, 3)
124+
.addThread(1, 3)
125+
.verifier(LinVerifier::class.java)
126+
LinChecker.check(SelectChannelLinearizabilityTest::class.java, options)
127+
}
128+
}

Diff for: coroutines-guide.md

+27
Original file line numberDiff line numberDiff line change
@@ -1340,6 +1340,32 @@ fun main(args: Array<String>) = runBlocking<Unit> {
13401340
Done!
13411341
-->
13421342

1343+
Additionally, it's possible to invoke a specific listener which is fired when the channel is closed using [onClose].
1344+
Note that listener is invoked synchronously in the context of close:
1345+
1346+
```kotlin
1347+
fun main(args: Array<String>) = runBlocking<Unit> {
1348+
val channel = Channel<Int>()
1349+
println("Registering onClose in thread ${Thread.currentThread().name}")
1350+
channel.onClose {
1351+
println("Invoking onClose in thread ${Thread.currentThread().name}")
1352+
}
1353+
1354+
val job = launch(newSingleThreadContext("ClosingThread")) {
1355+
channel.close()
1356+
}
1357+
1358+
job.join()
1359+
}
1360+
```
1361+
1362+
You can get full code [here](core/kotlinx-coroutines-core/src/test/kotlin/guide/example-channel-11.kt)
1363+
1364+
<!--- TEST LINES_START
1365+
Registering onClose in thread main @coroutine#1
1366+
Invoking onClose in thread ClosingThread @coroutine#2
1367+
-->
1368+
13431369
### Building channel producers
13441370

13451371
The pattern where a coroutine is producing a sequence of elements is quite common.
@@ -2441,6 +2467,7 @@ Channel was closed
24412467
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/send.html
24422468
[ReceiveChannel.receive]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-receive-channel/receive.html
24432469
[SendChannel.close]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-send-channel/close.html
2470+
[onClose]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/on-close.html
24442471
[produce]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/produce.html
24452472
[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/consume-each.html
24462473
[Channel()]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental.channels/-channel.html

0 commit comments

Comments
 (0)