Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Reactive default context #174

Merged
merged 1 commit into from
Nov 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package kotlinx.coroutines.experimental.reactive

import kotlinx.coroutines.experimental.DefaultDispatcher
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import org.reactivestreams.Publisher
import kotlin.coroutines.experimental.CoroutineContext
Expand All @@ -28,7 +29,7 @@ import kotlin.coroutines.experimental.CoroutineContext
*
* @param context -- the coroutine context from which the resulting observable is going to be signalled
*/
public fun <T> ReceiveChannel<T>.asPublisher(context: CoroutineContext): Publisher<T> = publish(context) {
public fun <T> ReceiveChannel<T>.asPublisher(context: CoroutineContext = DefaultDispatcher): Publisher<T> = publish(context) {
for (t in this@asPublisher)
send(t)
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package kotlinx.coroutines.experimental.reactor

import kotlinx.coroutines.experimental.DefaultDispatcher
import kotlinx.coroutines.experimental.Deferred
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.channels.ReceiveChannel
Expand All @@ -16,7 +17,7 @@ import kotlin.coroutines.experimental.CoroutineContext
*
* @param context -- the coroutine context from which the resulting mono is going to be signalled
*/
public fun Job.asMono(context: CoroutineContext): Mono<Unit> = mono(context) { this@asMono.join() }
public fun Job.asMono(context: CoroutineContext = DefaultDispatcher): Mono<Unit> = mono(context) { this@asMono.join() }

/**
* Converts this deferred value to the hot reactive mono that signals
Expand All @@ -27,7 +28,7 @@ public fun Job.asMono(context: CoroutineContext): Mono<Unit> = mono(context) { t
*
* @param context -- the coroutine context from which the resulting mono is going to be signalled
*/
public fun <T> Deferred<T?>.asMono(context: CoroutineContext): Mono<T> = mono(context) { this@asMono.await() }
public fun <T> Deferred<T?>.asMono(context: CoroutineContext = DefaultDispatcher): Mono<T> = mono(context) { this@asMono.await() }

/**
* Converts a stream of elements received from the channel to the hot reactive flux.
Expand All @@ -37,7 +38,7 @@ public fun <T> Deferred<T?>.asMono(context: CoroutineContext): Mono<T> = mono(co
*
* @param context -- the coroutine context from which the resulting flux is going to be signalled
*/
public fun <T> ReceiveChannel<T>.asFlux(context: CoroutineContext): Flux<T> = flux(context) {
public fun <T> ReceiveChannel<T>.asFlux(context: CoroutineContext = DefaultDispatcher): Flux<T> = flux(context) {
for (t in this@asFlux)
send(t)
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package kotlinx.coroutines.experimental.reactor

import kotlinx.coroutines.experimental.DefaultDispatcher
import kotlinx.coroutines.experimental.channels.ProducerScope
import kotlinx.coroutines.experimental.reactive.publish
import reactor.core.publisher.Flux
Expand All @@ -19,7 +20,8 @@ import kotlin.coroutines.experimental.CoroutineContext
* | Normal completion or `close` without cause | `onComplete`
* | Failure with exception or `close` with cause | `onError`
*/
@JvmOverloads // for binary compatibility with older code compiled before context had a default
fun <T> flux(
context: CoroutineContext,
context: CoroutineContext = DefaultDispatcher,
block: suspend ProducerScope<T>.() -> Unit
): Flux<T> = Flux.from(publish(context, block))