RxJava: IO Scheduler breaks long running Subscriber onNext

I’m new to Rx and I’m having trouble solving one scenario. I’m trying to make a network call, parse the result into objects, and send them to the UI. Parsing the response is quick and updating the UI is slower, so there’s backpressure. Using onBackpressureBuffer doesn’t fix the problem, and it doesn’t even get rid of the MissingBackpressureException. When I tried to reduce the problem to a unit test, I found that I could get it to work by removing the subscribeOn(Schedulers.io()) line. I would expect this unit test to either fail or write “12345” to the console. Instead, it writes “1” and stops without any exceptions.

I’m also not sure why using a backpressure buffer isn’t working, so there may be two problems. At this point, all I know is that I’m in over my head and I can’t find any more information on the web. Am I doing this wrong or are there bugs here?

Observable.from(new Integer[]{1, 2, 3, 4, 5})
    .subscribeOn(Schedulers.io())
    .subscribe(new Subscriber<Integer>() {
        @Override public void onCompleted() { }
        @Override public void onError(Throwable e) {
            Assert.fail("error: " + e.getMessage());
        }

        @Override
        public void onNext(Integer integer) {
            try {
                System.out.print(integer);
                Thread.sleep(100);
            } catch (InterruptedException e) {
                System.out.print("interrupted");
                Assert.fail(e.getMessage());
            }
        }
    });

About this issue

  • State: closed
  • Created 9 years ago
  • Comments: 32 (22 by maintainers)

Most upvoted comments

I like to explain subscribeOn as being the iterable dual of putting the execute() outside the for loop.

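executorService.execute(() -> {
  // The whole loop, including fetching the items, runs on the executor's thread.
  List<Object> items = source.getItems();
  for (Object item : items) {
    process(item);
  }
});

That is why subscribeOn() affects which thread the whole Observable chain runs on, from creation to processing of the data.

Conversely, observeOn() is the execute() inside the for loop, which is why it only affects the thread on which the work after it is done.

// The items are fetched on the calling thread; only the per-item work is handed off.
List<Object> items = source.getItems();
for (Object item : items) {
  executorService.execute(() -> {
    process(item);
  });
}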
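The two loop shapes above can be tried out with a plain ExecutorService. This is only a sketch of the analogy and does not use RxJava; getItems and process are hypothetical stand-ins for the source and the per-item work, and the thread name "worker" is chosen just for the log.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutePlacement {
    // Hypothetical source: logs which thread produced the items.
    static List<Integer> getItems(List<String> log) {
        log.add("produce@" + Thread.currentThread().getName());
        return Arrays.asList(1, 2, 3);
    }

    // Hypothetical per-item work: logs which thread handled the item.
    static void process(int item, List<String> log) {
        log.add("process" + item + "@" + Thread.currentThread().getName());
    }

    static ExecutorService newWorker() {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
    }

    static void shutdownAndWait(ExecutorService executor) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("executor did not finish in time");
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    // subscribeOn analogue: execute() wraps the whole loop, including getItems().
    public static List<String> executeOutsideLoop() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor = newWorker();
        executor.execute(() -> {
            for (int item : getItems(log)) {
                process(item, log);
            }
        });
        shutdownAndWait(executor);
        return new ArrayList<>(log);
    }

    // observeOn analogue: only the per-item work is handed to the executor.
    public static List<String> executeInsideLoop() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor = newWorker();
        for (int item : getItems(log)) {
            executor.execute(() -> process(item, log));
        }
        shutdownAndWait(executor);
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println("outside loop: " + executeOutsideLoop());
        System.out.println("inside loop:  " + executeInsideLoop());
    }
}
```

In the first log every entry, including the "produce" entry, is tagged with the worker thread. In the second, "produce" is tagged with the calling thread and only the "process" entries run on the worker. Both methods wait for the executor to finish before returning, so the caller sees the complete log.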

It is important to keep in mind that a Worker from a Scheduler will not let scheduled items execute concurrently, whereas an Executor doesn’t make that guarantee.
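The difference can be observed with plain java.util.concurrent. The sketch below does not use RxJava’s Worker. It assumes a single-threaded executor as a stand-in with the same no-overlap property and compares it with a two-thread pool. The tasksOverlap helper and the one-second timeout are illustrative choices, not part of any library.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class SerialVersusPool {
    // Returns true only if both tasks were seen running at the same time.
    public static boolean tasksOverlap(ExecutorService executor) {
        CountDownLatch bothStarted = new CountDownLatch(2);
        Callable<Boolean> task = () -> {
            bothStarted.countDown();
            // Succeeds only if the other task starts while this one is still running.
            return bothStarted.await(1, TimeUnit.SECONDS);
        };
        try {
            Future<Boolean> first = executor.submit(task);
            Future<Boolean> second = executor.submit(task);
            return first.get() && second.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            // Queued tasks still run to completion after shutdown().
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("serial executor overlaps: "
                + tasksOverlap(Executors.newSingleThreadExecutor()));
        System.out.println("two-thread pool overlaps: "
                + tasksOverlap(Executors.newFixedThreadPool(2)));
    }
}
```

On the serial executor the first task can never see the second one start, so the result is false by construction. On the pool the result is expected to be true, unless the machine is so loaded that the second thread does not start within the timeout.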

Subscriptions go “upward”. That is, they start where you call subscribe and follow the chain upwards to the source observable(s). So when the operation inside of the observable does something blocking, you need the thread on which it is subscribed to be a background thread. Thus, subscribeOn controls the thread on which the observable does its work (note: there are exceptions to this).

Conversely, data goes “downward”. When an observable emits data, the subscriber observes it in the form of a callback. By default this happens synchronously (like a normal callback) on the same thread on which the work inside the observable is being done. observeOn changes the thread on which an observer receives the callback.
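One way to picture the two directions is a toy model in plain Java. This is not how RxJava implements these operators. Source, subscribeOn, and observeOn below are hypothetical miniatures that leave out terminal events, unsubscription, and backpressure, and the thread names "io" and "ui" are chosen only for the log.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class ToySchedulers {
    // Hypothetical minimal "observable": subscribing runs the source, which pushes items downstream.
    public interface Source<T> {
        void subscribe(Consumer<T> downstream);
    }

    // Subscription travels upward: the upstream subscribe() call itself is moved to the executor.
    public static <T> Source<T> subscribeOn(Source<T> upstream, Executor executor) {
        return downstream -> executor.execute(() -> upstream.subscribe(downstream));
    }

    // Data travels downward: each emitted item is handed to the executor before delivery.
    public static <T> Source<T> observeOn(Source<T> upstream, Executor executor) {
        return downstream ->
                upstream.subscribe(item -> executor.execute(() -> downstream.accept(item)));
    }

    public static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui"));
        CountDownLatch done = new CountDownLatch(1);

        // The source logs the thread it emits on, then pushes one item.
        Source<String> source = downstream -> {
            log.add("emit@" + Thread.currentThread().getName());
            downstream.accept("name");
        };

        observeOn(subscribeOn(source, io), ui).subscribe(item -> {
            log.add("receive@" + Thread.currentThread().getName());
            done.countDown();
        });

        try {
            // The caller has to wait, because all the work happens on other threads.
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the item");
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
            ui.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [emit@io, receive@ui]
    }
}
```

The source body runs on the "io" thread because subscribeOn moved the subscribe call there, and the final callback runs on the "ui" thread because observeOn re-dispatched the item on its way down.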

If you understand these two facts, it should start to make sense why order matters and why you can do things like call these operators multiple times in a single stream.

For example,

Observable.create(s -> {
      // This runs on whatever thread calls 'subscribe' because there is no subscribeOn.
      String name = readNameFromDb();
      s.onNext(name);
      s.onCompleted();
    })
    // Move data from the above observable to "io" scheduler.
    .observeOn(io())
    // This map (and thus DB call) happens on "io" scheduler.
    .map(v -> readAddressForNameFromDb(v))
    // Move data from the above map to "mainThread" scheduler.
    .observeOn(mainThread())
    // This happens on "mainThread" scheduler.
    .subscribe(d -> System.out.println(d));

This stream is created synchronously, and it reads and emits the first name synchronously when subscribe is called. After that it moves data across threads twice, so the second DB read happens on a background thread and the print happens later on the main thread.

If you want the first DB read on a background thread (as you would in Android), you need to tell the stream to do the work that happens when you subscribe to the top observable on a background scheduler. This is done by putting subscribeOn in the chain, and in this example you can actually put it anywhere. I find it’s most intuitive to put it as close to the source observable as possible.

Observable.create(s -> {
      // This now runs on the "io" scheduler.
      String name = readNameFromDb();
      s.onNext(name);
      s.onCompleted();
    })
    // When hooking up everything below to the observable above, do it on "io" scheduler.
    .subscribeOn(io())
    // Move data from the above observable to "io" scheduler.
    .observeOn(io())
    // This map (and thus DB call) happens on "io" scheduler.
    .map(v -> readAddressForNameFromDb(v))
    // Move data from the above map to "mainThread" scheduler.
    .observeOn(mainThread())
    // This happens on "mainThread" scheduler.
    .subscribe(d -> System.out.println(d));

Now the first DB read inside the observable happens on a background thread. Hopefully you notice from the comments that we are doing the first DB read on the “io” scheduler and then telling the stream to observe that data on the “io” scheduler again. This is redundant and can be removed:

Observable.create(s -> {
      // This now runs on the "io" scheduler.
      String name = readNameFromDb();
      s.onNext(name);
      s.onCompleted();
    })
    // When hooking up everything below to the observable above, do it on "io" scheduler.
    .subscribeOn(io())
    // This map still happens on "io" scheduler because that's the thread the data is emitted on.
    .map(v -> readAddressForNameFromDb(v))
    // Move data from the above map to "mainThread" scheduler.
    .observeOn(mainThread())
    // This happens on "mainThread" scheduler.
    .subscribe(d -> System.out.println(d));

Hope this helps. Like I said, I’m preparing a 45-minute talk on these concepts with some (hopefully) nice visualizations.