reactor-core: onErrorContinue: limit of 256 errors in conjunction with Spring WebClient

Expected Behavior

onErrorContinue() should work for more than 256 errors.

Actual Behavior

With onErrorContinue() in place, the Publisher stops emitting after 256 errors when Flux.flatMap(…) is used to call Spring WebClient.

Steps to Reproduce

Referring to https://github.com/reactor/reactor-core/issues/2011, the following test now fails (as expected):

    @Test
    public void repoCase() throws InterruptedException {
        int numberOfRequests = 500;
        Set<Integer> sink = new HashSet<>();

        Flux
                .fromStream(IntStream.range(0, numberOfRequests).boxed())
                .map(sink::add)
                .flatMap(i -> Mono.error(new Exception("any")))
                .onErrorContinue((throwable, o) -> {})
                .subscribe();

        Thread.sleep(1000);
        Assertions.assertNotEquals(numberOfRequests, sink.size());
        Assertions.assertEquals(256, sink.size());
    }

However, a slight modification, e.g. calling the Spring WebClient with a non-existent URI, still causes the publisher to stop publishing after 256 onError signals. (In our production system we saw the same with 5xx server errors, in combination with Mono.retryWhen(...), until a RetryExhaustedException was thrown.)

    @Test
    public void repoCaseWebClient() throws InterruptedException {
        int numberOfRequests = 500;
        Set<Integer> sink = new HashSet<>();

        Flux
                .fromStream(IntStream.range(0, numberOfRequests).boxed())
                .doOnNext(System.out::println)
                .map(sink::add)
                .flatMap(i -> webClientBuilder.build().get().uri("http://not-existing").exchange().doOnError(System.out::println))
                .onErrorContinue((throwable, o) -> {System.out.println(throwable.getMessage());})
                .subscribe();

        Thread.sleep(1000);
        Assertions.assertNotEquals(numberOfRequests, sink.size());
        Assertions.assertEquals(256, sink.size());
    }

I added some System.out calls for clarification. Interestingly, the error from RequestHeadersSpec.exchange() appears only once in stdout.
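
For context on the number itself: 256 is the default value of Reactor's Queues.SMALL_BUFFER_SIZE, which flatMap uses as its default concurrency. The sketch below only shows that a flatMap whose inner publishers never terminate stops pulling from its source after that many elements. Whether this is what happens inside the WebClient call above is an assumption, not something established in this report; the class name FlatMapStall and the use of Mono.never() as a stand-in are illustrative only.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.concurrent.Queues;

import java.util.concurrent.atomic.AtomicInteger;

public class FlatMapStall {

    // Counts how many source elements reach flatMap when no inner publisher ever terminates.
    static int elementsPulled(int numberOfRequests) {
        AtomicInteger pulled = new AtomicInteger();
        Flux.range(0, numberOfRequests)
            .doOnNext(i -> pulled.incrementAndGet())
            // Mono.never() is a stand-in (assumption) for an inner call that neither completes nor errors.
            .flatMap(i -> Mono.never())
            .subscribe();
        return pulled.get();
    }

    public static void main(String[] args) {
        System.out.println("default flatMap concurrency: " + Queues.SMALL_BUFFER_SIZE);
        System.out.println("elements pulled from source: " + elementsPulled(500));
    }
}
```

With default settings both printed numbers should match, because flatMap only requests a replacement element from its source when one of its in-flight inner publishers terminates.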

Possible Solution

Your Environment

  • reactor-core version used: 3.3.10
  • spring-webflux version: 2.2.10
  • reactor-netty version: 0.9.12
  • JVM version (java -version): 1.8.0_232
  • OS and version (eg uname -a): Darwin Kernel Version 20.2.0: Wed Dec 2 20:39:59 PST 2020; root:xnu-7195.60.75~1/RELEASE_X86_64 x86_64

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 15 (8 by maintainers)

Most upvoted comments

Yeah, I understand that frustration, and onErrorContinue is something I regret adding, to be honest, at least in its current form, where nothing at first glance designates it as a last resort / hack.

It was indeed made for the case of “long running Flux that should never stop”, as a better performing alternative to doing all risky steps in flatMap(v -> doRiskyStep(v).onErrorResume(helper::logThrowableAndReturnEmpty)).
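
A minimal sketch of that flatMap plus onErrorResume alternative, applied to a 500-element source like the one in the report. The bodies of doRiskyStep and logThrowableAndReturnEmpty are hypothetical stand-ins (an always-failing Mono and an error counter); only their names come from the comment above.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.concurrent.atomic.AtomicInteger;

public class PerElementErrorHandling {

    // Hypothetical stand-in for a remote call; here it always fails.
    static Mono<String> doRiskyStep(int value) {
        return Mono.error(new IllegalStateException("failed for " + value));
    }

    // Hypothetical helper: records the error and replaces the failed inner Mono with an empty one.
    static Mono<String> logThrowableAndReturnEmpty(Throwable t, AtomicInteger errors) {
        errors.incrementAndGet();
        return Mono.empty();
    }

    // Returns {elements seen by flatMap, errors handled}.
    static int[] run(int numberOfRequests) {
        AtomicInteger seen = new AtomicInteger();
        AtomicInteger errors = new AtomicInteger();
        Flux.range(0, numberOfRequests)
            .doOnNext(i -> seen.incrementAndGet())
            // The error is handled inside each inner Mono, so the outer Flux never sees an onError.
            .flatMap(i -> doRiskyStep(i)
                .onErrorResume(t -> logThrowableAndReturnEmpty(t, errors)))
            .blockLast();
        return new int[] {seen.get(), errors.get()};
    }

    public static void main(String[] args) {
        int[] result = run(500);
        System.out.println("seen=" + result[0] + ", errors=" + result[1]);
    }
}
```

Because every inner Mono terminates normally (empty instead of onError), flatMap keeps requesting new elements and all of them are processed.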

But since the Reactive Streams specification mandates that a Flux<T> is terminated once it emits an onError, there was no composition possible here: each and every operator implementation that can trigger an onError from an incoming onNext must opt in to handling that scenario, in which it avoids signalling its downstream with onError, pushes the error into a side channel (the Consumer) and continues processing…
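
The difference can be seen on a small scale with map, one of the operators that supports onErrorContinue. This is a sketch only: the class name TerminalErrorDemo and the helper risky are made up for illustration.

```java
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class TerminalErrorDemo {

    // Hypothetical risky step: throws for the value 3, passes every other value through.
    static Integer risky(Integer i) {
        if (i == 3) {
            throw new IllegalArgumentException("boom at " + i);
        }
        return i;
    }

    // Plain pipeline: the onError raised by map terminates the sequence.
    static List<Integer> withoutContinue(AtomicReference<Throwable> error) {
        List<Integer> received = new ArrayList<>();
        Flux.range(1, 5)
            .map(TerminalErrorDemo::risky)
            .subscribe(received::add, error::set);
        return received;
    }

    // With onErrorContinue: map hands the failing value to the consumer and keeps going.
    static List<Integer> withContinue(List<Object> dropped) {
        List<Integer> received = new ArrayList<>();
        Flux.range(1, 5)
            .map(TerminalErrorDemo::risky)
            .onErrorContinue((throwable, value) -> dropped.add(value))
            .subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        AtomicReference<Throwable> error = new AtomicReference<>();
        System.out.println("without: " + withoutContinue(error) + ", error=" + error.get());
        List<Object> dropped = new ArrayList<>();
        System.out.println("with: " + withContinue(dropped) + ", dropped=" + dropped);
    }
}
```

In the first pipeline the values 4 and 5 are never delivered. In the second they are, but only because map itself cooperates; an operator that has not opted in would still terminate the sequence.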

In a future version, we are thinking about changing the name of the pseudo-operator to better reflect its hacky/unsafe nature.