kinesis-aggregation: KinesisAggregator and KCL don't play nice together
Hi,
After a lot of struggling with making the Java version of the KinesisAggregator play nice with the KCL, I have finally narrowed down the issue.
Issue
On one side, I have an app that uses the KinesisAggregator to write aggregated Stream Records on a Kinesis Stream. I use PutRecord as the persist API method. This is working fine.
The records produced with the help of the KinesisAggregator end up on the stream. If you try this, you will notice no exceptions, and all the CloudWatch metrics will show what you would expect.
On the other side of the stream, I have a Spark app that under the hood uses the KCL to ingest data from Kinesis. The cool thing about the KCL is that it should handle aggregated Stream Records transparently to the user, as if there were no aggregation. When running said app, however, I can see in CloudWatch that data is being read (from all the shards), but in Spark there is apparently no data coming in.
Cause
After going deep into the KCL code, I found where the Kinesis User Records go to die:
if (effectiveHashKey.compareTo(startingHashKey) < 0
        || effectiveHashKey.compareTo(endingHashKey) > 0) {
    for (int toRemove = 0; toRemove < recordsInCurrRecord; ++toRemove) {
        result.remove(result.size() - 1);
    }
    break;
}
What this essentially says is that for each of the User Records already extracted from a Stream Record, there is a conditional check verifying that its effective Explicit Hash Key falls within the hash key range of the shard from which the Stream Record was read. In other words, User Records need to look as if they are coming from the shard they actually arrived on (as part of a Stream Record). When this check fails for a single User Record, all of the User Records from that Stream Record are dropped. I guess this is a good thing, since otherwise I would probably have missed the issue 😀.
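For context, here is roughly what that check compares (a sketch based on my reading of the KCL's deaggregation code; the class and method names below are mine, not the KCL's): the effective hash key of a sub-record is its Explicit Hash Key if one is set, otherwise the MD5 of its partition key, and it has to land inside the hash key range of the shard the Stream Record was read from.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Sketch only: mirrors my understanding of the KCL's range check, not the actual KCL code.
final class EffectiveHashKeySketch {

    // The EHK wins if present; otherwise the 128-bit MD5 of the partition key is used.
    static BigInteger effectiveHashKey(String partitionKey, String explicitHashKey)
            throws NoSuchAlgorithmException {
        if (explicitHashKey != null) {
            return new BigInteger(explicitHashKey);
        }
        byte[] md5 = MessageDigest.getInstance("MD5")
                .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
        return new BigInteger(1, md5); // interpret the digest as an unsigned integer
    }

    // The check that drops the whole batch when a single sub-record falls outside the shard.
    static boolean insideShard(BigInteger effectiveHashKey,
                               BigInteger startingHashKey,
                               BigInteger endingHashKey) {
        return effectiveHashKey.compareTo(startingHashKey) >= 0
                && effectiveHashKey.compareTo(endingHashKey) <= 0;
    }
}
```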
Now, the root cause of the problem is that the KinesisAggregator neither checks/enforces nor documents the fact that, when one produces an aggregated Stream Record destined for a specific shard (as indicated by its Explicit Hash Key), all of the User Records it contains must also carry Explicit Hash Keys that map to that same shard.
The example provided in the SampleAggregatorProducer.java file goes something like this:
private static void sendViaBatch(AmazonKinesis producer, String streamName, RecordAggregator aggregator) {
    System.out.println("Creating " + ProducerConfig.RECORDS_TO_TRANSMIT + " records...");
    for (int i = 1; i <= ProducerConfig.RECORDS_TO_TRANSMIT; i++) {
        String pk = ProducerUtils.randomPartitionKey();
        String ehk = ProducerUtils.randomExplicitHashKey(); // <---- NEW EXPLICIT HASH KEY FOR EVERY RECORD
        byte[] data = ProducerUtils.randomData(i, ProducerConfig.RECORD_SIZE_BYTES);

        // addUserRecord returns non-null when a full record is ready to transmit
        AggRecord aggRecord = aggregator.addUserRecord(pk, ehk, data);
        if (aggRecord != null) {
            ForkJoinPool.commonPool().execute(() -> {
                sendRecord(producer, streamName, aggRecord);
            });
        }
    }
}
You can see how each new User Record receives a new Explicit Hash Key that does not necessarily correspond to the first one in the Stream Record (which is the one that is actually used by the Stream Record). If just one of the aggregated User Records references an Explicit Hash Key from a different shard, all the records die in the KCL 💀
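As a workaround, the simplest thing that worked for me is to pin a single Explicit Hash Key per aggregator, so that every User Record inside an aggregated Stream Record matches the Stream Record's shard. A minimal sketch, reusing the names from the sample above (the method name is mine):

```java
private static void sendViaBatchSameEhk(AmazonKinesis producer, String streamName, RecordAggregator aggregator) {
    // ONE Explicit Hash Key for everything that goes through this aggregator, so every
    // User Record inside an aggregated Stream Record matches the Stream Record's shard
    String ehk = ProducerUtils.randomExplicitHashKey();
    for (int i = 1; i <= ProducerConfig.RECORDS_TO_TRANSMIT; i++) {
        String pk = ProducerUtils.randomPartitionKey();
        byte[] data = ProducerUtils.randomData(i, ProducerConfig.RECORD_SIZE_BYTES);
        AggRecord aggRecord = aggregator.addUserRecord(pk, ehk, data);
        if (aggRecord != null) {
            ForkJoinPool.commonPool().execute(() -> sendRecord(producer, streamName, aggRecord));
        }
    }
    // any partially filled record still left in the aggregator has to be flushed
    // separately, exactly as in the rest of the sample
}
```

The obvious trade-off is that one aggregator then feeds a single shard; to spread load, keep one aggregator (with one pinned EHK) per target shard.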
Possible resolution
There are many ways in which this problem could be solved (these are just some suggestions off the top of my head):
- Document the issue. Leave the code as it is; it is valid after all, you just need to know how to use it.
- Modify the `validateExplicitHashKey` method here so that it actually checks the Explicit Hash Key against the shard the User Record is supposed to go to. That is to say, make it compatible with the KCL (see the sketch after this list).
- Modify the methods so that the same Explicit Hash Key is transparently persisted to all of the User Records inside a Stream Record.
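To make the second option concrete, the check could look something like this (illustrative only; it assumes the aggregator is handed the stream's shard list, e.g. from DescribeStream, which it does not have today):

```java
import java.math.BigInteger;
import java.util.List;

import com.amazonaws.services.kinesis.model.Shard;

final class ExplicitHashKeyValidation {

    // Returns true if the candidate User Record EHK falls into the same shard
    // hash key range as the Stream Record's EHK.
    static boolean sameShard(List<Shard> shards, String streamRecordEhk, String userRecordEhk) {
        BigInteger recordKey = new BigInteger(streamRecordEhk);
        BigInteger userKey = new BigInteger(userRecordEhk);
        for (Shard shard : shards) {
            BigInteger start = new BigInteger(shard.getHashKeyRange().getStartingHashKey());
            BigInteger end = new BigInteger(shard.getHashKeyRange().getEndingHashKey());
            if (recordKey.compareTo(start) >= 0 && recordKey.compareTo(end) <= 0) {
                return userKey.compareTo(start) >= 0 && userKey.compareTo(end) <= 0;
            }
        }
        return false; // the Stream Record's EHK is not covered by any of the given shards
    }
}
```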
Best regards, Cosmin
About this issue
- State: open
- Created 8 years ago
- Reactions: 7
- Comments: 15 (4 by maintainers)
What makes this issue even worse: The partition/hash keys are still transmitted per user record, not only per aggregated record. At the same time, I’m forced to choose the exact same partition/hash key for all user records in an aggregated record. This adds totally unnecessary overhead, in particular for small user records:
Consider small records of 100 bytes. The per-user-record overhead is about 70 bytes. Aggregating 1000 records adds the same hash/partition key 1000 times, increasing the agg record size by 70% and decreasing the maximum throughput by about 40%.
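Working through those numbers: 100 bytes of payload plus ~70 bytes of overhead is ~170 bytes per user record inside the aggregate, i.e. a 70% size increase, and only 100/170 ≈ 59% of each aggregated record is actual payload, hence the roughly 40% drop in maximum throughput.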
@ssesha I believe I have a solution for this. Are you in a position to help me test at scale? Can you email meyersi@amazon.com if so?
@cosmincatalin @brentnash Is documenting this limitation and force-setting all User Records in an aggregated record to have the same EHK the way to go? Is there a PR for this? If not, I could try and raise one.