hudi: [SUPPORT] HoodieKeyException: recordKey value: "null"

Describe the problem you faced

The write operation using bulk_insert fails when writing to a non-empty Hudi table. It does not fail if the table is empty.

To Reproduce

Steps to reproduce the behavior:

  1. Create a new table and write some data with the bulk_insert operation.
  2. Write the same data batch to this table again with the bulk_insert operation.

Hudi settings:

val unpartitionDataConfig = Map(
  HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[NonPartitionedExtractor].getName,
  KEYGENERATOR_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getName
)

private def options(
      table: String,
      primaryKey: String,
      database: String,
      operation: String
  ): Map[String, String] =
    Map(
      OPERATION.key -> operation,
      PRECOMBINE_FIELD.key -> EventTimestampColumn,
      RECORDKEY_FIELD.key -> primaryKey,
      TABLE_TYPE.key -> COW_TABLE_TYPE_OPT_VAL,
      TBL_NAME.key -> table,
      "hoodie.consistency.check.enabled" -> "true",
      HIVE_SYNC_MODE.key -> "jdbc",
      HIVE_SYNC_ENABLED.key -> "true",
      HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
      HIVE_DATABASE.key -> database,
      HIVE_TABLE.key -> table,
      UPSERT_PARALLELISM_VALUE.key -> "4",
      DELETE_PARALLELISM_VALUE.key -> "4",
      BULKINSERT_PARALLELISM_VALUE.key -> "4"
    ) ++ unpartitionDataConfig

  def writerOptions(
      table: String,
      primaryKey: String,
      database: String
  ): Map[String, String] = {
    val operation = BULK_INSERT_OPERATION_OPT_VAL
    options(
      table,
      primaryKey,
      database,
      operation
    ) ++ unpartitionDataConfig
  }

Spark main code:

val options = writerOptions(
  tableName,
  primaryKey,
  database
)
session.read
  .format("parquet")
  .load(inputPath)
  .write
  .format("hudi")
  .options(options)
  .mode(SaveMode.Overwrite)
  .save(targetPath)

Expected behavior

The data is overwritten once the second step finishes, and the table contains no logical duplicates.

Environment Description

  • Hudi version : 0.9 ("org.apache.hudi" %% "hudi-spark3-bundle" % "0.9.0"), packaged in a fat jar together with the Spark app.

  • Spark version : 3.1.2 (EMR)

  • Hive version : AWS Glue

  • Hadoop version : Hadoop 3.2.1 (EMR)

  • Storage (HDFS/S3/GCS…) : S3

  • Running on Docker? (yes/no) : no

Additional context

  1. The issue is intermittent. Sometimes the Spark job overwrites the table with bulk_insert successfully.
  2. My Scala Spark code uses JVM concurrency to write several tables in parallel. Perhaps that leads to a Hudi / Spark thread-safety issue?
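
One way to test the thread-safety hypothesis in point 2 is to run the same per-table writes one at a time and check whether the exception still appears. The following is a minimal sketch, not the code from the job: `TableWrites`, `runAll`, and the `write` callback are hypothetical names, and the callback stands in for the read/write chain shown under "Spark main code".

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration
import scala.util.Try

object TableWrites {

  /** Runs `write` once per table and returns each table's outcome in input order.
    *
    * With `parallel = false` the writes run one after another on the calling
    * thread. With `parallel = true` each write is wrapped in a Future and runs
    * on the supplied execution context.
    *
    * `write` is a placeholder (assumption) for the real Hudi write of one table.
    */
  def runAll(tables: Seq[String], parallel: Boolean)(write: String => Unit)(
      implicit ec: ExecutionContext
  ): Seq[(String, Try[Unit])] =
    if (parallel) {
      // Try captures a failing write so one table cannot abort the others.
      val futures = tables.map(t => Future(t -> Try(write(t))))
      Await.result(Future.sequence(futures), Duration.Inf)
    } else {
      tables.map(t => t -> Try(write(t)))
    }
}
```

If the failure only shows up with `parallel = true`, that would point toward shared state between concurrent writes rather than toward the input data.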

Stacktrace

21/10/07 12:03:18 INFO YarnScheduler: Killing all running tasks in stage 17: Stage cancelled
21/10/07 12:03:18 INFO DAGScheduler: ResultStage 17 (save at HoodieSparkSqlWriter.scala:463) failed in 3.282 s due to Job aborted due to stage failure: Task 0 in stage 17.0 failed 4 times, most recent failure: Lost task 0.3 in stage 17.0 (TID 32) (ip-10-100-160-252.....local executor 1): org.apache.spark.SparkException: Failed to execute user defined function(UDFRegistration$$Lambda$2098/1888531409: (struct<here comes my table schema in struct format.... it has many columns and they have different logical types>) => string)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
	at org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:42)
	at org.apache.spark.RangePartitioner$.$anonfun$sketch$1(Partitioner.scala:306)
	at org.apache.spark.RangePartitioner$.$anonfun$sketch$1$adapted(Partitioner.scala:304)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieKeyException: recordKey value: "null" for field: "my_primary_key_column_here" cannot be null or empty.
	at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:141)
	at org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator.getRecordKey(NonpartitionedAvroKeyGenerator.java:60)
	at org.apache.hudi.keygen.NonpartitionedKeyGenerator.getRecordKey(NonpartitionedKeyGenerator.java:50)
	at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:62)
	at org.apache.hudi.keygen.BuiltinKeyGenerator.getRecordKey(BuiltinKeyGenerator.java:75)
	at org.apache.spark.sql.UDFRegistration.$anonfun$register$352(UDFRegistration.scala:777)
	... 22 more
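
Since the exception complains that the record key is null or empty, it may be worth ruling out the input data first. The sketch below counts rows whose key column is null or an empty string before the write. It assumes a plain Spark `DataFrame`; `KeyCheck` and `countBadKeys` are hypothetical names and not part of Hudi.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object KeyCheck {

  /** Counts rows whose `keyColumn` is null or an empty string.
    *
    * This mirrors the condition named in the exception message
    * ("cannot be null or empty"). The column is cast to string so the
    * empty-string comparison also behaves for non-string key types.
    */
  def countBadKeys(df: DataFrame, keyColumn: String): Long =
    df.filter(col(keyColumn).isNull || col(keyColumn).cast("string") === "").count()
}
```

For example, `KeyCheck.countBadKeys(session.read.format("parquet").load(inputPath), primaryKey)` could be run on the same input that triggered the failure. A result of 0 would suggest the data itself is not the cause, which would be consistent with the failure being intermittent on identical input.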

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 16 (8 by maintainers)

Most upvoted comments

@novakov-alexey @Carl-Zhou-CN Thanks for making the fix and the test. Such a great collaboration!

As a JIRA ticket was filed here, closing this issue.

https://issues.apache.org/jira/browse/HUDI-2582