prefect: Mapped tasks trigger multiple times on GKE Autopilot
Hopefully I’m not conflating multiple things here, but I’m in quite a mess when I try to run one of my flows in a GKE Autopilot cluster.
Description
Mapped tasks appear to (sometimes) trigger multiple times on GKE Autopilot. This puts flow runs into unstable states – sometimes completing successfully, sometimes completing but reporting a failure, and sometimes the flow run itself does not complete due to failure.
In production, my tasks may run for multiple hours, and I've observed 3-4 versions of a child task kick off throughout the lifecycle of the original instance (including after the original instance has succeeded 🤯), despite no indication that previous attempts have failed. I've attached a log from a child task for the minimal example below. A few observations:
- In this instance, all slow_run children triggered more than once. In some cases this only appears to affect certain children, and occasionally I don't observe the issue at all (this is rare). This makes me think we're dealing with a race condition of some sort, potentially related to autoscaling in GKE.
- After being started three times, the first instance of slow_run[0] was actually killed by the ZombieKiller and marked as a failed run for a few minutes. It was then changed to Success after the final instance of slow_run[0] completed (this one ran for 12 minutes, as shown in the logs).
- Child tasks 0 and 2 of report_results both print out "Final value is None", which suggests the result of slow_run isn't passed down successfully.
Expected Behavior
Mapped tasks should run through the expected state transitions. Child tasks shouldn’t re-trigger unless something happens to previous invocations, e.g. failures requiring retries.
I would not expect running on GKE Autopilot (which includes autoscaling) to affect the state transitions for child tasks.
Reproduction
import random
import time

import prefect
from prefect import Flow


def share_state_transition(task, old_state, new_state):
    logger = prefect.context.get('logger')
    logger.info("\nState transition for {0}: {1} -> {2} (retrying={3})\n".format(
        task,
        old_state,
        new_state,
        new_state.is_retrying(),
    ))
    return new_state


@prefect.task
def get_list():
    return [1, 2, 3, 4, 5]


@prefect.task(state_handlers=[share_state_transition])
def slow_run(num):
    logger = prefect.context.get('logger')
    num_minutes = random.randint(4, 12)
    logger.info('Running task num={} for {} minutes'.format(num, num_minutes))
    time.sleep(num_minutes * 60)
    return num * 2


@prefect.task(state_handlers=[share_state_transition])
def report_results(num):
    logger = prefect.context.get('logger')
    logger.info('Final value is {}'.format(num))


with Flow(
    'mapped-slow-run',  # placeholder name
    # See below; using KubernetesRun (on GKE Autopilot) and DaskExecutor
) as flow:
    results = slow_run.map(get_list())
    report_results.map(results)
Environment
I’m running in a GKE Autopilot cluster using KubernetesRun
and DaskExecutor
with adaptive scaling enabled. Excerpt of my flow config, happy to provide more but I don’t think there’s anything too controversial.
# ...within Flow()...
# Imports used by this excerpt:
#   import prefect
#   from prefect.run_configs import KubernetesRun
#   from prefect.executors import DaskExecutor
#   from dask_kubernetes import KubeCluster, make_pod_spec
run_config=KubernetesRun(
    image='gcr.io/<our-image>',
    image_pull_policy='IfNotPresent',
),
executor=DaskExecutor(
    # The lambda defers cluster creation to flow run time, when
    # prefect.context.image is populated from the run config.
    cluster_class=lambda: KubeCluster(
        make_pod_spec(
            image=prefect.context.image,
            cpu_request=2,
            memory_request='8G',
        ),
    ),
    adapt_kwargs={
        'minimum': 1,
        'maximum': 12,
    },
),
Prefect diagnostics:
{
"config_overrides": {},
"env_vars": [],
"system_information": {
"platform": "macOS-12.2.1-arm64-arm-64bit",
"prefect_backend": "cloud",
"prefect_version": "1.0.0",
"python_version": "3.9.7"
}
}
About this issue
- State: closed
- Created 2 years ago
- Reactions: 1
- Comments: 40 (37 by maintainers)
Hey everyone, I believe Prefect is doing the “right” thing here in the face of a noisy environment, but it’s doing a poor job of displaying what’s happening. It’s important to start by noting that with this setup Prefect doesn’t control Dask’s scheduler but rather layers on state handling features (retries, prevention of logic re-execution, etc.). With that, let me break down the different pieces:
- Dask may decide to run a task more than once, but Prefect only executes your task's logic once per Running state within Prefect; this means that transitions such as Pending -> Cached do not represent a rerun of logic but rather a data load from a previous Success state; similarly, a transition of Pending -> Success does not represent a rerun (more on this later).
- Once a task has already produced a result, a Pending -> Running transition is prevented and instead a Pending -> Cached transition occurs.
- Version locking prevents a task from transitioning Pending -> Running two times within a single flow run; instead, if a version lock is encountered, you'll see a Pending -> Success on the second attempted run. The Prefect Client notices that there is a version mismatch and loads the most recent state (which in the examples above are Success states).
There is a valid bug in the original report: the spurious Failed state placed on the first instance of slow_run[0] was caused by a race condition in the Prefect Cloud Zombie Killer that should now be resolved.
There are many improvements to the state bookkeeping that would make this experience more navigable, and we believe that the changes to state handling in 2.0 will address these.
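To make those transitions easier to read in the logs, here is a minimal sketch of a state handler in the same style as share_state_transition from the reproduction; the handler name and log messages are my own, but the state classes come from prefect.engine.state in Prefect 1.x:
import prefect
from prefect.engine.state import Cached, Running, Success


def explain_transition(task, old_state, new_state):
    # Sketch only: label the transitions described above so a
    # Pending -> Cached or Pending -> Success transition is not
    # mistaken for a re-execution of the task's logic.
    logger = prefect.context.get('logger')
    if isinstance(new_state, Running):
        logger.info('%s: task logic is actually executing', task)
    elif isinstance(new_state, Cached):
        logger.info('%s: result loaded from a previous Success state (no rerun)', task)
    elif isinstance(new_state, Success) and not isinstance(old_state, Running):
        logger.info('%s: version lock hit; most recent state loaded (no rerun)', task)
    return new_state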
For those who might encounter the same problem in the future: I was able to get very stable runs in adaptive mode using the following environment variables, and by using adapt_kwargs to prevent aggressive up/down scaling.
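As a sketch of what less aggressive adaptive scaling can look like (the values below are illustrative rather than the exact settings used here; wait_count and interval are standard Dask Adaptive parameters that DaskExecutor forwards through adapt_kwargs):
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect.executors import DaskExecutor

# Illustrative only: keep the worker count in a narrow band and make the
# adaptive scaler wait longer before removing workers, so Dask workers
# (and the Prefect task runs on them) are not torn down mid-task.
executor = DaskExecutor(
    cluster_class=lambda: KubeCluster(
        make_pod_spec(image='gcr.io/<our-image>', cpu_request=2, memory_request='8G'),
    ),
    adapt_kwargs={
        'minimum': 4,       # floor of warm workers
        'maximum': 6,       # cap growth to limit churn
        'wait_count': 60,   # require many consecutive scale-down recommendations
        'interval': '10s',  # evaluate scaling less frequently
    },
)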
Please do not ping the team; we’re all following this issue already and pinging will not lead to faster resolution. You are welcome to contribute a patch if this is pressing for you; otherwise we are investigating it as much as resources allow.
I haven’t had a chance to look into this further. Orion might have this issue, but we have stricter rules about state transitions which should prevent it.
Don’t worry about the noise; we certainly won’t complain about having more information. Share any updates you have.
If scaling out with KubeCluster doesn’t work well on GKE Autopilot, I wonder whether it makes sense to try scaling up instead: you could try spinning up a flow run pod with more resources on KubernetesRun and use a LocalDaskExecutor within that single pod. It would vastly simplify the coordination between threads and processes, and perhaps would solve the problem equally well? Definitely something worth trying.
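A minimal sketch of that scale-up variant, assuming Prefect 1.x; the flow name and resource sizes are placeholders, and the tasks are the ones defined in the reproduction above:
from prefect import Flow
from prefect.executors import LocalDaskExecutor
from prefect.run_configs import KubernetesRun

with Flow(
    'mapped-slow-run',  # placeholder name
    # One larger flow-run pod instead of a fleet of Dask worker pods.
    run_config=KubernetesRun(
        image='gcr.io/<our-image>',
        cpu_request=8,         # illustrative sizing
        memory_request='32G',  # illustrative sizing
    ),
    # Mapped children run as local processes inside that single pod.
    executor=LocalDaskExecutor(scheduler='processes', num_workers=8),
) as flow:
    results = slow_run.map(get_list())
    report_results.map(results)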