hudi: [SUPPORT] Deltastreamer Fails with AWSDmsAvroPayload
Describe the problem you faced
Deltastreamer ingestion fails with AWSDmsAvroPayload, but succeeds with an otherwise identical configuration when the payload class is not set.
To Reproduce
Steps to reproduce the behavior:
- Enable --payload-class org.apache.hudi.common.model.AWSDmsAvroPayload
- Run Deltastreamer on an AWS EMR cluster.
Expected behavior
Deltastreamer creates the Hudi dataset.
Environment Description
- Hudi version : 0.13.0
- Spark version : 3.3.1
- Hive version : 3.1.3
- Hadoop version : Amazon 3.3.3
- Storage (HDFS/S3/GCS…) : S3
- Running on Docker? (yes/no) : no
Additional context
Bootstrapped 0.13.0 jars: hudi-spark3.3-bundle_2.12-0.13.0.jar, hudi-utilities-slim-bundle_2.12-0.13.0.jar, hudi-aws-bundle-0.13.0.jar
spark-submit \
  --master yarn \
  --jars /mnt1/hudi-jars/hudi-spark-bundle.jar,/mnt1/hudi-jars/hudi-utilities-slim-bundle.jar,/mnt1/hudi-jars/hudi-aws-bundle.jar \
  --deploy-mode cluster \
  --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
  --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /mnt1/hudi-jars/hudi-utilities-slim-bundle.jar \
  --table-type COPY_ON_WRITE \
  --source-ordering-field replicadmstimestamp \
  --source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
  --target-base-path s3://<s3path>/<table> \
  --target-table table \
  --payload-class org.apache.hudi.common.model.AWSDmsAvroPayload \
  --hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator \
  --hoodie-conf hoodie.datasource.write.recordkey.field=_id \
  --hoodie-conf hoodie.datasource.write.partitionpath.field=Loc \
  --hoodie-conf hoodie.deltastreamer.source.dfs.root=s3://<differents3path>/<table>
Stacktrace
23/03/23 16:46:38 ERROR HoodieCreateHandle: Error writing record HoodieRecord{key=HoodieKey { recordKey=463607 partitionPath=SGILB}, currentLocation='null', newLocation='null'}
java.util.NoSuchElementException: No value present in Option
at org.apache.hudi.common.util.Option.get(Option.java:89) ~[__app__.jar:0.13.0]
at org.apache.hudi.common.model.HoodieAvroRecord.prependMetaFields(HoodieAvroRecord.java:132) ~[__app__.jar:0.13.0]
at org.apache.hudi.io.HoodieCreateHandle.doWrite(HoodieCreateHandle.java:142) ~[__app__.jar:0.13.0]
at org.apache.hudi.io.HoodieWriteHandle.write(HoodieWriteHandle.java:175) ~[__app__.jar:0.13.0]
at org.apache.hudi.execution.CopyOnWriteInsertHandler.consume(CopyOnWriteInsertHandler.java:98) ~[__app__.jar:0.13.0]
at org.apache.hudi.execution.CopyOnWriteInsertHandler.consume(CopyOnWriteInsertHandler.java:42) ~[__app__.jar:0.13.0]
at org.apache.hudi.common.util.queue.SimpleExecutor.execute(SimpleExecutor.java:67) ~[__app__.jar:0.13.0]
at org.apache.hudi.execution.SparkLazyInsertIterable.computeNext(SparkLazyInsertIterable.java:80) ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:0.13.0]
at org.apache.hudi.execution.SparkLazyInsertIterable.computeNext(SparkLazyInsertIterable.java:39) ~[hudi-spark3.3-bundle_2.12-0.13.0.jar:0.13.0]
at org.apache.hudi.client.utils.LazyIterableIterator.next(LazyIterableIterator.java:119) ~[__app__.jar:0.13.0]
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:46) ~[scala-library-2.12.15.jar:?]
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) ~[scala-library-2.12.15.jar:?]
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) ~[scala-library-2.12.15.jar:?]
at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:223) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1535) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1445) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1509) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1332) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.rdd.RDD.iterator(RDD.scala:327) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.scheduler.Task.run(Task.scala:138) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) ~[spark-core_2.12-3.3.1-amzn-0.jar:3.3.1-amzn-0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_362]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_362]
at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_362]
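For context on where this blows up: the failure is in HoodieCreateHandle → HoodieAvroRecord.prependMetaFields, which calls Option.get() on the record's payload value. AWSDmsAvroPayload is designed to treat rows whose DMS "Op" column is "D" as deletes and therefore yields an empty value for them, so when such a row reaches the insert/create path the unconditional unwrap has nothing to return. Below is a minimal, hedged sketch of that interaction; it uses java.util.Optional and a plain map as stand-ins for the Hudi types, and the class/method names here are illustrative only, not the actual Hudi implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class DmsDeleteSketch {

    // AWSDmsAvroPayload-style semantics: a row whose DMS "Op" column is "D"
    // is a delete, so there is no value to insert.
    static Optional<Map<String, Object>> getInsertValue(Map<String, Object> dmsRow) {
        if ("D".equals(dmsRow.get("Op"))) {
            return Optional.empty();
        }
        return Optional.of(dmsRow);
    }

    public static void main(String[] args) {
        // Key and partition values taken from the failing record in the stack trace above.
        Map<String, Object> deletedRow = new HashMap<>();
        deletedRow.put("_id", 463607);
        deletedRow.put("Loc", "SGILB");
        deletedRow.put("Op", "D");

        Optional<Map<String, Object>> insertValue = getInsertValue(deletedRow);

        // If the write path routes this record through the insert handle and
        // unwraps the value unconditionally (as prependMetaFields does via
        // Option.get), the empty value throws:
        insertValue.get(); // java.util.NoSuchElementException: No value present
    }
}
```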
About this issue
- Original URL
- State: closed
- Created a year ago
- Reactions: 3
- Comments: 22 (12 by maintainers)
I was about to report exactly the same issue. The problem has to do with the DELETEs: by the time DeltaStreamer parses a DELETE record in the DMS .parquet files, I hit the error above. I believe it is related to #6590. My setup is the following: I have a simple Postgres database (with a simple example table) and I ingest the .parquet files using DMS. Then I run Hudi locally in order to create the table. The command I'm using is the following:
To be precise, when I do a delete in the database and Hudi tries to process it, I get the following:
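To check whether deletes are indeed the trigger on a given dataset, a small diagnostic along these lines can list the DMS delete rows in the same source path Deltastreamer reads. This is only a sketch, not part of the original report: the path placeholder and the _id / Loc / replicadmstimestamp columns are taken from the spark-submit command above, and it assumes the DMS CDC files carry the usual "Op" column (I/U/D).

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class InspectDmsDeletes {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("inspect-dms-deletes")
                .getOrCreate();

        // Same DFS root that hoodie.deltastreamer.source.dfs.root points at
        // (placeholder kept from the issue).
        Dataset<Row> dms = spark.read().parquet("s3://<differents3path>/<table>");

        // DMS CDC output carries an "Op" column: I = insert, U = update, D = delete.
        // Rows with Op = 'D' are the ones AWSDmsAvroPayload turns into deletes.
        dms.filter("Op = 'D'")
           .select("Op", "_id", "Loc", "replicadmstimestamp")
           .show(20, false);

        spark.stop();
    }
}
```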
@yihua Yes, it works! Thank you!