rsocket-java: CancellationException: Inbound has been canceled on channel-based communication

Hi,

After upgrading from rsocket-java 1.0.2 to 1.1.0, cancellation signals on channel-based interactions are no longer propagated; instead, the resulting CancellationException is captured by Reactor's default onErrorDropped hook.
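As a stopgap, the dropped signal can be observed (or deliberately silenced) through Reactor's Hooks.onErrorDropped. Below is a minimal sketch, assuming reactor-core 3.4.x on the classpath; the class name and the filtering logic are illustrative only:

import java.util.concurrent.CancellationException;

import reactor.core.publisher.Hooks;

public class DroppedCancellationHook {

	public static void main(String[] args) {
		// Replace Reactor's default onErrorDropped (which logs at ERROR level)
		Hooks.onErrorDropped(t -> {
			if (t instanceof CancellationException) {
				// expected when the inbound side of a channel is canceled
				System.out.println("dropped cancellation: " + t.getMessage());
			} else {
				// keep anything unexpected visible
				t.printStackTrace();
			}
		});
	}
}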

Here is the stack trace:

Operator called default onErrorDropped
java.util.concurrent.CancellationException: Inbound has been canceled
	at io.rsocket.core.RequestChannelResponderSubscriber.tryTerminate(RequestChannelResponderSubscriber.java:357)
	at io.rsocket.core.RequestChannelResponderSubscriber.handleCancel(RequestChannelResponderSubscriber.java:345)
	at io.rsocket.core.RSocketResponder.handleFrame(RSocketResponder.java:217)
	at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
	at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248)
	at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:129)
	at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:48)
	at io.rsocket.resume.ResumableDuplexConnection$FrameReceivingSubscriber.onNext(ResumableDuplexConnection.java:323)
	at io.rsocket.resume.ResumableDuplexConnection$FrameReceivingSubscriber.onNext(ResumableDuplexConnection.java:287)
	at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:118)
	at io.rsocket.core.SetupHandlingDuplexConnection.onNext(SetupHandlingDuplexConnection.java:19)
	at com.XXXXX.eventstream.MdcContextLifter.onNext(MdcContextLifter.java:27)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
	at com.XXXXX.eventstream.MdcContextLifter.onNext(MdcContextLifter.java:27)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
	at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:256)
	at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:362)
	at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:383)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:834)

I see that there was a commit affecting RequestChannelResponderSubscriber, but I am not sure whether it relates to this issue.

Steps to Reproduce

Cancel a channel-based (requestChannel) interaction and eventually you’ll experience this problem. (Heads up: the error does not reproduce every time.) A sketch of this kind of interaction is shown below.
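The following is a minimal sketch of such a cancellation, assuming rsocket-java 1.1.0 with the netty transport and a responder already listening on localhost:7000; the address and payloads are illustrative, not taken from the actual application:

import java.time.Duration;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.core.RSocketConnector;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.util.DefaultPayload;
import reactor.core.publisher.Flux;

public class ChannelCancelRepro {

	public static void main(String[] args) {
		RSocket rSocket = RSocketConnector
				.connectWith(TcpClientTransport.create("localhost", 7000))
				.block();

		rSocket.requestChannel(
						Flux.interval(Duration.ofMillis(10))
								.map(i -> DefaultPayload.create("tick-" + i)))
				.take(1) // cancels the channel after the first inbound payload
				.map(Payload::getDataUtf8)
				.blockLast();
	}
}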

Your Environment

  • RSocket version(s) used: 1.1.0
  • Other relevant libraries versions (eg. netty, …): reactor-core: 3.4.0
  • Platform (eg. JVM version (java -version) or Node version (node --version)): OpenJDK 64-Bit Server VM AdoptOpenJDK (build 11.0.6+10, mixed mode)
  • OS and version (eg uname -a): Darwin XXXXXX 19.6.0 Darwin Kernel Version 19.6.0: Mon Aug 31 22:12:52 PDT 2020; root:xnu-6153.141.2~1/RELEASE_X86_64 x86_64

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Comments: 15 (6 by maintainers)

Most upvoted comments

This will be improved in 1.1.1

The milestone due date has now passed by nine days. What is the current ETA for that release?

@derTobsch if we are talking about Spring, you should do the following:

import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;

@ConnectMapping
void handleConnection(RSocketRequester requester) {
	requester.rsocket().onClose()
		.doFinally(terminalSignal -> {
			// handle the client disconnection here
		})
		.subscribe();
}
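(For context: onClose() returns a Mono<Void> that completes when the connection closes, so the doFinally callback only receives the terminal SignalType, with no further detail about why the connection ended.)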

Thanks! I had already done that, but the thing is that I need more information than just the terminalSignal. So with your input and a fresh coffee I realized this is not what I wanted, so instead I listen to the onbeforeunload event in JS and send the correct message.

Hi @OlegDokuka, as you pointed out, the issue was unrelated; I was able to migrate to 1.1.0 without problems, so I am closing the ticket.

Thanks