amazon-kinesis-client: KCL 2.0 stops consuming from some shards

We are in the process of upgrading several consumers to KCL 2.0. They are attached to a somewhat large stream (thousands of shards) and have been running on KCL 1.x for a long time without issue.

Today we ran into the following exception which caused some shards not to be consumed:

shardId-000000000634: Last request was dispatched at 2018-11-09T21:28:14.619Z, but no response as of 2018-11-09T21:28:50.632Z (PT36.013S).  Cancelling subscription, and restarting.
    at software.amazon.kinesis.lifecycle.ShardConsumer.healthCheck
    at software.amazon.kinesis.lifecycle.ShardConsumer.executeLifecycle
    at software.amazon.kinesis.coordinator.Scheduler.runProcessLoop
    at software.amazon.kinesis.coordinator.Scheduler.run

We’ve seen these exceptions before, right after deploys, but they usually disappear within 30 minutes. Today they started happening during normal operation and caused some shards not to be consumed at all. The max statistic of the SubscribeToShardEvent.MillisBehindLatest metric just kept increasing.

We are running on the latest commit (f52f2559ed) of the master branch. Any idea what could be happening?

Edit: should probably also mention that we let it run like this for over 2 hours and it never recovered. We’ve had to revert everything back to the old KCL.

About this issue

  • State: open
  • Created 6 years ago
  • Reactions: 5
  • Comments: 19 (5 by maintainers)

Most upvoted comments

We are using KCL 2.0.5 and seeing a lot of these messages too, for example:

DateTime=2019-01-28 20:54:17,267 Application=myapp Thread=[kinesis-scheduler1] Logger=software.amazon.kinesis.lifecycle.ShardConsumer Type=ERROR Message=shardId-000000000001: Last request was dispatched at 2019-01-28T20:53:42.247Z, but no response as of 2019-01-28T20:54:17.267Z (PT35.02S).  Cancelling subscription, and restarting.

The 35-second value seems to be hard-coded in ShardConsumer#MAX_TIME_BETWEEN_REQUEST_RESPONSE, so we don’t have control over it.

There are also warnings like the one below, but I don’t know if the two are related (a note on the underlying read timeout follows the stack trace):

DateTime=2019-01-28 21:27:40,571 Application=myapp Thread=[ShardRecordProcessor-0078] Logger=software.amazon.kinesis.lifecycle.ShardConsumer Type=WARN Message=shardId-000000000063: onError().  Cancelling subscription, and marking self as failed.
software.amazon.kinesis.retrieval.RetryableRetrievalException: ReadTimeout
	at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher.errorOccurred(FanOutRecordsPublisher.java:142)
	at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher.access$700(FanOutRecordsPublisher.java:51)
	at software.amazon.kinesis.retrieval.fanout.FanOutRecordsPublisher$RecordFlow.exceptionOccurred(FanOutRecordsPublisher.java:516)
	at software.amazon.awssdk.services.kinesis.DefaultKinesisAsyncClient.lambda$subscribeToShard$1(DefaultKinesisAsyncClient.java:2102)
	at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
	at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.handle(AsyncRetryableStage.java:155)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.lambda$execute$0(AsyncRetryableStage.java:121)
	at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:822)
	at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:797)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage$Completable.lambda$completeExceptionally$1(MakeAsyncHttpRequestStage.java:208)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: software.amazon.awssdk.core.exception.SdkClientException
	at software.amazon.awssdk.core.exception.SdkClientException$BuilderImpl.build(SdkClientException.java:97)
	at software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage$RetryExecutor.handle(AsyncRetryableStage.java:143)
	... 9 more
Caused by: io.netty.handler.timeout.ReadTimeoutException
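
For context, the root cause at the bottom of that trace is Netty's read timeout firing on the SubscribeToShard connection. If you build the KinesisAsyncClient yourself, that timeout is configurable on the Netty HTTP client. A minimal sketch follows; the 60-second value and class name are placeholders, and whether a longer read timeout actually helps in this situation is an assumption on my part, not something confirmed by the maintainers:

	import java.time.Duration;

	import software.amazon.awssdk.http.Protocol;
	import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
	import software.amazon.awssdk.regions.Region;
	import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;

	public class KinesisClientFactory {
	    // Sketch: raise the Netty read timeout above the SDK default so that slow
	    // SubscribeToShard responses are less likely to surface as
	    // io.netty.handler.timeout.ReadTimeoutException. Values are placeholders.
	    public static KinesisAsyncClient create(Region region) {
	        return KinesisAsyncClient.builder()
	                .region(region)
	                .httpClientBuilder(NettyNioAsyncHttpClient.builder()
	                        .protocol(Protocol.HTTP2)            // enhanced fan-out uses HTTP/2
	                        .maxConcurrency(Integer.MAX_VALUE)
	                        .readTimeout(Duration.ofSeconds(60)))
	                .build();
	    }
	}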

What to expect from KCL release 2.2.2?

Case 1: If you are using a KCL 2.x version (prior to KCL 2.2.2), you replaced or 
modified the thread/queue configuration of the SchedulerThreadPoolExecutor, and you 
faced situations like data loss, stuck shards, or intermittent pauses in data 
consumption, please continue reading; otherwise go to the next case. These issues 
are potentially due to undelivered executor service events.
 	a. 	This release ensures that data loss does not happen by preventing subsequent events 
 		from being delivered after an event delivery failure and later restarting the subscription. 
 		This may, however, lead to intermittent pauses in data consumption, as KCL checks for 
 		exceptionally completed subscriptions every 35 seconds. Refer to section d for mitigation. 
 		This scenario is identified by the following log messages:
 			- ERROR s.a.k.r.f.FanOutRecordsPublisher [NONE] - shardId-000000000035: Received unexpected ack for the active subscription shardId-000000000077-005. Throwing.
 			- ERROR s.a.k.l.ShardConsumerSubscriber [NONE] - shardId-000000000035: Last request was dispatched at 2019-08-21T01:16:22.340Z, but no response as of 2019-08-21T01:16:57.902Z (PT35.562S).  Cancelling subscription, and restarting.
 	b. 	This release prevents the stuck-shard situations that happened due to undelivered 
 		control plane messages. [NOTE: Control plane messages are responsible for fetching 
 		further events from the KDS service. This includes the subscribeToShard() API call and 
 		the reactive layer's request(N) calls.]
 	c. 	This release has improved logging that captures undelivered control plane 
 		messages for troubleshooting. Any undelivered control plane message might still lead 
 		to a temporary pause in data consumption. Refer to section d for mitigation. This is 
 		identified by the following log message:
 			- ERROR s.a.k.l.ShardConsumerSubscriber [NONE] - shardId-000000000035: Last request was dispatched at 2019-08-21T01:16:22.340Z, but no response as of 2019-08-21T01:16:57.902Z (PT35.562S).  Cancelling subscription, and restarting.
 	d. 	Mitigation for undelivered messages: undelivered messages are primarily due to 
 		reduced SchedulerThreadPoolExecutor capacity. Customers are expected to assess 
 		the state of the SchedulerThreadPoolExecutor using the following diagnostic log messages 
 		and take appropriate actions, such as reducing the RecordProcessor.processRecords time, 
 		scaling out the application, or increasing the number of threads in the executor 
 		(an executor sketch follows this case).
 			i. 	ExecutorStateEvent INFO log emitted every 30 seconds to check if activeThreads and 
 				queueSize are consistently high.
 					- INFO s.a.k.c.DiagnosticEventLogger - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=100, coreThreads=0, leasesOwned=40, largestPoolSize=2, maximumPoolSize=2147483647)
 			ii.	RejectedTaskEvent ERROR log, emitted when the SchedulerThreadPoolExecutor fails to 
 				execute an event.
 					- ERROR s.a.k.c.DiagnosticEventLogger [NONE] - Review your thread configuration to prevent task rejections. Task rejections will slow down your application and some shards may stop processing. Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=0, coreThreads=0, leasesOwned=0, largestPoolSize=0, maximumPoolSize=2147483647) - io.reactivex.exceptions.UndeliverableException: java.util.concurrent.RejectedExecutionException: Test exception.
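
To make the "reduced SchedulerThreadPoolExecutor capacity" point in (d) concrete: the ExecutorStateEvent above (coreThreads=0, maximumPoolSize=2147483647, currentQueueSize=0) has the shape of a cached thread pool, which effectively never rejects tasks, whereas a replacement with a bounded pool and queue can reject them and produce exactly the RejectedExecutionException wrapped in the RejectedTaskEvent log. A minimal sketch of the two shapes, using only java.util.concurrent; the sizes are placeholders, and this is my reading of the logs rather than anything taken from the KCL source:

	import java.util.concurrent.ExecutorService;
	import java.util.concurrent.LinkedBlockingQueue;
	import java.util.concurrent.SynchronousQueue;
	import java.util.concurrent.ThreadPoolExecutor;
	import java.util.concurrent.TimeUnit;

	public class ExecutorShapes {
	    // Roughly the shape reported by the ExecutorStateEvent: core 0, effectively
	    // unbounded max, SynchronousQueue. Extra work just spawns more threads, so
	    // tasks are effectively never rejected.
	    static ExecutorService cachedLike() {
	        return new ThreadPoolExecutor(
	                0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
	    }

	    // A "reduced capacity" replacement: once all 8 threads are busy and the
	    // 100-slot queue is full, execute() throws RejectedExecutionException, which
	    // (per the log above) then surfaces as an UndeliverableException in the
	    // RejectedTaskEvent ERROR message.
	    static ExecutorService bounded() {
	        return new ThreadPoolExecutor(
	                8, 8, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(100));
	    }
	}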

Case 2: If you are using a KCL 2.x version (prior to KCL 2.2.2), you did NOT modify 
or replace the SchedulerThreadPoolExecutor, and you faced stuck shards or frequent 
intermittent pauses in data consumption, please continue reading; otherwise go to the next case. 
These issues are potentially due to submitting more tasks to the SchedulerThreadPoolExecutor 
than it can handle, leading to delayed execution of submitted tasks.
	a. 	The common symptom of this situation is the following log message:
			- ERROR s.a.k.l.ShardConsumerSubscriber [NONE] - shardId-000000000077: Last request was dispatched at 2019-08-21T01:16:22.340Z, but no response as of 2019-08-21T01:16:57.902Z (PT35.562S).  Cancelling subscription, and restarting.
	b. 	This release has more diagnostic log messages to identify issues around a 
		congested SchedulerThreadPoolExecutor:
			i. 	FanOutRecordsPublisher WARN log indicating a high time (over 11 seconds) taken 
				by the SchedulerThreadPoolExecutor to deliver an event to the ShardConsumer. 
				Ideally the delivery time should be less than a second. If this is consistently 
				high, refer to section c.
					- WARN  s.a.k.r.f.FanOutRecordsPublisher [NONE] - shardId-000000000077: Record delivery time to shard consumer is high at 14534 millis. Check the ExecutorStateEvent logs to see the state of the executor service. Also check if the RecordProcessor's processing time is high.
			ii. FanOutRecordsPublisher DEBUG log to check the current event delivery time 
				by the SchedulerThreadPoolExecutor. Ideally this should be less than a second. 
				If this is consistently high, refer to section c.
					- DEBUG  s.a.k.r.f.FanOutRecordsPublisher [NONE] - shardId-000000000077: Record delivery time to shard consumer is at 401 millis
			iii. ExecutorStateEvent INFO log emitted every 30 seconds to check whether activeThreads 
				is consistently high. activeThreads is considered high if it is more than 2X 
				the number of worker leases. If this is consistently high, refer to section c.
 					- INFO s.a.k.c.DiagnosticEventLogger - Current thread pool executor state: ExecutorStateEvent(executorName=SchedulerThreadPoolExecutor, currentQueueSize=0, activeThreads=450, coreThreads=0, leasesOwned=64, largestPoolSize=520, maximumPoolSize=2147483647)
	c. 	Customers are expected to assess the state of the SchedulerThreadPoolExecutor using 
		the above diagnostic log messages and take appropriate mitigations, such as reducing the 
		RecordProcessor.processRecords time or scaling out the application (a timing sketch 
		follows this case).
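
If it helps anyone acting on (c), here is a minimal sketch of measuring processRecords time with the KCL 2.x ShardRecordProcessor interface. The class name and the 10-second warning threshold are arbitrary choices of mine, not KCL defaults; the point is simply to make long processRecords calls visible before they approach the 35-second health-check window.

	import java.util.concurrent.TimeUnit;

	import org.slf4j.Logger;
	import org.slf4j.LoggerFactory;

	import software.amazon.kinesis.lifecycle.events.InitializationInput;
	import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
	import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
	import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
	import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
	import software.amazon.kinesis.processor.ShardRecordProcessor;

	public class TimedRecordProcessor implements ShardRecordProcessor {
	    private static final Logger log = LoggerFactory.getLogger(TimedRecordProcessor.class);

	    @Override
	    public void initialize(InitializationInput initializationInput) { }

	    @Override
	    public void processRecords(ProcessRecordsInput processRecordsInput) {
	        long start = System.nanoTime();
	        try {
	            // ... actual record handling goes here ...
	        } finally {
	            long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
	            if (elapsedMs > 10_000) {
	                // Consistently long calls here keep scheduler threads busy and show up
	                // as the high delivery-time / "no response as of ..." messages above.
	                log.warn("processRecords took {} ms for {} records",
	                        elapsedMs, processRecordsInput.records().size());
	            }
	        }
	    }

	    @Override
	    public void leaseLost(LeaseLostInput leaseLostInput) { }

	    @Override
	    public void shardEnded(ShardEndedInput shardEndedInput) {
	        // A real processor must checkpoint here; omitted in this sketch.
	    }

	    @Override
	    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { }
	}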
		

Case 3: All customers of KCL 2.x prior to the 2.2.2 release were in a blind spot to throttling and 
other exceptions from CloudWatch metrics publish calls. These exceptions are now made visible, 
and we expect customers to take appropriate actions, such as increasing the CloudWatch Put API TPS 
to fix the throttling issue, or increasing the concurrent connections of the CloudWatch client 
to fix the limited-connections issue.
	a. 	Increasing the concurrency of the client (expanded in the sketch below) - CloudWatchAsyncClient.builder().region(region).httpClientBuilder(NettyNioAsyncHttpClient.builder().maxConcurrency(Integer.MAX_VALUE)).build();
	b. 	CloudWatch limit increase - https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_limits.html
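
For readers wiring this up, here is a sketch of the same CloudWatch client configuration from (a), passed to the KCL through ConfigsBuilder so the metrics publisher uses it. The ConfigsBuilder wiring is the standard KCL 2.x pattern as I understand it; the method and parameter names here are placeholders of mine.

	import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
	import software.amazon.awssdk.regions.Region;
	import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
	import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
	import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
	import software.amazon.kinesis.common.ConfigsBuilder;
	import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

	public class KclConfig {
	    // The same client as in (a), just spread out for readability: a higher
	    // maxConcurrency avoids the limited-connections issue on metric publishes.
	    static CloudWatchAsyncClient cloudWatchClient(Region region) {
	        return CloudWatchAsyncClient.builder()
	                .region(region)
	                .httpClientBuilder(NettyNioAsyncHttpClient.builder()
	                        .maxConcurrency(Integer.MAX_VALUE))
	                .build();
	    }

	    // Pass the tuned client to ConfigsBuilder; metricsConfig() (and therefore the
	    // CloudWatch metrics publisher) is derived from it.
	    static ConfigsBuilder configsBuilder(String streamName, String applicationName,
	            KinesisAsyncClient kinesisClient, DynamoDbAsyncClient dynamoClient,
	            Region region, String workerId, ShardRecordProcessorFactory factory) {
	        return new ConfigsBuilder(streamName, applicationName,
	                kinesisClient, dynamoClient, cloudWatchClient(region), workerId, factory);
	    }
	}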

@ShibaBandit How long did the call to processRecords take? The warning you’re getting is telling you that the call to processRecords has taken more than the configured time, which in this case looks like 35 seconds. The “last time data arrived” warning happens to be emitted at the same time. If your processRecords was blocked for 5 to 10 minutes, then the KCL will need to wait for your record processor to finish before getting more data from Kinesis. That is the break you see in the metrics, and the read timeout.