RxJava: 2.x Exception unhandled after dispose()

The exception goes unhandled if dispose() is called. Am I using it the wrong way?

error stack:

java.lang.IllegalStateException: example
	at com.example.MyClass$2.apply(MyClass.java:31)
	at com.example.MyClass$2.apply(MyClass.java:23)
	at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:121)
	at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:63)
	at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:246)
	at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
	at io.reactivex.Observable.subscribe(Observable.java:10179)
	at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
	at io.reactivex.Scheduler$1.run(Scheduler.java:134)
	at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
	at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Exception in thread "RxCachedThreadScheduler-1" java.lang.IllegalStateException: example
	at com.example.MyClass$2.apply(MyClass.java:31)
	at com.example.MyClass$2.apply(MyClass.java:23)
	at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:121)
	at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:63)
	at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:246)
	at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
	at io.reactivex.Observable.subscribe(Observable.java:10179)
	at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
	at io.reactivex.Scheduler$1.run(Scheduler.java:134)
	at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
	at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

sample code:


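import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;

public class MyClass {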

    static Disposable disposable = null;

    public static void main(String[] args) {

        Observable.just(1)
                .subscribeOn(Schedulers.io())
                .flatMap(new Function<Integer, ObservableSource<Integer>>() {
                    @Override
                    public ObservableSource<Integer> apply(Integer integer) throws Exception {
                        // Busy-wait for about 500 ms so that dispose() runs before the error is produced.
                        long endTime = System.currentTimeMillis() + 500;
                        while (System.currentTimeMillis() < endTime) {
                        }
                        return Observable.error(new IllegalStateException("example"));
                    }
                })
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onError(Throwable e) {}

                    @Override
                    public void onComplete() {}

                    @Override
                    public void onSubscribe(Disposable d) {
                        disposable = d;
                    }

                    @Override
                    public void onNext(Integer integer) {}
                });


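        try {
            Thread.sleep(50);
            // Dispose while the flatMap mapper above is still busy-waiting.
            disposable.dispose();
            // Keep the JVM alive long enough to observe the stack trace.
            Thread.sleep(500000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }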
    }
}

About this issue

  • State: closed
  • Created 7 years ago
  • Comments: 15 (7 by maintainers)

Most upvoted comments

This is by design. Your Callable crashes and the downstream has already disposed, so the operator routes the error to the global error handler because it can’t know whether the exception is relevant. Don’t crash into RxJava; consider using create() with emitter.tryOnError() instead.
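
The create() plus tryOnError() suggestion could look roughly like the sketch below. It assumes RxJava 2.1.1 or later, where tryOnError is available on ObservableEmitter. The class name, the latches, and the 500 ms sleep standing in for the slow work are illustrative assumptions, not code from the issue.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical class name; assumes RxJava 2.1.1+ on the classpath.
class TryOnErrorSketch {

    /** Returns what tryOnError reported once the subscriber had already disposed. */
    static boolean errorDeliveredAfterDispose() {
        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch finished = new CountDownLatch(1);
        final AtomicBoolean delivered = new AtomicBoolean(true);

        Disposable d = Observable.<Integer>create(emitter -> {
                    started.countDown();
                    try {
                        Thread.sleep(500); // stands in for the slow work in the question
                    } catch (InterruptedException ignored) {
                        // dispose() may interrupt the io() worker; continue to the failure path
                    }
                    // tryOnError returns false when the downstream is already disposed,
                    // instead of forwarding the error to the global error handler.
                    delivered.set(emitter.tryOnError(new IllegalStateException("example")));
                    finished.countDown();
                })
                .subscribeOn(Schedulers.io())
                .subscribe(v -> { }, e -> { });

        try {
            started.await();   // make sure the source is running before disposing
            d.dispose();
            finished.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return delivered.get();
    }

    public static void main(String[] args) {
        System.out.println("delivered = " + errorDeliveredAfterDispose());
    }
}
```

If rewriting the source this way is not an option, another route is to install a handler with RxJavaPlugins.setErrorHandler and decide there which undeliverable errors to ignore or log.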

You implemented BreakIfUnsubscribedFlowable incorrectly as it has no connection to the downstream’s dispose management. Btw you can achieve a similar disconnect effect via onTerminateDetach.
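
For reference, here is a small sketch of where onTerminateDetach sits in a Flowable chain. BreakIfUnsubscribedFlowable itself is not shown in this thread, so the example uses a PublishProcessor as a stand-in upstream; the class and method names are hypothetical.

```java
import io.reactivex.disposables.Disposable;
import io.reactivex.processors.PublishProcessor;

import java.util.ArrayList;
import java.util.List;

// Hypothetical class name; assumes RxJava 2.x on the classpath.
class OnTerminateDetachSketch {

    /** Returns the items the subscriber saw before it disposed. */
    static List<Integer> receivedBeforeDispose() {
        PublishProcessor<Integer> upstream = PublishProcessor.create();
        List<Integer> received = new ArrayList<>();

        Disposable d = upstream
                .onTerminateDetach()      // drops its upstream/downstream references on terminate or dispose
                .subscribe(received::add);

        upstream.onNext(1);   // delivered to the subscriber
        d.dispose();          // the operator releases its reference to the subscriber here
        upstream.onNext(2);   // no subscriber is left, so this value is dropped
        return received;
    }

    public static void main(String[] args) {
        System.out.println(receivedBeforeDispose());
    }
}
```

The operator does not change which items are delivered. Its effect is only that, after termination or dispose(), the chain no longer holds on to the subscriber.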