RxJava 2: Ignoring (not keeping a reference to) the Disposable returned by subscribe()

Hello, and thanks for your great work. I'm an Android developer and I ran into the following situation. I use Observable.create(…) to wrap listeners and bring them into the reactive world:

  Observable.create(emitter -> {
            SharedPreferences sharedPreferences = getSharedPreferences();
            // The lambda parameter is named prefs: it may not shadow the local variable above.
            SharedPreferences.OnSharedPreferenceChangeListener listener = (prefs, key) -> emitter.onNext(prefs.getBoolean("Some key", false));
            sharedPreferences.registerOnSharedPreferenceChangeListener(listener);
            // Unregister the listener when the downstream disposes.
            emitter.setCancellable(() -> sharedPreferences.unregisterOnSharedPreferenceChangeListener(listener));
        });

The problem is that SharedPreferences stores this listener as a WeakReference. In that case, only the Disposable returned by Observable.subscribe(…) holds a strong reference to the listener.

Sometimes I don't need to keep that Disposable, because I will never dispose() my subscription. But in that case (no stored reference to the Disposable), my listener gets collected by the GC and my Observable stops emitting items.

My question: is it valid, in the general case, not to store the Disposable returned by subscribe(…)? Or is my code wrong, and I need to create some proxy class that holds strong references in this case?

Thanks!

About this issue

  • State: closed
  • Created 7 years ago
  • Comments: 16 (9 by maintainers)

Most upvoted comments

@akarnokd Nice solution! In our project we decided to go for a SharedPrefs decorator that stores strong refs.
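
The commenter does not show the decorator, so the following is only a rough sketch of the idea in plain Java. `ListenerRegistry` is a hypothetical stand-in for the register/unregister pair on SharedPreferences, and `StrongRefRegistry` and `retainedCount` are illustrative names, not part of Android or RxJava:

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical stand-in for the two SharedPreferences methods involved.
interface ListenerRegistry<L> {
    void register(L listener);
    void unregister(L listener);
}

// Decorator: delegates to the wrapped registry and additionally keeps a
// strong reference to every registered listener until it is unregistered.
final class StrongRefRegistry<L> implements ListenerRegistry<L> {
    private final ListenerRegistry<L> delegate;
    // Identity-based set, so two listeners that are equal() are still tracked separately.
    private final Set<L> retained = Collections.synchronizedSet(
            Collections.newSetFromMap(new IdentityHashMap<L, Boolean>()));

    StrongRefRegistry(ListenerRegistry<L> delegate) {
        this.delegate = delegate;
    }

    @Override
    public void register(L listener) {
        retained.add(listener);        // pin first, then hand to the weak registry
        delegate.register(listener);
    }

    @Override
    public void unregister(L listener) {
        delegate.unregister(listener);
        retained.remove(listener);     // unpin so the listener can be collected
    }

    // Number of listeners currently pinned; exposed for inspection only.
    int retainedCount() {
        return retained.size();
    }
}
```

With such a wrapper, the decorator instance itself must stay strongly reachable (for example as an application-scoped singleton), otherwise the pinned listeners become collectable along with it.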

Is registerOnSharedPreferenceChangeListener part of RxJava? Let's look at a simplified example:

WeakReference<Integer> wr = new WeakReference<>(123456789);

Observable<Integer> obs = Observable.fromCallable(() -> wr.get());

System.gc();
Thread.sleep(200);

obs.test()
  .assertFailure(NullPointerException.class);

Unsurprisingly, RxJava failed to keep 123456789 alive.

RxJava can’t change such weak sources into strong ones and also can’t make you hold strong references to its own components.

Your case has nothing to do with RxJava. I assume the weak-reference behavior of registerOnSharedPreferenceChangeListener was there all along, and when you registered a listener manually, you kept a strong reference to it somewhere until it could be released. Now RxJava is taking the place of the listener, so you have to keep a strong reference to the observer or to the disposable you get back in order not to lose the listener. (This does not contradict my answer to @einmalfel's question: he didn't specify which sources he meant, and my answer assumed standard RxJava sources that don't have any weak references anywhere near them.)

In a similar scenario, I used an IdentityHashMap to store the references, and removed the references on cancellation.
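
The comment gives no code, so this is a minimal sketch of what such a holder could look like, assuming a long-lived instance. `ListenerKeeper`, `keep` and `isKept` are hypothetical names; the Runnable returned by `keep` is meant to be invoked from the cancellation callback (for instance inside `emitter.setCancellable(...)`):

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical helper that pins objects which some API only references weakly.
final class ListenerKeeper {
    // Backed by an IdentityHashMap: membership is decided by ==, not equals().
    private final Set<Object> kept =
            Collections.newSetFromMap(new IdentityHashMap<Object, Boolean>());

    // Pins the listener and returns the action that unpins it again.
    synchronized Runnable keep(Object listener) {
        kept.add(listener);
        return () -> release(listener);
    }

    // Removing an absent entry is a no-op, so the returned action may run twice safely.
    private synchronized void release(Object listener) {
        kept.remove(listener);
    }

    synchronized boolean isKept(Object listener) {
        return kept.contains(listener);
    }
}
```

Identity semantics matter here because the goal is to keep one specific listener instance alive, regardless of how that listener implements equals() and hashCode().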

Had the question been how to work around such weak listeners, that would have been more interesting. For example, have a companion task that keeps a reference to the listener as part of the Observable setup:

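private Observable<Boolean> observePreferences() {
    return Observable.create(emitter -> {
            SharedPreferences sharedPreferences = getSharedPreferences();
            // The lambda parameter is named prefs: it may not shadow the local variable above.
            SharedPreferences.OnSharedPreferenceChangeListener listener =
                (prefs, key) ->
                     emitter.onNext(prefs.getBoolean("some_pref_key", false));
            sharedPreferences.registerOnSharedPreferenceChangeListener(listener);

            // Companion task: its Runnable captures the listener, so the scheduler keeps
            // the listener strongly reachable until the task is disposed.
            // schedulePeriodicallyDirect takes (task, initialDelay, period, unit);
            // log(...) stands for any logging helper.
            Disposable d = Schedulers.io().schedulePeriodicallyDirect(
               () -> log(listener.toString()), 100000, 100000, TimeUnit.DAYS);

            emitter.setCancellable(() -> {
                  sharedPreferences.unregisterOnSharedPreferenceChangeListener(listener);
                  d.dispose();
            });
      });
}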

Let’s see:

  • onSubscribe called? Yes, by create itself
  • onXXX methods called sequentially? Yes, SharedPreferences signals from the main thread, right?
  • onXXX stops if the consumer disposes? Yes, the setCancellable call after the register call ensures this.

Verdict: contract honored.