RxJava: 2.x: CreateEmitter throws when onError called after dispose unlike 1.x

I have these two tests which behave differently in 1.x and 2.x.

@org.junit.Test
public void testRxJava1() throws Exception {
	Subscription subscription = rx.Observable.fromEmitter(emitter -> {
		emitter.onNext(1);
		emitter.setCancellation(() -> emitter.onError(new IllegalArgumentException()));
	}, Emitter.BackpressureMode.NONE)
			.subscribe(
				integer -> System.out.println(integer),
				throwable -> throwable.printStackTrace()
			);

	subscription.unsubscribe();
}

@org.junit.Test
public void testRxJava2() throws Exception {
	Disposable disposable = Observable.create(emitter -> {
		emitter.onNext(1);
		emitter.setCancellable(() -> emitter.onError(new IllegalArgumentException()));
	}).subscribe(
		integer -> System.out.println(integer),
		throwable -> throwable.printStackTrace()
	);

	disposable.dispose();
}

The 2.x CreateEmitter ends up throwing the exception as an uncaught exception, whereas the 1.x Emitter just returns if the Subscription has already been unsubscribed.

The Wiki at Entering the Reactive World makes it sound like both should behave the same.

What would be the correct usage in 2.x to get the same behaviour as in 1.x, so that the emitter.onError(...) call does not crash the application?

About this issue

  • State: closed
  • Created 8 years ago
  • Comments: 19 (13 by maintainers)

Most upvoted comments

See FlowableEmitter.tryOnError and similar methods, available since 2.1.1.
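
As a rough illustration of that suggestion, here is the 2.x test from the report with onError swapped for tryOnError. This is a sketch, assuming RxJava 2.1.1 or later on the classpath (2.x package names); the class name TryOnErrorSketch and the run() helper are made up for the example. tryOnError returns false when the emitter is already disposed, instead of routing the error to the global handler.

```java
import io.reactivex.Observable;
import io.reactivex.disposables.Disposable;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical class name; only the RxJava calls are real API.
public class TryOnErrorSketch {

    // Repeats the scenario from the report, but signals with tryOnError.
    // Returns the value tryOnError produced inside the cancellation callback.
    public static boolean run() {
        AtomicBoolean delivered = new AtomicBoolean(true);

        Disposable disposable = Observable.<Integer>create(emitter -> {
            emitter.onNext(1);
            // Runs when the downstream disposes; by then the emitter is
            // already disposed, so the error cannot be delivered.
            emitter.setCancellable(() ->
                delivered.set(emitter.tryOnError(new IllegalArgumentException())));
        }).subscribe(
            integer -> System.out.println(integer),
            throwable -> throwable.printStackTrace()
        );

        disposable.dispose();
        return delivered.get();
    }

    public static void main(String[] args) {
        System.out.println("delivered = " + run());
    }
}
```

The caller can inspect the boolean and decide whether to log or drop the error, so nothing is silently discarded by the library itself.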

A working solution is to manually check that the emitter is not disposed before calling emitter.onError(), which prevents the throw. Mainly I just wanted to point out the difference between the 1.x and 2.x Emitters, even though the Wiki states they should behave the same.

I wrote a small Android library to get location data from Google Play Services via rx’s Observables and I’ve recently stumbled on this issue as well.

It happens, albeit rarely, at this line: https://github.com/julioromano/RxLocation/blob/v0.11.0-beta/rxlocation/src/main/java/net/kjulio/rxlocation/LastLocationHelper.java#L34

What I’m trying to do about it is to guard all calls to emitter.onError() with an if(!emitter.isDisposed()) although this seems to me like a code smell.

Apart from debugging purposes, I don’t see any scenario where one would want a call to emitter.onError() to actually pass the error through when the Observable has already been disposed.

My knowledge of the topic is probably limited, so can somebody explain when such a scenario would be desirable (other than for debugging)?

The problem is that you then sweep actual programming errors under the rug and don’t crash when there’s an actual problem.

On Mon, Jan 23, 2017, 10:47 PM Daniel Brain wrote:

Also hit this error unexpectedly. A note or something would be helpful.

I’m of the opinion that if the emitter is disposed errors should be ignored. At most logged. Maybe my interpretation of disposed is wrong. To me I’m essentially saying “I don’t care about the result any more” when I call dispose. Bubbling it up as an uncaught exception after it’s disposed is treating it like it’s an important issue.

In an Android application this means a crash dialog for something that happened in the background from something that was cancelled.


Some blocking APIs throw an InterruptedException or another type of exception when they get cancelled or interrupted by a cancel/dispose call. Since the downstream has expressed that it no longer wants events, the exception has nowhere to go. One of the design goals of 2.x is to not drop such exceptions as 1.x did, because they could actually be important. Therefore, you are expected to check isDisposed() if you don’t want to relay such cancellation-triggered exceptions.

See the wiki for further details.

I’ve been considering a pair of new methods: tryOnNext and tryOnError that return false if the value/error can’t be delivered.

I get the idea of not sweeping errors under the rug, but I ran into a race condition because of this. The code was working perfectly in testing, but we got crashes in production. I investigated and found that this code throws an exception:

if (! emitter.isDisposed()) {
    emitter.onError( new SomeException() );
}

It happens when the emitter is disposed between the if (! emitter.isDisposed()) check and the emitter.onError call. In our case the emitter is disposed because there is a timeout on the rx flow that creates it.

As a workaround I put a wrapper around the emitter that catches our custom exceptions in onError but it would be convenient to have an official way to ignore exceptions in the onError.
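
The race above is a check-then-act problem: the state can change between isDisposed() and onError(). The following self-contained sketch models how a try-style method can close that window by making the check and the claim a single atomic step. SketchEmitter and its methods are hypothetical stand-ins written for this illustration; this is not RxJava's implementation and uses no RxJava types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class AtomicTerminalSketch {

    /** Hypothetical stand-in for an emitter; this is not an RxJava type. */
    static final class SketchEmitter {
        private final AtomicBoolean terminated = new AtomicBoolean(false);
        private final Consumer<Throwable> downstream;

        SketchEmitter(Consumer<Throwable> downstream) {
            this.downstream = downstream;
        }

        /** Called by the consumer side when it no longer wants events. */
        void dispose() {
            terminated.set(true);
        }

        /**
         * Check and claim in one atomic step: only the caller that flips
         * the flag from false to true may deliver the error.
         */
        boolean tryOnError(Throwable t) {
            if (terminated.compareAndSet(false, true)) {
                downstream.accept(t);
                return true;
            }
            return false; // already disposed or terminated; the caller decides what to do
        }
    }

    /** Signals an error after dispose(); returns whether anything was delivered. */
    public static boolean signalAfterDispose() {
        List<Throwable> received = new ArrayList<>();
        SketchEmitter emitter = new SketchEmitter(received::add);
        emitter.dispose();
        boolean delivered = emitter.tryOnError(new RuntimeException("late failure"));
        return delivered || !received.isEmpty();
    }

    /** Signals an error before dispose(); returns whether exactly one error arrived. */
    public static boolean signalBeforeDispose() {
        List<Throwable> received = new ArrayList<>();
        SketchEmitter emitter = new SketchEmitter(received::add);
        boolean delivered = emitter.tryOnError(new RuntimeException("early failure"));
        emitter.dispose();
        return delivered && received.size() == 1;
    }
}
```

In this model a concurrent dispose() either happens before the compareAndSet, in which case the error is reported as undelivered, or after it, in which case the error has already been claimed for delivery. There is no gap in which the caller acts on a stale isDisposed() result.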

See the wiki about 2.x and error handling.

Hmm, just seeing this issue myself now. I understand the point of view that not emitting the error would be “sweeping it under the rug”, but how do we deal with the error if we no longer have a handler? I’m essentially now “sweeping it under the rug” in the new global handler.

For example, I have an instance that downloads certain assets while the user is logged in. If the user logs out while this is happening, the Disposable listening to this download is disposed. The download then fails and throws because the user is no longer logged in. So here we have a guaranteed error that we definitely do not care about (user no longer logged in), and we “sweep it under the rug” anyway.

In 2.x, we signal all undeliverable exceptions, such as yours, to RxJavaPlugins.onError, which by default forwards them to the thread’s uncaught exception handler. You can install an error handler via RxJavaPlugins.setErrorHandler and do something with such errors.
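
A minimal sketch of installing such a handler, assuming RxJava 2.x on the classpath; in 2.x the installer method is RxJavaPlugins.setErrorHandler. The class name, the run() helper, and the choice of IOException are made up for the example. Recent 2.x releases wrap most undeliverable errors in UndeliverableException, so the handler unwraps the cause when one is present.

```java
import io.reactivex.Observable;
import io.reactivex.exceptions.UndeliverableException;
import io.reactivex.plugins.RxJavaPlugins;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical class name; only the RxJava calls are real API.
public class UndeliverableHandlerSketch {

    // Installs a global handler, triggers one undeliverable error,
    // then removes the handler again. Returns the errors the handler saw.
    public static List<Throwable> run() {
        List<Throwable> undelivered = new CopyOnWriteArrayList<>();

        RxJavaPlugins.setErrorHandler(e -> {
            // Unwrap the original error if RxJava wrapped it.
            Throwable cause = (e instanceof UndeliverableException && e.getCause() != null)
                    ? e.getCause()
                    : e;
            undelivered.add(cause); // a real handler might log or filter here
        });

        try {
            Observable.<Integer>create(emitter -> {
                emitter.onNext(1);
                // onError after dispose cannot reach the subscriber,
                // so it is routed to the global handler instead.
                emitter.setCancellable(() -> emitter.onError(new IOException("late failure")));
            }).subscribe(v -> { }, Throwable::printStackTrace).dispose();
        } finally {
            RxJavaPlugins.setErrorHandler(null); // restore the default behaviour
        }
        return undelivered;
    }
}
```

Note that the handler is a single process-wide hook: installing it affects every RxJava flow in the process, not just the code that installed it.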

But as a library author, would I want to set an error handler in RxJavaPlugins?