kotlinx.coroutines: Flow error handling and launchIn
Here is a proposal to address a pattern commonly found in UI code that needs to launch a coroutine that collects a flow and updates the UI, such as:
uiScope.launch {
    responseDataFlow().collect { updateDisplay(it) }
}
launchIn
The first piece is a launchIn(scope) operator that launches a coroutine collecting a flow and returns a Job. I propose that this operator does not take a lambda, so that the replacement for the original code pattern is:
responseDataFlow()
.onEach { updateDisplay(it) }
.launchIn(uiScope)
This usage pattern maintains a consistent rule that execution context specifications in flows always work on “upstream” flows (like in flowOn). Another reason for such a syntactic form will be apparent in the next sections.
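A minimal runnable sketch of this pattern, assuming the launchIn operator as proposed (it is available under that name in kotlinx.coroutines). The responseDataFlow stub and the list standing in for the display are hypothetical, and the runBlocking scope plays the role of uiScope:

```kotlin
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the responseDataFlow() used in the proposal.
fun responseDataFlow(): Flow<String> = flowOf("first", "second")

// Collects the flow in the surrounding scope and returns what was "displayed".
fun displayAll(): List<String> = runBlocking {
    val displayed = mutableListOf<String>()
    val job: Job = responseDataFlow()
        .onEach { displayed += it }   // stands in for updateDisplay(it)
        .launchIn(this)               // `this` plays the role of uiScope
    job.join()                        // the returned Job can be joined or cancelled
    displayed
}

fun main() {
    println(displayAll())
}
```

Because launchIn takes no lambda, all per-element work lives in onEach, upstream of the launch.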
onError
The most generic basic error-handling operator is onError. It catches an exception that happens in the flow before this operator is applied and passes the caught exception to the supplied lambda. Example:
responseDataFlow()
.onEach { updateDisplay(it) }
.onError { e -> showErrorMessage(e) } // catch errors in response flow and updateDisplay
.launchIn(uiScope)
Notice how onError is written after onEach to resemble the regular try/catch code. Here it is important that launchIn takes no lambda and cannot fail, so writing onError before launchIn always catches all exceptions.
Implementation note: the onError operator is already implemented in the flow code, but currently it is a private function called collectSafely that is used internally to implement other error-handling operators.
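The following sketch illustrates the "catches everything before it" behavior. It uses the released catch operator from kotlinx.coroutines in place of the proposed onError (an assumption about the final name), and the failInDisplay switch and log list are hypothetical helpers:

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Fails either in the source flow or in the "display" step, depending on the flag.
fun collectWithHandler(failInDisplay: Boolean): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow<Int> {
        emit(1)
        if (!failInDisplay) throw IllegalStateException("upstream failed")
        emit(2)
    }
        .onEach { value ->
            if (failInDisplay && value == 2) throw IllegalStateException("display failed")
            log += "display $value"
        }
        .catch { e -> log += "error: ${e.message}" } // sees failures from the flow and from onEach
        .launchIn(this)
        .join()
    log
}

fun main() {
    println(collectWithHandler(failInDisplay = false))
    println(collectWithHandler(failInDisplay = true))
}
```

In both runs the handler is invoked, because both failure points are upstream of it.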
onCompletion
This operator calls its lambda whenever a flow completes for any reason, essentially working as try/finally:
responseDataFlow()
.onEach { updateDisplay(it) }
.onError { e -> showErrorMessage(e) }
.onCompletion { enableActionButtons() }
.launchIn(uiScope)
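A sketch of the try/finally behavior, assuming the onCompletion operator as it exists in kotlinx.coroutines and using catch in place of the proposed onError. The fail flag and the log list are hypothetical:

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Records what happens for a successful and for a failing flow.
fun runUi(fail: Boolean): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow<String> {
        emit("data")
        if (fail) throw IllegalStateException("boom")
    }
        .onEach { log += "display $it" }
        .catch { e -> log += "error: ${e.message}" }
        .onCompletion { log += "buttons enabled" }   // runs on both success and failure
        .launchIn(this)
        .join()
    log
}

fun main() {
    println(runUi(fail = false))
    println(runUi(fail = true))
}
```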
Advanced error handling
In addition to receiving the exception, the onError lambda also has a FlowCollector receiver, which enables concise encoding of common error-handling patterns that replace an error with a value emitted to the flow. Some of those patterns are already provided as ready-to-use operators:
onErrorCollect(fallback: Flow<T>) = onError { emitAll(fallback) }
onErrorReturn(fallback: T) = onError { emit(fallback) }
TBD: Shall we rename them to onErrorEmitAll and onErrorEmit or leave them as is?
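As a sketch, both helpers can be written as one-line extensions. This version uses the alternative names from the TBD above (onErrorEmit, onErrorEmitAll) purely for illustration and builds on the released catch operator rather than the proposed onError; failingFlow is a hypothetical test source:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Illustrative names only; they mirror onErrorReturn / onErrorCollect from the proposal.
fun <T> Flow<T>.onErrorEmit(fallback: T): Flow<T> = catch { emit(fallback) }
fun <T> Flow<T>.onErrorEmitAll(fallback: Flow<T>): Flow<T> = catch { emitAll(fallback) }

// Hypothetical source that emits one value and then fails.
fun failingFlow(): Flow<Int> = flow {
    emit(1)
    throw IllegalStateException("boom")
}

fun main() = runBlocking {
    println(failingFlow().onErrorEmit(-1).toList())
    println(failingFlow().onErrorEmitAll(flowOf(10, 20)).toList())
}
```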
But onError can be used directly for more advanced cases. For example, if there is a flow of some Response objects that support a Response.Failure case that wraps an exception, then one can easily translate an exception in the upstream flow to a failed response:
responseDataFlow()
.onError { e -> emit(Response.Failure(e)) }
.onEach { updateDisplay(it) } // failures in the response are encoded as values in the flow
.launchIn(uiScope)
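A self-contained sketch of this translation. The Response hierarchy below is a hypothetical shape (the proposal only names Response.Failure), and catch stands in for the proposed onError:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical response model with a failure case that wraps an exception.
sealed class Response {
    data class Success(val body: String) : Response()
    data class Failure(val cause: Throwable) : Response()
}

// Hypothetical source: one good response, then the connection breaks.
fun responseDataFlow(): Flow<Response> = flow {
    emit(Response.Success("ok"))
    throw IllegalStateException("connection lost")
}

// Downstream code only ever sees values, never an exception.
fun describeResponses(): List<String> = runBlocking {
    responseDataFlow()
        .catch { e -> emit(Response.Failure(e)) }
        .map { response ->
            when (response) {
                is Response.Success -> "success: ${response.body}"
                is Response.Failure -> "failure: ${response.cause.message}"
            }
        }
        .toList()
}

fun main() {
    println(describeResponses())
}
```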
Retry
For consistency I also propose to rename retry to onErrorRetry.
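For reference, a sketch of how the existing retry operator behaves, using its current name and its retries/predicate parameters as found in kotlinx.coroutines. The flaky source and the attempt counter are hypothetical:

```kotlin
import java.io.IOException
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retry
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Returns the collected values together with the number of collection attempts.
fun fetchWithRetry(): Pair<List<String>, Int> = runBlocking {
    var attempts = 0
    val result = flow<String> {
        attempts++
        // Fail on the first two attempts, succeed on the third.
        if (attempts < 3) throw IOException("attempt $attempts failed")
        emit("response")
    }
        .retry(retries = 2) { cause -> cause is IOException } // re-collect only on I/O errors
        .toList()
    result to attempts
}

fun main() {
    println(fetchWithRetry())
}
```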
Open questions
- Shall onError be configured with a (Throwable)->Boolean predicate that defaults to “always true” just like the other onErrorXxx operators? It is not ideal to have a two-lambda function, but the predicate here is a “strategy” that might be stored in a separate variable, which actually might result in quite readable code like onError(applicationErrors) { e -> displayAppErrorMessage(e) }
- onError vs onCompletion. One might envision a separate set of onCompletionXxx operators that are similar to onErrorXxx but also perform the corresponding action on the normal completion of the flow, optionally accepting a (Throwable?)->Boolean predicate. It is an open question if there are any use-cases for that.
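To make the first question concrete, here is one possible shape for a predicate-taking handler. The name catchIf, the AppException class, and the applicationErrors strategy are all hypothetical; the sketch is built on the released catch operator and rethrows anything the predicate rejects:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical operator: handle only exceptions matching the predicate, rethrow the rest.
fun <T> Flow<T>.catchIf(
    predicate: (Throwable) -> Boolean,
    action: suspend FlowCollector<T>.(Throwable) -> Unit
): Flow<T> = catch { e -> if (predicate(e)) action(e) else throw e }

// Hypothetical application error type and a reusable "strategy" stored in a variable.
class AppException(message: String) : Exception(message)
val applicationErrors: (Throwable) -> Boolean = { it is AppException }

// Reports whether the error was handled by the operator or propagated to the caller.
fun handle(error: Throwable): String = runBlocking {
    var outcome = "completed"
    try {
        flow<Int> { throw error }
            .catchIf(applicationErrors) { e -> outcome = "handled: ${e.message}" }
            .toList()
    } catch (e: IllegalStateException) {
        outcome = "propagated: ${e.message}"
    }
    outcome
}

fun main() {
    println(handle(AppException("bad input")))
    println(handle(IllegalStateException("bug")))
}
```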
About this issue
- State: closed
- Created 5 years ago
- Comments: 23 (14 by maintainers)
Commits related to this issue
- Introducing Flow.onCompletion operator for declarative programming Prerequisite for #1263 — committed to Kotlin/kotlinx.coroutines by qwwdfsad 5 years ago
- Flow.launchIn operator Fixes #1263 — committed to Kotlin/kotlinx.coroutines by qwwdfsad 5 years ago
- Declarative flow operators (#1291) * Flow.onCompletion operator * Flow.launchIn operator Fixes #1263 — committed to Kotlin/kotlinx.coroutines by qwwdfsad 5 years ago
Yes. The goal is to have a more concise and more composable way to do try/catch/finally so that, for example, you can encapsulate error-handling logic specific to your application as a flow operator and reuse it everywhere in a declarative way.
Generally, using “bare” try/catch/finally in Kotlin is not very idiomatic. It is a kind of “low-level” primitive that you usually find encapsulated inside higher-level operations. So far, coroutines are missing those “high-level” error-handling operations, so you have to use try/catch/finally, and the resulting code just looks “out-of-place”: a piece of imperative error handling in a sea of declarative code.
I don’t see any easy way to enforce an order, and I don’t see why it needs to be enforced. It’s like try/catch: you catch errors in the code in the try block above. We can tweak the naming to make the “modifying” nature of onError more explicit. We can even use catch (instead of onError) and finally (instead of onCompletion):
(If we go with this naming, we need to change the names of all the other onErrorXxx operators for consistency.)
Note that you can use “error catching” operators multiple times to catch errors in different parts of the flow:
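A sketch of what using two error-catching operators in one chain could look like, written with the catch name suggested in this comment (which is also the name available in kotlinx.coroutines). The source flow, the -1 placeholder value, and the log list are hypothetical:

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

fun runPipeline(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    flow<Int> {
        emit(1)
        throw IllegalStateException("source failed")
    }
        .catch { emit(-1) }                 // first handler: covers the source only
        .onEach { value ->
            if (value < 0) throw IllegalArgumentException("cannot display $value")
            log += "display $value"
        }
        .catch { e -> log += "display error: ${e.message}" } // second handler: covers onEach
        .collect()
    log
}

fun main() {
    println(runPipeline())
}
```

Each handler only sees failures from the part of the chain written above it, which is the try/catch analogy the comment describes.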
catch doesn’t terminate the downstream flow. It gets a FlowCollector and can emit zero, one, or more values in response to an exception. It can also rethrow exceptions just like a regular catch block. This is already much simpler than the RxJava API, which requires multiple operators to do these things. E.g. onErrorReturnItem(foo) is catch { emit(foo) }. onErrorResumeNext(fooObservable) is catch { emitAll(fooFlow) }.
Flow solves a very specific problem: working with asynchronous streams of data. Treating everything as such streams is one way to write a reactive application, but it’s not the only way. I think it’s become the popular thing to do for Android because RxJava was much nicer to work with and compose than a bunch of simple callback APIs, but in a lot of cases simple coroutine features are nicer still. As @louiscad said, you can still use all the other, simpler coroutine APIs when the functionality they provide is all you need. However, there are times when you actually have a stream of data, and in those cases you want something as expressive and powerful as Flow or Reactive Streams. Every app is different; choose an architecture that makes sense for your app.
Plus, I don’t see how these proposed APIs are more composable than plain try/catch/finally. Why not just Try<T> or Result<T>?
If someone needs a flow operator that handles errors globally, there is no need for extra catch, finally, onNext; it’s now possible to write it simply:
Also, when and why should this be used instead of just launch(Dispatchers.Main) { collect { updateDisplay(it) } }?
Flow / streams in general are not simple for a new programmer to understand, and adding multiple ways to accomplish the same thing in the library makes it more difficult to learn new concepts.
PS. When we think of a stream as a chain of transformations (filtering, mapping), APIs such as onNext and onError look out of place.
I understand that for current Android Java-ish developers this is cool and nice and similar to RxJava, but please review this design and look for more opinions from people who are not influenced by RxJava / Android, or at least defer it for later.
PPS. These are not stream operators: they do not work on items in the stream, but are just “terminating” callbacks. And a stream (Flow) is designed for sequences of items, not for a single item.
@antanas-arvasevicius see https://kotlinlang.org/api/latest/jvm/stdlib/kotlin/run-catching.html
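For readers unfamiliar with the linked function, a small sketch of runCatching from the Kotlin standard library, which wraps the outcome of a block in a Result<T>. The parsePort helper is hypothetical:

```kotlin
// Wraps a potentially failing parse in a Result instead of throwing.
fun parsePort(text: String): Result<Int> = runCatching { text.toInt() }

fun main() {
    println(parsePort("8080").getOrElse { -1 })   // value on success
    println(parsePort("http").getOrElse { -1 })   // fallback on failure
    parsePort("http").onFailure { e -> println("failed: ${e::class.simpleName}") }
}
```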
Maybe I’m missing something, but why are we trying to simulate try/catch instead of using it? Ex: your UI code could be
I mean, maybe the proposed operators would be useful in other situations, but for the UI use-case this seems way cleaner to me.
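A sketch of what a plain try/catch/finally version of the UI code could look like; this is an illustration of the alternative being argued for, not the commenter's exact snippet. The responseDataFlow stub, the fail flag, and the log list are hypothetical:

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical source that optionally fails after one value.
fun responseDataFlow(fail: Boolean): Flow<String> = flow {
    emit("data")
    if (fail) throw IllegalStateException("boom")
}

fun runUiWithTryCatch(fail: Boolean): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val job = launch {                       // plays the role of uiScope.launch
        try {
            responseDataFlow(fail).collect { log += "display $it" }
        } catch (e: IllegalStateException) {
            log += "error: ${e.message}"     // stands in for showErrorMessage(e)
        } finally {
            log += "buttons enabled"         // stands in for enableActionButtons()
        }
    }
    job.join()
    log
}

fun main() {
    println(runUiWithTryCatch(fail = false))
    println(runUiWithTryCatch(fail = true))
}
```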
Agree with @streetsofboston, I think this inconsistency could potentially be confusing:
onEach and onCompletion are both just for performing side effects - they don’t transform the source flow in any way. onError, by default, doesn’t just allow performing side effects but also consumes the exception. This is more consistent with try/catch (where you need to explicitly re-throw) but inconsistent with the other two on* operators.
I would rather keep onError just for side effects, and call the proposed operator something like catchError. Although I don’t think this operator is actually needed at all; its use cases are all covered by onErrorCollect already (implement onErrorReturn, rethrow a different exception, etc.).
Question: Is the order in which onError and onEach must be written enforced by onError returning a different type than onEach? Or is there another way the order can be enforced by the compiler?
If the order cannot be enforced, I’m a little worried about coding mistakes that are easy to make but hard to figure out.
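The ordering concern can be demonstrated with a small sketch. It uses the released catch operator in place of the proposed onError, and the catchFirst flag is a hypothetical switch that only changes where the handler sits relative to onEach:

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Reports who ends up handling a failure thrown from the onEach step.
fun whoHandles(catchFirst: Boolean): String = runBlocking {
    var handler = "nobody"
    val source = flowOf(1)
    val display: suspend (Int) -> Unit = { throw IllegalStateException("display failed") }
    val pipeline =
        if (catchFirst) source.catch { handler = "catch operator" }.onEach(display)
        else source.onEach(display).catch { handler = "catch operator" }
    try {
        pipeline.collect()
    } catch (e: IllegalStateException) {
        handler = "caller"   // the operator did not see the failure
    }
    handler
}

fun main() {
    println(whoHandles(catchFirst = true))
    println(whoHandles(catchFirst = false))
}
```

Both orderings compile, since each operator returns a Flow of the same type; only the runtime behavior differs.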