quarkus: Exception logged by quarkus after SSE client is closed - "Failed to mark a promise as failure because it has failed already: DefaultChannelPromise failure: java.nio.channels.ClosedChannelException"

Describe the bug

I have a service method that SSE clients can use to subscribe to a stream. It returns a Multi. A second method can be used to emit data, and the SSE clients receive it. Everything works as expected, except that after a client has disconnected, a warning is logged: "Failed to mark a promise as failure because it has failed already: DefaultChannelPromise failure: java.nio.channels.ClosedChannelException".

Expected behavior

No warning should be logged, as nothing abnormal has actually happened.

Actual behavior

A warning is logged, as shown above.

To Reproduce

Steps to reproduce the behavior:

  1. Create a method which clients can use to subscribe to SSEs:
    /**
     * allows a caller to subscribe to changes of a certain type of object
     */
    @GET
    @Path("/changes/{subscriberId}")
    @Produces(MediaType.SERVER_SENT_EVENTS)
    public Multi<AnObject> changes(@PathParam("subscriberId") String subscriberId, @QueryParam("type") List<String> type) {
        SubscriberModel subscriberModel = SUBSCRIBERS.computeIfAbsent(subscriberId, k -> new SubscriberModel());
        subscriberModel.setId(subscriberId);
        return Multi.createFrom()
                .emitter(e -> {
                    subscriberModel.setEmitter(e);
                    e.onTermination(() -> {
                        logger.info("Removing subscriber " + subscriberId);
                        e.complete();
                        SUBSCRIBERS.remove(subscriberId);

                        // even though the above works nicely, there is an exception logged by quarkus, afterwards.
                        // see https://stackoverflow.com/questions/61694510/how-to-handle-a-closedchannelexception-on-a-reactive-streams-http-connection-clo
                        //
                    });
                }, BackPressureStrategy.ERROR);
    }
  2. Create a method used to emit data to all subscribers:
    /**
     * test method in order to emit an object to ALL subscribers
     */
    @GET
    @Path("/emit")
    @Produces(MediaType.APPLICATION_JSON)
    public Response emit() {
        logger.info("emitting to " + SUBSCRIBERS.size() + " subscribers");
        SUBSCRIBERS.values().forEach(sm ->
                sm.emit(new AnObject(UUID.randomUUID(), "anObject"))
        );
        return Response.noContent().build();
    }
  3. Subscribe using curl: curl -v -X GET localhost:8086/objects/changes/aClient

  4. Hit Ctrl+C to close that client. Nothing happens yet in Quarkus, but that is OK, because it is quite standard to use a heartbeat to detect closed connections. This is also required in some SSE servlet implementations.

  5. Emit an event using curl: curl -v -X GET localhost:8086/objects/emit

  6. The subscriber is correctly removed from the model.

  7. The unexpected warning is logged.
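
The SubscriberModel class used in the steps above is not shown in the report. The following is only a guess at its minimal shape, so that the reproduction is easier to follow. ItemSink is a hypothetical stand-in for the emitter that Multi.createFrom().emitter(...) hands to the callback, declared here so the sketch compiles without Quarkus, and the type parameter T is likewise an assumption (the original class appears to be non-generic).

```java
import java.util.Objects;

/**
 * Hypothetical stand-in for the emitter handed out by Multi.createFrom().emitter(...).
 * It exists only so that this sketch compiles without Quarkus on the classpath.
 */
interface ItemSink<T> {
    void emit(T item);
}

/** Assumed shape of SubscriberModel; the real class is not part of the report. */
class SubscriberModel<T> {
    private volatile String id;
    // Set when the client subscribes, read by whichever thread later calls emit().
    private volatile ItemSink<T> emitter;

    String getId() {
        return id;
    }

    void setId(String id) {
        this.id = Objects.requireNonNull(id);
    }

    void setEmitter(ItemSink<T> emitter) {
        this.emitter = emitter;
    }

    /** Forwards the item to the current emitter; returns false if nobody has subscribed yet. */
    boolean emit(T item) {
        ItemSink<T> current = emitter; // read once so a concurrent setEmitter cannot interleave
        if (current == null) {
            return false;
        }
        current.emit(item);
        return true;
    }
}
```

With a class of this shape, the /emit resource only forwards objects; removal from SUBSCRIBERS happens solely in the onTermination callback, which matches the observation that the subscriber is removed on the first emit after the client has gone.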

Configuration

# the following line is not really used, as the native libraries are not depended upon in the pom.
quarkus.vertx.prefer-native-transport=true
quarkus.http.port=8086

quarkus.log.file.enable=true
quarkus.log.file.level=INFO
quarkus.log.file.format=%d{HH:mm:ss} %-5p [%c{2.}] (%t) %s%e%n
quarkus.log.category."ch.maxant.kdc.objects".level=DEBUG
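
Until the cause is fixed, the warning can probably be hidden by raising the level of the Netty logger that emits it. This is only a sketch: the category name below is expanded from the abbreviated io.net.cha.AbstractChannelHandlerContext in the log output, and raising the level hides every WARN from that class, not only this one.

```properties
# assumption: full logger name expanded from the abbreviated category in the log output
quarkus.log.category."io.netty.channel.AbstractChannelHandlerContext".level=ERROR
```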

Screenshots

2020-05-10 14:26:12,366 INFO  [ch.max.kdc.obj.ObjectResource] (vert.x-eventloop-thread-6) Removing subscriber aClient
2020-05-10 14:30:59,200 WARN  [io.net.cha.AbstractChannelHandlerContext] (vert.x-eventloop-thread-6) Failed to mark a promise as failure because it has failed already: DefaultChannelPromise@22021ebc(failure: java.nio.channels.ClosedChannelException), unnotified cause: java.nio.channels.ClosedChannelException
	at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:715)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:762)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
	at io.vertx.core.net.impl.ConnectionBase.write(ConnectionBase.java:124)
	at io.vertx.core.net.impl.ConnectionBase.lambda$queueForWrite$2(ConnectionBase.java:215)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:834)
: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
	at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
	at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
	at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
	at io.netty.handler.codec.http.DefaultHttpContent.release(DefaultHttpContent.java:92)
	at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:867)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:715)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:762)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
	at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
	at io.vertx.core.net.impl.ConnectionBase.write(ConnectionBase.java:124)
	at io.vertx.core.net.impl.ConnectionBase.lambda$queueForWrite$2(ConnectionBase.java:215)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:834)

Environment

  • Output of uname -a or ver: Linux hades-ubuntu 5.3.0-51-generic #44-Ubuntu SMP Wed Apr 22 21:09:44 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux

  • Output of java -version: openjdk 11.0.7 2020-04-14 OpenJDK Runtime Environment (build 11.0.7+10-post-Ubuntu-2ubuntu219.10) OpenJDK 64-Bit Server VM (build 11.0.7+10-post-Ubuntu-2ubuntu219.10, mixed mode, sharing)

  • GraalVM version (if different from Java):

  • Quarkus version or git rev: 1.4.2.Final

  • Build tool (i.e. output of mvnw --version or gradlew --version): Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f) Maven home: /snap/intellij-idea-community/226/plugins/maven/lib/maven3 Java version: 11.0.7, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64 Default locale: en_US, platform encoding: UTF-8 OS name: "linux", version: "5.3.0-51-generic", arch: "amd64", family: "unix"

About this issue

  • State: closed
  • Created 4 years ago
  • Reactions: 8
  • Comments: 26 (15 by maintainers)

Most upvoted comments

OK I can reproduce. This doesn’t happen on Undertow.

I have the same issue and added the quarkus-undertow dependency as a workaround.
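
For anyone wanting to try the same workaround, it amounts to adding the Undertow (servlet) extension to the pom. A sketch, assuming the Quarkus BOM is already imported in dependencyManagement so that no version needs to be given:

```xml
<!-- assumption: version is managed by the Quarkus BOM -->
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-undertow</artifactId>
</dependency>
```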

@stuartwdouglas does it ring a bell?