RxJava: FATAL EXCEPTION: RxCachedThreadScheduler-4 or -1 or -2 or -3 ...
E/AndroidRuntime: FATAL EXCEPTION: RxCachedThreadScheduler-1
Process: goujiawang.gjstore, PID: 5960
java.io.InterruptedIOException: thread interrupted
    at okio.Timeout.throwIfReached(Timeout.java:146)
    at okio.Okio$2.read(Okio.java:135)
    at okio.Buffer.writeAll(Buffer.java:993)
    at okhttp3.RequestBody$3.writeTo(RequestBody.java:118)
    at okhttp3.MultipartBody.writeOrCountBytes(MultipartBody.java:171)
    at okhttp3.MultipartBody.writeTo(MultipartBody.java:113)
    at okhttp3.logging.HttpLoggingInterceptor.intercept(HttpLoggingInterceptor.java:189)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:92)
    at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:67)
    at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:170)
    at okhttp3.RealCall.execute(RealCall.java:60)
    at retrofit2.OkHttpCall.execute(OkHttpCall.java:174)
    at com.jakewharton.retrofit2.adapter.rxjava2.CallObservable.subscribeActual(CallObservable.java:41)
    at io.reactivex.Observable.subscribe(Observable.java:10151)
    at com.jakewharton.retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
    at io.reactivex.Observable.subscribe(Observable.java:10151)
    at io.reactivex.internal.operators.flowable.FlowableFromObservable.subscribeActual(FlowableFromObservable.java:31)
    at io.reactivex.Flowable.subscribe(Flowable.java:12172)
    at io.reactivex.internal.operators.flowable.FlowableOnBackpressureLatest.subscribeActual(FlowableOnBackpressureLatest.java:31)
    at io.reactivex.Flowable.subscribe(Flowable.java:12172)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onNext(FlowableFlatMap.java:156)
    at io.reactivex.internal.operators.flowable.FlowableFilter$FilterSubscriber.tryOnNext(FlowableFilter.java:72)
    at io.reactivex.internal.operators.flowable.FlowableFromIterable$IteratorConditionalSubscription.slowPath(FlowableFromIterable.java:376)
    at io.reactivex.internal.operators.flowable.FlowableFromIterable$BaseRangeSubscription.request(FlowableFromIterable.java:123)
    at io.reactivex.internal.subscribers.BasicFuseableSubscriber.request(BasicFuseableSubscriber.java:152)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.onSubscribe(FlowableFlatMap.java:110)
    at io.reactivex.internal.subscribers.BasicFuseableSubscriber.onSubscribe(BasicFuseableSubscriber.java:66)
    at io.reactivex.internal.operators.flowable.FlowableFromIterable.subscribe(FlowableFromIterable.java:65)
    at io.reactivex.internal.operators.flowable.FlowableFromIterable.subscribeActual(FlowableFromIterable.java:46)
    at io.reactivex.Flowable.subscribe(Flowable.java:12172)
    at io.reactivex.internal.operators.flowable.FlowableFilter.subscribeActual(FlowableFilter.java:35)
    at io.reactivex.Flowable.subscribe(Flowable.java:12172)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap.subscribeActual(FlowableFlatMap.java:52)
    at io.reactivex.Flowable.subscribe(Flowable.java:12172)
    at io.reactivex.internal.operators.flowable.FlowableFlatMap.subscribeActual(FlowableFlatMap.java:52)
    at io.reactivex.Flowable.subscribe(Flowable.java:12172)
    at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82)
    at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
    at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
    at java.util.concurrent.FutureTask.run(FutureTask.java:237)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:154)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)
    at java.lang.Thread.run(Thread.java:818)
upload = Flowable.fromIterable(view.getUploadFiles())
        .filter(new Predicate<String>() {
            @Override
            public boolean test(String s) throws Exception {
                return !TextUtils.isEmpty(s);
            }
        })
        .flatMap(new Function<String, Publisher<BaseRes<UploadImagData>>>() {
            @Override
            public Publisher<BaseRes<UploadImagData>> apply(String s) throws Exception {
                return RServices.get().uploadImg(view.getJSessionId(), UpLoadUtils.getRequestBody(s));
            }
        })
        .compose(Transformer.<UploadImagData>retrofit())
        .subscribeWith(new RSubscriber<UploadImagData>(context) {
            @Override
            public void _onNext(UploadImagData uploadImagData) {
                picPaths.add(uploadImagData.getId());
            }

            @Override
            public void _onTEmpty() {
                T.show(context, context.getString(R.string.submit_failed));
            }

            @Override
            public void _onNetWorkError() {
                T.show(context, context.getString(R.string.network_not_well));
            }

            @Override
            public void _onReturnCodeError(String returnCode, String msg) {
                T.show(context, msg);
            }

            @Override
            public void _onComplete() {
                submitMaterialQuality(picPaths);
            }
        });
When the activity is destroyed, I dispose of it:
upload.dispose();
About this issue
- State: closed
- Created 8 years ago
- Comments: 28 (14 by maintainers)
@radzio I’ve faced this problem too; my mistake was trying to emit an onError after the dispose. For now, I surround every call to my emitter.onError(), onNext(), and onComplete() like that.
Note: this applies to RxJava 2 only; it doesn’t crash on RxJava 1.
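One common way to guard emitter calls against a disposed downstream is to check `isDisposed()` before each signal. The following is a minimal sketch of that pattern, not the commenter's actual code. To keep it runnable without RxJava on the classpath, `Emitter` and `RecordingEmitter` are hypothetical stand-ins that only mirror the shape of the `isDisposed()`, `onNext()`, and `onError()` methods on RxJava 2 emitters; `emitGuarded` and its `fail` parameter are also invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class GuardedEmit {

    /** Hypothetical stand-in mirroring the emitter methods used here; not the RxJava type. */
    interface Emitter<T> {
        boolean isDisposed();
        void onNext(T value);
        void onError(Throwable error);
    }

    /** Performs some work and signals the result, skipping signals once disposed. */
    static void emitGuarded(Emitter<String> emitter, boolean fail) {
        try {
            if (fail) {
                // Stands in for a blocking call that throws, e.g. after an interrupt.
                throw new IllegalStateException("upload failed");
            }
            if (!emitter.isDisposed()) {
                emitter.onNext("ok");
            }
        } catch (Exception e) {
            if (!emitter.isDisposed()) {
                emitter.onError(e);
            }
            // Otherwise nobody is listening any more, so the error is dropped (or logged).
        }
    }

    /** Records signals so the effect of the guard can be inspected. */
    static final class RecordingEmitter implements Emitter<String> {
        final List<String> signals = new ArrayList<>();
        volatile boolean disposed;

        @Override public boolean isDisposed() { return disposed; }
        @Override public void onNext(String value) { signals.add("next: " + value); }
        @Override public void onError(Throwable error) { signals.add("error: " + error.getMessage()); }
    }
}
```

A check followed by a call is not atomic, so a dispose that lands between the two can still slip through. RxJava 2.1.1 and later also offer `tryOnError(Throwable)` on emitters, which returns `false` instead of forwarding the error when the downstream is already gone.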
I’m having exactly the same issue. Everything works when online, but the app crashes in airplane mode. I get ‘unknownHost’ warnings etc. and then
FATAL EXCEPTION: RxCachedThreadScheduler-1
Could we get an explanation of why this exception crashes the app instead of being delivered to onError()?
Regards
@radzio Sorry for only seeing this now. I used the following code to solve this problem.
OK, I found the solution.
The problem is that we have to use thread interrupt to unblock blocking APIs. So when a blocking retrofit call is interrupted, it throws an InterruptedException but generally you can’t know why and have to check some cancellation indication flag.
In RxJava, the interruption happens first and the cancellation flag is set afterwards. If the blocked thread is slow to wake up, it will see the cancellation flag and not emit the interrupt error. If the blocked thread wakes up fast, or the cancelling thread pauses between sending the interrupt and setting the flag, the woken-up thread may find the flag not set, conclude that it was a spurious interrupt, and complain.
If the flag were set before the interrupt is sent, a cancellation would never trigger the interrupted error signal.
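The ordering described above can be sketched with plain threads. This is an illustrative model only, not RxJava's internals: `BlockingTask`, `cancel`, and the outcome strings are hypothetical names, and `Thread.sleep` stands in for the blocking network call.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

class CancelOrdering {

    /** Hypothetical stand-in for a task that blocks until it is interrupted. */
    static final class BlockingTask implements Runnable {
        final AtomicBoolean cancelled = new AtomicBoolean();
        final CountDownLatch started = new CountDownLatch(1);
        final AtomicReference<String> outcome = new AtomicReference<>("none");

        @Override
        public void run() {
            started.countDown();
            try {
                Thread.sleep(60_000); // stands in for a blocking network call
                outcome.set("completed");
            } catch (InterruptedException e) {
                // The interrupt alone does not say why the thread was woken up,
                // so the cancellation flag is consulted.
                outcome.set(cancelled.get() ? "cancelled quietly" : "reported as error");
            }
        }
    }

    /** Starts the task, cancels it in the chosen order, and returns what the task decided. */
    static String cancel(boolean flagFirst) {
        BlockingTask task = new BlockingTask();
        Thread worker = new Thread(task);
        worker.start();
        try {
            task.started.await();
            if (flagFirst) {
                task.cancelled.set(true); // flag is already visible when the worker wakes up
                worker.interrupt();
            } else {
                worker.interrupt();       // worker may wake up before the flag is set
                task.cancelled.set(true);
            }
            worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException("caller was interrupted", e);
        }
        return task.outcome.get();
    }
}
```

With `cancel(true)` the worker always finds the flag set and stays quiet. With `cancel(false)` the result depends on thread timing: it is usually quiet as well, but the worker can occasionally win the race and report the interrupt as an error, which is the crash discussed in this thread.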
I seem to have a related error and am still confused about how to handle it…
When I disconnect the network, this code first correctly reports SocketTimeoutException in onError, but then immediately crashes with a ConnectException from somewhere within OkHttp.
If I replace the flatMap with either concatMap or switchMap, it does not crash. Also, it didn’t crash with RxJava 1.
Is this now the expected behavior, and if so, what exactly is the explanation and how should we deal with it?