iris: Error 'Could not serialize object of type _FillValueMaskCheckAndStoreTarget' saving computed cube with Dask distributed

šŸ› Bug Report

I don’t seem to be able to save a cube that has been computed on a dask cluster.

To be honest, I don’t know if I should be able to, but if I could it would be really useful.

How To Reproduce

from dask.distributed import Client
client = Client(n_workers=4)

import iris
cube = iris.load_cube('~/environment/data/000490262cdd067721a34112963bcaa2b44860ab.nc')
assert cube.shape == (18, 33, 960, 1280)
averages = cube.collapsed('realization', iris.analysis.MEAN)
assert type(averages) == iris.cube.Cube
iris.io.save(averages, "delme.nc")

Expected behaviour

File is saved without error. This is the behaviour if I don’t start a dask.distributed.Client before invoking Iris.

Environment

  • OS & Version: Amazon Linux 2.3 (CentOS)
  • Iris Version: 3.1.0

Some more relevant versions

# packages in environment at /home/ec2-user/miniconda3/envs/iris:
#
# Name                    Version                   Build  Channel
cloudpickle               2.0.0              pyhd8ed1ab_0    conda-forge
dask                      2021.12.0          pyhd8ed1ab_0    conda-forge
dask-core                 2021.12.0          pyhd8ed1ab_0    conda-forge
hdf4                      4.2.15               h10796ff_3    conda-forge
hdf5                      1.12.1          nompi_h2750804_103    conda-forge
ipykernel                 6.6.1           py310hfdc917e_0    conda-forge
ipython                   7.31.0          py310hff52083_0    conda-forge
iris                      3.1.0              pyhd8ed1ab_3    conda-forge
numpy                     1.22.0          py310h454958d_0    conda-forge
pandas                    1.3.5           py310hb5077e9_0    conda-forge
pickleshare               0.7.5                   py_1003    conda-forge
python                    3.10.1          h62f1059_2_cpython    conda-forge
scipy                     1.7.3           py310hea5193d_0    conda-forge

Additional context

Stack trace

distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 76, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 285, in msgpack._cmsgpack.Packer._pack
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 57, in _encode_default
    sub_header, sub_frames = serialize_and_split(
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 435, in serialize_and_split
    header, frames = serialize(x, serializers, on_error, context)
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 262, in serialize
    return serialize(
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 308, in serialize
    headers_frames = [
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 309, in <listcomp>
    serialize(
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 359, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type _FillValueMaskCheckAndStoreTarget.', '<iris.fileformats.netcdf._FillValueMaskCheckAndStoreTarget object at 0x7f8254a6a620>')
distributed.comm.utils - ERROR - ('Could not serialize object of type _FillValueMaskCheckAndStoreTarget.', '<iris.fileformats.netcdf._FillValueMaskCheckAndStoreTarget object at 0x7f8254a6a620>')
Traceback (most recent call last):
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/comm/utils.py", line 33, in _to_frames
    return list(protocol.dumps(msg, **kwargs))
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 76, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 285, in msgpack._cmsgpack.Packer._pack
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 57, in _encode_default
    sub_header, sub_frames = serialize_and_split(
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 435, in serialize_and_split
    header, frames = serialize(x, serializers, on_error, context)
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 262, in serialize
    return serialize(
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 308, in serialize
    headers_frames = [
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 309, in <listcomp>
    serialize(
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 359, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type _FillValueMaskCheckAndStoreTarget.', '<iris.fileformats.netcdf._FillValueMaskCheckAndStoreTarget object at 0x7f8254a6a620>')
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/batched.py", line 93, in _background_send
    nbytes = yield self.comm.write(
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/tornado/gen.py", line 762, in run
    value = future.result()
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/comm/tcp.py", line 250, in write
    frames = await to_frames(
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/comm/utils.py", line 50, in to_frames
    return _to_frames()
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/comm/utils.py", line 33, in _to_frames
    return list(protocol.dumps(msg, **kwargs))
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 76, in dumps
    frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/msgpack/__init__.py", line 35, in packb
    return Packer(**kwargs).pack(o)
  File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
  File "msgpack/_packer.pyx", line 285, in msgpack._cmsgpack.Packer._pack
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 57, in _encode_default
    sub_header, sub_frames = serialize_and_split(
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 435, in serialize_and_split
    header, frames = serialize(x, serializers, on_error, context)
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 262, in serialize
    return serialize(
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 308, in serialize
    headers_frames = [
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 309, in <listcomp>
    serialize(
  File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 359, in serialize
    raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type _FillValueMaskCheckAndStoreTarget.', '<iris.fileformats.netcdf._FillValueMaskCheckAndStoreTarget object at 0x7f8254a6a620>')

---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
<timed exec> in <module>

~/miniconda3/envs/iris/lib/python3.10/site-packages/iris/io/__init__.py in save(source, target, saver, **kwargs)
    426     # Single cube?
    427     if isinstance(source, Cube):
--> 428         saver(source, target, **kwargs)
    429 
    430     # CubeList or sequence of cubes?

~/miniconda3/envs/iris/lib/python3.10/site-packages/iris/fileformats/netcdf.py in save(cube, filename, netcdf_format, local_keys, unlimited_dimensions, zlib, complevel, shuffle, fletcher32, contiguous, chunksizes, endian, least_significant_digit, packing, fill_value)
   2770         # Iterate through the cubelist.
   2771         for cube, packspec, fill_value in zip(cubes, packspecs, fill_values):
-> 2772             sman.write(
   2773                 cube,
   2774                 local_keys,

~/miniconda3/envs/iris/lib/python3.10/site-packages/iris/fileformats/netcdf.py in write(self, cube, local_keys, unlimited_dimensions, zlib, complevel, shuffle, fletcher32, contiguous, chunksizes, endian, least_significant_digit, packing, fill_value)
   1150 
   1151         # Create the associated cube CF-netCDF data variable.
-> 1152         cf_var_cube = self._create_cf_data_variable(
   1153             cube,
   1154             dimension_names,

~/miniconda3/envs/iris/lib/python3.10/site-packages/iris/fileformats/netcdf.py in _create_cf_data_variable(self, cube, dimension_names, local_keys, packing, fill_value, **kwargs)
   2417 
   2418         # Store the data and check if it is masked and contains the fill value
-> 2419         is_masked, contains_fill_value = store(
   2420             data, cf_var, fill_value_to_check
   2421         )

~/miniconda3/envs/iris/lib/python3.10/site-packages/iris/fileformats/netcdf.py in store(data, cf_var, fill_value)
   2395                 # the fill value
   2396                 target = _FillValueMaskCheckAndStoreTarget(cf_var, fill_value)
-> 2397                 da.store([data], [target])
   2398                 return target.is_masked, target.contains_value
   2399 

~/miniconda3/envs/iris/lib/python3.10/site-packages/dask/array/core.py in store(sources, targets, lock, regions, compute, return_stored, **kwargs)
   1116     elif compute:
   1117         store_dsk = HighLevelGraph(layers, dependencies)
-> 1118         compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
   1119         return None
   1120 

~/miniconda3/envs/iris/lib/python3.10/site-packages/dask/base.py in compute_as_if_collection(cls, dsk, keys, scheduler, get, **kwargs)
    313     schedule = get_scheduler(scheduler=scheduler, cls=cls, get=get)
    314     dsk2 = optimization_function(cls)(dsk, keys, **kwargs)
--> 315     return schedule(dsk2, keys, **kwargs)
    316 
    317 

~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2689                     should_rejoin = False
   2690             try:
-> 2691                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2692             finally:
   2693                 for f in futures.values():

~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1944             else:
   1945                 local_worker = None
-> 1946             return self.sync(
   1947                 self._gather,
   1948                 futures,

~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/utils.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    308             return future
    309         else:
--> 310             return sync(
    311                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    312             )

~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    362     if error[0]:
    363         typ, exc, tb = error[0]
--> 364         raise exc.with_traceback(tb)
    365     else:
    366         return result[0]

~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/utils.py in f()
    347             if callback_timeout is not None:
    348                 future = asyncio.wait_for(future, callback_timeout)
--> 349             result[0] = yield future
    350         except Exception:
    351             error[0] = sys.exc_info()

~/miniconda3/envs/iris/lib/python3.10/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1810                         else:
   1811                             raise exception.with_traceback(traceback)
-> 1812                         raise exc
   1813                     if errors == "skip":
   1814                         bad_keys.add(key)

CancelledError: ('store-map-ff2955f4724c217f42a2a75fc58e80e8', 1, 0, 0)

Most upvoted comments

Hi @dmcg, thanks for raising this issue with Iris! As per @wjbenfold, I’ve also been able to locally reproduce this error, again with the A1B_north_america.nc file. Hopefully I’ll also be able to add some more context to the error, a workaround you can use immediately, and a suggestion for how we can fix Iris to stop this happening in general.

I’ll start with the workaround, as it has the most immediate value. If you don’t use distributed (i.e. don’t create a Client), you won’t get the error (the reason for this will hopefully become clear in the context below). If you just use plain Iris you will automatically get Dask’s local parallel processing, so you should see the same parallel performance for your collapse operation with or without distributed. As you’re using a single EC2 instance (I think), you won’t lose anything by not distributing your processing over multiple machines - although obviously this workaround won’t scale if you do move to a cluster of machines.

So, the following code should run without error:

import iris

cube = iris.load_cube('~/environment/data/000490262cdd067721a34112963bcaa2b44860ab.nc')
averages = cube.collapsed('realization', iris.analysis.MEAN)
iris.save(averages, "delme.nc")

Your other option is to realise the data before you save it - that is, load the data into memory after you collapse but before you save. Note this will only work if there’s enough memory on the machine to store the collapsed cube’s data. For example:

import iris
from distributed import Client

client = Client(n_workers=4)

cube = iris.load_cube('~/environment/data/000490262cdd067721a34112963bcaa2b44860ab.nc')
averages = cube.collapsed('realization', iris.analysis.MEAN)

averages.data  # <-- This line will load the data and force processing of the collapse operation before you save.

iris.save(averages, "delme.nc")

The reason for the difference between using distributed and not is that distributed communications always run over network links - even when the client, scheduler and workers are all on the same machine. There are certainly some advantages to using a local cluster over Dask’s multiprocessing scheduler (and it’s the preferred solution in the Dask docs), but it can also be less reliable.
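
Just to make that concrete (a small local demo, with a made-up address in the comment): even a purely local Client starts separate worker processes connected over local TCP sockets, so anything shipped to or from them still has to be serialized.

from dask.distributed import Client

# Even on a single machine, the default LocalCluster starts separate
# worker processes, and the client, scheduler and workers talk over
# local TCP sockets - so task graphs, data and results still get
# (de)serialized on the way through.
client = Client(n_workers=4)         # processes=True by default
print(client.scheduler.address)      # e.g. tcp://127.0.0.1:<port>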

One example of this is that the network communications place extra constraints on how data is moved between workers. By default a distributed network runs over TCP, which transmits frames of bytes. Python objects in memory or on disk must be translated to bytes before being transmitted, and translated back from bytes on receipt. These processes are serialization and deserialization respectively, and it’s this step that’s failing here. Python objects are serialized for TCP transmission by being pickled, and the problem class here is apparently one that cannot be pickled. Taking a look through the Iris source code, it looks like the problem class is only used for NetCDF saving, so the scope of the problem is small - but it’s still annoying if you can’t save your cube!
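
You can see the failure in isolation with something like the sketch below (the scratch file name is made up; the constructor arguments are just taken from the traceback above). distributed has to pickle the store target to ship it between processes, and that pickling is what blows up.

import pickle

import netCDF4
from iris.fileformats.netcdf import _FillValueMaskCheckAndStoreTarget

# Wrap a netCDF variable the way the Iris saver does (arguments as in
# the traceback above).
dataset = netCDF4.Dataset("scratch.nc", "w")   # hypothetical scratch file
cf_var = dataset.createVariable("tmp", "f8", ())
target = _FillValueMaskCheckAndStoreTarget(cf_var, -999.0)

# This is expected to raise, because the wrapped netCDF4 variable holds
# an open file handle that cannot be pickled - which is exactly what
# distributed needs to do when it ships the store target to a worker.
pickle.dumps(target)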

I think the longer-term solution to this will be to make the problem class serializable. This should be achievable by overriding the class’s __getstate__ and __setstate__ methods - the trick will be ensuring that the correct state is captured by these methods. I’ll have a go…
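
For illustration only, the mechanism looks roughly like the toy class below - this is not the actual Iris fix, since the real work is deciding what to do with the unpicklable netCDF4 variable and how the is_masked / contains_value flags get back from the workers.

import pickle

class PicklableTarget:
    """Toy stand-in for the problem class, to show the pickling hooks."""

    def __init__(self, var, fill_value=None):
        self.var = var              # stands in for the unpicklable netCDF4 variable
        self.fill_value = fill_value
        self.is_masked = False
        self.contains_value = False

    def __getstate__(self):
        # Capture everything except the unpicklable handle.
        state = self.__dict__.copy()
        state["var"] = None
        return state

    def __setstate__(self, state):
        self.__dict__.update(state)

# A lambda stands in for the unpicklable handle; the round trip now
# succeeds, with only `var` dropped.
restored = pickle.loads(pickle.dumps(PicklableTarget(var=lambda: None)))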

Thanks for this. Loading into local memory should be possible for me, so I’m happy to have this as just an irritating edge-case.

Hi, thanks for the response.

This is running in EC2 (Cloud9 fwiw), but my understanding of that Client(n_workers=4) line is that the Dask ‘cluster’ will just be on the Jupyter machine, maybe even in the same process, so the pickled objects won’t really touch the cloud? Or, put another way, my ground is in the cloud!

I’m out for a couple of days, but happy to help next week - at the least I can check whether I can reproduce this with other NetCDF files. I’ll also speak with my clients about sharing the repo with the data and code.