quarkus: Exception logged by quarkus after SSE client is closed - "Failed to mark a promise as failure because it has failed already: DefaultChannelPromise failure: java.nio.channels.ClosedChannelException"
Describe the bug
I have a service method that SSE clients can use to subscribe to a stream; it returns a Multi. A second method can be used to emit data, and the SSE clients receive it. Everything works as expected, except that after a client has disconnected, a warning is logged: "Failed to mark a promise as failure because it has failed already: DefaultChannelPromise failure: java.nio.channels.ClosedChannelException".
Expected behavior
No warning should be logged, as nothing abnormal has actually happened.
Actual behavior
A warning is logged, as shown above.
To Reproduce
Steps to reproduce the behavior:
- Create a method which clients can use to subscribe to SSEs:
/**
 * allows a caller to subscribe to changes of a certain type of object
 */
@GET
@Path("/changes/{subscriberId}")
@Produces(MediaType.SERVER_SENT_EVENTS)
public Multi<AnObject> changes(@PathParam("subscriberId") String subscriberId, @QueryParam("type") List<String> type) {
    SubscriberModel subscriberModel = SUBSCRIBERS.computeIfAbsent(subscriberId, k -> new SubscriberModel());
    subscriberModel.setId(subscriberId);
    return Multi.createFrom()
            .emitter(e -> {
                subscriberModel.setEmitter(e);
                e.onTermination(() -> {
                    logger.info("Removing subscriber " + subscriberId);
                    e.complete();
                    SUBSCRIBERS.remove(subscriberId);
                    // even though the above works nicely, there is an exception logged by quarkus, afterwards.
                    // see https://stackoverflow.com/questions/61694510/how-to-handle-a-closedchannelexception-on-a-reactive-streams-http-connection-clo
                    //
                });
            }, BackPressureStrategy.ERROR);
}
- Create a method used to emit data to all subscribers:
/**
 * test method in order to emit an object to ALL subscribers
 */
@GET
@Path("/emit")
@Produces(MediaType.APPLICATION_JSON)
public Response emit() {
    logger.info("emitting to " + SUBSCRIBERS.size() + " subscribers");
    SUBSCRIBERS.values().forEach(sm ->
        sm.emit(new AnObject(UUID.randomUUID(), "anObject"))
    );
    return Response.noContent().build();
}
- Subscribe using curl:
  curl -v -X GET localhost:8086/objects/changes/aClient
- Hit ctrl+c to close that client. Nothing happens yet in Quarkus, but that is OK, because it is quite standard to use a heartbeat to deal with that. This is also required in some SSE servlet implementations.
- Emit an event using curl:
  curl -v -X GET localhost:8086/objects/emit
- The subscriber is correctly removed from the model.
- The unexpected warning is logged.
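For readers unfamiliar with the heartbeat approach mentioned in the steps above, here is a minimal sketch using only the JDK. It is not Quarkus or Mutiny API: `HeartbeatRegistry`, `subscribe`, and `beat` are hypothetical names, each subscriber is modelled as a plain `Consumer<String>` that writes to its connection, and the sketch assumes that a write to a closed connection surfaces as a `RuntimeException` (which may not hold for every transport).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

/** Hypothetical stand-in for the SUBSCRIBERS map in this report. */
class HeartbeatRegistry {
    // subscriberId -> sink that writes one chunk of text to that client's connection
    private final Map<String, Consumer<String>> sinks = new ConcurrentHashMap<>();

    void subscribe(String id, Consumer<String> sink) {
        sinks.put(id, sink);
    }

    int size() {
        return sinks.size();
    }

    /**
     * Sends one heartbeat to every subscriber and drops those whose write fails.
     * Returns the number of subscribers removed by this call.
     */
    int beat() {
        int removed = 0;
        for (Map.Entry<String, Consumer<String>> entry : sinks.entrySet()) {
            try {
                // In the SSE wire format, a line starting with ':' is a comment that clients ignore.
                entry.getValue().accept(": heartbeat\n\n");
            } catch (RuntimeException writeFailed) {
                // remove(key, value) avoids dropping a client that re-subscribed under the same id.
                if (sinks.remove(entry.getKey(), entry.getValue())) {
                    removed++;
                }
            }
        }
        return removed;
    }
}
```

Such a registry could be driven periodically, for example with `Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(registry::beat, 15, 15, TimeUnit.SECONDS)`; the 15 second interval is an arbitrary choice for illustration.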
Configuration
# the following line is not really used, as the native libraries are not depended upon in the pom.
quarkus.vertx.prefer-native-transport=true
quarkus.http.port=8086
quarkus.log.file.enable=true
quarkus.log.file.level=INFO
quarkus.log.file.format=%d{HH:mm:ss} %-5p [%c{2.}] (%t) %s%e%n
quarkus.log.category."ch.maxant.kdc.objects".level=DEBUG
Screenshots
2020-05-10 14:26:12,366 INFO [ch.max.kdc.obj.ObjectResource] (vert.x-eventloop-thread-6) Removing subscriber aClient
2020-05-10 14:30:59,200 WARN [io.net.cha.AbstractChannelHandlerContext] (vert.x-eventloop-thread-6) Failed to mark a promise as failure because it has failed already: DefaultChannelPromise@22021ebc(failure: java.nio.channels.ClosedChannelException), unnotified cause: java.nio.channels.ClosedChannelException
at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:715)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:762)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.vertx.core.net.impl.ConnectionBase.write(ConnectionBase.java:124)
at io.vertx.core.net.impl.ConnectionBase.lambda$queueForWrite$2(ConnectionBase.java:215)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
: io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.util.internal.ReferenceCountUpdater.toLiveRealRefCnt(ReferenceCountUpdater.java:74)
at io.netty.util.internal.ReferenceCountUpdater.release(ReferenceCountUpdater.java:138)
at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100)
at io.netty.handler.codec.http.DefaultHttpContent.release(DefaultHttpContent.java:92)
at io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88)
at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:867)
at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:715)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:762)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:765)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:788)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:756)
at io.vertx.core.net.impl.ConnectionBase.write(ConnectionBase.java:124)
at io.vertx.core.net.impl.ConnectionBase.lambda$queueForWrite$2(ConnectionBase.java:215)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:834)
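The second trace above ("refCnt: 0, decrement: 1") looks like a release being attempted on a buffer whose reference count had already reached zero. The sketch below only illustrates that failure mode with JDK classes. `CountedBuffer` and `releaseIfLive` are hypothetical names, this is not Netty's implementation, and it makes no claim about where the extra release happens in Quarkus or Vert.x.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical reference-counted buffer; it mimics the counting only, not Netty's classes. */
class CountedBuffer {
    // A new buffer starts with one owner.
    private final AtomicInteger refCnt = new AtomicInteger(1);

    int refCnt() {
        return refCnt.get();
    }

    /** Decrements the count; returns true if this call freed the buffer. Throws if already freed. */
    boolean release() {
        while (true) {
            int current = refCnt.get();
            if (current == 0) {
                // Same shape as the message in the trace: nothing is left to decrement.
                throw new IllegalStateException("refCnt: 0, decrement: 1");
            }
            if (refCnt.compareAndSet(current, current - 1)) {
                return current == 1;
            }
        }
    }

    /** Guarded variant: does nothing if the buffer is already freed; true only if this call freed it. */
    boolean releaseIfLive() {
        while (true) {
            int current = refCnt.get();
            if (current == 0) {
                return false;
            }
            if (refCnt.compareAndSet(current, current - 1)) {
                return current == 1;
            }
        }
    }
}
```

With this sketch, calling `release()` twice on a fresh buffer throws on the second call, while `releaseIfLive()` on an already freed buffer simply returns false.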
Environment (please complete the following information):
- Output of uname -a or ver: Linux hades-ubuntu 5.3.0-51-generic #44-Ubuntu SMP Wed Apr 22 21:09:44 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux
- Output of java -version: openjdk 11.0.7 2020-04-14 OpenJDK Runtime Environment (build 11.0.7+10-post-Ubuntu-2ubuntu219.10) OpenJDK 64-Bit Server VM (build 11.0.7+10-post-Ubuntu-2ubuntu219.10, mixed mode, sharing)
- GraalVM version (if different from Java):
- Quarkus version or git rev: 1.4.2.Final
- Build tool (ie. output of mvnw --version or gradlew --version): Apache Maven 3.6.3 (cecedd343002696d0abb50b32b541b8a6ba2883f) Maven home: /snap/intellij-idea-community/226/plugins/maven/lib/maven3 Java version: 11.0.7, vendor: Ubuntu, runtime: /usr/lib/jvm/java-11-openjdk-amd64 Default locale: en_US, platform encoding: UTF-8 OS name: "linux", version: "5.3.0-51-generic", arch: "amd64", family: "unix"
About this issue
- State: closed
- Created 4 years ago
- Reactions: 8
- Comments: 26 (15 by maintainers)
Commits related to this issue
- Prevent double free on exception Fixes #9194 — committed to stuartwdouglas/quarkus by stuartwdouglas 3 years ago
- Prevent double free on exception Fixes #9194 — committed to gsmet/quarkus by stuartwdouglas 3 years ago
- Prevent double free on exception Fixes #9194 — committed to gsmet/quarkus by stuartwdouglas 3 years ago
OK I can reproduce. This doesn’t happen on Undertow.
I have the same issue and added the quarkus-undertow dependency as a workaround.
@stuartwdouglas does it ring a bell?