iceberg: Flink S3Fileio incorrect commit

Flink version : 1.13.2 Iceberg version : 0.12 We were trying S3Fileio instead of default hadoop fileio, to ingest data into iceberg on S3.

While commit/saving checkpoint, we see below call stack while uploading data file. So looks datafile (for example : s3/bucket/…/iceberg_db.db/iceberg_table/data/file.parquet) upload fails.

exception: { [-] 
      exception_class:  java.util.concurrent.CompletionException 
      exception_message:  java.io.UncheckedIOException: java.nio.file.NoSuchFileException: /tmp/s3fileio-224113294985193160.tmp 
      stacktrace:  java.util.concurrent.CompletionException: java.io.UncheckedIOException: java.nio.file.NoSuchFileException: /tmp/s3fileio-224113294985193160.tmp
	at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
	at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.UncheckedIOException: java.nio.file.NoSuchFileException: /tmp/s3fileio-224113294985193160.tmp
	at software.amazon.awssdk.utils.FunctionalUtils.asRuntimeException(FunctionalUtils.java:180)
	at software.amazon.awssdk.utils.FunctionalUtils.lambda$safeSupplier$4(FunctionalUtils.java:110)
	at software.amazon.awssdk.utils.FunctionalUtils.invokeSafely(FunctionalUtils.java:136)
	at software.amazon.awssdk.core.sync.RequestBody.fromFile(RequestBody.java:88)
	at software.amazon.awssdk.core.sync.RequestBody.fromFile(RequestBody.java:99)
	at org.apache.iceberg.aws.s3.S3OutputStream.lambda$uploadParts$1(S3OutputStream.java:237)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
	... 3 more
Caused by: java.nio.file.NoSuchFileException: /tmp/s3fileio-224113294985193160.tmp
	at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
	at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
	at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
	at java.base/sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
	at java.base/sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:149)
	at java.base/sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99)
	at java.base/java.nio.file.Files.readAttributes(Files.java:1764)
	at java.base/java.nio.file.Files.size(Files.java:2381)
	at software.amazon.awssdk.core.sync.RequestBody.lambda$fromFile$0(RequestBody.java:88)
	at software.amazon.awssdk.utils.FunctionalUtils.lambda$safeSupplier$4(FunctionalUtils.java:108)
	... 8 more

But on the iceberg manifest file, that data file (s3/bucket/…/iceberg_db.db/iceberg_table/data/file.parquet) is still referenced, as result, iceberg thinks that data file is valid and exists.

Now running another query, which needs to use the above data file, throws exception.

Has anybody seen this scenario? We are trying to re-produce the scenario for further investigation.

Code link : https://github.com/apache/iceberg/blob/master/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java#L291

About this issue

  • Original URL
  • State: closed
  • Created 2 years ago
  • Reactions: 2
  • Comments: 16 (15 by maintainers)

Most upvoted comments

So essentially what we observed is that the Flink S3 FileIO failed to upload the data files due to file not exist, but the missing data file is incorrectly tracked in iceberg manifest. So the subsequent query against the partition will fail as the claimed data file cannot be found.

Our iceberg was setup on AWS S3 with versioned bucket and we can confirm that the

  • data file never get uploaded, no version exists of given path
  • the data file is being tracked in iceberg and we do see such non-exist data file in iceberg metadata query

This result in some nasty behavior where we need to reconcile the manifest state based on what exists in underlying file system, which is not possible. So we had to drop the partition as temporary work around. But I think it would be helpful to understand

why do we run into the issue where iceberg commits before s3 confirm the data file is uploaded?

In the meantime, we are trying to see if we can have a simple repro of the issue

CC @szehon-ho

The fix for this was released in 0.14.1

Is it safe to assume that iceberg is ready to commit if all writers can write its data and metadata files to fileIO and writers are close without exception?

Yes. That’s why I’m surprised that you ended up with corrupt metadata. The complete failure should have caused a task failure that triggered a retry, rather than producing data. My guess is that there is an issue with error handling in the Flink writer that we should look into.

We are curious about call path on how S3OutputStream is wired to IcebergStreamWriter?

Was the table partitioned or not? That will tell us the internal writer that was used.

Since the multipart upload failed well before the snapshot was committed, my guess is that it happened when rolling a data file in the RollingFileWriter. I don’t see a way that would get swallowed, but it is a good starting point to check into more. Do you have any stack trace from the logs where the failure happened?