kotlinx.coroutines: Unexpected tryEmit behaviour

After the first subscriber joins, tryEmit no longer emits values and returns false. Setting replay or extraBufferCapacity to a value greater than 0, or replacing tryEmit with emit, resolves the issue.

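    import kotlinx.coroutines.async
    import kotlinx.coroutines.delay
    import kotlinx.coroutines.flow.MutableSharedFlow
    import kotlinx.coroutines.runBlocking

    @Test
    fun tryEmitExample() = runBlocking {
        // Defaults: replay = 0, extraBufferCapacity = 0
        val sharedFlow = MutableSharedFlow<Long>()

        // The receiver subscribes only after 300 ms
        val asyncReceiver = async {
            delay(300)
            sharedFlow.collect {
                println("Received on 1 $it")
            }
            // Never reached: collecting a SharedFlow does not complete normally
            println("Done")
        }

        repeat(4) {
            delay(100)
            val res = sharedFlow.tryEmit(System.currentTimeMillis())
            println("Emitted ${System.currentTimeMillis()} Subscribers: ${sharedFlow.subscriptionCount.value} try: $res")
        }
        asyncReceiver.cancel()
    }

Output:

    Emitted 1605303026489 Subscribers: 0 try: true
    Emitted 1605303026592 Subscribers: 0 try: true
    Emitted 1605303026693 Subscribers: 1 try: false
    Emitted 1605303026799 Subscribers: 1 try: false
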
    @Test
    fun emitExample() = runBlocking {
        val sharedFlow = MutableSharedFlow<Long>()

        val asyncReceiver = async {
            delay(300)
            sharedFlow.collect {
                println("Received on 1 $it")
            }
            println("Done")
        }

        repeat(4) {
            delay(100)
            sharedFlow.emit(System.currentTimeMillis())
            println("Emitted ${System.currentTimeMillis()} Subscribers: ${sharedFlow.subscriptionCount.value}")
        }
        asyncReceiver.cancel()
    }

Output:

    Emitted 1605303080955 Subscribers: 0
    Emitted 1605303081061 Subscribers: 0
    Received on 1 1605303081162
    Emitted 1605303081166 Subscribers: 1
    Received on 1 1605303081267
    Emitted 1605303081267 Subscribers: 1
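
The workaround from the description, a non-zero extraBufferCapacity, can be sketched as follows. This is a minimal illustration, not a recommendation: the capacity of 1, the `bufferedTryEmit` helper name, and the use of `CoroutineStart.UNDISPATCHED` (so the collector is subscribed before the first `tryEmit`) are choices made for this example, and it assumes kotlinx.coroutines 1.6 or newer, where `collect { }` needs no extra import.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper for this example: returns the tryEmit results and the received values.
fun bufferedTryEmit(): Pair<List<Boolean>, List<Int>> = runBlocking {
    // One extra buffer slot gives tryEmit somewhere to put a value while the subscriber is busy.
    val flow = MutableSharedFlow<Int>(extraBufferCapacity = 1)
    val received = mutableListOf<Int>()

    // UNDISPATCHED runs the collector up to its first suspension, so it is already subscribed below.
    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        flow.collect { received += it }
    }

    val results = mutableListOf<Boolean>()
    results += flow.tryEmit(1) // the buffer has room
    results += flow.tryEmit(2) // the buffer is full and the subscriber has not run yet
    yield()                    // let the subscriber drain the buffer
    results += flow.tryEmit(3) // room again
    yield()

    collector.cancelAndJoin()
    results to received
}

fun main() {
    val (results, received) = bufferedTryEmit()
    println("tryEmit results: $results")
    println("received: $received")
}
```

A single slot only helps while the subscriber keeps up: with the default BufferOverflow.SUSPEND strategy, tryEmit still returns false once that slot is occupied.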

About this issue

  • State: closed
  • Created 4 years ago
  • Reactions: 2
  • Comments: 17 (6 by maintainers)

Most upvoted comments

@elizarov I would like to renew discussion about this one.

First of all, the documentation for the default behaviour of MutableSharedFlow() is placed above SharedFlow. To me, this is completely unexpected. I usually read documentation by cmd-clicking the type I am working with and reading what is there. So I’d click on the invocation of MutableSharedFlow and read what it has to offer. That explanation is not present there at all, so I completely overlooked it until I found this issue (which I had been specifically searching for).

At the very least, I’d suggest either moving that piece of documentation above the MutableSharedFlow function or adding a link from the MutableSharedFlow documentation to the SharedFlow documentation, with a note that extraBufferCapacity and the default values are explained there. (I’d probably vote for the former.)

Second, the default behaviour of MutableSharedFlow is unexpected to me. Probably nobody wants it. I can’t imagine many developers expect that creating a flow with val testFlow = MutableSharedFlow<String>() means every tryEmit() call will immediately drop the event. Even if developers did expect that, they would rarely want it as the default behaviour.

So I’d suggest removing the default values, at least for the replay and extraBufferCapacity parameters, and requiring developers to provide values that match their needs. Default values make sense when a sensible default exists, but personally I don’t believe replay = 0, extraBufferCapacity = 0 is a sensible default. With my suggestion, everybody has to think about which values to choose, and that way they avoid bugs in advance. (Somebody just starting to learn the coroutines API is guaranteed to use the default MutableSharedFlow and expect tryEmit to work; they won’t think twice about it.)

What do you think about these suggestions?

Also, what do other devs think?

I feel the only people who might get confused with flow default behaviour, are those coming from some other reactive library and not carefully reading docs/making assumptions. But idk about this.

I think people who are new to coroutines and Kotlin Flow in general would be confused about this as well. I myself was surprised when I first encountered it (and I’m not even new to reactive frameworks or coroutines), and so was a colleague of mine. I think this is confusing to anyone with little experience.

I personally prefer if all the arguments are zero by default, if I didn’t ask for buffer capacity, don’t give me any, seems fair. I hate the fact that in the WhileSubscribed constructor, it has non zero default

You’re describing a different situation. I am not suggesting a non-zero default. I’m suggesting that developers be forced to choose the values themselves, so that they have to think about them.

In my projects a sensible default might be MutableSharedFlow(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)

This might not serve everyone, so I don’t want it to be the default. But, at least for me, I NEVER want MutableSharedFlow(replay = 0, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND). It renders tryEmit completely useless, which I don’t want practically 100% of the time. Should anybody find it necessary or useful, they will still be able to get it; they’ll just have to specify the buffer capacity explicitly, which ensures they understand the implications of their decision.
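
As a rough sketch of the configuration suggested above, the following shows what it means for tryEmit. The `dropOldestTryEmit` helper name and the three back-to-back emissions are made up for the example, and it assumes kotlinx.coroutines 1.6 or newer. On a single-threaded runBlocking loop, the subscriber should see only the newest value, because the older ones are overwritten before it runs.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper for this example: returns the tryEmit results and the received values.
fun dropOldestTryEmit(): Pair<List<Boolean>, List<Int>> = runBlocking {
    val flow = MutableSharedFlow<Int>(
        replay = 0,
        extraBufferCapacity = 1,
        onBufferOverflow = BufferOverflow.DROP_OLDEST
    )
    val received = mutableListOf<Int>()

    // UNDISPATCHED subscribes the collector before the emissions below.
    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        flow.collect { received += it }
    }

    // Three emissions before the subscriber gets a chance to run.
    // With DROP_OLDEST, tryEmit never has to wait, so each call reports success.
    val results = listOf(flow.tryEmit(1), flow.tryEmit(2), flow.tryEmit(3))

    yield() // let the subscriber run
    collector.cancelAndJoin()
    results to received
}

fun main() {
    val (results, received) = dropOldestTryEmit()
    println("tryEmit results: $results")
    println("received: $received")
}
```

The trade-off is visible in the result: tryEmit always succeeds, but a slow subscriber silently misses the values that were overwritten.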

tryEmit (unlike emit) is not a suspending function, so it clearly cannot operate without a buffer where it can store emitted value for all the suspending subscribers to process. On the other hand, emit is suspending, so it does not need buffer space, as it can always suspend in case any of the subscribers are not ready yet.
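
The difference described here can be observed with a small sketch. The `unbufferedEmit` helper name is invented for the example, `CoroutineStart.UNDISPATCHED` is used only to make sure the collector is subscribed before the second `tryEmit`, and kotlinx.coroutines 1.6 or newer is assumed.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: contrasts tryEmit and emit on a flow without any buffer.
fun unbufferedEmit(): Triple<Boolean, Boolean, List<Int>> = runBlocking {
    val flow = MutableSharedFlow<Int>() // replay = 0, extraBufferCapacity = 0
    val received = mutableListOf<Int>()

    // No subscribers: there is nobody to wait for, so the value is dropped and tryEmit returns true.
    val withoutSubscribers = flow.tryEmit(0)

    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        flow.collect { received += it }
    }

    // A subscriber but no buffer: tryEmit cannot suspend, so it returns false.
    val withSubscriber = flow.tryEmit(1)

    // emit is allowed to suspend, so it waits until the subscriber has taken the value.
    flow.emit(2)

    collector.cancelAndJoin()
    Triple(withoutSubscribers, withSubscriber, received)
}

fun main() {
    val (withoutSubscribers, withSubscriber, received) = unbufferedEmit()
    println("tryEmit without subscribers: $withoutSubscribers")
    println("tryEmit with a subscriber:   $withSubscriber")
    println("received via emit:           $received")
}
```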

I just got bitten by this and spent a lot of time figuring out why tryEmit() doesn’t work. I think it should be documented properly, or tryEmit() should throw an exception when used with an improper setup, because it is basically unusable.

Just my 2 cents.

I’m fairly used to Ctrl+Clicking up the inheritance hierarchy to find docs, but maybe that’s just me. Some links could be nice, I guess.

When I first used mutable shared flow, I 100% expected that behaviour. At some point I went to check the docs to confirm that it was like this. The fact that tryEmit has try in the name should warrant suspicion as to when it can fail, encouraging you to read the docs. I was using it to replace the typical list-of-listeners pattern. If the list is empty, no one is listening, so no one gets the event and the event is dropped.

I personally prefer if all the arguments are zero by default, if I didn’t ask for buffer capacity, don’t give me any, seems fair. I hate the fact that in the WhileSubscribed constructor, it has non zero default, which to me was completely unexpected. I didn’t ask for any replay cache timeouts, why am I getting one? It should be zero. Luckily I read the docs and explicitly set it now.

I feel the only people who might get confused with flow default behaviour, are those coming from some other reactive library and not carefully reading docs/making assumptions. But idk about this. There haven’t been too many complaints about it.

It’s not a bug, it’s a feature… 😃 This kind of implementation feels to me counterintuitive.

Same problem here, even though I have used coroutines for several years 😂. I even thought that tryEmit was equivalent to launch { emit(value) } in the default MutableSharedFlow().

One more solution (I don’t like it but still) might be to just crash tryEmit when extraBufferCapacity = 0. IMHO these two together make no sense and developers should be informed about it.

It is far too easy to just forget to set extraBufferCapacity, and tracking this down while fixing a bug in the real world can be time-consuming.

The above discussion and proposal definitely have merit, but I would like to point out one key observation. The whole conundrum centers around tryEmit. If not for tryEmit, the MutableSharedFlow defaults would look reasonable.

Got bitten by this too.

I prefer tryEmit because I want to emit something without a coroutine scope and find out whether the emission succeeded.

When the default MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 0, onBufferOverflow = BufferOverflow.SUSPEND) is combined with tryEmit, tryEmit does not work as I expected.

I noticed that tryEmitLocked, which tryEmit calls, contains the logic below, and that is where the problem lies.

    if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
        when (onBufferOverflow) {
            BufferOverflow.SUSPEND -> return false // will suspend
            BufferOverflow.DROP_LATEST -> return true // just drop incoming
            BufferOverflow.DROP_OLDEST -> {} // force enqueue & drop oldest instead
        }
    }

Since the default bufferCapacity is 0 and the default onBufferOverflow is BufferOverflow.SUSPEND when I create a shared flow with MutableSharedFlow<Int>(), tryEmitLocked returns before emitting anything, so my collector never receives the item I tried to emit.
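
To make the effect of that check easier to follow, here is a dependency-free model of the decision only. It is a simplified sketch, not the library source: `Overflow` and `tryEmitWouldSucceed` are hypothetical names, the `subscribers` parameter is an assumption added to reproduce the "no subscribers" case from the output at the top of the issue, and no actual buffering takes place.

```kotlin
// Hypothetical stand-in for kotlinx.coroutines.channels.BufferOverflow,
// so the sketch runs without any dependency.
enum class Overflow { SUSPEND, DROP_LATEST, DROP_OLDEST }

// Simplified model: reports what tryEmit would return, without storing anything.
// Parameter names mirror the quoted snippet; `subscribers` is an assumption of this sketch.
fun tryEmitWouldSucceed(
    subscribers: Int,
    bufferSize: Int,
    bufferCapacity: Int,
    minCollectorIndex: Long,
    replayIndex: Long,
    onBufferOverflow: Overflow
): Boolean {
    // Modelled on the output at the top of the issue: with no subscribers, tryEmit returned true.
    if (subscribers == 0) return true
    if (bufferSize >= bufferCapacity && minCollectorIndex <= replayIndex) {
        when (onBufferOverflow) {
            Overflow.SUSPEND -> return false    // emit would have to suspend here
            Overflow.DROP_LATEST -> return true // the incoming value is dropped
            Overflow.DROP_OLDEST -> {}          // falls through: the oldest value is dropped instead
        }
    }
    return true // the value is enqueued
}

fun main() {
    // Default flow, one subscriber: capacity 0 and SUSPEND
    println(tryEmitWouldSucceed(1, 0, 0, 0L, 0L, Overflow.SUSPEND))
    // Same flow with one free buffer slot
    println(tryEmitWouldSucceed(1, 0, 1, 0L, 0L, Overflow.SUSPEND))
    // Default capacity, no subscribers
    println(tryEmitWouldSucceed(0, 0, 0, 0L, 0L, Overflow.SUSPEND))
}
```

With a capacity of 0, the condition `bufferSize >= bufferCapacity` holds from the start, so under SUSPEND the only way for the model to return true is to have no subscribers at all.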

@geek5nan, yes, it should be. But it should be 1 by default; otherwise people run into unexpected errors like these.

Got bitten by this, I wish the default value for extraBufferCapacity was 1, and was overridden to 0 for the shareIn operator.

Looking at this again I guess one could argue that tryEmit should at least try to see if all subscribers are ready to receive without suspending.

There’s a general lack of “rendezvous” when it comes to SharedFlows. See #2818.

Have a look at #2346