beam: [Bug]: Python SDFs (e.g. PeriodicImpulse) running in Flink and polling using tracker.defer_remainder have checkpoint size growing indefinitely
What happened?
Please see https://gist.github.com/nybbles/6e1f2ab31866b251ff754e22b71f8405 for code to replicate this problem.
Problem description
I am finding that using unbounded SDFs with Flink results in checkpoint sizes that grow without bound, which eventually causes the job to fail and all further job submissions to fail. This happens even for beam.transforms.periodicsequence.PeriodicImpulse in a very simple pipeline, given below.
My pipeline consists of an SDF that reads from an unbounded source, which means that when there are no new messages, the SDF must poll the unbounded source with some timeout. I observed that when my SDF did this polling (using tracker.defer_remainder as described in https://beam.apache.org/documentation/programming-guide/#user-initiated-checkpoint), the checkpoint size would grow. This happens even if the unbounded source was empty, in which case my SDF simply executed a loop of polling the unbounded source, calling tracker.defer_remainder, and returning from the DoFn to relinquish control and wait to poll again.
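For concreteness, here is a minimal sketch of that polling pattern, closely following the user-initiated-checkpoint example in the linked programming guide. This is not my actual pipeline code: poll_source, PollRestrictionProvider, and the offset range are hypothetical stand-ins.

```python
import apache_beam as beam
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
from apache_beam.transforms.core import RestrictionProvider
from apache_beam.utils.timestamp import Duration


def poll_source(position):
    """Hypothetical stand-in for polling the unbounded source; returns [] when empty."""
    return []


class PollRestrictionProvider(RestrictionProvider):
    """Hypothetical provider over an effectively unbounded offset range."""

    def initial_restriction(self, element):
        return OffsetRange(0, 2**63 - 1)

    def create_tracker(self, restriction):
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, element, restriction):
        return restriction.size()


class PollingSourceFn(beam.DoFn):
    POLL_BACKOFF_SECONDS = 10

    def process(
        self, element,
        tracker=beam.DoFn.RestrictionParam(PollRestrictionProvider())):
        position = tracker.current_restriction().start
        while tracker.try_claim(position):
            records = poll_source(position)
            if not records:
                # Source is empty: checkpoint here and ask the runner to resume
                # later. Each of these deferrals appeared to add to the Flink
                # checkpoint size in my runs.
                tracker.defer_remainder(Duration.of(self.POLL_BACKOFF_SECONDS))
                return
            for record in records:
                yield record
            position += 1
```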
I was concerned that I had implemented my SDF or my pipeline incorrectly, so I found beam.transforms.periodicsequence.PeriodicImpulse and tested it in a very simple pipeline, shown below (note that apply_windowing's value does not change the problematic behavior):
import apache_beam as beam
from apache_beam.transforms.periodicsequence import PeriodicImpulse

with beam.Pipeline(options=runner_options) as pipeline:
    pcoll = pipeline | PeriodicImpulse(
        fire_interval=5, apply_windowing=True
    )
    pcoll | beam.Map(print)
This pipeline also results in growing checkpoint size.
The Flink cluster configuration and full source for the program to replicate the problem and the Docker compose to get the Flink cluster up and running are given below in the reproduction steps and in https://gist.github.com/nybbles/6e1f2ab31866b251ff754e22b71f8405.
In case it is helpful, I'll list the FLINK_PROPERTIES and PipelineOptions below.
FLINK_PROPERTIES: &flink_properties |-
  historyserver.archive.clean-expired-jobs: true
  state.backend: hashmap
  state.checkpoints.dir: file:///tmp/beam_state/
  state.checkpoints.num-retained: 10
  jobmanager.rpc.address: host.docker.internal
  rest.address: host.docker.internal
  taskmanager.numberOfTaskSlots: 4
  taskmanager.memory.process.size: 2048m
runner_options = PortableOptions(
    artifact_endpoint=f"{JOB_SERVER_HOSTNAME}:8098",
    environment_cache_millis=0,
    environment_config="apache/beam_python3.11_sdk:2.48.0",
    environment_options=None,
    environment_type="DOCKER",
    job_endpoint=f"{JOB_SERVER_HOSTNAME}:8099",
    job_server_timeout=60,
    runner="PortableRunner",
    output_executable_path=None,
    sdk_worker_parallelism=0,
    state_backend="filesystem",
    state_backend_storage_path="file:///tmp/beam_state/",
    streaming=True,
    checkpointing_interval=STREAMING_CHECKPOINTING_INTERVAL,
    parallelism=1,
    auto_watermark_interval=500,
)
See this email thread for more context: https://lists.apache.org/thread/7yjr1f24rdzwzofdty1h12w9m28o62sm.
Note on priority
I followed the linked guide for setting issue priorities and set this one to priority 1 because unbounded SDFs are an important component, running on Flink is an important use case, and unbounded checkpoint size growth makes unbounded SDFs on Flink non-functional. My apologies in advance if this is the wrong priority level.
Reproduction steps
- Run a Flink cluster (the Gist above provides a Docker Compose-based Flink cluster; it uses a taskmanager image that can run Docker containers, which is needed for environment_type="DOCKER").
- Run the attached driver program in the Gist above to create the job and submit it to the Flink cluster.
- Observe in the logs that the checkpoints grow in size, specifically for the step containing the SDF and in proportion to the number of calls to tracker.defer_remainder, despite the SDF not explicitly accumulating any state (see the sketch after this list for one way to watch checkpoint sizes).
- The checkpoint size grows until the Java heap space is exhausted and the job is killed. When the job is resubmitted, it always fails, because the job manager attempts to restore the job from the large checkpoint, exhausting the Java heap space again.
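As a rough sketch (not part of the Gist), checkpoint sizes can also be watched via Flink's monitoring REST API, assuming the REST endpoint is reachable at http://localhost:8081; the endpoint paths and response fields may differ slightly across Flink versions.

```python
# Polls Flink's monitoring REST API and prints the latest completed
# checkpoint size for each running job. Uses the third-party `requests`
# library; the address and field names are assumptions, not from the report.
import time

import requests

FLINK_REST = "http://localhost:8081"  # assumed Flink REST endpoint


def watch_checkpoint_sizes(poll_seconds=10):
    while True:
        jobs = requests.get(f"{FLINK_REST}/jobs").json().get("jobs", [])
        for job in jobs:
            job_id = job["id"]
            stats = requests.get(f"{FLINK_REST}/jobs/{job_id}/checkpoints").json()
            latest = (stats.get("latest") or {}).get("completed") or {}
            # "state_size" is in bytes; steady growth over time is the symptom
            # described in this issue.
            print(job_id, latest.get("id"), latest.get("state_size"))
        time.sleep(poll_seconds)


if __name__ == "__main__":
    watch_checkpoint_sizes()
```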
Issue Priority
Priority: 1 (data loss / total loss of function)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner
About this issue
- State: open
- Created a year ago
- Reactions: 1
- Comments: 17 (7 by maintainers)
I am testing the Java portable runner and am making a little bit of progress. In PortableRunner.run(), one is able to get GenerateSequence working for the portable Flink runner. The same should happen for KafkaIO.
@nybbles would you mind testing your Kafka pipeline with the option --experiments=use_deprecated_read? (PeriodicImpulse does not work because it is a pure SDF, while KafkaIO still has an UnboundedRead implementation.) I am not very familiar with this area; one thing to note is that splittable DoFn support is missing for the "portable" Flink runner, which is the case for Python SDK jobs.
https://github.com/apache/beam/blob/af533fe976c05e708d9573f0b6cbeb31bf9dbb3a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPortablePipelineTranslator.java#L731
Maybe one starting point is here. Will dig into it.
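For reference, a minimal sketch of how the suggested --experiments=use_deprecated_read flag could be passed from a Python driver; the broker address, topic, and endpoints are placeholders, and the remaining flags loosely mirror the options in the issue description.

```python
# Sketch (not from the thread): a Kafka pipeline with the
# use_deprecated_read experiment enabled. Addresses and topic are placeholders.
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(
    [
        "--runner=PortableRunner",
        "--job_endpoint=localhost:8099",
        "--environment_type=DOCKER",
        "--streaming",
        "--experiments=use_deprecated_read",
    ]
)

with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        | ReadFromKafka(
            consumer_config={"bootstrap.servers": "localhost:9092"},
            topics=["my-topic"],
        )
        | beam.Map(print)
    )
```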
FYI, the Java Portable Runner has the same issue (submitting a Java PeriodicImpulse using PortableRunner via FlinkJobService). The Python Flink runner is just a wrapper around the job service jar, so the fix should be on the Java portable runner side.
~EDIT: Java PeriodicImpulse using PortableRunner actually works. The test forgot to add the --streaming flag.~
Adding the --streaming flag does not change anything; it still fails after some seconds.
Thanks @nybbles, the findings in the issue description are already very specific and clearer than what I knew before in terms of the root cause. Happy to help from the Beam side.