iceberg: Flink S3Fileio incorrect commit
Flink version : 1.13.2 Iceberg version : 0.12 We were trying S3Fileio instead of default hadoop fileio, to ingest data into iceberg on S3.
While commit/saving checkpoint, we see below call stack while uploading data file. So looks datafile (for example : s3/bucket/…/iceberg_db.db/iceberg_table/data/file.parquet) upload fails.
exception: { [-]
exception_class: java.util.concurrent.CompletionException
exception_message: java.io.UncheckedIOException: java.nio.file.NoSuchFileException: /tmp/s3fileio-224113294985193160.tmp
stacktrace: java.util.concurrent.CompletionException: java.io.UncheckedIOException: java.nio.file.NoSuchFileException: /tmp/s3fileio-224113294985193160.tmp
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.io.UncheckedIOException: java.nio.file.NoSuchFileException: /tmp/s3fileio-224113294985193160.tmp
at software.amazon.awssdk.utils.FunctionalUtils.asRuntimeException(FunctionalUtils.java:180)
at software.amazon.awssdk.utils.FunctionalUtils.lambda$safeSupplier$4(FunctionalUtils.java:110)
at software.amazon.awssdk.utils.FunctionalUtils.invokeSafely(FunctionalUtils.java:136)
at software.amazon.awssdk.core.sync.RequestBody.fromFile(RequestBody.java:88)
at software.amazon.awssdk.core.sync.RequestBody.fromFile(RequestBody.java:99)
at org.apache.iceberg.aws.s3.S3OutputStream.lambda$uploadParts$1(S3OutputStream.java:237)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
... 3 more
Caused by: java.nio.file.NoSuchFileException: /tmp/s3fileio-224113294985193160.tmp
at java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
at java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
at java.base/sun.nio.fs.UnixFileAttributeViews$Basic.readAttributes(UnixFileAttributeViews.java:55)
at java.base/sun.nio.fs.UnixFileSystemProvider.readAttributes(UnixFileSystemProvider.java:149)
at java.base/sun.nio.fs.LinuxFileSystemProvider.readAttributes(LinuxFileSystemProvider.java:99)
at java.base/java.nio.file.Files.readAttributes(Files.java:1764)
at java.base/java.nio.file.Files.size(Files.java:2381)
at software.amazon.awssdk.core.sync.RequestBody.lambda$fromFile$0(RequestBody.java:88)
at software.amazon.awssdk.utils.FunctionalUtils.lambda$safeSupplier$4(FunctionalUtils.java:108)
... 8 more
But on the iceberg manifest file, that data file (s3/bucket/…/iceberg_db.db/iceberg_table/data/file.parquet) is still referenced, as result, iceberg thinks that data file is valid and exists.
Now running another query, which needs to use the above data file, throws exception.
Has anybody seen this scenario? We are trying to re-produce the scenario for further investigation.
About this issue
- Original URL
- State: closed
- Created 2 years ago
- Reactions: 2
- Comments: 16 (15 by maintainers)
So essentially what we observed is that the Flink S3 FileIO failed to upload the data files due to file not exist, but the missing data file is incorrectly tracked in iceberg manifest. So the subsequent query against the partition will fail as the claimed data file cannot be found.
Our iceberg was setup on AWS S3 with versioned bucket and we can confirm that the
This result in some nasty behavior where we need to reconcile the manifest state based on what exists in underlying file system, which is not possible. So we had to drop the partition as temporary work around. But I think it would be helpful to understand
why do we run into the issue where iceberg commits before s3 confirm the data file is uploaded?
In the meantime, we are trying to see if we can have a simple repro of the issue
CC @szehon-ho
The fix for this was released in 0.14.1
Yes. That’s why I’m surprised that you ended up with corrupt metadata. The complete failure should have caused a task failure that triggered a retry, rather than producing data. My guess is that there is an issue with error handling in the Flink writer that we should look into.
Was the table partitioned or not? That will tell us the internal writer that was used.
Since the multipart upload failed well before the snapshot was committed, my guess is that it happened when rolling a data file in the
RollingFileWriter. I don’t see a way that would get swallowed, but it is a good starting point to check into more. Do you have any stack trace from the logs where the failure happened?