rxjs: If a hot observable's subscriber ignores errors, subsequent subscribers also ignore errors

If a subscriber subscribes to a hot observable with a next handler only (no error handler), then any subscribers that subscribe afterwards don’t have their error handler called.

RxJS version: rxjs@5.0.0-rc.4

Code to reproduce: https://plnkr.co/edit/0CgdiUZ7iyIzzBKpoBTL?p=preview

var observable = Rx.Observable.throw('some error').share();

// This subscriber cares about errors and gets them
observable.subscribe(function(res) {
  console.log("first subscriber, next " + res);
}, function(err) {
  console.log("first subscriber, error " + err);
});

// This subscriber does not care about errors
observable.subscribe(function(res) {
  console.log("second subscriber, next " + res);
});

// This subscriber cares about errors, but never gets them
// because the second subscriber did not handle them
observable.subscribe(function(res) {
  console.log("third subscriber, next " + res);
}, function(err) {
  console.log("third subscriber, error " + err);
});

Expected behavior: The first and third subscriber’s error handler should be called, and this output should be logged:

first subscriber, error some error
third subscriber, error some error

Actual behavior: Only the first subscriber’s error handler is called; the third subscriber’s error handler is not. The following is logged instead:

first subscriber, error some error
  Uncaught some error (Rx.js:831)
    Observable.subscribe @ Rx.js:831
    (anonymous function) @ script.js:11

Additional information: Possibly related: #1420

About this issue

  • State: closed
  • Created 8 years ago
  • Reactions: 4
  • Comments: 19 (9 by maintainers)

Most upvoted comments

Apologies I’m late to this party.

This is an issue called “Producer interference”. It’s been dealt with in the TC39 proposal, and it’s also dealt with in master for RxJS 6. Unfortunately, it’s not something we can fix in RxJS 5.x because it’s a breaking change (albeit a small one).

Basically to fix this we have to break the following:

try {
  of(1).map(() => { throw new Error('lol') }).subscribe();
} catch (err) {
  console.log("this code should definitely be hit in RxJS 5.5, but won't in Rx 6");
}

The problem exists, as I’m sure some of you have figured out, because unhandled errors are immediately rethrown in RxJS. This means that they’ll unwind the stack back to a loop that is notifying for a multicast, and break the loop.
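To make the mechanism concrete, here is a minimal sketch in plain JavaScript. It assumes a hypothetical `notifyError` function standing in for a multicast's notification loop; it is not RxJS's actual source, only an illustration of how a synchronous rethrow unwinds through the loop and skips the remaining observers.

```javascript
// Hypothetical stand-in for a multicast's notification loop (not RxJS source).
function notifyError(observers, err) {
  for (const observer of observers) {
    if (observer.error) {
      observer.error(err);
    } else {
      // No error handler: rethrow synchronously, which exits the loop early.
      throw err;
    }
  }
}

const log = [];
const observers = [
  { error: (e) => log.push('first subscriber, error ' + e) },
  {}, // second subscriber: next handler only, no error handler
  { error: (e) => log.push('third subscriber, error ' + e) },
];

try {
  notifyError(observers, 'some error');
} catch (e) {
  // Stands in for the "Uncaught" report at the top level.
  log.push('Uncaught ' + e);
}

console.log(log.join('\n'));
// first subscriber, error some error
// Uncaught some error
```

The third observer is never reached, which matches the output reported in the issue.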

The solution is quite simple, and makes sense for consistency’s sake: just schedule the throw on a timeout.
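A rough sketch of that idea, again in plain JavaScript rather than RxJS's real implementation: the hypothetical `notifyErrorDeferred` function defers the throw instead of raising it inside the loop. The `schedule` parameter is an assumption added for this demo; it defaults to `setTimeout`, and the example passes a collecting function so the deferred throw can be inspected without crashing the process.

```javascript
// Hypothetical sketch of "schedule the throw" (not RxJS source).
// `schedule` defaults to setTimeout; it is injectable only for the demo.
function notifyErrorDeferred(observers, err, schedule = setTimeout) {
  for (const observer of observers) {
    if (observer.error) {
      observer.error(err);
    } else {
      // Report the unhandled error later, outside the notification loop.
      schedule(() => { throw err; });
    }
  }
}

const deferredLog = [];
const pendingThrows = [];

notifyErrorDeferred(
  [
    { error: (e) => deferredLog.push('first subscriber, error ' + e) },
    {}, // second subscriber: no error handler
    { error: (e) => deferredLog.push('third subscriber, error ' + e) },
  ],
  'some error',
  (task) => pendingThrows.push(task) // demo scheduler: collect instead of run
);

console.log(deferredLog.join('\n'));
// first subscriber, error some error
// third subscriber, error some error
```

With the default `setTimeout`, the unhandled error would still surface as an uncaught exception, but only after every observer has been notified. The trade-off is the one described above: a try/catch around the subscription no longer sees it.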

The truth is that putting a try/catch around a subscription is silly, and accommodating it doesn’t make any sense. So we’ve moved away from that potentially buggy behavior, and it actually cleaned up a lot of code for us.

FYI: The workaround for this is to use observeOn(asap) after any multicast if you’re worried about this behavior happening to you.

@trxcllnt

Subscriber’s default behavior is to throw unhandled errors. The alternative is swallowing them, and nobody wants that. An Observable synchronously emitting an error to a Subscriber without an error handler behaves the same as synchronously calling a function that throws without wrapping in a try/catch.

I understand about synchronous errors, and I’ve posted a second example that shows it applies to asynchronous errors as well.

The issue you linked to is a discussion about whether we should catch errors thrown in the end Subscriber’s next handler and re-route them to the error handler, or if we should allow them to go uncaught. I was for it, because consumers could centralize all error handling logic in the error callback, instead of needing try/catches inside next and around the subscription point to handle the case where the Observable emits synchronously.

The reason I mentioned that bug is this comment, which says that an unhandled error in one subscription terminates all other subscriptions to the same observable. This is what I’m seeing here which I think is unexpected behavior.

Even if we did that though, unhandled errors would still end up interrupting the execution for Subscribers without an error handler. Rx doesn’t stray from any other implementation of the Subject/Observer pattern; errors thrown during dispatch interrupt dispatch. Can’t really be helped.

But this is interrupting the execution for subscribers with an error handler if a subscriber without an error handler happened to subscribe to the same observable.

I tried the same thing with RxJava and got what I expected:

Code:

Observable<Long> observable = Observable.interval(200, TimeUnit.MILLISECONDS).doOnNext((i) -> {
	if (i == 2) {
		throw new RuntimeException("some error");
	}
}).share();

observable.subscribe((res) -> {
	System.out.println("first subscriber, next " + res);
}, (err) -> {
	System.out.println("first subscriber, error " + err);
});

// This subscriber does not care about errors
observable.subscribe((res) -> {
	System.out.println("second subscriber, next " + res);
});

// This subscriber cares about errors, and gets them, as expected
observable.subscribe((res) -> {
	System.out.println("third subscriber, next " + res);
}, (err) -> {
	System.out.println("third subscriber, error " + err);
});

Output:

first subscriber, next 0
second subscriber, next 0
third subscriber, next 0
first subscriber, next 1
second subscriber, next 1
third subscriber, next 1
first subscriber, error java.lang.RuntimeException: some error
java.lang.RuntimeException: some error
	at com.example.Demo.lambda$0(Demo.java:53)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:95)
	at io.reactivex.internal.operators.observable.ObservableInterval$IntervalObserver.run(ObservableInterval.java:74)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Exception in thread "RxComputationThreadPool-1" java.lang.RuntimeException: some error
	at com.example.Demo.lambda$0(Demo.java:53)
	at io.reactivex.internal.operators.observable.ObservableDoOnEach$DoOnEachObserver.onNext(ObservableDoOnEach.java:95)
	at io.reactivex.internal.operators.observable.ObservableInterval$IntervalObserver.run(ObservableInterval.java:74)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
third subscriber, error java.lang.RuntimeException: some error

@imgx64 Subscriber’s default behavior is to throw unhandled errors. The alternative is swallowing them, and nobody wants that. An Observable synchronously emitting an error to a Subscriber without an error handler behaves the same as synchronously calling a function that throws without wrapping in a try/catch.

The issue you linked to is a discussion about whether we should catch errors thrown in the end Subscriber’s next handler and re-route them to the error handler, or if we should allow them to go uncaught. I was for it, because consumers could centralize all error handling logic in the error callback, instead of needing try/catches inside next and around the subscription point to handle the case where the Observable emits synchronously.

Even if we did that though, unhandled errors would still end up interrupting the execution for Subscribers without an error handler. Rx doesn’t stray from any other implementation of the Subject/Observer pattern; errors thrown during dispatch interrupt dispatch. Can’t really be helped.