Skip to content

Commit e19ee04

Browse files
fvascoelizarov
authored andcommitted
Add DefaultDispatcher on some reactive operators
1 parent baf86ab commit e19ee04

File tree

3 files changed

+9
-5
lines changed
  • reactive
    • kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive
    • kotlinx-coroutines-reactor/src/main/kotlin/kotlinx/coroutines/experimental/reactor

3 files changed

+9
-5
lines changed

reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Convert.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package kotlinx.coroutines.experimental.reactive
1818

19+
import kotlinx.coroutines.experimental.DefaultDispatcher
1920
import kotlinx.coroutines.experimental.channels.ReceiveChannel
2021
import org.reactivestreams.Publisher
2122
import kotlin.coroutines.experimental.CoroutineContext
@@ -28,7 +29,7 @@ import kotlin.coroutines.experimental.CoroutineContext
2829
*
2930
* @param context -- the coroutine context from which the resulting observable is going to be signalled
3031
*/
31-
public fun <T> ReceiveChannel<T>.asPublisher(context: CoroutineContext): Publisher<T> = publish(context) {
32+
public fun <T> ReceiveChannel<T>.asPublisher(context: CoroutineContext = DefaultDispatcher): Publisher<T> = publish(context) {
3233
for (t in this@asPublisher)
3334
send(t)
3435
}

reactive/kotlinx-coroutines-reactor/src/main/kotlin/kotlinx/coroutines/experimental/reactor/Convert.kt

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package kotlinx.coroutines.experimental.reactor
22

3+
import kotlinx.coroutines.experimental.DefaultDispatcher
34
import kotlinx.coroutines.experimental.Deferred
45
import kotlinx.coroutines.experimental.Job
56
import kotlinx.coroutines.experimental.channels.ReceiveChannel
@@ -16,7 +17,7 @@ import kotlin.coroutines.experimental.CoroutineContext
1617
*
1718
* @param context -- the coroutine context from which the resulting mono is going to be signalled
1819
*/
19-
public fun Job.asMono(context: CoroutineContext): Mono<Unit> = mono(context) { this@asMono.join() }
20+
public fun Job.asMono(context: CoroutineContext = DefaultDispatcher): Mono<Unit> = mono(context) { this@asMono.join() }
2021

2122
/**
2223
* Converts this deferred value to the hot reactive mono that signals
@@ -27,7 +28,7 @@ public fun Job.asMono(context: CoroutineContext): Mono<Unit> = mono(context) { t
2728
*
2829
* @param context -- the coroutine context from which the resulting mono is going to be signalled
2930
*/
30-
public fun <T> Deferred<T?>.asMono(context: CoroutineContext): Mono<T> = mono(context) { this@asMono.await() }
31+
public fun <T> Deferred<T?>.asMono(context: CoroutineContext = DefaultDispatcher): Mono<T> = mono(context) { this@asMono.await() }
3132

3233
/**
3334
* Converts a stream of elements received from the channel to the hot reactive flux.
@@ -37,7 +38,7 @@ public fun <T> Deferred<T?>.asMono(context: CoroutineContext): Mono<T> = mono(co
3738
*
3839
* @param context -- the coroutine context from which the resulting flux is going to be signalled
3940
*/
40-
public fun <T> ReceiveChannel<T>.asFlux(context: CoroutineContext): Flux<T> = flux(context) {
41+
public fun <T> ReceiveChannel<T>.asFlux(context: CoroutineContext = DefaultDispatcher): Flux<T> = flux(context) {
4142
for (t in this@asFlux)
4243
send(t)
4344
}

reactive/kotlinx-coroutines-reactor/src/main/kotlin/kotlinx/coroutines/experimental/reactor/Flux.kt

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package kotlinx.coroutines.experimental.reactor
22

3+
import kotlinx.coroutines.experimental.DefaultDispatcher
34
import kotlinx.coroutines.experimental.channels.ProducerScope
45
import kotlinx.coroutines.experimental.reactive.publish
56
import reactor.core.publisher.Flux
@@ -19,7 +20,8 @@ import kotlin.coroutines.experimental.CoroutineContext
1920
* | Normal completion or `close` without cause | `onComplete`
2021
* | Failure with exception or `close` with cause | `onError`
2122
*/
23+
@JvmOverloads // for binary compatibility with older code compiled before context had a default
2224
fun <T> flux(
23-
context: CoroutineContext,
25+
context: CoroutineContext = DefaultDispatcher,
2426
block: suspend ProducerScope<T>.() -> Unit
2527
): Flux<T> = Flux.from(publish(context, block))

0 commit comments

Comments
 (0)