amazon-kinesis-client: KCL version 1.6.3 doesn't shutdown worker cleanly
I saw in 1.6.2 release notes that the ability to cleanly shutdown a worker was implemented.
I’ve put together a reproduction that will demonstrate that this still throws an exception by the worker when attempting to perform the final checkpoint.
See the README for instructions on running the repro scenario: https://github.com/matthewbogner/kinesis-stress-example
During shutdown of the consumer, you’ll see the following exception which shouldn’t be occurring:
2016-06-22 17:03:47,979 ERROR [pool-2-thread-1] [net.ibogner.kinesis.RecordProcessor:41] Failed to checkpoint
com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: Can't update checkpoint - instance doesn't hold the lease for this shard
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.setCheckpoint(KinesisClientLibLeaseCoordinator.java:172) ~[amazon-kinesis-client-1.6.3.jar:na]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.advancePosition(RecordProcessorCheckpointer.java:216) ~[amazon-kinesis-client-1.6.3.jar:na]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:77) ~[amazon-kinesis-client-1.6.3.jar:na]
at net.ibogner.kinesis.RecordProcessor.checkpoint(RecordProcessor.java:39) [classes/:na]
at net.ibogner.kinesis.RecordProcessor.processRecords(RecordProcessor.java:25) [classes/:na]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:169) [amazon-kinesis-client-1.6.3.jar:na]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49) [amazon-kinesis-client-1.6.3.jar:na]
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24) [amazon-kinesis-client-1.6.3.jar:na]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_71]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_71]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_71]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_71]
About this issue
- Original URL
- State: closed
- Created 8 years ago
- Reactions: 14
- Comments: 16 (8 by maintainers)
Commits related to this issue
- Release 1.7.1 of the Amazon Kinesis Client Library * General * Allow disabling shard synchronization at startup. * Applications can disable shard synchronization at startup. Disabling shard sy... — committed to pfifer/amazon-kinesis-client by pfifer 8 years ago
- Release 1.7.1 of the Amazon Kinesis Client Library (#116) * General * Allow disabling shard synchronization at startup. * Applications can disable shard synchronization at startup. Disabling... — committed to awslabs/amazon-kinesis-client by pfifer 8 years ago
- Gracefully shutdown kinesis record processor to avoid "Can't update checkpoint - instance doesn't hold the lease for this shard" errors. Spark's KinesisRecordProcessor to implement Kinesis IShutdownNo... — committed to jari-kujansuu/spark by jari-kujansuu 4 years ago
@akumariiit @stsatlantis We have the same issue. Any update on this? I don’t know which process is blocking this shutdown procedure. @pfifer
The shutdown of the worker effectively triggers that instance of the worker to lose all it’s leases. Lease loss causes the event loop to call shutdown, instead of process records, on the next iteration. The event loop in worker will only dispatch a single task for a shard at a time. So if a processRecords call is being handled when shutdown is called, nothing will happen until the processRecords call completes. At the same time the lease manager marks the lease as no longer held. So if the record processor calls checkpoint it will receive an exception. This is the body of the race condition, and why your test program shows it reasonably well. The test program calls checkpoint on every processRecords calls. The only way the test program wouldn’t cause an exception was if the call to
Worker.shutdown()occurs before the next call to ShardConsumer.getNextTask.I have some ideas on how to improve shutdown, but I still need to think through them some more.
There are some approaches that would allow you to the shutdown behavior you expect. Most of them would require that processRecords determine when it’s time to shutdown, and stop processing records.