RxJava: FATAL EXCEPTION: RxCachedThreadScheduler-4 or -1 or -2 or -3 ...

E/AndroidRuntime: FATAL EXCEPTION: RxCachedThreadScheduler-1 Process: goujiawang.gjstore, PID: 5960 java.io.InterruptedIOException: thread interrupted at okio.Timeout.throwIfReached(Timeout.java:146) at okio.Okio$2.read(Okio.java:135) at okio.Buffer.writeAll(Buffer.java:993) at okhttp3.RequestBody$3.writeTo(RequestBody.java:118) at okhttp3.MultipartBody.writeOrCountBytes(MultipartBody.java:171) at okhttp3.MultipartBody.writeTo(MultipartBody.java:113) at okhttp3.logging.HttpLoggingInterceptor.intercept(HttpLoggingInterceptor.java:189) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92) at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67) at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:170) at okhttp3.RealCall.execute(RealCall.java:60) at retrofit2.OkHttpCall.execute(OkHttpCall.java:174) at com.jakewharton.retrofit2.adapter.rxjava2.CallObservable.subscribeActual(CallObservable.java:41) at io.reactivex.Observable.subscribe(Observable.java:10151) at com.jakewharton.retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34) at io.reactivex.Observable.subscribe(Observable.java:10151) at io.reactivex.internal.operators.flowable.FlowableFromObservable.subscribeActual(FlowableFromObservable.java:31) at io.reactivex.Flowable.subscribe(Flowable.java:12172) at io.reactivex.internal.operators.flowable.FlowableOnBackpressureLatest.subscribeActual(FlowableOnBackpressureLatest.java:31) at io.reactivex.Flowable.subscribe(Flowable.java:12172) at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onNext(FlowableFlatMap.java:156) at io.reactivex.internal.operators.flowable.FlowableFilter$FilterSubscriber.tryOnNext(FlowableFilter.java:72) at io.reactivex.internal.operators.flowable.FlowableFromIterable$IteratorConditionalSubscription.slowPath(FlowableFromIterable.java:376) at io.reactivex.internal.operators.flowable.FlowableFromIterable$BaseRangeSubscription.request(FlowableFromIterable.java:123) at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:152) at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onSubscribe(FlowableFlatMap.java:110) at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:66) at io.reactivex.internal.operators.flowable.FlowableFromIterable.subscribe(FlowableFromIterable.java:65) at io.reactivex.internal.operators.flowable.FlowableFromIterable.subscribeActual(FlowableFromIterable.java:46) at io.reactivex.Flowable.subscribe(Flowable.java:12172) at io.reactivex.internal.operators.flowable.FlowableFilter.subscribeActual(FlowableFilter.java:35) at io.reactivex.Flowable.subscribe(Flowable.java:12172) at io.reactivex.internal.operators.flowable.FlowableFlatMap.subscribeActual(FlowableFlatMap.java:52) at io.reactivex.Flowable.subscribe(Flowable.java:12172) at io.reactivex.internal.operators.flowable.FlowableFlatMap.subscribeActual(FlowableFlatMap.java:52) at io.reactivex.Flowable.subscribe(Flowable.java:12172) at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82) at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59) at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51) at java.util.concurrent.FutureTask.run(FutureTask.java:237) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:154) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588) at java.lang.Thread.run(Thread.java:818)

  upload = Flowable.fromIterable(view.getUploadFiles())
                .filter(new Predicate<String>() {
                    @Override
                    public boolean test(String s) throws Exception {
                        return !TextUtils.isEmpty(s);
                    }
                })
                .flatMap(new Function<String, Publisher<BaseRes<UploadImagData>>>() {
                    @Override
                    public Publisher<BaseRes<UploadImagData>> apply(String s) throws Exception {
                        return RServices.get().uploadImg(view.getJSessionId(), UpLoadUtils.getRequestBody(s));
                    }
                })
                .compose(Transformer.<UploadImagData>retrofit())
                .subscribeWith(new RSubscriber<UploadImagData>(context) {
                    @Override
                    public void _onNext(UploadImagData uploadImagData) {
                        picPaths.add(uploadImagData.getId());
                    }

                    @Override
                    public void _onTEmpty() {
                        T.show(context, context.getString(R.string.submit_failed));
                    }

                    @Override
                    public void _onNetWorkError() {
                        T.show(context, context.getString(R.string.network_not_well));
                    }

                    @Override
                    public void _onReturnCodeError(String returnCode, String msg) {
                        T.show(context, msg);
                    }

                    @Override
                    public void _onComplete() {
                        submitMaterialQuality(picPaths);
                    }
                });

and when activity destroyed, I dispose it

 upload .dispose();

About this issue

  • Original URL
  • State: closed
  • Created 8 years ago
  • Comments: 28 (14 by maintainers)

Most upvoted comments

@radzio I’ve faced the problem, my mistake was to trying to emit a onError after the dispose. For now, I surround any of my emitter.onError(), onNext(), onComplete() like that

if (!emitter.isDisposed())
    emitter.onNext(value)

Note : RxJava2 only, that doesn’t crash on RxJava1

I’m having exactly the same issue. When online everything is good but crashes in airplane mode. Getting ‘unknownHost’ warnings etc. and then FATAL EXCEPTION: RxCachedThreadScheduler-1

Could we get an explanation why this exception crashes instead of going down to onError()?

Regards

@radzio Sorry to see it now, I used the following code to solve this problem.

static {
        RxJavaPlugins.setErrorHandler(new Consumer<Throwable>() {
            @Override
            public void accept(Throwable throwable) throws Exception {
                if (throwable instanceof InterruptedException) {
                    log("Thread interrupted");
                } else if (throwable instanceof InterruptedIOException) {
                    log("Io interrupted");
                } else if (throwable instanceof SocketException) {
                    log("Socket error");
                }
            }
        });
    }

Ok,I found the solution.

The problem is that we have to use thread interrupt to unblock blocking APIs. So when a blocking retrofit call is interrupted, it throws an InterruptedException but generally you can’t know why and have to check some cancellation indication flag.

In RxJava, the interruption happens first and the set of the cancellation flag happens after. If the blocked thread is slow to wake up, it will see the cancellation flag and not emit the interrupt error. If the blocked thread wakes up fast or the cancel thread pauses between the interrupt and setting the flag, the woken up thread may find the flag not set and think it was a spurious interrupt and complain.

If the flag would be set before the interrupt is sent, a cancellation would never trigger the signal of the interrupted error.

I seem to have a related error and am still confused about how to handle it…

When I disconnect the network, this code first correctly reports SocketTimeoutException in onError, but then immediately crashes with a ConnectException from somewhere within OkHttp.

compositeDisposable.add(
    Flowable.interval(10, TimeUnit.SECONDS)
        .subscribeOn(Schedulers.io())
        .flatMap(__ -> channelInfoManager.sendKeepAlive().toFlowable())
        .subscribe(
            Log::onNextWithTimber,
            Log::onErrorWithTimber
        )
);

If I replace the flatMap with either concatMap or switchMap it does not crash. Also, it didn’t crash with RxJava 1.

Is this now the expected behavior and if so what exactly is the explanation and how should we deal with it?