RxJava: BehaviorSubject concurrent subscription and sending is broken

BehaviorSubject should ensure that the last notification always reaches the subscriber. When the subscription and sending happens concurrently, there is a high probability that this property gets broken.

Test has been prepared that easily reproduces the error. https://gist.github.com/andrask/fc06abfd70daa6f91edb#file-behaviorsubjectsubscribeandsendconcurrently-java

The test involves two threads: 1) trying to subscribe 2) trying to send next. These are carefully coordinated to allow real concurrent execution. The test is that the subscriber must receive the sent value. The issue almost certainly happens in a few hundred retries.

Note that with a Thread.sleep(1) the issue goes away.

Note that the test is something I distilled from what I saw in my production code. There may be little issues with it but the concurrency problem certainly exists as it is reproducibly just by stepping through the code.

About this issue

  • Original URL
  • State: closed
  • Created 10 years ago
  • Comments: 21 (12 by maintainers)

Most upvoted comments

Well, it’s hard to divide subscribe() and setting of initial value (which is ~onNext) for me.

What I see at the moment:

  1. Current implementation of BehaviorSubject.create(defaultValue) sets default value to volatile Object latest field of SubjectSubscriptionManager.
  2. When you call onNext() it also sets new value to that volatile field without any synchronization.
  3. When you subscribe() it won’t start with defaultValue that you’ve passed as initial but it will emit that volatile latest state instead -> that leads to the effect that @sregg sees.

And now I’m not sure that toSerialized() will help with initial value because even though it synchronizes onNext, emission of the default value goes trough different lock object and still may happen concurrently with onNext

@sregg as temporary solution you can try to create BehaviorSubject without default value, convert it toSerialized() and set default value via onNext before giving reference to the subject to other threads, this should give you threading consistency you want to achieve.

Probably we need to define this behavior better either in javadoc or even change current impementation to emit default initial value first and only then emit volatile latest.

@akarnokd will be great to get your comment here!

Subscription is not an on* method and therefore this is not expected.

On Tue, May 24, 2016 at 7:38 PM Artem Zinnatullin notifications@github.com wrote:

This is expected behavior.

If you you write to the subject from multiple threads and need consistency of on*() events -> you can convert your subject to SerializedSubject via subject.toSerialized().

Javadoc of Subject.toSerialized():

/** * Wraps a {@link Subject} so that it is safe to call its various {@code on} methods from different threads. *

* When you use an ordinary {@link Subject} as a {@link Subscriber}, you must take care not to call its * {@link Subscriber#onNext} method (or its other {@code on} methods) from multiple threads, as this could * lead to non-serialized calls, which violates * the Observable contract and creates an * ambiguity in the resulting Subject. *

* To protect a {@code Subject} from this danger, you can convert it into a {@code SerializedSubject} with * code like the following: *

{@code * mySafeSubject = myUnsafeSubject.toSerialized(); * }
* * @return SerializedSubject wrapping the current Subject */

— You are receiving this because you are subscribed to this thread. Reply to this email directly or view it on GitHub https://github.com/ReactiveX/RxJava/issues/1184#issuecomment-221433827