kotlinx.coroutines: Coroutines launch on default dispatcher when they should not

In one of our production services we had a fixed thread-pool for handling blocking DB calls, and we are using withContext and asCoroutineDispatcher to launch such calls in context of DB threads. These calls have to stick to these thread-pool we have created otherwise they fail:

          val executor = ThreadPoolExecutor(
                affinity,
                MAX_IO_THREADS,
                DEFAULT_TTL_SECONDS_THREAD,
                TimeUnit.SECONDS,
                SynchronousQueue()
            ) { runnable ->
                Thread(runnable).also {
                    it.name = "io-pool-${it.id}"
                    it.isDaemon = true
                    it.priority = Thread.NORM_PRIORITY
                }
            }
          
          val dispatcher = executor.asCoroutineDispatcher()

Notice SynchronousQueue to block spinning off any further threads/calls in when pool is full. Turns out looking at code of ExecutorCoroutineDispatcherBase https://github.com/Kotlin/kotlinx.coroutines/blob/d7de5f5ba66a8d005e5cbd03b18522112303fd54/kotlinx-coroutines-core/jvm/src/Executors.kt#L80-L87 turns out we launch/enqueue or coroutine on DefaultExecutor. This has multiple problems:

  • It breaks the foundational assumption of respecting dispatcher that was supplied, worst part of this is that it’s undocumented caveat.
  • It can result in a flurry of undesired threads; ultimately leading to OOM with it’s existing behavior. Having an unbound queue or unbound set of threads is exactly what should be avoided at all costs.

Here is a simple example to reproduce this issue locally:

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.Executors
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

val executor = ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, SynchronousQueue()) { runnable ->
    Thread(runnable).also {
        it.name = "io-pool-${it.id}"
        it.isDaemon = true
        it.priority = Thread.NORM_PRIORITY
    }
}

fun main() {
    runBlocking(executor.asCoroutineDispatcher()) {
        for (i in 1..5) {
            launch {
                println(Thread.currentThread().name)
                Thread.sleep(1000)
            }
        }
    }
}

I believe the subsequent co-routines should be rejected; launching by default on DefaultExecutor is an undesired behavior and breaks fundamental assumption.

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Reactions: 83
  • Comments: 19 (14 by maintainers)

Commits related to this issue

Most upvoted comments

@maxpert The proper fix will appear in the next major release, since it affect some of the internal APIs. Meanwhile, you can build your own patched version from executor-reject branch if you badly need it.

See a proposed solution in #2012. No new APIs. Just cancel the Job on rejection. I don’t think it is worth over-engineering a solution and providing any further customization capabilities.