reactive: IObservable from Observable.Create unable to complete
It appears that the following methods IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync) and IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync) do not correctly mark the resulting IObservable<TResult>
as complete when the asynchronous delegate completes.
This can be seen by the use of Stubs.Nop
for the completion delegate here:
var taskCompletionObserver = new AnonymousObserver<IDisposable>(d => subscription.Disposable = d ?? Disposable.Empty, observer.OnError, Stubs.Nop);
and here:
var taskCompletionObserver = new AnonymousObserver<Action>(a => subscription.Disposable = a != null ? Disposable.Create(a) : Disposable.Empty, observer.OnError, Stubs.Nop);
I believe both of these Stubs.Nop
calls should be replaced with observer.OnComplete
.
About this issue
- Original URL
- State: closed
- Created 9 years ago
- Comments: 16
I got caught out by this again today!
It seems that using schedulers in combination with this Task returning overload can be incredibly dangerous:
This sometimes returns 9 elements and sometimes 10. When the async callback completes and thus completes the observer, there is no guarantee that the scheduler has actually called
OnNext
for the items. This wouldn’t be an issue if this overload didn’t automatically callOnCompleted
as our scheduledOnCompleted()
would occur after all the scheduled.OnNext
calls.