reactor-core: OverflowException is thrown with bufferTimeout and delayElements operators

We have implemented a rate limiter using bufferTimeout and delayElements (see the TestRateLimiter class in the code below).

Expected Behavior

No exceptions are thrown.

Actual Behavior

The following exception is thrown:

reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
	at reactor.core.Exceptions.failWithOverflow(Exceptions.java:233)
	at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.flushCallback(FluxBufferTimeout.java:227)
	at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.lambda$new$0(FluxBufferTimeout.java:158)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
	at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

According to my tests, it happens when:

  • the consumer is slower than the producer
  • the producer is slow enough that the rate limiter's buffer timeout fires before the maximum buffer size is reached

Based on that, I suspect that the problem is caused by multiple threads concurrently pushing buffers downstream, which may then get out of sync. Moving the rate limiting to single-threaded processing solved the issue.

Steps to Reproduce

The following unit test can be used to reproduce the issue:

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

import java.time.Duration;
import java.util.function.Consumer;
import java.util.stream.IntStream;

class RateLimiterOverflowTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(RateLimiterOverflowTest.class);
    private static final int MESSAGE_COUNT = 50;
    private static final Duration MESSAGE_PRODUCER_DELAY = Duration.ofMillis(5);
    private static final Duration RATE_LIMITER_DURATION = Duration.ofMillis(5);
    private static final int LIMIT_FOR_DURATION = 5;
    private static final Consumer<String> SLOW_CONSUMER = message -> sleep(Duration.ofMillis(50), "consume " + message);

    @Test
    void shouldLimitRateWithSlowConsumerUsingEnabledRateLimiter() {
        // given
        var rateLimiter = new TestRateLimiter(LIMIT_FOR_DURATION, RATE_LIMITER_DURATION);

        // when
        var messageFlux = createLimitedMessage(rateLimiter, SLOW_CONSUMER);

        // then
        verify(messageFlux);
    }

    @Test
    void shouldLimitRateWithSlowConsumerUsingEnabledRateLimiterWithExtremelyLowDuration() {
        // given
        var rateLimiter = new TestRateLimiter(LIMIT_FOR_DURATION, Duration.ofNanos(1));

        // when
        var limitedMessageFlux = createLimitedMessage(rateLimiter, SLOW_CONSUMER);

        // then
        verify(limitedMessageFlux);
    }

    private Flux<String> createLimitedMessage(TestRateLimiter rateLimiter, Consumer<String> messageConsumer) {
        return createMessageFlux()
                .transform(rateLimiter::limitRate)
                .doOnError(error -> LOGGER.error(error.getMessage()))
                .doOnNext(messageConsumer);
    }

    private void verify(Flux<String> messageFlux) {
        StepVerifier.create(messageFlux)
                .expectNextCount(MESSAGE_COUNT)
                .verifyComplete();
    }

    private Flux<String> createMessageFlux() {
        return Flux.create(sink -> {
            IntStream.range(0, MESSAGE_COUNT)
                    .mapToObj(index -> "message" + index)
                    .forEach(message -> {
                        sleep(MESSAGE_PRODUCER_DELAY, "generate " + message);
                        sink.next(message);
                    });
            sink.complete();
        });
    }

    private static void sleep(Duration duration, String logMessage) {
        try {
            LOGGER.info("{} [delay={}ms]", logMessage, duration.toMillis());
            Thread.sleep(duration.toMillis());
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public class TestRateLimiter {
        private Integer limitForDuration;
        private Duration duration;

        public TestRateLimiter(Integer limitForDuration, Duration duration) {
            this.limitForDuration = limitForDuration;
            this.duration = duration;
        }

        public Flux<String> limitRate(Flux<String> flux) {
            return flux
                    .bufferTimeout(limitForDuration, duration)
                    .delayElements(duration)
                    .flatMap(Flux::fromIterable);
        }
    }
}

Possible Solution

As a workaround, passing Schedulers.single() to the bufferTimeout and delayElements operators seems to resolve the issue:

.bufferTimeout(limitForDuration, duration, Schedulers.single())
.delayElements(duration, Schedulers.single())
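A different mitigation, not proposed in this issue and included here only as an unverified sketch: insert onBackpressureBuffer() between bufferTimeout and delayElements. It requests an unbounded amount from bufferTimeout, so a timed flush should always find outstanding demand, while delayElements still releases at most one buffer per period. The class name BufferedRateLimiter is hypothetical. The trade-off is that buffers the slow consumer cannot keep up with accumulate in memory without bound.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

public class BufferedRateLimiter {

    // Sketch only: same shape as the issue's TestRateLimiter, with an
    // onBackpressureBuffer() inserted after bufferTimeout (an assumption,
    // not the workaround proposed in the issue).
    static <T> Flux<T> limitRate(Flux<T> flux, int limitForDuration, Duration duration) {
        return flux
                .bufferTimeout(limitForDuration, duration)
                // Requests Long.MAX_VALUE from bufferTimeout, so its timed flush
                // always sees demand; excess buffers are queued here (unbounded)
                // until delayElements asks for the next one.
                .onBackpressureBuffer()
                // Releases at most one buffer per `duration`.
                .delayElements(duration)
                .flatMap(Flux::fromIterable);
    }

    public static void main(String[] args) {
        List<Integer> out = limitRate(Flux.range(0, 20), 5, Duration.ofMillis(5))
                .collectList()
                .block();
        System.out.println(out);
    }
}
```

Ordering is preserved because delayElements emits buffers one after another and each inner Flux.fromIterable emits its elements synchronously.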

Your Environment

  • Reactor version(s) used: 3.4.17, 3.4.7
  • JVM version (java -version): openjdk version "11.0.14" 2022-01-18 LTS OpenJDK Runtime Environment Zulu11.54+23-CA (build 11.0.14+9-LTS) OpenJDK 64-Bit Server VM Zulu11.54+23-CA (build 11.0.14+9-LTS, mixed mode)
  • OS and version (eg uname -a): Darwin Kernel Version 21.4.0: Mon Feb 21 20:35:58 PST 2022; root:xnu-8020.101.4~2/RELEASE_ARM64_T6000 arm64

About this issue

  • State: closed
  • Created 2 years ago
  • Reactions: 4
  • Comments: 15 (7 by maintainers)

Most upvoted comments

@bensilvan we are working on it!

I also get this:

... ERROR reactor.core.publisher.Operators Operator called default onErrorDropped
reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests
    at reactor.core.Exceptions.failWithOverflow(Exceptions.java:237)
    at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.flushCallback(FluxBufferTimeout.java:227)
    at reactor.core.publisher.FluxBufferTimeout$BufferTimeoutSubscriber.lambda$new$0(FluxBufferTimeout.java:158)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    at java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.lang.Thread.run(Thread.java:829)

with Reactor 3.5.4 when the code looks like this:

Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
Disposable sinkSubscription = sink.asFlux()
  .bufferTimeout(10, ofMillis(1000))
  .publishOn(Schedulers.newSingle("..."))
  .flatMap(this::doSomething)
  .onErrorContinue((e, o) -> log.error(e))
  .subscribe();

I see a few differences from the originally posted issue:

  • I use a sink
  • there is no delayElements in my case
  • I use publishOn to move processing to another thread

+1 to solving this

I am encountering the same issue and tried the proposed windowTimeout alternative. I got the same error as @almogtavor:

2023-04-08 23:31:37.832 ERROR   --- [-367412941877-1] reactor.core.publisher.Operators         : Operator called default onErrorDropped

reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
Caused by: reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
	at reactor.core.Exceptions.failWithOverflow(Exceptions.java:236)
	at reactor.core.publisher.FluxWindowTimeout$WindowTimeoutSubscriber.onNext(FluxWindowTimeout.java:1901)
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drainRegular(FluxGroupBy.java:567)
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.drain(FluxGroupBy.java:652)
	at reactor.core.publisher.FluxGroupBy$UnicastGroupedFlux.onNext(FluxGroupBy.java:692)
	at reactor.core.publisher.FluxGroupBy$GroupByMain.onNext(FluxGroupBy.java:211)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
	at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:113)
	at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:345)
	at reactor.core.publisher.FluxUsing$UsingFuseableSubscriber.onNext(FluxUsing.java:353)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
	at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:453)
	at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:724)
	at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onNext(FluxFlattenIterable.java:256)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runBackfused(FluxPublishOn.java:484)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:521)
	at reactor.core.scheduler.ExecutorScheduler$ExecutorTrackedRunnable.run(ExecutorScheduler.java:192)
	at reactor.core.scheduler.ImmediateScheduler$ImmediateSchedulerWorker.schedule(ImmediateScheduler.java:84)
	at reactor.core.scheduler.SingleWorkerScheduler.execute(SingleWorkerScheduler.java:64)
	at reactor.core.scheduler.ExecutorScheduler$ExecutorSchedulerWorker.schedule(ExecutorScheduler.java:252)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.trySchedule(FluxPublishOn.java:312)
	at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:237)
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
	at reactor.core.publisher.SinkManyUnicast.drainRegular(SinkManyUnicast.java:282)
	at reactor.core.publisher.SinkManyUnicast.drain(SinkManyUnicast.java:364)
	at reactor.core.publisher.SinkManyUnicast.tryEmitNext(SinkManyUnicast.java:237)
	at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
	at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:27)
	at reactor.kafka.receiver.internals.ConsumerEventLoop$PollEvent.run(ConsumerEventLoop.java:371)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
	at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Messages are emitted by a reactor-kafka source.

@janvojt

Roughly 5-6 times slower than bufferUntilChanged (in the cases when it did not crash, obviously…).

That is expected: the new guarantees come with more overhead. Also, just a heads-up: @chemicL is working on porting the windowTimeout functionality to bufferTimeout, so it could be faster, since there is no extra coordination as in the case of windows, which represent events as a Flux.

@OlegDokuka What does the prefetch value of 0 represent? Can you explain how this accomplishes the buffer timeout behavior?

0 means concatMap does not prefetch extra elements, so exactly one window is processed at a time. Thus only one window is collecting values into a list, which is later propagated downstream.
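The windowTimeout alternative discussed here is not quoted in full in this thread, so the following is only a sketch of what such a pipeline is assumed to look like: windowTimeout followed by concatMap(Flux::collectList, 0). The helper name bufferViaWindows is hypothetical. As other comments in this thread report, this combination can still fail with an OverflowException when the consumer is slow; the example only shows the shape of the pipeline on a fast, synchronous source.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

public class WindowTimeoutBuffer {

    // Hypothetical helper: groups elements into lists of at most maxSize,
    // closing a group early once maxTime elapses.
    static <T> Flux<List<T>> bufferViaWindows(Flux<T> source, int maxSize, Duration maxTime) {
        return source
                .windowTimeout(maxSize, maxTime)
                // prefetch = 0: concatMap requests the next window only after the
                // current one has completed, so at most one window is being
                // collected into a List at any time.
                .concatMap(Flux::collectList, 0);
    }

    public static void main(String[] args) {
        List<List<Integer>> batches =
                bufferViaWindows(Flux.range(1, 10), 3, Duration.ofSeconds(10))
                        .collectList()
                        .block();
        System.out.println(batches);
    }
}
```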

Also, when I tried this method, I got the error reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...). Do you have an idea why this would happen?

Feel free to create an issue if you have a reproducer.

@OlegDokuka What does the prefetch value of 0 represent? Can you explain how this accomplishes the buffer timeout behavior? Also, when I tried this method, I got the error reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...). Do you have an idea why this would happen?

This is unfortunately precisely one limitation of buffer-with-timeout: the time-related aspect is a hard constraint (I want to emit a buffer every x time period, no matter what) that doesn't play well with slow downstream consumers and backpressure. If the downstream doesn't make enough requests, there's no generic way of pausing time…
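To make this limitation concrete, here is a deliberately simplified, single-threaded Java model. TimedBufferModel and its methods are hypothetical names, and this is not how FluxBufferTimeout is implemented. It assumes only what is described above: the timer fires regardless of downstream demand, and a buffer can only be emitted when the downstream has outstanding requests. Once a slow consumer has used up its requests, the next timed flush has nowhere to go, and the model fails with the same message as the stack traces in this issue, without any concurrency involved.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model (not Reactor's implementation) of a buffer that is
// flushed by size or by a timer, and may only emit when there is demand.
public class TimedBufferModel {
    private final int maxSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> emitted = new ArrayList<>();
    private long requested; // outstanding downstream demand
    private String error;   // set once the model has failed (like onError)

    public TimedBufferModel(int maxSize) { this.maxSize = maxSize; }

    // Downstream signals that it can accept n more buffers.
    public void request(long n) { requested += n; }

    // Upstream delivers an element; a full buffer is flushed immediately.
    public void onNext(String element) {
        if (error != null) return;
        buffer.add(element);
        if (buffer.size() >= maxSize) flush();
    }

    // Called when the buffer timeout elapses. Time passes whether or not
    // the downstream has requested anything.
    public void onTimer() {
        if (error != null || buffer.isEmpty()) return;
        flush();
    }

    private void flush() {
        if (requested == 0) {
            // Nowhere to send the buffer: fail instead of emitting.
            error = "Could not emit buffer due to lack of requests";
            buffer.clear();
            return;
        }
        requested--;
        emitted.add(new ArrayList<>(buffer));
        buffer.clear();
    }

    public List<List<String>> emitted() { return emitted; }
    public String error() { return error; }

    public static void main(String[] args) {
        TimedBufferModel model = new TimedBufferModel(5);
        model.request(1);         // slow consumer: room for one buffer only
        model.onNext("message0");
        model.onTimer();          // timeout fires before size 5 is reached
        model.onNext("message1");
        model.onTimer();          // timeout fires again, but demand is 0
        System.out.println(model.emitted()); // [[message0]]
        System.out.println(model.error());   // Could not emit buffer due to lack of requests
    }
}
```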

I’m not sure there is anything we can do to efficiently fix that limitation, unfortunately 😞

NB: windowTimeout also suffers from the same limitation, only worse (because in windows the elements must be propagated down as fast and as close to realtime as possible). There is ongoing work on windowTimeout to try and improve the situation, but it has already taken months of effort…