rsocket-java: Server dies on keep-alive ack timeout

The RSocket server dies on a keep-alive ack timeout. I’ve tried adding onErrorResume, but to no avail. How can I prevent the server from closing its socket on error?

Error

[2019-05-20 11:12:21.438] ERROR [parallel-1] RegistryRSocketServer: Error occurred during session
io.rsocket.exceptions.ConnectionErrorException: No keep-alive acks for 60000 ms
at io.rsocket.keepalive.KeepAliveConnection.lambda$startKeepAlives$1(KeepAliveConnection.java:97)
at reactor.core.publisher.LambdaMonoSubscriber.onNext(LambdaMonoSubscriber.java:137)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1476)
at reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:389)
at io.rsocket.keepalive.KeepAliveHandler.doCheckTimeout(KeepAliveHandler.java:112)
at io.rsocket.keepalive.KeepAliveHandler$Server.onIntervalTick(KeepAliveHandler.java:128)
at io.rsocket.keepalive.KeepAliveHandler.lambda$start$0(KeepAliveHandler.java:63)
at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:130)
at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:123)
at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)

Version: 0.12.2-RC2
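
For context, the trace shows the exception being raised from a periodic check (KeepAliveHandler$Server.onIntervalTick calling doCheckTimeout) once no keep-alive frame has been seen for the configured 60000 ms. The following is a minimal plain-Java sketch of that kind of check, assuming a simple "time since last frame" rule. It is not rsocket-java's actual implementation, and the class and method names are hypothetical.

```java
import java.time.Duration;

/**
 * Hypothetical sketch of a keep-alive timeout check.
 * Not taken from rsocket-java; it only illustrates the general idea.
 */
final class KeepAliveTimeoutSketch {
    private final long timeoutMillis;
    private long lastFrameMillis;

    KeepAliveTimeoutSketch(Duration timeout, long startMillis) {
        this.timeoutMillis = timeout.toMillis();
        this.lastFrameMillis = startMillis;
    }

    /** Record that a keep-alive frame arrived from the peer. */
    void onKeepAliveReceived(long nowMillis) {
        lastFrameMillis = nowMillis;
    }

    /** Called on every interval tick; true means the peer is considered dead. */
    boolean isTimedOut(long nowMillis) {
        // Assumption: the timeout is inclusive (>=); the real library may differ.
        return nowMillis - lastFrameMillis >= timeoutMillis;
    }

    /** Message in the same shape as the one in the log above. */
    String timeoutMessage() {
        return "No keep-alive acks for " + timeoutMillis + " ms";
    }

    public static void main(String[] args) {
        KeepAliveTimeoutSketch check =
            new KeepAliveTimeoutSketch(Duration.ofSeconds(60), 0L);
        check.onKeepAliveReceived(30_000L);
        // 50 s since the last frame, then 60 s since the last frame
        System.out.println(check.isTimedOut(80_000L));
        System.out.println(check.isTimedOut(90_000L));
        System.out.println(check.timeoutMessage());
    }
}
```

Under this sketch, a timeout is a per-connection event: it says one peer stopped answering, which is a separate question from whether the server's listening socket stays open.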

Server Code

server = RSocketFactory
        .receive()
        .frameDecoder(ZERO_COPY)
        .addConnectionPlugin(micrometerDuplexConnectionInterceptor)
        .errorConsumer(e -> log.error("Error occurred during session", e))
        .acceptor(socketAcceptor)
        .transport(serverTransport)
        .start()
        .onErrorResume(e -> Mono.empty())
        .subscribe();

Acceptor
@Override
public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
  return Mono.just(new RegistryRSocket(scheduler));
}

requestStream
return Mono.just(payload)
        .map(this::getRequestFromPayload)
        .flux()
        .map(/*..Does Something..*/)
        .onErrorResume(throwable ->
            Flux.just(createPayloadFromThrowable(throwable)));

private Payload createPayloadFromThrowable(Throwable t) {
  return ByteBufPayload.create(ErrorFrameFlyweight.encode(DEFAULT, 0, t));
}

Any help would be greatly appreciated.

About this issue

  • State: closed
  • Created 5 years ago
  • Comments: 27 (12 by maintainers)

Most upvoted comments

@mostroverkhov It seems to be much, much more resilient now. I actually managed to run out of ENIs in Amazon, so I haven’t been able to do all the testing I wanted so far. I’ll report back tomorrow and let you know whether it resolves the issue; so far it seems like it does. Thank you for your help.