RxJava: BehaviorSubject concurrent subscription and sending is broken
BehaviorSubject should ensure that the last notification always reaches the subscriber. When subscription and sending happen concurrently, there is a high probability that this property is violated.
A test has been prepared that easily reproduces the error: https://gist.github.com/andrask/fc06abfd70daa6f91edb#file-behaviorsubjectsubscribeandsendconcurrently-java
The test involves two threads: 1) one trying to subscribe, 2) one trying to send the next value. These are carefully coordinated to allow real concurrent execution. The test asserts that the subscriber receives the sent value. The failure almost certainly shows up within a few hundred retries.
Note that with a Thread.sleep(1) the issue goes away.
Note that the test is something I distilled from what I saw in my production code. There may be small issues with it, but the concurrency problem certainly exists, as it is reproducible just by stepping through the code.
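For readers without access to the gist, the shape of such a test can be sketched with plain JDK classes. This is not the linked test and not RxJava's implementation: ToySubject, countMisses and the `locked` flag are hypothetical names, and the lost-update window shown is only one plausible interleaving. With `locked` set to false, subscribe reads the replay value and registers the observer in two unsynchronized steps; with `locked` set to true, both operations share one lock and the value cannot be lost.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class SubscribeVsSendSketch {

    /** Toy "replay the latest value" subject (hypothetical, NOT RxJava's code). */
    static class ToySubject {
        private final boolean locked;        // true: subscribe and onNext share one lock
        private final Object lock = new Object();
        private volatile Integer latest;     // last value sent, null if none yet
        private final List<Consumer<Integer>> observers = new CopyOnWriteArrayList<>();

        ToySubject(boolean locked) { this.locked = locked; }

        void subscribe(Consumer<Integer> observer) {
            if (locked) { synchronized (lock) { doSubscribe(observer); } }
            else { doSubscribe(observer); }
        }

        void onNext(Integer value) {
            if (locked) { synchronized (lock) { doOnNext(value); } }
            else { doOnNext(value); }
        }

        private void doSubscribe(Consumer<Integer> observer) {
            Integer snapshot = latest;       // step 1: read the value to replay
            if (snapshot != null) observer.accept(snapshot);
            observers.add(observer);         // step 2: register for future values
            // Without the lock, an onNext that runs entirely between
            // step 1 and step 2 never reaches this observer.
        }

        private void doOnNext(Integer value) {
            latest = value;
            for (Consumer<Integer> o : observers) o.accept(value);
        }
    }

    /** Runs `rounds` subscribe-vs-send races and returns how many lost the value. */
    static int countMisses(boolean locked, int rounds) {
        int misses = 0;
        for (int i = 0; i < rounds; i++) {
            ToySubject subject = new ToySubject(locked);
            AtomicReference<Integer> received = new AtomicReference<>();
            CountDownLatch start = new CountDownLatch(1);
            Thread subscriber = new Thread(() -> {
                awaitStart(start);
                subject.subscribe(received::set);
            });
            Thread sender = new Thread(() -> {
                awaitStart(start);
                subject.onNext(42);
            });
            subscriber.start();
            sender.start();
            start.countDown();               // release both threads together
            joinQuietly(subscriber);
            joinQuietly(sender);
            if (received.get() == null) misses++;   // subscriber never saw the value
        }
        return misses;
    }

    private static void awaitStart(CountDownLatch latch) {
        try { latch.await(); } catch (InterruptedException e) { throw new IllegalStateException(e); }
    }

    private static void joinQuietly(Thread t) {
        try { t.join(); } catch (InterruptedException e) { throw new IllegalStateException(e); }
    }

    public static void main(String[] args) {
        System.out.println("unlocked misses: " + countMisses(false, 20_000));
        System.out.println("locked misses:   " + countMisses(true, 20_000));
    }
}
```

The unlocked count depends on scheduling and may well be zero on a given run; the locked count is always zero, because a subscriber either registers before the send or replays the value stored by it.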
About this issue
- Original URL
- State: closed
- Created 10 years ago
- Comments: 21 (12 by maintainers)
Well, it’s hard for me to separate subscribe() from the setting of the initial value (which is roughly an onNext).
What I see at the moment:
- BehaviorSubject.create(defaultValue) sets the default value to the volatile Object latest field of SubjectSubscriptionManager.
- onNext() also sets the new value to that volatile field, without any synchronization.
- subscribe() won’t start with the defaultValue that you’ve passed as the initial value; it will emit that volatile latest state instead -> that leads to the effect that @sregg sees.
And now I’m not sure that toSerialized() will help with the initial value, because even though it synchronizes onNext, emission of the default value goes through a different lock object and still may happen concurrently with onNext…
@sregg as a temporary solution you can try to create the BehaviorSubject without a default value, convert it with toSerialized() and set the default value via onNext before giving a reference to the subject to other threads. This should give you the threading consistency you want to achieve.
Probably we need to define this behavior better, either in the javadoc or even by changing the current implementation to emit the default initial value first and only then emit volatile latest.
@akarnokd it would be great to get your comment here!
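The "single latest slot" observation in the comment above can be illustrated with a small single-threaded model. This is a sketch under stated assumptions, not RxJava's source: ToyBehaviorSubject and valuesSeenByLateSubscriber are hypothetical names, and the model only mirrors the described behavior that the default value and later onNext values overwrite the same field.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class LatestSlotSketch {

    /** Toy stand-in for a behavior-style subject (hypothetical, NOT RxJava's code). */
    static class ToyBehaviorSubject<T> {
        // One slot shared by the default value and every later onNext value.
        private volatile T latest;
        private final List<Consumer<T>> observers = new CopyOnWriteArrayList<>();

        static <T> ToyBehaviorSubject<T> create(T defaultValue) {
            ToyBehaviorSubject<T> subject = new ToyBehaviorSubject<>();
            subject.latest = defaultValue;   // the default is just the first "latest"
            return subject;
        }

        void onNext(T value) {
            latest = value;                  // overwrites the default, no history kept
            for (Consumer<T> o : observers) o.accept(value);
        }

        void subscribe(Consumer<T> observer) {
            T snapshot = latest;             // replay whatever is in the slot right now
            if (snapshot != null) observer.accept(snapshot);
            observers.add(observer);
        }
    }

    /** A subscriber that arrives after one onNext sees only the overwritten slot. */
    static List<String> valuesSeenByLateSubscriber() {
        ToyBehaviorSubject<String> subject = ToyBehaviorSubject.create("default");
        subject.onNext("updated");
        List<String> seen = new ArrayList<>();
        subject.subscribe(seen::add);
        return seen;                         // contains "updated", never "default"
    }

    public static void main(String[] args) {
        System.out.println(valuesSeenByLateSubscriber());
    }
}
```

In this model, seeding the subject through onNext before any other thread holds a reference is indistinguishable from passing a default value, which is the idea behind the temporary solution suggested to @sregg.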
Subscription is not an on* method and therefore this is not expected.
On Tue, May 24, 2016 at 7:38 PM Artem Zinnatullin notifications@github.com wrote: