kotlinx.coroutines: IO thread pool for blocking calls

We need some kind of IO dispatcher in kotlinx.coroutines that is optimized for offloading blocking calls and is backed by an unbounded thread pool that is smart enough to create new threads only when needed and to shut them down on timeout. The goal is that you can always wrap a blocking IO call in withContext(IO) { ... } (UPDATED: it was formerly named run(IO)) and be sure that you will not run into the problem of not having enough threads in your pool. However, it will be up to the user to control the maximal number of concurrent operations via some other means.
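As the feature eventually shipped, the pattern looks like this. A minimal sketch, where the file read is just a stand-in for any blocking call:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.io.File

// Offload a blocking disk read to the IO dispatcher so the calling
// thread (e.g. the UI thread) is never blocked.
suspend fun readConfig(path: String): String =
    withContext(Dispatchers.IO) {
        File(path).readText() // blocking IO, safe here
    }

fun main() = runBlocking {
    val tmp = File.createTempFile("demo", ".txt").apply { writeText("hello") }
    println(readConfig(tmp.absolutePath)) // prints "hello"
    tmp.delete()
}
```

If more coroutines block inside Dispatchers.IO than it has threads, the pool grows automatically instead of deadlocking, which is exactly the property this issue asks for.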

It will be conceptually somewhat similar to the newThread() scheduler in Rx, but it is also designed to be used where Rx's io() scheduler is used.

About this issue

  • State: closed
  • Created 7 years ago
  • Reactions: 43
  • Comments: 45 (33 by maintainers)

Most upvoted comments

The expected future looks like this: there is going to be only a single pool of threads in addition to the UI thread.

  • The experimental coroutines scheduler becomes the DefaultDispatcher (CommonPool becomes deprecated), so `launch { … }` runs a CPU-bound task in the default dispatcher. Its default parallelism is equal to the number of cores.
  • The experimental coroutines scheduler backs the IO dispatcher, too, but the default parallelism of IO is effectively unbounded. You can do withContext(IO) { ... } without having to switch to a different thread, but if too many threads get blocked on IO, then new threads will spin up automatically to help.
  • You can create additional dispatchers backed by the same thread pool with a custom limit on parallelism for doing things like synchronous DB access (see #261).
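The third bullet eventually shipped as limitedParallelism (kotlinx.coroutines 1.6+). A sketch of a serialized view over the shared IO pool, where dbQuery is a hypothetical helper name:

```kotlin
import kotlinx.coroutines.*

// A view over Dispatchers.IO that runs at most one task at a time,
// e.g. for a synchronous single-connection database. No new threads
// are created: the view borrows workers from the shared IO pool.
@OptIn(ExperimentalCoroutinesApi::class)
val dbDispatcher = Dispatchers.IO.limitedParallelism(1)

suspend fun <T> dbQuery(block: () -> T): T =
    withContext(dbDispatcher) { block() }

fun main() = runBlocking {
    var active = 0
    var maxActive = 0
    coroutineScope {
        repeat(4) {
            launch {
                dbQuery {
                    active++
                    maxActive = maxOf(maxActive, active)
                    Thread.sleep(10) // simulated blocking query
                    active--
                }
            }
        }
    }
    println("max concurrent queries: $maxActive") // prints 1
}
```

Because all four tasks funnel through a parallelism-1 view, they run strictly one at a time even though Dispatchers.IO itself has many threads.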

Let me clarify. Here is a problem we have: We need to be able to do both CPU-bound tasks and IO/blocking tasks with coroutines. The original proposal (this github issue) was to achieve this via a dedicated IO-tuned dispatcher, so you’d separate these two use-cases by choosing an appropriate dispatcher:

launch(CommonPool) { ... cpu-bound code ... }
launch(IO) { ... blocking IO code ... }

The alternative idea is to borrow Scala’s approach: instead of two separate dispatchers let’s have a single dispatcher (let’s call it DefaultDispatcher) and use it like this:

launch(DefaultDispatcher) { ... cpu-bound code ... }
launch(DefaultDispatcher) { blocking { ... blocking IO code ... } }
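kotlinx.coroutines never shipped a blocking { } marker, but the idea can be approximated with today's API. In this sketch the name blocking is hypothetical; it simply hops to the elastic IO pool:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext

// Hypothetical "blocking" marker from the Scala-style proposal,
// approximated by dispatching to the elastic IO pool. Since
// Dispatchers.Default and Dispatchers.IO share worker threads in
// current kotlinx.coroutines, this often avoids an actual thread switch.
suspend fun <T> blocking(block: () -> T): T =
    withContext(Dispatchers.IO) { block() }
```

With such a helper, `launch { blocking { ... } }` reads exactly like the Scala-style code above while still keeping CPU-bound work on the bounded default parallelism.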

What I particularly like about this approach is that it then makes sense to truly make DefaultDispatcher the global default for all coroutine builders and just write:

launch { ... cpu-bound code ... }
launch { blocking { ... blocking IO code ... } }

That is what I’m thinking about… For example, if you do boo(); blocking { foo() } on the “IO-aware” thread, it can run without switching threads (which is cool for performance reasons); however, if you run the same code in the UI context, then blocking { foo() } can suspend, switch to a background thread, and then resume back in the UI thread.

@elizarov What’s the state of mind on this issue? Dispatchers.IO is as confusing as ever.

IMHO of all the names suggested above, Dispatchers.Elastic is the most direct and self-describing one. Also, it draws a useful parallel with Amazon Elastic Compute Cloud.

@maxd Unfortunately, there is no “one size fits all” solution to cancelling blocking operations. Different blocking libraries support different approaches to cancellation, or none at all. Some of them support the interrupted flag; others crash and/or hang when their thread is interrupted. For the cases that do support interrupt, we have the following issue open: https://github.com/Kotlin/kotlinx.coroutines/issues/57

I would not recommend using the code you’ve provided in production, for two reasons:

  • It creates and leaks a thread with Executors.newSingleThreadExecutor() on each invocation.
  • It uses a blocking future.get inside of async, thus unnecessarily blocking a thread in the default context.

As a work-around for your particular case I’d recommend doing this.

First, implement awaitInterruptible extension for CompletableFuture in a straightforward way:

import java.util.concurrent.CompletableFuture
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

suspend fun <T> CompletableFuture<T>.awaitInterruptible(): T =
    suspendCancellableCoroutine { cont ->
        whenComplete { value, exception ->
            when {
                exception != null -> cont.resumeWithException(exception)
                else -> cont.resume(value)
            }
        }
        cont.invokeOnCompletion {
            cancel(true) // on coroutine cancellation, interrupt the running task
        }
    }

I’ve submitted a related feature request: #259

Then define one shared thread-pool for all your blocking calls:

import java.util.concurrent.Executors

val threadPool = Executors.newCachedThreadPool() // this seems to be the best type of pool to use

Finally, you can define the following helper function to make conversion of your interruptible blocking calls to suspending cancellable functions straightforward:

import java.util.concurrent.CompletableFuture
import java.util.function.Supplier

suspend inline fun <T> blockingInterruptible(crossinline block: () -> T): T =
    CompletableFuture.supplyAsync(Supplier { block() }, threadPool).awaitInterruptible()
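Putting the pieces above together, cancelling the coroutine now interrupts the pooled thread; Thread.sleep stands in for any interruptible blocking call:

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    val job = launch {
        blockingInterruptible {
            try {
                Thread.sleep(60_000) // interruptible blocking work
            } catch (e: InterruptedException) {
                println("interrupted") // cancel(true) interrupted the pooled thread
                throw e
            }
        }
    }
    delay(100)
    job.cancelAndJoin() // returns promptly instead of waiting a minute
    threadPool.shutdown()
}
```

Without the invokeOnCompletion { cancel(true) } hook in awaitInterruptible, cancellation would abandon the future but leave the pooled thread sleeping for the full minute.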

Or ElasticPool, ExtendablePool? And withBlocking { }?

@fvasco It finally clicked for me when I understood how it should interface with the scheduler. The difference from PR #83 is that it should not require a context switch at all if the scheduler currently running the coroutine natively supports blocking operations itself and counts the number of ongoing blocking operations to make decisions about creating new threads.