diff --git a/library/tiler/src/commonMain/kotlin/com/tunjid/tiler/ConcurrentListTiler.kt b/library/tiler/src/commonMain/kotlin/com/tunjid/tiler/ConcurrentListTiler.kt index 2a50635..0b0b53a 100644 --- a/library/tiler/src/commonMain/kotlin/com/tunjid/tiler/ConcurrentListTiler.kt +++ b/library/tiler/src/commonMain/kotlin/com/tunjid/tiler/ConcurrentListTiler.kt @@ -16,14 +16,14 @@ package com.tunjid.tiler +import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.AbstractFlow import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.FlowCollector -import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.consumeAsFlow import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.emitAll import kotlinx.coroutines.flow.emptyFlow -import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.flatMapLatest import kotlinx.coroutines.flow.flattenMerge import kotlinx.coroutines.flow.flow @@ -170,19 +170,16 @@ private class QueryFlowValve( ) : suspend (Flow>?) -> Unit, Flow> by fetcher.toOutputDataFlow(query) { - private val mutableSharedFlow = MutableSharedFlow>?>() + private val channel = Channel>?>() - val outputFlow: Flow> = mutableSharedFlow + val outputFlow: Flow> = channel + .consumeAsFlow() .distinctUntilChanged() .takeWhile { it != null } .flatMapLatest { it ?: emptyFlow() } override suspend fun invoke(flow: Flow>?) { - // Suspend till the downstream is connected - if (mutableSharedFlow.subscriptionCount.value < 1) { - mutableSharedFlow.subscriptionCount.first { it > 0 } - } - mutableSharedFlow.emit(flow) + channel.send(flow) } }