diff --git a/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Convert.kt b/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Convert.kt index 1dbec54300..315e753a7e 100644 --- a/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Convert.kt +++ b/reactive/kotlinx-coroutines-reactive/src/main/kotlin/kotlinx/coroutines/experimental/reactive/Convert.kt @@ -16,6 +16,7 @@ package kotlinx.coroutines.experimental.reactive +import kotlinx.coroutines.experimental.DefaultDispatcher import kotlinx.coroutines.experimental.channels.ReceiveChannel import org.reactivestreams.Publisher import kotlin.coroutines.experimental.CoroutineContext @@ -28,7 +29,7 @@ import kotlin.coroutines.experimental.CoroutineContext * * @param context -- the coroutine context from which the resulting observable is going to be signalled */ -public fun ReceiveChannel.asPublisher(context: CoroutineContext): Publisher = publish(context) { +public fun ReceiveChannel.asPublisher(context: CoroutineContext = DefaultDispatcher): Publisher = publish(context) { for (t in this@asPublisher) send(t) } diff --git a/reactive/kotlinx-coroutines-reactor/src/main/kotlin/kotlinx/coroutines/experimental/reactor/Convert.kt b/reactive/kotlinx-coroutines-reactor/src/main/kotlin/kotlinx/coroutines/experimental/reactor/Convert.kt index c343b5c264..9eeee9ceaa 100644 --- a/reactive/kotlinx-coroutines-reactor/src/main/kotlin/kotlinx/coroutines/experimental/reactor/Convert.kt +++ b/reactive/kotlinx-coroutines-reactor/src/main/kotlin/kotlinx/coroutines/experimental/reactor/Convert.kt @@ -1,5 +1,6 @@ package kotlinx.coroutines.experimental.reactor +import kotlinx.coroutines.experimental.DefaultDispatcher import kotlinx.coroutines.experimental.Deferred import kotlinx.coroutines.experimental.Job import kotlinx.coroutines.experimental.channels.ReceiveChannel @@ -16,7 +17,7 @@ import kotlin.coroutines.experimental.CoroutineContext * * @param context -- the coroutine context from which the resulting mono is going to be signalled */ -public fun Job.asMono(context: CoroutineContext): Mono = mono(context) { this@asMono.join() } +public fun Job.asMono(context: CoroutineContext = DefaultDispatcher): Mono = mono(context) { this@asMono.join() } /** * Converts this deferred value to the hot reactive mono that signals @@ -27,7 +28,7 @@ public fun Job.asMono(context: CoroutineContext): Mono = mono(context) { t * * @param context -- the coroutine context from which the resulting mono is going to be signalled */ -public fun Deferred.asMono(context: CoroutineContext): Mono = mono(context) { this@asMono.await() } +public fun Deferred.asMono(context: CoroutineContext = DefaultDispatcher): Mono = mono(context) { this@asMono.await() } /** * Converts a stream of elements received from the channel to the hot reactive flux. @@ -37,7 +38,7 @@ public fun Deferred.asMono(context: CoroutineContext): Mono = mono(co * * @param context -- the coroutine context from which the resulting flux is going to be signalled */ -public fun ReceiveChannel.asFlux(context: CoroutineContext): Flux = flux(context) { +public fun ReceiveChannel.asFlux(context: CoroutineContext = DefaultDispatcher): Flux = flux(context) { for (t in this@asFlux) send(t) } \ No newline at end of file diff --git a/reactive/kotlinx-coroutines-reactor/src/main/kotlin/kotlinx/coroutines/experimental/reactor/Flux.kt b/reactive/kotlinx-coroutines-reactor/src/main/kotlin/kotlinx/coroutines/experimental/reactor/Flux.kt index 30d3f3db46..607567a7d8 100644 --- a/reactive/kotlinx-coroutines-reactor/src/main/kotlin/kotlinx/coroutines/experimental/reactor/Flux.kt +++ b/reactive/kotlinx-coroutines-reactor/src/main/kotlin/kotlinx/coroutines/experimental/reactor/Flux.kt @@ -1,5 +1,6 @@ package kotlinx.coroutines.experimental.reactor +import kotlinx.coroutines.experimental.DefaultDispatcher import kotlinx.coroutines.experimental.channels.ProducerScope import kotlinx.coroutines.experimental.reactive.publish import reactor.core.publisher.Flux @@ -19,7 +20,8 @@ import kotlin.coroutines.experimental.CoroutineContext * | Normal completion or `close` without cause | `onComplete` * | Failure with exception or `close` with cause | `onError` */ +@JvmOverloads // for binary compatibility with older code compiled before context had a default fun flux( - context: CoroutineContext, + context: CoroutineContext = DefaultDispatcher, block: suspend ProducerScope.() -> Unit ): Flux = Flux.from(publish(context, block))