kotlinx.coroutines: IO thread pool for blocking calls
We need some kind of IO dispatcher in kotlinx.coroutines that is optimized for offloading blocking calls and is backed by an unbounded thread pool smart enough to create new threads only when needed and to shut them down after a timeout. The goal is that you can always wrap a blocking IO call in `withContext(IO) { ... }` (UPDATED: it was formerly named `run(IO)`) and be sure that you will not run into the problem of having too few threads in your pool. However, it will be up to the user to limit the maximum number of concurrent operations via some other means.
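As a sketch of the intended usage, here is what wrapping a blocking call looks like with the `Dispatchers.IO` name that eventually shipped (the file-reading example is illustrative, not from the issue):

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.io.File

fun main() = runBlocking {
    // Create a small file so the example is self-contained.
    val tmp = File.createTempFile("demo", ".txt").apply { writeText("hello") }
    // `readText()` blocks the calling thread, so offload it to the elastic
    // IO pool; the pool grows on demand instead of starving for threads.
    val text = withContext(Dispatchers.IO) { tmp.readText() }
    tmp.delete()
    println(text) // prints "hello"
}
```

Note that this only solves thread starvation; as the proposal says, bounding the number of concurrent blocking operations is still the user's job.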
It will be somewhat conceptually similar to the `newThread()` scheduler in Rx, but it is also designed to be used where Rx's `io()` scheduler is used.
About this issue
- Original URL
- State: closed
- Created 7 years ago
- Reactions: 43
- Comments: 45 (33 by maintainers)
Commits related to this issue
- Support for blocking calls inside coroutines (#79) — committed to fvasco/kotlinx.coroutines by deleted user 7 years ago
- Introduce IO dispatcher to offload blocking I/O-intensive tasks Fixes #79 — committed to Kotlin/kotlinx.coroutines by elizarov 6 years ago
- Dispatchers are renamed and grouped in the Dispatchers object * Dispatchers.Default — a default dispatcher for background asynchronous tasks (currently backed by FJP commonPool, a new dispatcher in... — committed to Kotlin/kotlinx.coroutines by elizarov 6 years ago
The expected future looks like this. There is going to be only a single pool of threads in addition to the UI thread: `DefaultDispatcher` by default (`CommonPool` becomes deprecated). So, you `launch { … }` a CPU-bound task in the default dispatcher. Its default parallelism is equal to the number of cores. The same pool backs the `IO` dispatcher, too, but the default parallelism of `IO` is effectively unbounded. You can do `withContext(IO) { ... }` without having to switch to a different thread, but if too many threads get blocked on IO, then new threads will spin up automatically to help.

Let me clarify. Here is the problem we have: we need to be able to do both CPU-bound tasks and IO/blocking tasks with coroutines. The original proposal (this GitHub issue) was to achieve this via a dedicated IO-tuned dispatcher, so you'd separate these two use-cases by choosing an appropriate dispatcher:
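A minimal sketch of that separation, using the `Dispatchers.Default` / `Dispatchers.IO` names introduced by the later commits referenced above (the workloads themselves are made-up stand-ins):

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    // CPU-bound work: default dispatcher, parallelism = number of cores.
    val sum = async(Dispatchers.Default) {
        (1L..1_000_000L).sum()
    }
    // Blocking work: IO dispatcher, effectively unbounded parallelism,
    // so extra threads spin up if too many workers are blocked.
    val greeting = async(Dispatchers.IO) {
        Thread.sleep(10) // stand-in for a blocking call
        "done"
    }
    println("${sum.await()} ${greeting.await()}")
}
```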
The alternative idea is to borrow Scala's approach: instead of two separate dispatchers, let's have a single dispatcher (let's call it `DefaultDispatcher`) and use it like this:

What I particularly like about it is that with this approach it makes sense to truly make this `DefaultDispatcher` a global default for all coroutine builders and just write:

That is what I'm thinking about… For example, if you do `boo(); blocking { foo() }` in an "IO-aware" thread, it can run without switching threads (which is cool for performance reasons); however, if you do the same code in `UI`, then `blocking { foo() }` can suspend, switch to a background thread, then resume back in the `UI` thread.

@elizarov What's the state of mind on this issue?
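The `blocking { }` helper discussed here never shipped under that name, but its core property can be sketched with the API that did ship: `withContext` avoids a thread switch when the caller is already on the target dispatcher, and otherwise suspends and resumes back afterwards. The helper below is purely illustrative:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical `blocking { }` helper from the discussion (not a real
// kotlinx.coroutines API). If the caller is already on the IO pool,
// withContext does not switch threads; from a UI dispatcher it suspends,
// runs the block on a background thread, and resumes back on UI.
suspend fun <T> blocking(block: () -> T): T =
    withContext(Dispatchers.IO) { block() }

fun main() = runBlocking {
    println(blocking { "result" }) // prints "result"
}
```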
`Dispatchers.IO` is as confusing as ever.

IMHO, of all the names suggested above, `Dispatchers.Elastic` is the most direct and self-describing one. Also, it draws a useful parallel with Amazon Elastic Compute Cloud.

@maxd Unfortunately, there is no "one fits everybody" solution to cancelling blocking operations. Different blocking libraries support different approaches to cancellation, or none at all. Some of them support the interrupted flag; others crash and/or hang when their thread is interrupted. For the cases that do support interrupts we have the following issue open: https://github.com/Kotlin/kotlinx.coroutines/issues/57
I would not recommend using the code you've provided in production for two reasons:

- It creates an `Executors.newSingleThreadExecutor()` on each invocation.
- It calls `future.get` inside of `async`, thus unnecessarily blocking a thread in the default context.

As a work-around for your particular case I'd recommend doing this.
First, implement an `awaitInterruptible` extension for `CompletableFuture` in a straightforward way:

Then define one shared thread pool for all your blocking calls:

Finally, you can define the following helper function to make conversion of your interruptible blocking calls to suspending cancellable functions straightforward:
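The code blocks from this comment did not survive extraction. Below is a hedged reconstruction of the shared-pool and helper-function steps; the names `blockingPool` and `blockingCall` are illustrative, not the original identifiers:

```kotlin
import java.util.concurrent.Executors
import java.util.concurrent.Future
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.suspendCancellableCoroutine

// One shared thread pool for all blocking calls.
val blockingPool = Executors.newCachedThreadPool { r ->
    Thread(r).apply { isDaemon = true }
}

// Sketch: run an interruptible blocking call as a cancellable suspending
// function. On coroutine cancellation, `Future.cancel(true)` interrupts the
// worker thread, which helps only for blocking calls that honour the
// interrupted flag (see the caveats in the comment above).
suspend fun <T> blockingCall(block: () -> T): T =
    suspendCancellableCoroutine { cont ->
        val future: Future<*> = blockingPool.submit {
            try {
                cont.resume(block())
            } catch (e: Throwable) {
                cont.resumeWithException(e)
            }
        }
        cont.invokeOnCancellation { future.cancel(true) }
    }
```

Resuming an already-cancelled continuation is ignored by kotlinx.coroutines, which is what makes the late `resume`/`resumeWithException` from the worker thread safe here.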
Or `ElasticPool`, `ExtendablePool`? And `withBlocking { }`?

@fvasco It finally clicked in my mind when I understood how it should interface with a scheduler. The difference from PR #83 is that it should not require a context switch at all if the scheduler currently running the coroutine natively supports blocking operations itself and counts the number of ongoing blocking operations to make decisions about creating new threads.