RxJava: Take Doesn't Reduce Large RequestN

When a child subscriber submits a large requestN value, such as Long.MAX_VALUE, the ‘take’ operator does not reduce it as expected.

For example, in the following, a default subscribe requests Long.MAX_VALUE upstream. The expectation is that take(10) would reduce the requested amount to 10, the maximum number of items that take will let through.

    someFlowable.take(10).subscribe(s);

Here is a unit test:

    @Test
    public void testDoesntRequestMoreThanNeededFromUpstream2() throws InterruptedException {
        final AtomicLong requests = new AtomicLong();
        TestSubscriber<Long> ts = new TestSubscriber<Long>();
        Flowable.interval(100, TimeUnit.MILLISECONDS)
            .doOnRequest(new LongConsumer() {
                @Override
                public void accept(long n) {
                    System.out.println(n);
                    requests.addAndGet(n);
                }
            })
            .take(2)
            .subscribe(ts);
        
        ts.awaitTerminalEvent();
        ts.assertComplete();
        ts.assertNoErrors();
        assertEquals(2, requests.get());
    }

This fails with:

java.lang.AssertionError: expected:<2> but was:<9223372036854775807>
	at org.junit.Assert.fail(Assert.java:88)
	at org.junit.Assert.failNotEquals(Assert.java:834)
	at org.junit.Assert.assertEquals(Assert.java:645)
	at org.junit.Assert.assertEquals(Assert.java:631)
	at io.reactivex.internal.operators.flowable.FlowableTakeTest.testDoesntRequestMoreThanNeededFromUpstream2(FlowableTakeTest.java:419)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:86)
	at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:459)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:678)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:382)
	at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:192)

Is there a reason that take in RxJava 2 does not behave this way and reduce the requestN value to the limit?
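
To make the question concrete, the reduction being asked for could look roughly like the sketch below. It is written against the JDK's java.util.concurrent.Flow interfaces (Java 9+) rather than RxJava's internals, and CappedSubscription is a hypothetical name used only for illustration, not an RxJava class. It shows just the request-capping part; a real take would also have to complete downstream and cancel upstream once the limit is reached.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical helper (not part of RxJava): wraps an upstream subscription
 * and never forwards more than `limit` items of demand in total.
 */
final class CappedSubscription implements Flow.Subscription {
    private final Flow.Subscription upstream;
    // Demand that may still be forwarded upstream.
    private final AtomicLong remaining;

    CappedSubscription(Flow.Subscription upstream, long limit) {
        this.upstream = upstream;
        this.remaining = new AtomicLong(limit);
    }

    @Override
    public void request(long n) {
        if (n <= 0) {
            // Simplification: a spec-compliant operator would signal an error here.
            return;
        }
        for (;;) {
            long r = remaining.get();
            if (r == 0) {
                return; // limit already fully requested, ignore further demand
            }
            // Forward the smaller of the downstream demand and what is left.
            long forward = Math.min(r, n);
            if (remaining.compareAndSet(r, r - forward)) {
                upstream.request(forward);
                return;
            }
        }
    }

    @Override
    public void cancel() {
        upstream.cancel();
    }
}
```

With a limit of 2, a downstream request of Long.MAX_VALUE reaches the upstream as a request of 2, which is the value the unit test above asserts.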

About this issue

  • State: closed
  • Created 7 years ago
  • Comments: 23 (15 by maintainers)

Most upvoted comments

Yep, I’m not a fan of that behaviour, and I’m sure I’ll encounter it when tying together service calls as our organisation moves more logic behind services. There’s a broader consideration here, I suppose: how much should users of the RxJava API know or expect about the request behaviour of operators? We don’t really have a guiding principle in place that helps us here. I’d like to see operators request only what they need; where it makes sense for performance to request in batches (across async boundaries, for instance), the batch size should be configurable. flatMap and observeOn are configurable in this regard.

I’ve had a quick look through the list of operators and identified the ones that I’d prefer requested only what they need. Constrained requesting behaviour on these operators, together with the ability to configure flatMap and observeOn and to call rebatchRequests, is enough control over request patterns for my needs.

Note that all of the operators below currently request Long.MAX_VALUE:

elementAt
firstXXX
take

The operators above are the only operators I saw that have a definite upper bound that is less than Long.MAX_VALUE on the number of items required to complete.
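
As a rough illustration of those upper bounds, the sketch below spells out how many upstream items each of the listed operators needs before it can complete. UpstreamBounds and its methods are hypothetical names made up for this example, not RxJava API; the assumptions are that elementAt uses a zero-based index and that the first-style operators finish after a single item.

```java
/** Hypothetical helper for illustration only; none of this exists in RxJava. */
final class UpstreamBounds {
    private UpstreamBounds() {}

    /** take(n) lets at most n items through, so it needs at most n. */
    static long take(long n) {
        return n;
    }

    /** elementAt(index) is zero-based, so it needs items 0..index. */
    static long elementAt(long index) {
        return index + 1;
    }

    /** first-style operators complete after the first item. */
    static long first() {
        return 1L;
    }

    /** Demand to send upstream: the downstream request capped by the bound. */
    static long capped(long downstreamRequest, long bound) {
        return Math.min(downstreamRequest, bound);
    }
}
```

For instance, capping a downstream request of Long.MAX_VALUE with the bound for elementAt(4) yields an upstream request of 5.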