hudi: [SUPPORT] Using monotonically_increasing_id to generate record key causing duplicates on upsert

Describe the problem you faced

For context, we have tables that are snapshotted daily/weekly (e.g. an RDS export) that we then convert into Hudi tables with Spark jobs (i.e. we overwrite the full table). If there is no primary key available in our metadata, we attach a surrogate key column generated with monotonically_increasing_id (https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.functions.monotonically_increasing_id.html).
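
A minimal sketch of what our Spark job does (paths, table name, and options are placeholders, not our exact configuration):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id

spark = SparkSession.builder.getOrCreate()

# Daily/weekly snapshot of the source table (e.g. an RDS export); path is a placeholder.
df = spark.read.parquet("s3://bucket/rds-export/my_table/")

# No primary key in our metadata, so attach a surrogate key column.
df = df.withColumn("surrogate_key", monotonically_increasing_id())

hudi_options = {
    "hoodie.table.name": "my_table",
    "hoodie.datasource.write.recordkey.field": "surrogate_key",
    "hoodie.datasource.write.precombine.field": "surrogate_key",
    "hoodie.datasource.write.operation": "upsert",
}

# Overwrite the full Hudi table with the new snapshot.
(df.write.format("hudi")
    .options(**hudi_options)
    .mode("overwrite")
    .save("s3://bucket/hudi/my_table/"))
```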

We’re seeing some really odd behavior where it seems like the same record is sometimes written twice with different record keys (which overwrites other records). A dummy example:

Example input:
Row 1: id = 1
Row 2: id = 2
Row 3: id = 3

After adding monotonically_increasing_id (used as the record key of the Hudi table):
Row 1: id = 1, monotonically_increasing_id = 1
Row 2: id = 2, monotonically_increasing_id = 2
Row 3: id = 3, monotonically_increasing_id = 8589934593

Hudi table becomes:
Row 1: id = 3, monotonically_increasing_id = 1
Row 2: id = 2, monotonically_increasing_id = 2
Row 3: id = 3, monotonically_increasing_id = 8589934593

The problem is non-deterministic (i.e. re-running on the same input fixes the issue). For example, on one job we saw:

# of rows: 154982072
# of duplicate rows with different record keys: 813263

When an upsert happens, is there retry logic that does a “partial retry”? The docs for monotonically_increasing_id say it is generated from (partition_id, record_number), and we suspect that, for whatever reason, some rows are processed multiple times in different stages, which changes the partition_id and record_number (and therefore the generated key).
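
For illustration (based on the Spark documentation’s description, not on Hudi internals): the generated ID packs the Spark partition ID into the upper bits and the record’s position within the partition into the lower 33 bits, so a row that is recomputed in a differently-partitioned stage can end up with a different key:

```python
# Decode a monotonically_increasing_id value into (partition_id, row position).
# The Spark docs state the partition ID sits in the upper 31 bits and the record
# number within each partition in the lower 33 bits.
def decode_mid(mid: int) -> tuple[int, int]:
    partition_id = mid >> 33                  # upper bits: Spark partition ID
    row_in_partition = mid & ((1 << 33) - 1)  # lower 33 bits: position within partition
    return partition_id, row_in_partition

print(decode_mid(1))           # (0, 1) -> partition 0
print(decode_mid(8589934593))  # (1, 1) -> partition 1: same relative position, different key
```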

To Reproduce

Unknown; re-running over the same input leads to different results. I asked in the Hudi Slack and someone mentioned the key generator needs to be unique: https://apache-hudi.slack.com/archives/C4D716NPQ/p1675336371420009?thread_ts=1675301744.998269&cid=C4D716NPQ

Expected behavior

This seems like an issue with our usage of Hudi:

  1. What are the requirements for a record key? Would using Spark’s uuid be safe? We found https://issues.apache.org/jira/browse/SPARK-23599, which includes this comment:

“We have encountered this problem with Spark 3.1.2, resulting in duplicate values in a situation where a Spark executor died. As suggested in the description, this error was hard to track down and difficult to replicate.”

Is there a recommended way to generate a surrogate key for a Hudi table? (A possible deterministic alternative is sketched after the example below.)

  2. Would changing our operation to insert/bulk_insert fix the issue? Naively, it seems to me like this would just cause duplicates, i.e. the output becoming:
Row 1: id = 1, monotonically_increasing_id = 1
Row 1: id = 3, monotonically_increasing_id = 1
Row 2: id = 2, monotonically_increasing_id = 2
Row 3: id = 3, monotonically_increasing_id = 8589934593
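
On the surrogate-key question above, here is a hedged sketch of a deterministic, content-based key (hashing all source columns) as an alternative to the layout-dependent ID. This is our own idea, not an official Hudi recommendation, and it assumes rows are unique across all columns; true duplicate rows would collapse to one key:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import coalesce, col, concat_ws, lit, md5

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("s3://bucket/rds-export/my_table/")  # placeholder path

# Derive the record key from row contents (null-safe) instead of from Spark's
# task/partition layout, so retries or re-runs produce the same key for the same row.
df = df.withColumn(
    "surrogate_key",
    md5(concat_ws("||", *[coalesce(col(c).cast("string"), lit("<null>")) for c in df.columns])),
)
```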

Environment Description

We are running on EMR 6.9

  • Hudi version : 0.12.1

  • Spark version : 3.3.0

  • Hive version : 3.1.3

  • Hadoop version : 3.3.3

  • Storage (HDFS/S3/GCS…) : S3

  • Running on Docker? (yes/no) : Yes (Spark on Docker)

About this issue

  • State: open
  • Created a year ago
  • Reactions: 1
  • Comments: 19 (10 by maintainers)

Most upvoted comments

hey @jtmzheng: yeah, this might end up with duplicates or data loss in failure scenarios. We have put up https://github.com/apache/hudi/pull/8107 for this ask. If you can try it and let us know if it works, that would be nice. We did test with a failure scenario and validated that there is no data loss or duplicates with this patch.