hudi: [SUPPORT] CoW: Hudi Upsert not working when there is a timestamp field in the composite key

Tips before filing an issue

  • Have you gone through our FAQs?

  • Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.

  • If you have triaged this as a bug, then file an issue directly.

Describe the problem you faced

A batch process applies updates to existing tables in our data lake. These are Hive external partitioned tables whose location points to an S3 directory. I am working on a PoC to migrate all of these tables to Hudi. I did a bulk_insert for the initial data load (IDL) and everything went fine, but I have a problem with upserts: my primary key combination includes a timestamp field, and records get duplicated because the timestamp is rendered differently in the recordkey.field during the upsert operation. Below are my Hudi options:

hudi_options = {
    'hoodie.table.name': 'f_claim_mdcl_hudi_cow',
    'hoodie.datasource.write.recordkey.field': 'claim_id,pat_id,claim_subm_dt,plac_of_srvc_cd,src_pri_psbr_id,src_plan_id',
    'hoodie.datasource.write.partitionpath.field': 'src_sys_nm,yr_mth',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.datasource.write.table.name': 'f_hudi_cow',
    # 'hoodie.combine.before.insert': 'false',
    'hoodie.combine.before.upsert': 'true',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.table': 'f_hudi_cow',
    'hoodie.datasource.hive_sync.partition_fields': 'src_sys_nm,yr_mth',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.datasource.hive_sync.database': 'us_commercial_datalake_app_commons_dev',
    'hoodie.datasource.hive_sync.support_timestamp': 'true',
    'hoodie.datasource.hive_sync.auto_create_db': 'false',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
    'hoodie.datasource.write.row.writer.enable': 'true',
    'hoodie.parquet.small.file.limit': '600000000',
    'hoodie.parquet.max.file.size': '1000000000',
    'hoodie.upsert.shuffle.parallelism': '10000',
    'hoodie.insert.shuffle.parallelism': '10000',
    'hoodie.clean.automatic': 'false',
    'hoodie.cleaner.commits.retained': 3,
    'hoodie.index.type': 'GLOBAL_SIMPLE',
    'hoodie.simple.index.update.partition.path': 'true',
    'hoodie.metadata.enable': 'true'
}

df.write.format("org.apache.hudi") \
    .options(**hudi_options) \
    .option('hoodie.datasource.write.operation', 'upsert') \
    .mode("append") \
    .save("{s3_path}")

I don't get any errors while processing. My record key for the bulk insert looks like this:

After the bulk insert: _hoodie_record_key: claim_id:10420217599403398158,pat_id:8607357348,claim_subm_dt:2020-11-21 00:00:00.0,plac_of_srvc_cd:INPATIENT HOSPITAL,src_pri_psbr_id:7605954,src_plan_id:0009659999

After the upsert, a second copy of the same record was added with _hoodie_record_key: claim_id:10420217599403398158,pat_id:8607357348,claim_subm_dt:1605916800000000,plac_of_srvc_cd:INPATIENT HOSPITAL,src_pri_psbr_id:7605954,src_plan_id:0009659999
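
For what it's worth, the long value in the upserted key is exactly the original claim_subm_dt expressed as epoch microseconds (assuming the timestamp is UTC), which points at the timestamp being serialized as a long on one write path and as a formatted string on the other. A minimal standalone check, not part of the job above:

from datetime import datetime, timezone

# claim_subm_dt from the bulk-insert key, interpreted as UTC (assumption)
ts = datetime(2020, 11, 21, 0, 0, 0, tzinfo=timezone.utc)

# convert to epoch microseconds
micros = int(ts.timestamp()) * 1_000_000
print(micros)  # 1605916800000000 -- the value seen in the upserted _hoodie_record_key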

To Reproduce

Steps to reproduce the behavior:

  1. Generate a set of records with a timestamp field as one of the primary keys in a Hive external table stored on S3
  2. Load the same set of records with mode("append") and option('hoodie.datasource.write.operation', 'upsert')
  3. Check for duplicates in the data (see the sketch after this list)
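
Purely as an illustration (not from the original report), step 3 can be done by grouping on the business-key columns that make up the record key above; spark and s3_path are assumed to be the active session and the table path from the snippet in the description:

# Hypothetical duplicate check, assuming `spark` is an active SparkSession
# and `s3_path` is the Hudi table location written above.
# Note: on older Hudi versions the load path may need partition globbing,
# e.g. s3_path + "/*/*" for the two partition columns used here.
key_cols = ["claim_id", "pat_id", "claim_subm_dt", "plac_of_srvc_cd",
            "src_pri_psbr_id", "src_plan_id"]

df = spark.read.format("org.apache.hudi").load(s3_path)

# any business key appearing more than once indicates a duplicated record
dups = df.groupBy(*key_cols).count().filter("count > 1")
dups.show(truncate=False)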

Expected behavior

No duplicates in the data. The recordkey.field value should stay the same for the timestamp field and not get converted to a long.

Environment Description

  • Hudi version : 0.7.0 installed in EMR 5.33

  • Spark version : 2.4.7

  • Hive version : 2.3.7

  • Hadoop version : Amazon 2.10.1

  • Storage (HDFS/S3/GCS…) : s3

  • Running on Docker? (yes/no) : No

Additional context

Add any other context about the problem here.

Stacktrace

Add the stacktrace of the error.

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Comments: 19 (12 by maintainers)

Most upvoted comments

@nsivabalan I set the row.writer property to false and ingested the data. Now the timestamps get converted to their epoch representation (long datatype) in the hoodie key.

This actually solves my issue, since the upsert key is then in sync with the IDL key. But bulk_insert with row.writer set to false is very slow; it takes roughly double the time for the same data ingestion.
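
For anyone following this workaround, the change amounts to disabling the row writer for the bulk_insert so key generation goes through the same code path as the upsert. A minimal sketch, reusing the hudi_options from the description; the save mode and path placeholder are assumptions and should match your own IDL job:

# Workaround sketch: disable the row writer for bulk_insert so the record key
# is generated the same way as during upsert. Trade-off reported above: the
# initial load takes roughly twice as long.
bulk_insert_options = dict(hudi_options)  # hudi_options as defined earlier
bulk_insert_options['hoodie.datasource.write.row.writer.enable'] = 'false'

df.write.format("org.apache.hudi") \
    .options(**bulk_insert_options) \
    .option('hoodie.datasource.write.operation', 'bulk_insert') \
    .mode("append") \
    .save("{s3_path}")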