amazon-kinesis-client: Checkpointing seems to fail after other worker has taken lease

Hi,

From time to time (every couple of days) we encounter the following situation (I've shortened all shard IDs/sequence numbers to improve readability):

  • worker-a processes shardId-01 and shardId-02.
  • worker-b is started and takes lease for shardId-02.
  • worker-a tries to checkpoint sequence number 00001 of shardId-02, but fails for the following reason:
com.amazonaws.services.kinesis.leases.impl.LeaseRenewer: Worker worker-a lost lease with key shardId-02 - discovered during update
  • worker-b says:
com.amazonaws.services.kinesis.leases.impl.LeaseTaker: Worker worker-b successfully took 1 leases: shardId-02
com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher: Initializing shard shardId-02 with 00001

The way I see it, worker-a appears to fail to checkpoint but actually succeeds. The fact that worker-b then initializes itself with the same sequence number tells me that the checkpoint went through anyway; otherwise, worker-b would not know anything about that particular sequence number.

Could this be a bug in the KCL?

We are using version 1.6.3 of the KCL.

About this issue

  • State: open
  • Created 8 years ago
  • Reactions: 2
  • Comments: 26 (6 by maintainers)

Most upvoted comments

I've been looking through the code, and I have a suspicion that there may be a race condition: an update race between the lease renewer and the checkpoint process if they overlap in just the right way.

I’ll need to do some more investigation before I can say for sure.
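To make the suspicion concrete: the lease updates are conditional writes keyed on the lease counter, so roughly something like the sketch below. This is a simplified illustration, not the actual KCL code; the table name, expressions, and helper class are made up, and only the attribute names (leaseKey, leaseCounter, checkpoint) mirror the lease records. If the renewer and the checkpointer (or a lease taker on another worker) race on the same counter value, the loser sees a failed conditional check and reports the lease as lost.

```java
// Simplified sketch of a conditional lease update - NOT the actual KCL code.
// Table name, expressions, and this helper class are illustrative; only the
// attribute names (leaseKey, leaseCounter, checkpoint) mirror the lease records.
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.ConditionalCheckFailedException;
import com.amazonaws.services.dynamodbv2.model.UpdateItemRequest;

import java.util.Map;

public class LeaseUpdateSketch {

    private final AmazonDynamoDB dynamo = AmazonDynamoDBClientBuilder.defaultClient();

    /**
     * Both the renewer and the checkpointer issue a write like this, conditioned on
     * the leaseCounter they read earlier. If another writer bumps the counter in
     * between, the condition fails and the caller concludes it has lost the lease.
     */
    boolean tryUpdateLease(String leaseTable, String leaseKey,
                           long expectedCounter, String newCheckpoint) {
        UpdateItemRequest request = new UpdateItemRequest()
                .withTableName(leaseTable)
                .withKey(Map.of("leaseKey", new AttributeValue(leaseKey)))
                .withUpdateExpression("SET #cp = :cp ADD #lc :one")
                .withConditionExpression("#lc = :expected")
                .withExpressionAttributeNames(Map.of("#cp", "checkpoint", "#lc", "leaseCounter"))
                .withExpressionAttributeValues(Map.of(
                        ":cp", new AttributeValue(newCheckpoint),
                        ":one", new AttributeValue().withN("1"),
                        ":expected", new AttributeValue().withN(Long.toString(expectedCounter))));
        try {
            dynamo.updateItem(request);
            return true;   // update applied, lease still ours
        } catch (ConditionalCheckFailedException e) {
            return false;  // someone else changed the counter first: reported as "lost lease ... discovered during update"
        }
    }
}
```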

We've switched on debug logging for the LeaseManager as you requested. This is the outcome:

DEBUG [2016-10-24 16:37:51,874] [7943c6c0a6617715] com.amazonaws.services.kinesis.leases.impl.LeaseManager: Updating lease {"checkpoint":{"subSequenceNumber":0,"sequenceNumber":"49566801942532034758569254971987374089724020857906397218"},"lastCounterIncrementNanos":20582979558973,"leaseCounter":846494,"ownerSwitchesSinceCheckpoint":0,"leaseKey":"shardId-000000000002","parentShardIds":[],"leaseOwner":"10.61.205.222","concurrencyToken":"825d226d-c1ed-4ffb-99d9-2807b164be0d"}
DEBUG [2016-10-24 16:38:03,124] [39cd72f2dea6953f] com.amazonaws.services.kinesis.leases.impl.LeaseManager: Renewing lease with key shardId-000000000002
DEBUG [2016-10-24 16:38:03,124] [7943c6c0a6617715] com.amazonaws.services.kinesis.leases.impl.LeaseManager: Lease update failed for lease with key shardId-000000000002 because the lease counter was not 846494
INFO [2016-10-24 16:38:03,124] [7943c6c0a6617715] com.amazonaws.services.kinesis.leases.impl.LeaseRenewer: Worker 10.61.205.222 lost lease with key shardId-000000000002 - discovered during update

Now, checkpointing fails with the following exception:

WARN [2016-10-24 16:38:03,127] [7943c6c0a6617715]  com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: Can't update checkpoint - instance doesn't hold the lease for this shard

Then we see the following logs:

DEBUG [2016-10-24 16:38:03,129] [39cd72f2dea6953f] com.amazonaws.services.kinesis.leases.impl.LeaseManager: Lease renewal failed for lease with key shardId-000000000002 because the lease counter was not 846494
INFO [2016-10-24 16:38:03,129] [39cd72f2dea6953f] com.amazonaws.services.kinesis.leases.impl.LeaseRenewer: Worker 10.61.205.222 lost lease with key shardId-000000000002

Now, we shut down the worker/record processor and start it again. Then we see the following message:

INFO [2016-10-24 16:39:25,180] [39cd72f2dea6953f] com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisDataFetcher: Initializing shard shardId-000000000002 with 49566801942532034758569254971987374089724020857906397218
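In other words, the sequence number had already been persisted by the successful lease update at 16:37:51, and the worker that takes the lease next simply reads it back from the lease table. A rough sketch of that lookup, assuming the lease table layout implied by the lease dump above (this is not the actual KCL code, and the attribute types are a guess):

```java
// Rough sketch: recovering the last checkpoint from the lease table after taking
// a lease. Table layout and attribute types are assumptions, not the real KCL code.
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.model.AttributeValue;
import com.amazonaws.services.dynamodbv2.model.GetItemRequest;

import java.util.Map;

public class CheckpointLookupSketch {

    private final AmazonDynamoDB dynamo = AmazonDynamoDBClientBuilder.defaultClient();

    /** Returns the checkpoint sequence number stored in the lease, or null if none. */
    String readCheckpoint(String leaseTable, String shardId) {
        Map<String, AttributeValue> item = dynamo.getItem(
                new GetItemRequest()
                        .withTableName(leaseTable)
                        .withKey(Map.of("leaseKey", new AttributeValue(shardId)))
                        .withConsistentRead(true))
                .getItem();
        if (item == null || !item.containsKey("checkpoint")) {
            return null;
        }
        // This is the value the new owner then logs as
        // "Initializing shard shardId-... with <sequence number>".
        return item.get("checkpoint").getS();
    }
}
```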

@pfifer Currently, in our case, every consumer instance/pod processes more than one shard (e.g. 3). When we encounter the error "instance doesn't hold the lease for this shard", our current behavior is to catch it and call System.exit(1). This restarts the consumer instance/pod, which has the side effect of causing lag/interruptions on the other shards that instance/pod was processing (i.e. its other RecordProcessor objects). Is that the right approach, or should we just ignore the exception, since some other consumer instance/pod has already started processing this shard?
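For illustration, the alternative we are considering is roughly the sketch below: treat a ShutdownException from checkpoint() as "another worker now owns this shard" and stop only the affected record processor, instead of calling System.exit(1) for the whole consumer. The KCL 1.x exception and checkpointer types are real; the surrounding helper and the handling policy are just our sketch.

```java
// Sketch of handling a lost lease without killing the whole consumer process.
// Only the KCL classes named here are real; the helper structure is illustrative.
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.KinesisClientLibDependencyException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;

public class CheckpointHelper {

    /**
     * Checkpoint the shard this processor owns. If the lease has been taken by
     * another worker, checkpoint() throws ShutdownException; instead of
     * System.exit(1), we stop work for this shard only and let the other
     * record processors keep running.
     */
    void safeCheckpoint(String shardId, IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException e) {
            // Another worker owns the lease now and will resume from the last
            // successful checkpoint, so just stop processing this shard.
            System.out.println("Lost lease for " + shardId + ", stopping this processor only.");
        } catch (ThrottlingException | KinesisClientLibDependencyException e) {
            // Transient: worth a bounded retry with backoff rather than exiting.
            System.out.println("Transient checkpoint failure for " + shardId + ": " + e.getMessage());
        } catch (InvalidStateException e) {
            // Problem with the lease table itself (e.g. it was deleted); surface it.
            throw new RuntimeException(e);
        }
    }
}
```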

By the way, I noticed that this could also happen if the checkpointing table is underprovisioned for writes. In that case, the workers will start failing to checkpoint (because they get throttled), and other workers will start stealing the lease.
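If throttling is the cause, one mitigation is to give the lease table more write capacity. For an existing table that means raising the provisioned write throughput directly in DynamoDB; for a new application, recent KCL 1.x releases let you set the initial lease table throughput on the configuration (assuming your version has these setters; the names, stream, and capacity numbers below are arbitrary examples):

```java
// Sketch: configuring higher initial lease table capacity so checkpoint writes
// are less likely to be throttled. These setters only affect table creation;
// an existing lease table has to be resized directly in DynamoDB.
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;

import java.util.UUID;

public class LeaseTableCapacityExample {

    KinesisClientLibConfiguration buildConfig() {
        return new KinesisClientLibConfiguration(
                        "my-consumer-app",                        // application / lease table name (example)
                        "my-stream",                              // Kinesis stream name (example)
                        new DefaultAWSCredentialsProviderChain(),
                        "worker-" + UUID.randomUUID())
                .withInitialLeaseTableReadCapacity(10)            // example values
                .withInitialLeaseTableWriteCapacity(50);
    }
}
```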

This defect leads to Spark streaming blocks (with the Kinesis connector) not being freed up; we traced the exact block ID to this exception. The full stack trace is below, and we also filed a Spark JIRA on the topic:

https://issues.apache.org/jira/browse/SPARK-19364

Any immediate workarounds would be appreciated.

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/mnt/yarn/usercache/hadoop/filecache/24/__spark_libs__7928020266533182031.zip/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/lib/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
17/01/26 13:52:00 ERROR KinesisRecordProcessor: ShutdownException: Caught shutdown exception, skipping checkpoint.
com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException: Can't update checkpoint - instance doesn't hold the lease for this shard
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.setCheckpoint(KinesisClientLibLeaseCoordinator.java:120)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.advancePosition(RecordProcessorCheckpointer.java:216)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:137)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.RecordProcessorCheckpointer.checkpoint(RecordProcessorCheckpointer.java:103)
    at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply$mcV$sp(KinesisCheckpointer.scala:81)
    at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply(KinesisCheckpointer.scala:81)
    at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1$$anonfun$apply$1.apply(KinesisCheckpointer.scala:81)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.streaming.kinesis.KinesisRecordProcessor$.retryRandom(KinesisRecordProcessor.scala:144)
    at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1.apply(KinesisCheckpointer.scala:81)
    at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$checkpoint$1.apply(KinesisCheckpointer.scala:75)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.streaming.kinesis.KinesisCheckpointer.checkpoint(KinesisCheckpointer.scala:75)
    at org.apache.spark.streaming.kinesis.KinesisCheckpointer.org$apache$spark$streaming$kinesis$KinesisCheckpointer$$checkpointAll(KinesisCheckpointer.scala:103)
    at org.apache.spark.streaming.kinesis.KinesisCheckpointer$$anonfun$1.apply$mcVJ$sp(KinesisCheckpointer.scala:117)
    at org.apache.spark.streaming.util.RecurringTimer.triggerActionForNextInterval(RecurringTimer.scala:94)
    at org.apache.spark.streaming.util.RecurringTimer.org$apache$spark$streaming$util$RecurringTimer$$loop(RecurringTimer.scala:106)
    at org.apache.spark.streaming.util.RecurringTimer$$anon$1.run(RecurringTimer.scala:29)

Keeping this alive: we are running into similar issues lately with amazon-kinesis-client, version 2.2.7:

2020-06-17 20:50:59 INFO  s.a.k.l.d.DynamoDBLeaseRenewer:317 - 7011360996539966560 7710753708534499685 - Worker 1d830f82-9319-4125-b286-72dad83d120d lost lease with key shardId-000000000071 - discovered during update
2020-06-17 20:50:59 ERROR c.l.l.s.RecordProcessor:168 - 7011360996539966560 2247803056832779720 - Caught throwable while processing records. Aborting: Can't update checkpoint - instance doesn't hold the lease for this shard

We experienced a similar issue with the CPU maxed out on a t2.medium instance. We moved our cluster to c4.xlarge instances, which helped resolve the issue.