prefect: Multiprocessing deadlock

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn’t find it.
  • I searched the Prefect documentation for this issue.
  • I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

Running native Python multiprocessing with more than one worker leads to a deadlock at the .join() step. I am running this in a Docker container.

Reproduction

Inside a Docker container:


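import multiprocessing as mp

from prefect import flow


def mp_task(arg):
    return 1


@flow
def mp_flow():
    n_cpu = mp.cpu_count()
    pool = mp.Pool(processes=n_cpu)
    # starmap unpacks each item as an argument tuple, so wrap each value.
    args = [(1,), (2,), (3,)]

    res = pool.starmap(mp_task, args)
    pool.close()
    pool.join()  # the step at which the hang is reported
    return res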

Error

None; the flow simply makes no progress.

CPU utilization spikes to full on all cores, then drops to 0, and no further progress is made.

Versions

prefect 2.13

About this issue

  • State: closed
  • Created 9 months ago
  • Reactions: 3
  • Comments: 24 (9 by maintainers)

Most upvoted comments

I am also encountering this issue with one of my flows. My flow uses a third-party module that calls mp.Pool(), and the flow hangs at the pool.join() step. No errors are thrown; the join never completes and the flow runs forever.

@urimandujano configuring multiprocessing to use “spawn” as the start method is only a partial workaround, as there are many situations where it might not work (spawn generally requires that the objects passed to the subprocess be picklable, which is not always possible).

From my experience with Prefect so far, multiprocessing within tasks was often a bit tricky. Are there any best practices regarding multiprocessing in Prefect and known limitations that developers should keep in mind?
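To make the picklability constraint concrete, here is a minimal sketch using only the standard library. The helper names is_picklable and make_local_worker are hypothetical and not part of Prefect or multiprocessing. It only illustrates that a function defined inside another function cannot be pickled, which is why such a function cannot be used as a target under “spawn”.

```python
import pickle


def module_level_worker(x):
    # Defined at module level, so pickle can reference it by name.
    return x + 1


def make_local_worker():
    # The inner function lives in a local scope and has no importable name.
    def local_worker(x):
        return x + 1

    return local_worker


def is_picklable(obj):
    # Hypothetical helper: report whether pickle can serialize obj.
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


if __name__ == "__main__":
    print("module-level function:", is_picklable(module_level_worker))
    print("nested function:", is_picklable(make_local_worker()))
    print("lambda:", is_picklable(lambda x: x))
```

Moving the target function to module level is usually the simplest way to satisfy this requirement, although it does not help when the arguments themselves hold unpicklable state such as open connections or locks.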

We’re also having similar issues with hanging tasks when used in combination with multiprocessing. Here is the issue we are running into. This code hangs forever on process.join(). However, it does not hang when the function run_processes is not decorated as a task.

from multiprocessing import Process
from prefect import flow, task

@task
def run_processes():
    def processing_task():
        pass

    process = Process(target=processing_task)
    process.start()
    process.join()


@flow()
def main() -> None:
    run_processes()

main()

@ccueto36 I’m not sure how to best handle your situation. This behavior is coming from the stdlib so I would expect the fix to be implemented in whatever code is creating the Pool. Since prefect isn’t creating the pool, I don’t think this is something we should set on our end – in my opinion it should be set in the 3rd party code.

As a quick workaround, it looks like you can configure the multiprocessing context to use spawn, but keep in mind that you’d probably need to configure the context before the 3rd party code ever creates its pool.
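The workaround described above might look roughly like the following sketch. It assumes the third-party code calls multiprocessing.Pool() without an explicit context; pool_using_default_context is a hypothetical stand-in for that code, and configure_spawn is an illustrative helper, not a Prefect API. Whether this resolves the hang in a particular deployment is not confirmed here.

```python
import math
import multiprocessing as mp


def configure_spawn():
    # Set the global start method to "spawn". If a start method has already
    # been set or used in this process, calling this without force=True
    # raises RuntimeError.
    mp.set_start_method("spawn", force=True)


def pool_using_default_context(values):
    # Hypothetical stand-in for third-party code that creates its own pool
    # and therefore uses whatever the global start method is.
    with mp.Pool(processes=2) as pool:
        return pool.map(math.sqrt, values)


if __name__ == "__main__":
    # Must run before any other code creates a pool or process.
    configure_spawn()
    print(pool_using_default_context([1, 4, 9]))
```

With “spawn”, the entry point needs the `if __name__ == "__main__":` guard, because each child process re-imports the main module on startup.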

I can confirm that I have this exact same issue when trying to use concurrent.futures.ProcessPoolExecutor.
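For code that creates its own ProcessPoolExecutor, the start method can be chosen per executor through the mp_context parameter instead of globally. The sketch below assumes the same “spawn” workaround applies to this case, which this thread does not confirm; make_spawn_executor is a hypothetical helper name.

```python
import math
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor


def make_spawn_executor(max_workers=2):
    # Pass an explicit "spawn" context so the executor does not rely on the
    # platform's default start method.
    return ProcessPoolExecutor(
        max_workers=max_workers,
        mp_context=mp.get_context("spawn"),
    )


if __name__ == "__main__":
    with make_spawn_executor() as executor:
        # math.sqrt is importable by name, so it can be sent to the workers.
        print(list(executor.map(math.sqrt, [1, 4, 9])))
```

Unlike set_start_method, this leaves the global start method untouched, so other code in the same process is unaffected.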