hudi: [SUPPORT] split_reader doesn't checkpoint before consuming all splits

Describe the problem you faced

When using Flink to incrementally query a MOR table with many splits (read.start-commit=earliest), the reader's first checkpoint only succeeds after all splits have been consumed. Consuming them takes so long that the checkpoint times out. In my understanding, since the data is read in mini batches, a checkpoint should be able to trigger and complete between batches.

To Reproduce

Steps to reproduce the behavior:

  1. mor table with many splits
  2. read.start-commit=earliest
  3. read.tasks=1
  4. checkpoint interval=10s and checkpoint timeout=10min (see the sketch after this list)
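
A minimal sketch of this setup (assuming the issue's versions, Hudi 0.11.1 on Flink 1.14; the table schema, name, and path below are placeholders):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ReproSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000L);                          // checkpoint interval = 10s
        env.getCheckpointConfig().setCheckpointTimeout(600_000L);  // checkpoint timeout = 10min

        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        tEnv.executeSql(
            "CREATE TABLE hudi_src (id STRING, ts TIMESTAMP(3)) WITH ("
                + " 'connector' = 'hudi',"
                + " 'path' = '/path/to/mor/table',"           // placeholder path
                + " 'table.type' = 'MERGE_ON_READ',"
                + " 'read.streaming.enabled' = 'true',"       // incremental (streaming) read
                + " 'read.start-commit' = 'earliest',"
                + " 'read.tasks' = '1'"
                + ")");
        tEnv.executeSql("SELECT * FROM hudi_src").print();    // the single reader task consumes all splits
    }
}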

Environment Description

  • Hudi version : 0.11.1

  • Flink version : 1.14.5

Stacktrace

Caused by: org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold.

Most upvoted comments

The first checkpoint of the reader only succeeds after all splits are consumed. It takes so long that the checkpoint times out.

So which checkpoint timed out? The StreamReadOperator caches all input splits that have not yet been consumed, so if the reader starts from the earliest commit, the checkpointed data set can be large. How large is the checkpoint data for the successful checkpoint of the StreamReadOperator?

The first checkpoint is the one that times out, because the StreamReadOperator does not complete it until all cached splits are consumed. The number of splits is about 1200, so the checkpoint data size should be small.
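
For context, the split caching mentioned above works roughly as below (a simplified paraphrase of StreamReadOperator in Hudi 0.11; fields and signatures are abbreviated):

// Inside StreamReadOperator (simplified):
private transient Queue<MergeOnReadInputSplit> splits;          // splits cached for async consumption
private transient ListState<MergeOnReadInputSplit> splitState;  // checkpointed backlog

public void processElement(StreamRecord<MergeOnReadInputSplit> element) {
    splits.add(element.getValue());  // cache the incoming split
    enqueueProcessSplits();          // schedule consumption on the mailbox executor
}

public void snapshotState(StateSnapshotContext context) throws Exception {
    splitState.clear();
    for (MergeOnReadInputSplit split : splits) {
        splitState.add(split);       // every unconsumed split goes into checkpoint state
    }
}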

Now I cap the number of records read per checkpoint interval, and it looks good:

// Inside StreamReadOperator (sketch of the patch):
private long maxConsumeRecordsPerCkp;    // configured upper bound; <= 0 disables the limit
private long consumedRecordsBetweenCkp;  // records consumed since the last checkpoint

private void enqueueProcessSplits() {
    if (maxConsumeRecordsPerCkp > 0 && consumedRecordsBetweenCkp > maxConsumeRecordsPerCkp) {
        return;  // reached the per-checkpoint budget; stop scheduling mini batches until the next checkpoint
    }
    // ... original logic: schedule the next mini batch on the mailbox executor
}

private void consumeAsMiniBatch(MergeOnReadInputSplit split) throws IOException {
    // ... original logic: read one mini batch of records from the split
    consumedRecordsBetweenCkp += 1L;  // count the consumed mini batch toward the budget
}

@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);
    consumedRecordsBetweenCkp = 0L;  // reset when a new checkpoint comes
}