reactor-core: Flux.mergeOrdered does not propagate onError when it happens
Flux.mergeOrdered does not propagate onError when it happens, but on the end of all inner fluxes. In case it is very long running, it can cause huge latencies in calculations as these are not notified soon some nested flux failed. It is even genrating values that are not correct as it cannot know order in case one inner flux fails. It simply treat is as completed.
Expected Behavior
mergeOrdered should emit onError immediately and call cancel on inner fluxes.
Actual Behavior
If plays completely all inner fluxes. In case one fail it just ignore it (treat as completed).
Steps to Reproduce
This example Should fail on number 2, but continue to evaluate and fails on end. It is simulating slow generator using delayElements.
@Test
public void test() {
try {
Flux<Integer> source1 = Flux.range(0, 20).delayElements(Duration.ofMillis(100));
Flux<Integer> source2 = Flux.range(0, 20)
.<Integer>handle((v , sink) -> {
if (v < 2) {
sink.next(v);
} else {
sink.error(new NumberFormatException());
}
});
Flux
.mergeOrdered(1, (i1, i2) -> i1.compareTo(i2), source1, source2)
.log()
.limitRate(1)
.blockLast(Duration.ofSeconds(3));
} catch (Exception e) {
e.printStackTrace();
throw e;
}
}
Output:
10-03-2021 11:33:29.421 [main] INFO reactor.Flux.PublishOn.1.info - | onSubscribe([Fuseable] FluxPublishOn.PublishOnSubscriber)
10-03-2021 11:33:29.426 [main] INFO reactor.Flux.PublishOn.1.info - | request(unbounded)
10-03-2021 11:33:29.429 [main] INFO reactor.Flux.PublishOn.1.info - | onNext(0)
10-03-2021 11:33:29.429 [main] INFO reactor.Flux.PublishOn.1.info - | onNext(0)
10-03-2021 11:33:29.429 [main] INFO reactor.Flux.PublishOn.1.info - | onNext(1)
10-03-2021 11:33:29.430 [main] INFO reactor.Flux.PublishOn.1.info - | onNext(1)
10-03-2021 11:33:29.434 [main] INFO reactor.Flux.PublishOn.1.info - | onNext(2) <-- It should fail HERE
10-03-2021 11:33:29.434 [main] INFO reactor.Flux.PublishOn.1.info - | onNext(3)
10-03-2021 11:33:29.435 [main] INFO reactor.Flux.PublishOn.1.info - | onNext(4)
10-03-2021 11:33:29.435 [main] INFO reactor.Flux.PublishOn.1.info - | onNext(5)
10-03-2021 11:33:29.435 [main] INFO reactor.Flux.PublishOn.1.info - | onNext(6)
10-03-2021 11:33:29.435 [main] INFO reactor.Flux.PublishOn.1.info - | onNext(7)
10-03-2021 11:33:29.435 [main] INFO reactor.Flux.PublishOn.1.info - | onNext(8)
10-03-2021 11:33:29.435 [main] INFO reactor.Flux.PublishOn.1.info - | onNext(9)
10-03-2021 11:33:29.435 [main] INFO reactor.Flux.PublishOn.1.info - | onNext(10)
10-03-2021 11:33:29.436 [main] INFO reactor.Flux.PublishOn.1.info - | onNext(11)
10-03-2021 11:33:29.436 [main] INFO reactor.Flux.PublishOn.1.info - | onNext(12)
10-03-2021 11:33:29.436 [main] INFO reactor.Flux.PublishOn.1.info - | onNext(13)
10-03-2021 11:33:29.436 [main] INFO reactor.Flux.PublishOn.1.info - | onNext(14)
10-03-2021 11:33:29.436 [main] INFO reactor.Flux.PublishOn.1.info - | onNext(15)
10-03-2021 11:33:29.436 [main] INFO reactor.Flux.PublishOn.1.info - | onNext(16)
10-03-2021 11:33:29.436 [main] INFO reactor.Flux.PublishOn.1.info - | onNext(17)
10-03-2021 11:33:29.436 [main] INFO reactor.Flux.PublishOn.1.info - | onNext(18)
10-03-2021 11:33:29.437 [main] INFO reactor.Flux.PublishOn.1.info - | onNext(19)
10-03-2021 11:33:29.437 [main] ERROR reactor.Flux.PublishOn.1.error - | onError(java.lang.NumberFormatException)
10-03-2021 11:33:29.439 [main] ERROR reactor.Flux.PublishOn.1.error -
java.lang.NumberFormatException: null
at com.ca.apm.common.MergeOrderedTest.lambda$0(MergeOrderedTest.java:37)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.tryOnNext(FluxHandleFuseable.java:103)
at reactor.core.publisher.FluxRange$RangeSubscriptionConditional.slowPath(FluxRange.java:303)
at reactor.core.publisher.FluxRange$RangeSubscriptionConditional.request(FluxRange.java:258)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.request(FluxHandleFuseable.java:243)
at reactor.core.publisher.FluxMergeOrdered$MergeOrderedInnerSubscriber.onSubscribe(FluxMergeOrdered.java:364)
at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onSubscribe(FluxHandleFuseable.java:148)
at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:65)
- Reactor version(s) used: Reactor Core 3.3.12
About this issue
- Original URL
- State: closed
- Created 3 years ago
- Comments: 15 (9 by maintainers)
Commits related to this issue
- Flux.mergeOrdered does not propagate onError when it happens #2640 — committed to koldat/reactor-core by koldat 3 years ago
- Flux.mergeOrdered does not propagate onError when it happens #2640 — committed to koldat/reactor-core by koldat 3 years ago
- Flux.mergeOrdered does not propagate onError when it happens #2640 — committed to koldat/reactor-core by koldat 3 years ago
- Flux.mergeOrdered does not propagate onError when it happens #2640 — committed to koldat/reactor-core by koldat 3 years ago
- Flux.mergeOrdered does not propagate onError when it happens #2640 — committed to koldat/reactor-core by koldat 3 years ago
@yamass I started work on PR, but then we decided to stop use (were not able to wait for release) of this operator so sorry I did not send it. Let me check again it should be a couple of lines. I will try to send patch today.