hudi: [SUPPORT] Support for Schema evolution. Facing an error

Hi Hudi team,

In https://cwiki.apache.org/confluence/display/HUDI/FAQ#FAQ-What’sHudi’sschemaevolutionstory you describe, that “As long as the schema passed to Hudi […] is backwards compatible […] Hudi will seamlessly handle read/write of old and new data and also keep the Hive schema up-to date.” We need and try to use this mechanism but are failing. Steps to reproduce:

  1. We have some old schema and events.
  2. We update the schema with a new, optional union field and restart the DeltaStreamer
  3. We ingest new events
  4. We ingest old events again (there are some upserts). In production we would have both event versions until all producers have changed to the new version. At this step the Deltastream crashes with
210545 [pool-28-thread-1] ERROR org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  - Shutting down delta-sync due to exception
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 22.0 failed 4 times, most recent failure: Lost task 0.3 in stage 22.0 (TID 1162, 100.65.150.166, executor 1): org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
        at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.handleUpsertPartition(BaseCommitActionExecutor.java:253)
        at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.lambda$execute$caffe4c4$1(BaseCommitActionExecutor.java:102)
        at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1(JavaRDDLike.scala:102)
        at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitionsWithIndex$1$adapted(JavaRDDLike.scala:102)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:889)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:889)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:362)
        at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1388)
        at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1298)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1362)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1186)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:311)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:127)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:444)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to close UpdateHandle
        at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:302)
        at org.apache.hudi.table.action.commit.CommitActionExecutor.handleUpdateInternal(CommitActionExecutor.java:101)
        at org.apache.hudi.table.action.commit.CommitActionExecutor.handleUpdate(CommitActionExecutor.java:74)
        at org.apache.hudi.table.action.deltacommit.DeltaCommitActionExecutor.handleUpdate(DeltaCommitActionExecutor.java:73)
        at org.apache.hudi.table.action.commit.BaseCommitActionExecutor.handleUpsertPartition(BaseCommitActionExecutor.java:246)
        ... 28 more
Caused by: java.io.EOFException
        at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
        at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:128)
        at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:423)
        at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
        at org.apache.hudi.avro.HoodieAvroUtils.bytesToAvro(HoodieAvroUtils.java:108)
        at org.apache.hudi.common.model.OverwriteWithLatestAvroPayload.getInsertValue(OverwriteWithLatestAvroPayload.java:80)
        at org.apache.hudi.io.HoodieMergeHandle.close(HoodieMergeHandle.java:271)
        ... 32 more

I’ve created a test to reproduce this behaviour here: https://github.com/apache/hudi/pull/1844. I hope the test is helpful. The test cause the following exception, but i think they are quite related.

Job aborted due to stage failure: Task 0 in stage 16.0 failed 1 times, most recent failure: Lost task 0.0 in stage 16.0 (TID 23, localhost, executor driver): java.lang.ArrayIndexOutOfBoundsException: 19
        at org.apache.avro.generic.GenericData$Record.get(GenericData.java:212)
        at org.apache.hudi.AvroConversionHelper$.$anonfun$createConverterToRow$9(AvroConversionHelper.scala:170)
        at org.apache.hudi.AvroConversionUtils$.$anonfun$createDataFrame$2(AvroConversionUtils.scala:66)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$2.hasNext(WholeStageCodegenExec.scala:636)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at scala.collection.Iterator.isEmpty(Iterator.scala:385)
        at scala.collection.Iterator.isEmpty$(Iterator.scala:385)
        at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1429)
        at org.apache.hudi.AvroConversionUtils$.$anonfun$createRdd$2(AvroConversionUtils.scala:44)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:801)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:801)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:123)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
        at org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer.testSchemaEvolution(TestHoodieDeltaStreamer.java:491)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 19

I think the mailing list entry from avro could help a lot: http://apache-avro.679487.n3.nabble.com/AVRO-schema-evolution-adding-optional-column-with-default-fails-deserialization-td4043025.html The corresponding place in code is here: https://github.com/apache/hudi/blob/bf1d36fa639cae558aa388d8d547e58ad2e67aba/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java#L107

Thanks a lot in advance!

Cheers, Sebastian

Expected behavior As stated in FAQ schema evolution should not crash the DeltaStreamer

Environment Description

  • Hudi version : current master branch

  • Spark version : 3.0.0

  • Hive version : 3.1.2

  • Hadoop version : 3.2.1

  • Storage (HDFS/S3/GCS…) : S3

  • Running on Docker? (yes/no) : yes, running on kubernetes

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Comments: 23 (23 by maintainers)

Most upvoted comments

@sbernauer I briefly looked at the test-case you added and I see what you are trying to reproduce. The issue seems to be as follows :

  1. Generate data with schema A, pass schema A <works fine>
  2. Generate data with schema B, pass schema B <works fine>
  3. Generate data with schema A, pass schema B <fails in AvroConversionHelper> Where schema A is smaller schema, schema B is a larger schema with 1 added field. @bvaradar Have we tested such a scenario before with the AvroConversionHelper class where we convert the RDD<GenericRecord> to DF<Row> with schema A, but pass schema B as the writer schema ?