netty: Thread blocked causing high latency and CPU usage

Specification:

We are using Netty version 4.1.91.Final with Epoll on a bare metal server with the below config:

OS: CentOS 7.6
Java version: 11 (IBM OpenJ9) [JVM main params: -Xms28g -Xmx28g -Xmn14g -Xgc:concurrentScavenge]
CPU: 48 core
RAM: 64G

<netty.version>4.1.91.Final</netty.version>
<tcnative.version>2.0.59.Final</tcnative.version>
<bouncycastle.version>1.69</bouncycastle.version>

Use case:

Each of these servers receives 10-20k JSON requests/sec via POST from 100-500 external servers over 20-50k established connections in normal times, on both HTTP and HTTPS (netty-tcnative-boringssl-static). These external servers also recycle their connections at frequent intervals.

Sample code:

private Class<? extends ServerSocketChannel> getBestAvailableServerSocketChannel() {
    if (Util.IS_OS_LINUX && Epoll.isAvailable()) {
      Logger.INFO_LOG.info("Using EpollServerSocketChannel");
      return EpollServerSocketChannel.class;
    } else if (Util.IS_OS_MAC) {
      Logger.INFO_LOG.info("Using KQueueServerSocketChannel");
      return KQueueServerSocketChannel.class;
    } else {
      Logger.INFO_LOG.info("Using NioServerSocketChannel");
      return NioServerSocketChannel.class;
    }
}
...
...
// config SSLContext
SslProvider provider = SslProvider.isAlpnSupported(SslProvider.OPENSSL) ? SslProvider.OPENSSL : SslProvider.JDK;
sslCtx = SslContextBuilder.forServer(keyManagerFactory)
    .sslProvider(provider)
    /* NOTE: the cipher filter may not include all ciphers required by the HTTP/2 specification.
     * Please refer to the HTTP/2 specification for cipher requirements. */
//        .ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
    .applicationProtocolConfig(new ApplicationProtocolConfig(
        Protocol.ALPN,
        // NO_ADVERTISE is currently the only mode supported by both OpenSsl and JDK providers.
        SelectorFailureBehavior.NO_ADVERTISE,
        // ACCEPT is currently the only mode supported by both OpenSsl and JDK providers.
        SelectedListenerFailureBehavior.ACCEPT,
        // support both HTTP2 and HTTP1.1
//            ApplicationProtocolNames.HTTP_2,
        ApplicationProtocolNames.HTTP_1_1
    ))
    .sessionCacheSize(SSL_SESSION_CACHE_SIZE) // 1800
    .sessionTimeout(SSL_SESSION_CACHE_TIMEOUT_SECOND) // 1024 * 100
    .build();

...
...
// configure server
ServerBootstrap httpServer = new ServerBootstrap();

// ioThreadExecutors = CPU core / 2
// workerThreadExecutors = ioThreadExecutors * 8
httpServer = httpServer.group(ioThreadExecutors, workerThreadExecutors);

Class<? extends ServerSocketChannel> serverSocketChannelClass = builder.serverSocketChannelClass;
if (serverSocketChannelClass == null) {
  serverSocketChannelClass = getBestAvailableServerSocketChannel();
}

httpServer.channel(serverSocketChannelClass) // Specify the ServerSocketChannel class used to instantiate a new Channel that accepts incoming connections.
        .handler(new LoggingHandler(logLevel))
        .childHandler(channelInitializer)
        /*
         option() is for the NioServerSocketChannel that accepts incoming connections.
         childOption() is for the Channels accepted by the parent ServerChannel, which is NioSocketChannel in this case.
         https://netty.io/4.1/api/io/netty/channel/ChannelOption.html
         */
        .option(ChannelOption.SO_BACKLOG, 2048)
        /*
        Allow multiple I/O threads to bind to the same port. By default, each port is managed by a single dedicated I/O thread.
        https://stackoverflow.com/questions/67145817/netty-how-many-thread-is-accepting-connections-on-a-server-port-80
         */
        .option(UnixChannelOption.SO_REUSEPORT, true)
        .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        // https://netty.io/wiki/tcp-fast-open.html
//        .option(ChannelOption.TCP_FASTOPEN, 1000)
        // http://normanmaurer.me/presentations/2014-twitter-meetup-netty/slides.html#11.0
        .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
        .childOption(ChannelOption.SO_SNDBUF, 10 * 1024 * 1024)
        .childOption(ChannelOption.SO_RCVBUF, 10 * 1024 * 1024)
        .childOption(ChannelOption.SO_REUSEADDR, true)
        .childOption(ChannelOption.SO_KEEPALIVE, true)
        .childOption(ChannelOption.TCP_NODELAY, true);

if (serverSocketChannelClass == EpollServerSocketChannel.class) {
  httpServer.option(EpollChannelOption.EPOLL_MODE, EpollMode.EDGE_TRIGGERED);
}


// for each port, bind it multiple times
for (int i = 0; i < channelThreadCount; i++) {
  ChannelFuture f = httpServer.bind(port).sync();
  channelFutureSet.add(f);
}


// channelInitializer class

@Override
  public void initChannel(SocketChannel ch) {
    ChannelPipeline pipeline = ch.pipeline();
    int port = ch.localAddress().getPort();

    // Add SSL handler first to encrypt and decrypt everything.
    SslContext sslContext = sslContextMap.get(port);
    if (sslContext != null) {
      pipeline.addLast(sslContext.newHandler(ch.alloc()));
    }

    /*
    On top of the SSL handler, add the other handlers.
    HttpServerCodec is a combination of HttpRequestDecoder and HttpResponseEncoder.

    Header validation takes time and most of the time it is not needed directly in the decoder/encoder, so it is disabled here (the last constructor argument)
    http://normanmaurer.me/presentations/2014-twitter-meetup-netty/slides.html#5.0
     */
    pipeline.addLast(new HttpServerCodec(DEFAULT_MAX_INITIAL_LINE_LENGTH, DEFAULT_MAX_HEADER_SIZE, DEFAULT_MAX_CHUNK_SIZE, false));

    pipeline.addLast(new HttpServerKeepAliveHandler());

    // It is useful when you don't want to take care of HTTP messages whose transfer encoding is 'chunked'
    // MAX_REQUEST_CONTENT_LENGTH = 1048576; //1 mb
    pipeline.addLast(new HttpObjectAggregator(MAX_REQUEST_CONTENT_LENGTH)); // inbound


    // Business logic runs on a separate thread pool because it may take 10-100ms and we don't want to block the worker thread
    // EventExecutorGroup handlerExecutorGroup = new DefaultEventExecutorGroup(core * 8, handlerThreadFactory);
    pipeline.addLast(handlerExecutorGroup, "handler", new HttpEndServer(channelHandlerMap.get(port)));

  }



///
public class HttpEndServer extends SimpleChannelInboundHandler<Object> implements HttpServer {


  ...
  ...

@Override
  public void channelRead0(ChannelHandlerContext ctx, Object msg) {
    if (msg instanceof HttpRequest) {
      HttpRequest request = (HttpRequest) msg;
      ByteBuf content = null;
      if (msg instanceof HttpContent) {
        HttpContent httpContent = (HttpContent) msg;
        content = httpContent.content();
      }

      QueryStringDecoder queryStringDecoder = new QueryStringDecoder(request.uri());
      String path = queryStringDecoder.path();

      RequestHandler requestHandler = handler.get(path);
      if (requestHandler == null) {
        requestHandler = handler.get(DEFAULT_PATH);
      }
      // call business handler
      requestHandler.handle(ctx, this, request, content);
    }
  }

  @Override
  public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
    log.info("channel Writability Changed");
    while (outboundQueue.relaxedPeek() != null) {
      Object out = outboundQueue.relaxedPoll();
      if (out != null) {
        ChannelFuture cf = ctx.writeAndFlush(out);
      /*
      Add a ChannelFutureListener to receive a notification after the write completes
       */
        cf.addListener((ChannelFutureListener) future -> {
          if (future.isSuccess()) {
            // Write operation completes without error
            log.debug("channelWritabilityChanged: Write successful.");
          } else {
            // Logs an error
            log.error("channelWritabilityChanged: Write error. msg: {}", future.cause().getLocalizedMessage());
          }
        });
      }
    }

    super.channelWritabilityChanged(ctx);
    log.debug("Writeability Changed Event");
  }

  @Override
  public void sendResponse(ChannelHandlerContext ctx, HttpRequest request, FullHttpResponse response) {

    // HTTP 2
    String streamId = getStreamId(request);

    if (streamId != null && !streamId.isEmpty()) {
      setStreamId(response, streamId);
    }

    // Get keep-alive info
    boolean keepAlive = HttpUtil.isKeepAlive(request);

    // Set keep-alive response header
    if (keepAlive) {
      // Add 'Content-Length' header only for a keep-alive connection.
      response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
      // Add keep alive header as per:
      // - https://www.w3.org/Protocols/HTTP/1.1/draft-ietf-http-v11-spec-01.html#Connection
      response.headers().set(CONNECTION, HttpHeaderValues.KEEP_ALIVE);
    } else {
      // Tell the client we're going to close the connection.
      response.headers().set(CONNECTION, CLOSE);
    }

    // Write the response.
    // no need to call flush for non-ssl HTTP/2 as channelReadComplete(...) will take care of it.
    if (streamId != null && !streamId.isEmpty() && !isSSL) {
      ctx.write(response);
    } else {
      /*
       check before writeAndFlush whether the channel is still writable,
       otherwise we will get io.netty.channel.StacklessClosedChannelException
       */
      if (ctx.channel().isWritable()) {
        // Writes the data and flushes it
        ChannelFuture cf = ctx.writeAndFlush(response);
      /*
      Add a ChannelFutureListener to receive a notification after the write completes
       */
        cf.addListener((ChannelFutureListener) future -> {
          if (future.isSuccess()) {
            // Write operation completes without error
            log.debug("Write successful.");
          } else {
            // Logs an error
            log.error("Write error. msg: {}", future.cause().getLocalizedMessage());
          }
        });

        if (!keepAlive) {
          cf.addListener(ChannelFutureListener.CLOSE);
        }
      } else {
        //Channel not even open and therefore it's not writable
        log.warn("channel is not writable so adding to queue.");
        outboundQueue.add(response);
//        ctx.close();
      }
    }

//    if (!keepAlive) {
//      // If keep-alive is off, close the connection once the content is fully written.
//      ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
//    }
  }
}

There are many error messages of these 4 types:

  • sendResponse$1(186): Write error. msg: SSLEngine closed already
  • exceptionCaught(62): An exception is thrown. msg: recvAddress(…) failed: Connection reset by peer
  • sendResponse(195): channel is not writable so adding to queue.
  • exceptionCaught(62): An exception is thrown. msg: javax.net.ssl.SSLHandshakeException: error:10000070:SSL routines:OPENSSL_internal:BAD_PACKET_LENGTH

I also see that latency sometimes shoots up very high (> 200ms), while normally it's under 50ms. Is there anything wrong in the code?

[Screenshot attached: 2023-04-20 4:40 PM]

About this issue

  • State: closed
  • Created a year ago
  • Comments: 16 (10 by maintainers)

Most upvoted comments

It seems J9's Thread.getStateImpl has some weird scalability problem.

Jeez. We went this route to get away from reference processing overhead.

From the code, it looks like Thread.isAlive will fare better than Thread.getState on J9. It looks pretty cheap in OpenJDK as well.
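
A minimal, illustrative sketch of that point (the class and method names below are made up, not Netty's or J9's internals): when all you need is a liveness check, Thread.isAlive() can replace a Thread.getState() inspection.

public final class ThreadLivenessCheck {
  // isAlive() is cheap on OpenJDK and, per the comment above, cheaper than getState() on J9.
  static boolean isWatchedThreadAlive(Thread watched) {
    return watched.isAlive();
    // getState()-based equivalent (for a thread that has already been started):
    // return watched.getState() != Thread.State.TERMINATED;
  }
}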

Thanks for confirming, @onlynishant; whatever that is, it's not using J9.

First problem: too many errors

There are many error messages of these 4 types…

First, you should likely get rid of those…

After that, I suggest profiling and checking https://github.com/netty/netty/issues/12708. I wouldn't suggest checking 2 interface types in a row, since it will likely cause a scalability issue leading to high CPU usage (in user space). Try checking against the specific type you expect to receive first, if possible. That said, profiling is the way to go.
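
A minimal sketch of that suggestion, assuming the pipeline keeps the HttpObjectAggregator shown above so the handler receives FullHttpRequest messages (the surrounding identifiers are reused from the snippet above purely for illustration):

// Illustrative sketch only: check the single concrete type you expect first,
// instead of two interface types (HttpRequest, then HttpContent) in a row.
@Override
public void channelRead0(ChannelHandlerContext ctx, Object msg) {
  if (msg instanceof FullHttpRequest) {
    FullHttpRequest request = (FullHttpRequest) msg;
    ByteBuf content = request.content(); // aggregated body; no separate HttpContent check

    QueryStringDecoder queryStringDecoder = new QueryStringDecoder(request.uri());
    RequestHandler requestHandler = handler.get(queryStringDecoder.path());
    if (requestHandler == null) {
      requestHandler = handler.get(DEFAULT_PATH);
    }
    requestHandler.handle(ctx, this, request, content);
  }
}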