aws-sdk-java-v2: Many concurrent async requests cause "NoSuchElementException: HttpStreamsClientHandler#0-body-publisher"

I’m creating many short-lived requests concurrently using the *AsyncClient, and this fails with:

java.util.concurrent.CompletionException: java.util.NoSuchElementException: HttpStreamsClientHandler#0-body-publisher

	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
	at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
	at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at software.amazon.awssdk.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.handle(AsyncRetryableStage.java:160)
	at software.amazon.awssdk.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.lambda$execute$0(AsyncRetryableStage.java:143)
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at software.amazon.awssdk.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.exceptionOccurred(MakeAsyncHttpRequestStage.java:185)
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.lambda$exceptionCaught$1(ResponseHandler.java:120)
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.runAndLogError(ResponseHandler.java:132)
	at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.exceptionCaught(ResponseHandler.java:119)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
	at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
	at com.typesafe.netty.HandlerSubscriber.exceptionCaught(HandlerSubscriber.java:157)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
	at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
	at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
	at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
	at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:850)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:364)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1273)
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1084)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:134)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.NoSuchElementException: HttpStreamsClientHandler#0-body-publisher
	at io.netty.channel.DefaultChannelPipeline.getContextOrDie(DefaultChannelPipeline.java:1080)
	at io.netty.channel.DefaultChannelPipeline.remove(DefaultChannelPipeline.java:434)
	at com.typesafe.netty.http.HttpStreamsHandler.removeHandlerIfActive(HttpStreamsHandler.java:328)
	at com.typesafe.netty.http.HttpStreamsHandler.handleReadHttpContent(HttpStreamsHandler.java:189)
	at com.typesafe.netty.http.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:165)
	at com.typesafe.netty.http.HttpStreamsClientHandler.channelRead(HttpStreamsClientHandler.java:148)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
	... 28 more

This can be reproduced by creating a do-nothing lambda function called “test” in your AWS account, and then running this JUnit test a couple of times:

package repro.bug;

import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Test;
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;
import software.amazon.awssdk.services.lambda.model.InvocationType;
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;

/**
 * Reproduction harness that starts a burst of asynchronous Lambda {@code invoke} calls against a
 * function named {@code "test"} and then waits for all of them.
 *
 * <p>Both tests start every request before blocking on any result, so up to {@link #CONCURRENCY}
 * requests are in flight at the same time.
 */
public class CallLambdaLots {

  /** Number of invocations each test starts before it waits on any of them. */
  public static final int CONCURRENCY = 1000;

  LambdaAsyncClient lambda = LambdaAsyncClient.create();

  /** Starts all invocations and waits on a single combined future. */
  @Test
  public void joinAllAtOnce() {
    // allOf accepts CompletableFuture<?>..., so a plain CompletableFuture[] is sufficient and no
    // reflective array creation or unchecked cast is needed.
    CompletableFuture.allOf(
            IntStream.rangeClosed(1, CONCURRENCY)
                .mapToObj(this::callLambda)
                .toArray(CompletableFuture[]::new))
        .join();
  }

  /** Starts all invocations and then waits on each future in submission order. */
  @Test
  public void joinOneAtATime() {
    // Collecting into a list first forces every request to be started before the first join();
    // joining directly inside the stream would run the calls one after another.
    IntStream.rangeClosed(1, CONCURRENCY)
        .mapToObj(this::callLambda)
        .collect(Collectors.toList())
        .forEach(CompletableFuture::join);
  }

  /**
   * Invokes the {@code "test"} function with the decimal text of {@code payload} as the request
   * body, using the {@code Event} invocation type.
   *
   * @param payload number sent as the request payload
   * @return the future returned by the async client for this invocation
   */
  private CompletableFuture<InvokeResponse> callLambda(int payload) {
    // Encode explicitly so the bytes sent do not depend on the platform default charset.
    byte[] body = Integer.toString(payload).getBytes(StandardCharsets.UTF_8);
    return lambda.invoke(
        InvokeRequest.builder()
            .functionName("test")
            .invocationType(InvocationType.Event)
            .payload(ByteBuffer.wrap(body))
            .build());
  }
}

On machines with lots of cores, you have to crank CONCURRENCY up quite a lot to hit the race condition, but from a small lambda function with 1-2 cores I hit this even with 50-100 concurrent requests.

About this issue

  • Original URL
  • State: closed
  • Created 7 years ago
  • Comments: 17 (6 by maintainers)

Commits related to this issue

Most upvoted comments

I’m getting this issue on 2.13.33

That issue still happens in version 2.10.41

I am using SDK version 2.9.4 and still have the same problem

