distributed: KeyError when running P2P rechunking

(edit: I’m using Python 3.10.8, and both dask and distributed are at version 2023.3.1.) I ran into several issues running this code:

Downsampling + rechunking
import dask.array as da
import numpy as np
import dask
import time
from distributed import Client
from dask_jobqueue import LSFCluster

shape = (2000,) * 3
source_chunks = (1024,) * 3
dest_chunks = (64,) * 3
data = da.random.randint(0, 255, shape, dtype='uint8', chunks=source_chunks)
levels = 8
multi = [data]

for level in range(levels):
    multi.append(da.coarsen(np.mean, multi[-1], {0: 2, 1: 2, 2: 2}, trim_excess=True))

rechunked_tasks = []
for m in multi:
    # only rechunk if the chunks are too small
    if any(c1 < c2 for c1, c2 in zip(m.chunksize, dest_chunks)):
        rechunked_tasks.append(m.rechunk(dest_chunks, algorithm='tasks'))
    else:
        rechunked_tasks.append(m)
mean_tasks = [m.mean() for m in rechunked_tasks]

with dask.config.set({'optimization.fuse.active': False}):
    rechunked_p2p = []
    for m in multi:
        # only rechunk if the chunks are too small
        if any(c1 < c2 for c1, c2 in zip(m.chunksize, dest_chunks)):
            rechunked_p2p.append(m.rechunk(dest_chunks, algorithm='p2p'))
        else:
            rechunked_p2p.append(m)
    mean_p2p = [m.mean() for m in rechunked_p2p]

if __name__ == '__main__':

    num_cores = 8
    cluster = LSFCluster(
        cores=num_cores,
        processes=1,
        memory=f"{15 * num_cores}GB",
        ncpus=num_cores,
        mem=15 * num_cores,
        walltime="72:00",
    )
    cluster.scale(10)

    cl = Client(cluster)
    print(f"Begin distributed operations. Dask dashboard url: {cl.dashboard_link}")

    start = time.time()
    cl.compute(mean_p2p, sync=True)
    print(f"Completed p2p rechunking -> mean after {time.time() - start} s")

    start = time.time()
    cl.compute(mean_tasks, sync=True)
    print(f"Completed tasks rechunking -> mean after {time.time() - start} s")

First, I got an ImportError because pyarrow was missing from my Python environment, but it only surfaced once I called cl.compute. The error should probably be raised earlier, e.g. as soon as algorithm='p2p' is selected in da.rechunk.
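For illustration, something like the following eager check at graph-construction time would surface the problem immediately; this is only a sketch of what I mean (the function name and error message are made up, not the actual dask code):

import importlib.util

def _check_p2p_dependencies():
    # Hypothetical check, run when rechunk(..., algorithm='p2p') builds the graph,
    # so a missing pyarrow fails fast instead of at compute time.
    if importlib.util.find_spec("pyarrow") is None:
        raise ImportError(
            "rechunk(algorithm='p2p') requires pyarrow; "
            "install it or fall back to algorithm='tasks'."
        )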

After installing pyarrow, I get a new error:

Traceback
distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
    return self.get(id, worker)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 127, in get
    state = self.states[id]
KeyError: '290a0eef76b7ab6e389634324025588a'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/core.py", line 818, in _handle_comm
    result = handler(**msg)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 145, in get_or_create
    state = self._create_array_rechunk_state(id, spec)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 204, in _create_array_rechunk_state
    for ts in self.scheduler.tasks[name].dependents:
KeyError: 'shuffle-barrier-290a0eef76b7ab6e389634324025588a'
distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
    return self.get(id, worker)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 127, in get
    state = self.states[id]
KeyError: '290a0eef76b7ab6e389634324025588a'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/core.py", line 818, in _handle_comm
    result = handler(**msg)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 145, in get_or_create
    state = self._create_array_rechunk_state(id, spec)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 204, in _create_array_rechunk_state
    for ts in self.scheduler.tasks[name].dependents:
KeyError: 'shuffle-barrier-290a0eef76b7ab6e389634324025588a'
distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
    return self.get(id, worker)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 127, in get
    state = self.states[id]
KeyError: '290a0eef76b7ab6e389634324025588a'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/core.py", line 818, in _handle_comm
    result = handler(**msg)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 145, in get_or_create
    state = self._create_array_rechunk_state(id, spec)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 204, in _create_array_rechunk_state
    for ts in self.scheduler.tasks[name].dependents:
KeyError: 'shuffle-barrier-290a0eef76b7ab6e389634324025588a'
distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
    return self.get(id, worker)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 127, in get
    state = self.states[id]
KeyError: '290a0eef76b7ab6e389634324025588a'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/core.py", line 818, in _handle_comm
    result = handler(**msg)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 145, in get_or_create
    state = self._create_array_rechunk_state(id, spec)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 204, in _create_array_rechunk_state
    for ts in self.scheduler.tasks[name].dependents:
KeyError: 'shuffle-barrier-290a0eef76b7ab6e389634324025588a'
Traceback (most recent call last):
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_rechunk.py", line 41, in rechunk_transfer
    return _get_worker_extension().add_partition(
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_worker_extension.py", line 628, in add_partition
    shuffle = self.get_or_create_shuffle(shuffle_id, type=type, **kwargs)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_worker_extension.py", line 879, in get_or_create_shuffle
    return sync(
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/utils.py", line 405, in sync
distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
    return self.get(id, worker)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 127, in get
    state = self.states[id]
KeyError: '290a0eef76b7ab6e389634324025588a'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/core.py", line 818, in _handle_comm
    result = handler(**msg)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 145, in get_or_create
    state = self._create_array_rechunk_state(id, spec)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 204, in _create_array_rechunk_state
    for ts in self.scheduler.tasks[name].dependents:
KeyError: 'shuffle-barrier-290a0eef76b7ab6e389634324025588a'
    raise exc.with_traceback(tb)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/utils.py", line 378, in f
    result = yield future
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/tornado/gen.py", line 769, in run
    value = future.result()
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_worker_extension.py", line 709, in _get_or_create_shuffle
    shuffle = await self._refresh_shuffle(
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_worker_extension.py", line 768, in _refresh_shuffle
    result = await self.worker.scheduler.shuffle_get_or_create(
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/core.py", line 1227, in send_recv_from_rpc
    return await send_recv(comm=comm, op=key, **kwargs)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/core.py", line 1011, in send_recv
    raise exc.with_traceback(tb)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/core.py", line 818, in _handle_comm
    result = handler(**msg)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 145, in get_or_create
    state = self._create_array_rechunk_state(id, spec)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 204, in _create_array_rechunk_state
    for ts in self.scheduler.tasks[name].dependents:
KeyError: 'shuffle-barrier-290a0eef76b7ab6e389634324025588a'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/groups/cellmap/home/bennettd/dev/cosem-flows/test_rechunking.py", line 54, in <module>
    cl.compute(mean_p2p, sync=True)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/client.py", line 3351, in compute
    result = self.gather(futures)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/client.py", line 2305, in gather
    return self.sync(
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/utils.py", line 338, in sync
    return sync(
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/utils.py", line 405, in sync
    raise exc.with_traceback(tb)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/utils.py", line 378, in f
    result = yield future
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/tornado/gen.py", line 769, in run
    value = future.result()
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/client.py", line 2168, in _gather
    raise exception.with_traceback(traceback)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_rechunk.py", line 50, in rechunk_transfer
    raise RuntimeError("rechunk_transfer failed during shuffle {id}") from e
RuntimeError: rechunk_transfer failed during shuffle {id}
/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/dask_jobqueue/core.py:237: FutureWarning: extra has been renamed to worker_extra_args. You are still using it (even if only set to []; please also check config files). If you did not set worker_extra_args yet, extra will be respected for now, but it will be removed in a future release. If you already set worker_extra_args, extra is ignored and you can remove it.
  warnings.warn(warn, FutureWarning)
/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/dask_jobqueue/core.py:255: FutureWarning: job_extra has been renamed to job_extra_directives. You are still using it (even if only set to []; please also check config files). If you did not set job_extra_directives yet, job_extra will be respected for now, but it will be removed in a future release. If you already set job_extra_directives, job_extra is ignored and you can remove it.
  warnings.warn(warn, FutureWarning)
distributed.core - ERROR - Exception while handling op shuffle_get_or_create
Traceback (most recent call last):
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 139, in get_or_create
    return self.get(id, worker)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 127, in get
    state = self.states[id]
KeyError: '290a0eef76b7ab6e389634324025588a'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/core.py", line 818, in _handle_comm
    result = handler(**msg)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 145, in get_or_create
    state = self._create_array_rechunk_state(id, spec)
  File "/groups/cellmap/home/bennettd/miniconda3/envs/cosem-flows/lib/python3.10/site-packages/distributed/shuffle/_scheduler_extension.py", line 204, in _create_array_rechunk_state
    for ts in self.scheduler.tasks[name].dependents:
KeyError: 'shuffle-barrier-290a0eef76b7ab6e389634324025588a'

Any ideas what could be going wrong here?

Most upvoted comments

I’ve been able to reduce the example further:

import dask.array as da
import dask
from distributed import Client, LocalCluster

class VoidStore:
    """
    A class that implements __setitem__ as a no-op
    """
    def __setitem__(self, key, value):
        pass

cluster = LocalCluster()
client = Client(cluster)
source = da.random.randint(0, 255, (4, ), chunks=(2, 2))
with dask.config.set({'optimization.fuse.active': False}):
    rechunked = source.rechunk((3, 1), algorithm='p2p')
    stored = da.store(rechunked, VoidStore(), lock=None)

The call to da.store triggers the error, most likely because it applies a graph optimization that loses the shuffle barrier task the P2P extension expects to find (hence the missing shuffle-barrier key in the traceback). I’ll keep investigating.
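One way to probe that hypothesis is to compare the unoptimized and optimized low-level graphs of the rechunked array and check whether the shuffle barrier task survives. A rough sketch: the 'shuffle-barrier' key prefix is taken from the traceback above, and this only exercises the collection-level optimization that compute would apply, which may differ from the exact graph manipulation da.store performs.

from dask.base import collections_to_dsk

def has_barrier(dsk):
    # dask task keys are strings or tuples whose first element is the task name;
    # the scheduler-side KeyError above is about a missing 'shuffle-barrier-<id>' task.
    names = (key[0] if isinstance(key, tuple) else key for key in dsk)
    return any(str(name).startswith("shuffle-barrier") for name in names)

print("barrier present before optimization:", has_barrier(collections_to_dsk([rechunked], optimize_graph=False)))
print("barrier present after optimization: ", has_barrier(collections_to_dsk([rechunked], optimize_graph=True)))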