distributed: KeyError when running P2P rechunking
(Edit: I’m using Python 3.10.8, and both dask and distributed are version 2022.3.1.) I have several issues running this code:
Downsampling + rechunking
import dask.array as da
import numpy as np
import dask
import time
from distributed import Client
from dask_jobqueue import LSFCluster

shape = (2000,) * 3
source_chunks = (1024,) * 3
dest_chunks = (64,) * 3
data = da.random.randint(0, 255, shape, dtype='uint8', chunks=source_chunks)

levels = 8
multi = [data]
for level in range(levels):
    multi.append(da.coarsen(np.mean, multi[-1], {0: 2, 1: 2, 2: 2}, trim_excess=True))

rechunked_tasks = []
for m in multi:
    # only rechunk if the chunks are too small
    if any(c1 < c2 for c1, c2 in zip(m.chunksize, dest_chunks)):
        rechunked_tasks.append(m.rechunk(dest_chunks, algorithm='tasks'))
    else:
        rechunked_tasks.append(m)

mean_tasks = [m.mean() for m in rechunked_tasks]

with dask.config.set({'optimization.fuse.active': False}):
    rechunked_p2p = []
    for m in multi:
        # only rechunk if the chunks are too small
        if any(c1 < c2 for c1, c2 in zip(m.chunksize, dest_chunks)):
            rechunked_p2p.append(m.rechunk(dest_chunks, algorithm='p2p'))
        else:
            rechunked_p2p.append(m)

    mean_p2p = [m.mean() for m in rechunked_p2p]

if __name__ == '__main__':
    num_cores = 8
    cluster = LSFCluster(
        cores=num_cores,
        processes=1,
        memory=f"{15 * num_cores}GB",
        ncpus=num_cores,
        mem=15 * num_cores,
        walltime="72:00",
    )
    cluster.scale(10)
    cl = Client(cluster)
    print(f"Begin distributed operations. Dask dashboard url: {cl.dashboard_link}")

    start = time.time()
    cl.compute(mean_p2p, sync=True)
    print(f"Completed p2p rechunking -> mean after {time.time() - start} s")

    start = time.time()
    cl.compute(mean_tasks, sync=True)
    print(f"Completed tasks rechunking -> mean after {time.time() - start} s")
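For orientation, here is a rough sketch of the array extents the script above produces, computed in plain Python without dask. It assumes that coarsening by 2 with trim_excess=True simply floors each extent; `pyramid_shapes` is a hypothetical helper, not part of dask, and the actual chunk sizes dask picks at each level are not modeled.

```python
def pyramid_shapes(shape, levels, factor=2):
    """Shapes after repeated coarsening, dropping any remainder."""
    shapes = [tuple(shape)]
    for _ in range(levels):
        shapes.append(tuple(n // factor for n in shapes[-1]))
    return shapes


dest_chunks = (64,) * 3
shapes = pyramid_shapes((2000,) * 3, levels=8)

# A chunk can never exceed the array extent, so these levels are certain to
# pass the "chunks are too small" test; larger levels depend on dask's chunking.
certain_rechunk = [
    level
    for level, s in enumerate(shapes)
    if any(n < c for n, c in zip(s, dest_chunks))
]

for level, s in enumerate(shapes):
    print(level, s, "rechunk" if level in certain_rechunk else "depends on chunking")
```

Under that assumption, the smallest levels fit inside a single destination chunk, so at least those levels go through rechunk in both the 'tasks' and 'p2p' variants.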
First, I got an import error when I ran cl.compute, because pyarrow was missing from my Python environment. The error should probably be raised earlier, e.g. as soon as I select algorithm='p2p' in da.rechunk.
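A minimal sketch of what an earlier failure could look like, done on the user side. The names `require_module` and `rechunk_checked` are hypothetical wrappers, not dask or distributed APIs; the only library call assumed on the array is the `rechunk(chunks, algorithm=...)` method used in the script above.

```python
import importlib.util


def require_module(name, feature):
    """Raise ImportError right away if the top-level module `name` is not installed."""
    if importlib.util.find_spec(name) is None:
        raise ImportError(f"{feature} requires the optional dependency {name!r}")


def rechunk_checked(array, chunks, algorithm="tasks"):
    """Hypothetical wrapper: check for pyarrow at graph-construction time."""
    if algorithm == "p2p":
        require_module("pyarrow", "P2P rechunking")
    return array.rechunk(chunks, algorithm=algorithm)
```

With a wrapper like this, a missing pyarrow would surface when the rechunk is requested rather than when cl.compute runs on the cluster.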
After installing pyarrow, I get a new error:
Traceback
distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
return self.get(id, worker)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 127, in get
state = self.states[id]
KeyError: '290a0eef76b7ab6e389634324025588a'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/core.py", line 818, in _handle_comm
result = handler(**msg)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 145, in get_or_create
state = self._create_array_rechunk_state(id, spec)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 204, in _create_array_rechunk_state
for ts in self.scheduler.tasks[name].dependents:
KeyError: 'shuffle-barrier-290a0eef76b7ab6e389634324025588a'
distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
return self.get(id, worker)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 127, in get
state = self.states[id]
KeyError: '290a0eef76b7ab6e389634324025588a'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/core.py", line 818, in _handle_comm
result = handler(**msg)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 145, in get_or_create
state = self._create_array_rechunk_state(id, spec)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 204, in _create_array_rechunk_state
for ts in self.scheduler.tasks[name].dependents:
KeyError: 'shuffle-barrier-290a0eef76b7ab6e389634324025588a'
distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
return self.get(id, worker)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 127, in get
state = self.states[id]
KeyError: '290a0eef76b7ab6e389634324025588a'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/core.py", line 818, in _handle_comm
result = handler(**msg)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 145, in get_or_create
state = self._create_array_rechunk_state(id, spec)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 204, in _create_array_rechunk_state
for ts in self.scheduler.tasks[name].dependents:
KeyError: 'shuffle-barrier-290a0eef76b7ab6e389634324025588a'
distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
return self.get(id, worker)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 127, in get
state = self.states[id]
KeyError: '290a0eef76b7ab6e389634324025588a'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/core.py", line 818, in _handle_comm
result = handler(**msg)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 145, in get_or_create
state = self._create_array_rechunk_state(id, spec)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 204, in _create_array_rechunk_state
for ts in self.scheduler.tasks[name].dependents:
KeyError: 'shuffle-barrier-290a0eef76b7ab6e389634324025588a'
Traceback (most recent call last):
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_rechunk.py", line 41, in rechunk_transfer
return _get_worker_extension().add_partition(
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_worker_extension.py", line 628, in add_partition
shuffle = self.get_or_create_shuffle(shuffle_id, type=type, **kwargs)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_worker_extension.py", line 879, in get_or_create_shuffle
return sync(
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/utils.py", line 405, in sync
distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
return self.get(id, worker)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 127, in get
state = self.states[id]
KeyError: '290a0eef76b7ab6e389634324025588a'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/core.py", line 818, in _handle_comm
result = handler(**msg)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 145, in get_or_create
state = self._create_array_rechunk_state(id, spec)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 204, in _create_array_rechunk_state
for ts in self.scheduler.tasks[name].dependents:
KeyError: 'shuffle-barrier-290a0eef76b7ab6e389634324025588a'
raise exc.with_traceback(tb)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/utils.py", line 378, in f
result = yield future
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/tornado/gen.py", line 769, in run
value = future.result()
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_worker_extension.py", line 709, in _get_or_create_shuffle
shuffle = await self._refresh_shuffle(
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_worker_extension.py", line 768, in _refresh_shuffle
result = await self.worker.scheduler.shuffle_get_or_create(
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/core.py", line 1227, in send_recv_from_rpc
return await send_recv(comm=comm, op=key, **kwargs)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/core.py", line 1011, in send_recv
raise exc.with_traceback(tb)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/core.py", line 818, in _handle_comm
result = handler(**msg)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 145, in get_or_create
state = self._create_array_rechunk_state(id, spec)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 204, in _create_array_rechunk_state
for ts in self.scheduler.tasks[name].dependents:
KeyError: 'shuffle-barrier-290a0eef76b7ab6e389634324025588a'
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/groups/cellmap/home/bennettd/dev/cosem-flows/test_rechunking.py", line 54, in <module>
cl.compute(mean_p2p, sync=True)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/client.py", line 3351, in compute
result = self.gather(futures)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/client.py", line 2305, in gather
return self.sync(
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/utils.py", line 338, in sync
return sync(
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/utils.py", line 405, in sync
raise exc.with_traceback(tb)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/utils.py", line 378, in f
result = yield future
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/tornado/gen.py", line 769, in run
value = future.result()
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/client.py", line 2168, in _gather
raise exception.with_traceback(traceback)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_rechunk.py", line 50, in rechunk_transfer
raise RuntimeError("rechunk_transfer failed during shuffle {id}") from e
RuntimeError: rechunk_transfer failed during shuffle {id}
/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/dask_jobqueue/core.py:237: FutureWarning: extra has been renamed to worker_extra_args. You are still using it (even if only set to []; please also check config files). If you did not set worker_extra_args yet, extra will be respected for now, but it will be removed in a future release. If you already set worker_extra_args, extra is ignored and you can remove it.
warnings.warn(warn, FutureWarning)
/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/dask_jobqueue/core.py:255: FutureWarning: job_extra has been renamed to job_extra_directives. You are still using it (even if only set to []; please also check config files). If you did not set job_extra_directives yet, job_extra will be respected for now, but it will be removed in a future release. If you already set job_extra_directives, job_extra is ignored and you can remove it.
warnings.warn(warn, FutureWarning)
distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
return self.get(id, worker)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 127, in get
state = self.states[id]
KeyError: '290a0eef76b7ab6e389634324025588a'
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/core.py", line 818, in _handle_comm
result = handler(**msg)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 145, in get_or_create
state = self._create_array_rechunk_state(id, spec)
File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 204, in _create_array_rechunk_state
for ts in self.scheduler.tasks[name].dependents:
KeyError: 'shuffle-barrier-290a0eef76b7ab6e389634324025588a'
Any ideas what could be going wrong here?
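To help read the traceback, here is a small sketch of how this shape of chained error arises in Python. The dictionaries and functions are hypothetical stand-ins, not distributed's real implementation: a lookup fails with a KeyError, the fallback "create" path fails with a second KeyError, and the caller wraps the result in a RuntimeError.

```python
# Hypothetical stand-ins; these are not distributed's real data structures.
states = {}           # shuffle id -> state, empty at first
scheduler_tasks = {}  # task key -> task; the barrier key is missing here


def get(shuffle_id):
    return states[shuffle_id]  # KeyError for an unknown id


def create(shuffle_id):
    # KeyError if the barrier task is not known to the scheduler
    barrier = scheduler_tasks[f"shuffle-barrier-{shuffle_id}"]
    states[shuffle_id] = barrier
    return barrier


def get_or_create(shuffle_id):
    try:
        return get(shuffle_id)
    except KeyError:
        # An exception raised here is chained implicitly via __context__ and
        # printed as "During handling of the above exception, ...".
        return create(shuffle_id)


def transfer(shuffle_id):
    try:
        return get_or_create(shuffle_id)
    except Exception as e:
        # "raise ... from e" sets __cause__ and is printed as
        # "The above exception was the direct cause of ...".
        raise RuntimeError(f"transfer failed during shuffle {shuffle_id}") from e


try:
    transfer("abc")
except RuntimeError as err:
    caught = err
    print(repr(caught.__cause__))              # the missing barrier key
    print(repr(caught.__cause__.__context__))  # the missing shuffle id
```

Read this way, the first KeyError (the bare shuffle id) is expected for a new shuffle; the second one, for the 'shuffle-barrier-…' key, is the actual failure. The literal "{id}" in the final RuntimeError message suggests the message string in the traceback was not an f-string.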
About this issue
- State: open
- Created a year ago
- Comments: 20 (20 by maintainers)
I’ve been able to reduce the example further:
The call to da.store causes the error (likely through unwanted optimization). I’ll keep investigating.