retrofit: Unsubscribe from pending request leads to java.io.InterruptedIOException and memory leak

I have a Retrofit interface method that returns an Observable. If I unsubscribe from it in onDestroy() of an Activity or a Fragment while the request is in progress, I always get a java.io.InterruptedIOException, and this leads to a memory leak (I use LeakCanary for memory leak detection). I know there is currently no method to cancel a request, and I don't want to use Callback because I use RxJava in my app. Is there any way to avoid the memory leak?

    compile 'com.squareup.retrofit:retrofit:1.9.0'
    compile 'com.squareup.okhttp:okhttp:2.3.0'

    @POST("/functions/findLogRecordsForUser")
    Observable<FindLogRecordsForUserResponse> findLogRecordsForUser(@Body FindLogRecordsForUserParams params);

    RequestInterceptor requestInterceptor = request -> {
        request.addHeader("Content-Type", "application/json");
        request.addHeader("Application-Id", application.getString(R.string.application_id));
        request.addHeader("Client-Key", application.getString(R.string.client_key));
    };

    RestAdapter restAdapter = new RestAdapter.Builder()
            .setEndpoint(Service.BASE_URL + Service.VERSION)
            .setLogLevel(RestAdapter.LogLevel.BASIC)
            .setRequestInterceptor(requestInterceptor)
            .setClient(new OkClient(new OkHttpClient()))
            .build();

D/Retrofit﹕ ---> HTTP POST 
05-13 12:22:16.746    7084-7084/ V/HistoryFragment﹕ ⇢ onDestroy()
05-13 12:22:16.746    7084-7084/ V/HistoryFragment﹕ ⇠ onDestroy [0ms]
05-13 12:22:17.310   7084-14052/ D/Retrofit﹕ <--- HTTP 200  (1638ms)
05-13 12:22:17.351   7084-14052/ D/Retrofit﹕ ---- ERROR 
05-13 12:22:17.361   7084-14052/ D/Retrofit﹕ java.io.InterruptedIOException
            at okio.Timeout.throwIfReached(Timeout.java:146)
            at okio.Okio$2.read(Okio.java:134)
            at okio.AsyncTimeout$2.read(AsyncTimeout.java:211)
            at okio.RealBufferedSource.read(RealBufferedSource.java:50)
            at com.squareup.okhttp.internal.http.HttpConnection$FixedLengthSource.read(HttpConnection.java:390)
            at okio.RealBufferedSource.read(RealBufferedSource.java:50)
            at okio.RealBufferedSource.exhausted(RealBufferedSource.java:60)
            at okio.InflaterSource.refill(InflaterSource.java:101)
            at okio.InflaterSource.read(InflaterSource.java:62)
            at okio.GzipSource.read(GzipSource.java:80)
            at okio.RealBufferedSource$1.read(RealBufferedSource.java:338)
            at retrofit.ExceptionCatchingTypedInput$ExceptionCatchingInputStream.read(ExceptionCatchingTypedInput.java:64)
            at java.io.InputStreamReader.read(InputStreamReader.java:233)
            at com.google.gson.stream.JsonReader.fillBuffer(JsonReader.java:1300)
            at com.google.gson.stream.JsonReader.nextQuotedValue(JsonReader.java:1030)
            at com.google.gson.stream.JsonReader.nextString(JsonReader.java:827)
            at com.google.gson.internal.bind.TypeAdapters$13.read(TypeAdapters.java:358)
            at com.google.gson.internal.bind.TypeAdapters$13.read(TypeAdapters.java:346)
            at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$1.read(ReflectiveTypeAdapterFactory.java:103)
            at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:196)
            at com.google.gson.internal.bind.TypeAdapterRuntimeTypeWrapper.read(TypeAdapterRuntimeTypeWrapper.java:40)
            at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:81)
            at com.google.gson.internal.bind.CollectionTypeAdapterFactory$Adapter.read(CollectionTypeAdapterFactory.java:60)
            at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$1.read(ReflectiveTypeAdapterFactory.java:103)
            at com.google.gson.internal.bind.ReflectiveTypeAdapterFactory$Adapter.read(ReflectiveTypeAdapterFactory.java:196)
            at com.google.gson.Gson.fromJson(Gson.java:810)
            at com.google.gson.Gson.fromJson(Gson.java:775)
            at retrofit.converter.GsonConverter.fromBody(GsonConverter.java:63)
            at retrofit.RestAdapter$RestHandler.invokeRequest(RestAdapter.java:367)
            at retrofit.RestAdapter$RestHandler.access$100(RestAdapter.java:220)
            at retrofit.RestAdapter$RestHandler$1.invoke(RestAdapter.java:265)
            at retrofit.RxSupport$2.run(RxSupport.java:55)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:422)
            at java.util.concurrent.FutureTask.run(FutureTask.java:237)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
            at retrofit.Platform$Android$2$1.run(Platform.java:142)
            at java.lang.Thread.run(Thread.java:841)
05-13 12:22:17.361   7084-14052/ D/Retrofit﹕ ---- END ERROR
05-13 12:22:17.630   7084-14052/ D/dalvikvm﹕ GC_FOR_ALLOC freed 3987K, 24% free 13420K/17508K, paused 37ms, total 37ms
05-13 12:22:21.825    7084-7201/ D/dalvikvm﹕ GC_EXPLICIT freed 1109K, 28% free 12741K/17508K, paused 9ms+4ms, total 61ms
05-13 12:22:22.435    7084-7201/ I/dalvikvm﹕ hprof: dumping heap strings to "/data/data//files/suspected_leak_heapdump.hprof".
05-13 12:22:22.608    7084-7201/ I/dalvikvm﹕ hprof: heap dump completed (13422KB)
05-13 12:23:04.444   7084-14558/ D/LeakCanary﹕ In :3.0:20.
    * .view.activity.MainNavigationDrawerActivity has leaked:
    * GC ROOT thread java.lang.Thread.<Java Local> (named 'Retrofit-Idle')
    * references rx.internal.operators.OperatorMerge$MergeSubscriber.actual
    * references rx.internal.operators.OperatorOnErrorResumeNextViaFunction$1.val$child (anonymous class extends rx.Subscriber)
    * references rx.internal.operators.OperatorSubscribeOn$1$1$1.op (anonymous class extends rx.Subscriber)
    * references rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.child
    * references rx.observers.SafeSubscriber.actual
    * references rx.Observable$32.val$onNext (anonymous class extends rx.Subscriber)
    * references .view.fragment.HistoryFragment$$Lambda$1.arg$1 (anonymous class implements rx.functions.Action1)
    * references .view.fragment.HistoryFragment.multiStateView
    * references com.kennyc.view.MultiStateView.mContext
    * references android.support.v7.internal.view.ContextThemeWrapper.mBase
    * leaks .view.activity.MainNavigationDrawerActivity instance
    * Reference Key: dad2c68a-cb9e-46ac-a615-6404946d07be
    * Device: motorola motorola XT1032 falcon_retgb
    * Android Version: 4.4.4 API: 19
    * Durations: watch=5021ms, gc=163ms, heap dump=686ms, analysis=41561ms

About this issue

  • State: closed
  • Created 9 years ago
  • Comments: 31 (1 by maintainers)

Most upvoted comments

This is an old post, but for others who are searching for the cause to similar problems (like me…)

@JakeWharton said:

Thus far all stack traces in this issue are due to normal timeouts.

But the code of Okio (bundled with 'com.squareup.okhttp3:okhttp:3.9.0') is:

  /**
   * Throws an {@link InterruptedIOException} if the deadline has been reached or if the current
   * thread has been interrupted. This method doesn't detect timeouts; that should be implemented to
   * asynchronously abort an in-progress operation.
   */
  public void throwIfReached() throws IOException {
    if (Thread.interrupted()) {
      throw new InterruptedIOException("thread interrupted");
    }

    if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
      throw new InterruptedIOException("deadline reached");
    }
  }

So these stack traces match the description of the issue: unsubscribe interrupts the thread and throws this error.
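
To make that mechanism concrete, here is a minimal, library-free sketch. It assumes that unsubscribing ends up calling Future.cancel(true) on the task running the request (which is how I read the FutureTask frames in the first stack trace). The class InterruptDemo and the methods throwIfInterrupted, fakeBlockingRead and runAndCancel are hypothetical names made up for this illustration; throwIfInterrupted only copies the interrupt check from the Okio method quoted above and is not Okio's actual code path.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class InterruptDemo {

    // Simplified stand-in for the interrupt check in okio.Timeout.throwIfReached().
    static void throwIfInterrupted() throws IOException {
        if (Thread.interrupted()) {
            throw new InterruptedIOException("thread interrupted");
        }
    }

    // Hypothetical blocking read: polls the check between "chunks" until interrupted.
    static void fakeBlockingRead(CountDownLatch started) throws IOException {
        started.countDown();
        while (true) {
            throwIfInterrupted();
            Thread.yield();
        }
    }

    // Starts the fake request, cancels it with interruption, and reports what the worker saw.
    static String runAndCancel() throws Exception {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicReference<String> outcome = new AtomicReference<>("no exception");

        Future<?> request = executor.submit(() -> {
            try {
                fakeBlockingRead(started);
            } catch (IOException e) {
                outcome.set(e.getClass().getSimpleName() + ": " + e.getMessage());
            } finally {
                finished.countDown();
            }
        });

        started.await();                      // make sure the "request" is in progress
        request.cancel(true);                 // assumption: this is what unsubscribe amounts to
        finished.await(5, TimeUnit.SECONDS);  // wait for the worker to observe the interrupt
        executor.shutdownNow();
        return outcome.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAndCancel());
    }
}
```

Running main prints "InterruptedIOException: thread interrupted", the same exception type as in the traces, without any timeout or deadline being involved.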

+1. Does anybody know if this issue got solved?

Any solution so far? I am facing the same issue while using RxJava + Retrofit to search as you type. The code snippet follows.

    @Override
    public void onSearchTextChanged(Observable<String> queryObservable) {
        getCompositeDisposable().add(queryObservable
                .debounce(500, TimeUnit.MILLISECONDS)
                .filter(new Predicate<String>() {
                    @Override
                    public boolean test(String text) throws Exception {
                        return !text.isEmpty();
                    }
                })
                .distinctUntilChanged()
                .switchMap(new Function<String, Observable<WikiSearchOutputModel>>() {
                    @Override
                    public Observable<WikiSearchOutputModel> apply(String query) throws Exception {
                        return callWikiApi(query);
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<WikiSearchOutputModel>() {
                    @Override
                    public void accept(WikiSearchOutputModel outputModel) throws Exception {
                        getMvpView().showSearchResults((ArrayList<Page>) outputModel.getQuery().getPages());
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.wtf("WikiSearchPresenter", throwable);
                    }
                })
        );
    }

    private Observable<WikiSearchOutputModel> callWikiApi(String queryParam) {
        return getDataManager().callWikiApi(queryParam);
    }

This always causes the following error:

09-01 17:50:48.926 711-711/io.rajsuvariya.wikisearch E/WikiSearchPresenter: thread interrupted
    java.io.InterruptedIOException: thread interrupted
        at okio.Timeout.throwIfReached(Timeout.java:145)
        at okio.Okio$1.write(Okio.java:76)
        at okio.AsyncTimeout$1.write(AsyncTimeout.java:180)
        at okio.RealBufferedSink.flush(RealBufferedSink.java:224)
        at okhttp3.internal.http2.Http2Writer.connectionPreface(Http2Writer.java:72)
        at okhttp3.internal.http2.Http2Connection.start(Http2Connection.java:512)
        at okhttp3.internal.http2.Http2Connection.start(Http2Connection.java:503)
        at okhttp3.internal.connection.RealConnection.establishProtocol(RealConnection.java:280)
        at okhttp3.internal.connection.RealConnection.connect(RealConnection.java:162)
        at okhttp3.internal.connection.StreamAllocation.findConnection(StreamAllocation.java:257)
        at okhttp3.internal.connection.StreamAllocation.findHealthyConnection(StreamAllocation.java:135)
        at okhttp3.internal.connection.StreamAllocation.newStream(StreamAllocation.java:114)
        at okhttp3.internal.connection.ConnectInterceptor.intercept(ConnectInterceptor.java:42)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.internal.cache.CacheInterceptor.intercept(CacheInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.internal.http.BridgeInterceptor.intercept(BridgeInterceptor.java:93)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RetryAndFollowUpInterceptor.intercept(RetryAndFollowUpInterceptor.java:126)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.logging.HttpLoggingInterceptor.intercept(HttpLoggingInterceptor.java:212)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:147)
        at okhttp3.internal.http.RealInterceptorChain.proceed(RealInterceptorChain.java:121)
        at okhttp3.RealCall.getResponseWithInterceptorChain(RealCall.java:200)
        at okhttp3.RealCall.execute(RealCall.java:77)
        at retrofit2.OkHttpCall.execute(OkHttpCall.java:180)
        at retrofit2.adapter.rxjava2.CallExecuteObservable.subscribeActual(CallExecuteObservable.java:42)
        at io.reactivex.Observable.subscribe(Observable.java:11194)
        at retrofit2.adapter.rxjava2.BodyObservable.subscribeActual(BodyObservable.java:34)
        at io.reactivex.Observable.subscribe(Observable.java:11194)
        at io.reactivex.internal.operators.observable.ObservableSwitchMap$SwitchMapObserver.onNext(ObservableSwitchMap.java:126)
        at io.reactivex.internal.operators.observable.ObservableDistinctUntilChanged$DistinctUntilChangedObserver.onNext(ObservableDistinctUntilChanged.java:85)
        at io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver.onNext(ObservableFilter.java:52)
        at io.reactivex.observers.SerializedObserver.onNext(SerializedObserver.java:113)
        at io.reactivex.internal.operators.observable.ObservableDebounceTimed$DebounceTimedObserver.emit(ObservableDebounceTimed.java:140)
        at io.reactivex.internal.operators.observable.ObservableDebounceTimed$DebounceEmitter.run(ObservableDebounceTimed.java:165)
        at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
        at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:301)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1162)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:636)
        at java.lang.Thread.run(Thread.java:764)

This issue also affects caching in OkHttpClient if you use it with Retrofit < 2.0.

If you use OkHttpClient, then the exception thrown by okio.Timeout sets the hasJournalErrors boolean flag inside DiskLruCache to true, which in turn blocks all subsequent attempts to write responses to the cache, breaking Cache-Control semantics until the process is restarted.