# reactor-core: WindowTimeout backpressure
## Expected behavior

`windowTimeout` respects the backpressure.
## Actual behavior

Not verified whether this is expected or not:

`The receiver is overrun by more signals than expected (bounded queue...)`
## Steps to reproduce

```java
Flux
    .generate(sink -> {
        System.out.println("generate");
        sink.next(UUID.randomUUID().toString());
    })
    .log("source", Level.INFO, SignalType.REQUEST)
    .limitRate(5)
    .windowTimeout(10, Duration.ofSeconds(2))
    .concatMap(batch -> batch.delayElements(Duration.ofMillis(200)))
    .blockLast();
```
## Reactor Core version

3.1.4.RELEASE

## JVM version (e.g. `java -version`)

Any
## About this issue
- Original URL
- State: closed
- Created 6 years ago
- Reactions: 10
- Comments: 31 (10 by maintainers)
## Commits related to this issue
- JAMES-3184 Throttling should work for long running streams As described in https://github.com/reactor/reactor-core/issues/1099 windowTimeout does not play well with backpressure. When the number of w... — committed to chibenwa/james-project by chibenwa 4 years ago
- JAMES-3184 Throttling should work for long running streams As described in https://github.com/reactor/reactor-core/issues/1099 windowTimeout does not play well with backpressure. When the number of w... — committed to apache/james-project by chibenwa 4 years ago
- Implement variant of windowTimeout with fairBackpressure (#3054) This commit adds a variant of windowTimeout that tries to honor backpressure of a slow downstream Subscriber better than the current... — committed to reactor/reactor-core by OlegDokuka 2 years ago
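For reference, the `fairBackpressure` variant mentioned in the last commit above (#3054) shipped as a three-argument overload of `windowTimeout` in later reactor-core releases. A minimal sketch, assuming reactor-core 3.5+ on the classpath (the interval source and the sizes are illustrative, not from the original report):

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

public class FairWindowTimeout {

    // Push 25 elements through the fairBackpressure windowTimeout variant
    // and collect each window into a list.
    static List<List<Long>> batches() {
        return Flux.interval(Duration.ofMillis(1))
                .onBackpressureBuffer()
                .take(25)
                .windowTimeout(10, Duration.ofSeconds(2), true) // fairBackpressure = true
                .concatMap(Flux::collectList)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        List<List<Long>> batches = batches();
        int total = batches.stream().mapToInt(List::size).sum();
        System.out.println("windows=" + batches.size() + ", elements=" + total);
    }
}
```

With `fairBackpressure = true`, the operator is meant to hold back new windows until the downstream signals demand, rather than failing with the overflow error from the original report.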
The workaround is for the use case and not for the reported issue. The issue remains and hurts really hard.
Hey, @JonathanGiles.
Bear with us, I’m working on a new implementation of windowTimeout and will drop a PR soon. Stay tuned.
In general, that is going to be a completely new operator written from scratch. The implementation will not send a new window until there is demand from the downstream. In combination with the prefetch strategy, we should be able to reflect that missed demand to the upstream, so the upstream should eventually stop sending messages. This is a compromise solution, since the new window is technically still collecting elements, though the timer for the next timeout is ignored and may be deferred until the next request from the downstream.
I will also work out an alternative strategy which defers the window close while there is no demand for the next window. In that case the timeout constraints will also be broken, and the window will remain open until the next request from the downstream.
@bsideup I’ll try to describe a processing flow that is common for us:
We have been using reactor-kafka (although it has proven somewhat unstable with respect to error handling). Generally, we split the flow into separate rails by grouping on partition. For each rail, we batch the incoming events, up to some limit, before doing a batch insert into our database. This enables us to handle both real-time peak hours and reprocessing a topic, when needed, in a timely manner.
For latency reasons (as our load varies throughout the day) we also want to limit the batching with respect to time. But the various times we’ve researched this, we haven’t been able to find a solution in Reactor where we can do this and still respect backpressure. As we can make fairly good guesses about whether an event will make a record complete, we could maybe utilise that “somehow”. But that seems to grow complicated pretty quickly, as opposed to just “batch with timeout”.
I’m sure there are better ways to structure the processing. But we need to use a database as an intermediate step to store partial records, as we’d otherwise have to start from way too far back on startup given how long it can take before a record is complete, so we seem to be somewhat stuck trying to solve this with Reactor.
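The flow described above can be sketched roughly as follows. This is a hedged illustration, not the poster's code: the `Event` type, the partition count, and `insertBatch` are invented stand-ins, and `bufferTimeout` shares the very backpressure caveat this issue is about.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class PartitionedBatching {

    // Illustrative event type standing in for a Kafka record.
    record Event(int partition, String payload) {}

    // Placeholder for the database batch insert described above;
    // returns how many events the "insert" handled.
    static Mono<Integer> insertBatch(List<Event> batch) {
        return Mono.just(batch.size());
    }

    // 100 synthetic events across 3 partitions: one rail per partition,
    // each rail batched by size (10) or time (200 ms), whichever comes first.
    static int run() {
        return Flux.range(0, 100)
                .map(i -> new Event(i % 3, "event-" + i))
                .groupBy(Event::partition)
                .flatMap(rail -> rail
                        .bufferTimeout(10, Duration.ofMillis(200))
                        .concatMap(PartitionedBatching::insertBatch))
                .reduce(0, Integer::sum)
                .block();
    }

    public static void main(String[] args) {
        System.out.println("inserted " + run() + " events");
    }
}
```

The `concatMap` inside each rail keeps the inserts sequential per partition, which is what makes the missing time-bounded batching with backpressure so painful here.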
In an attempt to generalise, the key characteristics to me are:
We want to start experimenting with an alternative implementation that keeps the window open past the timeout when there is no current demand from downstream. This would be a configurable feature, so the current behavior can be retained where it is a better fit.
FYI we just merged #2202 that allows doing “one-by-one” window processing (will be included in Reactor 3.4.0.M2, but you can try the SNAPSHOTs today)
It does not solve windowTimeout, but at least we can now window + ACK every N seconds without loading unnecessary records into memory; see this example: https://twitter.com/bsideup/status/1278268312808493062?s=20
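A rough, hedged sketch of that “ACK every N seconds” idea (the interval source stands in for a Kafka offset stream; none of this is taken from the linked example). `.sample()` only ever forwards the most recent element per tick, so a slow commit path is never overrun:

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

public class SampledAck {

    // Stand-in for a record stream: 50 "offsets", one every 10 ms.
    // sample() keeps only the latest element per 100 ms tick (plus the
    // final pending one at completion) and drops the rest, so downstream
    // demand is always respected.
    static List<Long> commits() {
        return Flux.interval(Duration.ofMillis(10))
                .take(50)
                .sample(Duration.ofMillis(100))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println("commits=" + commits());
    }
}
```

This works because committing an offset is a “latest value wins” operation; it is not a substitute for windowTimeout when every element must end up in some batch.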
@alex-lx I had exactly the same source - Kafka 😃
FYI here is a workaround for our use-case: https://github.com/bsideup/liiklus/blob/2184d62d400a6ab72775395ba3f1a1eca9f64910/examples/java/src/main/java/com/example/Consumer.java#L67-L72
Basically, just do:
This way the window will be closed either after receiving `size` items or by the delay thanks to `.sample()`, and it will respect the backpressure.

@JonathanGiles @anuchandy we’re not sure of the design and tradeoffs in #2822 and would like to discuss these and have you folks try it out before it gets merged. Would you be able to do that (e.g. by using https://jitpack.io or building locally yourselves)?
Let’s follow up on that discussion in the PR itself.
@JonathanGiles @anuchandy I have created a draft PR with the first implementation of the new WindowTimeout, which works identically to what @anuchandy proposed here: https://github.com/reactor/reactor-core/issues/1099#issuecomment-907360191.
Feel free to give it a try and let me know whether it works as expected.
I think this is another way of doing the same. I haven’t looked into any performance implication, just sharing here:
@bsideup that sounds absolutely fantastic!