I’m still hitting this on 2.3.9 SDK with concurrent uploads to S3 with the Async Client. It happens less when I buffer the requests into smaller batches and block on the batch to complete. Example stack trace:

java.util.concurrent.CompletionException: software.amazon.awssdk.core.exception.SdkClientException
at software.amazon.awssdk.utils.CompletableFutureUtils.errorAsCompletionException(CompletableFutureUtils.java:61)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncExecutionFailureExceptionReportingStage.lambda$execute$0(AsyncExecutionFailureExceptionReportingStage.java:50)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.retryErrorIfNeeded(AsyncRetryableStage.java:167)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.retryIfNeeded(AsyncRetryableStage.java:119)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.lambda$execute$0(AsyncRetryableStage.java:104)
at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.onError(MakeAsyncHttpRequestStage.java:236)
at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.handleFailure(NettyRequestExecutor.java:228)
at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.lambda$writeRequest$6(NettyRequestExecutor.java:182)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:485)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
at io.netty.util.concurrent.DefaultPromise.addListener(DefaultPromise.java:162)
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:95)
at io.netty.channel.DefaultChannelPromise.addListener(DefaultChannelPromise.java:30)
at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.writeRequest(NettyRequestExecutor.java:167)
at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.makeRequest(NettyRequestExecutor.java:157)
at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.makeRequestListener(NettyRequestExecutor.java:124)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:504)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:483)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
at io.netty.util.concurrent.DefaultPromise.setSuccess(DefaultPromise.java:94)
at software.amazon.awssdk.http.nio.netty.internal.utils.BetterFixedChannelPool$AcquireListener.operationComplete(BetterFixedChannelPool.java:324)
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:511)
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:485)
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:424)
at io.netty.util.concurrent.DefaultPromise.setSuccess(DefaultPromise.java:94)
at io.netty.channel.pool.SimpleChannelPool.notifyHealthCheck(SimpleChannelPool.java:245)
at io.netty.channel.pool.SimpleChannelPool.doHealthCheck(SimpleChannelPool.java:226)
at io.netty.channel.pool.SimpleChannelPool.acquireHealthyFromPoolOrNew(SimpleChannelPool.java:194)
at io.netty.channel.pool.SimpleChannelPool.acquire(SimpleChannelPool.java:164)
at software.amazon.awssdk.http.nio.netty.internal.utils.BetterFixedChannelPool.acquire0(BetterFixedChannelPool.java:168)
at software.amazon.awssdk.http.nio.netty.internal.utils.BetterFixedChannelPool.acquire(BetterFixedChannelPool.java:137)
at software.amazon.awssdk.http.nio.netty.internal.http2.HttpOrHttp2ChannelPool.acquire0(HttpOrHttp2ChannelPool.java:76)
at software.amazon.awssdk.http.nio.netty.internal.http2.HttpOrHttp2ChannelPool.lambda$acquire$0(HttpOrHttp2ChannelPool.java:70)
at io.netty.util.concurrent.PromiseTask$RunnableAdapter.call(PromiseTask.java:38)
at io.netty.util.concurrent.PromiseTask.run(PromiseTask.java:73)
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:474)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:909)
at java.lang.Thread.run(Thread.java:748)
Caused by: software.amazon.awssdk.core.exception.SdkClientException: null
at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:97)
at software.amazon.awssdk.core.internal.util.ThrowableUtils.asSdkException(ThrowableUtils.java:98)
at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.retryIfNeeded(AsyncRetryableStage.java:118)
... 42 common frames omitted
Caused by: java.lang.IllegalArgumentException: Duplicate handler name: HttpStreamsClientHandler#0-body-subscriber
at io.netty.channel.DefaultChannelPipeline.checkDuplicateName(DefaultChannelPipeline.java:1101)
at io.netty.channel.DefaultChannelPipeline.filterName(DefaultChannelPipeline.java:302)
at io.netty.channel.DefaultChannelPipeline.addAfter(DefaultChannelPipeline.java:319)
at io.netty.channel.DefaultChannelPipeline.addAfter(DefaultChannelPipeline.java:308)
at com.typesafe.netty.http.HttpStreamsHandler.unbufferedWrite(HttpStreamsHandler.java:287)
at com.typesafe.netty.http.HttpStreamsHandler.flushNext(HttpStreamsHandler.java:334)
at com.typesafe.netty.http.HttpStreamsHandler.write(HttpStreamsHandler.java:227)
at com.typesafe.netty.http.HttpStreamsClientHandler.write(HttpStreamsClientHandler.java:30)
at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:738)
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:801)
at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794)
at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:837)
at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1071)
at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:304)
at software.amazon.awssdk.http.nio.netty.internal.NettyRequestExecutor.writeRequest(NettyRequestExecutor.java:166)
... 27 common frames omitted

