prefect: Dask CancelledError when running flow in parallel

I managed to get the DaskExecutor to work with the simple inc/dec/add/list_sum example. However, I’m running (pretty reliably) into what seem to be concurrency errors. When I run Dask with just one thread per worker process, it works fine; as soon as I give a worker process multiple threads, I consistently get errors. I reduced the example flow to the following:

with Flow("dask-example") as flow:
	incs = inc.map(x=range(100))
	decs = dec.map(x=range(100))
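
For completeness, inc and dec here are just trivial tasks along the lines of the mapping example; roughly like this (the exact bodies shouldn’t matter for the error):

from prefect import task, Flow

@task
def inc(x):
    return x + 1

@task
def dec(x):
    return x - 1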

The error I get when running with Dask with one worker process (plus one scheduler process) and multiple threads in that worker process:

[2019-04-18 15:06:18,588] INFO - prefect.TaskRunner | Task 'Constant[range]': Starting task run...
[2019-04-18 15:06:18,588] INFO - prefect.TaskRunner | Task 'Constant[range]': Starting task run...
... (more init for Task 'Constant') ...
[2019-04-18 15:06:18,598] INFO - prefect.TaskRunner | Task 'dec': Starting task run...
[2019-04-18 15:06:18,602] INFO - prefect.TaskRunner | Task 'inc': Starting task run...
[2019-04-18 15:06:18,826] DEBUG - prefect.TaskRunner | Task 'dec': Handling state change from Pending to Mapped
**[2019-04-18 15:06:18,856] INFO - prefect.TaskRunner | Unexpected error: CancelledError('run_fn-bc3361b2-937f-4d22-b02e-7bdeafe9f539-86',)
[2019-04-18 15:06:18,862] ERROR - prefect.TaskRunner | Task 'dec': unexpected error while running task: CancelledError('run_fn-bc3361b2-937f-4d22-b02e-7bdeafe9f539-86',)**
[2019-04-18 15:06:18,869] INFO - prefect.TaskRunner | Task 'dec': finished task run for task with final state: 'Failed'
[2019-04-18 15:06:18,897] DEBUG - prefect.TaskRunner | Task 'inc': Handling state change from Pending to Mapped
[2019-04-18 15:06:18,999] INFO - prefect.TaskRunner | Task 'dec[0]': Starting task run...
[2019-04-18 15:06:18,999] INFO - prefect.TaskRunner | Task 'dec[1]': Starting task run...
[2019-04-18 15:06:18,999] INFO - prefect.TaskRunner | Task 'dec[2]': Starting task run...
[2019-04-18 15:06:18,999] DEBUG - prefect.TaskRunner | Task 'dec[0]': Handling state change from Pending to Running
[2019-04-18 15:06:19,000] DEBUG - prefect.TaskRunner | Task 'dec[1]': Handling state change from Pending to Running
[2019-04-18 15:06:19,001] DEBUG - prefect.TaskRunner | Task 'dec[2]': Handling state change from Pending to Running
[2019-04-18 15:06:19,001] DEBUG - prefect.TaskRunner | Task 'dec[0]': Calling task.run() method...
[2019-04-18 15:06:19,002] DEBUG - prefect.TaskRunner | Task 'dec[0]': Handling state change from Running to Success
[2019-04-18 15:06:19,002] DEBUG - prefect.TaskRunner | Task 'dec[1]': Calling task.run() method...
[2019-04-18 15:06:19,002] DEBUG - prefect.TaskRunner | Task 'dec[2]': Calling task.run() method...
[2019-04-18 15:06:19,002] INFO - prefect.TaskRunner | Task 'dec[0]': finished task run for task with final state: 'Success'
... (further runs of tasks dec[x]/inc[x] which all seem to run fine, even though dec has already been cancelled) ...

If I include add.map(x=incs, y=incs) in the flow after the inc/dec, there is an additional error. However, this just seems to be a side effect of the earlier inc/dec failure. Perhaps the error in this case could be a bit more meaningful, or execution could simply stop; at the least it shouldn’t be reported as unexpected, since the input task has already failed.

[2019-04-18 14:47:14,581] INFO - prefect.TaskRunner | Task 'add': Starting task run...
[2019-04-18 14:47:14,581] INFO - prefect.TaskRunner | Unexpected error: TypeError("'CancelledError' object does not support indexing",)
[2019-04-18 14:47:14,581] DEBUG - prefect.TaskRunner | Task 'add': Handling state change from Pending to Failed
[2019-04-18 14:47:14,581] DEBUG - prefect.TaskRunner | Task 'add': task has been mapped; ending run.
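
For reference, the extended flow simply chains the extra mapped task onto the same flow, roughly like this (assuming add is the usual two-argument task, and inc/dec as sketched earlier):

from prefect import task, Flow

@task
def add(x, y):
    return x + y

# inc and dec defined as in the sketch above
with Flow("dask-example") as flow:
    incs = inc.map(x=range(100))
    decs = dec.map(x=range(100))
    adds = add.map(x=incs, y=incs)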

However, my question is more about how I can debug this properly. I’ve been running it with various of the suggested debug settings and digging through the logs for anything useful, but so far without success. I’d be grateful for any suggestions.

About this issue

  • State: closed
  • Created 5 years ago
  • Comments: 15 (8 by maintainers)

Most upvoted comments

Thanks to both of you for the help so far! I did some more investigation into dask-yarn but couldn’t really find a reason why it would be causing this, so I tried running without it on plain Dask, and the issue occurs there as well. I therefore think it does have something to do with Dask parallelization itself and/or Prefect’s handling of it. I managed to break it down into an example that reproduces 100% of the time on my machine; I hope it replicates on your setup as well. I also have some more detailed logging available.

Start a scheduler and a worker with 3 threads:

dask-scheduler > scheduler.log 2>&1
dask-worker --nanny --nthreads 3 127.0.0.1:8786 > worker.log 2>&1

Run the Prefect example: python3 dask_test.py

The first run of this always fails on my machine. However, if I don’t restart the Dask scheduler/worker, the second and subsequent runs work fine. As soon as I restart the Dask scheduler/worker, the first run after that fails again.
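
Roughly, dask_test.py does the following (a sketch using the Prefect 0.x API; see the attached file for the exact version, which may differ in details):

from prefect import task, Flow
from prefect.engine.executors import DaskExecutor

@task
def inc(x):
    return x + 1

@task
def dec(x):
    return x - 1

with Flow("dask-example") as flow:
    incs = inc.map(x=range(100))
    decs = dec.map(x=range(100))

# Connect to the externally started scheduler instead of spawning a local cluster
executor = DaskExecutor(address="tcp://127.0.0.1:8786")
flow.run(executor=executor)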

Attachments: dask_test.py.txt, scheduler.log, worker.log