hudi: [SUPPORT] NPE when reading from a CDC-enabled table with null values
Describe the problem you faced
Reading from a CDC-enabled Hudi table results in a cryptic NPE error as shown in the stacktrace section. The error occurs after writing to the table with WriteOperationType.BULK_INSERT. Writing with UPSERT does not cause any issues when reading.
To Reproduce
Steps to reproduce the behavior:
- Write to the Hudi table (the input dataset contains a null value in one of its string columns; see the input dataset sketch after these steps):
ds.write()
  .format("hudi")
  // ... some options omitted for brevity
  .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.BULK_INSERT.name())
  .option(HoodieTableConfig.NAME.key(), "foo")
  .option(HoodieTableConfig.CDC_ENABLED.key(), "true")
  .option(HoodieTableConfig.CDC_SUPPLEMENTAL_LOGGING_MODE.key(), HoodieCDCSupplementalLoggingMode.data_before_after.name())
  .mode(SaveMode.Overwrite)
  .save(...);
- Read from the table:
spark.readStream()
  .format("hudi")
  .option("hoodie.datasource.query.incremental.format", "cdc")
  .option("hoodie.datasource.query.type", "incremental")
  .load(...)
  .writeStream()
  .foreachBatch((batchDF, batchId) -> {
    batchDF.show(); // FIXME: this line results in an NPE
  }).start().awaitTermination();
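For completeness, a minimal sketch of how the input dataset could look. The schema and column names (id, name, ts) are illustrative assumptions and not taken from the actual job; the relevant part is the null value in a string column, which is what triggers the failure on the CDC read:

import java.util.Arrays;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Hypothetical input: one string column ("name") holds a null value,
// which later surfaces as the NPE on the CDC incremental read.
StructType schema = new StructType()
  .add("id", DataTypes.StringType)
  .add("name", DataTypes.StringType) // nullable string column
  .add("ts", DataTypes.LongType);

Dataset<Row> ds = spark.createDataFrame(
  Arrays.asList(
    RowFactory.create("1", "alice", 1L),
    RowFactory.create("2", null, 2L)), // null in the "name" column
  schema);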
Expected behavior
Expecting the same results when reading from the table regardless of the write operation used, in line with “Support all the write operations” stated in the Design Goals section of the CDC RFC.
Environment Description
- Hudi version : 0.13.1
- Spark version : 3.3.2
- Hive version : n/a
- Hadoop version : 3.3.3
- Storage (HDFS/S3/GCS…) : local file system for testing, S3 in the long run
- Running on Docker? (yes/no) : yes
Stacktrace
23/06/21 22:10:33 ERROR MicroBatchExecution: Query [id = e4cb7dc2-dba3-403e-8d9d-d5491dc7e8b0, runId = 1837bba4-981a-4edb-be4d-9d877abf65bd] terminated with error
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (172.19.0.3 executor 0): java.lang.NullPointerException: Cannot invoke "org.apache.spark.unsafe.types.UTF8String.toString()" because the return value of "org.apache.spark.sql.catalyst.InternalRow.getUTF8String(int)" is null
at org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:34)
at org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.$anonfun$convertRowToJsonString$1(HoodieCDCRDD.scala:562)
at org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.$anonfun$convertRowToJsonString$1$adapted(HoodieCDCRDD.scala:559)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.convertRowToJsonString(HoodieCDCRDD.scala:559)
at org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.loadNext(HoodieCDCRDD.scala:295)
at org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.hasNextInternal(HoodieCDCRDD.scala:263)
at org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.hasNext(HoodieCDCRDD.scala:284)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:506)
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:459)
at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48)
at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3868)
at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2863)
at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
at org.apache.spark.sql.Dataset.head(Dataset.scala:2863)
at org.apache.spark.sql.Dataset.take(Dataset.scala:3084)
at org.apache.spark.sql.Dataset.getRows(Dataset.scala:288)
at org.apache.spark.sql.Dataset.showString(Dataset.scala:327)
at org.apache.spark.sql.Dataset.show(Dataset.scala:808)
at org.apache.spark.sql.Dataset.show(Dataset.scala:767)
at org.apache.spark.sql.Dataset.show(Dataset.scala:776)
...
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.catalyst.InternalRow.getString(InternalRow.scala:34)
at org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.$anonfun$convertRowToJsonString$1(HoodieCDCRDD.scala:562)
at org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.$anonfun$convertRowToJsonString$1$adapted(HoodieCDCRDD.scala:559)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.convertRowToJsonString(HoodieCDCRDD.scala:559)
at org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.loadNext(HoodieCDCRDD.scala:295)
at org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.hasNextInternal(HoodieCDCRDD.scala:263)
at org.apache.hudi.cdc.HoodieCDCRDD$CDCFileGroupIterator.hasNext(HoodieCDCRDD.scala:284)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
About this issue
- State: closed
- Created a year ago
- Comments: 15 (13 by maintainers)
Commits related to this issue
- Fix #9032 by handling null strings in convertRowToJsonString — committed to zaza/hudi by tomek-pv a year ago
@zaza Thanks for the information. I am able to reproduce it with a null value in one of the columns. I also confirmed this only happens with bulk_insert. I will check against the master code once, and then create a JIRA to fix it if the issue is still present.
@zaza Thanks for the PR. Please update it according to my comments on the PR.
JIRA for tracking - https://issues.apache.org/jira/browse/HUDI-6450
I’ve created a PR for the issue, see https://github.com/apache/hudi/pull/9064. This is essentially @stp-pv 's diff turned into a PR. Let me know if there’s anything I should change about it. We have already verified that it fixes the issue for Strings, and as far as I can tell non-String fields are not affected.
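For readers following along, here is a minimal sketch (in Java, to stay consistent with the snippets above) of the general null-check shape the commit describes. The method name rowToMap and the plain Map output are illustrative assumptions, not the actual Hudi code, which lives in convertRowToJsonString in HoodieCDCRDD.scala:

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

// Illustrative only: check for null before calling getString, since
// InternalRow.getString(idx) NPEs when the underlying slot is null
// (it calls getUTF8String(idx).toString(), as seen in the stacktrace).
static Map<String, Object> rowToMap(InternalRow row, StructType schema) {
  Map<String, Object> map = new HashMap<>();
  for (int idx = 0; idx < schema.fields().length; idx++) {
    StructField field = schema.fields()[idx];
    if (row.isNullAt(idx)) {
      map.put(field.name(), null); // null slot: never touch getString
    } else if (DataTypes.StringType.equals(field.dataType())) {
      map.put(field.name(), row.getString(idx)); // safe: slot is non-null
    } else {
      map.put(field.name(), row.get(idx, field.dataType()));
    }
  }
  return map;
}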
@stp-pv nice fix.
map(field.name) = record.get(idx, field.dataType) — is it possible the same problem exists there? Can you also add a test case covering that path? Then you can fix them together. Thanks.