prefect: AssertionError on running tasks in parallel
First check
- I added a descriptive title to this issue.
- I used the GitHub search to find a similar issue and didn’t find it.
- I searched the Prefect documentation for this issue.
- I checked that this issue is related to Prefect and not one of its dependencies.
Bug summary
- A main flow that uses the SequentialTaskRunner calls a subflow, and the subflow just calls .map on a task to create parallel task runs.
- A few of those task runs crash with an AssertionError.
Reproduction
from prefect import task, flow
from prefect import get_run_logger
import time
from prefect.task_runners import SequentialTaskRunner


@task(name='run_scheduler')
def run_scheduler(event):
    scheduler_output = [i for i in range(1, 300)]
    time.sleep(150)  # Adding sleep time to make task bit long running
    return scheduler_output


@task(name='run_executor', tags=['spends_executor'])
def run_executor(scheduler_output):
    time.sleep(150)  # Adding sleep time to make task bit long running
    executor_output = f"printing just the executor input {scheduler_output}"
    return executor_output


@flow(task_runner=SequentialTaskRunner())
def spends_flow(event):
    logger = get_run_logger()
    logger.info(event)
    # Calling Scheduler task
    scheduler_output = run_scheduler(event)
    # if the output is empty array just log it else calling subflow which creates parallel task executions
    if len(scheduler_output) < 1:
        logger.info("no elements in array")
    else:
        spends_executor(scheduler_output)
    logger.info('flow completed')


@flow
def spends_executor(scheduler_output):
    run_executor.map(scheduler_output)


if __name__ == "__main__":
    event = "test"
    spends_flow(event)
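For readers unfamiliar with .map, here is a rough plain-Python analogy of the fan-out the subflow performs: one call is submitted per element of the input, and the results are collected afterwards. This is only a sketch using the standard library's thread pool, not Prefect's implementation; the helper name fan_out and the max_workers value are made up for illustration, and the sleep from the reproduction is left out.

```python
from concurrent.futures import ThreadPoolExecutor


def run_executor(scheduler_output):
    # Stand-in for the task body from the reproduction, minus the sleep.
    return f"printing just the executor input {scheduler_output}"


def fan_out(items, max_workers=8):
    # Hypothetical helper: submit one call per element, then collect the
    # results in input order. Prefect's .map does its own orchestration;
    # this only mirrors the "one run per element" shape.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [pool.submit(run_executor, item) for item in items]
        return [future.result() for future in futures]


if __name__ == "__main__":
    results = fan_out(range(1, 300))
    print(len(results))
```

In the real flow each of those runs also talks to the API, so many requests can be in flight at the same time, which is the situation in which the error below shows up.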
Error
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1247, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "flows/test_calculate_spends_flows/test_spends_flow.py", line 16, in run_executor
    output = run_deployment(
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 197, in coroutine_wrapper
    return run_async_from_worker_thread(async_fn, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 148, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/usr/local/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.9/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/deployments.py", line 131, in run_deployment
    flow_run = await client.read_flow_run(flow_run_id)
  File "/usr/local/lib/python3.9/site-packages/prefect/client/orion.py", line 1443, in read_flow_run
    response = await self._client.get(f"/flow_runs/{flow_run_id}")
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1757, in get
    return await self.request(
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1533, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.9/site-packages/prefect/client/base.py", line 160, in send
    await super().send(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1620, in send
    response = await self._send_handling_auth(
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1648, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1685, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1722, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/usr/local/lib/python3.9/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 221, in handle_async_request
    await self._attempt_to_acquire_connection(status)
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 160, in _attempt_to_acquire_connection
    status.set_connection(connection)
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 22, in set_connection
    assert self.connection is None
AssertionError
Versions
Agent:
Version: 2.6.6
API version: 0.8.3
Python version: 3.9.15
Git commit: 87767cda
Built: Thu, Nov 3, 2022 1:15 PM
OS/Arch: linux/x86_64
Profile: default
Server type: hosted
Self-hosted Orion server:
Version: 2.6.0
API version: 0.8.2
Python version: 3.9.14
Git commit: 96f09a51
Built: Thu, Oct 13, 2022 3:21 PM
OS/Arch: linux/x86_64
Profile: default
Server type: hosted
Additional context
No response
About this issue
- State: closed
- Created 2 years ago
- Comments: 22 (4 by maintainers)
Hi! I believe this error comes from the most recent httpcore/httpx releases. I’d recommend downgrading those dependencies to the previous version.
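Before pinning anything, it can help to confirm which versions are actually installed in the environment that runs the flow. A minimal sketch, using only the standard library; the helper name installed_versions and the default package list are chosen here for illustration and are not part of Prefect:

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages=("prefect", "httpx", "httpcore", "anyio")):
    # Hypothetical helper: map each distribution name to its installed
    # version string, or None if the package is not installed.
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    for name, installed in installed_versions().items():
        print(f"{name}: {installed or 'not installed'}")
```

Run it inside the same image or virtual environment the agent uses, since a local shell may resolve different versions than the container does.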
@padbk that looks like an unrelated issue.
Looks like they’ve addressed this upstream: https://github.com/encode/httpcore/pull/627
Hi @deepanshu-zluri, I can’t share my file directly, but there is no real magic behind it. Note that we pre-bundle all our stuff into one image and use it as infrastructure. Our agent and server run in Docker containers based on the default Prefect image with Python 3.10, so it depends on how you use it later.
So just a little snippet: