reactor-core: Subscriber error is causing share operator to fail the source flux and cancels all the clients

According to the share operator JavaDocs: “When all subscribers have cancelled it will cancel the source”. My understanding of this statement is that problems in any individual subscriber are handled in isolation and should not affect the shared flux itself. In fact, if I create an empty subscription with a no-args subscribe() call, the shared flux should stay alive until the source propagates a complete or error signal.

Flux<String> shared = Flux
                .create(producer::set)
                .log()
                .share();
shared.subscribe(); // Stay alive until producer issues complete or error
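To make the quoted JavaDoc sentence concrete, here is a minimal sketch (assuming reactor-core on the classpath; the class and method names are hypothetical) that records when share() cancels its source. It uses Flux.never() so that subscriptions stay open until they are explicitly disposed.

```java
import java.util.concurrent.atomic.AtomicBoolean;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;

// Hypothetical demo class: observes when share() cancels its upstream.
class ShareCancellationDemo {

    /** Returns true if the source is cancelled only once the last subscriber leaves. */
    static boolean sourceCancelledOnlyAfterLastSubscriber() {
        AtomicBoolean sourceCancelled = new AtomicBoolean(false);

        // Flux.never() emits nothing, so subscriptions stay open until disposed.
        Flux<String> shared = Flux.<String>never()
                .doOnCancel(() -> sourceCancelled.set(true))
                .share();

        Disposable first = shared.subscribe();  // first subscriber connects the source
        Disposable second = shared.subscribe();

        first.dispose();                         // one subscriber still remains
        boolean cancelledWhileOneRemains = sourceCancelled.get();

        second.dispose();                        // subscriber count drops to zero
        boolean cancelledAfterLast = sourceCancelled.get();

        return !cancelledWhileOneRemains && cancelledAfterLast;
    }

    public static void main(String[] args) {
        System.out.println(sourceCancelledOnlyAfterLastSubscriber());
    }
}
```

This only exercises plain cancellation; it says nothing about what happens when a subscriber throws, which is the behavior this issue is about.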

Unfortunately, this is not true when one of the subscribers fails without a proper error handler. In that case, the shared flux and all the remaining subscribers are cancelled as well. This renders share unusable and forces a custom implementation of the same behavior.

Expected Behavior

An error in a subscriber should cancel only that subscriber’s subscription, not the shared flux.

Actual Behavior

An error in a subscriber cancels the shared flux, and thus all other subscribers are cancelled too.

Steps to Reproduce

package test;

import java.util.concurrent.atomic.AtomicReference;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.test.StepVerifier;

class SharedProblem {
    private static final Logger LOGGER = LoggerFactory.getLogger(SharedProblem.class.getName());
    private static final String TEST_MESSAGE = "Test";

    @Test
    void shouldNotCancelSharedWhenNoSubscriberError() {
        final AtomicReference<FluxSink<String>> producer = new AtomicReference<>();
        final Flux<String> flux = createSharedFlux(producer);
        flux.subscribe(); // This is a keep-alive subscription

        flux.subscribe(o -> LOGGER.debug("First client : {}", o));
        flux.subscribe(o -> LOGGER.debug("Second client : {}", o));

        StepVerifier
                .create(flux)
                .then(() -> sendMessage(producer))
                .then(() -> sendComplete(producer))
                .expectNext(TEST_MESSAGE)
                .expectComplete()
                .log()
                .verify();
    }

    @Test
    void shouldNotCancelSharedWhenSubscriberThrowsExceptionAndErrorHandlerIsAbsent() {
        final AtomicReference<FluxSink<String>> producer = new AtomicReference<>();
        final Flux<String> flux = createSharedFlux(producer);
        flux.subscribe(); // This is a keep-alive subscription

        flux
                .log()
                .subscribe(new BaseSubscriber<String>() {
                    @Override
                    protected void hookOnNext(String value) {
                        throw new IllegalStateException("I'm failing");
                    }

                    @Override
                    public void dispose() {
                        super.dispose();
                    }
                });
        flux
                .log()
                .subscribe(o -> LOGGER.debug("Second client : {}", o));

        StepVerifier
                .create(flux)
                .then(() -> sendMessage(producer))
                .then(() -> sendComplete(producer))
                .expectNext(TEST_MESSAGE)
                .expectComplete()
                .log()
                .verify();
    }

    @Test
    void shouldNotCancelSharedWhenSubscriberThrowsExceptionAndErrorHandlerIsPresent() {
        final AtomicReference<FluxSink<String>> producer = new AtomicReference<>();
        final Flux<String> flux = createSharedFlux(producer);
        flux.subscribe(); // This is a keep-alive subscription

        flux
                .log()
                .subscribe(new BaseSubscriber<String>() {
                    @Override
                    protected void hookOnNext(String value) {
                        throw new IllegalStateException("I'm failing");
                    }

                    @Override
                    protected void hookOnError(Throwable throwable) {
                        LOGGER.error("Error", throwable);
                    }

                    @Override
                    public void dispose() {
                        super.dispose();
                    }
                });
        flux
                .log()
                .subscribe(o -> LOGGER.debug("Second client : {}", o));

        StepVerifier
                .create(flux)
                .then(() -> sendMessage(producer))
                .then(() -> sendComplete(producer))
                .expectNext(TEST_MESSAGE)
                .expectComplete()
                .log()
                .verify();
    }

    @Test
    void shouldNotCancelSharedWhenConsumerThrowsExceptionAndErrorHandlerIsAbsent() {
        final AtomicReference<FluxSink<String>> producer = new AtomicReference<>();
        final Flux<String> flux = createSharedFlux(producer);
        flux.subscribe(); // This is a keep-alive subscription

        flux
                .log()
                .subscribe(e -> {
                    throw new IllegalStateException("I'm failing");
                });
        flux
                .log()
                .subscribe(o -> LOGGER.debug("Second client : {}", o));

        StepVerifier
                .create(flux)
                .then(() -> sendMessage(producer))
                .then(() -> sendComplete(producer))
                .expectNext(TEST_MESSAGE)
                .expectComplete()
                .log()
                .verify();
    }

    @Test
    void shouldNotCancelSharedWhenConsumerThrowsExceptionAndErrorHandlerIsPresent() {
        final AtomicReference<FluxSink<String>> producer = new AtomicReference<>();
        final Flux<String> flux = createSharedFlux(producer);
        flux.subscribe(); // This is a keep-alive subscription

        flux
                .log()
                .subscribe(e -> {
                    throw new IllegalStateException("I'm failing");
                }, e -> LOGGER.debug("Error handler", e));
        flux
                .log()
                .subscribe(o -> LOGGER.debug("Second client : {}", o));

        StepVerifier
                .create(flux)
                .then(() -> sendMessage(producer))
                .then(() -> sendComplete(producer))
                .expectNext(TEST_MESSAGE)
                .expectComplete()
                .log()
                .verify();
    }

    @Test
    void shouldNotCancelSharedWorkAround() {
        final AtomicReference<FluxSink<String>> producer = new AtomicReference<>();
        final Flux<String> flux = createSharedFlux(producer);
        flux.subscribe(); // This is a keep-alive subscription

        flux
                .doOnNext(o -> {
                    throw new IllegalStateException("I'm failing");
                })
                .onErrorContinue((e, o) -> LOGGER.debug("I've failed, but I can survive {}", o, e))
                .subscribe();
        flux
                .log()
                .subscribe(o -> LOGGER.debug("Second client : {}", o));

        StepVerifier
                .create(flux)
                .then(() -> sendMessage(producer))
                .then(() -> sendComplete(producer))
                .expectNext(TEST_MESSAGE)
                .expectComplete()
                .log()
                .verify();
    }

    private Flux<String> createSharedFlux(AtomicReference<FluxSink<String>> producer) {
        return Flux
                .create(producer::set)
                .log()
                .share();
    }

    private FluxSink<String> sendMessage(AtomicReference<FluxSink<String>> producer) {
        return producer
                .get()
                .next(TEST_MESSAGE);
    }

    private void sendComplete(AtomicReference<FluxSink<String>> producer) {
        producer
                .get()
                .complete();
    }

}

Possible Solution

The best solution would be to handle errors the same way the groupBy operator does. In groupBy, when one of the generated streams fails, it is recreated and can continue its work. The most important point is that the source flux is not impacted. Similar behavior is expected from the share operator: the source Flux should survive and continue its work, and only the failing client should be cancelled. One of the provided test cases (shouldNotCancelSharedWorkAround) shows how to make sure that a failing Flux does not break anything else. However, there is no real workaround when a third-party subscriber is involved, e.g. WebFlux.

In fact, the problem was detected in code that uses Spring WebFlux. Most likely, the root cause is that the client detached and was cancelled, no new element was propagated for a long time, and WebFlux (or the servlet container) triggered a timeout that changed the subscriber’s state further. As a result, an incoming element triggered an exception in the WebFlux subscriber, and the failure propagated to the shared flux.
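When you do control the subscriber, one way to keep its failures local is to make sure nothing ever escapes its callbacks. The sketch below assumes reactor-core on the classpath; subscribeGuarded is a hypothetical helper, not a Reactor API. It wraps the onNext consumer in a try/catch and always supplies an error consumer. It does not help with third-party subscribers such as WebFlux's, which is the case described above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

// Hypothetical helper: keeps a subscriber's onNext failures out of the operator chain.
class GuardedSubscribe {

    /** Subscribes with a consumer whose exceptions are caught and reported locally. */
    static <T> Disposable subscribeGuarded(Flux<T> shared, Consumer<? super T> handler) {
        return shared.subscribe(
                value -> {
                    try {
                        handler.accept(value);
                    } catch (RuntimeException e) {
                        // Report locally so the exception never reaches the shared flux.
                        System.err.println("Subscriber failed on " + value + ": " + e);
                    }
                },
                // Explicit error handler, so upstream errors are not left unhandled.
                error -> System.err.println("Upstream error: " + error));
    }

    /** Returns the values seen by a healthy subscriber next to a throwing one. */
    static List<String> demo() {
        AtomicReference<FluxSink<String>> producer = new AtomicReference<>();
        Flux<String> shared = Flux.<String>create(producer::set).share();

        List<String> received = new CopyOnWriteArrayList<>();
        subscribeGuarded(shared, v -> {
            throw new IllegalStateException("I'm failing");
        });
        shared.subscribe(received::add);

        producer.get().next("first");
        producer.get().next("second");
        producer.get().complete();
        return received;
    }
}
```

With the guard in place, the healthy subscriber receives both elements; the trade-off is that the guarded subscriber silently keeps running after its own failures instead of being cancelled.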

Your Environment

  • Reactor version(s) used: 3.3.1.RELEASE
  • JVM version (java -version): 1.8.0_202-release-1483-b58 amd64
  • OS and version (eg uname -a): Windows Version 6.1 (Build 7601: SP1)

About this issue

  • State: closed
  • Created 4 years ago
  • Reactions: 1
  • Comments: 21 (10 by maintainers)

Most upvoted comments

@phnxfire I would open a question on Stack Overflow tagged with spring-webflux + reactor-netty so that either the Spring team or @violetagg can share some insight on that.

@simonbasle thanks for your answer. This attempt didn’t resolve the issue yet. At what point should I apply .autoConnect(1)? As soon as I create the publisher object?

In my scenario, I have a class acting as the websocket handler, which instantiates a messagePublisher that implements Consumer<FluxSink<DataBuffer>>.

Then I have (in my handler) a Flux<DataBuffer> publisher = Flux.create(messagePublisher).share();

messagePublisher uses:

private final BlockingQueue<DataBuffer> queue = new LinkedBlockingQueue<>();
private final Executor executor = Executors.newSingleThreadExecutor();

Then the overridden accept method is called; that is when I get a FluxSink<DataBuffer> and try sink.next(), and at that point it detects that the Flux has been cancelled.

The example I’m working on is pretty much this one:

https://github.com/kkojot/webflux-vue-websocket/blob/master/spring-backend/src/main/java/com/kojotdev/blog/webfluxvuewebsocket/GreetingsPublisher.java

http://kojotdev.com/2019/08/spring-webflux-websocket-with-vue-js/

Thanks for your help.
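The messagePublisher setup described above might look roughly like the following. This is a hypothetical reconstruction, not the linked GreetingsPublisher code: it uses String instead of DataBuffer to avoid a Spring dependency, and the names QueuePublisher, push, and firstMessage are illustrative. The point it shows is checking FluxSink.isCancelled() so the drain loop stops once share() has cancelled the source, rather than continuing to push into a cancelled sink.

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

// Hypothetical stand-in for messagePublisher, using String instead of DataBuffer.
class QueuePublisher implements Consumer<FluxSink<String>> {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "queue-publisher");
        thread.setDaemon(true); // do not keep the JVM alive in this sketch
        return thread;
    });

    void push(String message) {
        queue.offer(message);
    }

    @Override
    public void accept(FluxSink<String> sink) {
        executor.execute(() -> {
            try {
                // Stop draining once downstream (e.g. share()'s refCount) has cancelled.
                while (!sink.isCancelled()) {
                    String message = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (message != null) {
                        sink.next(message);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                if (!sink.isCancelled()) {
                    sink.error(e); // only signal if someone is still listening
                }
            }
        });
    }

    void shutdown() {
        executor.shutdownNow();
    }

    /** Pushes one message and reads it back through a shared Flux. */
    static String firstMessage(String message) {
        QueuePublisher publisher = new QueuePublisher();
        publisher.push(message);
        try {
            // blockFirst cancels after one element, which drops share()'s count to zero.
            return Flux.create(publisher).share().blockFirst(Duration.ofSeconds(5));
        } finally {
            publisher.shutdown();
        }
    }
}
```

Because share() cancels the source as soon as its last subscriber leaves, any producer thread that keeps running afterwards will see a cancelled sink, which is consistent with the symptom described above.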

@phnxfire share() is an alias for publish().refCount(1), and refCount has this behavior of detecting cancellations and cancelling the source once subscribers reach 0. You could try to use .publish().autoConnect(1) instead, since autoConnect doesn’t care about cancellations (i.e. client “disconnections”).
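As a rough illustration of that suggestion, here is a sketch (assuming reactor-core on the classpath; the class and method names are hypothetical) showing that a publish().autoConnect(...) source survives all subscriber cancellations. It uses the autoConnect(int, Consumer<? super Disposable>) overload to keep the connection handle, since that handle becomes the only way to stop the source.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;

// Hypothetical demo class: publish().autoConnect(...) ignores subscriber cancellations.
class AutoConnectDemo {

    /**
     * Returns true if the source survives all subscriber cancellations and is
     * cancelled only through the connection handle passed to autoConnect.
     */
    static boolean sourceStopsOnlyViaConnectionHandle() {
        AtomicBoolean sourceCancelled = new AtomicBoolean(false);
        AtomicReference<Disposable> connection = new AtomicReference<>();

        Flux<String> autoConnected = Flux.<String>never()
                .doOnCancel(() -> sourceCancelled.set(true))
                .publish()
                // Connect on the first subscriber and hand back the connection handle.
                .autoConnect(1, connection::set);

        Disposable first = autoConnected.subscribe();
        Disposable second = autoConnected.subscribe();
        first.dispose();
        second.dispose();
        boolean survivedCancellations = !sourceCancelled.get();

        // With no subscribers left, only the connection handle can stop the source.
        connection.get().dispose();
        return survivedCancellations && sourceCancelled.get();
    }
}
```

The design trade-off is that the source (and whatever resource backs it) stays connected until the handle is disposed or the source terminates; this addresses cancellations, not exceptions thrown inside a subscriber.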

Indeed, the root cause of our issue is an error in a subscriber’s onNext. The point is that you cannot control the implementation of every subscriber, and since the interfaces allow creating subscribers without error handlers, one should expect the worst. In my case the intention is to have an infinite event stream that is consumed only when someone is interested and subscribed. Each consumer can decide whether to observe the event stream or not. It is unacceptable for such an observer to break the source and interfere with other subscribers. On the other hand, if a subscriber’s fault should break everything, then the issue is in the groupBy operator, which is able to silently recreate a GroupedFlux and continue without failing.