amazon-kinesis-client: Checkpointing seems to fail after other worker has taken lease
Hi,
from time to time (every couple of days) we encounter the following situation (I’ve shortened all shard ids/sequence numbers to improve readability):
worker-a processes shardId-01 and shardId-02. worker-b is started and takes the lease for shardId-02. worker-a tries to checkpoint sequence number 00001 of shardId-02, but fails with the following reason:
com.amazonaws.services.kinesis.leases.impl.LeaseRenewer: Worker worker-a lost lease with key shardId-02 - discovered during update
worker-bsays:
com.amazonaws.services.kinesis.leases.impl.LeaseTaker: Worker worker-b successfully took 1 leases: shardId-02
com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher: Initializing shard shardId-02 with 00001
The way I see it, worker-a appears to fail to checkpoint, but the checkpoint actually succeeds.
The fact that worker-b then initializes itself with the same sequence number tells me that the checkpoint succeeded anyway. Otherwise, worker-b would not know anything about that particular sequence number.
Could this be a bug in the KCL?
We are using version 1.6.3 of the KCL.
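For anyone hitting the same situation: whether the checkpoint really was persisted, and who owned the lease at that moment, can be verified directly in the KCL lease table in DynamoDB. Below is a minimal sketch (AWS SDK for Java v1), assuming the default KCL 1.x lease table layout, i.e. the table is named after the KCL application, keyed by leaseKey, with leaseOwner and checkpoint attributes; the application name used here is a placeholder.

```java
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.GetItemRequest;

import java.util.Collections;
import java.util.Map;

public class LeaseInspector {
    public static void main(String[] args) {
        AmazonDynamoDB ddb = AmazonDynamoDBClientBuilder.defaultClient();

        // Assumption: the lease table is named after the KCL application and is
        // keyed by "leaseKey" (the shard id); "my-application-name" is a placeholder.
        GetItemRequest request = new GetItemRequest()
                .withTableName("my-application-name")
                .withKey(Collections.singletonMap("leaseKey", new AttributeValue("shardId-02")))
                .withConsistentRead(true);

        Map<String, AttributeValue> lease = ddb.getItem(request).getItem();
        if (lease == null) {
            System.out.println("No lease row found for shardId-02");
            return;
        }
        // Compare these against the two workers' logs at the time of the error.
        System.out.println("leaseOwner = " + lease.get("leaseOwner"));
        System.out.println("checkpoint = " + lease.get("checkpoint"));
    }
}
```

If checkpoint already reads 00001 while leaseOwner reads worker-b, that matches the reading above: the write went through, and only the response to worker-a was an error.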
About this issue
- State: open
- Created 8 years ago
- Reactions: 2
- Comments: 26 (6 by maintainers)
I've been looking through the code, and I suspect there may be a race condition: an update race between the lease renewer and the checkpoint process if they overlap in just the right way.
I’ll need to do some more investigation before I can say for sure.
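To make the suspected interleaving easier to picture, here is a purely conceptual sketch in plain Java. It is not the KCL's LeaseRenewer/LeaseManager code; it only shows how a checkpoint write followed by a separate ownership check can report a lost lease even though the value was already stored, once another worker bumps the lease counter in between:

```java
// Conceptual illustration only; NOT the KCL implementation. It models the lease
// row as a counter plus a stored checkpoint and splits the checkpoint into
// "write, then verify ownership" to make the suspected race window explicit.
import java.util.concurrent.atomic.AtomicLong;

public class LeaseRaceSketch {
    private final AtomicLong leaseCounter = new AtomicLong(0);
    private volatile String storedCheckpoint = "TRIM_HORIZON";

    /** Another worker (or the renewer) bumps the counter when it takes or renews the lease. */
    void takeOrRenewLease() {
        leaseCounter.incrementAndGet();
    }

    /**
     * Checkpoint as write-then-verify: if the counter changes between the two steps,
     * the caller is told the lease was lost even though the value is already stored.
     */
    boolean checkpoint(long counterSeenByCaller, String sequenceNumber) {
        storedCheckpoint = sequenceNumber;                 // the value is persisted...
        return leaseCounter.get() == counterSeenByCaller;  // ...but this still reports failure
    }

    public static void main(String[] args) {
        LeaseRaceSketch lease = new LeaseRaceSketch();
        long seenByWorkerA = lease.leaseCounter.get(); // worker-a reads counter = 0
        lease.takeOrRenewLease();                      // worker-b takes the lease, counter = 1
        boolean accepted = lease.checkpoint(seenByWorkerA, "00001");
        // worker-a is told the checkpoint failed, yet worker-b would initialize from 00001.
        System.out.println("accepted=" + accepted + ", stored=" + lease.storedCheckpoint);
    }
}
```

Under that reading, worker-b initializing from 00001 while worker-a logs a lost lease is exactly the outcome described in the original report.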
We've switched on debug logging for the LeaseManager as requested. This is the outcome:
Now, checkpointing fails with the following exception:
Then we see the following logs:
Now, we shut down the worker/record processor and start it again. Then we see the following message:
@pfifer Currently, in our case, every consumer instance/pod processes more than one shard, e.g. 3. When we encounter the error instance doesn't hold the lease for this shard, the current behavior is to catch the error and do a System.exit(1). This causes the consumer instance/pod to restart, which in turn has the side effect of causing lags/interruptions on the other shards this consumer instance/pod was processing [i.e. the other RecordProcessor objects]. Is that the right approach, or should we just ignore the exception, since some other consumer instance/pod has already started processing this shard?

By the way, I noticed that this can also happen if the checkpointing table is underprovisioned for writes. In that case, the workers start failing to checkpoint (because they get throttled), and other workers start stealing the lease.
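Regarding the question above: a ShutdownException from checkpoint() only means this worker no longer owns that particular shard; the other RecordProcessor instances on the same pod are unaffected, so restarting the whole JVM should not be required. Here is a hedged sketch against the KCL 1.x interfaces (the helper class, method name, and retry/backoff numbers are made up for illustration) that gives up on the affected shard only and retries with backoff when the checkpoint is throttled, which also covers the underprovisioned-table case mentioned above:

```java
// Sketch, assuming the KCL 1.x API (com.amazonaws.services.kinesis.clientlibrary.*).
// The class name, method name, and retry/backoff numbers are illustrative.
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;

public class ShardCheckpointPolicy {
    private static final int MAX_RETRIES = 5;

    /**
     * Checkpoints with a bounded backoff on throttling. Returns false when the lease
     * is gone; the caller should stop processing this shard and let the new lease
     * owner continue from the last persisted checkpoint, instead of exiting the JVM.
     */
    public static boolean checkpointOrGiveUp(IRecordProcessorCheckpointer checkpointer)
            throws InterruptedException {
        for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
            try {
                checkpointer.checkpoint();
                return true;
            } catch (ShutdownException e) {
                // Lease lost to another worker: not fatal for this process, and no
                // reason to restart the pod; the other shards keep running.
                return false;
            } catch (ThrottlingException e) {
                // Possibly an underprovisioned lease/checkpoint table: back off and retry.
                Thread.sleep(Math.min(1000L * attempt, 5000L));
            } catch (InvalidStateException e) {
                // Lease table missing or unreachable: worth surfacing loudly.
                throw new IllegalStateException("Problem with the checkpoint table", e);
            } catch (Exception e) {
                throw new RuntimeException("Unexpected checkpoint failure", e);
            }
        }
        return false;
    }
}
```

The trade-off is the usual at-least-once behaviour: when the lease moves, the new owner resumes from the last checkpoint that was actually persisted, so at most the records processed since that checkpoint are seen again.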
This defect leads to Spark streaming blocks with the Kinesis connector not being freed up; we traced the exact block id to this exception. Full stack trace below. We also filed a JIRA with Spark on the topic:
https://issues.apache.org/jira/browse/SPARK-19364
Any immediate workarounds would be appreciated.
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt/yarn/usercache/hadoop/filecache/24/__spark_libs__7928020266533182031.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
17/01/26 13:52:00 ERROR KinesisRecordProcessor: ShutdownException: Caught shutdown exception, skipping checkpoint.
com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: Can't update checkpoint - instance doesn't hold the lease for this shard
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.setCheckpoint(KinesisClientLibLeaseCoordinator.java:120)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.advancePosition(RecordProcessorCheckpointer.java:216)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:137)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:103)
    at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply$mcV$sp(KinesisCheckpointer.scala:81)
    at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply(KinesisCheckpointer.scala:81)
    at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply(KinesisCheckpointer.scala:81)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.kinesis.KinesisRecordProcessor$.retryRandom(KinesisRecordProcessor.scala:144)
    at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1.apply(KinesisCheckpointer.scala:81)
    at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1.apply(KinesisCheckpointer.scala:75)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.streaming.kinesis.KinesisCheckpointer.checkpoint(KinesisCheckpointer.scala:75)
    at org.apache.spark.streaming.kinesis.KinesisCheckpointer.org$apache$spark$streaming$kinesis$KinesisCheckpointer$$checkpointAll(KinesisCheckpointer.scala:103)
    at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$1.apply$mcVJ$sp(KinesisCheckpointer.scala:117)
    at org.apache.spark.streaming.util.RecurringTimer.triggerActionForNextInterval(RecurringTimer.scala:94)
    at org.apache.spark.streaming.util.RecurringTimer.org$apache$spark$streaming$util$RecurringTimer$$loop(RecurringTimer.scala:106)
    at org.apache.spark.streaming.util.RecurringTimer$$anon$1.run(RecurringTimer.scala:29)
Keeping it alive. Running into similar issues lately with amazon-kinesis-client, version 2.2.7.
We experienced a similar issue with the CPU maxed out on a t2.medium instance. We moved our cluster to c4.xlarge instances, which helped resolve the issue.