hudi: [SUPPORT] Spark structured streaming ingestion into Hudi fails after an upgrade to 0.12.2

Tips before filing an issue

  • Have you gone through our FAQs?

  • Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.

  • If you have triaged this as a bug, then file an issue directly.

Describe the problem you faced

After an upgrade from 0.12.1 to 0.13.0, ingestion from Kafka into a Hudi table via Spark structured streaming fails on the second micro-batch. When the job is restarted, it fails on the first micro-batch. After reverting the version to 0.12.1 the issue goes away. Each time the upgrade is attempted, the first micro-batch succeeds and the second one fails. The issue seems to occur on an attempt to expand small files which do not exist in the underlying storage.

To Reproduce

Steps to reproduce the behavior:

Use the write options provided in the below section to write data via Spark structured streaming. The job should fail when writing data in the second micro-batch.

Expected behavior

The ingestion job should continue to ingest more micro-batches.

Environment Description

  • Hudi version : 0.13.0

  • Spark version : 3.3.0

  • Hive version : 3.1.3

  • Hadoop version : 3.3.3

  • Storage (HDFS/S3/GCS…) : S3

  • Running on Docker? (yes/no) : no

Additional context

Table services are set up asynchronously in separate jobs but were not running at the time. There was only one writer into the table at the time. Below are the full write options of the streaming ingestion (some values were redacted):

        "checkpointLocation" -> "<REDACTED>",
        DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
        HoodieWriteConfig.INSERT_PARALLELISM_VALUE.key() -> 200,
        DataSourceWriteOptions.STREAMING_IGNORE_FAILED_BATCH.key() -> "false",
        HoodieMetricsConfig.TURN_METRICS_ON.key() -> "true",
        HoodieMetricsConfig.METRICS_REPORTER_CLASS_NAME
          .key() -> "<REDACTED>",
        HoodieMetricsConfig.METRICS_REPORTER_TYPE_VALUE.key() -> "DATADOG",
        DatadogStatsdMetricsOptions.STATSD_HOST.key() -> writeOptionsArgs.statsDHost,
        DatadogStatsdMetricsOptions.PREFIX.key() -> "<REDACTED>",
        HoodieWriteConfig.TBL_NAME.key -> dataLakeRecord.tableName,
        DataSourceWriteOptions.TABLE_TYPE.key -> DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL,
        DataSourceWriteOptions.RECORDKEY_FIELD.key -> dataLakeRecord.recordKey.mkString(","),
        DataSourceWriteOptions.KEYGENERATOR_CLASS_NAME.key() -> dataLakeRecord.keyGeneratorClassName,
        DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> dataLakeRecord.partitionPathKey,
        DataSourceWriteOptions.PRECOMBINE_FIELD.key -> dataLakeRecord.precombineKey,
        DataSourceWriteOptions.HIVE_STYLE_PARTITIONING.key() -> "true",
        DataSourceWriteOptions.ASYNC_COMPACT_ENABLE.key() -> "false",
        HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key() -> "false",
        HoodieCompactionConfig.INLINE_COMPACT.key() -> "false",
        HoodieArchivalConfig.AUTO_ARCHIVE.key() -> "false",
        HoodieMetadataConfig.ENABLE.key() -> "false",
        HoodieCleanConfig.AUTO_CLEAN.key() -> "false",
        HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key() -> "org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider",
        DynamoDbBasedLockConfig.DYNAMODB_LOCK_TABLE_NAME.key() -> "<REDACTED>",
        DynamoDbBasedLockConfig.DYNAMODB_LOCK_PARTITION_KEY.key() -> s"${tableName}-${awsRegion}",
        DynamoDbBasedLockConfig.DYNAMODB_LOCK_REGION.key() -> "us-east-1",
        HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key() -> WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.name(),
        HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key() -> HoodieFailedWritesCleaningPolicy.LAZY.name(),
        DataSourceWriteOptions.STREAMING_CHECKPOINT_IDENTIFIER.key() -> s"${writeOptionsArgs.topicName}-ingestion"

Stacktrace

The exact partition values were redacted.

org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :379
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:336)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleInsertPartition(BaseSparkCommitActionExecutor.java:342)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$1(BaseSparkCommitActionExecutor.java:253)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
	at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:907)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:907)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:378)
	at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1525)
	at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1435)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1499)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1322)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:376)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:327)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:138)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1516)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.util.NoSuchElementException: FileID 1fdf04cc-229d-48a8-8b85-a6951b484fc0-0 of partition path env_id=<REDACTED>/week=<REDACTED> does not exist.
	at org.apache.hudi.io.HoodieMergeHandle.getLatestBaseFile(HoodieMergeHandle.java:156)
	at org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:122)
	at org.apache.hudi.io.HoodieMergeHandleFactory.create(HoodieMergeHandleFactory.java:64)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.getUpdateHandle(BaseSparkCommitActionExecutor.java:385)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:362)
	at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:79)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:329)
	... 29 more

sparkui

About this issue

  • Original URL
  • State: open
  • Created a year ago
  • Comments: 19 (8 by maintainers)

Most upvoted comments

Yes, I created a new table and ingested ~1B records using Hudi 0.12.1. Then I restarted the job with 0.13.0 (same issue happened with 0.12.2, 0.12.3); the first micro-batch succeeded and the next one failed.

We haven’t looked into the DeltaStreamer to be honest as all of our jobs are written in Spark. Our ingestion job does a bunch of transformations and filtering and adds computed columns. Seems like this might be possible with the transformer class using the DeltaStreamer but we haven’t looked into it, we haven’t had any issues with Spark Streaming until now.

@psendyk You add this settings and try it. May be it works.

                .option(HoodieWriteConfig.AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE.key(), true)
                .option(HoodieWriteConfig.AVRO_SCHEMA_VALIDATE_ENABLE.key(), true)
                .option(HoodieWriteConfig.SCHEMA_ALLOW_AUTO_EVOLUTION_COLUMN_DROP.key(), true)
                .option(HoodieCommonConfig.RECONCILE_SCHEMA.key(), true)
                .option(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), true)
                .option(HoodieIndexConfig.INDEX_TYPE.key(), HoodieIndex.IndexType.SIMPLE.name())
                .option(HoodieLayoutConfig.LAYOUT_TYPE.key(), HoodieStorageLayout.LayoutType.DEFAULT.name())

and query with configs

set hoodie.schema.on.read.enable=true;
set hoodie.datasource.read.extract.partition.values.from.path=true;