datashader: Memory errors on distributed dask cluster

Description

I have a persisted dask dataframe which is larger than the memory on my notebook server and on any of the individual workers. The data consists of x, y, z lidar points.

When I try to plot with datashader, it appears to transfer the whole dataframe to the notebook during aggregation, before plotting.

import datashader as ds
import dask.dataframe as dd
# `client` is an existing dask.distributed Client connected to the cluster
ddf = client.persist(dd.read_parquet('Some 20GB dataset'))
cvs = ds.Canvas(900, 525)
agg = cvs.points(ddf, 'x', 'y', agg=ds.mean('z'))

This results in 20GB of data being transferred to my notebook (and it gets killed by the OOM killer as I only have 16GB of RAM).
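For reference, a rough synthetic reproducer would look something like this (random points standing in for my lidar parquet data; the scheduler address, partition sizes, and the make_partition helper are just placeholders):

import numpy as np
import pandas as pd
import dask
import dask.dataframe as dd
import datashader as ds
from dask.distributed import Client

client = Client('tcp://scheduler:8786')  # placeholder scheduler address

def make_partition(seed, n=5_000_000):
    # Synthetic stand-in for one parquet partition of x, y, z lidar points
    rng = np.random.RandomState(seed)
    return pd.DataFrame({'x': rng.uniform(0, 1000, n),
                         'y': rng.uniform(0, 1000, n),
                         'z': rng.uniform(0, 100, n)})

# Enough partitions to exceed the memory of the notebook and of any single worker
ddf = dd.from_delayed([dask.delayed(make_partition)(i) for i in range(200)])
ddf = client.persist(ddf)

cvs = ds.Canvas(plot_width=900, plot_height=525)
agg = cvs.points(ddf, 'x', 'y', agg=ds.mean('z'))  # client memory climbs here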

Your environment

  • Datashader version: 0.6.8
  • Dask version: 0.20.0
  • Distributed version: 1.24.0

About this issue

  • Original URL
  • State: closed
  • Created 6 years ago
  • Comments: 16 (1 by maintainers)

Most upvoted comments

Hi @jacobtomlinson,

Wanted to let you know that I’m planning to take a look at this, as it’s definitely an important use case (and it’s something that Datashader+Dask should be able to handle). Unfortunately, it probably won’t be until early February that I’ll have a compute/storage environment set up to reproduce what you’re seeing.

groupby-aggregations are computed by doing groupby aggregations on the partitions, then merging a few of those results, doing more groupby-aggregations on them, and so on in a tree reduction until we get to a final result. No single partition ever holds much data in memory (assuming that the number of groups is manageable).

As an example, we accomplish a groupby-mean by doing a groupby-sum and groupby-count on each partition, then repeatedly doing a groupby-sum over those partial results until we are down to one, then dividing sum by count on that final partition.
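To sketch that idea in plain pandas (this is not dask’s actual implementation; `partitions`, 'key', and 'value' are just illustrative names):

import pandas as pd

def partial_mean(df):
    # Per-partition groupby-sum and groupby-count
    g = df.groupby('key')['value']
    return pd.DataFrame({'sum': g.sum(), 'count': g.count()})

def combine(a, b):
    # Merge two partial results by summing their sums and counts
    return a.add(b, fill_value=0)

# `partitions` is an assumed list of pandas DataFrames with 'key'/'value' columns
parts = [partial_mean(p) for p in partitions]
while len(parts) > 1:
    # Tree reduction: combine pairs of partial results until only one remains
    pairs = [parts[i:i + 2] for i in range(0, len(parts), 2)]
    parts = [combine(*p) if len(p) == 2 else p[0] for p in pairs]
result = parts[0]['sum'] / parts[0]['count']  # final per-group mean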

However, datashader does things differently from dask.dataframe. I’m not as familiar with its algorithms, but I suspect it does something similar.