hudi: [SUPPORT] java.io.EOFException: Invalid position - exceeds the bounds of the stream

Describe the problem you faced

Upgrading to 0.11.0 has caused consistent EOFExceptions on compaction for our Flink job. We have not seen this exception in 0.10.* versions.

To Reproduce

Steps to reproduce the behavior:

  1. Use Flink to write and compact data in S3?

Expected behavior

Compaction succeeds without an EOFException.

Environment Description

  • Hudi version : 0.11.0

  • Flink version : 1.14.2

  • Hive version : 2.3.9

  • Hadoop version : 2.10.1

  • Storage (HDFS/S3/GCS…) : S3

  • Running on Docker? (yes/no) : no

Additional context

Our config/setup looks something like this:


val path = "$basePath/$databaseName/$hudiTableName"
conf.setString(FlinkOptions.PATH, path)
conf.setString(FlinkOptions.TABLE_NAME, hudiTableName)
conf.setString(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ")
conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA, schema)
conf.setInteger(FlinkOptions.WRITE_TASKS, 4)
conf.setInteger(FlinkOptions.COMPACTION_TASKS, 4)
conf.setString(FlinkOptions.RECORD_KEY_FIELD, "primaryKey")
conf.setString(FlinkOptions.PRECOMBINE_FIELD, "primaryKey") // TODO: pick something more intelligently

conf.setString(FlinkOptions.PARTITION_PATH_FIELD, "") // TODO: determine partition strategy
conf.setString(FlinkOptions.KEYGEN_TYPE, KeyGeneratorType.NON_PARTITION.name)

conf.setInteger(FlinkOptions.COMPACTION_DELTA_COMMITS, checkpointsPerCompaction)

 ....

dataStream.let { Pipelines.bootstrap(conf, rowType, 1, it) }
                    .let { Pipelines.hoodieStreamWrite(conf, 1, it) }
                    .let { Pipelines.compact(conf, it) }

Stacktrace

Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 6 for operator stream_write (3/4)#0. Failure reason: Checkpoint was declined.
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:265)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:233)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:206)
	at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:186)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:605)
	at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:315)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1329)
	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1315)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1258)
	... 22 more
Caused by: org.apache.hudi.exception.HoodieIOException: Could not load Hoodie properties from s3://bucket/database/table/.hoodie/hoodie.properties
	at org.apache.hudi.common.table.HoodieTableConfig.<init>(HoodieTableConfig.java:254)
	at org.apache.hudi.common.table.HoodieTableMetaClient.<init>(HoodieTableMetaClient.java:125)
	at org.apache.hudi.common.table.HoodieTableMetaClient.<init>(HoodieTableMetaClient.java:78)
	at org.apache.hudi.common.table.HoodieTableMetaClient$Builder.build(HoodieTableMetaClient.java:668)
	at org.apache.hudi.client.BaseHoodieClient.createMetaClient(BaseHoodieClient.java:138)
	at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1448)
	at org.apache.hudi.client.BaseHoodieWriteClient.initTable(BaseHoodieWriteClient.java:1496)
	at org.apache.hudi.client.HoodieFlinkWriteClient.upsert(HoodieFlinkWriteClient.java:140)
	at org.apache.hudi.sink.StreamWriteFunction.lambda$initWriteFunction$1(StreamWriteFunction.java:184)
	at org.apache.hudi.sink.StreamWriteFunction.lambda$flushRemaining$7(StreamWriteFunction.java:461)
	at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608)
	at org.apache.hudi.sink.StreamWriteFunction.flushRemaining(StreamWriteFunction.java:454)
	at org.apache.hudi.sink.StreamWriteFunction.snapshotState(StreamWriteFunction.java:131)
	at org.apache.hudi.sink.common.AbstractStreamWriteFunction.snapshotState(AbstractStreamWriteFunction.java:157)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
	at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:87)
	at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:219)
	... 33 more
Caused by: java.io.IOException: Unexpected end of stream pos=5200, contentLength=5175
	at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.read(S3FSInputStream.java:297)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:286)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:345)
	at java.io.DataInputStream.read(DataInputStream.java:149)
	at java.io.DataInputStream.read(DataInputStream.java:100)
	at java.util.Properties$LineReader.readLine(Properties.java:508)
	at java.util.Properties.load0(Properties.java:353)
	at java.util.Properties.load(Properties.java:341)
	at org.apache.hudi.common.table.HoodieTableConfig.fetchConfigs(HoodieTableConfig.java:303)
	at org.apache.hudi.common.table.HoodieTableConfig.<init>(HoodieTableConfig.java:244)
	... 50 more
Caused by: java.io.EOFException: Invalid position: 5200, exceeds the bounds of the stream: [0, 5175]
	at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.throwPositionOutOfBoundsException(S3FSInputStream.java:411)
	at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.retrieveInputStreamWithInfo(S3FSInputStream.java:385)
	at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.reopenStream(S3FSInputStream.java:378)
	at com.amazon.ws.emr.hadoop.fs.s3.S3FSInputStream.read(S3FSInputStream.java:260)
	... 60 more

About this issue

  • Original URL
  • State: closed
  • Created 2 years ago
  • Comments: 23 (22 by maintainers)

Most upvoted comments

Have filed a fix for Flink here: https://github.com/apache/hudi/pull/5660

The changes tracked in https://issues.apache.org/jira/browse/HUDI-3782 and https://issues.apache.org/jira/browse/HUDI-4138 may be the cause of this bug.

HoodieTable#getMetadataWriter is used by many async table services such as cleaning, compaction, clustering, and so on. As of these changes, the method tries to modify the table config every time it is called, regardless of whether the metadata table is enabled or disabled.

In general, we should never introduce side effects in the read code path of the hoodie table config or of the hoodie table metadata writer.
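
To make the failure mode concrete, here is a minimal, self-contained sketch (plain Java, not Hudi code) of the race: one thread keeps reloading a properties file while another keeps rewriting it with payloads of different lengths, mimicking the table-config rewrite triggered by getMetadataWriter. The file name and payload contents are invented for illustration. On a local filesystem the race usually shows up as a torn or empty read; on S3, where the content length is pinned when the stream is opened, the same race surfaces as the "Invalid position ... exceeds the bounds of the stream" EOFException in the stack trace below.

import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

// Minimal illustration of an unsynchronized read/rewrite of hoodie.properties.
// Not Hudi code: the file and payloads are stand-ins for the table config.
public class PropertiesRace {
    public static void main(String[] args) throws Exception {
        Path file = Files.createTempFile("hoodie", ".properties");
        String shortPayload = "hoodie.table.name=t\nhoodie.table.version=4\n";
        String longPayload = shortPayload + "hoodie.table.metadata.partitions=files\n";

        Thread writer = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                try {
                    // Each rewrite truncates and re-creates the file, so its length
                    // changes underneath any reader that is mid-stream.
                    Files.write(file, ((i % 2 == 0) ? shortPayload : longPayload).getBytes());
                } catch (IOException ignored) {
                }
            }
        });

        Thread reader = new Thread(() -> {
            for (int i = 0; i < 10_000; i++) {
                try (InputStream in = Files.newInputStream(file)) {
                    Properties props = new Properties();
                    props.load(in); // same call HoodieTableConfig.fetchConfigs uses
                    if (!props.containsKey("hoodie.table.name")) {
                        System.err.println("torn read: observed a half-written config");
                    }
                } catch (IOException e) {
                    System.err.println("read raced with rewrite: " + e);
                }
            }
        });

        writer.start();
        reader.start();
        writer.join();
        reader.join();
    }
}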

I’m not sure how to fix this on the Spark side; I have two possible fixes in mind:

  1. Make the table config concurrency-safe (not suggested, because it is too heavyweight for a config).
  2. Make sure the metadata cleaning only happens once for the whole job lifetime (still slightly risky because there may be multiple jobs, but with very small probability). I would suggest this approach; see the sketch after this list.
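
A rough sketch of option 2, assuming a hypothetical once-per-job guard around the cleanup step that currently rewrites hoodie.properties. The class and method names (MetadataCleanupGuard, cleanupOnce, the cleanupMetadataTable callback) are made up for illustration and are not Hudi APIs:

import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of fix option 2: run the metadata-table cleanup (the step
// that rewrites hoodie.properties) at most once per job/JVM lifetime instead of
// on every getMetadataWriter call. Names here are illustrative, not Hudi APIs.
public final class MetadataCleanupGuard {
    private static final AtomicBoolean CLEANED = new AtomicBoolean(false);

    private MetadataCleanupGuard() { }

    public static void cleanupOnce(Runnable cleanupMetadataTable) {
        // compareAndSet guarantees the side effect happens for exactly one caller,
        // so concurrent compaction/clean/cluster services no longer rewrite the
        // table config while another service is reading it.
        if (CLEANED.compareAndSet(false, true)) {
            cleanupMetadataTable.run();
        }
    }
}

The async table services would then call MetadataCleanupGuard.cleanupOnce(...) instead of unconditionally rewriting the table config; as noted above, this is still not bulletproof when several independent jobs write to the same table.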

I can give some details/background here.

The CompactionPlanOperator tries to schedule a new compaction plan on each checkpoint-complete event. In release 0.11 we tweaked the strategy so that no new plan is scheduled while the previous compaction has not finished yet; before 0.11, a new compaction was scheduled regardless of whether the previous one was still ongoing.

We also added a timed-out rollback strategy in release 0.11; the default timeout interval is 20 minutes. If a compaction does not finish within 20 minutes, we try to roll it back, see: https://github.com/apache/hudi/blob/4258a715174e0a97271112148ab20ef9307e2bd8/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionPlanOperator.java#L91
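
A simplified sketch of the two 0.11 behaviors described above: skip scheduling while a compaction plan is still pending, and roll back a pending plan that has exceeded the timeout. The class, method, and field names here are illustrative only; the real logic lives in the CompactionPlanOperator linked above.

import java.time.Duration;
import java.time.Instant;

// Illustrative sketch (not the real CompactionPlanOperator API) of the 0.11
// scheduling strategy: skip scheduling while a compaction is still pending,
// and roll back a pending compaction that has exceeded the timeout.
class CompactionScheduler {
    private static final Duration ROLLBACK_TIMEOUT = Duration.ofMinutes(20);

    private Instant pendingPlanStart; // null when no compaction is in flight

    void onCheckpointComplete(long checkpointId) {
        if (pendingPlanStart != null) {
            // 0.11 behavior: a plan is still pending, so do not schedule a new one.
            // If it has been pending longer than the timeout, roll it back first.
            if (Duration.between(pendingPlanStart, Instant.now()).compareTo(ROLLBACK_TIMEOUT) > 0) {
                rollbackPendingCompaction();
                pendingPlanStart = null;
            } else {
                return;
            }
        }
        scheduleNewCompactionPlan(checkpointId); // pre-0.11: this ran unconditionally
        pendingPlanStart = Instant.now();
    }

    void onCompactionFinished() {
        pendingPlanStart = null;
    }

    // Placeholders standing in for Hudi's actual scheduling/rollback calls.
    private void scheduleNewCompactionPlan(long checkpointId) { }

    private void rollbackPendingCompaction() { }
}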