reactor-core: bufferTimeout overflows when sink produces requested amount of data

Expected behavior

Calling sink.next() as many times as sink.requestedFromDownstream() reports should never result in an error.

Actual behavior

A combination of a delayed producer, multiple slow consumers, and bufferTimeout results in reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests (see steps to reproduce below).

Steps to reproduce

JUnit test case:

    @Test
    void bufferAllowsRequested() throws InterruptedException {
        ExecutorService workers = Executors.newFixedThreadPool(4);

        AtomicBoolean down = new AtomicBoolean();

        // The producer emits exactly what downstream has requested (see produceRequestedTo below).
        Flux.create(sink -> {
            produceRequestedTo(down, sink);
        }).bufferTimeout(400, Duration.ofMillis(200))
                .doOnError(t -> {
                    t.printStackTrace();
                    down.set(true);
                })
                // prefetch of 4 buffers; the subscriber is deliberately slow (400 ms per buffer)
                .publishOn(Schedulers.fromExecutor(workers), 4)
                .subscribe(this::processBuffer);

        Thread.sleep(3500);

        workers.shutdownNow();

        assertFalse(down.get());
    }

    private void processBuffer(List<Object> buf) {
        System.out.println("Received " + buf.size());
        try {
            // simulate a slow downstream consumer
            Thread.sleep(400);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void produceRequestedTo(AtomicBoolean down, FluxSink<Object> sink) {
        Thread thread = new Thread(() -> {
            while(!Thread.interrupted()) {
                try {
                    if (sink.requestedFromDownstream() > 0) {
                        System.out.println("Requested " + sink.requestedFromDownstream());
                        Thread.sleep(new Random().nextInt(1000));
                        IntStream.range(0, Math.min(1000, (int) sink.requestedFromDownstream())).forEach(sink::next);
                    } else {
                        Thread.sleep(200);
                    }
                } catch (InterruptedException ignored) {
                    break;
                } catch (Exception e) {
                    e.printStackTrace();
                    down.set(true);
                }
            }
        });
        thread.setDaemon(true);
        thread.start();
    }

Reactor Core version

3.2.6

JVM version (e.g. java -version)

1.8.0_201

I think what is happening is that the producer is not quick enough, so bufferTimeout times out and emits an undersized buffer. That partial buffer consumes one buffer's worth of downstream demand while using up less than a full buffer's worth of the demand already signalled upstream, so when exactly N * bufferSize items eventually arrive we technically have overflow: the leftover items need one more downstream slot than remains. (In the test above, publishOn prefetches 4 buffers, so bufferTimeout requests 4 * 400 = 1600 items upstream; each 200 ms timeout that fires early burns one of those 4 slots.)

The use case is adapting any pull-based source (Kafka, a database, etc.) where we can and do want to respect backpressure, and where we have plenty of data available to meet the request. The onBackpressureBuffer variants are inappropriate because the source we are pulling from has far more data than we can hold in memory, and data loss is not an option.

IMO bufferTimeout should be prepared to buffer up to the amount requested before its first timeout following a full buffer. This is reasonably bounded demand, and in practice it ought not to result in more items residing in memory than were requested.

About this issue

  • State: closed
  • Created 5 years ago
  • Reactions: 2
  • Comments: 16 (8 by maintainers)

Most upvoted comments

We ended up rolling our own BatchSubscriber based on the one @deripas linked to. It isn’t a very satisfying solution, but it did get us unblocked.
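
For illustration, here is a minimal sketch of the shape it took (hypothetical class and names, simplified from what we actually run; synchronization is deliberately crude):

    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Consumer;

    import org.reactivestreams.Subscription;
    import reactor.core.publisher.BaseSubscriber;
    import reactor.core.publisher.SignalType;

    // Request batchSize items at a time, hand off full batches, and flush whatever
    // has accumulated when the timer fires. Upstream is only asked for more once a
    // batch has been handed off, so outstanding demand never exceeds one batch.
    public class BatchSubscriber<T> extends BaseSubscriber<T> {

        private final int batchSize;
        private final Duration timeout;
        private final Consumer<List<T>> batchConsumer;
        private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

        private List<T> batch = new ArrayList<>();

        public BatchSubscriber(int batchSize, Duration timeout, Consumer<List<T>> batchConsumer) {
            this.batchSize = batchSize;
            this.timeout = timeout;
            this.batchConsumer = batchConsumer;
        }

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            timer.scheduleAtFixedRate(this::flush, timeout.toMillis(), timeout.toMillis(), TimeUnit.MILLISECONDS);
            request(batchSize);
        }

        @Override
        protected synchronized void hookOnNext(T value) {
            batch.add(value);
            if (batch.size() >= batchSize) {
                flush();
            }
        }

        private synchronized void flush() {
            if (batch.isEmpty()) {
                return;
            }
            List<T> toProcess = batch;
            batch = new ArrayList<>();
            batchConsumer.accept(toProcess);
            // replenish only what was just consumed
            request(toProcess.size());
        }

        @Override
        protected void hookFinally(SignalType type) {
            timer.shutdownNow();
        }
    }

Used roughly as flux.subscribe(new BatchSubscriber<>(400, Duration.ofMillis(200), this::processBatch)); the point is that demand is replenished only after a batch has actually been processed.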

I have done some digging into this issue, and discovered that the scenario we are running into is actually explicitly tested for in the unit tests (FluxBufferTimeoutTest::bufferWithTimeoutThrowingExceptionOnTimeOrSizeIfDownstreamDemandIsLow, https://github.com/reactor/reactor-core/blob/master/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java).

Probably the easiest ‘fix’ would be to document this behaviour. I’m not sure how you would succinctly describe the behaviour to people looking to use the function, and I suspect that for a lot of use cases this isn’t desired.

Actually fixing the issue is a bit of a thornier problem. For one, there may be people relying on the existing behaviour, and it would probably be bad form to break things for them. And then there are the difficulties in actually implementing the fix.

I think I have identified two aspects of FluxBufferTimeout that would need addressing:

It is very greedy, to the extent that the backpressure doesn’t really work

When downstream requests n items from FluxBufferTimeout, n * batchSize items are requested from upstream. This happens regardless of how many unfulfilled items have already been requested from upstream. For instance, consider a bufferTimeout with a batchSize of 10. FluxBufferTimeout::request(1) results in 10 items being requested from upstream. A single item is delivered before the timeout, leaving 9 outstanding. This one item is sent downstream as a partial buffer, resulting in another call to FluxBufferTimeout::request(1), which requests another 10 from upstream. There are now 19 outstanding requests. This is the scenario revealed in the logging performed by @vladykin's example; a minimal probe of the request amplification is sketched below.
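
To make the amplification concrete, here is a small probe (my own sketch; the numbers are arbitrary) that logs what bufferTimeout requests from its upstream for each downstream request(1):

    import java.time.Duration;
    import java.util.List;

    import org.reactivestreams.Subscription;
    import reactor.core.publisher.BaseSubscriber;
    import reactor.core.publisher.Flux;

    public class BufferTimeoutRequestProbe {
        public static void main(String[] args) {
            Flux.range(1, 100)
                    // logs the demand bufferTimeout passes upstream
                    .doOnRequest(n -> System.out.println("upstream sees request(" + n + ")"))
                    .bufferTimeout(10, Duration.ofMillis(100))
                    .subscribe(new BaseSubscriber<List<Integer>>() {
                        @Override
                        protected void hookOnSubscribe(Subscription s) {
                            request(1); // ask for a single buffer...
                        }

                        @Override
                        protected void hookOnNext(List<Integer> buffer) {
                            System.out.println("downstream got a buffer of " + buffer.size());
                            request(1); // ...and one more after each buffer
                        }
                    });
        }
    }

Each downstream request(1) shows up as request(10) upstream. With a fast upstream like Flux.range every buffer fills, so the over-requesting is harmless here; it only hurts once partial buffers start timing out.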

I think this could probably be fixed without breaking anybody. My proposed solution would be to keep an internal record of how many items have been requested from upstream, and only request enough from upstream to bring the number outstanding up to (requested * batchSize). A rough sketch of that bookkeeping follows.
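
Roughly the bookkeeping I have in mind (a sketch only, with hypothetical names, concurrency ignored; not the actual FluxBufferTimeout internals):

    import java.util.concurrent.atomic.AtomicLong;

    import org.reactivestreams.Subscription;
    import reactor.core.publisher.Operators;

    // Only request the shortfall needed to bring outstanding upstream demand up to
    // (buffers wanted downstream) * batchSize, instead of n * batchSize per request.
    class CappedUpstreamDemand {
        final int batchSize;
        final Subscription upstream;
        final AtomicLong buffersWanted = new AtomicLong();        // requested by downstream, not yet emitted
        final AtomicLong outstandingUpstream = new AtomicLong();  // requested upstream, not yet delivered

        CappedUpstreamDemand(int batchSize, Subscription upstream) {
            this.batchSize = batchSize;
            this.upstream = upstream;
        }

        void onDownstreamRequest(long n) {
            long target = Operators.multiplyCap(buffersWanted.addAndGet(n), batchSize);
            long topUp = target - outstandingUpstream.get();
            if (topUp > 0) {
                outstandingUpstream.addAndGet(topUp);
                upstream.request(topUp);
            }
        }

        void onUpstreamItem()  { outstandingUpstream.decrementAndGet(); }
        void onBufferEmitted() { buffersWanted.decrementAndGet(); }
    }

In the 10-per-buffer example above, this would turn the second request(1) into a request for just 1 more item (10 wanted, 9 already outstanding) rather than another 10.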

There would need to be internal buffering

Consider the following scenario. FluxBufferTimeout::request(5) is called with a batchSize of 10, resulting in 50 items being requested from upstream. 5 items are delivered, with the buffer timing out between each one. Downstream takes its time processing these five batches and does not request any more. There are still 45 outstanding items requested from upstream, enough for four full batches and one partial one. When these items are delivered from upstream, the current behaviour is to error. The behaviour we would like is for those batches to be stored on an internal queue, with new requests from downstream being fulfilled from that queue. The scheduling of the flush on timeout would need to be disabled while that internal queue has items on it. A sketch of that queueing idea follows.
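
A sketch of that queueing idea (hypothetical names, single-threaded view; not the operator's actual state machine):

    import java.util.ArrayDeque;
    import java.util.List;
    import java.util.Queue;

    import org.reactivestreams.Subscriber;

    // Park completed buffers when downstream has no demand, instead of erroring,
    // and drain the parked buffers first when downstream requests again.
    class BufferParkingSketch<T> {
        final Queue<List<T>> parked = new ArrayDeque<>();
        final Subscriber<? super List<T>> downstream;
        long downstreamDemand;

        BufferParkingSketch(Subscriber<? super List<T>> downstream) {
            this.downstream = downstream;
        }

        void onBufferReady(List<T> buffer) {
            if (downstreamDemand > 0) {
                downstreamDemand--;
                downstream.onNext(buffer);
            } else {
                parked.add(buffer);       // current behaviour: OverflowException here
            }
        }

        void onDownstreamRequest(long n) {
            downstreamDemand += n;
            while (downstreamDemand > 0 && !parked.isEmpty()) {
                downstreamDemand--;
                downstream.onNext(parked.poll());
            }
        }

        boolean timeoutFlushEnabled() {
            return parked.isEmpty();      // suspend the flush-on-timeout while buffers are parked
        }
    }

The memory footprint would still be bounded by (requested * batchSize) items, since nothing beyond what was originally requested from upstream can ever be parked.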

I think this is probably the much more complicated change to implement, and it would also break existing behaviour for those who rely on the current implementation.

Pleased to know it’s not just me!

On Tue, 30 Jun 2020, 11:15 Maksym Hryhoriev, notifications@github.com wrote:

I have this error with reactor Kafka 1.2.2 and reactor core 3.3.7:

    Flux.just(receiverOptions)
            .map(KafkaReceiver::create)
            .flatMap(kafkaReceiver -> kafkaReceiver.receive()
                    .bufferTimeout(batchSize, Duration.ofMillis(50))
                    .concatMap(batch -> processBatch(kafkaReceiver, batch))
            ).doOnError(err -> LOGGER.error("Unexpected error in consuming Kafka events", err))

I read a batch of Kafka messages using kafkaReceiver.receive(), and I want to process them all and only then ask for another batch. processBatch may take a long time (200 ms to 10 s).
