reactor-core: bufferTimeout overflows when sink produces requested amount of data
Expected behavior
Calling sink.next sink.requestedFromDownstream() times should never result in an error.
Actual behavior
A combination of a delayed producer, multiple slow consumers, and bufferTimeout results in reactor.core.Exceptions$OverflowException: Could not emit buffer due to lack of requests (see steps).
Steps to reproduce
JUnit test case:
@Test
void bufferAllowsRequested() throws InterruptedException {
    ExecutorService workers = Executors.newFixedThreadPool(4);
    AtomicBoolean down = new AtomicBoolean();
    Flux.create(sink -> {
        produceRequestedTo(down, sink);
    }).bufferTimeout(400, Duration.ofMillis(200))
      .doOnError(t -> {
          t.printStackTrace();
          down.set(true);
      })
      .publishOn(Schedulers.fromExecutor(workers), 4)
      .subscribe(this::processBuffer);
    Thread.sleep(3500);
    workers.shutdownNow();
    assertFalse(down.get());
}

private void processBuffer(List<Object> buf) {
    System.out.println("Received " + buf.size());
    try {
        Thread.sleep(400);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

private void produceRequestedTo(AtomicBoolean down, FluxSink<Object> sink) {
    Thread thread = new Thread(() -> {
        while (!Thread.interrupted()) {
            try {
                if (sink.requestedFromDownstream() > 0) {
                    System.out.println("Requested " + sink.requestedFromDownstream());
                    Thread.sleep(new Random().nextInt(1000));
                    IntStream.range(0, Math.min(1000, (int) sink.requestedFromDownstream())).forEach(sink::next);
                } else {
                    Thread.sleep(200);
                }
            } catch (InterruptedException ignored) {
                break;
            } catch (Exception e) {
                e.printStackTrace();
                down.set(true);
            }
        }
    });
    thread.setDaemon(true);
    thread.start();
}
Reactor Core version
3.2.6
JVM version (e.g. java -version)
1.8.0_201
I think what is happening is that the slow producer is not quick enough, so bufferTimeout times out, produces an undersized buffer, which reduces upstream demand by 1 buffer, and thus we technically have overflow when exactly N * bufferSize items arrive.
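The arithmetic behind that suspicion can be sketched with a toy model. This is not Reactor's code: the class and method names are hypothetical, and it assumes the accounting described above, where each emitted buffer consumes one unit of downstream demand while upstream has already been asked for a full batchSize items per buffer.

```java
/** Toy model of the demand accounting described above; not Reactor's actual code. */
final class BufferDemandModel {

    /**
     * Returns how many items upstream may still deliver beyond what the
     * remaining downstream demand can absorb, after some buffers were
     * flushed early by the timer.
     */
    static long surplusItems(long requestedBuffers, int batchSize,
                             long timedOutBuffers, long itemsInTimedOutBuffers) {
        // Items already requested from upstream that have not arrived yet.
        long upstreamOutstanding = requestedBuffers * batchSize - itemsInTimedOutBuffers;
        // Items that the buffers still owed to downstream can hold.
        long absorbable = (requestedBuffers - timedOutBuffers) * batchSize;
        return upstreamOutstanding - absorbable;
    }

    public static void main(String[] args) {
        // Assumed scenario: downstream asked for 4 buffers of 400 items (1600 items),
        // and one buffer was flushed by the timer holding only 100 items.
        System.out.println(surplusItems(4, 400, 1, 100)); // 1500 outstanding - 1200 absorbable = 300
    }
}
```

Under these assumptions, any undersized buffer leaves a positive surplus, so a producer that honours the full upstream request will eventually deliver items that no outstanding downstream request can take.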
The use case is adapting any pull-based source (Kafka, a database, etc.) where we can and do want to respect back-pressure, and have plenty of data to meet the request with. All the onBackpressureBuffer methods are inappropriate, as the source we are pulling from has far more data than we can hold in memory, and data loss is not an option.
IMO bufferTimeout should be prepared to buffer up to the amount requested before its first timeout following a full buffer. This is reasonably bounded demand, and in practice ought not to result in more items residing in memory than were requested.
About this issue
- State: closed
- Created 5 years ago
- Reactions: 2
- Comments: 16 (8 by maintainers)
Commits related to this issue
- fix #1557 FluxBufferTimeout greediness tweaks - Rename variable - Update copyright headers - Check added value in test — committed to EleanorRoseLegg/reactor-core by EleanorLegg 5 years ago
- Merge pull request #1 from EleanorRoseLegg/1557-buffer-timeout-greediness fix #1557 FluxBufferTimeout greediness — committed to EleanorRoseLegg/reactor-core by EleanorRoseLegg 5 years ago
- fix #1557: Use StepVerifier for unit test - The unit test has been replaced by one which makes use of the StepVerifier. - Cosmetic fix to follow braces style — committed to EleanorRoseLegg/reactor-core by EleanorLegg 5 years ago
- fix #1557 - Throw exception on receipt of unrequested item - Simplify decrementing of outstanding value - Throw an exception on receipt of an unexpected item, as determined by the number of outsta... — committed to EleanorRoseLegg/reactor-core by EleanorLegg 5 years ago
- fix #1557 - early return from callback on error — committed to EleanorRoseLegg/reactor-core by EleanorLegg 5 years ago
- fix #1557 Call onDiscard after inError when unrequested element received - Call onDiscard for the unrequested element - Call onDiscardMultiple for the values already in the buffer - This change fi... — committed to EleanorRoseLegg/reactor-core by EleanorLegg 5 years ago
We ended up rolling our own BatchSubscriber based on the one @deripas linked to. It isn’t a very satisfying solution, but it did get us unblocked.
I have done some digging into this issue, and discovered that the scenario we are running into is actually explicitly tested for in the unit tests (FluxBufferTimeoutTest::bufferWithTimeoutThrowingExceptionOnTimeOrSizeIfDownstreamDemandIsLow, https://github.com/reactor/reactor-core/blob/master/reactor-core/src/test/java/reactor/core/publisher/FluxBufferTimeoutTest.java).
Probably the easiest ‘fix’ would be to document this behaviour. I’m not sure how you would succinctly describe the behaviour to people looking to use the function, and I suspect that for a lot of use cases this isn’t desired.
Actually fixing the issue is a thornier problem. For one, there may be people relying on the existing behaviour, and it would probably be bad form to break things for them. And then there are the difficulties in actually implementing the fix.
I think I have identified two aspects of FluxBufferTimeout that would need addressing:
It is very greedy, to the extent that the backpressure doesn’t really work
When downstream requests n buffers with a call to FluxBufferTimeout::request(n), n * batchSize items are requested from upstream. This happens regardless of how many unfulfilled items have already been requested from upstream. For instance, consider a bufferTimeout with a batchSize of 10. FluxBufferTimeout::request(1) results in 10 items requested from upstream. A single item is delivered before the timeout, leaving 9 outstanding. This one item is sent downstream, resulting in another call to FluxBufferTimeout::request(1), which requests another 10 from upstream. There are now 19 outstanding requests. This is the scenario revealed in the logging performed by @vladykin’s example.
I think this could probably be fixed without breaking anybody. My proposed solution would be to keep an internal record of how many items have been requested from upstream, and only request enough from upstream to bring the number outstanding up to (requested * batchSize).
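A minimal sketch of that bookkeeping, under stated assumptions: UpstreamDemandTracker and its methods are hypothetical names, not part of reactor-core, and the sketch ignores thread safety, unbounded (Long.MAX_VALUE) requests, and items already sitting in a partially filled buffer.

```java
/** Hypothetical helper illustrating the "top up" idea; not reactor-core code. */
final class UpstreamDemandTracker {
    private final int batchSize;
    private long downstreamBuffers; // buffers requested by downstream, not yet emitted
    private long outstanding;       // items requested from upstream, not yet received

    UpstreamDemandTracker(int batchSize) {
        this.batchSize = batchSize;
    }

    /** Downstream asked for n more buffers; returns how many items to request upstream. */
    long onDownstreamRequest(long n) {
        downstreamBuffers += n;
        long target = downstreamBuffers * batchSize;
        // Only request the shortfall instead of a fresh n * batchSize each time.
        long topUp = Math.max(0, target - outstanding);
        outstanding += topUp;
        return topUp;
    }

    /** An item arrived from upstream. */
    void onItemReceived() {
        outstanding--;
    }

    /** A buffer (full or timed out) was emitted downstream. */
    void onBufferEmitted() {
        downstreamBuffers--;
    }

    public static void main(String[] args) {
        UpstreamDemandTracker tracker = new UpstreamDemandTracker(10);
        System.out.println(tracker.onDownstreamRequest(1)); // 10
        tracker.onItemReceived();   // one item arrives, 9 still outstanding
        tracker.onBufferEmitted();  // timer flushes a buffer of size 1
        System.out.println(tracker.onDownstreamRequest(1)); // 1, not another 10
    }
}
```

In the batchSize 10 walk-through, the second request(1) asks upstream for a single item, keeping the outstanding count at 10 rather than growing it to 19.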
There would need to be internal buffering
Consider the following scenario. FluxBufferTimeout::request(5) is called with a batchSize of 10, resulting in 50 items being requested from upstream. 5 items are delivered, with the buffer timing out between each one. Downstream takes its time processing these five batches, and does not request any more. There are still 45 outstanding items requested from upstream, enough for four full batches and one partial one. When these items are delivered from upstream, the current behaviour is to error. The behaviour that we would like would be for those batches to be stored on an internal queue, with new requests from downstream being fulfilled from that queue. The scheduling of the flush on timeout would need to be disabled while that internal queue has items on it.
I think this is probably the more complicated change to implement, and it would also break existing behaviour for those who rely on the current implementation.
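The queueing behaviour described above could look roughly like the following single-threaded sketch. QueueingBatcher is a hypothetical class, not the BatchSubscriber mentioned earlier and not reactor-core code; the timer is represented by an explicit onTimeout() call, and a real operator would also need synchronization, error handling, and cancellation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Hypothetical sketch: completed batches wait on a queue until downstream asks. */
final class QueueingBatcher<T> {
    private final int batchSize;
    private final Consumer<List<T>> downstream;
    private final Queue<List<T>> ready = new ArrayDeque<>();
    private List<T> current = new ArrayList<>();
    private long demand; // buffers requested by downstream, not yet emitted

    QueueingBatcher(int batchSize, Consumer<List<T>> downstream) {
        this.batchSize = batchSize;
        this.downstream = downstream;
    }

    void request(long n) {
        demand += n;
        drain();
    }

    void onNext(T item) {
        current.add(item);
        if (current.size() == batchSize) {
            flush();
        }
    }

    /** Timer tick: flushing a partial batch is skipped while batches are queued. */
    void onTimeout() {
        if (ready.isEmpty() && !current.isEmpty()) {
            flush();
        }
    }

    int queuedBatches() {
        return ready.size();
    }

    private void flush() {
        ready.add(current);
        current = new ArrayList<>();
        drain();
    }

    private void drain() {
        // Emit queued batches only while downstream demand remains.
        while (demand > 0 && !ready.isEmpty()) {
            demand--;
            downstream.accept(ready.poll());
        }
    }
}
```

Replaying the scenario above against this sketch: after request(5) and five single-item timeouts, the remaining 45 items fill four queued batches plus a partial one, and nothing errors. A later request(1) is served from the queue with a full batch of 10.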
Pleased to know it’s not just me!