hudi: [SUPPORT] java.io.EOFException: Invalid position - exceeds the bounds of the stream
Describe the problem you faced
Upgrading to 0.11.0 has caused consistent EOFExceptions on compaction for our Flink job. We have not seen this exception in 0.10.* versions.
To Reproduce
Steps to reproduce the behavior:
- Use Flink to write and compact data in S3 (exact steps unclear; the failure shows up during compaction)
Expected behavior
Compaction succeeds without an EOFException.
Environment Description
- Hudi version : 0.11.0
- Flink version : 1.14.2
- Hive version : 2.3.9
- Hadoop version : 2.10.1
- Storage (HDFS/S3/GCS…) : S3
- Running on Docker? (yes/no) : no
Additional context
Our config/setup looks something like this:
val path = "$basePath/$databaseName/$hudiTableName"
conf.setString(FlinkOptions.PATH, path)
conf.setString(FlinkOptions.TABLE_NAME, hudiTableName)
conf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ")
conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, schema)
conf.setInteger(FlinkOptions.WRITE_TASKS, 4)
conf.setInteger(FlinkOptions.COMPACTION_TASKS, 4)
conf.setString(FlinkOptions.RECORD_KEY_FIELD, "primaryKey")
conf.setString(FlinkOptions.PRECOMBINE_FIELD, "primaryKey") // TODO: pick something more intelligible
conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "") // TODO: determine partition strategy
conf.setString(FlinkOptions.KEYGEN_TYPE, KeyGeneratorType.NON_PARTITION.name)
conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, checkpointsPerCompaction)
....
dataStream.let { Pipelines.bootstrap(conf, rowType, 1, it) }
.let { Pipelines.hoodieStreamWrite(conf, 1, it) }
.let { Pipelines.compact(conf, it) }
Stacktrace
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 6 for operator stream_write (3/4)#0. Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:265)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:233)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:206)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:186)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:605)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:315)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1329)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1315)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1258)
... 22 more
Caused by: org.apache.hudi.exception.HoodieIOException: Could not load Hoodie properties from s3://bucket/database/table/.hoodie/hoodie.properties
at org.apache.hudi.common.table.HoodieTableConfig.<init>(HoodieTableConfig.java:254)
at org.apache.hudi.common.table.HoodieTableMetaClient.<init>(HoodieTableMetaClient.java:125)
at org.apache.hudi.common.table.HoodieTableMetaClient.<init>(HoodieTableMetaClient.java:78)
at org.apache.hudi.common.table.HoodieTableMetaClient$Builder.build(HoodieTableMetaClient.java:668)
at org.apache.hudi.client.BaseHoodieClient.createMetaClient(BaseHoodieClient.java:138)
at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1448)
at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1496)
at org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:140)
at org.apache.hudi.sink.StreamWriteFunction.lambda$initWriteFunction$1(StreamWriteFunction.java:184)
at org.apache.hudi.sink.StreamWriteFunction.lambda$flushRemaining$7(StreamWriteFunction.java:461)
at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
at org.apache.hudi.sink.StreamWriteFunction.flushRemaining(StreamWriteFunction.java:454)
at org.apache.hudi.sink.StreamWriteFunction.snapshotState(StreamWriteFunction.java:131)
at org.apache.hudi.sink.common.AbstractStreamWriteFunction.snapshotState(AbstractStreamWriteFunction.java:157)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:87)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:219)
... 33 more
Caused by: java.io.IOException: Unexpected end of stream pos=5200, contentLength=5175
at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.read(S3FSInputStream.java:297)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
at java.io.DataInputStream.read(DataInputStream.java:149)
at java.io.DataInputStream.read(DataInputStream.java:100)
at java.util.Properties$LineReader.readLine(Properties.java:508)
at java.util.Properties.load0(Properties.java:353)
at java.util.Properties.load(Properties.java:341)
at org.apache.hudi.common.table.HoodieTableConfig.fetchConfigs(HoodieTableConfig.java:303)
at org.apache.hudi.common.table.HoodieTableConfig.<init>(HoodieTableConfig.java:244)
... 50 more
Caused by: java.io.EOFException: Invalid position: 5200, exceeds the bounds of the stream: [0, 5175]
at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.throwPositionOutOfBoundsException(S3FSInputStream.java:411)
at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.retrieveInputStreamWithInfo(S3FSInputStream.java:385)
at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.reopenStream(S3FSInputStream.java:378)
at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.read(S3FSInputStream.java:260)
... 60 more
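For reference, the bottom frames above boil down to opening .hoodie/hoodie.properties and parsing it with java.util.Properties.load. A minimal Kotlin sketch of that read path (the filesystem setup and path handling here are simplified placeholders, not Hudi's actual HoodieTableConfig code):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path
import java.util.Properties

// Load the table config roughly the way the failing frames do: open the
// properties file and parse it with java.util.Properties.
fun loadTableConfig(basePath: String): Properties {
    val propsPath = Path("$basePath/.hoodie/hoodie.properties")
    val fs = FileSystem.get(propsPath.toUri(), Configuration())
    fs.open(propsPath).use { input ->
        return Properties().apply { load(input) }
    }
}

The EOFException suggests the object's length changed between the metadata lookup and the positioned read, i.e. hoodie.properties was rewritten while it was being read.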
About this issue
- Original URL
- State: closed
- Created 2 years ago
- Comments: 23 (22 by maintainers)
Have filed a fix for Flink here: https://github.com/apache/hudi/pull/5660
https://issues.apache.org/jira/browse/HUDI-3782 and https://issues.apache.org/jira/browse/HUDI-4138 may cause this bug.
HoodieTable#getMetadataWriter is used by many async table services such as cleaning, compaction, clustering and so on. This method now tries to modify the table config each time it is called, no matter whether the metadata table is enabled or disabled. In general, we should never introduce side effects in the read code path of the Hoodie table config or the Hoodie table metadata writer.
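To illustrate the point with hypothetical helper code (not Hudi's actual classes): an update that unconditionally rewrites hoodie.properties races with every concurrent reader of the file, while a guarded update only touches the file when something actually changes:

import java.util.Properties
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

// Hypothetical sketch of the principle above, not Hudi code.
class TableConfigStore(private val fs: FileSystem, private val propsPath: Path) {

    fun read(): Properties =
        fs.open(propsPath).use { input -> Properties().apply { load(input) } }

    // Problematic pattern: every caller (cleaning, compaction, clustering, ...)
    // rewrites the file, so readers can see the object change size mid-read.
    fun updateAlways(key: String, value: String) {
        val props = read()
        props.setProperty(key, value)
        fs.create(propsPath, true).use { out -> props.store(out, null) }
    }

    // Guarded pattern: the read path stays side-effect free; the file is only
    // rewritten when the value really differs.
    fun updateIfChanged(key: String, value: String) {
        val props = read()
        if (props.getProperty(key) == value) return
        props.setProperty(key, value)
        fs.create(propsPath, true).use { out -> props.store(out, null) }
    }
}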
I’m not sure how to fix this on the Spark side; I have two ways to fix it in mind:
I can give some details/background here.
The CompactionPlanOperator tries to schedule a new compaction plan on each checkpoint-complete event. In release 0.11 we tweaked the strategy to not schedule a new plan if the last compaction has not finished yet, whereas before 0.11 a new compaction was scheduled no matter whether the last compaction was still on-going. We also added a timed-out rollback strategy in release 0.11, with a default timeout interval of 20 minutes: if a compaction does not finish within 20 minutes, we also try to roll it back, see: https://github.com/apache/hudi/blob/4258a715174e0a97271112148ab20ef9307e2bd8/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java#L91
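Roughly sketched (simplified logic with made-up types, not the real CompactionPlanOperator), the 0.11 policy looks like this:

import java.time.Duration
import java.time.Instant

// Simplified sketch of the scheduling policy described above; hypothetical types.
data class CompactionInstant(val startTime: Instant, val inflight: Boolean)

class CompactionScheduler(
    // 0.11 default rollback timeout mentioned above
    private val rollbackTimeout: Duration = Duration.ofMinutes(20)
) {
    fun onCheckpointComplete(instants: List<CompactionInstant>, now: Instant) {
        // Timed-out rollback: an inflight compaction older than the timeout is rolled back.
        instants.filter { it.inflight && Duration.between(it.startTime, now) > rollbackTimeout }
            .forEach { rollBack(it) }

        // 0.11 behavior: skip scheduling while an earlier compaction is still on-going.
        if (instants.any { it.inflight && Duration.between(it.startTime, now) <= rollbackTimeout }) {
            return
        }

        // Pre-0.11, this ran unconditionally on every checkpoint-complete event.
        scheduleNewPlan()
    }

    private fun rollBack(instant: CompactionInstant) { /* revert the partial compaction */ }
    private fun scheduleNewPlan() { /* generate a new compaction plan */ }
}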