dask-image: dask-image imread v0.5.0 not working with dask distributed Client & napari

@kpasko made a bug report https://github.com/napari/napari/issues/2304 but it turns out this is a problem caused by dask-image. I’ve copied the contents of the report into this issue (sadly I’m unable to transfer issues between different organisations).

What happened:

TypeError: can not serialize 'function' object

In distributed/client.py line 2635 futures = self._graph_to_futures

What you expected to happen:

successful image viewing

Minimal Complete Verifiable Example:

from dask.distributed import Client
import napari
from dask_image.imread import imread

client = Client()
data = imread('./*.tif')
napari.view_image(data)

Anything else we need to know?: Works fine when not initializing client, i.e.

from dask.distributed import Client
import napari
from dask_image.imread import imread

data = imread('./*.tif')
napari.view_image(data)

works as expected

Environment:

  • Napari/Dask version:
    dask                  2021.2.0  pyhd8ed1ab_0    conda-forge
    dask-core             2021.2.0  pyhd8ed1ab_0    conda-forge
    dask-image            0.5.0     pyh44b312d_0    conda-forge
    distributed           2021.2.0  py39h6e9494a_0  conda-forge
    napari                0.4.5     pyhd8ed1ab_0    conda-forge
    napari-console        0.0.3     pyhd8ed1ab_0    conda-forge
    napari-plugin-engine  0.1.9     py39h6e9494a_1  conda-forge
    napari-svg            0.1.4     py_0            conda-forge

  • Python version: python 3.9.2 h2502468_0_cpython conda-forge

  • Operating System: OS X 11.2.1

  • Install method (conda, pip, source): conda

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 61 (31 by maintainers)

Most upvoted comments

@jakirkham @jrbourbeau I can confirm that https://github.com/dask/dask/pull/7353 fixes all problems discussed in this issue and in particular the original dask-image-napari workflow 🎉

Okay I narrowed it down to (without napari):

import dask.array as da
import numpy as np

from dask.distributed import Client
client = Client()

xn = np.random.randint(0, 100, (2, 4, 4))
xd = da.from_array(xn, chunks=(1, 2, 2))

# fails (this variant is what produces the traceback below)
def func(block_info=None):
    return np.random.randint(0, 100, (1, 2, 2))

# works if used instead
# def func():
#     return np.random.randint(0, 100, (1, 2, 2))

xm = da.map_blocks(func, chunks=xd.chunks, dtype=xd.dtype)

from dask.core import flatten
keyset = set(flatten(xm.__dask_keys__()))
xm.dask.__dask_distributed_pack__(client, keyset)

Same error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-93-163cc014c931> in <module>
     21 from dask.core import flatten
     22 keyset = set(flatten(xm.__dask_keys__()))
---> 23 xm.dask.__dask_distributed_pack__(client, keyset)

~/miniconda3/envs/dask_image_delme/lib/python3.9/site-packages/dask/highlevelgraph.py in __dask_distributed_pack__(self, client, client_keys)
    942                 }
    943             )
--> 944         return dumps_msgpack({"layers": layers})
    945 
    946     @staticmethod

~/miniconda3/envs/dask_image_delme/lib/python3.9/site-packages/distributed/protocol/core.py in dumps_msgpack(msg, compression)
    161     """
    162     header = {}
--> 163     payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
    164 
    165     fmt, payload = maybe_compress(payload, compression=compression)

~/miniconda3/envs/dask_image_delme/lib/python3.9/site-packages/msgpack/__init__.py in packb(o, **kwargs)
     33     See :class:`Packer` for options.
     34     """
---> 35     return Packer(**kwargs).pack(o)
     36 
     37 

msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

TypeError: can not serialize 'function' object

I tested the changes in https://github.com/dask/dask/pull/7353 against the example snippet posted in https://github.com/dask/distributed/issues/4574 and the issue was resolved (a corresponding test was also added). It’d be great if someone could double check with the original napari workflow

napari turns off dask task fusion when slicing dask arrays

Yep that was the missing piece 😄

Here’s another repro ( https://github.com/dask/distributed/issues/4574#issuecomment-794773534 ). With fusion on, things work. With fusion off, it fails!

Wow, this is the kind of sleuthing that one loves to wake up to! 😂 👏 👏 Awesome work @m-albert!

I should have remembered and mentioned one more probably-relevant detail, which is that napari turns off dask task fusion when slicing dask arrays. That might account for why it was so hard to reproduce without napari. Sorry, I only just remembered that!
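
For anyone wanting to mimic that outside napari: dask exposes the same switch through its documented optimization.fuse.active config key. The snippet below is a sketch based on the repro above, not something taken from the thread; on the affected dask/distributed versions it should hit the same msgpack error, while on versions containing the fix it simply computes.

import dask
import dask.array as da
import numpy as np
from dask.distributed import Client

client = Client()

def func(block_info=None):
    return np.random.randint(0, 100, (1, 2, 2))

# same shape and chunking as the repro above
xm = da.map_blocks(func, chunks=((1, 1), (2, 2), (2, 2)), dtype=int)

# napari turns off dask graph fusion while slicing; mimic that here
with dask.config.set({"optimization.fuse.active": False}):
    xm.compute()  # packs the unfused high-level graph for the scheduler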

So, dask_image and napari work together as expected in the current main, which, however, is one commit ahead of the 0.5 release.

Have confirmed this also. As a result, we will:

Really nice work here @m-albert

Does PR ( https://github.com/dask/dask/pull/7353 ) solve the issue? We just merged it fwiw

Nice, so something goes wrong when trying to pack unfused graphs. For some reason, choosing a mapping function def func(x, block_info=None) instead of def func(block_info=None) circumvents this issue in the latest commit of dask_image. Looking forward to seeing whether https://github.com/dask/dask/pull/7353 will solve the issue!

Well done! 😄 👏

Can you please file this as a new issue on Distributed ( https://github.com/dask/distributed/issues )?

Are you able to see what the Dask task graph looks like before calling .compute()?
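
For reference, one generic way to peek at the graph before computing is through the collection's high-level graph (standard dask API, shown here as a hint rather than something taken from the thread; data is assumed to be the array returned by imread):

hlg = data.__dask_graph__()  # a HighLevelGraph
print(list(hlg.layers))      # layer names, e.g. the blockwise imread layer
print({name: type(layer).__name__ for name, layer in hlg.layers.items()})
data.visualize(filename="graph.png")  # low-level view; requires graphviz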

In the failing line dumps_msgpack({"layers": layers}), it seems the entire layers object is packable apart from layers[1]['state']['indices'], which contains func from the example.

ipdb> p layers[1]['state']['indices']
[<function func at 0x150d21a60>, None, (<class 'tuple'>, ['block_info']), None, 'block-info-func-b78f8575db855a3a3f1010f0ef59e206', ('.0', '.1', '.2')]
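
As an aside, a small hand-rolled probe (a hypothetical helper, not from the thread) can walk such a nested structure and report which leaves msgpack refuses to pack, which is one way to pinpoint entries like func above:

import msgpack

def find_unpackable(obj, path="layers"):
    """Recursively try msgpack on each element and print the leaves it cannot pack."""
    try:
        msgpack.packb(obj, use_bin_type=True)
    except TypeError:
        if isinstance(obj, dict):
            for k, v in obj.items():
                find_unpackable(v, f"{path}[{k!r}]")
        elif isinstance(obj, (list, tuple)):
            for i, v in enumerate(obj):
                find_unpackable(v, f"{path}[{i}]")
        else:
            print(f"cannot pack {path}: {obj!r}")

# e.g. inside the ipdb session above: find_unpackable(layers)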

Btw, I found another trigger of some sort. This reproduces the problem without dask-image, but using napari.

%gui qt
from dask.distributed import Client
import napari
client = Client()

import dask.array as da
import numpy as np

xn = np.random.randint(0, 100, (20, 30, 30))
xd = da.from_array(xn, chunks=(1, 5, 5))

def func(block_info=None):
    return np.random.randint(0, 100, (1, 5, 5))

xm = da.map_blocks(func, chunks=xd.chunks, dtype=xd.dtype)
napari.view_image(xm)  # viewing triggers the serialization error

Now, replacing

def func(block_info=None):

by

def func():

makes the error disappear.

It’s as if dask couldn’t handle the provided function properly when it takes a block_info argument but no dask array is passed to map_blocks.

Sorry, I may have missed this: what is the reproducer now? Were we able to do this without Napari (like just calling .compute())?

Couldn’t reproduce the issue with a different/simpler example than the one reported here yet…

Does it occur with 2021.03.0?

Yes it does.

@jni

it shouldn’t be that many commits to check which one did it? i.e. can you do a git bisect to find the commit that fixes things? (I would do this, but as mentioned I couldn’t reproduce this issue.)

The breaking commit is 17ec4c25d6b26300bbcdb6bfe781595bd2586a96 which implements map_blocks for imread:

    ...
    a = dask.array.map_blocks(
        _map_read_frame,
        chunks=dask.array.core.normalize_chunks(
            (nframes,) + shape[1:], shape),
        fn=sfname,
        arrayfunc=arrayfunc,
        meta=arrayfunc([]).astype(dtype),  # meta overwrites `dtype` argument
    )

    return a


def _map_read_frame(block_info=None, **kwargs):

    i, j = block_info[None]['array-location'][0]

    return _utils._read_frame(i=slice(i, j), **kwargs)

After the latest commit 91fe6e1cb262058296c182ae9f9bd9b255aaebd9, the problem doesn’t occur anymore. This commit changes the way map_blocks is implemented in imread:

    ...
    # place source filenames into dask array
    filenames = sorted(glob.glob(sfname))  # pims also does this
    if len(filenames) > 1:
        ar = dask.array.from_array(filenames, chunks=(nframes,))
        multiple_files = True
    else:
        ar = dask.array.from_array(filenames * shape[0], chunks=(nframes,))
        multiple_files = False

    # read in data using encoded filenames
    a = ar.map_blocks(
        _map_read_frame,
        chunks=dask.array.core.normalize_chunks(
            (nframes,) + shape[1:], shape),
        multiple_files=multiple_files,
        new_axis=list(range(1, len(shape))),
        arrayfunc=arrayfunc,
        meta=arrayfunc([]).astype(dtype),  # meta overwrites `dtype` argument
    )

    return a


def _map_read_frame(x, multiple_files, block_info=None, **kwargs):

    fn = x[0]  # get filename from input chunk

    if multiple_files:
        i, j = 0, 1
    else:
        i, j = block_info[None]['array-location'][0]

    return _utils._read_frame(fn=fn, i=slice(i, j), **kwargs)

These code snippets include all relevant changed lines. Interestingly, the problem seems to be related to the differences in the use of map_blocks. Namely, the problematic commit calls dask.array.map_blocks while the fixing commit uses x.map_blocks, where x is a dask array. I’ve tested that adding a dummy array in the problematic commit solves the problem:

    import dask.array as da
    import numpy as np
    x = da.from_array(np.zeros(shape, dtype=dtype),
                      chunks=dask.array.core.normalize_chunks((nframes,) + shape[1:], shape),)

    a = x.map_blocks(
        _map_read_frame,
        chunks=dask.array.core.normalize_chunks(
            (nframes,) + shape[1:], shape),
        fn=sfname,
        arrayfunc=arrayfunc,
        meta=arrayfunc([]).astype(dtype),  # meta overwrites `dtype` argument
    )

However, in my understanding, using map_blocks as dask.array.map_blocks should be intended behaviour, since an example of this usage is included in the docstring of map_blocks. Also, using dask.array.map_blocks in a minimal example in combination with a distributed client and the napari viewer doesn’t seem to break anything:

import dask.array as da
import numpy as np
import napari

xn = np.random.randint(0, 100, (20, 30, 30))
xd = da.from_array(xn, chunks=(1, 5, 5))

def func():
    return np.random.randint(0, 100, (1, 5, 5))

xm = da.map_blocks(func, chunks=xd.chunks, dtype=xd.dtype)
napari.view_image(xm)

Something else to consider is the observation by @GenevieveBuckley that calling data.compute() circumvents the problem. So my guess at this point would be that there’s a dask or distributed problem as @jakirkham suggested, potentially related to new graph functionality, potentially occurring when napari first slices and then computes, as @jni suggested.

One could create a hybrid solution: namely, use PIMS just to find shape and dtype info, and then use skimage.io.imread or similar to read in each chunk on the workers.

totally agree. let’s change it!
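
As a rough illustration of that hybrid idea (a sketch under assumptions, not the actual dask-image implementation; the name hybrid_imread is hypothetical, and error handling plus multi-frame files are ignored):

import glob

import dask.array as da
import numpy as np
import pims
from skimage import io


def hybrid_imread(pattern):
    """Use PIMS only for shape/dtype discovery; read each chunk with skimage.io.imread."""
    filenames = sorted(glob.glob(pattern))

    # metadata from the first file only
    with pims.open(filenames[0]) as frames:
        frame_shape = tuple(frames.frame_shape)
        dtype = np.dtype(frames.pixel_type)

    # one filename per chunk; the per-chunk reader is a plain function that serializes cleanly
    fn_array = da.from_array(np.array(filenames, dtype=object), chunks=1)

    def read_chunk(fn_block):
        return io.imread(fn_block[0])[None, ...]  # add the leading frame axis

    return fn_array.map_blocks(
        read_chunk,
        chunks=((1,) * len(filenames),) + tuple((s,) for s in frame_shape),
        new_axis=list(range(1, len(frame_shape) + 1)),
        dtype=dtype,
    )

Something like hybrid_imread('./*.tif') would then yield one frame per file, stacked along a new leading axis, without PIMS objects ever entering the task graph.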

I also think there’s probably no need to involve napari in the example at all; it’s entirely possible that calling data.compute() will trigger the same problem. (Haven’t checked this yet.)

Well, @tlambert03’s example (using np.asarray) would seem to disprove that… But I haven’t checked it locally.

Oh yeah, I’d almost forgotten about that. It turns out it’s even more interesting than I’d thought.

ipython --gui=qt

from dask.distributed import Client
import napari
from dask_image.imread import imread

client = Client()
data = imread('*.tif')
napari.view_image(data)  # you get an error

data.compute()  # this works fine
napari.view_image(data)  # works fine now

Error message:
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-9-4d014add55bf> in <module>
----> 1 napari.view_image(data)

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/view_layers.py in view_image(data, channel_axis, rgb, colormap, contrast_limits, gamma, interpolation, rendering, iso_threshold, attenuation, name, metadata, scale, translate, rotate, shear, affine, opacity, blending, visible, multiscale, title, ndisplay, order, axis_labels, show)
      7 and the ``Viewer.add_<layer_type>`` methods.  The final generated functions
      8 follow this pattern
----> 9 (where <layer_type> is replaced with one of the layer types):
     10 
     11     def view_<layer_type>(*args, **kwargs):

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/components/viewer_model.py in add_image(self, data, channel_axis, rgb, colormap, contrast_limits, gamma, interpolation, rendering, iso_threshold, attenuation, name, metadata, scale, translate, rotate, shear, affine, opacity, blending, visible, multiscale)
    674                         "did you mean to specify a 'channel_axis'? "
    675                     )
--> 676             layer = image_class(data, **kwargs)
    677             self.layers.append(layer)
    678 

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/image/image.py in __init__(self, data, rgb, colormap, contrast_limits, gamma, interpolation, rendering, iso_threshold, attenuation, name, metadata, scale, translate, rotate, shear, affine, opacity, blending, visible, multiscale)
    274 
    275         # Trigger generation of view slice and thumbnail
--> 276         self._update_dims()
    277 
    278     def _new_empty_slice(self):

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/base/base.py in _update_dims(self, event)
    528         self._ndim = ndim
    529 
--> 530         self.refresh()
    531         self._value = self.get_value(self.position, world=True)
    532 

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/base/base.py in refresh(self, event)
    938         """Refresh all layer data based on current view slice."""
    939         if self.visible:
--> 940             self.set_view_slice()
    941             self.events.set_data()
    942             self._update_thumbnail()

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/base/base.py in set_view_slice(self)
    798     def set_view_slice(self):
    799         with self.dask_optimized_slicing():
--> 800             self._set_view_slice()
    801 
    802     @abstractmethod

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/image/image.py in _set_view_slice(self)
    611         # Load our images, might be sync or async.
    612         data = SliceDataClass(self, image_indices, image, thumbnail_source)
--> 613         self._load_slice(data)
    614 
    615     def _load_slice(self, data: SliceDataClass):

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/image/image.py in _load_slice(self, data)
    620         data : Slice
    621         """
--> 622         if self._slice.load(data):
    623             # The load was synchronous.
    624             self._on_data_loaded(data, sync=True)

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/image/_image_slice.py in load(self, data)
    117         """
    118         self.loaded = False  # False until self._on_loaded is calls
--> 119         return self.loader.load(data)
    120 
    121     def on_loaded(self, data: ImageSliceData) -> bool:

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/image/_image_loader.py in load(self, data)
     20             True if load happened synchronously.
     21         """
---> 22         data.load_sync()
     23         return True
     24 

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/napari/layers/image/_image_slice_data.py in load_sync(self)
     38     def load_sync(self) -> None:
     39         """Call asarray on our images to load them."""
---> 40         self.image = np.asarray(self.image)
     41 
     42         if self.thumbnail_source is not None:

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/numpy/core/_asarray.py in asarray(a, dtype, order, like)
    100         return _asarray_with_like(a, dtype=dtype, order=order, like=like)
    101 
--> 102     return array(a, dtype, copy=False, order=order)
    103 
    104 

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/dask/array/core.py in __array__(self, dtype, **kwargs)
   1446 
   1447     def __array__(self, dtype=None, **kwargs):
-> 1448         x = self.compute()
   1449         if dtype and x.dtype != dtype:
   1450             x = x.astype(dtype)

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    279         dask.base.compute
    280         """
--> 281         (result,) = compute(self, traverse=False, **kwargs)
    282         return result
    283 

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    561         postcomputes.append(x.__dask_postcompute__())
    562 
--> 563     results = schedule(dsk, keys, **kwargs)
    564     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    565 

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2633         Client.compute : Compute asynchronous collections
   2634         """
-> 2635         futures = self._graph_to_futures(
   2636             dsk,
   2637             keys=set(flatten([keys])),

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, workers, allow_other_workers, priority, user_priority, resources, retries, fifo_timeout, actors)
   2541                 dsk = HighLevelGraph.from_collections(id(dsk), dsk, dependencies=())
   2542 
-> 2543             dsk = highlevelgraph_pack(dsk, self, keyset)
   2544 
   2545             annotations = {}

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/distributed/protocol/highlevelgraph.py in highlevelgraph_pack(hlg, client, client_keys)
    122             }
    123         )
--> 124     return dumps_msgpack({"layers": layers})
    125 
    126 

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/distributed/protocol/core.py in dumps_msgpack(msg, compression)
    182     """
    183     header = {}
--> 184     payload = msgpack.dumps(msg, default=msgpack_encode_default, use_bin_type=True)
    185 
    186     fmt, payload = maybe_compress(payload, compression=compression)

~/anaconda3/envs/daskimage-issue194/lib/python3.8/site-packages/msgpack/__init__.py in packb(o, **kwargs)
     33     See :class:`Packer` for options.
     34     """
---> 35     return Packer(**kwargs).pack(o)
     36 
     37 

msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer.pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

msgpack/_packer.pyx in msgpack._cmsgpack.Packer._pack()

TypeError: can not serialize 'function' object