hudi: [SUPPORT] I use Flink to bulk insert a MOR table with a bucket index, but it seems that you cannot change write.tasks when you stop the insert and continue with upsert

Tips before filing an issue

  • Have you gone through our FAQs? It seems that the page is a 404.

  • Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.

  • If you have triaged this as a bug, then file an issue directly.

Describe the problem you faced

I use Flink to bulk insert a MOR table with a bucket index, but it seems that you cannot change write.tasks when you stop the bulk insert and continue with upsert. This causes data loss.

At first, I want to bulk insert into Hudi to load all of the data quickly with “write.tasks=256”. Then I stop the bulk insert and continue with upsert, also with “write.tasks=256”, to catch up with the messages from Kafka. After I have caught up with the delayed messages, I want to shrink the consumer cluster and change write.tasks to 50. It turns out that this causes data loss. Is this expected? Am I doing something wrong?
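For reference, this is roughly what the second (upsert) phase looks like; the table name here is a placeholder, and the columns and path mirror the DDL further down in this thread. As far as I understand, with a BUCKET index the bucket count fixes the file layout and has to stay the same across jobs, while write.tasks only sets the writer parallelism:

-- Sketch of the upsert job used after the bulk insert finishes
-- (table name is a placeholder; columns and path mirror the DDL below).
CREATE TABLE xxxx2hudi_upsert_sink (
    uid STRING PRIMARY KEY NOT ENFORCED,
    oridata STRING,
    update_time TIMESTAMP_LTZ(3)
) WITH (
    'connector' = 'hudi',
    'path' = '%s',
    'table.type' = 'MERGE_ON_READ',
    'write.operation' = 'upsert',
    'precombine.field' = 'update_time',
    'index.type' = 'BUCKET',
    'hoodie.bucket.index.hash.field' = 'uid',
    'hoodie.bucket.index.num.buckets' = '256',  -- keep identical to the bulk_insert run
    'write.tasks' = '50'                        -- only the writer parallelism changes here
)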

To Reproduce

Steps to reproduce the behavior:

Expected behavior

Create a MOR table with a bucket index by bulk insert, and the table can then be compacted offline.
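A rough sketch of the writer-side options typically used for that offline compaction setup (option names are the standard Hudi Flink compaction options; the values are illustrative, and whether this matches the setup here is an assumption): the streaming writer only schedules compaction plans, and a separate job such as the HoodieFlinkCompactor shipped in the Flink bundle executes them.

-- Extra options for the upsert sink's WITH clause (sketch):
-- schedule compaction plans in the streaming job but execute them offline,
-- e.g. with org.apache.hudi.sink.compact.HoodieFlinkCompactor.
'compaction.async.enabled' = 'false',    -- do not compact inside the streaming writer
'compaction.schedule.enabled' = 'true',  -- still generate compaction plans
'compaction.delta_commits' = '5'         -- plan a compaction every 5 delta commits (example value)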

Environment Description

  • Hudi version :

  • Spark version :

  • Hive version :

  • Hadoop version :

  • Storage (HDFS/S3/GCS…) :

  • Running on Docker? (yes/no) :

Additional context

Add any other context about the problem here.

Stacktrace

Add the stacktrace of the error.

About this issue

  • Original URL
  • State: closed
  • Created a year ago
  • Comments: 22 (7 by maintainers)

Most upvoted comments

You did not declare the index type as bucket while doing the bulk_insert.

So do you mean I should change my bulk insert config like the one below?

CREATE TABLE xxxx2hudi_sink (
    uid STRING PRIMARY KEY NOT ENFORCED,
    oridata STRING,
    update_time TIMESTAMP_LTZ(3)
) WITH (
    'table.type' = 'MERGE_ON_READ',
    'connector' = 'hudi',
    'path' = '%s',
    'write.operation' = 'bulk_insert',
    'precombine.field' = 'update_time',
    'write.tasks' = '256',
    'index.type' = 'BUCKET',                     -- declare the bucket index for the bulk_insert too
    'hoodie.bucket.index.hash.field' = 'uid',
    'hoodie.bucket.index.num.buckets' = '256'
)
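And, if that is the fix, the follow-up upsert job would presumably keep the bucket options unchanged and only swap the operation and the parallelism, roughly like this (a sketch reusing the values from the DDL above):

-- Options that differ in the follow-up upsert job (sketch); everything
-- bucket-related stays identical to the bulk_insert DDL above.
'write.operation' = 'upsert',
'write.tasks' = '50',                        -- writer parallelism, independent of the bucket count
'index.type' = 'BUCKET',
'hoodie.bucket.index.hash.field' = 'uid',
'hoodie.bucket.index.num.buckets' = '256'    -- must not change after the table is written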