prefect: Running the same subflow concurrently multiple times raises `RuntimeError("The task runner is already started!")`

Opened from the Prefect Public Slack Community

marcin.grzybowski: Hello again. How can I run the same flow in parallel? I used the code from https://discourse.prefect.io/t/how-can-i-run-multiple-subflows-or-child-flows-in-parallel/96 and it works.

But when I simplify it to run the same flow (instead of four different ones), I get:

RuntimeError("The task runner is already started!")

Modified code below:

import asyncio
from prefect import flow

@flow
async def subflow_1():
    print("Subflow 1 started!")
    await asyncio.sleep(1)


@flow
async def main_flow():
    parallel_subflows = [subflow_1(), subflow_1()]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
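One way to picture this failure is a toy model in which every call of a given flow object enters the same task-runner instance. This is a sketch under that assumption, not Prefect's implementation: `ToyTaskRunner`, `ToyFlow` and `body` are hypothetical names, and only the error message is taken from the report above. Two concurrent calls make the second one try to start a runner the first has not released yet; awaiting the calls one after another, or giving each call its own copy of the flow object (the idea behind the `copy.deepcopy` workaround quoted further down this thread), avoids the collision.

```python
import asyncio
import copy


class ToyTaskRunner:
    """Hypothetical runner that refuses to be started while it is already running."""

    def __init__(self) -> None:
        self.started = False

    async def __aenter__(self) -> "ToyTaskRunner":
        if self.started:
            raise RuntimeError("The task runner is already started!")
        self.started = True
        return self

    async def __aexit__(self, *exc_info) -> None:
        self.started = False


class ToyFlow:
    """Hypothetical flow wrapper: every call of ONE object shares ONE runner instance."""

    def __init__(self, fn) -> None:
        self.fn = fn
        self.task_runner = ToyTaskRunner()

    async def __call__(self, *args):
        async with self.task_runner:
            return await self.fn(*args)


async def body(name: str) -> str:
    await asyncio.sleep(0.01)  # stands in for the subflow's real work
    return name


subflow = ToyFlow(body)


async def run_concurrently() -> list:
    # Both calls enter the same runner; the second finds it already started.
    return await asyncio.gather(subflow("a"), subflow("b"))


async def run_sequentially() -> list:
    # The first call releases the runner before the second one enters it.
    return [await subflow("a"), await subflow("b")]


async def run_with_copies() -> list:
    # Each deep copy carries its own runner, so concurrent calls do not collide.
    return await asyncio.gather(copy.deepcopy(subflow)("a"), copy.deepcopy(subflow)("b"))
```

Under this model, `asyncio.run(run_concurrently())` fails with the same message, while `run_sequentially()` and `run_with_copies()` both return `["a", "b"]`.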

anna: I can reproduce the issue, and I can confirm that it’s either a bug or, if there is another way of doing this, that the tutorial needs to be updated.

But I’d like to know why you would want to do it this way. Do you really want to run the same subflow twice in parallel with the same parameters? I’m curious what your use case is.

As a temporary solution, turning that into a task instead of a subflow will work.

marcin.grzybowski: Nah, I would like to run the same flow with different parameters.

anna: Can you explain your use case a bit more? Do those subflows contain a lot of tasks?

If you built those as tasks, the issue would become much simpler, but I can understand why subflows may sometimes be better.

marcin.grzybowski: Yep, I’m just checking whether we can achieve our goals. As mentioned in the other thread (https://prefect-community.slack.com/archives/C03D12VV4NN/p1654007524220189?thread_ts=1654004474.016759&cid=C03D12VV4NN), we will probably need a combination of nested flows and tasks so that we have reusable code we can track on the graph at the desired granularity. If that isn’t possible, maybe logs will be enough for us, but then there is the problem with the logger for DaskTaskRunner 😉

So I’m just checking what’s possible and what isn’t.

marcin.grzybowski: Being able to see a flow’s details at a low level is really nice, but as I understand it, for that I need a Flow->Task->Flow->Task… combination.

anna: Not necessarily. The problem you see here occurs only when running a single subflow multiple times in parallel; if you don’t run them in parallel, it works fine. This works and satisfies your modularity use case:

import asyncio
from prefect import flow

@flow 
async def subflow_1():
    print("Subflow 1 started!")
    await asyncio.sleep(1)


@flow 
async def main_flow():
    for _ in range(5):
        await subflow_1()

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())
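The sequential loop above avoids the error, but it also gives up concurrency: each `await` finishes before the next call starts, so five subflows that each sleep for one second spend at least about five seconds sleeping in total, rather than roughly one. A minimal plain-asyncio sketch of that trade-off, with no Prefect involved (`fake_subflow` is a hypothetical stand-in for `subflow_1`, and the delay is shortened):

```python
import asyncio
import time


async def fake_subflow(delay: float) -> None:
    # Hypothetical stand-in for subflow_1's `await asyncio.sleep(1)`.
    await asyncio.sleep(delay)


async def sequential(n: int, delay: float) -> float:
    """Await each call before starting the next, like the `for` loop above."""
    start = time.perf_counter()
    for _ in range(n):
        await fake_subflow(delay)
    return time.perf_counter() - start


async def concurrent(n: int, delay: float) -> float:
    """Start all calls at once and wait for them together with asyncio.gather."""
    start = time.perf_counter()
    await asyncio.gather(*(fake_subflow(delay) for _ in range(n)))
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"sequential: {asyncio.run(sequential(5, 0.1)):.2f}s")
    print(f"concurrent: {asyncio.run(concurrent(5, 0.1)):.2f}s")
```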

anna: <@ULVA73B9P> open “Running the same subflow concurrently multiple times raises RuntimeError("The task runner is already started!")”

Original thread can be found here.

About this issue

  • State: closed
  • Created 2 years ago
  • Reactions: 4
  • Comments: 17 (9 by maintainers)

Most upvoted comments

This one is a bit complicated but is on our radar to resolve.

Yeah this is feasible and something we’ll be exploring.

@NoamGit this is unrelated to the error reported in this thread; can you open a new issue?

I’m having a similar problem when trying to run a flow that calls the same subflow multiple times, which itself calls tasks. The outer flow generates and loops over a list of URLs, calling a subflow on each one that checks a cache, and if it’s not found, fetches the URL and saves the result (each of those steps being a task).
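The per-URL pipeline described above can be sketched in plain asyncio, without Prefect, to make the intended execution order concrete: within one URL the steps run strictly in order, while different URLs are processed concurrently with `asyncio.gather`. `check_cache`, `fetch`, `save`, the in-memory `CACHE` and the example URLs are hypothetical stand-ins, not the commenter's code.

```python
from __future__ import annotations

import asyncio

# Hypothetical in-memory cache standing in for real storage.
CACHE: dict[str, str] = {}


async def check_cache(url: str) -> str | None:
    return CACHE.get(url)


async def fetch(url: str) -> str:
    await asyncio.sleep(0.01)  # pretend network I/O
    return f"<html for {url}>"


async def save(url: str, body: str) -> None:
    CACHE[url] = body


async def process(url: str, log: list[tuple[str, str]]) -> str:
    # Steps for ONE url run strictly in order: check cache, then fetch, then save.
    cached = await check_cache(url)
    log.append((url, "check"))
    if cached is not None:
        return cached
    body = await fetch(url)
    log.append((url, "fetch"))
    await save(url, body)
    log.append((url, "save"))
    return body


async def main(urls: list[str]) -> tuple[list[str], list[tuple[str, str]]]:
    log: list[tuple[str, str]] = []
    # Different urls are processed concurrently; results keep the input order.
    bodies = await asyncio.gather(*(process(u, log) for u in urls))
    return list(bodies), log
```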

I have Orion connected to Postgres so I don’t get the SQLite issue shown above, but I do get errors like:

RuntimeError: Task <Task pending name='anyio.from_thread.BlockingPortal._call_func' coro=<BlockingPortal._call_func() running at /home/ubuntu/.local/lib/python3.8/site-packages/anyio/from_thread.py:187> cb=[TaskGroup._spawn.<locals>.task_done() at /home/ubuntu/.local/lib/python3.8/site-packages/anyio/_backends/_asyncio.py:629]> got Future <Future pending> attached to a different loop

This can be avoided by making all of the tasks within a (sub-)flow async as well, but that’s not ideal, because the subflow’s tasks (check cache => fetch => save) all depend on each other, so I’m just awaiting them one after another. Ideally, I could run the subflow asynchronously over all the URLs while running the tasks within each subflow run synchronously.
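The `got Future <Future pending> attached to a different loop` message comes from asyncio itself: a task raises it when it awaits a Future that belongs to a different event loop. The stdlib-only reproduction below shows that condition in isolation (`await_foreign_future` is a hypothetical helper name); how Prefect's handling of sync tasks, visible as anyio's `BlockingPortal` in the error, ends up in that state is not established in this thread.

```python
import asyncio


def await_foreign_future() -> str:
    """Await a Future created on one event loop from a task running on another."""
    other_loop = asyncio.new_event_loop()
    try:
        foreign = other_loop.create_future()  # bound to other_loop, which never runs

        async def waiter() -> None:
            await foreign  # runs inside the new loop created by asyncio.run()

        try:
            asyncio.run(waiter())
        except RuntimeError as exc:
            return str(exc)
        return ""
    finally:
        other_loop.close()


if __name__ == "__main__":
    print(await_foreign_future())
```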

Unfortunately, it works only for flows without tasks. Adding a task causes an error:

import asyncio
import copy
from prefect import flow, get_run_logger, task

@task
def task1(a):
    get_run_logger().info("task 1 started!" + a)

@flow
async def subflow_1(a):
    get_run_logger().info("Subflow 1 started!" + a)
    task1(a)
    await asyncio.sleep(1)


@flow
async def main_flow():
    params = ['a', 'b', 'c']
    parallel_subflows = [copy.deepcopy(subflow_1)(param) for param in params]
    await asyncio.gather(*parallel_subflows)

if __name__ == "__main__":
    main_flow_state = asyncio.run(main_flow())

Traceback (most recent call last):
  File "/mnt/h/projects/prefect2.0/lib/python3.9/site-packages/sqlalchemy/engine/base.py", line 1819, in _execute_context
    self.dialect.do_execute(
  File "/mnt/h/projects/prefect2.0/lib/python3.9/site-packages/sqlalchemy/engine/default.py", line 732, in do_execute
    cursor.execute(statement, parameters)
  File "/mnt/h/projects/prefect2.0/lib/python3.9/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 100, in execute
    self._adapt_connection._handle_exception(error)
  File "/mnt/h/projects/prefect2.0/lib/python3.9/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 229, in _handle_exception
    raise error
  File "/mnt/h/projects/prefect2.0/lib/python3.9/site-packages/sqlalchemy/dialects/sqlite/aiosqlite.py", line 82, in execute
    self.await_(_cursor.execute(operation, parameters))
  File "/mnt/h/projects/prefect2.0/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 68, in await_only
    return current.driver.switch(awaitable)
  File "/mnt/h/projects/prefect2.0/lib/python3.9/site-packages/sqlalchemy/util/_concurrency_py3k.py", line 121, in greenlet_spawn
    value = await result
  File "/mnt/h/projects/prefect2.0/lib/python3.9/site-packages/aiosqlite/cursor.py", line 37, in execute
    await self._execute(self._cursor.execute, sql, parameters)
  File "/mnt/h/projects/prefect2.0/lib/python3.9/site-packages/aiosqlite/cursor.py", line 31, in _execute
    return await self._conn._execute(fn, *args, **kwargs)
  File "/mnt/h/projects/prefect2.0/lib/python3.9/site-packages/aiosqlite/core.py", line 129, in _execute
    return await future
  File "/mnt/h/projects/prefect2.0/lib/python3.9/site-packages/aiosqlite/core.py", line 102, in run
    result = function()
sqlite3.OperationalError: database is locked
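The final `database is locked` error is SQLite's: it allows only one writer per database file at a time, and a connection that cannot get the write lock within its busy timeout fails with this message. That fits the earlier comment in which a Postgres-backed Orion did not hit the SQLite problem. A stdlib-only reproduction, independent of Prefect (`second_writer_error`, the table and the file name are arbitrary):

```python
import os
import sqlite3
import tempfile


def second_writer_error() -> str:
    """Return the error a second connection gets while another holds the write lock."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")
        setup = sqlite3.connect(path)
        setup.execute("CREATE TABLE runs (name TEXT)")
        setup.commit()
        setup.close()

        # timeout=0: give up immediately instead of waiting (the default busy timeout is 5 s).
        first = sqlite3.connect(path, timeout=0)
        second = sqlite3.connect(path, timeout=0)
        try:
            # This uncommitted INSERT opens a write transaction, so `first` holds the write lock.
            first.execute("INSERT INTO runs VALUES ('run-a')")
            try:
                second.execute("INSERT INTO runs VALUES ('run-b')")
                message = ""
            except sqlite3.OperationalError as exc:
                message = str(exc)
            second.rollback()  # end second's transaction before first commits
            first.commit()
            return message
        finally:
            first.close()
            second.close()
```

Raising the `timeout` lets a waiting writer retry for that long before failing, which helps only while the competing writes are short.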