hudi: [SUPPORT] Failure to execute offline Flink compactor in service mode

Tips before filing an issue

  • Have you gone through our FAQs? Yes

Describe the problem you faced

I’m using HoodieFlinkCompactor to run an offline compaction job, and it fails when service mode is enabled.

The failure is: Cannot have more than one execute() or executeAsync() call in a single environment.

It runs fine for a single round with service mode disabled (run once and quit).

To Reproduce

Steps to reproduce the behavior:

  1. Start a standalone Flink compactor job.
  2. Enable service mode.
  3. The job fails after the first “parallelism” rounds of jobs are done (on the next loop).
  4. The job restarts.

Expected behavior

The second loop (the rounds beyond the first “parallelism” jobs) should succeed when service mode is enabled.

Environment Description

  • Hudi version :

0.12.1

  • Spark version :

  • Hive version :

  • Hadoop version :

  • Storage (HDFS/S3/GCS…) :

  • Running on Docker? (yes/no) :

Additional context

job config

--path oss://dengine-lake-zjk/cloudcode_prod/dwd_egc_adv_req_outra
--compaction-max-memory 2048
--seq LIFO
--compaction-tasks 16
--plan-select-strategy all
--min-compaction-interval-seconds 30
--service
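For context, options like these are typically passed to the compactor’s main class via `flink run`. A hedged sketch of the launch command, reusing the config above; the Flink binary location and bundle jar path are placeholders, not taken from the report:

```shell
# Hypothetical launch command; adjust the Flink path and bundle jar to your build.
./bin/flink run \
  -c org.apache.hudi.sink.compact.HoodieFlinkCompactor \
  /path/to/hudi-flink-bundle.jar \
  --path oss://dengine-lake-zjk/cloudcode_prod/dwd_egc_adv_req_outra \
  --compaction-max-memory 2048 \
  --seq LIFO \
  --compaction-tasks 16 \
  --plan-select-strategy all \
  --min-compaction-interval-seconds 30 \
  --service
```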

Stacktrace

2022-12-23 23:14:05,976 [pool-17-thread-1] INFO  org.apache.flink.api.java.typeutils.TypeExtractor            [] - Class class org.apache.hudi.common.model.CompactionOperation cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
2022-12-23 23:14:05,983 [pool-17-thread-1] WARN  org.apache.flink.resourceplan.applyagent.StreamGraphModifier [] - Path of resource plan is not specified, do nothing.
2022-12-23 23:14:05,983 [pool-17-thread-1] ERROR org.apache.hudi.client.RunsTableService                      [] - Shutting down compaction service due to exception
org.apache.flink.util.FlinkRuntimeException: Cannot have more than one execute() or executeAsync() call in a single environment.
	at org.apache.flink.client.program.StreamContextEnvironment.validateAllowedExecution(StreamContextEnvironment.java:199) ~[flink-dist-1.15-vvr-6.0.2-3-SNAPSHOT.jar:1.15-vvr-6.0.2-3-SNAPSHOT]
	at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:187) ~[flink-dist-1.15-vvr-6.0.2-3-SNAPSHOT.jar:1.15-vvr-6.0.2-3-SNAPSHOT]
	at org.apache.flink.client.program.StreamContextEnvironment.execute(StreamContextEnvironment.java:119) ~[flink-dist-1.15-vvr-6.0.2-3-SNAPSHOT.jar:1.15-vvr-6.0.2-3-SNAPSHOT]
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1985) ~[flink-dist-1.15-vvr-6.0.2-3-SNAPSHOT.jar:1.15-vvr-6.0.2-3-SNAPSHOT]
	at org.apache.hudi.sink.compact.HoodieFlinkCompactor$AsyncCompactionService.compact(HoodieFlinkCompactor.java:322) ~[flink-hudi-bundle-1.3-SNAPSHOT-jar-with-dependencies-20221217104900.jar:?]
	at org.apache.hudi.sink.compact.HoodieFlinkCompactor$AsyncCompactionService.lambda$startService$0(HoodieFlinkCompactor.java:204) ~[flink-hudi-bundle-1.3-SNAPSHOT-jar-with-dependencies-20221217104900.jar:?]
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) [?:1.8.0_102]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147) [?:1.8.0_102]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622) [?:1.8.0_102]
	at java.lang.Thread.run(Thread.java:834) [?:1.8.0_102]
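The stack trace shows StreamContextEnvironment.validateAllowedExecution rejecting the second execute() call: depending on the deployment mode, Flink can restrict the context environment to a single job submission, while the compactor’s service loop calls execute() once per compaction round. A minimal self-contained model of that guard (this is an illustration of the failure mode, not Flink’s actual class):

```java
// Simplified stand-in for Flink's StreamContextEnvironment guard; the real
// check lives in validateAllowedExecution and throws FlinkRuntimeException.
public class SingleExecuteEnvironment {
    private boolean executed = false;

    public void execute() {
        if (executed) {
            throw new IllegalStateException(
                "Cannot have more than one execute() or executeAsync() call in a single environment.");
        }
        executed = true;
        // ... submit and run the job graph ...
    }

    public static void main(String[] args) {
        SingleExecuteEnvironment env = new SingleExecuteEnvironment();
        env.execute();          // first round: succeeds
        try {
            env.execute();      // second round: rejected, as in the report above
        } catch (IllegalStateException e) {
            System.out.println("second round rejected: " + e.getMessage());
        }
    }
}
```

Under this model, the second loop iteration is doomed as long as every round reuses the same context environment, which matches the observed behavior: one round succeeds, the next fails.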

About this issue

  • Original URL
  • State: open
  • Created 2 years ago
  • Comments: 20 (10 by maintainers)

Most upvoted comments

@danny0405 Thanks for the fix, I will give it a try. Yeah, I’m using Aliyun VVP/VVR, but the bundle is compiled by myself and provided manually as an individual jar, so it will not depend on the environment.