Example code using Reactor library:

        // Uploads one index file per site to S3, MAX_CONCURRENT_REQ entries per batch. The
        // subscriber callback blocks on a latch, so it does not return until every upload in the
        // batch has finished or the 30 second timeout has elapsed.
        Flux.fromIterable(siteIndices.entrySet())
                .buffer(MAX_CONCURRENT_REQ)
                .doOnError(t -> logger.error(t.getMessage(), t))
                .subscribe(siteIndicesBatch -> {

                    // Block on a batch to complete up to a timeout
                    final int batchSize = siteIndicesBatch.size();
                    final CountDownLatch latch = new CountDownLatch(batchSize);

                    Flux.fromIterable(siteIndicesBatch)
                            .doOnError(t -> logger.error(t.getMessage(), t))
                            .map(e -> {

                                final RadarSite radarSite = e.getKey();
                                final Set<String> fileSet = e.getValue();

                                // Filter old or invalid feed file names
                                // (this mutates the set held by siteIndices in place)
                                fileSet.removeIf(f -> !feedIndices.isValid(f, minAllowedTime));

                                // The index file body is the remaining names, one per line.
                                final String indexData = String.join("\n", fileSet);

                                final AsyncRequestBody reqBody = AsyncRequestBody.fromString(indexData, Charsets.UTF_8);
                                final String s3Key = feedPaths.indexFilePath(radarSite);

                                final PutObjectRequest putObjReq = PutObjectRequest.builder()
                                        .bucket(bucket)
                                        .key(s3Key)
                                        .contentEncoding("UTF-8")
                                        .contentType("text/plain")
                                        .build();

                                logger.debug("Adding index file: bucket [{}], path [{}]", bucket, s3Key);

                                // Count the latch down on success and on failure alike so that a
                                // failed upload does not hold the batch until the timeout.
                                return s3AsyncClient.putObject(putObjReq, reqBody)
                                        .handleAsync((resp, t) -> {
                                            latch.countDown();
                                            if(t != null) {
                                                logger.error(t.getMessage(), t);
                                            }
                                            return resp;
                                        });

                            })
                            .filter(Objects::nonNull)
                            .subscribe();

                    try {
                        if(!latch.await(30L, TimeUnit.SECONDS)) {
                            logger.warn("Timeout waiting batch of size {} to complete", batchSize);
                        }
                    } catch (InterruptedException e) {
                        // Do not swallow the interrupt: restore the flag so the code that owns
                        // this thread can see that it was asked to stop. While the flag stays
                        // set, later await() calls on this thread throw immediately.
                        Thread.currentThread().interrupt();
                    }


                });

I seem to be running into this with S3 pretty consistently. I am using v7 of the JDK. Code below is in scala.

"software.amazon.awssdk" % "s3" % "2.0.0-preview-7",

Here is how I am using S3:

s3AsyncClient.putObject(putObjReq, new MyPublisher(config.objSize * 1024)). My producer is below. This works for a single request, but fails when multiple requests are queued rapidly.


// Used to generate an infinite stream if required
/** Request body provider that streams `streamSz` bytes of buffer content to its subscriber.
  *
  * Each call to `subscribe` hands the subscriber a fresh `MySubscription`, which emits data in
  * response to `request` calls and signals completion once `streamSz` bytes have been emitted.
  *
  * @param streamSz total number of bytes to emit; also reported as the content length
  */
class MyPublisher(streamSz: Long) extends AsyncRequestProvider {

  def contentLength: Long = streamSz
  def subscribe(s: Subscriber[_ >: ByteBuffer]): Unit = {
    println("New subscription")
    s.onSubscribe(new MySubscription(s, streamSz))
  }

  // Shared 1 KiB read-only direct buffer filled with 'b'. Nothing below reads it at the moment;
  // the only reference is the commented-out `duplicate()` alternative in MySubscription.
  object MySubscription {
    val SIZE: Int = 1024;
    val initBuf: Array[Byte] = (for (i <- 0 to (SIZE - 1)) yield 'b'.toByte).toArray

    val s3Buffer: ByteBuffer = ByteBuffer.allocateDirect(SIZE)
      .put(initBuf, 0, SIZE)
      .asReadOnlyBuffer()
  }

