hudi: [SUPPORT] Deltastreamer Fails with AWSDmsAvroPayload

Describe the problem you faced

Deltastreamer ingestion fails when AWSDmsAvroPayload is enabled; the identical configuration works without it.

To Reproduce

Steps to reproduce the behavior:

  1. Enable --payload-class org.apache.hudi.common.model.AWSDmsAvroPayload
  2. Run Deltastreamer on an AWS EMR cluster.

Expected behavior

Deltastreamer creates the Hudi dataset.

Environment Description

  • Hudi version : 0.13.0

  • Spark version : 3.3.1

  • Hive version : 3.1.3

  • Hadoop version : Amazon 3.3.3

  • Storage (HDFS/S3/GCS…) : S3

  • Running on Docker? (yes/no) : no

Additional context

Bootstrapped 0.13.0 jars: hudi-spark3.3-bundle_2.12-0.13.0.jar, hudi-utilities-slim-bundle_2.12-0.13.0.jar, hudi-aws-bundle-0.13.0.jar

spark-submit \
--master yarn \
--jars /mnt1/hudi-jars/hudi-spark-bundle.jar,/mnt1/hudi-jars/hudi-utilities-slim-bundle.jar,/mnt1/hudi-jars/hudi-aws-bundle.jar \
--deploy-mode cluster \
--conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /mnt1/hudi-jars/hudi-utilities-slim-bundle.jar \
--table-type COPY_ON_WRITE \
--source-ordering-field replicadmstimestamp \
--source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
--target-base-path s3://<s3path>/<table> \
--target-table table \
--payload-class org.apache.hudi.common.model.AWSDmsAvroPayload \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \
--hoodie-conf hoodie.datasource.write.recordkey.field=_id \
--hoodie-conf hoodie.datasource.write.partitionpath.field=Loc \
--hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://<differents3path>/<table>

Stacktrace

23/03/23 16:46:38 ERROR HoodieCreateHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=463607 partitionPath=SGILB}, currentLocation='null', newLocation='null'}
java.util.NoSuchElementException: No value present in Option
	at org.apache.hudi.common.util.Option.get(Option.java:89) ~[__app__.jar:0.13.0]
	at org.apache.hudi.common.model.HoodieAvroRecord.prependMetaFields(HoodieAvroRecord.java:132) ~[__app__.jar:0.13.0]
	at org.apache.hudi.io.HoodieCreateHandle.doWrite(HoodieCreateHandle.java:142) ~[__app__.jar:0.13.0]
	at org.apache.hudi.io.HoodieWriteHandle.write(HoodieWriteHandle.java:175) ~[__app__.jar:0.13.0]
	at org.apache.hudi.execution.CopyOnWriteInsertHandler.consume(CopyOnWriteInsertHandler.java:98) ~[__app__.jar:0.13.0]
	at org.apache.hudi.execution.CopyOnWriteInsertHandler.consume(CopyOnWriteInsertHandler.java:42) ~[__app__.jar:0.13.0]
	at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:67) ~[__app__.jar:0.13.0]
	at org.apache.hudi.execution.SparkLazyInsertIterable.computeNext(SparkLazyInsertIterable.java:80) ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:0.13.0]
	at org.apache.hudi.execution.SparkLazyInsertIterable.computeNext(SparkLazyInsertIterable.java:39) ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:0.13.0]
	at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119) ~[__app__.jar:0.13.0]
	at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46) ~[scala-library-2.12.15.jar:?]
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) ~[scala-library-2.12.15.jar:?]
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) ~[scala-library-2.12.15.jar:?]
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1535) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1445) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1509) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1332) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:327) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
	at org.apache.spark.scheduler.Task.run(Task.scala:138) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_362]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_362]
	at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_362]
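
For what it's worth, the exception itself is straightforward to read: HoodieAvroRecord.prependMetaFields calls Option.get() on a record value that turned out to be empty, and Hudi's Option throws "No value present in Option" much like java.util.Optional would. Below is a minimal, self-contained sketch of that failing shape, written against plain java.util.Optional with made-up names (it is not the actual Hudi code), just to show where the unchecked get() bites:

import java.util.Map;
import java.util.Optional;

// Minimal illustration of the failing pattern in the stack trace above:
// the payload yields no value for some records, but the write path still
// calls get() on the result without checking isPresent().
public class UncheckedGetRepro {

    // Stand-in for the payload resolution step; empty means "nothing to write".
    static Optional<Map<String, Object>> resolvePayload(Map<String, Object> row, boolean hasValue) {
        return hasValue ? Optional.of(row) : Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, Object> row = Map.of("_id", 463607, "Loc", "SGILB");

        // Stand-in for HoodieCreateHandle.doWrite -> prependMetaFields:
        // calling get() on an empty Optional throws
        // java.util.NoSuchElementException: No value present
        Map<String, Object> record = resolvePayload(row, false).get();
        System.out.println(record);
    }
}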

About this issue

  • State: closed
  • Created a year ago
  • Reactions: 3
  • Comments: 22 (12 by maintainers)

Most upvoted comments

I was about to report exactly the same issue. The problem is with DELETEs: when Deltastreamer processes a DELETE record from the DMS .parquet files, I hit the error above (see the sketch after the stack trace below for why delete markers trip it). I believe this is related to #6590. My setup is the following: I have a simple Postgres database (with a simple example table) and I ingest the .parquet files using DMS. Then I run Hudi locally to create the table. The command I’m using is the following:

./spark-submit \
--jars "/jars/hudi-utilities-bundle_2.12-0.13.0.jar,/jars/hudi-spark3.3-bundle_2.12-0.13.0.jar,/jars/aws-java-sdk-bundle-1.12.398.jar,/jars/hadoop-aws-3.3.4.jar,/jars/hudi-aws-bundle-0.13.0.jar" \
--master "local[2]" \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.ui.port=4040 \
--class "org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer" /jars/hudi-utilities-bundle_2.12-0.13.0.jar \
--source-class "org.apache.hudi.utilities.sources.ParquetDFSSource" \
--table-type COPY_ON_WRITE --op UPSERT \
--target-base-path s3a://cdc-spike/hudi/postgres/employee \
--target-table employee \
--min-sync-interval-seconds 60 \
--payload-class "org.apache.hudi.payload.AWSDmsAvroPayload" \
--hoodie-conf "hoodie.deltastreamer.source.dfs.root=s3a://cdc-spike/dms/public/employee/" \
--source-ordering-field _ingestion_timestamp \
--hoodie-conf hoodie.datasource.write.recordkey.field=id \
--hoodie-conf hoodie.datasource.write.hive_style_partitioning=true \
--hoodie-conf hoodie.datasource.write.partitionpath.field=salary

To be precise, when I delete a row in the database and Hudi tries to process it, I get the following:

23/03/23 17:08:10 INFO HoodieCreateHandle: New CreateHandle for partition :salary=__HIVE_DEFAULT_PARTITION__ with fileId a208ebd0-a29c-4f3b-8b46-e39c6ee6fa86-0
23/03/23 17:08:10 ERROR HoodieCreateHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=5 partitionPath=salary=__HIVE_DEFAULT_PARTITION__}, currentLocation='null', newLocation='null'}
java.util.NoSuchElementException: No value present in Option
	at org.apache.hudi.common.util.Option.get(Option.java:89)
	at org.apache.hudi.common.model.HoodieAvroRecord.prependMetaFields(HoodieAvroRecord.java:132)
	at org.apache.hudi.io.HoodieCreateHandle.doWrite(HoodieCreateHandle.java:142)
	at org.apache.hudi.io.HoodieWriteHandle.write(HoodieWriteHandle.java:175)
	at org.apache.hudi.execution.CopyOnWriteInsertHandler.consume(CopyOnWriteInsertHandler.java:98)
	at org.apache.hudi.execution.CopyOnWriteInsertHandler.consume(CopyOnWriteInsertHandler.java:42)
	at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:67)
	at org.apache.hudi.execution.SparkLazyInsertIterable.computeNext(SparkLazyInsertIterable.java:80)
	at org.apache.hudi.execution.SparkLazyInsertIterable.computeNext(SparkLazyInsertIterable.java:39)
	at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119)
	at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
	at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223)
	at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1535)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1445)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1509)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1332)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:327)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
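
For context on why deletes in particular trigger this: DMS CDC parquet files carry an extra "Op" column ("I"/"U"/"D"), and AWSDmsAvroPayload keys off that column, yielding an empty record for "D" rows so the target row is removed rather than rewritten. The sketch below paraphrases that classification with simplified, hypothetical names (it is not the Hudi source); the point is that a "D" row resolves to no value, which is exactly what the unchecked get() in the traces above chokes on.

import java.util.Map;
import java.util.Optional;

// Simplified paraphrase of the AWSDmsAvroPayload idea: DMS change rows carry
// an "Op" column ("I" = insert, "U" = update, "D" = delete), and delete rows
// produce no record to write. Names and types are made up for illustration.
public class DmsDeleteMarkerSketch {

    static final String OP_FIELD = "Op";
    static final String DELETE_OP = "D";

    // Returns the row to persist, or empty if the row is a DMS delete marker.
    static Optional<Map<String, Object>> toWritableRecord(Map<String, Object> dmsRow) {
        return DELETE_OP.equals(dmsRow.get(OP_FIELD))
                ? Optional.empty()
                : Optional.of(dmsRow);
    }

    public static void main(String[] args) {
        Map<String, Object> insertRow = Map.of(OP_FIELD, "I", "id", 5, "salary", 1000);
        Map<String, Object> deleteRow = Map.of(OP_FIELD, "D", "id", 5, "salary", 1000);

        System.out.println(toWritableRecord(insertRow).isPresent()); // true  -> written
        System.out.println(toWritableRecord(deleteRow).isPresent()); // false -> must be skipped,
        // but on the insert path in the traces above the empty result is get()-ed unchecked.
    }
}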

@yihua Yes, it works! Thank you!