hudi: [SUPPORT] Flink Exceeded checkpoint tolerable failure threshold.
Describe the problem you faced
A clear and concise description of the problem. “I use Flink CDC to read MySQL data and then write it to S3 through Hudi. I often encounter the checkpoint failure `org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold`.” “Typically a checkpoint failure occurs about every 20 minutes. The job runs without problems on a local machine, but the issue appears as soon as it is deployed to an EKS cluster.”
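The interval, timeout, and tolerable-failure threshold that the exception refers to are ordinary Flink checkpointing settings. As a point of reference, here is a minimal sketch of how they can be adjusted, e.g. from the Flink SQL client (or via the equivalent `flink-conf.yaml` keys); the keys are standard Flink 1.15 options, but the values are placeholders to experiment with, not a recommendation:

```
-- Placeholder values; tune to the workload.
-- Trigger a checkpoint every minute (roughly the interval visible in the log below).
SET 'execution.checkpointing.interval' = '60s';
-- Give slow Hudi commits more time before the checkpoint is declared expired.
-- The 10:24 -> 10:34 expiry below matches the default 10-minute timeout.
SET 'execution.checkpointing.timeout' = '30min';
-- Tolerate a few expired checkpoints before the job is failed over.
SET 'execution.checkpointing.tolerable-failed-checkpoints' = '3';
-- Keep some breathing room between consecutive checkpoints.
SET 'execution.checkpointing.min-pause' = '30s';
```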
To Reproduce
Steps to reproduce the behavior:
Expected behavior
A clear and concise description of what you expected to happen.
```
2023-03-23 10:23:07,101 INFO org.apache.hudi.sink.StreamWriteOperatorCoordinator [] - Executor executes action [handle write metadata event for instant 20230323101927464] success!
2023-03-23 10:23:07,817 INFO org.apache.flink.fs.s3.common.writer.S3Committer [] - Committing reject/savepoint-fbea13-af1aa20f0400/_metadata with MPU ID rqLctlP9RnUBjWedNLI1bXhNB32evfVDwi7T1nNz8Gd9gDzFtYDRm615A5MCQivzMav.9yAJeD_Tp36Yp52oHitnmMZ6BqYmJV9G.JcDXEco.czZfXHRzGisnIuVzO._qSlcCQaQpSW.Qx.X0ex3LQ--
2023-03-23 10:23:08,065 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 11 for job fbea139434199b0095f544fe5c15d25f (747958 bytes, checkpointDuration=6668 ms, finalizationTime=0 ms).
2023-03-23 10:24:06,082 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 12 (type=CheckpointType{name='Checkpoint', sharingFilesStrategy=FORWARD_BACKWARD}) @ 1679567046082 for job fbea139434199b0095f544fe5c15d25f.
2023-03-23 10:24:06,765 INFO org.apache.hudi.sink.StreamWriteOperatorCoordinator [] - Executor executes action [taking checkpoint 12] success!
2023-03-23 10:34:06,083 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint 12 of job fbea139434199b0095f544fe5c15d25f expired before completing.
2023-03-23 10:34:06,083 WARN org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to trigger or complete checkpoint 12 for job fbea139434199b0095f544fe5c15d25f. (0 consecutive failed attempts so far)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2143) [flink-dist-1.15.3.jar:1.15.3]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) [?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) [?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) [?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) [?:?]
at java.lang.Thread.run(Unknown Source) [?:?]
2023-03-23 10:34:06,084 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:206) ~[flink-dist-1.15.3.jar:1.15.3]
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:169) ~[flink-dist-1.15.3.jar:1.15.3]
at org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:122) ~[flink-dist-1.15.3.jar:1.15.3]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2082) ~[flink-dist-1.15.3.jar:1.15.3]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2061) ~[flink-dist-1.15.3.jar:1.15.3]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$600(CheckpointCoordinator.java:98) ~[flink-dist-1.15.3.jar:1.15.3]
at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2143) ~[flink-dist-1.15.3.jar:1.15.3]
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) ~[?:?]
at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
at java.lang.Thread.run(Unknown Source) ~[?:?]
```

Environment Description
- Hudi version : 0.13.0
- Flink version : 1.15.3
- Hadoop version : 3.2.1
- Storage (HDFS/S3/GCS…) : S3
- Running on Docker? (yes/no) : yes (EKS)
Additional context
```
connector = hudi,
table.type = MERGE_ON_READ,
hoodie.clean.async=true,
hoodie.compact.inline= true,
hoodie.compact.inline.max.delta.commits=2,
hoodie.clean.max.commits=2,
hoodie.cleaner.commits.retained = 3,
hoodie.cleaner.policy = KEEP_LATEST_COMMITS,
hoodie.parquet.small.file.limit=104857600,
clustering.schedule.enabled=true,
clustering.async.enabled=true,
hoodie.clustering.inline= true,
hoodie.clustering.inline.max.commits= 2,
hoodie.clustering.plan.strategy.max.bytes.per.group= 107374182400,
hoodie.clustering.plan.strategy.max.num.groups= 1,
hoodie.datasource.write.recordkey.field = id,installs,rejects,
path = s3a://xxxxxxxx/xxxxxxx
```
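If the inline compaction and clustering options above take effect on this write path, they could lengthen the commit that happens alongside each checkpoint. For comparison, here is a hedged sketch of how the Hudi Flink sink is commonly configured through Flink SQL with compaction kept asynchronous; the column list, key, and values are placeholders derived from the options above, not the actual schema, and the effect on checkpoint duration should be verified against the Hudi 0.13.0 Flink docs:

```
-- Hypothetical DDL sketch; columns and types are placeholders.
CREATE TABLE hudi_sink (
  id BIGINT,
  installs BIGINT,
  rejects BIGINT,
  PRIMARY KEY (id, installs, rejects) NOT ENFORCED
) WITH (
  'connector' = 'hudi',
  'path' = 's3a://xxxxxxxx/xxxxxxx',
  'table.type' = 'MERGE_ON_READ',
  -- Schedule compaction but execute it asynchronously in the pipeline,
  -- so less work is tied to the checkpoint-time commit.
  'compaction.async.enabled' = 'true',
  'compaction.delta_commits' = '5',
  -- Asynchronous cleaning, as already configured above.
  'hoodie.clean.async' = 'true'
);
```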
Add any other context about the problem here.
**Stacktrace**
The stacktrace is included in the log output above.
About this issue
- Original URL
- State: open
- Created a year ago
- Reactions: 1
- Comments: 25 (11 by maintainers)
Commits related to this issue
- Fix Flink cannot create savepoint with hudi sink https://github.com/apache/hudi/issues/8276 — committed to viethung2281996/hudi by viethung2281996 9 months ago
I am facing the same problem. Tuning up resources can resolve the issue, but I am still curious about the reason behind it.
I have a pipeline that has been running for days ATM, with a savepoint triggered by the k8s operator every 6 hours. Two 4c8g task managers are in use, with a total of 8 slots. Every checkpoint takes around 10 minutes, processing 100k records and touching 20+ partitions. And every checkpoint after a savepoint fails with:
Checkpoint expired before completing (60 min). Could someone elaborate on what exactly is different about the process after a savepoint?
I did a test with a 10-minute interval between the savepoint and the next checkpoint. After the savepoint was triggered, the checkpoint still failed, even though resources were sufficient at that time.