amazon-kinesis-client: KCL version 1.6.3 doesn't shutdown worker cleanly

I saw in 1.6.2 release notes that the ability to cleanly shutdown a worker was implemented.

I’ve put together a reproduction that will demonstrate that this still throws an exception by the worker when attempting to perform the final checkpoint.

See the README for instructions on running the repro scenario: https://github.com/matthewbogner/kinesis-stress-example

During shutdown of the consumer, you’ll see the following exception which shouldn’t be occurring:

2016-06-22 17:03:47,979 ERROR [pool-2-thread-1] [net.ibogner.kinesis.RecordProcessor:41] Failed to checkpoint
com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: Can't update checkpoint - instance doesn't hold the lease for this shard
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.setCheckpoint(KinesisClientLibLeaseCoordinator.java:172) ~[amazon-kinesis-client-1.6.3.jar:na]
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.advancePosition(RecordProcessorCheckpointer.java:216) ~[amazon-kinesis-client-1.6.3.jar:na]
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:77) ~[amazon-kinesis-client-1.6.3.jar:na]
    at net.ibogner.kinesis.RecordProcessor.checkpoint(RecordProcessor.java:39) [classes/:na]
    at net.ibogner.kinesis.RecordProcessor.processRecords(RecordProcessor.java:25) [classes/:na]
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask.call(ProcessTask.java:169) [amazon-kinesis-client-1.6.3.jar:na]
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:49) [amazon-kinesis-client-1.6.3.jar:na]
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator.call(MetricsCollectingTaskDecorator.java:24) [amazon-kinesis-client-1.6.3.jar:na]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_71]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_71]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_71]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_71]

About this issue

  • Original URL
  • State: closed
  • Created 8 years ago
  • Reactions: 14
  • Comments: 16 (8 by maintainers)

Commits related to this issue

Most upvoted comments

@akumariiit @stsatlantis We have the same issue. Any update on this? I don’t know which process is blocking this shutdown procedure. @pfifer

The shutdown of the worker effectively triggers that instance of the worker to lose all it’s leases. Lease loss causes the event loop to call shutdown, instead of process records, on the next iteration. The event loop in worker will only dispatch a single task for a shard at a time. So if a processRecords call is being handled when shutdown is called, nothing will happen until the processRecords call completes. At the same time the lease manager marks the lease as no longer held. So if the record processor calls checkpoint it will receive an exception. This is the body of the race condition, and why your test program shows it reasonably well. The test program calls checkpoint on every processRecords calls. The only way the test program wouldn’t cause an exception was if the call to Worker.shutdown() occurs before the next call to ShardConsumer.getNextTask.

I have some ideas on how to improve shutdown, but I still need to think through them some more.

There are some approaches that would allow you to the shutdown behavior you expect. Most of them would require that processRecords determine when it’s time to shutdown, and stop processing records.