kotlinx.coroutines: SharedFlow.emit() doesn't wait for a subscriber to complete collecting
SharedFlow doc says:
For example, the following class encapsulates an event bus that distributes events to all subscribers in a rendezvous manner, suspending until all subscribers process each event
So, with the following code
fun main() = runBlocking {
val numbers = MutableSharedFlow<Int>()
GlobalScope.launch {
delay(1000)
repeat(3) {
println("emit $it")
numbers.emit(it)
}
}
GlobalScope.launch {
numbers.collect {
delay(1000)
println("$it collected")
}
}.join()
}
we could expect the following output:
emit 0 0 collected emit 1 1 collected emit 2 2 collected
But the actual output is:
emit 0 emit 1 0 collected emit 2 1 collected 2 collected
Seems like the flow has an extra 1 size buffer and doesn’t suspend on first emit() call. Is it a bug, or I’m missing something?
About this issue
- Original URL
- State: open
- Created 3 years ago
- Comments: 35 (11 by maintainers)
Commits related to this issue
- IDEA-312013 don't rely on shared flow hack in FUS logger Hack in question: https://github.com/Kotlin/kotlinx.coroutines/issues/2603#issuecomment-808859170 It was intended to ensure that the emitted v... — committed to JetBrains/intellij-community by dovchinnikov a year ago
- fix(common): ConcurrentModificationException when generating mod sets config screen fix #6 The kotlin flow emit won't waiting for subscribers collected. https://github.com/Kotlin/kotlinx.coroutines/i... — committed to SettingDust/ModSets by SettingDust a year ago
Sorry for commenting in a closed thread, @elizarov , but I see the PR is just a tweak to the docs. Is there a way to make the SharedFlow suspend till all the subscribers have completed collecting, as we (mis)understood before? For me it would be a very good and easy way of syncronizing the emission of values through several flows.
Thanks. There’s already PR with a fix for the next release: #2437
@serandel Made an error while coding. Using
val eventBus = MutableSharedFlow<String>()works just fine for parallel as well. My issue was thatemitwas called beforelaunchInactually started. Wrapping my code with the following code (just to confirm my theory) worked. And everything ran as expected.I think you are right. After all, we have https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.flow/-sharing-started/-companion/-while-subscribed.html, which stops the operation altogether if no subscribers are present, which is much more intrusive than just waiting for every subscriber to receive the value.
Following @elizarov 's comment, if someone needs
emitto really wait for all subscribersIt will output
Without using it
newSingleThreadContextit would beAnd will produce (notice that
cis never consumed)As I was mentioning, collectors are expected to
suspend. Also, I don’t expect the immediate execution, I only expectemitto suspend until all collectors are run. Also,Unconfinedis not confined (😃), and collectors require to be running in the EDT.IMHO either documentation or implementation has to be changed since they don’t match each other at the moment. This is misleading and may significantly increase time of debugging.
Hi, @serandel Hope that @elizarov has more elegant solution, but for now I have some thoughts about that. We need some sort of “sync” event in our shared flow. It can be null, specific value or type. Then, we can modify an example above:
This code will print desired sequential emit-collect output. Of course, we need to use unbuffered shared flow to get suspend behavior.
Another implementation with
Channelfor waiting instead ofMutex, and noConcurrentHashMapthat’s I think isn’t available in:There’s quite a lot of people here, any chance there will be an official API for this request?
Actually the
collect()returning immediately is a simple problem - since it’s a ShraedFlow we can just block forever there.The worst problem is that
collector.emit()is done in the wrongCoroutineContext- it should be done in thecollect()'s context.It looks like
collect()returns immediately, which is incorrect. Sorry I just emoji’d instead of replying, dunno why I did that!emit()seems to be waiting correctly without a mutex, making that part nicer than my solution. Maybe those 2 concepts can be merged.This should work best for the mentioned case. Obviously, before any asynchronous flow transformation operators (aka Channel send receive)
@serandel it sounds like your use case is documented here: https://kotlinlang.org/docs/channels.html#fan-out
A possible solution in many cases is performing the collection in
Dispatchers.Unconfined.This way, the collection procedure is called from inside
sharedFlow.emit(it), soemitnever terminates until the callback is finished.However, this is not a general solution:
Dispatchers.Unconfined(including anywhere up the call stack!), this won’t work.delayor even just ayieldin yourcollectprocedure,emitwill not wait for the whole callback to finish.Dispatchers.Unconfinedgives a stronger guarantee than “emitwaits for the callbacks”: the callbacks are executed by it. This means that if there are several callbacks, they will be executed sequentially and never in parallel, which may or may not be exactly what you want.Could the interested parties share whether the limitations of collecting in
Dispatchers.Unconfinedaffect you and how?If emitter does not suspend waiting for collectors, then I don’t understand why
SharedFlow.emithassuspendmodifier.Well, if you look at the PR https://github.com/Kotlin/kotlinx.coroutines/pull/2437, the documentation initially said that, as rendezvous, the emission was suspending until all collectors processed the event. So I don’t think it’s breaking the mental model, really, when even the team thought it worked like that at first. 😉
SharedFlow (and, in consequence, StateFlow) is hot in the sense (IMHO) that its generation of values is independent on how many collectors you have, instead of every collector executing a different instance of the emitting code in parallel. If it has a rendezvous behaviour or not doesn’t affect this in my POV. Nevertheless, having a separate SharedFlow implementation instead of an option in the existing one, disabled by default, is perfectly fine as well.
Shall I open that ticket?
I don’t think this being an option on a SharedFlow is a good idea. It is stated everywhere that SharedFlow is, semantically, a hot stream of values, that is, it executes independently of the behavior of its subscribers. If we added an option to change that, this would go against the whole mental model of SharedFlow. Likewise, with StateFlow, which is also a hot flow, doing
flow.value = xbehaves in a fire-and-forget manner. What you’re asking for doesn’t fit the model of SharedFlow at all, in my opinion.I think you should open a separate issue instead, for a
Flowthat broadcasts values to multiple subscribers and awaits their consumption before proceeding.1.6.3, the problem is still there 😕 Perhaps is makes sense to reopen this issue?
Thanks, @j2esu!
I really like your solution for SharedFlow but unfortunately it messes up with StateFlow. 😕
Right now I’m playing with a decorator like this, but I’m not 100% sure I’m race conditions-free.