reactive: IObservable from Observable.Create unable to complete

It appears that the following two Observable.Create overloads do not mark the resulting IObservable<TResult> as complete when the asynchronous delegate completes:

IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<IDisposable>> subscribeAsync)

IObservable<TResult> Create<TResult>(Func<IObserver<TResult>, CancellationToken, Task<Action>> subscribeAsync)

This can be seen by the use of Stubs.Nop for the completion delegate here:

var taskCompletionObserver = new AnonymousObserver<IDisposable>(d => subscription.Disposable = d ?? Disposable.Empty, observer.OnError, Stubs.Nop);

and here:

var taskCompletionObserver = new AnonymousObserver<Action>(a => subscription.Disposable = a != null ? Disposable.Create(a) : Disposable.Empty, observer.OnError, Stubs.Nop);

I believe both of these Stubs.Nop arguments should be replaced with observer.OnCompleted.
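
If these overloads behave as described, one way to still get a completed sequence is for the delegate to call OnCompleted itself before returning its disposable. The following is only a minimal sketch, assuming the System.Reactive package is referenced; the class name ExplicitCompletionSketch, the method Numbers, and the emitted values are hypothetical and not taken from the library.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ExplicitCompletionSketch
{
    // Hypothetical source: emits 0, 1, 2 and then completes explicitly.
    public static IObservable<int> Numbers() =>
        Observable.Create<int>(async (observer, ct) =>
        {
            for (var i = 0; i < 3; ++i)
            {
                await Task.Delay(10, ct);
                observer.OnNext(i);
            }

            // Signal completion ourselves instead of relying on the
            // returned task to do it.
            observer.OnCompleted();

            // Returning IDisposable selects the Task<IDisposable> overload.
            return Disposable.Empty;
        });
}
```

With the explicit call in place, a blocking consumer such as ExplicitCompletionSketch.Numbers().ToList().Wait() can return instead of waiting for a completion that never arrives.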

About this issue

  • State: closed
  • Created 9 years ago
  • Comments: 16

Most upvoted comments

I got caught out by this again today!

It seems that using schedulers in combination with this Task-returning overload can be incredibly dangerous:

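Observable.Create<int>(async (obs, ct) => {
    // Route every notification through the task pool scheduler.
    var scheduledObs = obs.NotifyOn(TaskPoolScheduler.Default);

    for (var i = 0; i < 10; ++i)
    {
        await Task.Delay(50);
        scheduledObs.OnNext(i);
    }
    scheduledObs.OnCompleted();
    // The lambda's Task finishes here, and the overload then completes obs directly.
})
.ToList()
.Wait()
.Dump(); // LINQPad extension method that prints the result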

This sometimes returns 9 elements and sometimes 10. When the async callback completes, the overload completes the observer automatically, but there is no guarantee that the scheduler has delivered every queued OnNext call by then. This wouldn’t be an issue if this overload didn’t automatically call OnCompleted, because our scheduled OnCompleted() would then run after all the scheduled OnNext calls.
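
One possible way to avoid this race is to leave the observer unwrapped inside the delegate and apply the scheduler downstream with ObserveOn, so that the automatic completion passes through the same ordered queue as the items. This is a sketch rather than a recommendation from the thread, assuming the System.Reactive package is referenced; the class name ObserveOnSketch and the method Numbers are hypothetical.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ObserveOnSketch
{
    // Hypothetical source: emits 0..9 with a short delay between items.
    public static IObservable<int> Numbers() =>
        Observable.Create<int>(async (obs, ct) =>
        {
            for (var i = 0; i < 10; ++i)
            {
                await Task.Delay(50, ct);
                // Call the original observer directly, with no NotifyOn wrapper.
                obs.OnNext(i);
            }
            // No explicit OnCompleted: this overload completes the observer
            // when the returned task finishes.
        })
        // Scheduling happens downstream, so items and the completion
        // notification are queued and delivered in order.
        .ObserveOn(TaskPoolScheduler.Default);
}
```

Because OnNext and the automatic OnCompleted now reach ObserveOn in sequence, ObserveOnSketch.Numbers().ToList().Wait() should see all ten items before the sequence completes.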