RxJava: mergeDelayError not delaying errors when subscribing to schedulers

When two observables are merged with mergeDelayError and the resulting observable is subscribed to a scheduler, then errors are not delayed. Example:

Observable.mergeDelayError(
                Observable.error(new RuntimeException()),
                Observable.just("Hello")
            )
            .subscribeOn(Schedulers.io()).subscribe(observer);

In this case “hello” will never be emitted. Also mentioned in this SO question: http://stackoverflow.com/questions/32131594/rx-java-mergedelayerror-not-working-as-expected

About this issue

  • Original URL
  • State: closed
  • Created 9 years ago
  • Comments: 18 (9 by maintainers)

Most upvoted comments

I’m still facing this issue using v.1.0.14.

Test that reproduce the case:

@Test public void testMergeDelayErrorWithOnErrorBeforeOnNext() {
    TestSubscriber<String> ts = new TestSubscriber<>();

    final Observable<String> errorObservable = Observable.error(new RuntimeException());

    Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
      @Override public void call(Subscriber<? super String> subscriber) {
        try {
          //Simulate long operation
          Thread.sleep(1000);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        subscriber.onNext("1");
        subscriber.onNext("2");
        subscriber.onCompleted();
      }
    });

    Observable.mergeDelayError(errorObservable, observable)
        .observeOn(Schedulers.newThread())
        .subscribeOn(Schedulers.newThread())
        .subscribe(ts);

    ts.awaitTerminalEvent();
    ts.assertError(RuntimeException.class);
    ts.assertReceivedOnNext(Arrays.asList("1", "2"));
  }

Test output:

java.lang.AssertionError: Number of items does not match. Provided: 2  Actual: 1
    at rx.observers.TestObserver.assertReceivedOnNext(TestObserver.java:116)
    at rx.observers.TestSubscriber.assertReceivedOnNext(TestSubscriber.java:229)
    attestMergeDelayErrorWithOnErrorBeforeOnNext

The original ReactiveX design favored fail-fast streams. An async boundary such as observeOn has the opportunity to see both the elements of the source thread and the exception that came after in that thread. Since observeOn with the fail-fast behavior was very established when this property started causing confusion, we had no choice but to overload and parametrize the error handling behavior. I’d say you have it better because Rx.NET’s suggested solution was to materialize and dematerialize the stream to overcome this property…

I think what you see is that the error cuts ahead in observeOn. Try applying doOnNextand doOnError after the mergeDelayError and see their order printed out.