hudi: [BUG] Example from Hudi Quick Start doesn't work!

Describe the problem you faced

I am trying to merge CDC JSON data into a snapshot. To do this, I first read a DataFrame from the existing Parquet files (the snapshot folder) and tried to write it to S3 in Hudi format. I get the error below.


  1. I am running in the Spark shell with 3 executors, each with 3 GB of memory and 1 core; the driver has 1 core and 1 GB of memory (see the session sketch after this list).
  2. Below is the code, with a comment marking where it fails.
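
For reference, the quick start guide also expects a couple of Spark configs to be set when the shell starts. Below is a minimal sketch of a session carrying them, assuming they were not already passed as spark-shell flags; the builder usage and app name are illustrative, not taken from the report:

    import org.apache.spark.sql.SparkSession

    // Kryo serializer and the Hudi SQL extension are the settings the
    // quick start guide asks for when launching spark-shell.
    val spark = SparkSession.builder()
      .appName("hudi-cdc-merge") // illustrative name
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
      .getOrCreate()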

To Reproduce

import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.QuickstartUtils
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.spark.sql.SaveMode.Append
import org.apache.spark.sql.SaveMode.Overwrite
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.lit

    val snapshotDf = Application.spark().read.parquet("s3://bucket/snapshots-test/dbdump/_bid_9223370348443853913/")
    val cdcSchema = SparkUtils.getSchema("s3://bucket/schemas/dbdump-schema.json")
    val cdcDf = Application.spark().read.schema(cdcSchema).json("s3://bucket/inputs/dbdump/")
    /* done */

    /* merge them */
    snapshotDf.createOrReplaceTempView("snapshot") // registerTempTable is deprecated in Spark 3.x
    val snapshotDf2 = Application.spark().sql("select * from snapshot where cdc_oid is not null and cdc_oid != ''")
    // NOTE: col("cdc_oid").hashCode() is the JVM hashCode of the Column object itself,
    // computed once on the driver, so lit(...) gives every row the same constant "hash" value.
    val snapshotDf3 = snapshotDf2.withColumn("hash", lit(col("cdc_oid").hashCode() % 1000))

/* BELOW is taken from Quick Start guide of HUDI : https://hudi.apache.org/docs/quick-start-guide */
    snapshotDf3.write.format("hudi").options(QuickstartUtils.getQuickstartWriteConfigs())
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "cdc_oid")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "hash")
      .option(TBL_NAME.key(), "GE11")
      .mode(Overwrite)
      .save("s3://bucket/snapshots-hudi/ge11/snapshot");

Steps to reproduce the behavior:

  1. Run the above program

Expected behavior

  1. The Hudi table should have been created from the snapshot Parquet files.
  2. The merge of the CDC data should have followed, but the job failed before reaching that step (see the sketch after this list).
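
For context, the CDC merge step that never ran would, following the same quick start pattern, be an upsert in Append mode. A rough sketch, under the assumption that cdcDf carries the same cdc_pk and cdc_oid fields and has been given a matching hash column; none of this is code from the report:

    // Hypothetical follow-up: upsert CDC rows into the table written above.
    // Assumes cdcDf has been given the same "hash" partition column.
    cdcDf.write.format("hudi").options(QuickstartUtils.getQuickstartWriteConfigs())
      .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert")
      .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY, "cdc_pk")
      .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY, "cdc_oid")
      .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY, "hash")
      .option(TBL_NAME.key(), "GE11")
      .mode(Append)
      .save("s3://bucket/snapshots-hudi/ge11/snapshot")
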
Environment Description

  • Hudi version : 0.12.3 (hudi-spark3.3-bundle_2.12-0.12.3.jar)

  • Spark version : 3.3.0

  • Storage (HDFS/S3/GCS…) : S3

  • Running on Docker? (yes/no) : no

Stacktrace

07-07 14:11:10  WARN DAGScheduler: Broadcasting large task binary with size 1033.6 KiB
07-07 14:12:24  ERROR HoodieSparkSqlWriter$: UPSERT failed with errors
org.apache.hudi.exception.HoodieException: Write to Hudi failed
  at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:148)
  at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
  at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:103)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
  at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:114)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$7(SQLExecution.scala:139)
  at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107)
  at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:224)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:139)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:245)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:138)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:100)
  at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:96)
  at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:615)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:177)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:615)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
  at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:591)
  at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:96)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:83)
  at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:81)
  at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:124)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
  at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)

Most upvoted comments

Ok, will recheck. Thank you for the update. Will post back.