reactive: Async Create hanging while publishing observable
Consider the following code:
var xs = Observable.Create<Unit>(async o =>
{
await Task.Delay(10);
o.OnError(new Exception());
}).Replay().RefCount();
xs.Subscribe(x => Console.WriteLine(x));
xs.Subscribe(x => Console.WriteLine(x), ex => Console.WriteLine(ex.Message));
await xs.DefaultIfEmpty();
The sequence above doesn’t throw any exceptions and never completes.
I made the following observations:
- Removing the first subscription enables error propagation - exception is thrown in
Subscribe
context (last line) - Removing
.Replay().RefCount()
enables error propagation - exception is thrown inSubscribe
context (last line) - Removing
await Task.Delay(10)
enables error propagation - exception is thrown inOnError
call (withinCreate
method). Surprisingly, switching twoSubscribe
methods makes exception thrown atSubscribe
context (last line).
That being said, I am asking whether the following issues are by design:
- Observable sequence in the above scenario never being completed
- The fact that exception is sometimes thrown inside
Create
method, and other times - atSubscribe
context.
If this is by design, what would you recommend as a workaround? How do I publish my sequences so that all of my clients (observers) can safely handle exceptions in this case? Current behavior seems so arbitrary, especially for library writers. It also makes debugging very painful. Please advise.
About this issue
- Original URL
- State: closed
- Created 6 years ago
- Comments: 15
Users run into the implications of that convenience-error-rethrowing-
Subscribe
-extension all the time. The framework should maybe, in addition toObsolete
, define an attributeYesItsThereButNoReallyYouDontWantThisInYourCodeAttribute
😉The first one is synchronous, you get the exception directly on subscription. The second is asynchronous, so the test won’t observe the exception, but the subscription chain still will be broken (standard Rx operators unsubscribe and clean up when a downstream-call to OnError throws). As a rule of thumb, don’t use the
Subscribe
-extension that doesn’t handle exceptions because you might just run into situations just like this, which is especially harmful if the observable is shared throughReplay
andRefCount
.That’s due to the first call to
Subscribe
, which doesn’t observe the exception but only rethrows it. That’ll break the whole chain, every operator will unsubscribe from it’s source and you won’t get anything out ofxs
again. It’s unrelated to that specific overload ofCreate
. This code doesn’t terminate as well: