reactive: Async Create hanging while publishing observable

Consider the following code:

var xs = Observable.Create<Unit>(async o => 
{
    await Task.Delay(10);
    o.OnError(new Exception());
}).Replay().RefCount();

xs.Subscribe(x => Console.WriteLine(x));
xs.Subscribe(x => Console.WriteLine(x), ex => Console.WriteLine(ex.Message));
await xs.DefaultIfEmpty();

The sequence above doesn’t throw any exceptions and never completes.

I made the following observations:

  1. Removing the first subscription enables error propagation - exception is thrown in Subscribe context (last line)
  2. Removing .Replay().RefCount() enables error propagation - exception is thrown in Subscribe context (last line)
  3. Removing await Task.Delay(10) enables error propagation - exception is thrown in OnError call (within Create method). Surprisingly, switching two Subscribe methods makes exception thrown at Subscribe context (last line).

That being said, I am asking whether the following issues are by design:

  1. Observable sequence in the above scenario never being completed
  2. The fact that exception is sometimes thrown inside Create method, and other times - at Subscribe context.

If this is by design, what would you recommend as a workaround? How do I publish my sequences so that all of my clients (observers) can safely handle exceptions in this case? Current behavior seems so arbitrary, especially for library writers. It also makes debugging very painful. Please advise.

About this issue

  • Original URL
  • State: closed
  • Created 6 years ago
  • Comments: 15

Most upvoted comments

Users run into the implications of that convenience-error-rethrowing-Subscribe-extension all the time. The framework should maybe, in addition to Obsolete, define an attribute YesItsThereButNoReallyYouDontWantThisInYourCodeAttribute 😉

The first one is synchronous, you get the exception directly on subscription. The second is asynchronous, so the test won’t observe the exception, but the subscription chain still will be broken (standard Rx operators unsubscribe and clean up when a downstream-call to OnError throws). As a rule of thumb, don’t use the Subscribe-extension that doesn’t handle exceptions because you might just run into situations just like this, which is especially harmful if the observable is shared through Replay and RefCount.

That’s due to the first call to Subscribe, which doesn’t observe the exception but only rethrows it. That’ll break the whole chain, every operator will unsubscribe from it’s source and you won’t get anything out of xs again. It’s unrelated to that specific overload of Create. This code doesn’t terminate as well:

[TestMethod]
public async Task TestRxHangs()
{
    var xs = Observable.Create<Unit>(o =>
    {
        Task.Delay(5).ContinueWith(_ => o.OnError(new Exception()));
        return Disposable.Empty;
    }).Replay().RefCount();

    xs.Subscribe(x => Console.WriteLine(x));
    xs.Subscribe(x => Console.WriteLine(x), ex => Console.WriteLine(ex.Message), () => Console.WriteLine("Completed"));

    await xs.DefaultIfEmpty();
}