kotlinx.coroutines: SharedFlow.emit() doesn't wait for a subscriber to complete collecting

SharedFlow doc says:

For example, the following class encapsulates an event bus that distributes events to all subscribers in a rendezvous manner, suspending until all subscribers process each event

So, with the following code

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val numbers = MutableSharedFlow<Int>()
    GlobalScope.launch {
        delay(1000)
        repeat(3) {
            println("emit $it")
            numbers.emit(it)
        }
    }
    GlobalScope.launch {
        numbers.collect {
            delay(1000)
            println("$it collected")
        }
    }.join()
}

we could expect the following output:

emit 0
0 collected
emit 1
1 collected
emit 2
2 collected

But the actual output is:

emit 0
emit 1
0 collected
emit 2
1 collected
2 collected

It seems the flow has an extra buffer of size 1 and doesn’t suspend on the first emit() call. Is it a bug, or am I missing something?

About this issue

  • State: open
  • Created 3 years ago
  • Comments: 35 (11 by maintainers)

Most upvoted comments

Sorry for commenting in a closed thread, @elizarov, but I see the PR is just a tweak to the docs. Is there a way to make the SharedFlow suspend until all the subscribers have finished collecting, as we (mis)understood before? For me it would be a very good and easy way of synchronizing the emission of values through several flows.

Thanks. There’s already PR with a fix for the next release: #2437

@serandel Made an error while coding. Using val eventBus = MutableSharedFlow<String>() works just fine for parallel as well. My issue was that emit was called before launchIn actually started. Wrapping my code with the following code (just to confirm my theory) worked. And everything ran as expected.

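    // Suspends until the subscription is actually registered, so that a
    // following emit() cannot run before the subscriber exists.
    suspend fun subscribe(block: suspend (event: T) -> Unit): Unit = suspendCoroutine { continuation ->
        _events
            .onSubscription { continuation.resume(Unit) } // needs kotlin.coroutines.resume
            .onEach { block(it) }
            .launchIn(scope)
    }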

I think you are right. After all, we have https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-sharing-started/-companion/-while-subscribed.html, which stops the operation altogether if no subscribers are present, which is much more intrusive than just waiting for every subscriber to receive the value.

Following @elizarov's comment, if someone needs emit to really wait for all subscribers:

class EventBus<T> {
    private val context = newSingleThreadContext("EventBus")
    private val scope = CoroutineScope(context)

    private val _events = MutableSharedFlow<T>()

    suspend fun emit(event: T) = withContext(context) {
        _events.emit(event)
    }

    fun subscribe(block: (event: T) -> Unit) = _events
        .onEach { block(it) }
        .launchIn(scope)
}

// Example
val eventBus = EventBus<String>()
eventBus.emit("a")
println("a sent")
eventBus.emit("b")
println("b sent")

val job1 = eventBus
    .subscribe { event ->
        println("Received event: $event")
    }

eventBus.emit("c")
println("c sent")

val job2 = eventBus
    .subscribe { event ->
        println("Received event: $event")
    }

eventBus.emit("d")
println("d sent")

val job3 = eventBus
    .subscribe { event ->
        println("Received event: $event")
    }

eventBus.emit("e")
println("e sent")

job1.cancel()

eventBus.emit("f")
println("f sent")

job2.cancel()

eventBus.emit("g")
println("g sent")

job3.cancel()

It will output

a sent
b sent
Received event: c
c sent
Received event: d
Received event: d
d sent
Received event: e
Received event: e
Received event: e
e sent
Received event: f
Received event: f
f sent
Received event: g
g sent

Without using newSingleThreadContext it would be

val eventBus = MutableSharedFlow<String>()
eventBus.emit("a")
println("a sent")
eventBus.emit("b")
println("b sent")

val job1 = eventBus
    .onEach { event ->
        println("Received event: $event")
    }
    .launchIn(this)

eventBus.emit("c")
println("c sent")

val job2 = eventBus
    .onEach { event ->
        println("Received event: $event")
    }
    .launchIn(this)

eventBus.emit("d")
println("d sent")

val job3 = eventBus
    .onEach { event ->
        println("Received event: $event")
    }
    .launchIn(this)

eventBus.emit("e")
println("e sent")

job1.cancel()

eventBus.emit("f")
println("f sent")

job2.cancel()

eventBus.emit("g")
println("g sent")

job3.cancel()

And will produce (notice that c is never consumed)

a sent
b sent
c sent
d sent
Received event: d
Received event: e
Received event: e
Received event: d
Received event: e
e sent
Received event: f
Received event: f
f sent
Received event: g
g sent

As I was mentioning, collectors are expected to suspend. Also, I don’t expect immediate execution; I only expect emit to suspend until all collectors have run. Also, Unconfined is not confined (😃), and the collectors need to run on the EDT.

val events = MutableSharedFlow<String>() 
cs.launch { // inside some coroutine
  val data = prepareData()
  events.emit("prepared")
  // the following line should be invoked after all collectors are completed with "prepared" event,
  // as if the code is "sequential yet suspending" like all coroutines
  handle(data)
  events.emit("handled")
}

IMHO either documentation or implementation has to be changed since they don’t match each other at the moment. This is misleading and may significantly increase time of debugging.

Hi, @serandel. I hope @elizarov has a more elegant solution, but for now I have some thoughts about that. We need some sort of “sync” event in our shared flow. It can be null, a specific value, or a specific type. Then we can modify the example above:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main() = runBlocking {
    val numbers = MutableSharedFlow<Int?>() // UPD: allow null as sync event
    GlobalScope.launch {
        delay(1000)
        repeat(3) {
            println("emit $it")
            numbers.emit(it)
            numbers.emit(null) // UPD: suspend until subscribers receive sync event
            // here the business event is guaranteed to have been collected, because the sync event was received
        }
    }
    GlobalScope.launch {
        numbers.filterNotNull() // UPD: just skip sync event by filtering
            .collect {
                delay(1000)
                println("$it collected")
            }
    }.join()
}

This code will print the desired sequential emit-collect output. Of course, we need to use an unbuffered shared flow to get the suspending behavior.

Another implementation, using a Channel for waiting instead of a Mutex, and no ConcurrentHashMap, which I think isn’t available in:

class WaitingSharedFlow<T>() : Flow<T> {
    private val allChannels = mutableSetOf<Channels<T>>()
    
    override suspend fun collect(collector: FlowCollector<T>) {
        val channels = Channels<T>()
        synchronized(allChannels) {
            allChannels += channels
        }
        
        try {
            while (true) {
                collector.emit(channels.data.receive())
                channels.done.send(Unit)
            }
        } finally {
            synchronized(allChannels) {
                allChannels -= channels.also { it.close() }
            }
        }
        
    }
    
    suspend fun emit(value: T) = coroutineScope {
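        // Take a snapshot under the lock: iterating allChannels directly could race
        // with collectors registering or leaving, and the lock also acts as a memory barrier.
        val snapshot = synchronized(allChannels) { allChannels.toList() }
        for (channels in snapshot) {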
            launch {
                try {
                    channels.data.send(value)
                } catch (_: ClosedSendChannelException) {
                    return@launch
                }
                try {
                    channels.done.receive()
                } catch (_: ClosedReceiveChannelException) {}
            }
        }
    }
    
    private data class Channels<T>(val data: Channel<T> = Channel(), val done: Channel<Unit> = Channel()) {
        fun close() {
            data.close()
            done.close()
        }
    }
}

There are quite a lot of people here. Any chance there will be an official API for this request?

Actually, collect() returning immediately is a simple problem: since it’s a SharedFlow, we can just block forever there.

The worst problem is that collector.emit() is done in the wrong CoroutineContext - it should be done in the collect()'s context.

It looks like collect() returns immediately, which is incorrect. Sorry I just emoji’d instead of replying, dunno why I did that!

emit() seems to be waiting correctly without a mutex, making that part nicer than my solution. Maybe those 2 concepts can be merged.

This should work best for the mentioned case. Obviously, it has to come before any asynchronous flow transformation operators (that is, anything that goes through a Channel send/receive).

class MutableSharedWaitFlow<T> : FlowCollector<T>, Flow<T> {

    private val collectorsMutex = Mutex()
    private val collectors = mutableSetOf<FlowCollector<T>>()

    override suspend fun emit(value: T) {
        // Copy under the lock so the iteration below cannot race with add/remove.
        val emitted = collectorsMutex.withLock { collectors.toList() }
        coroutineScope {
            emitted
                .map { collector ->
                    launch {
                        try {
                            collector.emit(value)
                        } catch (e: CancellationException) {
                            collectorsMutex.withLock { collectors.remove(collector) }
                        }
                    }
                }
                .joinAll()
        }
    }

    override suspend fun collect(collector: FlowCollector<T>) {
        collectorsMutex.withLock { collectors.add(collector) }
    }
}

@serandel it sounds like your use case is documented here: https://kotlinlang.org/docs/channels.html#fan-out
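
For readers who have not seen the pattern, here is a minimal sketch of what fan-out over a Channel looks like. The function name `fanOut` and its parameters are made up for illustration. Note the difference from a shared flow: each element is delivered to exactly one worker rather than to every subscriber, so this only fits use cases where that is what is wanted.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.Collections

// Hypothetical helper: distributes `items` over `workers` coroutines
// and returns everything that was processed, sorted.
fun fanOut(workers: Int, items: List<Int>): List<Int> = runBlocking {
    // Rendezvous channel: send() suspends until some worker receives the element.
    val channel = Channel<Int>()
    val processed = Collections.synchronizedList(mutableListOf<Int>())

    val jobs = List(workers) {
        launch(Dispatchers.Default) {
            // Each element is taken by exactly one of the workers.
            for (item in channel) processed += item
        }
    }

    items.forEach { channel.send(it) }
    channel.close()   // lets the workers' for-loops finish
    jobs.joinAll()
    processed.sorted()
}
```

Even here, send() only waits until a worker has received the element, not until it has finished processing it.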

A possible solution in many cases is performing the collection in Dispatchers.Unconfined.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main(): Unit = runBlocking {
    val sharedFlow = MutableSharedFlow<Int>()
    launch(Dispatchers.Unconfined) {
        sharedFlow.collect {
            println("Received $it")
            Thread.sleep(1000)
            println("Finished collecting $it")
        }
    }
    launch(Dispatchers.Unconfined) {
        sharedFlow.collect {
            println("Second collector: received $it")
            Thread.sleep(1000)
            println("Second collector: finished collecting $it")
        }
    }
    launch(Dispatchers.Default) {
        repeat(10) {
            println("Emitting $it")
            sharedFlow.emit(it)
            println("Emitted $it")
        }
    }
}

This way, the collection procedure is called from inside sharedFlow.emit(it), so emit does not return until the callback is finished.

However, this is not a general solution:

  1. If emissions themselves happen from Dispatchers.Unconfined (including anywhere up the call stack!), this won’t work.
  2. It is only guaranteed that the immediate execution will happen until the first suspension. If there’s a delay or even just a yield in your collect procedure, emit will not wait for the whole callback to finish.
  3. Collecting in Dispatchers.Unconfined gives a stronger guarantee than “emit waits for the callbacks”: the callbacks are executed by it. This means that if there are several callbacks, they will be executed sequentially and never in parallel, which may or may not be exactly what you want.
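
The second limitation can be sketched as follows. This is an illustration only: the function name `unconfinedWithSuspension` and the log strings are invented for the example, and the timings are arbitrary. The collector suspends in delay(), so emit returns before the callback has finished.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import java.util.Collections

// Hypothetical demo: returns the order in which the events were logged.
fun unconfinedWithSuspension(): List<String> = runBlocking {
    val log = Collections.synchronizedList(mutableListOf<String>())
    val flow = MutableSharedFlow<Int>()

    val collector = launch(Dispatchers.Unconfined) {
        flow.collect {
            log += "received $it"
            delay(200)              // first suspension inside the callback
            log += "finished $it"
        }
    }
    // Make sure the collector is registered before emitting.
    flow.subscriptionCount.first { it >= 1 }

    withContext(Dispatchers.Default) {
        flow.emit(1)
        log += "emitted 1"          // logged while the collector is still in delay()
    }

    delay(500)                      // give the collector time to finish
    collector.cancel()
    log.toList()
}
```

In the returned list, "emitted 1" appears before "finished 1", which is exactly the case where Unconfined does not give the "wait for the whole callback" guarantee.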

Could the interested parties share whether the limitations of collecting in Dispatchers.Unconfined affect you and how?

If emitter does not suspend waiting for collectors, then I don’t understand why SharedFlow.emit has suspend modifier.
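
One way to see where suspension comes in, as a sketch: as far as I understand the documented behavior, emit on a default MutableSharedFlow() suspends until the subscribers have received the value (not until they have processed it), and tryEmit returns false exactly when emit would have to suspend. The function name `tryEmitResults` is made up for this example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical demo: returns (tryEmit result without subscribers,
// tryEmit result with one subscriber) for an unbuffered shared flow.
fun tryEmitResults(): Pair<Boolean, Boolean> = runBlocking {
    val flow = MutableSharedFlow<Int>()   // no replay, no extra buffer

    // Nobody is listening: the value is dropped and nothing has to wait.
    val withoutSubscribers = flow.tryEmit(1)

    val subscriber = launch { flow.collect { delay(100) } }
    flow.subscriptionCount.first { it == 1 }   // wait until the collector is registered

    // A subscriber exists and there is no buffer: handing the value over
    // would require suspending, so the non-suspending variant gives up.
    val withSubscriber = flow.tryEmit(2)

    subscriber.cancel()
    withoutSubscribers to withSubscriber
}
```

So the suspend modifier covers the hand-over to subscribers (and a full buffer, when one is configured), not the completion of their collect blocks.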

Well, if you look at the PR https://github.com/Kotlin/kotlinx.coroutines/pull/2437, the documentation initially said that, as rendezvous, the emission was suspending until all collectors processed the event. So I don’t think it’s breaking the mental model, really, when even the team thought it worked like that at first. 😉

SharedFlow (and, in consequence, StateFlow) is hot in the sense (IMHO) that its generation of values is independent of how many collectors you have, instead of every collector executing a different instance of the emitting code in parallel. Whether or not it has rendezvous behaviour doesn’t affect this, in my POV. Nevertheless, having a separate SharedFlow implementation instead of an option in the existing one, disabled by default, is perfectly fine as well.

Shall I open that ticket?

I don’t think this being an option on a SharedFlow is a good idea. It is stated everywhere that SharedFlow is, semantically, a hot stream of values, that is, it executes independently of the behavior of its subscribers. If we added an option to change that, this would go against the whole mental model of SharedFlow. Likewise, with StateFlow, which is also a hot flow, doing flow.value = x behaves in a fire-and-forget manner. What you’re asking for doesn’t fit the model of SharedFlow at all, in my opinion.

I think you should open a separate issue instead, for a Flow that broadcasts values to multiple subscribers and awaits their consumption before proceeding.

1.6.3, the problem is still there 😕 Perhaps it makes sense to reopen this issue?

Thanks, @j2esu!

I really like your solution for SharedFlow, but unfortunately it messes things up with StateFlow. 😕

Right now I’m playing with a decorator like this, but I’m not 100% sure it’s free of race conditions.

class RendezvousSharedFlow<T>(private val flow: MutableSharedFlow<T>) {
    private val rendezvous = Channel<Unit>()

    suspend fun emit(value: T) {
        flow.emit(value)

        repeat(flow.subscriptionCount.value) {
            rendezvous.receive()
        }
    }

    suspend fun collect(collector: suspend (T) -> Unit) {
        flow.collect {
            collector(it)
            rendezvous.send(Unit)
        }
    }
}
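
A usage sketch for the simplest case, one subscriber that is already registered before the first emit. The decorator is repeated so the snippet is self-contained; the function name `emitCollectOrder` and the 50 ms delay are made up for the example. It says nothing about the harder cases (subscribers joining or leaving in the middle of an emit, or several concurrent emitters sharing the one rendezvous channel), which is where the race-condition concern applies.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*

// Same decorator as above, repeated for self-containment.
class RendezvousSharedFlow<T>(private val flow: MutableSharedFlow<T>) {
    private val rendezvous = Channel<Unit>()

    suspend fun emit(value: T) {
        flow.emit(value)
        // Wait for one acknowledgement per current subscriber.
        repeat(flow.subscriptionCount.value) {
            rendezvous.receive()
        }
    }

    suspend fun collect(collector: suspend (T) -> Unit) {
        flow.collect {
            collector(it)
            rendezvous.send(Unit)   // acknowledge only after processing
        }
    }
}

// Hypothetical demo: returns the order of emit/collect events.
fun emitCollectOrder(): List<String> = runBlocking {
    val log = mutableListOf<String>()   // only touched from the runBlocking thread
    val shared = MutableSharedFlow<Int>()
    val bus = RendezvousSharedFlow(shared)

    val subscriber = launch {
        bus.collect {
            delay(50)                   // simulate slow processing
            log += "$it collected"
        }
    }
    shared.subscriptionCount.first { it == 1 }   // subscriber is registered

    repeat(3) {
        log += "emit $it"
        bus.emit(it)                    // returns only after the acknowledgement
    }
    subscriber.cancel()
    log
}
```

With a single pre-registered subscriber this produces the strictly alternating emit/collected sequence that the original question expected.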