rsocket-java: Payload reclaim code assumes synchronous onNext() dispatch which is not guaranteed by RS specification

I have been playing around with the latest code from the 0.5.x branch. I started with the following code from the examples subproject:

Flowable.fromPublisher(c.underlyingSocket.requestResponse(new PayloadImpl("Hello")))
  .doOnNext(toConsumer(x => println(s"### client ### ${ByteBufferUtil.toUtf8String(x.getData)}")))
  .blockingFirst()

This prints the response as expected. However, when I connect the Publisher returned by requestResponse to Akka Streams with the following code:

Source.fromPublisher(c.underlyingSocket.requestResponse(new PayloadImpl("Hello")))
  .map { x => println(s"### client ### ${ByteBufferUtil.toUtf8String(x.getData)}"); x }
  .runWith(Sink.head)

I get an exception:

io.netty.util.IllegalReferenceCountException: refCnt: 0
        at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1407)
        at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1353)
        at io.netty.buffer.AbstractByteBuf.getInt(AbstractByteBuf.java:415)
        at io.netty.buffer.PooledSlicedByteBuf.getInt(PooledSlicedByteBuf.java:188)
        at io.reactivesocket.transport.tcp.MutableDirectByteBuf.getInt(MutableDirectByteBuf.java:284)
        at io.reactivesocket.frame.FrameHeaderFlyweight.frameLength(FrameHeaderFlyweight.java:229)
        at io.reactivesocket.frame.FrameHeaderFlyweight.dataLength(FrameHeaderFlyweight.java:251)
        at io.reactivesocket.frame.FrameHeaderFlyweight.sliceFrameData(FrameHeaderFlyweight.java:202)
        at io.reactivesocket.Frame.getData(Frame.java:98)
        at akka.stream.alpakka.reactivesocket.scaladsl.HelloWorld$$anonfun$start$1$$anonfun$2.apply(HelloWorld.scala:113)
        at akka.stream.alpakka.reactivesocket.scaladsl.HelloWorld$$anonfun$start$1$$anonfun$2.apply(HelloWorld.scala:113)
        at akka.stream.impl.fusing.Map$$anon$8.onPush(Ops.scala:43)
        at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:747)
        at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:710)
        at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:616)
        at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:471)
        at akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:410)
        at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:603)
        at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:618)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
        at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:529)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
        at akka.actor.ActorCell.invoke(ActorCell.scala:495)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
        at akka.dispatch.Mailbox.run(Mailbox.scala:224)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Is there a race condition between something that a ReactiveSocket Publisher expects and my call to getData?

P.S. Currently I am publishing the latest code of the 0.5.x branch locally. Would it be possible to get a milestone release, so that it is easier to share and run these experiments?

About this issue

  • State: closed
  • Created 8 years ago
  • Comments: 17 (10 by maintainers)

Most upvoted comments

Thanks all for the feedback.

Here is my take (plan) on this:

  • Modify the current codebase to do an explicit copy before onNext().
  • Later, consider how to eliminate copying if there is a use case for it.
  • Keep copying as the default mode; not copying would always be opt-in.

Does the above approach sound reasonable to everyone here?

cc @stevegury @robertroeser @tmontgomery
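The plan above (copy by default, zero-copy only as an opt-in) can be sketched in Scala as follows. `DeliveryMode`, `RecyclableFrame` and `Delivery` are hypothetical names for illustration, not rsocket-java API; the `live` counter stands in for the pool and counts frames not yet returned to it.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical delivery modes; the names are illustrative, not rsocket-java API.
sealed trait DeliveryMode
case object CopyBeforeOnNext extends DeliveryMode // default: safe across async boundaries
case object ZeroCopyOptIn extends DeliveryMode    // opt-in: the consumer must release the frame

// Hypothetical pooled frame; `live` counts frames not yet returned to the pool.
final class RecyclableFrame(val bytes: Array[Byte], live: AtomicInteger) {
  private var released = false
  // Idempotent: only the first call returns the frame to the pool.
  def release(): Unit = if (!released) { released = true; live.decrementAndGet() }
}

object Delivery {
  sealed trait Delivered
  final case class Copied(bytes: Array[Byte]) extends Delivered       // consumer owns a private copy
  final case class Borrowed(frame: RecyclableFrame) extends Delivered // consumer must call release()

  def deliver(frame: RecyclableFrame, mode: DeliveryMode = CopyBeforeOnNext)(
      onNext: Delivered => Unit): Unit =
    mode match {
      case CopyBeforeOnNext =>
        val copy = frame.bytes.clone()
        frame.release() // safe: nothing downstream ever sees the pooled frame
        onNext(Copied(copy))
      case ZeroCopyOptIn =>
        onNext(Borrowed(frame)) // no copy; releasing is now the consumer's job
    }

  // How many frames are still checked out right after the hand-off.
  def outstandingAfter(mode: DeliveryMode): Int = {
    val live = new AtomicInteger(1)
    deliver(new RecyclableFrame("Hello".getBytes(UTF_8), live), mode)(_ => ())
    live.get
  }
}
```

`Delivery.outstandingAfter(CopyBeforeOnNext)` is 0, while `Delivery.outstandingAfter(ZeroCopyOptIn)` is 1 until the consumer releases the frame itself. Encoding the mode in the delivered type (`Copied` vs `Borrowed`) is one way to make the ownership transfer visible to whoever opts in.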

@NiteshKant I agree with @drewhk: onNext is a signal, and its returning does not imply that any work has been performed, or even that the element passed to onNext has reached its destination. The RS spec is very clear on this: https://github.com/reactive-streams/reactive-streams-jvm#asynchronous-vs-synchronous-processing
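To make the "signal, not completion" point concrete, here is a minimal standard-library Scala sketch of the reclaim-after-onNext assumption. `RefCountedBuf` and `ReleaseAfterOnNext` are hypothetical stand-ins, not ReactiveSocket or Netty types. The queue-based consumer is a simplified model of a subscriber such as an Akka Streams stage, which only enqueues the element in onNext and processes it later from its mailbox.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.ArrayDeque
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for a pooled, reference-counted frame buffer.
// It only mimics Netty-style refCnt semantics; it is not Netty's ByteBuf.
final class RefCountedBuf(bytes: Array[Byte]) {
  private val refCnt = new AtomicInteger(1)

  // Dropping the last reference "returns the buffer to the pool".
  def release(): Unit = {
    val remaining = refCnt.decrementAndGet()
    require(remaining >= 0, "released more often than retained")
  }

  // Reading after the last release fails, much like IllegalReferenceCountException.
  def readUtf8(): String = {
    val cnt = refCnt.get
    if (cnt <= 0) throw new IllegalStateException(s"refCnt: $cnt")
    new String(bytes, UTF_8)
  }
}

object ReleaseAfterOnNext {
  // The producer-side assumption: once onNext returns, the consumer
  // is done with the buffer, so it can be reclaimed immediately.
  def deliverThenRelease(buf: RefCountedBuf)(onNext: RefCountedBuf => Unit): Unit = {
    onNext(buf)
    buf.release()
  }

  // Synchronous consumer (like doOnNext + blockingFirst): reads inside onNext.
  def syncConsumer(): String = {
    var seen = ""
    deliverThenRelease(new RefCountedBuf("Hello".getBytes(UTF_8)))(b => seen = b.readUtf8())
    seen
  }

  // Asynchronous consumer: onNext only enqueues the element, and the
  // mailbox is drained after onNext has already returned.
  def asyncConsumer(): Either[String, String] = {
    val mailbox = new ArrayDeque[RefCountedBuf]()
    deliverThenRelease(new RefCountedBuf("Hello".getBytes(UTF_8))) { b => mailbox.add(b); () }
    try Right(mailbox.poll().readUtf8())
    catch { case e: IllegalStateException => Left(e.getMessage) }
  }
}
```

`syncConsumer()` returns `"Hello"`, while `asyncConsumer()` returns `Left("refCnt: 0")`: the same kind of failure as the stack trace in the original report, caused purely by when the consumer reads, not by anything it does wrong.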

ReactiveSocket uses pooled frames, and the TCP transport uses Netty, which uses reference-counted buffers. Since there isn’t an indication of when a consumer is done with a buffer/payload/frame, there isn’t a convenient point at which we can release the pooled objects back to the pool.

I fully understand this.

Auto-Release after callback completes.

Unfortunately, this breaks the RS contract. If you want to expose such an interface, then it cannot be a Reactive Streams Publisher but something else. I personally don’t think this is a good idea, though. Remember that the whole point of RS is asynchronous boundaries; otherwise there would be no need for the request() signal at all!
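For contrast, one possible shape of that "something else" is a synchronous loan API whose contract states that the buffer is valid only while the callback runs, which is what makes releasing after the callback sound. This is a hypothetical Scala sketch (`LoanedBuf` and `LoanedPayloads.withPayload` are illustrative names), not a recommendation; it also shows why such an API cannot be a Publisher: any consumer that keeps the buffer past the callback is left holding a dead reference.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical pooled buffer that becomes unreadable once returned to its pool.
final class LoanedBuf(bytes: Array[Byte], onReturn: () => Unit) {
  @volatile private var live = true

  def readUtf8(): String = {
    if (!live) throw new IllegalStateException("buffer already returned to pool")
    new String(bytes, UTF_8)
  }

  def returnToPool(): Unit = { live = false; onReturn() }
}

// Deliberately NOT a Reactive Streams Publisher: a synchronous "loan" API
// whose contract says the buffer is valid only while the callback runs.
object LoanedPayloads {
  val returned = new AtomicInteger(0)

  def withPayload[A](data: String)(use: LoanedBuf => A): A = {
    val buf = new LoanedBuf(data.getBytes(UTF_8), () => { returned.incrementAndGet(); () })
    try use(buf)               // the caller must finish with buf before returning
    finally buf.returnToPool() // so releasing here, after the callback, is sound
  }
}
```

`LoanedPayloads.withPayload("Hello")(_.readUtf8())` returns `"Hello"`, whereas `LoanedPayloads.withPayload("Hello")(b => b).readUtf8()` throws, because the buffer escaped the callback. An asynchronous Subscriber is, by definition, a consumer that lets the element escape onNext.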

Provide an explicit way to dispose a pooled object.

This is the most flexible but also a rather dangerous option. The danger is that if the onNext() call dispatches the signal to a queue and the consuming party dies because of a bug or for other reasons, the queued signals (containing the Payload) are garbage-collected, but that does not release the resources allocated by Netty.
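The leak scenario can be modeled in a few lines of Scala. `DisposablePayload`, its `release()` method and the `outstanding` counter are hypothetical stand-ins for a pooled, Netty-backed payload and its pool; the sketch only counts buffers that are never handed back.

```scala
import java.util.ArrayDeque
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical pooled payload with an explicit dispose step: whoever holds it
// must call release() once when done. `outstanding` stands in for the pool.
final class DisposablePayload private (val data: String, outstanding: AtomicInteger) {
  private var released = false

  // Idempotent: only the first call hands the buffer back to the pool.
  def release(): Unit =
    if (!released) { released = true; outstanding.decrementAndGet() }
}

object DisposablePayload {
  def acquire(data: String, outstanding: AtomicInteger): DisposablePayload = {
    outstanding.incrementAndGet()
    new DisposablePayload(data, outstanding)
  }
}

object ExplicitDisposeDemo {
  // Returns how many pooled buffers were never given back.
  def run(consumerDies: Boolean): Int = {
    val outstanding = new AtomicInteger(0)
    val mailbox = new ArrayDeque[DisposablePayload]()

    // onNext merely enqueues; ownership (and the duty to release) moves to the consumer.
    for (i <- 1 to 3) mailbox.add(DisposablePayload.acquire(s"frame-$i", outstanding))

    if (consumerDies) {
      // The consumer fails before draining its queue. The queued payloads become
      // garbage, but GC never calls release(), so the pool stays short.
      mailbox.clear()
    } else {
      while (!mailbox.isEmpty) mailbox.poll().release()
    }
    outstanding.get
  }
}
```

`ExplicitDisposeDemo.run(consumerDies = false)` returns 0 and `run(consumerDies = true)` returns 3: dropping the queued payloads frees the Java objects, but nothing returns their buffers to the pool.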

Copy the buffer before handing off to the eventual consumer.

This is the safest bet when it comes to passing asynchronous boundaries.
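Copying before the hand-off can be sketched in Scala with a toy pooled frame. `PooledFrame` and `CopyBeforeHandoff` are hypothetical names, not ReactiveSocket or Netty types; the consumer reads only after onNext has returned, which is exactly the case that fails when the pooled buffer itself is passed along.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8
import java.util.ArrayDeque

// Hypothetical pooled frame: its data is unreadable once released.
final class PooledFrame(bytes: Array[Byte]) {
  private var released = false

  def release(): Unit = { released = true }

  def data: ByteBuffer = {
    if (released) throw new IllegalStateException("refCnt: 0")
    ByteBuffer.wrap(bytes).asReadOnlyBuffer()
  }
}

object CopyBeforeHandoff {
  // Copy into memory the pool does not own, reclaim the pooled frame at once,
  // and only then signal onNext with the independent copy.
  def deliver(frame: PooledFrame)(onNext: ByteBuffer => Unit): Unit = {
    val src = frame.data
    val copy = ByteBuffer.allocate(src.remaining())
    copy.put(src)
    copy.flip() // make the copied bytes readable from position 0
    frame.release()
    onNext(copy)
  }

  // An asynchronous consumer that reads only after onNext has returned.
  def asyncConsumer(): String = {
    val mailbox = new ArrayDeque[ByteBuffer]()
    deliver(new PooledFrame("Hello".getBytes(UTF_8))) { b => mailbox.add(b); () }
    UTF_8.decode(mailbox.poll()).toString
  }
}
```

`CopyBeforeHandoff.asyncConsumer()` returns `"Hello"` even though the pooled frame was released before the consumer ran. The price is one allocation and copy per payload, which is why the plan above leaves room for an opt-in zero-copy mode.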