reactor-core: WindowTimeout backpressure

Expected behavior

windowTimeout respects downstream backpressure

Actual behavior

Not verified whether this is expected or not: the pipeline fails with an overflow error, "The receiver is overrun by more signals than expected (bounded queue...)". The timer inside windowTimeout emits new windows regardless of downstream demand, so a slow consumer receives more signals than it requested.

Steps to reproduce

import java.time.Duration;
import java.util.UUID;
import java.util.logging.Level;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;

// eventually fails: the receiver is overrun by more signals than expected
Flux
        .generate(sink -> {
            System.out.println("generate");
            sink.next(UUID.randomUUID().toString());
        })
        .log("source", Level.INFO, SignalType.REQUEST)
        .limitRate(5)
        .windowTimeout(10, Duration.ofSeconds(2))
        .concatMap(batch -> batch.delayElements(Duration.ofMillis(200)))
        .blockLast();

Reactor Core version

3.1.4.RELEASE

JVM version (e.g. java -version)

Any

About this issue

  • State: closed
  • Created 6 years ago
  • Reactions: 10
  • Comments: 31 (10 by maintainers)

Most upvoted comments

The workaround covers that particular use case, not the reported issue. The issue itself remains, and it hurts badly.

@simonbasle @OlegDokuka I would love to hear your thoughts on any path forward for this issue. Thanks!

Hey, @JonathanGiles.

Bear with us, I’m working on a new implementation of windowTimeout and will drop a PR soon. Stay tuned.

In general, that is going to be a completely new operator written from scratch. The implementation will not send a new window until there is demand from the downstream. In combination with a prefetch strategy, that missed demand should be reflected to the upstream, so the upstream should eventually stop sending messages. This is a compromise solution, since the new window is technically still collecting elements; the timer for the next timeout is ignored and may be deferred until the next request from the downstream.

I will also work out an alternative strategy: deferring the window close while there is no demand for the next window. In that case the timeout constraint will also be broken, and the window will remain open until the next request from the downstream.
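
A minimal sketch of how opting into such a rework could look, assuming it ships as an extra boolean flag on the existing windowTimeout overload (the flag and its placement are assumptions here, not something confirmed in this thread):

import java.time.Duration;
import reactor.core.publisher.Flux;

Flux<String> source = Flux.just("a", "b", "c");
source
        // assumed opt-in overload: maxSize, maxTimespan, backpressure-aware flag
        .windowTimeout(10, Duration.ofSeconds(2), true)
        .concatMap(window -> window.delayElements(Duration.ofMillis(200)))
        .blockLast();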

@bsideup I’ll try to describe a processing flow that is common for us:

  1. Read events from Kafka
  2. Save a batch of events to a database, that’ll eventually form a “complete” record
  3. Commit offsets
  4. “Complete” records are sent downstream for further processing

We have been using reactor-kafka (although it has proven somewhat unstable with respect to error handling). Generally, we split the flow into separate rails at step 1 by grouping on partition. For each rail, we batch the incoming events, up to some limit, before doing a batch insert into our database. This enables us to handle both real-time peak hours and, when needed, reprocessing a topic in a timely manner.

For latency reasons (our load varies throughout the day) we also want to limit the batching with respect to time. But each time we have researched this, we have not been able to find a solution with reactor that does this while respecting backpressure. As we can make fairly good guesses about whether an event will make a record complete, we could maybe utilise that “somehow”, but that grows complicated pretty quickly compared to just “batch with timeout”.

I’m sure there are better ways to structure the processing. But because we need a database as an intermediate step to store partial records (otherwise we would have to start from way too far back on startup, given how long it can take before a record is complete), we seem to be somewhat stuck trying to solve this with reactor.

In an attempt to generalise, the key characteristics to me are (see the sketch after this list):

  • batch-op towards an external service
  • latency requirements
  • respect backpressure
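
A minimal sketch of the pipeline shape described above, using reactor-kafka; saveBatch is a hypothetical stand-in for the batch insert, and the sizes are made up:

import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.KafkaReceiver;
import reactor.kafka.receiver.ReceiverRecord;

// hypothetical stand-in for the batch insert that forms "complete" records
Mono<Void> saveBatch(List<ReceiverRecord<String, String>> batch) {
    return Mono.fromRunnable(() -> { /* batch INSERT into the database */ });
}

void run(KafkaReceiver<String, String> receiver) {
    receiver.receive()
            // 1. one rail per partition
            .groupBy(r -> r.receiverOffset().topicPartition())
            .flatMap(rail -> rail
                    // 2. the problematic step: batch by size AND time
                    //    (bufferTimeout has the same backpressure caveat
                    //    discussed in this issue)
                    .bufferTimeout(100, Duration.ofSeconds(5))
                    .concatMap(batch -> saveBatch(batch)
                            // 3. commit the last offset of the batch
                            .then(batch.get(batch.size() - 1)
                                    .receiverOffset()
                                    .commit())))
            // 4. "complete" records would continue downstream from here
            .subscribe();
}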

We want to start experimenting with an alternative implementation that keeps the window open despite the timeout if there is no current demand from downstream. This would be a configurable feature, so the current behavior can be retained where it is a better fit.

FYI, we just merged #2202, which allows “one-by-one” window processing (it will be included in Reactor 3.4.0.M2, but you can try the SNAPSHOTs today).

It does not solve windowTimeout, but at least we can now window + ACK every N seconds without loading unnecessary records into memory; see this example: https://twitter.com/bsideup/status/1278268312808493062?s=20
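
My reading of the linked example (an assumption on my part, since the tweet is not quoted here) is that #2202 allows concatMap to take a prefetch of 0, so the next window is only requested once the current one is fully handled:

import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Flux.interval(Duration.ofMillis(100))
        // keep the sketch safe if windows are requested slowly
        .onBackpressureDrop()
        // time-based windows, e.g. "ACK every 5 seconds"
        .window(Duration.ofSeconds(5))
        // prefetch 0: the next window is not requested until the current
        // one has been processed and acknowledged
        .concatMap(window -> window
                .collectList()
                .flatMap(batch -> Mono.fromRunnable(() ->
                        System.out.println("ack " + batch.size()))),
                0)
        .subscribe();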

@alex-lx I had exactly the same source - Kafka 😃

FYI here is a workaround for our use-case: https://github.com/bsideup/liiklus/blob/2184d62d400a6ab72775395ba3f1a1eca9f64910/examples/java/src/main/java/com/example/Consumer.java#L67-L72

Basically, just do:

.window(size)
.concatMap(
    batch -> batch
        // process every element of the current window
        .delayUntil(process)
        // emit at most one element per windowDuration: this acts as the timeout
        .sample(windowDuration)
        // e.g. commit/ACK the window before the next one is requested
        .delayUntil(closeSubWindow),
    // prefetch of 1: only one window in flight at a time
    1
)

This way the window will be closed either after receiving size items or after windowDuration elapses (thanks to .sample()), and it will respect backpressure.
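
A self-contained sketch of the same workaround, with hypothetical stand-ins for process and closeSubWindow (the real ones live in the linked Consumer.java):

import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Flux.range(1, 100)
        .window(10)                                     // size-based windows
        .concatMap(batch -> batch
                // hypothetical per-element processing
                .delayUntil(i -> Mono.delay(Duration.ofMillis(50)))
                // close the sub-window after at most 1 second
                .sample(Duration.ofSeconds(1))
                // hypothetical "close" step, e.g. committing offsets
                .delayUntil(last -> Mono.fromRunnable(() ->
                        System.out.println("closing at element " + last))),
                1)
        .blockLast();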

@JonathanGiles @anuchandy we’re not sure of the design and tradeoffs in #2822 and would like to discuss them, and to have you folks try it out, before it gets merged. Would you be able to do that (e.g. by using https://jitpack.io or by building locally yourselves)?

Let’s follow up on that discussion in the PR itself.

@JonathanGiles @anuchandy I have created a draft PR with the first implementation of the new windowTimeout, which works identically to what was proposed by @anuchandy here: https://github.com/reactor/reactor-core/issues/1099#issuecomment-907360191.

Feel free to give it a try and let me know whether it works as expected.

I think this is another way of doing the same thing. I haven’t looked into any performance implications, just sharing it here:

/**
 * Split the given {@code source} {@link Flux} sequence into multiple {@link List} buffers
 * containing up to {@code maxBatchSize} elements each (fewer when a buffer is closed by the
 * timeout or at the end of the sequence), starting from the first item.
 *
 * @param source The source stream to split into buffers.
 * @param maxBatchSize the maximum number of items to emit in a buffer before closing it
 * @param maxWaitTime the maximum {@link Duration} since the buffer was opened before closing it
 * @param timer a time-capable {@link Scheduler} instance to run on (may be null, in which
 *              case a new single scheduler is created)
 *
 * @return a {@link Flux} of {@link List} buffers based on element count and duration
 */
public Flux<List<String>> windowWithTimeout(Flux<String> source,
                                         int maxBatchSize,
                                         Duration maxWaitTime,
                                         Scheduler timer) {

    return Flux.defer(() -> {
        final int[] currentBatchSize = new int[1];
        final String cutMagicString = "$CUT$";

        final Scheduler cuttingTimer = timer == null ?
                Schedulers.newSingle("cutting-timer") :
                timer;

        final Flux<String> sourceCutter = Flux.interval(maxWaitTime, cuttingTimer)
                .map(x -> cutMagicString)
                .onBackpressureLatest();

        return sourceCutter.mergeWith(source)
                .bufferUntil(e -> {
                    if (e.equalsIgnoreCase(cutMagicString)) {
                        currentBatchSize[0] = 0;
                        return true;
                    } else {
                        currentBatchSize[0]++;
                        if (currentBatchSize[0] >= maxBatchSize) {
                            currentBatchSize[0] = 0;
                            return true;
                        } else {
                            return false;
                        }
                    }
                })
                // bufferUntil includes the boundary element in the emitted
                // buffer, so strip the sentinel and drop empty buffers
                .map(batch -> {
                    batch.removeIf(cutMagicString::equals);
                    return batch;
                })
                .filter(batch -> !batch.isEmpty());
    });
}
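
A quick usage sketch for the helper above; the event source and sizes are made up for illustration:

import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

// batches of up to 10 items, flushed at least every 2 seconds
Flux<String> events = Flux.interval(Duration.ofMillis(100))
        .map(i -> "event-" + i);

windowWithTimeout(events, 10, Duration.ofSeconds(2), Schedulers.single())
        .subscribe(batch -> System.out.println("batch of " + batch.size()));

Note that the helper emits List buffers rather than Flux windows, which is often what a batch insert wants anyway.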

@bsideup that sounds absolutely fantastic!