kotlinx.coroutines: Coroutines launch on default dispatcher when they should not
In one of our production services we had a fixed thread-pool for handling blocking DB calls, and we are using withContext and asCoroutineDispatcher to launch such calls in context of DB threads. These calls have to stick to these thread-pool we have created otherwise they fail:
val executor = ThreadPoolExecutor(
affinity,
MAX_IO_THREADS,
DEFAULT_TTL_SECONDS_THREAD,
TimeUnit.SECONDS,
SynchronousQueue()
) { runnable ->
Thread(runnable).also {
it.name = "io-pool-${it.id}"
it.isDaemon = true
it.priority = Thread.NORM_PRIORITY
}
}
val dispatcher = executor.asCoroutineDispatcher()
Notice SynchronousQueue to block spinning off any further threads/calls in when pool is full. Turns out looking at code of ExecutorCoroutineDispatcherBase https://github.com/Kotlin/kotlinx.coroutines/blob/d7de5f5ba66a8d005e5cbd03b18522112303fd54/kotlinx-coroutines-core/jvm/src/Executors.kt#L80-L87 turns out we launch/enqueue or coroutine on DefaultExecutor. This has multiple problems:
- It breaks the foundational assumption of respecting dispatcher that was supplied, worst part of this is that it’s undocumented caveat.
- It can result in a flurry of undesired threads; ultimately leading to OOM with it’s existing behavior. Having an unbound queue or unbound set of threads is exactly what should be avoided at all costs.
Here is a simple example to reproduce this issue locally:
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.Executors
import java.util.concurrent.SynchronousQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
val executor = ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, SynchronousQueue()) { runnable ->
Thread(runnable).also {
it.name = "io-pool-${it.id}"
it.isDaemon = true
it.priority = Thread.NORM_PRIORITY
}
}
fun main() {
runBlocking(executor.asCoroutineDispatcher()) {
for (i in 1..5) {
launch {
println(Thread.currentThread().name)
Thread.sleep(1000)
}
}
}
}
I believe the subsequent co-routines should be rejected; launching by default on DefaultExecutor is an undesired behavior and breaks fundamental assumption.
About this issue
- Original URL
- State: closed
- Created 4 years ago
- Reactions: 83
- Comments: 19 (14 by maintainers)
Commits related to this issue
- Cancel current Job on RejectedExecutionException When the Executor that was used with Executor.asCoroutineDispatcher() extension rejects submitted task, it means that it had reached its capacity and ... — committed to Kotlin/kotlinx.coroutines by elizarov 4 years ago
- Cancel current Job on RejectedExecutionException When the Executor that was used with Executor.asCoroutineDispatcher() extension rejects submitted task, it means that it had reached its capacity and ... — committed to Kotlin/kotlinx.coroutines by elizarov 4 years ago
- Cancel current Job on RejectedExecutionException When the Executor that was used with Executor.asCoroutineDispatcher() extension rejects submitted task, it means that it had reached its capacity and ... — committed to Kotlin/kotlinx.coroutines by elizarov 4 years ago
- Cancel current Job on RejectedExecutionException When the Executor that was used with Executor.asCoroutineDispatcher() extension rejects submitted task, it means that it had reached its capacity and ... — committed to Kotlin/kotlinx.coroutines by elizarov 4 years ago
- Cancel current Job on RejectedExecutionException (#2012) When the Executor that was used with Executor.asCoroutineDispatcher() extension rejects the submitted task, it means that it had reached its c... — committed to Kotlin/kotlinx.coroutines by elizarov 4 years ago
- Cancel current Job on RejectedExecutionException (#2012) When the Executor that was used with Executor.asCoroutineDispatcher() extension rejects the submitted task, it means that it had reached its c... — committed to recheej/kotlinx.coroutines by elizarov 4 years ago
- Cancel current Job on RejectedExecutionException (#2012) When the Executor that was used with Executor.asCoroutineDispatcher() extension rejects the submitted task, it means that it had reached its c... — committed to recheej/kotlinx.coroutines by elizarov 4 years ago
@maxpert The proper fix will appear in the next major release, since it affect some of the internal APIs. Meanwhile, you can build your own patched version from
executor-rejectbranch if you badly need it.See a proposed solution in #2012. No new APIs. Just cancel the Job on rejection. I don’t think it is worth over-engineering a solution and providing any further customization capabilities.