reactor-core: onErrorContinue: limit of 256 errors in conjunction with Spring WebClient
Expected Behavior
onErrorContinue() should work for more than 256 errors.
Actual Behavior
onErrorContinue() stalls the Publisher after 256 errors when Flux.flatMap(…) is used to call Spring WebClient.
Steps to Reproduce
Referring to https://github.com/reactor/reactor-core/issues/2011, the following now fails (as expected):
@Test
public void repoCase() throws InterruptedException {
    int numberOfRequests = 500;
    Set<Integer> sink = new HashSet<>();
    Flux
        .fromStream(IntStream.range(0, numberOfRequests).boxed())
        .map(sink::add)
        .flatMap(i -> Mono.error(new Exception("any")))
        .onErrorContinue((throwable, o) -> {})
        .subscribe();
    Thread.sleep(1000);
    Assertions.assertNotEquals(numberOfRequests, sink.size());
    Assertions.assertEquals(256, sink.size());
}
However, with a slight modification, e.g. calling the Spring WebClient with a non-existing URI, the publisher still stops publishing after 256 onError signals. (In our production system we saw the same with 5xx server errors, in combination with Mono.retryWhen(...), once a RetryExhaustedException was thrown.)
@Test
public void repoCaseWebClient() throws InterruptedException {
    int numberOfRequests = 500;
    Set<Integer> sink = new HashSet<>();
    Flux
        .fromStream(IntStream.range(0, numberOfRequests).boxed())
        .doOnNext(System.out::println)
        .map(sink::add)
        .flatMap(i -> webClientBuilder.build().get().uri("http://not-existing").exchange().doOnError(System.out::println))
        .onErrorContinue((throwable, o) -> {System.out.println(throwable.getMessage());})
        .subscribe();
    Thread.sleep(1000);
    Assertions.assertNotEquals(numberOfRequests, sink.size());
    Assertions.assertEquals(256, sink.size());
}
I added some System.out calls for clarification. Interestingly, the error from RequestHeadersSpec.exchange() appears only once in stdout.
Possible Solution
Your Environment
- reactor-core version used: 3.3.10
- spring-webflux version: 2.2.10
- reactor-netty version: 0.9.12
- JVM version (java -version): 1.8.0_232
- OS and version (eg uname -a): Darwin Kernel Version 20.2.0: Wed Dec 2 20:39:59 PST 2020; root:xnu-7195.60.75~1/RELEASE_X86_64 x86_64
About this issue
- State: closed
- Created 3 years ago
- Comments: 15 (8 by maintainers)
Yeah, I understand that frustration, and onErrorContinue is something I regret adding to be honest, at least in its current form where nothing at first glance designates it as a last resort / hack.

It was indeed made for the case of a "long running Flux that should never stop", as a better performing alternative to doing all risky steps in flatMap(v -> doRiskyStep(v).onErrorResume(helper::logThrowableAndReturnEmpty)).

But since the Reactive Streams specification mandates that if a Flux<T> emits an onError, that Flux is terminated, there was no composition possible here: each and every operator implementation that can trigger an onError from an incoming onNext must "opt in" to taking that scenario into account, where it avoids signalling its downstream with onError, pushes the error into a side channel (the Consumer) and continues processing…

In a future version, we are thinking about changing the name of the pseudo-operator to better reflect its hacky / unsafe nature.