RxJava: 2.x Exception unhandled after dispose()
Exception unhandled if dispose() called. Did I used it in a wrong way ?
error stack:
java.lang.IllegalStateException: example
at com.example.MyClass$2.apply(MyClass.java:31)
at com.example.MyClass$2.apply(MyClass.java:23)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:121)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:63)
at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:246)
at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
at io.reactivex.Observable.subscribe(Observable.java:10179)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
at io.reactivex.Scheduler$1.run(Scheduler.java:134)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Exception in thread "RxCachedThreadScheduler-1" java.lang.IllegalStateException: illegal
at com.example.MyClass$2.apply(MyClass.java:31)
at com.example.MyClass$2.apply(MyClass.java:23)
at io.reactivex.internal.operators.observable.ObservableFlatMap$MergeObserver.onNext(ObservableFlatMap.java:121)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$SubscribeOnObserver.onNext(ObservableSubscribeOn.java:63)
at io.reactivex.internal.operators.observable.ObservableScalarXMap$ScalarDisposable.run(ObservableScalarXMap.java:246)
at io.reactivex.internal.operators.observable.ObservableJust.subscribeActual(ObservableJust.java:35)
at io.reactivex.Observable.subscribe(Observable.java:10179)
at io.reactivex.internal.operators.observable.ObservableSubscribeOn$1.run(ObservableSubscribeOn.java:39)
at io.reactivex.Scheduler$1.run(Scheduler.java:134)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
sample code:
public class MyClass {
static Disposable disposable = null;
public static void main(String[] args) {
Observable.just(1)
.subscribeOn(Schedulers.io())
.flatMap(new Function<Integer, ObservableSource<Integer>>() {
@Override
public ObservableSource<Integer> apply(Integer integer) throws Exception {
long endTime = System.currentTimeMillis() + 500;
while (System.currentTimeMillis() < endTime) {
}
return Observable.error(new IllegalStateException("example"));
}
})
.subscribe(new Observer<Integer>() {
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {}
@Override
public void onSubscribe(Disposable d) {
disposable = d;
}
@Override
public void onNext(Integer integer) {}
});
try {
Thread.sleep(50);
disposable.dispose();
Thread.sleep(500000);
} catch (InterruptedException e) {
}
}
}
About this issue
- Original URL
- State: closed
- Created 7 years ago
- Comments: 15 (7 by maintainers)
This is by design, your
Callable
crashes and the downstream has disposed so the operator routes the error to the global error handler as it can’t know if the exception is relevant or not. Don’t crash into RxJava and consider usingcreate()
withemitter.tryOnError()
instead.You implemented
BreakIfUnsubscribedFlowable
incorrectly as it has no connection to the downstream’s dispose management. Btw you can achieve a similar disconnect effect viaonTerminateDetach
.