prefect: Mapped tasks trigger multiple times on GKE Autopilot

Hopefully I’m not conflating multiple things here, but I’m in quite a mess when I try to run one of my flows in a GKE Autopilot cluster.

Description

Mapped tasks appear to (sometimes) trigger multiple times on GKE Autopilot. This puts flow runs into unstable states – sometimes completing successfully, sometimes completing but reporting a failure, and sometimes the flow run itself does not complete due to failure.

In production, my tasks may run for multiple hours and I’ve observed 3-4 versions of a child task kick off throughout the lifecycle of the original instance (including after the original instance has succeeded 🤯 ) despite no indication that previous attempts have failed. I’ve attached a log from a child task for the minimal example below. A few observations:

  • In this instance, all slow_run children triggered more than once. In some cases this only appears to affect certain children, and occasionally I don’t observe the issue (this is rare). This makes me think we’re dealing with a race condition of some sort, potentially related to autoscaling in GKE.
  • After being started three times, the first instance of slow_run[0] was actually killed by ZombieKiller and marked as a failed run for a few minutes. It then was changed to Success after the final instance of slow_run[0] completed (this one ran for 12 minutes, as shown in logs).
  • Child tasks 0 and 2 of report_results both print out “Final value is None” which suggests the result of slow_run isn’t passed down successfully.

Expected Behavior

Mapped tasks should run through the expected state transitions. Child tasks shouldn’t re-trigger unless something happens to previous invocations, e.g. failures requiring retries.

I would expect the fact that I’m on GKE Autopilot (which includes autoscaling) to not affect the state transitions for child tasks.

Reproduction

import random
import time

import prefect
from prefect import Flow

def share_state_transition(task, old_state, new_state):
    logger = prefect.context.get('logger')
    logger.info("\nState transition for {0}: {1} -> {2} (retrying={3})\n".format(
        task, 
        old_state, 
        new_state,
        new_state.is_retrying(),
    ))

    return new_state

@prefect.task
def get_list():
    return [1, 2, 3, 4, 5]

@prefect.task(state_handlers=[share_state_transition,],)
def slow_run(num):
    logger = prefect.context.get('logger')

    num_minutes = random.randint(4, 12)
    logger.info(
        'Running task num={} for {} minutes'.format(
            num,
            num_minutes,
        )
    )

    time.sleep(num_minutes * 60)

    return num * 2

@prefect.task(state_handlers=[share_state_transition,],)
def report_results(num):
    logger = prefect.context.get('logger')

    logger.info('Final value is {}'.format(num))

with Flow(
  # See below; using KubernetesRun (on GKE Autopilot) and DaskExecutor
) as flow:
    results = slow_run.map(
        get_list(),
    )

    report_results.map(results)

Environment

I’m running in a GKE Autopilot cluster using KubernetesRun and DaskExecutor with adaptive scaling enabled. Excerpt of my flow config, happy to provide more but I don’t think there’s anything too controversial.

# ...within Flow()...
run_config=KubernetesRun(
    image='gcr.io/<our-image>',
    image_pull_policy='IfNotPresent',
),
executor=DaskExecutor(
    cluster_class=lambda: KubeCluster(
        make_pod_spec(
            image=prefect.context.image,
            cpu_request=2,
            memory_request='8G',
        ),
    ),
    adapt_kwargs={ 
        'minimum': 1, 
        'maximum': 12,
    },
),

Prefect diagnostics:

{
  "config_overrides": {},
  "env_vars": [],
  "system_information": {
    "platform": "macOS-12.2.1-arm64-arm-64bit",
    "prefect_backend": "cloud",
    "prefect_version": "1.0.0",
    "python_version": "3.9.7"
  }
}

About this issue

  • State: closed
  • Created 2 years ago
  • Reactions: 1
  • Comments: 40 (37 by maintainers)

Most upvoted comments

Hey everyone, I believe Prefect is doing the “right” thing here in the face of a noisy environment, but it’s doing a poor job of displaying what’s happening. It’s important to start by noting that with this setup Prefect doesn’t control Dask’s scheduler but rather layers on state handling features (retries, prevention of logic re-execution, etc.). With that, let me break down the different pieces:

  • the only time your task logic is executing is when your task run is in a Running state within Prefect; this means that transitions such as Pending -> Cached do not represent a rerun of logic but rather a data load from a previous Success state; similarly, a transition of Pending -> Success does not represent a rerun (more on this later)
  • if we focus our attention on a single flow run process, Prefect will never re-execute completed upstream tasks
  • Dask, on the other hand, does rerun things quite often – when workers die, Dask sometimes needs to recompute upstream data dependencies; other times, the Dask scheduler decides a rerun on a new worker is more efficient than shipping data around the cluster
  • if you happen to be using Dask and wish to prevent your logic from executing twice, Prefect offers two options:
    • the use of caching (so that a Pending -> Running transition is prevented and instead a Pending -> Cached transition occurs)
    • version locking: version locking prevents a task from transitioning Pending -> Running two times within a single flow run; instead, if a version lock is encountered, you’ll see a Pending -> Success on the second attempted run. The Prefect Client notices that there is a version mismatch and loads the most recent state (which in the examples above are Success states)
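
To make the version-locking bullet concrete, here is a toy model of the idea in plain Python. It is a sketch only: `ToyStateStore` and `attempt_run` are hypothetical names invented for illustration, and this is not Prefect's actual implementation. It just shows how a compare-and-set on a version number turns a second attempted run into Pending -> Success without executing the task logic again.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class ToyStateStore:
    """Hypothetical backend that keeps (state, version) per task run."""

    states: Dict[str, Tuple[str, int]] = field(default_factory=dict)

    def get(self, task_run_id: str) -> Tuple[str, int]:
        # Unknown task runs start out Pending at version 0.
        return self.states.get(task_run_id, ("Pending", 0))

    def set_state(self, task_run_id: str, new_state: str, expected_version: int) -> bool:
        """Compare-and-set: succeed only if the caller saw the latest version."""
        _, current_version = self.get(task_run_id)
        if expected_version != current_version:
            return False
        self.states[task_run_id] = (new_state, current_version + 1)
        return True


def attempt_run(store: ToyStateStore, task_run_id: str, seen_version: int,
                executions: List[str]) -> str:
    """Try Pending -> Running; on a version mismatch, load the latest state."""
    if not store.set_state(task_run_id, "Running", seen_version):
        latest_state, _ = store.get(task_run_id)
        return f"Pending -> {latest_state}"
    executions.append(task_run_id)  # the task logic would run only here
    _, version = store.get(task_run_id)
    store.set_state(task_run_id, "Success", version)
    return "Pending -> Running -> Success"


store = ToyStateStore()
executions: List[str] = []

# Two workers both read the task run while it is still Pending (version 0).
_, seen_by_a = store.get("slow_run[0]")
_, seen_by_b = store.get("slow_run[0]")

first = attempt_run(store, "slow_run[0]", seen_by_a, executions)
second = attempt_run(store, "slow_run[0]", seen_by_b, executions)
print(first)
print(second)
```

In this model the second worker holds a stale version, so its attempt to enter Running is rejected and it reports the stored Success state instead; `executions` ends up with a single entry.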

There is a valid bug in the original report:

After being started three times, the first instance of slow_run[0] was actually killed by ZombieKiller and marked as a failed run for a few minutes. It then was changed to Success after the final instance of slow_run[0] completed (this one ran for 12 minutes, as shown in logs).

that was caused by a race condition in the Prefect Cloud Zombie Killer that should now be resolved.

There are many improvements to the state bookkeeping that would make this experience more navigable, and we believe that the changes to state handling in 2.0 will address these.

For those who might encounter the same problem in the future: I was able to get very stable runs in adaptive mode using the following environment variables:

"DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING": "true",
"DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING_INTERVAL": "120s",
"DASK_DISTRIBUTED__SCHEDULER__WORKER_TTL": "3600s",
"DASK_DISTRIBUTED__SCHEDULER__ALLOWED_FAILURES": "100",
"DASK_DISTRIBUTED__WORKER__PROFILE__INTERVAL": "10000ms",
"DASK_DISTRIBUTED__WORKER__PROFILE__CYCLE": "1000000ms",
"DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT": "300s",
"DASK_DISTRIBUTED__COMM__SOCKET_BACKLOG": "16384",
"DASK_DISTRIBUTED__COMM__TIMEOUTS__TCP": "300s",
"DASK_DISTRIBUTED__COMM__RETRY__COUNT": "12"

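For readers unsure which settings these variables touch, the sketch below converts a `DASK_`-prefixed variable name into the dotted config key it is expected to correspond to. It assumes Dask's usual convention (strip the `DASK_` prefix, lowercase, treat a double underscore as a nesting separator, and treat single underscores as interchangeable with hyphens). The helper name `dask_env_to_config_key` is hypothetical and is not part of Dask; values are left as raw strings because how they are parsed may vary by Dask version.

```python
def dask_env_to_config_key(name: str) -> str:
    """Map a DASK_* environment variable name to a dotted config key.

    Assumed convention: drop the "DASK_" prefix, lowercase the rest,
    "__" becomes ".", and remaining "_" are written as "-".
    """
    prefix = "DASK_"
    if not name.startswith(prefix):
        raise ValueError(f"{name!r} does not start with {prefix!r}")
    return name[len(prefix):].lower().replace("__", ".").replace("_", "-")


# A few of the variables from the comment above.
env_vars = {
    "DASK_DISTRIBUTED__SCHEDULER__WORK_STEALING_INTERVAL": "120s",
    "DASK_DISTRIBUTED__SCHEDULER__WORKER_TTL": "3600s",
    "DASK_DISTRIBUTED__COMM__TIMEOUTS__CONNECT": "300s",
    "DASK_DISTRIBUTED__COMM__RETRY__COUNT": "12",
}

config_keys = {dask_env_to_config_key(k): v for k, v in env_vars.items()}
for key, value in config_keys.items():
    print(f"{key} = {value}")
```
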
And using adapt_kwargs to prevent aggressive up/down scaling:

adapt_kwargs={
    "minimum": min_workers,
    "maximum": max_workers,
    "wait_count": 20,
    "target_duration": "300s",
    "interval": "30s",
},
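
A rough way to see why these values calm down scale-down: as I understand `distributed`'s adaptive scaling, `interval` is the time between scaling checks and `wait_count` is the number of consecutive checks in which a worker must be recommended for removal before it is actually removed. Under that assumption, the minimum time a worker has to look removable is roughly `wait_count * interval`. The helper name below is hypothetical, and the "default" values used for comparison (`wait_count=3`, `interval=1s`) are my assumption about the library defaults, so check them against your installed version.

```python
def seconds_before_scale_down(wait_count: int, interval_seconds: int) -> int:
    """Approximate minimum time a worker must be flagged before removal."""
    return wait_count * interval_seconds


# Values from the adapt_kwargs above: wait_count=20, interval="30s".
tuned = seconds_before_scale_down(wait_count=20, interval_seconds=30)

# Assumed library defaults, for comparison only.
assumed_default = seconds_before_scale_down(wait_count=3, interval_seconds=1)

print(f"tuned: {tuned}s (~{tuned // 60} min), assumed default: {assumed_default}s")
```

With the tuned settings a worker has to be idle for about ten minutes before it is retired, which is in the same range as the 4 to 12 minute task durations in the reproduction.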

Please do not ping the team, we’re all following this issue already and it will not lead to faster resolution. You are welcome to contribute a patch if this is pressing for you, otherwise we are investigating it as much as resources allow.

I haven’t had a chance to look into this further. Orion might have this issue, but we have stricter rules about state transitions which should prevent it.

Don’t worry about the noise; we won’t complain about having more information for sure. Share any updates you have

If scaling out with KubeCluster doesn’t work well on GKE Autopilot, I wonder whether it makes sense to try scaling up instead: you could try spinning up a flow run pod with more resources on KubernetesRun and use a LocalDaskExecutor within that single pod. It would vastly simplify the coordination between threads and processes and perhaps would solve the problem equally well? Definitely something worth trying.