prefect: AssertionError on running tasks in parallel

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn’t find it.
  • I searched the Prefect documentation for this issue.
  • I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

  1. A main flow that uses the sequential task runner calls a subflow, and the subflow only calls .map on a task to create parallel task runs.
  2. A few of those task runs crash with an AssertionError.

Reproduction

from prefect import task, flow
from prefect import get_run_logger
import time
from prefect.task_runners import SequentialTaskRunner


@task(name='run_scheduler')
def run_scheduler(event):
    scheduler_output = list(range(1, 300))
    time.sleep(150)  # Sleep to make the task long-running
    return scheduler_output


@task(name='run_executor', tags=['spends_executor'])
def run_executor(scheduler_output):
    time.sleep(150)  # Sleep to make the task long-running
    executor_output = f"printing just the executor input {scheduler_output}"
    return executor_output


@flow(task_runner=SequentialTaskRunner())
def spends_flow(event):
    logger = get_run_logger()
    logger.info(event)

    # Call the scheduler task
    scheduler_output = run_scheduler(event)

    # If the output is an empty list, just log it; otherwise call the subflow, which creates parallel task runs

    if len(scheduler_output) < 1:
        logger.info("no elements in array")
    else:
        spends_executor(scheduler_output)
    logger.info('flow completed')


@flow
def spends_executor(scheduler_output):
    run_executor.map(scheduler_output)


if __name__ == "__main__":
    event = "test"
    spends_flow(event)

Error

Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/prefect/engine.py", line 1247, in orchestrate_task_run
    result = await run_sync(task.fn, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 68, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "flows/test_calculate_spends_flows/test_spends_flow.py", line 16, in run_executor
    output = run_deployment(
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 197, in coroutine_wrapper
    return run_async_from_worker_thread(async_fn, *args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 148, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/usr/local/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/usr/local/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/usr/local/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.9/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/prefect/deployments.py", line 131, in run_deployment
    flow_run = await client.read_flow_run(flow_run_id)
  File "/usr/local/lib/python3.9/site-packages/prefect/client/orion.py", line 1443, in read_flow_run
    response = await self._client.get(f"/flow_runs/{flow_run_id}")
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1757, in get
    return await self.request(
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1533, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/usr/local/lib/python3.9/site-packages/prefect/client/base.py", line 160, in send
    await super().send(*args, **kwargs)
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1620, in send
    response = await self._send_handling_auth(
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1648, in _send_handling_auth
    response = await self._send_handling_redirects(
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1685, in _send_handling_redirects
    response = await self._send_single_request(request)
  File "/usr/local/lib/python3.9/site-packages/httpx/_client.py", line 1722, in _send_single_request
    response = await transport.handle_async_request(request)
  File "/usr/local/lib/python3.9/site-packages/httpx/_transports/default.py", line 353, in handle_async_request
    resp = await self._pool.handle_async_request(req)
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 221, in handle_async_request
    await self._attempt_to_acquire_connection(status)
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 160, in _attempt_to_acquire_connection
    status.set_connection(connection)
  File "/usr/local/lib/python3.9/site-packages/httpcore/_async/connection_pool.py", line 22, in set_connection
    assert self.connection is None
AssertionError
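
The last frame of the traceback shows the invariant that failed: a pending request's status object already held a connection when `set_connection` was called. The sketch below is a toy model of that invariant only. The `RequestStatus` class and the `"conn-1"` / `"conn-2"` values are hypothetical and are not httpcore's actual code; it shows the failure shape, not the root cause.

```python
class RequestStatus:
    """Toy stand-in for a pooled request's status object (hypothetical)."""

    def __init__(self):
        self.connection = None

    def set_connection(self, connection):
        # Same invariant as the failing line in the traceback:
        # a status may be assigned a connection only once.
        assert self.connection is None
        self.connection = connection


status = RequestStatus()
status.set_connection("conn-1")  # first assignment satisfies the invariant

try:
    status.set_connection("conn-2")  # second assignment violates it
    tripped = False
except AssertionError:
    tripped = True

print(tripped)
```

In other words, the assertion would trip if the same pending request were handed a connection twice, which fits a problem inside the connection pool rather than in the task code itself.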

Versions

Agent
Version:             2.6.6
API version:         0.8.3
Python version:      3.9.15
Git commit:          87767cda
Built:               Thu, Nov 3, 2022 1:15 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         hosted

Self-hosted Orion server
Version:             2.6.0
API version:         0.8.2
Python version:      3.9.14
Git commit:          96f09a51
Built:               Thu, Oct 13, 2022 3:21 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         hosted

Additional context

No response

About this issue

  • State: closed
  • Created 2 years ago
  • Comments: 22 (4 by maintainers)

Most upvoted comments

Hi! I believe this error comes from the most recent httpcore/httpx releases. I’d recommend downgrading those dependencies to the previous version.

@padbk that looks like an unrelated issue.

Looks like they’ve addressed this upstream https://github.com/encode/httpcore/pull/627
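
Before downgrading or upgrading httpcore/httpx, it can help to confirm which versions are actually installed in the environment where the flow runs (for example, inside the agent's image). The following is a minimal sketch using only the standard library; the helper name `installed_versions` and the default package list are assumptions made for illustration.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages=("prefect", "httpx", "httpcore", "anyio")):
    """Return {package name: installed version, or None if not installed}."""
    found = {}
    for name in packages:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            found[name] = None
    return found


if __name__ == "__main__":
    for name, ver in installed_versions().items():
        print(f"{name}: {ver or 'not installed'}")
```

Running this in both the agent and the server environments shows whether they resolved different httpcore/httpx releases, which is worth checking when only some runs fail.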

Hi @deepanshu-zluri, I can’t share my file directly, but there is no real magic behind it. Note that we pre-bundle everything into one image and use it as infrastructure. Our agent and server run in Docker images based on the default Prefect image with Python 3.10, so the details depend on how you use it later.

Here is a small snippet:

FROM python:3.10-slim-bullseye
# copy all flows and other internal dependencies
# there is also a requirements.txt which defines prefect version etc.
COPY prefect /prefect
RUN pip install --no-warn-script-location /prefect/.