kotlinx.coroutines: Unexpected tryEmit behaviour
After the first subscriber joins, tryEmit no longer emits the value and returns false.
Setting replay or extraBufferCapacity > 0, or replacing tryEmit with emit, resolves the issue.
@Test
fun tryEmitExample() = runBlocking {
    val sharedFlow = MutableSharedFlow<Long>()
    val asyncReceiver = async {
        delay(300)
        sharedFlow.collect {
            println("Received on 1 $it")
        }
        println("Done")
    }
    repeat(4) {
        delay(100)
        val res = sharedFlow.tryEmit(System.currentTimeMillis())
        println("Emitted ${System.currentTimeMillis()} Subscribers: ${sharedFlow.subscriptionCount.value} try: $res")
    }
    asyncReceiver.cancel()
}
Emitted 1605303026489 Subscribers: 0 try: true
Emitted 1605303026592 Subscribers: 0 try: true
Emitted 1605303026693 Subscribers: 1 try: false
Emitted 1605303026799 Subscribers: 1 try: false
@Test
fun emitExample() = runBlocking {
    val sharedFlow = MutableSharedFlow<Long>()
    val asyncReceiver = async {
        delay(300)
        sharedFlow.collect {
            println("Received on 1 $it")
        }
        println("Done")
    }
    repeat(4) {
        delay(100)
        sharedFlow.emit(System.currentTimeMillis())
        println("Emitted ${System.currentTimeMillis()} Subscribers: ${sharedFlow.subscriptionCount.value}")
    }
    asyncReceiver.cancel()
}
Emitted 1605303080955 Subscribers: 0
Emitted 1605303081061 Subscribers: 0
Received on 1 1605303081162
Emitted 1605303081166 Subscribers: 1
Received on 1 1605303081267
Emitted 1605303081267 Subscribers: 1
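As a minimal sketch of the workaround mentioned at the top (setting extraBufferCapacity > 0), the variant below gives tryEmit one buffer slot. The value 1, the 100 ms pause, and the standalone main function are illustrative assumptions, not part of the original report. With a slow collector or a faster producer, tryEmit can still return false once the slot is full.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun main(): Unit = runBlocking {
    // Assumption for illustration: one extra buffer slot, so tryEmit has
    // somewhere to store a value without suspending.
    val sharedFlow = MutableSharedFlow<Long>(extraBufferCapacity = 1)

    val receiver = launch {
        sharedFlow.collect { println("Received $it") }
    }

    // Suspend until the collector has actually subscribed.
    sharedFlow.subscriptionCount.first { it > 0 }

    repeat(3) { i ->
        val accepted = sharedFlow.tryEmit(i.toLong())
        println("tryEmit($i) accepted: $accepted")
        delay(100) // gives the collector time to drain the slot before the next attempt
    }

    receiver.cancel()
}
```

In this setup each tryEmit call should be accepted, because the collector empties the single slot between attempts.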
About this issue
- State: closed
- Created 4 years ago
- Reactions: 2
- Comments: 17 (6 by maintainers)
@elizarov I would like to renew the discussion about this one.
First of all, the documentation about the default implementation of MutableSharedFlow() is positioned above SharedFlow. To me, this is completely unexpected. The way I usually look at documentation is that I Cmd-click on the type I am working with and read the documentation there. So I’d click on the invocation of MutableSharedFlow and read what it has to offer. That explanation is not present there at all, so I completely overlooked it until I found this issue (which I had been specifically searching for).
I’d suggest at least either moving that piece of documentation above the function MutableSharedFlow or, if nothing else, adding a link from the documentation of MutableSharedFlow to the documentation of SharedFlow, with an explanation that some concepts regarding extraBufferCapacity and the default values are explained in SharedFlow’s documentation. (I’d probably vote for the former solution.)
Second, the default implementation of MutableSharedFlow is unexpected to me. Probably nobody wants that implementation. I can’t imagine there are many developers who would expect that creating a flow like val testFlow = MutableSharedFlow<String>() means that all attempts to tryEmit() will immediately drop the event. Even if developers did expect that, they would rarely want that default behavior.
So I’d suggest removing the default parameter values, at least for the replay and extraBufferCapacity parameters, and requiring developers to provide values of their own to match their needs. Default values make sense if a sensible default is chosen, but personally I don’t believe replay=0, extraBufferCapacity=0 is a sensible default. With my suggestions, everybody will have to think about what values to choose, and that way they’ll avoid bugs in advance. (Somebody just starting to learn the coroutines API is guaranteed to use the default implementation of MutableSharedFlow and expect tryEmit to work; they won’t think twice about it.)
What do you think about these suggestions?
Also, what do other devs think?
I think people who are new to coroutines and Kotlin Flow in general would be confused about this as well. I myself was surprised when I encountered it first (and I’m not even new to reactive frameworks or coroutines) and so was a colleague of mine. I think this is confusing to anyone with little experience.
You’re describing a different situation. I am not suggesting defining a non-zero default. I’m suggesting forcing developers to select the values themselves, so that they have to think about it.
In my projects a sensible default might be MutableSharedFlow(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST).
This might not serve everyone, so I don’t want this to be the default, but at least for me, I NEVER want MutableSharedFlow(replay = 0, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND). It would render tryEmit completely useless, which I don’t want probably 100% of the time. Should anybody find it necessary or useful, they will still be able to do so; they’ll just have to specify the buffer capacity, which ensures they understand the implications of their decision.
tryEmit (unlike emit) is not a suspending function, so it clearly cannot operate without a buffer where it can store the emitted value for all the suspending subscribers to process. On the other hand, emit is suspending, so it does not need buffer space, as it can always suspend in case any of the subscribers are not ready yet.
I just got bitten by this and spent a lot of time figuring out why tryEmit() doesn’t work. I think it should be documented properly, or tryEmit() should throw an exception when called with an improper setup, because it is basically unusable.
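To make the buffer argument above concrete, here is a small sketch of the DROP_OLDEST configuration suggested in this comment. The name events and the emitted string are hypothetical, and the third parameter is spelled onBufferOverflow, which is the name used in the MutableSharedFlow factory function. With a buffer slot and a non-suspending overflow strategy, tryEmit can be called from ordinary, non-suspending code.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow

fun main() {
    // Hypothetical event stream: one buffer slot, oldest value dropped on overflow,
    // so an emitter never has to suspend.
    val events = MutableSharedFlow<String>(
        replay = 0,
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )

    // No coroutine scope is needed here: tryEmit is a regular function.
    val accepted = events.tryEmit("clicked")
    println("accepted: $accepted")
}
```

The trade-off is that a slow subscriber can miss values that were overwritten in the buffer, which may or may not be acceptable depending on the use case.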
Just my 2 cents.
I’m fairly used to Ctrl+clicking up the inheritance hierarchy to find docs, but maybe that’s just me. Some links could be nice, I guess.
When I first used mutable shared flow, I 100% expected that behaviour; at some point I went to check the docs to confirm that it was like this. The fact that tryEmit has try in the name should warrant suspicion as to when it can fail, encouraging you to read the docs. I was using it to replace the typical list-of-listeners pattern. If the list is empty, no one is listening, so no one gets the event and the event is dropped.
I personally prefer all the arguments being zero by default: if I didn’t ask for buffer capacity, don’t give me any. Seems fair. I hate the fact that the WhileSubscribed constructor has a non-zero default, which to me was completely unexpected. I didn’t ask for any replay cache timeouts, so why am I getting one? It should be zero. Luckily I read the docs and explicitly set it now.
I feel the only people who might get confused by flow’s default behaviour are those coming from some other reactive library who make assumptions instead of reading the docs carefully. But idk about this. There haven’t been too many complaints about it.
It’s not a bug, it’s a feature… 😃 This kind of implementation feels to me counterintuitive.
Same problem here, even though I have used coroutines for several years 😂 I even thought that tryEmit equals launch { emit(value) } in the default MutableSharedFlow().
One more solution (I don’t like it, but still) might be to just crash tryEmit when extraBufferCapacity = 0. IMHO these two together make no sense and developers should be informed about it.
It is far too easy to just forget to set extraBufferCapacity, and finding this when fixing a bug in the real world might get time-consuming.
The above discussion and proposal definitely have merit, but I would like to point out one key observation. The whole conundrum centers around tryEmit. If not for tryEmit, the MutableSharedFlow defaults look reasonable.
Got bitten by this too.
I prefer tryEmit because I want to emit something without a coroutine scope and find out whether the emission succeeded.
When the default MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND) meets the tryEmit function, tryEmit does not work as I expected.
I noticed that tryEmitLocked, which is called inside tryEmit, contains the logic that causes the problem.
Since the default bufferCapacity value is 0 and the default onBufferOverflow value is BufferOverflow.SUSPEND when I create a shared flow with MutableSharedFlow<Int>(), tryEmitLocked returns before the emit action, so my collector never receives the item I tried to emit.
@geek5nan, yes, it should be. But it should be 1 by default, otherwise people encounter unexpected errors like these.
Got bitten by this. I wish the default value for extraBufferCapacity was 1, and was overridden to 0 for the shareIn operator.
Looking at this again, I guess one could argue that tryEmit should at least try to see if all subscribers are ready to receive without suspending.
There’s a general lack of “rendezvous” when it comes to SharedFlows. See #2818.
Have a look at #2346