aws-sdk-java-v2: Many concurrent async requests cause "NoSuchElementException: HttpStreamsClientHandler#0-body-publisher"
I’m creating many short-lived requests concurrently using the async clients (*AsyncClient, e.g. LambdaAsyncClient), and this fails with:
java.util.concurrent.CompletionException: java.util.NoSuchElementException: HttpStreamsClientHandler#0-body-publisher
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:593)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at software.amazon.awssdk.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.handle(AsyncRetryableStage.java:160)
at software.amazon.awssdk.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.lambda$execute$0(AsyncRetryableStage.java:143)
at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at software.amazon.awssdk.http.pipeline.stages.MakeAsyncHttpRequestStage$ResponseHandler.exceptionOccurred(MakeAsyncHttpRequestStage.java:185)
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.lambda$exceptionCaught$1(ResponseHandler.java:120)
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.runAndLogError(ResponseHandler.java:132)
at software.amazon.awssdk.http.nio.netty.internal.ResponseHandler.exceptionCaught(ResponseHandler.java:119)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
at com.typesafe.netty.HandlerSubscriber.exceptionCaught(HandlerSubscriber.java:157)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:264)
at io.netty.channel.AbstractChannelHandlerContext.fireExceptionCaught(AbstractChannelHandlerContext.java:256)
at io.netty.channel.ChannelInboundHandlerAdapter.exceptionCaught(ChannelInboundHandlerAdapter.java:131)
at io.netty.channel.AbstractChannelHandlerContext.invokeExceptionCaught(AbstractChannelHandlerContext.java:285)
at io.netty.channel.AbstractChannelHandlerContext.notifyHandlerException(AbstractChannelHandlerContext.java:850)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:364)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1273)
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1084)
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:489)
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:428)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:134)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:644)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:579)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:496)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:458)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.NoSuchElementException: HttpStreamsClientHandler#0-body-publisher
at io.netty.channel.DefaultChannelPipeline.getContextOrDie(DefaultChannelPipeline.java:1080)
at io.netty.channel.DefaultChannelPipeline.remove(DefaultChannelPipeline.java:434)
at com.typesafe.netty.http.HttpStreamsHandler.removeHandlerIfActive(HttpStreamsHandler.java:328)
at com.typesafe.netty.http.HttpStreamsHandler.handleReadHttpContent(HttpStreamsHandler.java:189)
at com.typesafe.netty.http.HttpStreamsHandler.channelRead(HttpStreamsHandler.java:165)
at com.typesafe.netty.http.HttpStreamsClientHandler.channelRead(HttpStreamsClientHandler.java:148)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
... 28 more
This can be reproduced by creating a do-nothing Lambda function called “test” in your AWS account and then running this JUnit test a few times:
package repro.bug;

import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Test;
import software.amazon.awssdk.services.lambda.LambdaAsyncClient;
import software.amazon.awssdk.services.lambda.model.InvocationType;
import software.amazon.awssdk.services.lambda.model.InvokeRequest;
import software.amazon.awssdk.services.lambda.model.InvokeResponse;

public class CallLambdaLots {
    // Number of Invoke requests issued concurrently per test.
    public static final int CONCURRENCY = 1000;

    // One shared async client; every request goes through its connection pool.
    LambdaAsyncClient lambda = LambdaAsyncClient.create();

    // Starts all requests, then waits for all of them at once via allOf().
    @Test
    @SuppressWarnings("unchecked")
    public void joinAllAtOnce() {
        CompletableFuture.allOf(
                IntStream.range(0, CONCURRENCY)
                    .mapToObj(this::callLambda)
                    .toArray(
                        size ->
                            (CompletableFuture<InvokeResponse>[])
                                Array.newInstance(CompletableFuture.class, size)))
            .join();
    }

    // Starts all requests (collect() forces every call to be issued first), then joins them one by one.
    @Test
    public void joinOneAtATime() {
        IntStream.range(0, CONCURRENCY)
            .mapToObj(this::callLambda)
            .collect(Collectors.toList())
            .forEach(CompletableFuture::join);
    }

    // Asynchronous ("Event") invocation of the function named "test".
    private CompletableFuture<InvokeResponse> callLambda(int payload) {
        return lambda.invoke(
            InvokeRequest.builder()
                .functionName("test")
                .invocationType(InvocationType.Event)
                .payload(ByteBuffer.wrap(Integer.toString(payload).getBytes(StandardCharsets.UTF_8)))
                .build());
    }
}
On machines with many cores you have to raise CONCURRENCY quite a lot to hit the race condition, but from a small Lambda function with 1-2 cores I hit it with as few as 50-100 concurrent requests.
About this issue
- State: closed
- Created 7 years ago
- Comments: 17 (6 by maintainers)
Commits related to this issue
- Fix race codition between read and finalizeRequest HttpStreamsClientHandler ignores the LastHttpContent decoded by the HTTP codec when we get an empty response from the service, but if the channel ge... — committed to dagnir/aws-sdk-java-v2 by dagnir 6 years ago
- Fix race codition between read and finalizeRequest HttpStreamsClientHandler ignores the LastHttpContent decoded by the HTTP codec when we get an empty response from the service, but if the channel ge... — committed to aws/aws-sdk-java-v2 by dagnir 6 years ago
I’m getting this issue on 2.13.33
That issue still happens in version 2.10.41
I am using SDK version 2.9.4 and still have the same problem
I’m still hitting this on the 2.3.9 SDK with concurrent uploads to S3 using the async client. It happens less often when I buffer the requests into smaller batches and block until each batch completes. Example stack trace:
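The batching workaround described in this comment can be sketched with plain java.util.concurrent, as a minimal illustration rather than the commenter's actual code. BatchedCalls and runInBatches are hypothetical names; the IntFunction `call` stands in for whatever async SDK call you are making (for example an S3 putObject), and batchSize is an assumed tuning knob. This only caps how many requests are in flight at once, which according to this report makes the error rarer; it does not fix the underlying race.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.IntFunction;

public class BatchedCalls {
    // Hypothetical helper: issues `total` async calls in batches of at most
    // `batchSize`, blocking until each batch completes before starting the next.
    // `call` is an assumed stand-in for an async SDK call (e.g. putObject).
    public static <T> List<T> runInBatches(int total, int batchSize,
                                           IntFunction<CompletableFuture<T>> call) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<T> results = new ArrayList<>(total);
        for (int start = 0; start < total; start += batchSize) {
            int end = Math.min(start + batchSize, total);
            List<CompletableFuture<T>> batch = new ArrayList<>(end - start);
            for (int i = start; i < end; i++) {
                batch.add(call.apply(i)); // start every request in this batch
            }
            // Block on the whole batch; join() rethrows failures as CompletionException.
            CompletableFuture.allOf(batch.toArray(new CompletableFuture<?>[0])).join();
            for (CompletableFuture<T> f : batch) {
                results.add(f.join()); // already complete, so this does not block
            }
        }
        return results;
    }
}
```

With the reproducer above, `call` could be something like `i -> lambda.invoke(...)`; results come back in request order.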
Example code using Reactor library:
I seem to be running into this with S3 pretty consistently. I am using v7 of the JDK. The code below is in Scala.
"software.amazon.awssdk" % "s3" % "2.0.0-preview-7"
Here is how I am using S3:
s3AsyncClient.putObject(putObjReq, new MyPublisher(config.objSize * 1024))
My producer is below. This works for a single request, but fails when multiple requests are queued rapidly. This is my stack trace:
Quick update:
So far, after more testing and debugging, the problem seems to be unique to Lambda’s invoke call with InvocationType.EVENT. It doesn’t seem to occur with other invocation types on invoke. Also, I’m unable to reproduce the error using another service like DynamoDB in V2, and I’m not seeing any errors in V1 for Lambda.
The cause of the error appears to be that for some requests, after the client has finished flushing the request to Lambda, we immediately see a LastHttpContent message being read from the HTTP codec, with no preceding Response message. This causes HttpStreamsClientHandler to attempt to remove the non-existent -body-publisher handler.
One thing I’ve found with the async client is that replacing the HttpClientCodec with a new one each time a channel is leased from the pool fixes the issue, so maybe there is some edge case that leaves the codec in a bad state after a response from the server is received and before a new request is written.