hudi: [SUPPORT] Duplicate records in COW table within same partition path

Describe the problem you faced

We have been writing IoT data from Kafka to Azure Blob using the Deltastreamer utility in continuous mode and are querying the table through Presto. We are seeing duplicate records that share the same _hoodie_record_key but have different commit times and sit in different parquet files within the same partition path.

 _hoodie_commit_time |   _hoodie_commit_seqno   |                           _hoodie_record_key                            | _hoodie_partition_path |                             _hoodie_file_name                             | master_timestamp |   timest
---------------------+--------------------------+-------------------------------------------------------------------------+------------------------+---------------------------------------------------------------------------+------------------+---------
 20211206002458      | 20211206002458_1_4116796 | vehicle_identification_number:P53ACDCB2AKA00081,timestamp:1638708846929 | dt=2021-12-05          | 5885895a-78d1-468b-9e7b-045d77644d1c-0_1-1706-2959_20211206002458.parquet |    1638708851906 | 16387088
 20211206120116      | 20211206120116_1_1745292 | vehicle_identification_number:P53ACDCB2AKA00081,timestamp:1638708846929 | dt=2021-12-05          | df619ce7-cd21-41fc-9e6b-68386748bde4-0_1-470-1174_20211206120116.parquet  |    1638708851906 | 1638708

Deltastreamer configs:

#base properties
hoodie.upsert.shuffle.parallelism=500
hoodie.insert.shuffle.parallelism=500
hoodie.delete.shuffle.parallelism=50
hoodie.bulkinsert.shuffle.parallelism=10
hoodie.embed.timeline.server=true
hoodie.filesystem.view.type=EMBEDDED_KV_STORE
hoodie.compact.inline=false

#cleaning
hoodie.cleaner.policy=KEEP_LATEST_COMMITS
hoodie.cleaner.commits.retained=1
hoodie.clean.async=true

#archival
hoodie.keep.min.commits=2
hoodie.keep.max.commits=10

#datasource properties
hoodie.deltastreamer.schemaprovider.registry.url=http://10.xx.yy.zz:8081/subjects/dp.hmi.quectel.event.lpe.packet.v2/versions/latest
hoodie.datasource.write.recordkey.field=vehicle_identification_number,timestamp
hoodie.deltastreamer.source.kafka.topic=dp.hmi.quectel.event.lpe.packet.v2
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.CustomKeyGenerator
hoodie.datasource.write.partitionpath.field=timestamp:TIMESTAMP

hoodie.deltastreamer.keygen.timebased.timestamp.type=EPOCHMILLISECONDS
hoodie.deltastreamer.keygen.timebased.input.timezone=UTC
hoodie.deltastreamer.keygen.timebased.output.timezone=UTC
hoodie.deltastreamer.keygen.timebased.output.dateformat='dt='yyyy-MM-dd


#kafka props
bootstrap.servers=localhost:9092
auto.offset.reset=earliest
schema.registry.url=http://10.xx.yy.zz:8081

#prometheus
hoodie.metrics.pushgateway.host=k8s-prometheus-pushgateway.observability.svc.cluster.local
hoodie.metrics.pushgateway.port=9091
hoodie.metrics.on=true
hoodie.deltastreamer.kafka.source.maxEvents=10000000
hoodie.metrics.reporter.type=PROMETHEUS_PUSHGATEWAY
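
For context, a minimal launch sketch showing how a props file like the one above is typically fed to Deltastreamer in continuous mode; the bundle path, base path, target table name, and ordering field below are placeholders rather than the exact values used for this job:

# --op INSERT matches the operation type discussed in the comments below;
# --continuous keeps the ingestion job running against the Kafka topic
spark-submit \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
  hudi-utilities-bundle_2.11-0.9.0.jar \
  --table-type COPY_ON_WRITE \
  --op INSERT \
  --source-class org.apache.hudi.utilities.sources.AvroKafkaSource \
  --schemaprovider-class org.apache.hudi.utilities.schema.SchemaRegistryProvider \
  --source-ordering-field master_timestamp \
  --target-base-path wasbs://<container>@<account>.blob.core.windows.net/tables/lpe_packet_v2 \
  --target-table lpe_packet_v2 \
  --props /path/to/deltastreamer.properties \
  --continuous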

Environment Description

  • Hudi version : 0.9.0

  • Spark version : 2.4.4

  • Hive version : 3.1.2

  • Hadoop version : 3.1.2

  • Storage (HDFS/S3/GCS…) : Azure Blob

  • Running on Docker? (yes/no) : K8s

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Comments: 17 (17 by maintainers)

Most upvoted comments

I see you are using “INSERT” as the operation type. If your incoming records have duplicates, those can show up as duplicates in Hudi as well; only with “UPSERT” do we de-dup explicitly. For “INSERT”, you need to set hoodie.combine.before.insert (https://hudi.apache.org/docs/configurations/#hoodiecombinebeforeinsert) to true if you want Hudi to de-dup before ingesting into Hudi.

Can you try setting this and let us know how it goes?
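
A minimal sketch of the property that comment refers to, as it would go into the props file above; master_timestamp is used purely as an illustrative ordering field, not necessarily the right choice for this job:

# the operation itself stays INSERT (set via Deltastreamer's --op flag);
# this asks Hudi to de-dup records sharing a record key before writing
hoodie.combine.before.insert=true
# which duplicate wins is decided by the ordering field
# (--source-ordering-field in Deltastreamer), e.g. master_timestamp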

@stym06: Can you give this a try: https://github.com/apache/hudi/pull/3222? It would help to certify the patch too.