hudi: [SUPPORT] flink hudi-0.13.0 append+clustering mode, clustering will occur The requested schema is not compatible with the file schema. incompatible types: required binary key (STRING) != optional binary key (STRING)
To Reproduce
I am using Flink 1.13.6 and Hudi 0.13.0 with a COW table in append + clustering mode. When the async clustering job is scheduled, it throws the following exception:
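For context, a hypothetical sketch of the table setup described above (COW + append + async clustering). Column names are taken from the Parquet schema later in this report; the table name, path, and exact options are reconstructed assumptions, not copied from the actual job:

```sql
-- Assumed Flink SQL DDL approximating the reported setup.
CREATE TABLE datalog (
  uniqueKey STRING,
  `key` STRING,
  `offset` BIGINT,
  `time` BIGINT,
  sid INT,
  plat STRING,
  pid STRING,
  gid STRING,
  account STRING,
  playerid STRING,
  kafka_ts BIGINT,
  consume_ts BIGINT,
  prop MAP<STRING, STRING>,
  `day` STRING,
  `type` STRING
) PARTITIONED BY (`day`, `type`) WITH (
  'connector' = 'hudi',
  'path' = 'oss://.../hudi/datalog/today/db/table',
  'table.type' = 'COPY_ON_WRITE',
  'write.operation' = 'insert',            -- append mode for COW
  'clustering.schedule.enabled' = 'true',  -- schedule clustering plans
  'clustering.async.enabled' = 'true'      -- execute plans asynchronously
);
```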
2023-05-11 11:06:35,604 ERROR org.apache.hudi.sink.clustering.ClusteringOperator [] - Executor executes action [Execute clustering for instant 20230511110411858 from task 1] error
org.apache.hudi.exception.HoodieException: unable to read next record from parquet file
at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:53) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at org.apache.hudi.common.util.MappingIterator.hasNext(MappingIterator.java:35) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at org.apache.hudi.common.util.MappingIterator.hasNext(MappingIterator.java:35) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at java.util.Spliterators$IteratorSpliterator.tryAdvance(Spliterators.java:1811) ~[?:1.8.0_332]
at java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:295) ~[?:1.8.0_332]
at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:207) ~[?:1.8.0_332]
at java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:162) ~[?:1.8.0_332]
at java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:301) ~[?:1.8.0_332]
at java.util.Spliterators$1Adapter.hasNext(Spliterators.java:681) ~[?:1.8.0_332]
at org.apache.hudi.client.utils.ConcatenatingIterator.hasNext(ConcatenatingIterator.java:45) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at org.apache.hudi.sink.clustering.ClusteringOperator.doClustering(ClusteringOperator.java:261) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at org.apache.hudi.sink.clustering.ClusteringOperator.lambda$processElement$0(ClusteringOperator.java:194) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at org.apache.hudi.sink.utils.NonThrownExecutor.lambda$wrapAction$0(NonThrownExecutor.java:130) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_332]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_332]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_332]
Caused by: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file oss://xxxxxxx/hudi/datalog/today/db/table/day=2023-05-11/type=aa/7b1a5921-1d37-435e-b74a-0c0a5356b7bc-20_5-8-0_20230511105641808.parquet
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:254) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
... 15 more
Caused by: org.apache.parquet.io.ParquetDecodingException: The requested schema is not compatible with the file schema. incompatible types: required binary key (STRING) != optional binary key (STRING)
at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.incompatibleSchema(ColumnIOFactory.java:101) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:81) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:69) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at org.apache.parquet.schema.GroupType.accept(GroupType.java:256) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:83) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:69) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at org.apache.parquet.schema.GroupType.accept(GroupType.java:256) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visitChildren(ColumnIOFactory.java:83) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at org.apache.parquet.io.ColumnIOFactory$ColumnIOCreatorVisitor.visit(ColumnIOFactory.java:57) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at org.apache.parquet.schema.MessageType.accept(MessageType.java:55) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at org.apache.parquet.io.ColumnIOFactory.getColumnIO(ColumnIOFactory.java:162) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:135) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:225) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:132) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:136) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
at org.apache.hudi.common.util.ParquetReaderIterator.hasNext(ParquetReaderIterator.java:48) ~[hudi-flink1.13-bundle-0.13.0.jar:0.13.0]
... 15 more
However, the schema of 7b1a5921-1d37-435e-b74a-0c0a5356b7bc-20_5-8-0_20230511105641808.parquet declares the map's key field as optional, while the clustering reader requests it as required:
Schema:
message flink_schema {
  optional binary _hoodie_commit_time (STRING);
  optional binary _hoodie_commit_seqno (STRING);
  optional binary _hoodie_record_key (STRING);
  optional binary _hoodie_partition_path (STRING);
  optional binary _hoodie_file_name (STRING);
  optional binary uniqueKey (STRING);
  optional binary key (STRING);
  optional int64 offset;
  optional int64 time;
  optional int32 sid;
  optional binary plat (STRING);
  optional binary pid (STRING);
  optional binary gid (STRING);
  optional binary account (STRING);
  optional binary playerid (STRING);
  optional int64 kafka_ts;
  optional int64 consume_ts;
  optional group prop (MAP) {
    repeated group key_value {
      optional binary key (STRING);
      optional binary value (STRING);
    }
  }
  optional binary day (STRING);
  optional binary type (STRING);
}
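The failure mode can be illustrated with a minimal sketch (plain Python, not actual Hudi or parquet-mr code) of the repetition-compatibility rule that Parquet's ColumnIOFactory enforces: a reader cannot request a column as `required` (non-null) when the file wrote it as `optional` (nullable). The `Field`/`check_compatible` names below are hypothetical helpers for illustration:

```python
# Minimal model of Parquet's requested-vs-file schema check.
# Field names and types mirror the error in the stack trace above.
from dataclasses import dataclass


@dataclass(frozen=True)
class Field:
    name: str
    ptype: str       # physical type, e.g. "binary"
    repetition: str  # "required", "optional", or "repeated"


def check_compatible(requested: Field, in_file: Field) -> None:
    """Raise if the requested projection cannot be served by the file column."""
    if requested.ptype != in_file.ptype or requested.repetition != in_file.repetition:
        # This mirrors the exception in the report: the clustering reader
        # asks for `required binary key` while the file stores
        # `optional binary key`.
        raise ValueError(
            f"incompatible types: {requested.repetition} {requested.ptype} "
            f"{requested.name} != {in_file.repetition} {in_file.ptype} {in_file.name}"
        )


# The exact mismatch from this issue:
requested = Field("key", "binary", "required")
in_file = Field("key", "binary", "optional")
try:
    check_compatible(requested, in_file)
except ValueError as e:
    print(e)  # incompatible types: required binary key != optional binary key
```

In other words, the file on disk is self-consistent; the problem is that the read schema the clustering operator builds does not agree with the writer's schema on the nullability of the map key field.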
Environment Description

- Flink version: 1.13.6
- Hudi version: 0.13.0
- Storage (HDFS/S3/GCS…): OSS
- Running on Docker? (yes/no): no
About this issue
- Original URL
- State: closed
- Created a year ago
- Comments: 24 (24 by maintainers)
Of course, I found a temporary workaround, but we need to dig deeper to find the root cause.