reactor-core: OverflowException is thrown with bufferTimeout and delayElements operators
We have implemented a rate limiter using bufferTimeout and delayElements (see the TestRateLimiter class in the code below).
Expected Behavior
No exceptions are thrown.
Actual Behavior
The following exception is thrown:
reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
at reactor.core.Exceptions.failWithOverflow(Exceptions.java:233)
at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.flushCallback(FluxBufferTimeout.java:227)
at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.lambda$new$0(FluxBufferTimeout.java:158)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
According to my tests, it happens when:
- the consumer is slower than the producer
- the producer is slow enough that the rate limiter reaches the buffer timeout before it reaches the max buffer size
Based on that, I suspect the problem is caused by multiple threads concurrently pushing buffers downstream and getting out of sync. Moving to single-threaded processing in the rate limiter solved the issue.
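The two conditions listed above can be written down as simple checks. The sketch below is only an approximation for reasoning about the test constants: the class and method names are hypothetical, and "slow enough to reach the buffer timeout" is assumed to mean that filling a buffer by size takes longer than the timeout.

```java
import java.time.Duration;

// Hypothetical helper, not part of Reactor or of the original test.
class ReproConditions {

    /** True when filling a buffer by size would take longer than the buffer timeout. */
    static boolean flushesOnTimeout(Duration producerDelay, int maxBufferSize, Duration bufferTimeout) {
        // Assumption: the producer emits one message per producerDelay.
        return producerDelay.multipliedBy(maxBufferSize).compareTo(bufferTimeout) > 0;
    }

    /** True when the consumer needs more time per message than the producer. */
    static boolean consumerIsSlower(Duration consumerDelay, Duration producerDelay) {
        return consumerDelay.compareTo(producerDelay) > 0;
    }
}
```

With the values from the test below (5 ms producer delay, buffer size 5, 5 ms timeout, 50 ms consumer), both checks return true, which matches the situation in which the exception was observed.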
Steps to Reproduce
The following unit test can be used to reproduce the issue:
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.util.function.Consumer;
import java.util.stream.IntStream;

class RateLimiterOverflowTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(RateLimiterOverflowTest.class);
    private static final int MESSAGE_COUNT = 50;
    private static final Duration MESSAGE_PRODUCER_DELAY = Duration.ofMillis(5);
    private static final Duration RATE_LIMITER_DURATION = Duration.ofMillis(5);
    private static final int LIMIT_FOR_DURATION = 5;
    private static final Consumer<String> SLOW_CONSUMER = message -> sleep(Duration.ofMillis(50), "consume " + message);

    @Test
    void shouldLimitRateWithSlowConsumerUsingEnabledRateLimiter() {
        // given
        var rateLimiter = new TestRateLimiter(LIMIT_FOR_DURATION, RATE_LIMITER_DURATION);
        // when
        var messageFlux = createLimitedMessage(rateLimiter, SLOW_CONSUMER);
        // then
        verify(messageFlux);
    }

    @Test
    void shouldLimitRateWithSlowConsumerUsingEnabledRateLimiterWithExtremelyLowDuration() {
        // given
        var rateLimiter = new TestRateLimiter(LIMIT_FOR_DURATION, Duration.ofNanos(1));
        // when
        var limitedMessageFlux = createLimitedMessage(rateLimiter, SLOW_CONSUMER);
        // then
        verify(limitedMessageFlux);
    }

    private Flux<String> createLimitedMessage(TestRateLimiter rateLimiter, Consumer<String> messageConsumer) {
        return createMessageFlux()
                .transform(rateLimiter::limitRate)
                .doOnError(error -> LOGGER.error(error.getMessage()))
                .doOnNext(messageConsumer);
    }

    private void verify(Flux<String> messageFlux) {
        StepVerifier.create(messageFlux)
                .expectNextCount(MESSAGE_COUNT)
                .verifyComplete();
    }

    private Flux<String> createMessageFlux() {
        return Flux.create(sink -> {
            IntStream.range(0, MESSAGE_COUNT)
                    .mapToObj(index -> "message" + index)
                    .forEach(message -> {
                        sleep(MESSAGE_PRODUCER_DELAY, "generate " + message);
                        sink.next(message);
                    });
            sink.complete();
        });
    }

    private static void sleep(Duration duration, String logMessage) {
        try {
            LOGGER.info("{} [delay={}ms]", logMessage, duration.toMillis());
            Thread.sleep(duration.toMillis());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public class TestRateLimiter {

        private Integer limitForDuration;
        private Duration duration;

        public TestRateLimiter(Integer limitForDuration, Duration duration) {
            this.limitForDuration = limitForDuration;
            this.duration = duration;
        }

        public Flux<String> limitRate(Flux<String> flux) {
            return flux
                    .bufferTimeout(limitForDuration, duration)
                    .delayElements(duration)
                    .flatMap(Flux::fromIterable);
        }
    }
}
Possible Solution
As a workaround, passing Schedulers.single() to the bufferTimeout and delayElements operators seems to resolve the issue:
.bufferTimeout(limitForDuration, duration, Schedulers.single())
.delayElements(duration, Schedulers.single())
Your Environment
- Reactor version(s) used: 3.4.17, 3.4.7
- JVM version (java -version): openjdk version "11.0.14" 2022-01-18 LTS OpenJDK Runtime Environment Zulu11.54+23-CA (build 11.0.14+9-LTS) OpenJDK 64-Bit Server VM Zulu11.54+23-CA (build 11.0.14+9-LTS, mixed mode)
- OS and version (eg uname -a): Darwin Kernel Version 21.4.0: Mon Feb 21 20:35:58 PST 2022; root:xnu-8020.101.4~2/RELEASE_ARM64_T6000 arm64
About this issue
- State: closed
- Created 2 years ago
- Reactions: 4
- Comments: 15 (7 by maintainers)
@bensilvan we are working on it!
I also get this:
with reactor 3.5.4 when code is like:
I see a few differences from the originally posted issue:
- sink
- delayElements in my case
- publishOn another thread
+1 to solving this
Encountering the same issue and tried to use the proposed windowTimeout alternative. Got the same error as @almogtavor
Messages are emitted by a kafka-reactor source.
@janvojt that is expected; you have more overhead with the new guarantees. Also, just a heads-up: @chemicL is working on porting the windowTimeout functionality to bufferTimeout, so it could be faster, since there is no extra coordination as in the case of window, which represents events as a flux.
0 means concatMap does not prefetch extra elements, so there is exactly 1 window at a time; thus only 1 window is collecting values into a list, which is later propagated downstream.
Feel free to create an issue if you have a reproducer.
@OlegDokuka What does the prefetch value of 0 represent? Can you explain how this accomplishes the buffer timeout behavior? Also, when I tried this method, I got the following error:
reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
Do you have any idea why this would happen?
This is unfortunately precisely one limitation of buffer-with-timeout: the time-related aspect is a hard constraint (I want to emit a buffer every x time period, no matter what) that doesn’t play well with slow downstream consumers and backpressure. If the downstream doesn’t make enough requests, there’s no generic way of pausing time… I’m not sure there is anything we can do to efficiently fix that limitation, unfortunately 😞
NB: windowTimeout also suffers from the same limitation, only worse (because in windows the elements must be propagated down as fast and as close to realtime as possible). There is ongoing work on windowTimeout to try and improve the situation, but it has already taken months of effort…
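To make the buffer-with-timeout limitation described above more concrete, here is a minimal single-threaded model in plain Java. It is not Reactor's implementation: the class name, the one-millisecond tick loop, and the initial demand of 32 are all assumptions chosen for illustration. The model only tracks outstanding downstream demand and reports the first moment a timed flush finds none.

```java
// Hypothetical model of a timed buffer in front of a slow consumer.
// It does not use Reactor and does not reproduce its internals.
class TimedBufferDemandModel {

    /** Outcome of one simulated run. */
    static final class Result {
        final int buffersEmitted;
        final long overflowAtMillis; // -1 when the run ends without overflow

        Result(int buffersEmitted, long overflowAtMillis) {
            this.buffersEmitted = buffersEmitted;
            this.overflowAtMillis = overflowAtMillis;
        }
    }

    static Result simulate(long flushEveryMillis, long consumeEveryMillis,
                           long initialDemand, long runForMillis) {
        long demand = initialDemand; // buffers the downstream has requested so far
        long pending = 0;            // buffers emitted but not yet consumed
        int emitted = 0;
        for (long now = 1; now <= runForMillis; now++) {
            // The consumer finishes one pending buffer and requests one more.
            if (now % consumeEveryMillis == 0 && pending > 0) {
                pending--;
                demand++;
            }
            // The timer fires: a buffer has to go out now, time cannot be paused.
            if (now % flushEveryMillis == 0) {
                if (demand == 0) {
                    return new Result(emitted, now); // no request left: overflow
                }
                demand--;
                pending++;
                emitted++;
            }
        }
        return new Result(emitted, -1);
    }
}
```

Calling simulate(5, 50, 32, 10_000), which mirrors the 5 ms timeout and 50 ms consumer from the test, runs out of demand at 180 ms after emitting 35 buffers in this model. Swapping the two periods, simulate(50, 5, 32, 10_000), completes without overflow. A larger initial demand only postpones the overflow; it does not remove it while the consumer stays slower than the timer.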