hudi: [SUPPORT] flink hudi-0.13.0 append+clustering mode, clustering will occur The requested schema is not compatible with the file schema. incompatible types: required binary key (STRING) != optional binary key (STRING)

To Reproduce

I am using Flink 1.13.6 and Hudi 0.13.0 with COW + append + clustering mode. When the async clustering job is scheduled, it throws the following exception:

2023-05-11 11:06:35,604 ERROR org.apache.hudi.sink.clustering.ClusteringOperator           [] - Executor executes action [Execute clustering for instant 20230511110411858 from task 1] error
org.apache.hudi.exception.HoodieException: unable to read next record from parquet file 
	at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at org.apache.hudi.common.util.MappingIterator.hasNext(MappingIterator.java:35) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at org.apache.hudi.common.util.MappingIterator.hasNext(MappingIterator.java:35) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811) ~[?:1.8.0_332]
	at java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:295) ~[?:1.8.0_332]
	at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:207) ~[?:1.8.0_332]
	at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:162) ~[?:1.8.0_332]
	at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:301) ~[?:1.8.0_332]
	at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681) ~[?:1.8.0_332]
	at org.apache.hudi.client.utils.ConcatenatingIterator.hasNext(ConcatenatingIterator.java:45) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at org.apache.hudi.sink.clustering.ClusteringOperator.doClustering(ClusteringOperator.java:261) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at org.apache.hudi.sink.clustering.ClusteringOperator.lambda$processElement$0(ClusteringOperator.java:194) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_332]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_332]
	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_332]
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file oss://xxxxxxx/hudi/datalog/today/db/table/day=2023-05-11/type=aa/7b1a5921-1d37-435e-b74a-0c0a5356b7bc-20_5-8-0_20230511105641808.parquet
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	... 15 more
Caused by: org.apache.parquet.io.ParquetDecodingException: The requested schema is not compatible with the file schema. incompatible types: required binary key (STRING) != optional binary key (STRING)
	at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.incompatibleSchema(ColumnIOFactory.java:101) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:81) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:69) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at org.apache.parquet.schema.GroupType.accept(GroupType.java:256) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:83) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:69) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at org.apache.parquet.schema.GroupType.accept(GroupType.java:256) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:83) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:57) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at org.apache.parquet.schema.MessageType.accept(MessageType.java:55) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at org.apache.parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:162) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:135) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:225) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
	... 15 more

However, the schema of 7b1a5921-1d37-435e-b74a-0c0a5356b7bc-20_5-8-0_20230511105641808.parquet is:

Schema:
message flink_schema {
  optional binary _hoodie_commit_time (STRING);
  optional binary _hoodie_commit_seqno (STRING);
  optional binary _hoodie_record_key (STRING);
  optional binary _hoodie_partition_path (STRING);
  optional binary _hoodie_file_name (STRING);
  optional binary uniqueKey (STRING);
  optional binary key (STRING);
  optional int64 offset;
  optional int64 time;
  optional int32 sid;
  optional binary plat (STRING);
  optional binary pid (STRING);
  optional binary gid (STRING);
  optional binary account (STRING);
  optional binary playerid (STRING);
  optional int64 kafka_ts;
  optional int64 consume_ts;
  optional group prop (MAP) {
    repeated group key_value {
      optional binary key (STRING);
      optional binary value (STRING);
    }
  }
  optional binary day (STRING);
  optional binary type (STRING);
}

Environment Description

  • Flink version: 1.13.6

  • Hudi version : 0.13.0

  • Storage (HDFS/S3/GCS…) :OSS

  • Running on Docker? (yes/no) : no

About this issue

  • Original URL
  • State: closed
  • Created a year ago
  • Comments: 24 (24 by maintainers)

Most upvoted comments

Nice finding, could you file a fix for it? It seems to be an Avro schema conversion error.

Of course. I found a temporary workaround, but we need to dig deeper to find the root cause.