hudi: [BUG] Example from Hudi Quick Start doesn't work!
Describe the problem you faced
I am trying to merge CDC JSON data into a snapshot. To do this, I first read a DataFrame from the existing Parquet snapshot folder and tried to write it to S3 in Hudi format. I get the error below.
- I am running in spark-shell with 3 executors, each with 3 GB memory and 1 core; the driver has 1 core and 1 GB memory (a sketch of the presumed launch command follows this list).
- Below is the code, with a comment marking where it fails.
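For completeness, here is a rough sketch of how such a shell would be launched; it is an assumption reconstructed from the resource numbers above and the versions listed under the environment section, not the exact command that was used. The serializer, extension, and catalog settings are the ones the quick start guide asks for:
spark-shell \
  --jars hudi-spark3.3-bundle_2.12-0.12.3.jar \
  --num-executors 3 --executor-cores 1 --executor-memory 3g \
  --driver-memory 1g \
  --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
  --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
  --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'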
To Reproduce
import org.apache.hudi.QuickstartUtils
import org.apache.hudi.common.model.HoodieAvroPayload
import org.apache.hudi.common.model.WriteOperationType
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.constant.KeyGeneratorOptions
import org.apache.spark.SparkConf
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.java.function.Function
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession
import java.util
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.spark.sql.SaveMode.Append
import org.apache.spark.sql.SaveMode.Overwrite
import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.sql.functions.{col, lit}
val snapshotDf = Application.spark().read.parquet("s3://bucket/snapshots-test/dbdump/_bid_9223370348443853913/")
val cdcSchema = SparkUtils.getSchema("s3://bucket/schemas/dbdump-schema.json")
val cdcDf = Application.spark().read.schema(cdcSchema).json("s3://bucket/inputs/dbdump/")
/* done */
/* merge them */
snapshotDf.registerTempTable("snapshot");
val snapshotDf2 = Application.spark().sql("select * from snapshot where cdc_oid is not null and cdc_oid !='' ")
/* note: hashCode() here is called on the Column object, not on row values, so "hash" is the same constant for every row */
val snapshotDf3 = snapshotDf2.withColumn("hash", lit(col("cdc_oid").hashCode() % 1000))
/* BELOW is taken from Quick Start guide of HUDI : https://hudi.apache.org/docs/quick-start-guide */
snapshotDf3.write.format("hudi").options(QuickstartUtils.getQuickstartWriteConfigs())
.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "cdc_oid")
.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "hash")
.option(TBL_NAME.key(), "GE11")
.mode(Overwrite)
.save("s3://bucket/snapshots-hudi/ge11/snapshot");
Steps to reproduce the behavior:
- Run the above program
Expected behavior
- The Hudi table should have been created from the snapshot Parquet files.
- The merge from the CDC data should have happened next, but the job failed before reaching that step (a hypothetical sketch of that step follows this list).
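For reference, the merge step that was never reached would presumably be the quick start's upsert path, i.e. writing the CDC DataFrame to the same table in Append mode. This is a hypothetical sketch reusing the imports, record key, precombine field, and table path from the snippet above, not code that was actually run:
// assumes cdcDf has been given the same derived "hash" partition column as the snapshot
cdcDf.write.format("hudi").options(QuickstartUtils.getQuickstartWriteConfigs())
  .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
  .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "cdc_oid")
  .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "hash")
  .option(TBL_NAME.key(), "GE11")
  .mode(Append)  // Append leaves existing data in place and uses Hudi's default upsert operation
  .save("s3://bucket/snapshots-hudi/ge11/snapshot")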
Environment Description
- Hudi version : hudi-spark3.3-bundle_2.12-0.12.3.jar
- Spark version : 3.3.0
- Storage (HDFS/S3/GCS…) : S3
- Running on Docker? (yes/no) : no
Stacktrace
07-07 14:11:10 WARN DAGScheduler: Broadcasting large task binary with size 1033.6 KiB
07-07 14:12:24 ERROR HoodieSparkSqlWriter$: UPSERT failed with errors
org.apache.hudi.exception.HoodieException: Write to Hudi failed
at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:148)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
About this issue
- State: closed
- Created a year ago
- Comments: 24 (8 by maintainers)
OK, will recheck. Thank you for the update. Will post back.