hudi: [SUPPORT] HoodieKeyException: recordKey value: "null"
Tips before filing an issue
- Have you gone through our FAQs? Yes. The right address is https://hudi.apache.org/learn/faq/
Describe the problem you faced
A write operation using bulk_insert fails when writing to a non-empty Hudi table. It does not fail if the table is empty.
To Reproduce
Steps to reproduce the behavior:
- Create a new table and write some data with the bulk_insert option.
- Write the same data batch to this table again with the bulk_insert option (a combined sketch follows the Spark main code below).
Hudi settings:
val unpartitionDataConfig = Map(
  HIVE_PARTITION_EXTRACTOR_CLASS.key -> classOf[NonPartitionedExtractor].getName,
  KEYGENERATOR_CLASS_NAME.key -> classOf[NonpartitionedKeyGenerator].getName
)
private def options(
    table: String,
    primaryKey: String,
    database: String,
    operation: String
): Map[String, String] =
  Map(
    OPERATION.key -> operation,
    PRECOMBINE_FIELD.key -> EventTimestampColumn,
    RECORDKEY_FIELD.key -> primaryKey,
    TABLE_TYPE.key -> COW_TABLE_TYPE_OPT_VAL,
    TBL_NAME.key -> table,
    "hoodie.consistency.check.enabled" -> "true",
    HIVE_SYNC_MODE.key -> "jdbc",
    HIVE_SYNC_ENABLED.key -> "true",
    HIVE_SUPPORT_TIMESTAMP_TYPE.key -> "true",
    HIVE_DATABASE.key -> database,
    HIVE_TABLE.key -> table,
    UPSERT_PARALLELISM_VALUE.key -> "4",
    DELETE_PARALLELISM_VALUE.key -> "4",
    BULKINSERT_PARALLELISM_VALUE.key -> "4"
  ) ++ unpartitionDataConfig
def writerOptions(
    table: String,
    primaryKey: String,
    database: String
): Map[String, String] = {
  val operation = BULK_INSERT_OPERATION_OPT_VAL
  options(table, primaryKey, database, operation) ++ unpartitionDataConfig
}
Spark main code:
val options = writerOptions(tableName, primaryKey, database)

session.read.format("parquet")
  .load(inputPath)
  .write
  .format("hudi")
  .options(options)
  .mode(SaveMode.Overwrite)
  .save(targetPath)
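Put together, a minimal sketch of the two-step reproduction using the helpers above (the second call against the now non-empty targetPath is the one that intermittently fails):

// Reproduction sketch: write the same parquet batch twice with bulk_insert.
// The first call (empty table) succeeds; the second one intermittently fails
// with the HoodieKeyException shown in the stacktrace below.
val batch = session.read.format("parquet").load(inputPath)

def bulkInsert(): Unit =
  batch.write
    .format("hudi")
    .options(writerOptions(tableName, primaryKey, database))
    .mode(SaveMode.Overwrite)
    .save(targetPath)

bulkInsert() // step 1: table is empty, works
bulkInsert() // step 2: table is non-empty, sometimes fails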
Expected behavior
Data is overwritten when the second step finishes, with no logical duplicates in the table.
Environment Description
- Hudi version : 0.9 ("org.apache.hudi" %% "hudi-spark3-bundle" % "0.9.0"), self-packaged in a fat jar with the Spark app
- Spark version : 3.1.2 (EMR)
- Hive version : AWS Glue
- Hadoop version : 3.2.1 (EMR)
- Storage (HDFS/S3/GCS…) : S3
- Running on Docker? (yes/no) : no
Additional context
- This is an intermittent issue. Sometimes it works fine, and my Spark job successfully overwrites the table with bulk_insert.
- I use JVM concurrency from Scala code, writing several tables via Spark in parallel. Perhaps that leads to some Hudi / Spark thread-safety issue? A rough sketch of the pattern is below.
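To illustrate what I mean by parallel writes (a sketch only; TableSpec, tables and writeAll are hypothetical stand-ins, not the exact production code):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Illustrative only: each table gets its own Future, so several Hudi
// bulk_insert writes run concurrently inside one Spark driver JVM.
case class TableSpec(name: String, primaryKey: String, inputPath: String, targetPath: String)

def writeAll(tables: Seq[TableSpec]): Unit = {
  val writes = tables.map { t =>
    Future {
      session.read.format("parquet")
        .load(t.inputPath)
        .write
        .format("hudi")
        .options(writerOptions(t.name, t.primaryKey, database))
        .mode(SaveMode.Overwrite)
        .save(t.targetPath)
    }
  }
  Await.result(Future.sequence(writes), Duration.Inf)
}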
Stacktrace
21/10/07 12:03:18 INFO YarnScheduler: Killing all running tasks in stage 17: Stage cancelled
21/10/07 12:03:18 INFO DAGScheduler: ResultStage 17 (save at HoodieSparkSqlWriter.scala:463) failed in 3.282 s due to Job aborted due to stage failure: Task 0 in stage 17.0 failed 4 times, most recent failure: Lost task 0.3 in stage 17.0 (TID 32) (ip-10-100-160-252.....local executor 1): org.apache.spark.SparkException: Failed to execute user defined function(UDFRegistration$$Lambda$2098/1888531409: (struct<here comes my table schema in struct format.... it has many columns and they have different logical types>) => string)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:42)
at org.apache.spark.RangePartitioner$.$anonfun$sketch$1(Partitioner.scala:306)
at org.apache.spark.RangePartitioner$.$anonfun$sketch$1$adapted(Partitioner.scala:304)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:131)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieKeyException: recordKey value: "null" for field: "my_primary_key_column_here" cannot be null or empty.
at org.apache.hudi.keygen.KeyGenUtils.getRecordKey(KeyGenUtils.java:141)
at org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator.getRecordKey(NonpartitionedAvroKeyGenerator.java:60)
at org.apache.hudi.keygen.NonpartitionedKeyGenerator.getRecordKey(NonpartitionedKeyGenerator.java:50)
at org.apache.hudi.keygen.BaseKeyGenerator.getKey(BaseKeyGenerator.java:62)
at org.apache.hudi.keygen.BuiltinKeyGenerator.getRecordKey(BuiltinKeyGenerator.java:75)
at org.apache.spark.sql.UDFRegistration.$anonfun$register$352(UDFRegistration.scala:777)
... 22 more
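For what it's worth, a check like the one below (a sketch; "my_primary_key_column_here" in the stacktrace is just a placeholder for the real key column) could be used to rule out genuinely null or empty keys in the input batch:

import org.apache.spark.sql.functions.{col, trim}

// Diagnostic sketch: count input rows whose record key column is null or empty
// before writing, to rule out bad input data as the cause of the exception.
val inputDf = session.read.format("parquet").load(inputPath)
val key = col(primaryKey).cast("string")
val badKeyCount = inputDf.filter(key.isNull || trim(key) === "").count()
println(s"rows with null/empty record key: $badKeyCount")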
About this issue
- State: closed
- Created 3 years ago
- Comments: 16 (8 by maintainers)
@novakov-alexey @Carl-Zhou-CN Thanks for making the fix and the test. Such great collaboration!
As a JIRA was filed for this, closing this issue: https://issues.apache.org/jira/browse/HUDI-2582