RxJava: How to keep an Observable alive after onError?
(I’m learning reactive)
My source:
- I have a cold Observable produced by a third-party library (Retrofit)
- when subscribed to, it performs an HTTP call to a REST service, parses the data and emits the value
- then it immediately calls onComplete
- if there’s an error, it emits the error
This is what I want:
- a “helper” singleton
- the helper provides a hot Observable for data read from a remote REST HTTP service
- any subscriber should receive the last valid value emitted, or nothing if the call has not finished yet
- onComplete should never be received
- the helper has a trigger-refresh method to cause a new HTTP call
- all subscribers should receive the result automatically as a new event
- when an error occurs, subscribers should be able to handle it in a catch / onError callback
- but they should keep receiving values after that
- if a new subscription is made after an error, it should get the previous non-error result if any, the error otherwise (or eventually both)
I’ve been able to achieve almost all of this by playing with Subjects and the RxRelay library. The last two points, in bold, are the ones I can’t achieve.
In my helper class triggerRefresh method I use RxRelay with a PublishRelay and a ReplaySubject with capacity 1:
coldObservable
.materialize()
.subscribe(relay);
relay
.filter(notification -> notification.getKind() != Notification.Kind.OnCompleted)
.dematerialize()
.subscribe(subject);
I execute this code every time I need to perform a new HTTP call. I also keep a reference to the two subscriptions and unsubscribe them before doing it (I just didn’t include that part of the code).
As the hot observable I simply pass subject.asObservable();
It works as I want unless an error happens.
When an error occurs, all the Observers stop receiving events, even if I trigger a refresh.
How should I handle this kind of situation with rx?
About this issue
- State: closed
- Created 8 years ago
- Reactions: 9
- Comments: 20 (4 by maintainers)
The pattern abersnaze is referring to swallows all errors inside the flatMap (you can also log them there, using either doOnError(...) or the onErrorResumeNext(...)). By trapping the error in the inner observable, the outer subscription only sees the non-error cases, so it never terminates.
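The code that accompanied this comment did not survive in this copy of the thread. As a rough sketch of the idea only, assuming RxJava 1.x (the `rx.*` packages) and a hypothetical `fetch` method standing in for the network call, trapping the error on the inner observable could look like this:

```java
import java.util.ArrayList;
import java.util.List;

import rx.Observable;

public class InnerErrorTrap {

    // Hypothetical stand-in for the network call: fails for item 2 only.
    static Observable<String> fetch(int item) {
        if (item == 2) {
            return Observable.error(new RuntimeException("request " + item + " failed"));
        }
        return Observable.just("result " + item);
    }

    static List<String> run() {
        List<String> events = new ArrayList<>();
        Observable.just(1, 2, 3)
                .flatMap(item -> fetch(item)
                        // Side effect only: the error is still travelling downstream here.
                        .doOnError(e -> events.add("logged: " + e.getMessage()))
                        // Replace the failed inner observable with an empty one,
                        // so the outer chain never sees onError.
                        .onErrorResumeNext(Observable.<String>empty()))
                .subscribe(
                        events::add,
                        e -> events.add("outer onError"),
                        () -> events.add("completed"));
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch the outer subscriber receives the results for items 1 and 3 and then completes; the failure of item 2 is only visible through the doOnError side effect.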
I’m facing the same issue, and I feel like this is a fairly common use case. Take a simple stream of click events that you flatMap to a stream of something you get from the network using Retrofit. In onError I’d like to inform my view that something is wrong, but I would like my button to continue triggering network calls. In a more complex scenario, if I have multiple subscribers observing that observable, they will all stop working after an onError, and I’ll have to create a new observable and register all my subscribers again.
Before the Observable is returned from the flatMap, add on an .onError...(...). It’s like the try/catch of Rx.
That’s exactly it! Thank you both! @abersnaze @jamesgorman2
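For the click-stream use case described above, one possible variation (not taken from the thread's answers) is to keep the error as a value instead of discarding it, so the view can still be told that something went wrong. The sketch below assumes RxJava 1.x; `networkCall` is a hypothetical stand-in for a Retrofit call, and a PublishSubject plays the role of the click source. It reuses the materialize() idea from the original question, but applies it to the inner observable:

```java
import java.util.ArrayList;
import java.util.List;

import rx.Observable;
import rx.subjects.PublishSubject;

public class ClicksThatSurviveErrors {

    // Hypothetical stand-in for a Retrofit call: fails for odd click numbers.
    static Observable<String> networkCall(int click) {
        if (click % 2 == 1) {
            return Observable.error(new RuntimeException("call " + click + " failed"));
        }
        return Observable.just("data " + click);
    }

    static List<String> run() {
        List<String> view = new ArrayList<>();
        PublishSubject<Integer> clicks = PublishSubject.create();

        clicks.flatMap(click -> networkCall(click)
                        // Turn onNext/onError/onCompleted into plain Notification values...
                        .materialize()
                        // ...and drop the inner completion, which carries no data.
                        .filter(n -> !n.isOnCompleted()))
                .subscribe(n -> {
                    if (n.isOnError()) {
                        view.add("show error: " + n.getThrowable().getMessage());
                    } else {
                        view.add("show " + n.getValue());
                    }
                });

        clicks.onNext(1); // fails, but the subscription stays alive
        clicks.onNext(2);
        clicks.onNext(3);
        return view;
    }
}
```

Because the Notification is never dematerialized on the outer stream, a failed call arrives as an ordinary item and later clicks keep flowing to the same subscriber.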
You are throwing from the method observableThatMayThrow, which is outside of an RxJava chain. You could have just written flatMap(item -> { throw new RuntimeException(); }), which fails the mapper function, and no inner source is ever involved.
You should embed the failure in an RxJava flow via the error() operator, not throw it. This way, the error is signaled and onErrorResumeNext can turn it into an empty source.
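The snippets this comment refers to are missing from this copy of the thread. The following sketch, assuming RxJava 1.x, contrasts the two cases with hypothetical methods `throwsDirectly` and `signalsError`; it is an illustration of the point, not the commenter's original code:

```java
import java.util.ArrayList;
import java.util.List;

import rx.Observable;

public class ThrowVersusError {

    // Throws while the flatMap mapper is still running, before any Observable exists,
    // so the onErrorResumeNext below is never even attached.
    static Observable<Integer> throwsDirectly(int item) {
        throw new RuntimeException("thrown for " + item);
    }

    // Returns an Observable that signals the failure through onError.
    static Observable<Integer> signalsError(int item) {
        return Observable.error(new RuntimeException("signalled for " + item));
    }

    static List<String> run(boolean throwDirectly) {
        List<String> events = new ArrayList<>();
        Observable.just(1, 2)
                .flatMap(item -> (throwDirectly ? throwsDirectly(item) : signalsError(item))
                        .onErrorResumeNext(Observable.<Integer>empty()))
                .subscribe(
                        v -> events.add("value " + v),
                        e -> events.add("outer onError: " + e.getMessage()),
                        () -> events.add("completed"));
        return events;
    }
}
```

With run(true) the exception escapes the mapper and terminates the outer chain through onError; with run(false) each inner failure is replaced by an empty source and the outer chain completes normally.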