dask-cuda: System Monitor Error

I’m getting an error when running the local_cudf_merge benchmark with the latest distributed/dask (main branch).

python local_cudf_merge.py -p tcp -d 0 --profile foo.html

bzaitlen@prm-dgx-06:~$ python $CONDA_PREFIX/lib/python3.8/site-packages/dask_cuda/benchmarks/local_cudf_merge.py -p tcp -d 0 --profile foo.html
distributed.utils - ERROR - deque index out of range
Traceback (most recent call last):
  File "/gpfs/fs1/bzaitlen/miniconda3/envs/20210601-nightly-21.08/lib/python3.8/site-packages/distributed/utils.py", line 671, in log_errors
    yield
  File "/gpfs/fs1/bzaitlen/miniconda3/envs/20210601-nightly-21.08/lib/python3.8/site-packages/distributed/dashboard/components/shared.py", line 581, in update
    self.source.stream(self.get_data(), 1000)
  File "/gpfs/fs1/bzaitlen/miniconda3/envs/20210601-nightly-21.08/lib/python3.8/site-packages/distributed/dashboard/components/shared.py", line 573, in get_data
    d = self.worker.monitor.range_query(start=self.last_count)
  File "/gpfs/fs1/bzaitlen/miniconda3/envs/20210601-nightly-21.08/lib/python3.8/site-packages/distributed/system_monitor.py", line 123, in range_query
    d = {k: [v[i] for i in seq] for k, v in self.quantities.items()}
  File "/gpfs/fs1/bzaitlen/miniconda3/envs/20210601-nightly-21.08/lib/python3.8/site-packages/distributed/system_monitor.py", line 123, in <dictcomp>
    d = {k: [v[i] for i in seq] for k, v in self.quantities.items()}
  File "/gpfs/fs1/bzaitlen/miniconda3/envs/20210601-nightly-21.08/lib/python3.8/site-packages/distributed/system_monitor.py", line 123, in <listcomp>
    d = {k: [v[i] for i in seq] for k, v in self.quantities.items()}
IndexError: deque index out of range
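
For context, range_query builds its index list from the length of the CPU deque and then indexes every deque in self.quantities with it (the relevant excerpt shows up again in the longer traceback further down). If any quantity’s deque is shorter than the CPU one, for example because GPU metrics only start being recorded once a CUDA context exists, the comprehension raises exactly this IndexError. A minimal, purely illustrative reproduction of that failure mode (the names below are stand-ins, not the actual SystemMonitor attributes):

from collections import deque

# Toy stand-in for SystemMonitor state: the CPU deque has 5 samples,
# but the GPU deque only started recording after the second sample.
cpu = deque([10.0, 12.0, 11.0, 13.0, 12.5])
quantities = {
    "cpu": cpu,
    "gpu_utilization": deque([40.0, 42.0, 41.0]),  # shorter: started later
}

istart = 0
seq = [i for i in range(istart, len(cpu))]  # indices 0..4, as in range_query

try:
    d = {k: [v[i] for i in seq] for k, v in quantities.items()}
except IndexError as e:
    print(e)  # deque index out of range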

@charlesbluca do you have time to investigate what is happening here? I think you are familiar with the system_monitor section of distributed.

Most upvoted comments

Sure, I can do that! Would it be skipped by the Distributed CI right now?

EDIT: Actually it looks like these Distributed tests already handle the general issue here (checking for GPU info in the WorkerState/worker monitors), so we should be good there.

Maybe it makes sense to add the test to Distributed as well with a pytest.importorskip?

Thanks @pentschev! I’ll submit a test for this in Dask-CUDA (maybe something like @quasiben’s perf report snippet), but it would be nice to have an equivalent test in Distributed if/when we are able to test NVML stuff there.
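
Not the actual test, but a rough sketch of what a Dask-CUDA test along those lines could look like, guarded with pytest.importorskip so it is skipped on machines without NVML (the test name and tmp_path handling here are just assumptions):

import pytest

pynvml = pytest.importorskip("pynvml")  # skip where NVML/pynvml is unavailable

from dask.distributed import Client, performance_report
from dask_cuda import LocalCUDACluster


def test_performance_report(tmp_path):
    # Based on the snippet in this issue: generating a performance
    # report used to trip the system monitor's range_query.
    report = tmp_path / "report.html"
    with LocalCUDACluster() as cluster, Client(cluster) as client:
        with performance_report(filename=str(report)):
            client.submit(lambda x: x + 1, 1).result()
    assert report.exists()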

Note this also happens in “basic” usage:

In [1]: from dask_cuda import LocalCUDACluster

In [2]: from dask.distributed import Client

In [3]: from dask.distributed import Client, performance_report

In [4]: cluster = LocalCUDACluster()

In [5]: client = Client(cluster)

In [6]: with performance_report(filename='foo.html'):
   ...:     client.submit(lambda x: x+1, 1)
   ...:
/gpfs/fs1/bzaitlen/miniconda3/envs/20210601-nightly-21.08/lib/python3.8/site-packages/distributed/client.py in __aexit__(self, typ, value, traceback, code)
   4711             except Exception:
   4712                 code = ""
-> 4713         data = await get_client().scheduler.performance_report(
   4714             start=self.start, last_count=self.last_count, code=code
   4715         )

/gpfs/fs1/bzaitlen/miniconda3/envs/20210601-nightly-21.08/lib/python3.8/site-packages/distributed/core.py in send_recv_from_rpc(**kwargs)
    862             name, comm.name = comm.name, "ConnectionPool." + key
    863             try:
--> 864                 result = await send_recv(comm=comm, op=key, **kwargs)
    865             finally:
    866                 self.pool.reuse(self.addr, comm)

/gpfs/fs1/bzaitlen/miniconda3/envs/20210601-nightly-21.08/lib/python3.8/site-packages/distributed/core.py in send_recv(comm, reply, serializers, deserializers, **kwargs)
    661         if comm.deserialize:
    662             typ, exc, tb = clean_exception(**response)
--> 663             raise exc.with_traceback(tb)
    664         else:
    665             raise Exception(response["text"])

/gpfs/fs1/bzaitlen/miniconda3/envs/20210601-nightly-21.08/lib/python3.8/site-packages/distributed/core.py in handle_comm()
    496                             result = asyncio.ensure_future(result)
    497                             self._ongoing_coroutines.add(result)
--> 498                             result = await result
    499                     except (CommClosedError, CancelledError) as e:
    500                         if self.status == Status.running:

/gpfs/fs1/bzaitlen/miniconda3/envs/20210601-nightly-21.08/lib/python3.8/site-packages/distributed/scheduler.cpython-38-x86_64-linux-gnu.so in performance_report()

/gpfs/fs1/bzaitlen/miniconda3/envs/20210601-nightly-21.08/lib/python3.8/site-packages/bokeh/core/property/validation.py in func()
     91     def func(*args, **kwargs):
     92         with validate(False):
---> 93             return input_function(*args, **kwargs)
     94     return func
     95

/gpfs/fs1/bzaitlen/miniconda3/envs/20210601-nightly-21.08/lib/python3.8/site-packages/distributed/dashboard/components/shared.py in update()
    579     def update(self):
    580         with log_errors():
--> 581             self.source.stream(self.get_data(), 1000)
    582             self.label_source.data["cpu"] = list(
    583                 "{}: {:.1f}%".format(f.__name__, f(self.source.data["cpu"]))

/gpfs/fs1/bzaitlen/miniconda3/envs/20210601-nightly-21.08/lib/python3.8/site-packages/distributed/dashboard/components/shared.py in get_data()
    571
    572     def get_data(self):
--> 573         d = self.worker.monitor.range_query(start=self.last_count)
    574         d["time"] = [x * 1000 for x in d["time"]]
    575         self.last_count = self.worker.monitor.count

/gpfs/fs1/bzaitlen/miniconda3/envs/20210601-nightly-21.08/lib/python3.8/site-packages/distributed/system_monitor.py in range_query()
    121         seq = [i for i in range(istart, len(self.cpu))]
    122
--> 123         d = {k: [v[i] for i in seq] for k, v in self.quantities.items()}
    124         return d

We used to initialize a CUDA context when starting the workers, but I think we changed things slightly. @pentschev mentioned some of those changes here: https://github.com/rapidsai/dask-cuda/issues/632

@pentschev do you have an idea of what’s going on here? It seems like there is another initialization/timing issue around CUDA contexts.
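
For reference, and purely as an illustration of where the length mismatch bites rather than a proposed fix, a range_query-style helper that tolerates quantities whose deques started recording later (e.g. GPU metrics that only appear once a CUDA context has been initialized) might look like:

from collections import deque

def safe_range_query(quantities, istart):
    # Clamp the index range to the shortest deque so later-starting
    # quantities don't raise "deque index out of range".
    n = min(len(v) for v in quantities.values())
    seq = range(max(istart, 0), n)
    return {k: [v[i] for i in seq] for k, v in quantities.items()}

quantities = {"cpu": deque([1, 2, 3, 4]), "gpu_utilization": deque([7, 8])}
print(safe_range_query(quantities, 0))  # {'cpu': [1, 2], 'gpu_utilization': [7, 8]}

Note that this simply truncates the longer deques, so the samples are no longer aligned in time; it only illustrates the mismatch, and the real question remains when GPU metrics start being appended relative to the other quantities.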