prefect: Mapped tasks trigger multiple times on GKE Autopilot
Hopefully I’m not conflating multiple things here, but I’ve run into quite a mess when trying to run one of my flows in a GKE Autopilot cluster.
Description
Mapped tasks appear to (sometimes) trigger multiple times on GKE Autopilot. This leaves flow runs in unstable states: sometimes they complete successfully, sometimes they complete but report a failure, and sometimes the flow run itself fails outright.
In production, my tasks may run for multiple hours, and I’ve observed 3-4 versions of a child task kick off throughout the lifecycle of the original instance (including after the original instance has succeeded 🤯), despite no indication that previous attempts have failed. I’ve attached a log from a child task for the minimal example below. A few observations:
- In this instance, all slow_run children triggered more than once. In some cases this only appears to affect certain children, and occasionally I don’t observe the issue at all (this is rare). This makes me think we’re dealing with a race condition of some sort, potentially related to autoscaling in GKE.
- After being started three times, the first instance of slow_run[0] was actually killed by ZombieKiller and marked as a failed run for a few minutes. It was then changed to Success after the final instance of slow_run[0] completed (this one ran for 12 minutes, as shown in the logs).
- Child tasks 0 and 2 of report_results both print out “Final value is None”, which suggests the result of slow_run isn’t passed down successfully.
Expected Behavior
Mapped tasks should run through the expected state transitions. Child tasks shouldn’t re-trigger unless something happens to previous invocations, e.g. failures requiring retries.
I would expect running on GKE Autopilot (which includes autoscaling) not to affect the state transitions for child tasks.
Reproduction
import random
import time

import prefect
from prefect import Flow


def share_state_transition(task, old_state, new_state):
    logger = prefect.context.get('logger')
    logger.info("\nState transition for {0}: {1} -> {2} (retrying={3})\n".format(
        task,
        old_state,
        new_state,
        new_state.is_retrying(),
    ))
    return new_state


@prefect.task
def get_list():
    return [1, 2, 3, 4, 5]


@prefect.task(state_handlers=[share_state_transition])
def slow_run(num):
    logger = prefect.context.get('logger')
    num_minutes = random.randint(4, 12)
    logger.info(
        'Running task num={} for {} minutes'.format(
            num,
            num_minutes,
        )
    )
    time.sleep(num_minutes * 60)
    return num * 2


@prefect.task(state_handlers=[share_state_transition])
def report_results(num):
    logger = prefect.context.get('logger')
    logger.info('Final value is {}'.format(num))


with Flow(
    'mapped-tasks-repro',  # placeholder flow name
    # See below; using KubernetesRun (on GKE Autopilot) and DaskExecutor
) as flow:
    results = slow_run.map(
        get_list(),
    )
    report_results.map(results)
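To reproduce against Prefect Cloud, the flow is given the run config and executor shown under Environment below and then registered. This is a sketch; the project name is a placeholder:

# Register against Prefect Cloud; KubernetesRun/DaskExecutor config as shown under Environment.
flow.register(project_name='<project>')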
Environment
I’m running in a GKE Autopilot cluster using KubernetesRun and DaskExecutor with adaptive scaling enabled. Here’s an excerpt of my flow config; happy to provide more, but I don’t think there’s anything too controversial.
# ...within Flow()...
run_config=KubernetesRun(
image='gcr.io/<our-image>',
image_pull_policy='IfNotPresent',
),
executor=DaskExecutor(
cluster_class=lambda: KubeCluster(
make_pod_spec(
image=prefect.context.image,
cpu_request=2,
memory_request='8G',
),
),
adapt_kwargs={
'minimum': 1,
'maximum': 12,
},
),
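For reference, the objects used in this excerpt come from Prefect 1.x and dask-kubernetes; assuming the classic KubeCluster API, the imports would look like this:

import prefect
from prefect.run_configs import KubernetesRun
from prefect.executors import DaskExecutor
from dask_kubernetes import KubeCluster, make_pod_spec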
Prefect diagnostics:
{
"config_overrides": {},
"env_vars": [],
"system_information": {
"platform": "macOS-12.2.1-arm64-arm-64bit",
"prefect_backend": "cloud",
"prefect_version": "1.0.0",
"python_version": "3.9.7"
}
}
About this issue
- Original URL
- State: closed
- Created 2 years ago
- Reactions: 1
- Comments: 40 (37 by maintainers)
Hey everyone, I believe Prefect is doing the “right” thing here in the face of a noisy environment, but it’s doing a poor job of displaying what’s happening. It’s important to start by noting that with this setup Prefect doesn’t control Dask’s scheduler but rather layers on state handling features (retries, prevention of logic re-execution, etc.). With that, let me break down the different pieces:
- Dask may resubmit work in a noisy environment, but Prefect prevents re-execution of logic for tasks that have already entered a Running state within Prefect; this means that transitions such as Pending -> Cached do not represent a rerun of logic but rather a data load from a previous Success state; similarly, a transition of Pending -> Success does not represent a rerun (more on this later).
- When a result from a previous successful run is available, the task’s logic is not executed again (a Pending -> Running transition is prevented and instead a Pending -> Cached transition occurs).
- Version locking prevents a task from transitioning Pending -> Running two times within a single flow run; instead, if a version lock is encountered, you’ll see a Pending -> Success on the second attempted run. The Prefect Client notices that there is a version mismatch and loads the most recent state (which in the examples above are Success states).

There is a valid bug in the original report: the temporary Failed state applied to slow_run[0] was caused by a race condition in the Prefect Cloud Zombie Killer that should now be resolved.
There are many improvements to the state bookkeeping that would make this experience more navigable, and we believe that the changes to state handling in 2.0 will address these.
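To make those distinctions visible in task logs, the state handler from the reproduction can be extended to flag cache loads and retries explicitly. This is a minimal sketch using Prefect 1.x state predicates; the handler name is my own, not from the thread:

import prefect

def annotated_state_transition(task, old_state, new_state):
    # Log every transition and note whether it represents real work or a
    # load of an existing result (Cached is a subclass of Success in Prefect 1.x).
    logger = prefect.context.get('logger')
    if new_state.is_cached():
        note = 'cached result load, not a rerun'
    elif new_state.is_retrying():
        note = 'retry after a failure'
    else:
        note = 'normal transition'
    logger.info('State transition for {}: {} -> {} ({})'.format(task, old_state, new_state, note))
    return new_state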
For those who might encounter the same problem in the future: I was able to get very stable runs in adaptive mode using the following environment variables, and by using adapt_kwargs to prevent aggressive up/down scaling (a sketch of this kind of tuning follows the note below).

Please do not ping the team; we’re all following this issue already and it will not lead to faster resolution. You are welcome to contribute a patch if this is pressing for you; otherwise we are investigating it as much as resources allow.
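As a sketch of the tuning mentioned above: the thread does not show the exact environment variables, so both the variable below and the adapt_kwargs values are illustrative assumptions, and where a Dask setting needs to be applied depends on where the Dask scheduler runs.

import prefect
from dask_kubernetes import KubeCluster, make_pod_spec
from prefect.executors import DaskExecutor

executor = DaskExecutor(
    cluster_class=lambda: KubeCluster(
        make_pod_spec(
            image=prefect.context.image,
            cpu_request=2,
            memory_request='8G',
            env={
                # Illustrative Dask setting (config key distributed.scheduler.work-stealing):
                # discourages Dask from moving queued tasks between workers.
                'DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING': 'False',
            },
        ),
    ),
    # A narrower range than 1-12 reduces worker churn during a run.
    adapt_kwargs={'minimum': 4, 'maximum': 6},
)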
I haven’t had a chance to look into this further. Orion might have this issue, but we have stricter rules about state transitions which should prevent it.
Don’t worry about the noise; we certainly won’t complain about having more information. Share any updates you have.
If scaling out with KubeCluster doesn’t work well on GKE Autopilot, I wonder whether it makes sense to try scaling up instead: you could spin up a flow run pod with more resources via KubernetesRun and use a LocalDaskExecutor within that single pod. It would vastly simplify the coordination between threads and processes, and perhaps it would solve the problem equally well. Definitely something worth trying.
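A sketch of that suggestion, assuming Prefect 1.x APIs; the resource values are illustrative, not a recommendation from the thread:

from prefect.executors import LocalDaskExecutor
from prefect.run_configs import KubernetesRun

# One larger flow-run pod instead of a Dask cluster that scales out;
# mapped children then run as local threads inside that single pod.
flow.run_config = KubernetesRun(
    image='gcr.io/<our-image>',
    cpu_request=8,           # illustrative
    memory_request='32G',    # illustrative
)
flow.executor = LocalDaskExecutor(scheduler='threads', num_workers=8)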