Skip to content

Commit

Permalink
Use a channel for QueryFlowValve instead of a MutableSharedFlow
Browse files Browse the repository at this point in the history
  • Loading branch information
tunjid committed Dec 28, 2023
1 parent 81edc46 commit 2b252e1
Showing 1 changed file with 6 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

package com.tunjid.tiler

import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.AbstractFlow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flattenMerge
import kotlinx.coroutines.flow.flow
Expand Down Expand Up @@ -170,19 +170,16 @@ private class QueryFlowValve<Query, Item>(
) : suspend (Flow<Tile.Data<Query, Item>>?) -> Unit,
Flow<Tile.Data<Query, Item>> by fetcher.toOutputDataFlow(query) {

private val mutableSharedFlow = MutableSharedFlow<Flow<Tile.Data<Query, Item>>?>()
private val channel = Channel<Flow<Tile.Data<Query, Item>>?>()

val outputFlow: Flow<Tile.Output<Query, Item>> = mutableSharedFlow
val outputFlow: Flow<Tile.Output<Query, Item>> = channel
.consumeAsFlow()
.distinctUntilChanged()
.takeWhile { it != null }
.flatMapLatest { it ?: emptyFlow() }

override suspend fun invoke(flow: Flow<Tile.Data<Query, Item>>?) {
// Suspend till the downstream is connected
if (mutableSharedFlow.subscriptionCount.value < 1) {
mutableSharedFlow.subscriptionCount.first { it > 0 }
}
mutableSharedFlow.emit(flow)
channel.send(flow)
}
}

Expand Down

0 comments on commit 2b252e1

Please # to comment.