  /** Subscription that pushes `sz` bytes to `s`, one chunk per unit of demand.
    *
    * @param s  subscriber receiving the chunks and the terminal signal
    * @param sz total number of bytes to emit
    */
  class MySubscription(s: Subscriber[_ >: ByteBuffer], sz: Long) extends Subscription {
    // Number of bytes handed to the subscriber so far.
    var currPos: Long = 0
    // The buffer is as large as the whole payload, so when sz fits in an Int the first chunk
    // already covers everything.
    // NOTE(review): the same instance is passed to every onNext; a subscriber that retained it
    // across chunks would see later position/limit changes.
    val localBuf = ByteBuffer.allocate(sz.toInt)
      //MySubscription.s3Buffer.duplicate()
    // True once the subscription is terminal: completed, failed or cancelled.
    var complete : AtomicBoolean = new AtomicBoolean(false)

    // Mark the subscription terminal so that later request() calls do nothing.
    def cancel: Unit = complete.set(true)

    /** Emits up to `b` chunks, then signals completion once all `sz` bytes have been sent.
      *
      * A non-positive `b` is reported to the subscriber through `onError`. Calls made after the
      * subscription became terminal do nothing.
      */
    def request(b: Long): Unit = {
      if (complete.get()) {
        // Already completed, failed or cancelled: nothing more may be signalled.
      } else if (b <= 0) {
        if (!complete.getAndSet(true)) {
          s.onError(new IllegalArgumentException(
            "Demand must be positive (Reactive Streams rule 3.9), but was " + b))
        }
      } else {
        // Loop rather than recurse so that stack depth does not grow with the demand.
        var demand = b
        while (demand > 0 && currPos != sz && !complete.get()) {
          val c = localBuf.capacity()
          val r = if ((sz - currPos) < c) (sz - currPos).toInt else c

          //println("limit: %d, currPos: %d, sz: %d, b:%d".format(r,currPos,sz,b))

          localBuf.position(0)
          localBuf.limit(r)
          // Advance before onNext so that a re-entrant request() from the subscriber sees
          // up-to-date state.
          currPos += r

          s.onNext(localBuf)

          //println("Updated currPos:%d, b:%d".format(currPos,b))

          demand -= 1
        }

        // Complete as soon as the last byte is out instead of waiting for another request();
        // getAndSet ensures onComplete is sent at most once, and never after cancel.
        if (currPos == sz && !complete.getAndSet(true)) {
          s.onComplete()
          //println("s is complete")
        }
      }
    }


  }
}

This is my stack trace

[info] 15:58:29.041 ERROR s.a.a.h.n.n.internal.ResponseHandler Exception processing request: software.amazon.awssdk.http.DefaultSdkHttpFullRequest@5fb65596
[info] java.util.NoSuchElementException: HttpStreamsClientHandler#0-body-publisher
[info] 	at io.netty.channel.DefaultChannelPipeline.getContextOrDie(DefaultChannelPipeline.java:1089)
[info] 	at io.netty.channel.DefaultChannelPipeline.remove(DefaultChannelPipeline.java:443)
[info] 	at com.typesafe.netty.http.HttpStreamsHandler.removeHandlerIfActive(HttpStreamsHandler.java:328)
[info] 	at com.typesafe.netty.http.HttpStreamsHandler.handleReadHttpContent(HttpStreamsHandler.java:189)
[info] 	at com.typesafe.netty.http.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:165)
[info] 	at com.typesafe.netty.http.HttpStreamsClientHandler.channelRead(HttpStreamsClientHandler.java:148)
[info] 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
[info] 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
[info] 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
[info] 	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
[info] 	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
[info] 	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
[info] 	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
[info] 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
[info] 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
[info] 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
[info] 	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1379)
[info] 	at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1158)
[info] 	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1193)
[info] 	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
[info] 	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
[info] 	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
[info] 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
[info] 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
[info] 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
[info] 	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
[info] 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
[info] 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
[info] 	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
[info] 	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
[info] 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
[info] 	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
[info] 	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
[info] 	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
[info] 	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
[info] 	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
[info] 	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
[info] 	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
[info] 	at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
[info] 	at java.lang.Thread.run(Thread.java:745)

Quick update:

So far, after more testing and debugging, the problem seems to be unique to Lambda’s invoke call with InvocationType.EVENT. It doesn’t seem to occur with other invocation types on invoke. Also, I’m unable to reproduce the error using another service like DynamoDB in V2, and I’m not seeing any errors in V1 for Lambda.

The cause of the error appears to be that for some requests, after the client has finished flushing the request to Lambda, we immediately see a LastHttpContent message being read from the HTTP codec; no previous Response message. This causes HttpStreamsClientHandler to attempt to remove a non-existent -body-publisher handler.

One thing I’ve found with the async client is that replacing the HttpClientCodec with a new one each time a channel is leased from the pool fixes the issue, so maybe there is some edge case causing the codec to enter a bad state after a response from the server is received and before a new request is written.