rsocket-java: Payload reclaim code assumes synchronous onNext() dispatch which is not guaranteed by RS specification
I have been playing around with the latest code from the 0.5.x branch. I started with the following code from the examples subproject:
Flowable.fromPublisher(c.underlyingSocket.requestResponse(new PayloadImpl("Hello")))
.doOnNext(toConsumer(x => println(s"### client ### ${ByteBufferUtil.toUtf8String(x.getData)}")))
.blockingFirst()
which prints out the response as expected. However when trying to connect the Publisher that is returned from the requestResponse to Akka Streams with the following code:
Source.fromPublisher(c.underlyingSocket.requestResponse(new PayloadImpl("Hello")))
.map { x => println(s"### client ### ${ByteBufferUtil.toUtf8String(x.getData)}"); x }
.runWith(Sink.head)
I get an exception:
io.netty.util.IllegalReferenceCountException: refCnt: 0
at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1407)
at io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1353)
at io.netty.buffer.AbstractByteBuf.getInt(AbstractByteBuf.java:415)
at io.netty.buffer.PooledSlicedByteBuf.getInt(PooledSlicedByteBuf.java:188)
at io.reactivesocket.transport.tcp.MutableDirectByteBuf.getInt(MutableDirectByteBuf.java:284)
at io.reactivesocket.frame.FrameHeaderFlyweight.frameLength(FrameHeaderFlyweight.java:229)
at io.reactivesocket.frame.FrameHeaderFlyweight.dataLength(FrameHeaderFlyweight.java:251)
at io.reactivesocket.frame.FrameHeaderFlyweight.sliceFrameData(FrameHeaderFlyweight.java:202)
at io.reactivesocket.Frame.getData(Frame.java:98)
at akka.stream.alpakka.reactivesocket.scaladsl.HelloWorld$$anonfun$start$1$$anonfun$2.apply(HelloWorld.scala:113)
at akka.stream.alpakka.reactivesocket.scaladsl.HelloWorld$$anonfun$start$1$$anonfun$2.apply(HelloWorld.scala:113)
at akka.stream.impl.fusing.Map$$anon$8.onPush(Ops.scala:43)
at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:747)
at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:710)
at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:616)
at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:471)
at akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:410)
at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:603)
at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:618)
at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:529)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
I there a race condition between something a Publisher from reactivesocket expects and my call to getData?
P.S. currently I am publishing locally latest code of the 0.5.x branch. Would it be possible to get a milestone release, so it is easier to share and run these experiments?
About this issue
- Original URL
- State: closed
- Created 8 years ago
- Comments: 17 (10 by maintainers)
Commits related to this issue
- Delete MimeTypes.md (#196) We removed default mimetypes from the spec, so this should probably go as well. — committed to ilayaperumalg/rsocket-java by benjchristensen 7 years ago
Thanks all for the feedback.
Here is my take (plan) on this:
onNext().Does the above approach sound reasonable to everyone here?
\cc @stevegury @robertroeser @tmontgomery
@NiteshKant I agree with @drewhk,
onNextis a signal, and it returning does not imply that any work has been performed, or even that the thing passed intoonNexthaving reached its destination. The RS spec is very clear on this: https://github.com/reactive-streams/reactive-streams-jvm#asynchronous-vs-synchronous-processingI fully understand this.
This is unfortunately breaking the RS contract. If you want to expose such interface then it cannot be a Reactive Streams Publisher but something else. I personally don’t think this is a good idea though. Remember that the whole point of RS is asynchronous boundaries, otherwise there wouldn’t be a need for the
request()signal at all!This is the most flexible, but a rather dangerous option. The danger is if the onNext() call dispatches a signal to a queue, and the consuming party dies because of a bug, or other reasons, then the signals (containing the Payload) are GCd, but that will not take care of cleaning up the resource allocated by Netty.
This is the safest bet when it comes to passing asynchronous boundaries.