RxJava: IO Scheduler breaks long running Subscriber onNext
I’m new to Rx and I’m having trouble finding a solution for a scenario. I’m trying to make a network call, parse the result into objects, and send them to the UI. Parsing the response is quick, but updating the UI is slower, so there’s backpressure. Using onBackpressureBuffer doesn’t fix the problem; it doesn’t even get rid of the MissingBackpressureException. When I tried to reduce the problem to a unit test, I found that I could get it to work by removing the subscribeOn(Schedulers.io()) line. I would expect this unit test to either fail or write “12345” to the console. Instead, it writes “1” and stops without any exceptions.
I’m also not sure why using a backpressure buffer isn’t working, so there may be two problems. At this point, all I know is that I’m in over my head and I can’t find out any more information on the web. Am I doing this wrong or are there bugs here?
```java
Observable.from(new Integer[]{1, 2, 3, 4, 5})
        // subscribe() now returns immediately; onNext runs on an io thread,
        // so the test method can finish before all five items are printed.
        .subscribeOn(Schedulers.io())
        .subscribe(new Subscriber<Integer>() {
            @Override public void onCompleted() { }

            @Override public void onError(Throwable e) {
                Assert.fail("error: " + e.getMessage());
            }

            @Override
            public void onNext(Integer integer) {
                try {
                    System.out.print(integer);
                    Thread.sleep(100); // simulate a slow consumer (the UI update)
                } catch (InterruptedException e) {
                    System.out.print("interrupted");
                    Assert.fail(e.getMessage());
                }
            }
        });
```
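A likely explanation for seeing only “1” is that, with subscribeOn(Schedulers.io()), subscribe() returns right away and the test method ends while the background thread is still sleeping inside onNext. The sketch below models that situation in plain Java rather than RxJava (an assumption made to keep it dependency-free): a daemon thread stands in for the io scheduler thread, and a CountDownLatch makes the caller wait for the "onCompleted" point. The class name, method name, and thread name are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class AwaitBackgroundWork {
    // Plain-Java model of the unit test: a daemon thread stands in for an io scheduler thread.
    static String runAndAwait() {
        StringBuilder out = new StringBuilder();
        CountDownLatch done = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    out.append(i);      // stands in for onNext(i)
                    Thread.sleep(100);  // slow consumer, as in the question
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown();       // stands in for onCompleted()/onError()
            }
        }, "io-stand-in");
        worker.setDaemon(true); // daemon threads do not keep the JVM alive on their own
        worker.start();
        try {
            // Without this wait, the method could return before the worker finishes.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // countDown() happens-before await() returns, so the appends are visible here.
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(runAndAwait()); // prints 12345
    }
}
```

In the RxJava test itself, the same idea would be to count down a CountDownLatch in onCompleted() and onError() and call await(...) on it after subscribe(...), so the test method does not return early.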
About this issue
- State: closed
- Created 9 years ago
- Comments: 32 (22 by maintainers)
Commits related to this issue
- Clarify that operator chaining order matters ReactiveX/RxJava#3152 — committed to ReactiveX/reactivex.github.io by DavidMGross 9 years ago
- clarify observeOn/subscribeOn docs fixes #135 see also ReactiveX/RxJava#3152 — committed to ReactiveX/reactivex.github.io by DavidMGross 9 years ago
I like to explain `subscribeOn` as being the iterable dual of putting the `execute()` outside the for loop. That is why `subscribeOn()` affects what thread the whole Observable chain runs on, from creation to processing of the data. Conversely, `observeOn()` is inside the for loop, which is why it only affects what thread the work after it is done on.

It is important to keep in mind that a `Worker` from a `Scheduler` will not let scheduled items execute concurrently, whereas an `Executor` doesn’t make that guarantee.

Subscriptions go “upward”. That is, they start where you call `subscribe` and follow the chain upward to the source observable(s). So when the operation inside the observable does something blocking, you need the thread on which it is subscribed to be a background thread. Thus, `subscribeOn` controls the thread on which the observable does its work (note: there are exceptions to this).

Conversely, data goes “downward”. When an observable emits data, the subscriber observes it in the form of a callback. By default this happens synchronously (like a normal callback) on the same thread on which the work is being done inside the observable. `observeOn` changes the thread on which an observer receives the callback.

If you understand these two facts, it should start to make sense why order matters and why you can call these operators multiple times in a single stream.
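The executor analogy above can be made concrete with plain java.util.concurrent code. This is only an analogy, not how RxJava schedulers are implemented: `subscribeOnStyle` submits the whole loop (producing and processing every item) as one task, while `observeOnStyle` runs the loop on the caller's thread and submits each item's processing as a separate task. A single-thread executor is used for the hand-off because, like a `Scheduler.Worker`, it runs tasks one at a time in submission order. The class, method, and thread names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SubscribeOnObserveOnAnalogy {
    static String thread() {
        return Thread.currentThread().getName();
    }

    // Like subscribeOn: execute() is OUTSIDE the loop, so producing and
    // processing every item happen on the background thread.
    static List<String> subscribeOnStyle(List<Integer> source) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService background = Executors.newSingleThreadExecutor(r -> new Thread(r, "background"));
        background.execute(() -> {
            for (Integer item : source) {
                log.add("produce " + item + " on " + thread());
                log.add("process " + item + " on " + thread());
            }
        });
        shutdownAndWait(background);
        return log;
    }

    // Like observeOn: execute() is INSIDE the loop, so items are produced on the
    // caller's thread and only the work after the hand-off runs on "observer".
    static List<String> observeOnStyle(List<Integer> source) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        // Single-thread executor: tasks never overlap and run in order (Worker-like).
        ExecutorService observer = Executors.newSingleThreadExecutor(r -> new Thread(r, "observer"));
        for (Integer item : source) {
            log.add("produce " + item + " on " + thread());
            observer.execute(() -> log.add("process " + item + " on " + thread()));
        }
        shutdownAndWait(observer);
        return log;
    }

    static void shutdownAndWait(ExecutorService executor) {
        executor.shutdown(); // already-submitted tasks still run
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<Integer> source = Arrays.asList(1, 2, 3);
        subscribeOnStyle(source).forEach(System.out::println);
        observeOnStyle(source).forEach(System.out::println);
    }
}
```

Replacing the single-thread executor in `observeOnStyle` with a multi-threaded pool such as `Executors.newFixedThreadPool(4)` would remove the ordering guarantee, so the "process" entries could run concurrently and out of order. That is the Worker-versus-Executor distinction described in the first comment.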
For example,
This stream is created synchronously, and it reads and emits the first name synchronously when `subscribe` is called. Once that happens, it moves data across threads twice, so the second DB read happens on a background thread and the print happens later in time on the main thread.

If you wanted the first DB read on a background thread (as you would on Android), you need to tell the stream to do the work that happens when you subscribe to the top observable on a background scheduler. This is done by putting `subscribeOn` in the chain, and in this example you can actually put it anywhere. I find it most intuitive to put it as close to the source observable as possible.

Now the first DB read inside the observable happens on a background thread. Hopefully you notice from the comments that we are doing the first DB read on the “io” scheduler and then telling it to observe that data on the “io” scheduler. This is redundant and can be removed:
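The stream in this example can be modeled in plain Java with two single-thread executors named "io" and "main-ui", used as stand-ins for `Schedulers.io()` and Android's main thread. This is an assumption-laden sketch, not RxJava code: `readFirstName` and `readLastName` are hypothetical DB calls that only log the thread they run on. `run(false)` models a chain that is subscribed on the calling thread and then hops with `observeOn(io)` and `observeOn(main)`; `run(true)` models moving the first read to io with `subscribeOn(io)` and dropping the redundant `observeOn(io)`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DbReadStreamModel {
    // Hypothetical blocking DB reads; they only record the thread they ran on.
    static String readFirstName(List<String> log) {
        log.add("readFirstName on " + Thread.currentThread().getName());
        return "Ada";
    }

    static String readLastName(String firstName, List<String> log) {
        log.add("readLastName on " + Thread.currentThread().getName());
        return firstName + " Lovelace";
    }

    // false: source -> observeOn(io) -> map(readLastName) -> observeOn(main) -> print
    // true:  source -> subscribeOn(io) -> map(readLastName) -> observeOn(main) -> print
    static List<String> run(boolean subscribeOnIo) {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-ui"));
        CountDownLatch printed = new CountDownLatch(1);

        Runnable subscription = () -> {
            String first = readFirstName(log); // runs wherever the subscription runs
            Runnable secondRead = () -> {
                String full = readLastName(first, log);
                main.execute(() -> { // observeOn(main)
                    log.add("print " + full + " on " + Thread.currentThread().getName());
                    printed.countDown();
                });
            };
            if (subscribeOnIo) {
                secondRead.run();       // already on io: no extra hop needed
            } else {
                io.execute(secondRead); // observeOn(io): hop to io for the second read
            }
        };

        if (subscribeOnIo) {
            io.execute(subscription);   // subscribeOn(io)
        } else {
            subscription.run();         // subscribe() on the calling thread
        }

        try {
            if (!printed.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();   // no further tasks are submitted after the print
            main.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        run(false).forEach(System.out::println);
        run(true).forEach(System.out::println);
    }
}
```

In both variants the second read stays on "io" and the print lands on "main-ui"; only the thread of `readFirstName` changes, which is the effect of moving the work with `subscribeOn` rather than `observeOn`.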
Hope this helps. Like I said, I’m preparing a 45-minute talk on these concepts with some (hopefully) nice visualizations.