netty: Thread blocked causing high latency and CPU usage
Specification:
We are using Netty version 4.1.91.Final with Epoll on a bare-metal server with the following config:
OS: CentOS 7.6
Java version: 11 (IBM OpenJ9) [JVM main params: -Xms28g -Xmx28g -Xmn14g -Xgc:concurrentScavenge]
CPU: 48 cores
RAM: 64 GB
<netty.version>4.1.91.Final</netty.version>
<tcnative.version>2.0.59.Final</tcnative.version>
<bouncycastle.version>1.69</bouncycastle.version>
Use case:
Each of these servers receives 10-20k JSON requests/sec via POST, over both HTTP and HTTPS (netty-tcnative-boringssl-static), from 100-500 external servers on 20-50k established connections in normal times. These external servers also recycle their connections at frequent intervals.
Sample code:
private Class<? extends ServerSocketChannel> getBestAvailableServerSocketChannel() {
    if (Util.IS_OS_LINUX && Epoll.isAvailable()) {
        Logger.INFO_LOG.info("Using EpollServerSocketChannel");
        return EpollServerSocketChannel.class;
    } else if (Util.IS_OS_MAC && KQueue.isAvailable()) {
        Logger.INFO_LOG.info("Using KQueueServerSocketChannel");
        return KQueueServerSocketChannel.class;
    } else {
        Logger.INFO_LOG.info("Using NioServerSocketChannel");
        return NioServerSocketChannel.class;
    }
}
...
...
// config SSLContext
SslProvider provider = SslProvider.isAlpnSupported(SslProvider.OPENSSL) ? SslProvider.OPENSSL : SslProvider.JDK;
sslCtx = SslContextBuilder.forServer(keyManagerFactory)
        .sslProvider(provider)
        /* NOTE: the cipher filter may not include all ciphers required by the HTTP/2 specification.
         * Please refer to the HTTP/2 specification for cipher requirements. */
        // .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
        .applicationProtocolConfig(new ApplicationProtocolConfig(
                Protocol.ALPN,
                // NO_ADVERTISE is currently the only mode supported by both OpenSsl and JDK providers.
                SelectorFailureBehavior.NO_ADVERTISE,
                // ACCEPT is currently the only mode supported by both OpenSsl and JDK providers.
                SelectedListenerFailureBehavior.ACCEPT,
                // support both HTTP/2 and HTTP/1.1
                // ApplicationProtocolNames.HTTP_2,
                ApplicationProtocolNames.HTTP_1_1
        ))
        .sessionCacheSize(SSL_SESSION_CACHE_SIZE)         // 1800
        .sessionTimeout(SSL_SESSION_CACHE_TIMEOUT_SECOND) // 1024 * 100
        .build();
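The post does not show how keyManagerFactory is constructed. For context, here is a minimal hypothetical sketch of one common way to build it from a PKCS#12 keystore; the keystore path and password are placeholders, not values from the original issue:

// Hypothetical setup for the keyManagerFactory passed to SslContextBuilder.forServer(...) above.
// (javax.net.ssl.KeyManagerFactory, java.security.KeyStore)
char[] keyStorePassword = "changeit".toCharArray(); // placeholder
KeyStore keyStore = KeyStore.getInstance("PKCS12");
try (InputStream in = Files.newInputStream(Paths.get("/path/to/server.p12"))) { // placeholder path
    keyStore.load(in, keyStorePassword);
}
KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
keyManagerFactory.init(keyStore, keyStorePassword);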
...
...
// configure server
ServerBootstrap httpServer = new ServerBootstrap();
// ioThreadExecutors = CPU cores / 2
// workerThreadExecutors = ioThreadExecutors * 8
httpServer = httpServer.group(ioThreadExecutors, workerThreadExecutors);
Class<? extends ServerSocketChannel> serverSocketChannelClass = builder.serverSocketChannelClass;
if (serverSocketChannelClass == null) {
    serverSocketChannelClass = getBestAvailableServerSocketChannel();
}
// Specify the ServerSocketChannel implementation used to instantiate a new Channel to accept incoming connections.
httpServer.channel(serverSocketChannelClass)
        .handler(new LoggingHandler(logLevel))
        .childHandler(channelInitializer)
        /*
         * option() is for the ServerSocketChannel that accepts incoming connections.
         * childOption() is for the Channels accepted by the parent ServerChannel.
         * https://netty.io/4.1/api/io/netty/channel/ChannelOption.html
         */
        .option(ChannelOption.SO_BACKLOG, 2048)
        /*
         * Allow multiple I/O threads to bind to the same port. By default, each port is managed by a single dedicated I/O thread.
         * https://stackoverflow.com/questions/67145817/netty-how-many-thread-is-accepting-connections-on-a-server-port-80
         */
        .option(UnixChannelOption.SO_REUSEPORT, true)
        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        // https://netty.io/wiki/tcp-fast-open.html
        // .option(ChannelOption.TCP_FASTOPEN, 1000)
        // http://normanmaurer.me/presentations/2014-twitter-meetup-netty/slides.html#11.0
        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .childOption(ChannelOption.SO_SNDBUF, 10 * 1024 * 1024)
        .childOption(ChannelOption.SO_RCVBUF, 10 * 1024 * 1024)
        .childOption(ChannelOption.SO_REUSEADDR, true)
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childOption(ChannelOption.TCP_NODELAY, true);
if (serverSocketChannelClass == EpollServerSocketChannel.class) {
    httpServer.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED);
}
// For each port, bind it multiple times (one channel per bind, thanks to SO_REUSEPORT).
for (int i = 0; i < channelThreadCount; i++) {
    ChannelFuture f = httpServer.bind(port).sync();
    channelFutureSet.add(f);
}
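The snippet never shows how the two event loop groups are created. A minimal sketch matching the thread-count comments above, assuming Epoll is available (the variable names come from the snippet; the construction itself is an assumption):

// io.netty.channel.epoll.EpollEventLoopGroup
int cores = Runtime.getRuntime().availableProcessors(); // 48 on this machine
// ioThreadExecutors = CPU cores / 2; workerThreadExecutors = ioThreadExecutors * 8
EventLoopGroup ioThreadExecutors = new EpollEventLoopGroup(cores / 2);
EventLoopGroup workerThreadExecutors = new EpollEventLoopGroup((cores / 2) * 8);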
// channelInitializer class
@Override
public void initChannel(SocketChannel ch) {
    ChannelPipeline pipeline = ch.pipeline();
    int port = ch.localAddress().getPort();
    // Add the SSL handler first to encrypt and decrypt everything.
    SslContext sslContext = sslContextMap.get(port);
    if (sslContext != null) {
        pipeline.addLast(sslContext.newHandler(ch.alloc()));
    }
    /*
     * On top of the SSL handler, add the other handlers.
     * HttpServerCodec is a combination of HttpRequestDecoder and HttpResponseEncoder.
     * Header validation takes time and most of the time it is not needed directly in the decoder/encoder.
     * http://normanmaurer.me/presentations/2014-twitter-meetup-netty/slides.html#5.0
     */
    pipeline.addLast(new HttpServerCodec(DEFAULT_MAX_INITIAL_LINE_LENGTH, DEFAULT_MAX_HEADER_SIZE, DEFAULT_MAX_CHUNK_SIZE, false));
    pipeline.addLast(new HttpServerKeepAliveHandler());
    // Useful when you don't want to deal with HTTP messages whose transfer encoding is 'chunked'.
    // MAX_REQUEST_CONTENT_LENGTH = 1048576; // 1 MB
    pipeline.addLast(new HttpObjectAggregator(MAX_REQUEST_CONTENT_LENGTH)); // inbound
    // Business logic runs on a separate thread pool because it may take 10-100 ms and we don't want to block the worker thread.
    // EventExecutorGroup handlerExecutorGroup = new DefaultEventExecutorGroup(core * 8, handlerThreadFactory);
    pipeline.addLast(handlerExecutorGroup, "handler", new HttpEndServer(channelHandlerMap.get(port)));
}
// HttpEndServer class
public class HttpEndServer extends SimpleChannelInboundHandler<Object> implements HttpServer {
    ...
    ...
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object msg) {
        if (msg instanceof HttpRequest) {
            HttpRequest request = (HttpRequest) msg;
            ByteBuf content = null;
            if (msg instanceof HttpContent) {
                HttpContent httpContent = (HttpContent) msg;
                content = httpContent.content();
            }
            QueryStringDecoder queryStringDecoder = new QueryStringDecoder(request.uri());
            String path = queryStringDecoder.path();
            RequestHandler requestHandler = handler.get(path);
            if (requestHandler == null) {
                requestHandler = handler.get(DEFAULT_PATH);
            }
            // call the business handler
            requestHandler.handle(ctx, this, request, content);
        }
    }
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        log.info("channel writability changed");
        while (outboundQueue.relaxedPeek() != null) {
            Object out = outboundQueue.relaxedPoll();
            if (out != null) {
                ChannelFuture cf = ctx.writeAndFlush(out);
                // Add a ChannelFutureListener to receive notification after the write completes.
                cf.addListener((ChannelFutureListener) future -> {
                    if (future.isSuccess()) {
                        // Write operation completed without error
                        log.debug("channelWritabilityChanged: Write successful.");
                    } else {
                        // Log an error
                        log.error("channelWritabilityChanged: Write error. msg: {}", future.cause().getLocalizedMessage());
                    }
                });
            }
        }
        super.channelWritabilityChanged(ctx);
        log.debug("Writability Changed Event");
    }
    @Override
    public void sendResponse(ChannelHandlerContext ctx, HttpRequest request, FullHttpResponse response) {
        // HTTP/2
        String streamId = getStreamId(request);
        if (streamId != null && !streamId.isEmpty()) {
            setStreamId(response, streamId);
        }
        // Get keep-alive info
        boolean keepAlive = HttpUtil.isKeepAlive(request);
        // Set keep-alive response headers
        if (keepAlive) {
            // Add the 'Content-Length' header only for a keep-alive connection.
            response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
            // Add the keep-alive header as per:
            // - https://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
            response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        } else {
            // Tell the client we're going to close the connection.
            response.headers().set(CONNECTION, CLOSE);
        }
        // Write the response.
        // No need to call flush for non-SSL HTTP/2, as channelReadComplete(...) will take care of it.
        if (streamId != null && !streamId.isEmpty() && !isSSL) {
            ctx.write(response);
        } else {
            /*
             * Check isWritable() before writeAndFlush;
             * otherwise we will get io.netty.channel.StacklessClosedChannelException.
             */
            if (ctx.channel().isWritable()) {
                // Write the data and flush it
                ChannelFuture cf = ctx.writeAndFlush(response);
                // Add a ChannelFutureListener to receive notification after the write completes.
                cf.addListener((ChannelFutureListener) future -> {
                    if (future.isSuccess()) {
                        // Write operation completed without error
                        log.debug("Write successful.");
                    } else {
                        // Log an error
                        log.error("Write error. msg: {}", future.cause().getLocalizedMessage());
                    }
                });
                if (!keepAlive) {
                    cf.addListener(ChannelFutureListener.CLOSE);
                }
            } else {
                // Channel is not writable (it may not even be open).
                log.warn("channel is not writable so adding to queue.");
                outboundQueue.add(response);
                // ctx.close();
            }
        }
        // if (!keepAlive) {
        //     // If keep-alive is off, close the connection once the content is fully written.
        //     ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
        // }
    }
}
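One detail the snippet leaves out is the declaration of outboundQueue. The relaxedPeek()/relaxedPoll() calls in channelWritabilityChanged suggest a JCTools MessagePassingQueue; a minimal sketch under that assumption:

// Assumed declaration (not shown in the original post): a JCTools MPSC queue.
// The concrete org.jctools.queues.MpscUnboundedArrayQueue type exposes both the
// java.util.Queue.add(...) used in sendResponse and the relaxedPeek()/relaxedPoll()
// used in channelWritabilityChanged.
private final MpscUnboundedArrayQueue<Object> outboundQueue = new MpscUnboundedArrayQueue<>(1024);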
There are many error messages of these 4 types:
- sendResponse$1(186): Write error. msg: SSLEngine closed already
- exceptionCaught(62): An exception is thrown. msg: recvAddress(…) failed: Connection reset by peer
- sendResponse(195): channel is not writable so adding to queue.
- exceptionCaught(62): An exception is thrown. msg: javax.net.ssl.SSLHandshakeException: error:10000070:SSL routines:OPENSSL_internal:BAD_PACKET_LENGTH
I also see latency spike very high at times (> 200 ms), while it is normally under 50 ms. Is there anything wrong in the code?
About this issue
- State: closed
- Created a year ago
- Comments: 16 (10 by maintainers)
Commits related to this issue
- Make Recycler faster on OpenJ9 Motivation: To avoid reference processing overhead while at the same time ensuring that we clear Recycler data structures that were used by now-terminated threads, we n... — committed to chrisvest/netty by chrisvest a year ago
- Make Recycler faster on OpenJ9 (#13357) Motivation: To avoid reference processing overhead while at the same time ensuring that we clear Recycler data structures that were used by now-terminated t... — committed to netty/netty by chrisvest a year ago
Jeez. We went this route to get away from reference processing overhead.
From the code, it looks like Thread.isAlive will fare better than Thread.getState on J9. Looks pretty cheap in OpenJDK as well.

Thanks for confirming @onlynishant, with whatever that's not using J9.
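For illustration only (this is not the actual Recycler patch), the two liveness checks being compared look like this; ownerTerminated and ownerThread are hypothetical names for checking a thread that previously owned a Recycler data structure:

static boolean ownerTerminated(Thread ownerThread) {
    // Reportedly cheaper on OpenJ9, and cheap on OpenJDK as well:
    return !ownerThread.isAlive();
    // Equivalent for a thread that has already been started, but slower on J9:
    // return ownerThread.getState() == Thread.State.TERMINATED;
}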
First problem: too many errors
First, you should get rid of those, likely…
After that… I suggest profiling, and check https://github.com/netty/netty/issues/12708. I wouldn't suggest checking 2 interface types in a row: it will likely cause a scalability issue leading to high CPU usage (in user space). Try checking against the specific type you expect to receive first, if possible. That said, profiling is the way to go.
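As a sketch of that suggestion applied to the handler above (an assumption, not a confirmed fix): HttpObjectAggregator sits earlier in the pipeline, so this handler only ever sees full requests, and a single check against the specific type can replace the two interface checks (HttpRequest, then HttpContent) in channelRead0:

@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) {
    // One type check against the expected specific type instead of
    // 'instanceof HttpRequest' followed by 'instanceof HttpContent'.
    if (msg instanceof FullHttpRequest) {
        FullHttpRequest request = (FullHttpRequest) msg;
        ByteBuf content = request.content();
        // ... look up the RequestHandler and dispatch as before ...
    }
}

Alternatively, declaring the handler as SimpleChannelInboundHandler<FullHttpRequest> moves that single type check into Netty's built-in type matcher.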