reactor-core: Take doesn't limit requests

It seems that take(n) isn’t implemented as a limit on the upstream request, but rather as an instruction to cancel once n items have arrived. This isn’t great when the publisher involves a network call, because the unbounded request(n) tells the server to publish as much as it wants.

        // map throws for values above 4; take(5) cancels after value 4, so the exception is never reached
        val f = Flux.range(0, 10).map {
            if (it > 4) {
                throw RuntimeException("$it")
            } else {
                it
            }
        }.log().take(5)
        println(f.blockLast())

Output:

INFO: | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
Sep 23, 2017 9:19:37 AM reactor.util.Loggers$Slf4JLogger info
INFO: | request(unbounded)
Sep 23, 2017 9:19:37 AM reactor.util.Loggers$Slf4JLogger info
INFO: | onNext(0)
Sep 23, 2017 9:19:37 AM reactor.util.Loggers$Slf4JLogger info
INFO: | onNext(1)
Sep 23, 2017 9:19:37 AM reactor.util.Loggers$Slf4JLogger info
INFO: | onNext(2)
Sep 23, 2017 9:19:37 AM reactor.util.Loggers$Slf4JLogger info
INFO: | onNext(3)
Sep 23, 2017 9:19:37 AM reactor.util.Loggers$Slf4JLogger info
INFO: | onNext(4)
Sep 23, 2017 9:19:37 AM reactor.util.Loggers$Slf4JLogger info
INFO: | cancel()
4

Expected:

INFO: | request(5)
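
To make the difference concrete, here is a minimal sketch built on the JDK's java.util.concurrent.Flow interfaces rather than on Reactor itself. RangePublisher, TakeSubscriber and the capRequest flag are hypothetical names invented for illustration, and this is not how FluxTake is implemented. The sketch folds the take operator and the final consumer into one subscriber and assumes unbounded downstream demand, as with blockLast().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

public class TakeRequestDemo {

    /** Synchronous source emitting 0..count-1 that records every request(n) it receives. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        final int count;
        final List<Long> requests = new ArrayList<>();

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 0;
                private long pending = 0;
                private boolean emitting = false;
                private boolean stopped = false;

                @Override
                public void request(long n) {
                    requests.add(n);
                    long sum = pending + n;
                    pending = sum < 0 ? Long.MAX_VALUE : sum; // cap on overflow
                    if (emitting) {
                        return; // re-entrant call: the outer loop picks up the new demand
                    }
                    emitting = true;
                    while (pending > 0 && next < count && !stopped) {
                        pending--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next == count && !stopped) {
                        stopped = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    stopped = true;
                }
            });
        }
    }

    /** Hypothetical take(limit): requests unbounded and cancels, or caps the upstream request. */
    static final class TakeSubscriber implements Flow.Subscriber<Integer> {
        final long limit;
        final boolean capRequest;
        final List<Integer> received = new ArrayList<>();
        private Flow.Subscription upstream;
        private boolean done = false;

        TakeSubscriber(long limit, boolean capRequest) {
            this.limit = limit;
            this.capRequest = capRequest;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            upstream = subscription;
            // The only difference between the two modes is the amount requested here.
            subscription.request(capRequest ? limit : Long.MAX_VALUE);
        }

        @Override
        public void onNext(Integer item) {
            if (done) {
                return;
            }
            received.add(item);
            if (received.size() == limit) {
                done = true;
                upstream.cancel(); // stop the source once the limit is reached
            }
        }

        @Override
        public void onError(Throwable throwable) {
            done = true;
        }

        @Override
        public void onComplete() {
            done = true;
        }
    }

    public static void main(String[] args) {
        for (boolean capRequest : new boolean[] {false, true}) {
            RangePublisher source = new RangePublisher(10);
            TakeSubscriber take = new TakeSubscriber(5, capRequest);
            source.subscribe(take);
            System.out.println("capRequest=" + capRequest
                    + " requests=" + source.requests
                    + " received=" + take.received);
        }
    }
}
```

Both runs deliver the same five items. With capRequest=false the source records a single unbounded request (Long.MAX_VALUE), which corresponds to the request(unbounded) line in the log; with capRequest=true it records a single request of 5.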

About this issue

  • State: closed
  • Created 7 years ago
  • Comments: 48 (46 by maintainers)

Most upvoted comments

+1 to @robertroeser’s comments. It seems pretty fundamental to me and surprising in a bad way, and I suspect it shows a strong bias toward synchronous processing over use cases involving remote operations with a higher cost.

In the case of rsocket-java, a request(N) isn’t sent as N individual requests. It is sent to the server as a single message, so the server may start a lot of work. In our case the server floods the network with results (potentially 100k), so the client can receive a lot of items before the cancel takes effect on the server.
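
As a rough worked example of that cost, the sketch below models a worst case in which the server pushes everything it was asked for before the cancel arrives. EagerServerSketch, itemsPushed and wasted are hypothetical names, and the model is an assumption for illustration, not rsocket-java behaviour.

```java
public class EagerServerSketch {

    /** Hypothetical server: a single request(n) message makes it push min(n, available) items. */
    static long itemsPushed(long requestN, long available) {
        return Math.min(requestN, available);
    }

    /** Items sent over the network that the client never uses (worst case: cancel arrives too late). */
    static long wasted(long requestN, long available, long wanted) {
        return Math.max(0, itemsPushed(requestN, available) - wanted);
    }

    public static void main(String[] args) {
        long available = 100_000; // results the server could produce
        long wanted = 5;          // the client only needs take(5)

        System.out.println("unbounded request wastes up to "
                + wasted(Long.MAX_VALUE, available, wanted) + " items");
        System.out.println("request(5) wastes "
                + wasted(wanted, available, wanted) + " items");
    }
}
```

Under these assumptions an unbounded request can waste up to 99,995 of the 100,000 items, while request(5) wastes none.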

I can work around this with limitRate, but I hope you reconsider this default across other operations.

BTW, I noticed similar behaviour with operators such as Zip: they tend to do more work than needed and assume results are cheap and side-effect free.

@thekalinga When a generic library is a pioneer in a new field, platform and style of programming, it has to set defaults up front for how its components should work. We experts and contributors spent a lot of time thinking about, designing and implementing the operators the way they are so that they also work in not-so-obvious corner cases. Libraries evolve from then on and incorporate reasonable user feedback. Unfortunately, some user feedback assumes that the user’s own use case is the predominant, and perhaps the only true, way of doing things with the library. In addition, coming up with names for operations and customizations like magic spell words doesn’t make them happen, make them work, or often even make sense.

Customizing the control flow in Reactive Streams is difficult. For take, one has to intercept the request call at the right time to enact the proper backpressure pattern, while ensuring correct state tracking and not losing legitimate request amounts. In this light, offering a “custom strategy” is practically equivalent to asking the user to implement a custom operator from scratch: there is no nice callback API that could capture the complications and possibilities for such operators.

It currently doesn’t (stops requesting), but that could be a good solution, because lowTide == 0 doesn’t make much sense anyway. And it doesn’t violate the principle of least surprise when one compares limitRate(100) and limitRate(100, 100), probably 🤔

Just to be clear, I’m considering this only for the highTide, lowTide variant, so:

  • limitRate(100) would do one request(100), then request(75)
  • limitRate(100, 50) would do one request(100), then request(50)
  • limitRate(100, 100) would repeatedly request(100)
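
The three cases above can be modelled with a few lines of arithmetic. This sketch assumes the operator requests highTide once and then requests a replenish amount each time that many items have been consumed; for the single-argument variant it assumes a replenish amount of 75% of highTide, taken from the request(75) figure in the list. LimitRateSketch, defaultReplenish and upstreamRequests are hypothetical names, not Reactor API.

```java
import java.util.ArrayList;
import java.util.List;

public class LimitRateSketch {

    /** Assumed replenish amount for the single-argument variant: 75% of highTide. */
    static int defaultReplenish(int highTide) {
        return highTide - (highTide >> 2);
    }

    /**
     * Upstream requests issued while the downstream consumes `consumed` items:
     * one initial request(highTide), then request(replenish) after every
     * `replenish` items consumed.
     */
    static List<Integer> upstreamRequests(int highTide, int replenish, int consumed) {
        if (highTide <= 0 || replenish <= 0) {
            throw new IllegalArgumentException("highTide and replenish must be positive");
        }
        List<Integer> requests = new ArrayList<>();
        requests.add(highTide);
        for (int seen = replenish; seen <= consumed; seen += replenish) {
            requests.add(replenish);
        }
        return requests;
    }

    public static void main(String[] args) {
        int consumed = 250;
        // limitRate(100)
        System.out.println(upstreamRequests(100, defaultReplenish(100), consumed));
        // limitRate(100, 50)
        System.out.println(upstreamRequests(100, 50, consumed));
        // limitRate(100, 100)
        System.out.println(upstreamRequests(100, 100, consumed));
    }
}
```

For 250 consumed items this yields 100 followed by three requests of 75, 100 followed by five requests of 50, and three requests of 100, respectively.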

I do wonder whether this is at minimum a bug in FluxTake.java

	@Override
	public int getPrefetch() {
		return Integer.MAX_VALUE;
	}

Everything here is solvable if you don’t trust the underlying library and move everything up to the application layer. But this behaviour is surprising and IMHO very biased.

So for the two use cases I currently have, the answer is mixed:

  1. Given a generic CLI: no, it can’t easily make assumptions about the request parameters, so ideally I would use the request amount. https://github.com/rsocket/rsocket-cli
  2. For a different, specific product use case, I actually want an infinite stream and am using request(n) for backpressure. limitRate() is a perfect fit here.

But it’s an extremely leaky abstraction now, as reactor-core seems built around the assumption that work is cheap enough and side effect free (no customer request billing etc), and that as an optimisation, any operator I call can prefetch.