iris: Error 'Could not serialize object of type _FillValueMaskCheckAndStoreTarget' saving computed cube with Dask distributed
🐛 Bug Report
I don't seem to be able to save a cube that has been computed on a dask cluster.
To be honest, I don't know if I should be able to, but if I could it would be really useful.
How To Reproduce
```python
from dask.distributed import Client
client = Client(n_workers=4)

import iris
cube = iris.load_cube('~/environment/data/000490262cdd067721a34112963bcaa2b44860ab.nc')
assert cube.shape == (18, 33, 960, 1280)
averages = cube.collapsed('realization', iris.analysis.MEAN)
assert type(averages) == iris.cube.Cube
iris.io.save(averages, "delme.nc")
```
Expected behaviour
File is saved without error. This is the behaviour if I don't start a dask.distributed.Client before invoking Iris.
Environment
- OS & Version: Amazon Linux 2.3 (CentOS)
- Iris Version: 3.1.0
Some more relevant versions
```
# packages in environment at /home/ec2-user/miniconda3/envs/iris:
#
# Name                    Version          Build                Channel
cloudpickle               2.0.0            pyhd8ed1ab_0         conda-forge
dask                      2021.12.0        pyhd8ed1ab_0         conda-forge
dask-core                 2021.12.0        pyhd8ed1ab_0         conda-forge
hdf4                      4.2.15           h10796ff_3           conda-forge
hdf5                      1.12.1           nompi_h2750804_103   conda-forge
ipykernel                 6.6.1            py310hfdc917e_0      conda-forge
ipython                   7.31.0           py310hff52083_0      conda-forge
iris                      3.1.0            pyhd8ed1ab_3         conda-forge
numpy                     1.22.0           py310h454958d_0      conda-forge
pandas                    1.3.5            py310hb5077e9_0      conda-forge
pickleshare               0.7.5            py_1003              conda-forge
python                    3.10.1           h62f1059_2_cpython   conda-forge
scipy                     1.7.3            py310hea5193d_0      conda-forge
```
Additional context
Stack trace:

```
distributed.protocol.core - CRITICAL - Failed to Serialize
Traceback (most recent call last):
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 76, in dumps
frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/msgpack/__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 285, in msgpack._cmsgpack.Packer._pack
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 57, in _encode_default
sub_header, sub_frames = serialize_and_split(
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 435, in serialize_and_split
header, frames = serialize(x, serializers, on_error, context)
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 262, in serialize
return serialize(
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 308, in serialize
headers_frames = [
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 309, in <listcomp>
serialize(
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 359, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type _FillValueMaskCheckAndStoreTarget.', '<iris.fileformats.netcdf._FillValueMaskCheckAndStoreTarget object at 0x7f8254a6a620>')
distributed.comm.utils - ERROR - ('Could not serialize object of type _FillValueMaskCheckAndStoreTarget.', '<iris.fileformats.netcdf._FillValueMaskCheckAndStoreTarget object at 0x7f8254a6a620>')
Traceback (most recent call last):
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/comm/utils.py", line 33, in _to_frames
return list(protocol.dumps(msg, **kwargs))
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 76, in dumps
frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/msgpack/__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 285, in msgpack._cmsgpack.Packer._pack
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 57, in _encode_default
sub_header, sub_frames = serialize_and_split(
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 435, in serialize_and_split
header, frames = serialize(x, serializers, on_error, context)
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 262, in serialize
return serialize(
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 308, in serialize
headers_frames = [
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 309, in <listcomp>
serialize(
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 359, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type _FillValueMaskCheckAndStoreTarget.', '<iris.fileformats.netcdf._FillValueMaskCheckAndStoreTarget object at 0x7f8254a6a620>')
distributed.batched - ERROR - Error in batched write
Traceback (most recent call last):
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/batched.py", line 93, in _background_send
nbytes = yield self.comm.write(
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/tornado/gen.py", line 762, in run
value = future.result()
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/comm/tcp.py", line 250, in write
frames = await to_frames(
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/comm/utils.py", line 50, in to_frames
return _to_frames()
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/comm/utils.py", line 33, in _to_frames
return list(protocol.dumps(msg, **kwargs))
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 76, in dumps
frames[0] = msgpack.dumps(msg, default=_encode_default, use_bin_type=True)
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/msgpack/__init__.py", line 35, in packb
return Packer(**kwargs).pack(o)
File "msgpack/_packer.pyx", line 294, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 300, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 297, in msgpack._cmsgpack.Packer.pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 264, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 231, in msgpack._cmsgpack.Packer._pack
File "msgpack/_packer.pyx", line 285, in msgpack._cmsgpack.Packer._pack
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/core.py", line 57, in _encode_default
sub_header, sub_frames = serialize_and_split(
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 435, in serialize_and_split
header, frames = serialize(x, serializers, on_error, context)
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 262, in serialize
return serialize(
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 308, in serialize
headers_frames = [
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 309, in <listcomp>
serialize(
File "/home/ec2-user/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/protocol/serialize.py", line 359, in serialize
raise TypeError(msg, str(x)[:10000])
TypeError: ('Could not serialize object of type _FillValueMaskCheckAndStoreTarget.', '<iris.fileformats.netcdf._FillValueMaskCheckAndStoreTarget object at 0x7f8254a6a620>')
---------------------------------------------------------------------------
CancelledError Traceback (most recent call last)
<timed exec> in <module>
~/miniconda3/envs/iris/lib/python3.10/site-packages/iris/io/__init__.py in save(source, target, saver, **kwargs)
426 # Single cube?
427 if isinstance(source, Cube):
--> 428 saver(source, target, **kwargs)
429
430 # CubeList or sequence of cubes?
~/miniconda3/envs/iris/lib/python3.10/site-packages/iris/fileformats/netcdf.py in save(cube, filename, netcdf_format, local_keys, unlimited_dimensions, zlib, complevel, shuffle, fletcher32, contiguous, chunksizes, endian, least_significant_digit, packing, fill_value)
2770 # Iterate through the cubelist.
2771 for cube, packspec, fill_value in zip(cubes, packspecs, fill_values):
-> 2772 sman.write(
2773 cube,
2774 local_keys,
~/miniconda3/envs/iris/lib/python3.10/site-packages/iris/fileformats/netcdf.py in write(self, cube, local_keys, unlimited_dimensions, zlib, complevel, shuffle, fletcher32, contiguous, chunksizes, endian, least_significant_digit, packing, fill_value)
1150
1151 # Create the associated cube CF-netCDF data variable.
-> 1152 cf_var_cube = self._create_cf_data_variable(
1153 cube,
1154 dimension_names,
~/miniconda3/envs/iris/lib/python3.10/site-packages/iris/fileformats/netcdf.py in _create_cf_data_variable(self, cube, dimension_names, local_keys, packing, fill_value, **kwargs)
2417
2418 # Store the data and check if it is masked and contains the fill value
-> 2419 is_masked, contains_fill_value = store(
2420 data, cf_var, fill_value_to_check
2421 )
~/miniconda3/envs/iris/lib/python3.10/site-packages/iris/fileformats/netcdf.py in store(data, cf_var, fill_value)
2395 # the fill value
2396 target = _FillValueMaskCheckAndStoreTarget(cf_var, fill_value)
-> 2397 da.store([data], [target])
2398 return target.is_masked, target.contains_value
2399
~/miniconda3/envs/iris/lib/python3.10/site-packages/dask/array/core.py in store(sources, targets, lock, regions, compute, return_stored, **kwargs)
1116 elif compute:
1117 store_dsk = HighLevelGraph(layers, dependencies)
-> 1118 compute_as_if_collection(Array, store_dsk, map_keys, **kwargs)
1119 return None
1120
~/miniconda3/envs/iris/lib/python3.10/site-packages/dask/base.py in compute_as_if_collection(cls, dsk, keys, scheduler, get, **kwargs)
313 schedule = get_scheduler(scheduler=scheduler, cls=cls, get=get)
314 dsk2 = optimization_function(cls)(dsk, keys, **kwargs)
--> 315 return schedule(dsk2, keys, **kwargs)
316
317
~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2689 should_rejoin = False
2690 try:
-> 2691 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2692 finally:
2693 for f in futures.values():
~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1944 else:
1945 local_worker = None
-> 1946 return self.sync(
1947 self._gather,
1948 futures,
~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/utils.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
308 return future
309 else:
--> 310 return sync(
311 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
312 )
~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
362 if error[0]:
363 typ, exc, tb = error[0]
--> 364 raise exc.with_traceback(tb)
365 else:
366 return result[0]
~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/utils.py in f()
347 if callback_timeout is not None:
348 future = asyncio.wait_for(future, callback_timeout)
--> 349 result[0] = yield future
350 except Exception:
351 error[0] = sys.exc_info()
~/miniconda3/envs/iris/lib/python3.10/site-packages/tornado/gen.py in run(self)
760
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
~/miniconda3/envs/iris/lib/python3.10/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1810 else:
1811 raise exception.with_traceback(traceback)
-> 1812 raise exc
1813 if errors == "skip":
1814 bad_keys.add(key)
CancelledError: ('store-map-ff2955f4724c217f42a2a75fc58e80e8', 1, 0, 0)
```
Hi @dmcg, thanks for raising this issue with Iris! As per @wjbenfold, I've also been able to reproduce this error locally, again with the A1B_north_america.nc file. Hopefully I'll also be able to add some more context to the error, a workaround you can use immediately, and a suggestion for how we can fix Iris to stop this happening in general.

I'll start with the workaround, as it has the most immediate value. If you don't use distributed (i.e. don't create a `Client`), you won't get the error (the reason for this will hopefully become clear in the context). If you just use plain Iris you will automatically get dask's local parallel processing, so you should see the same parallel performance for your collapse operation with or without distributed. As you're using a single EC2 instance (I think), you won't lose the benefit of distributing your processing over multiple machines - although obviously this workaround won't scale if you do move to a cluster of machines.

So the following code - essentially your reproduction steps without the `Client` - should run without error:
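```python
# (Reconstructed sketch: the reproduction steps with no distributed Client,
# so dask falls back to its default local scheduler.)
import iris

cube = iris.load_cube('~/environment/data/000490262cdd067721a34112963bcaa2b44860ab.nc')
averages = cube.collapsed('realization', iris.analysis.MEAN)
iris.io.save(averages, "delme.nc")
```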
Your other option is to realise the data before you save it - that is, load the data into memory after you collapse but before you save. Note this will only work if there's enough memory on the machine to hold the collapsed cube's data. For example, something like:
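```python
# (Reconstructed sketch: accessing .data realises the lazy dask array in
# memory, so the save itself triggers no distributed computation.)
averages = cube.collapsed('realization', iris.analysis.MEAN)
averages.data  # forces computation on the cluster and pulls the result local
iris.io.save(averages, "delme.nc")
```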
The reason for the difference between using distributed and not is that distributed communications always run over network links - even when client, scheduler and workers are all on the same machine. There are certainly some advantages to using a local cluster over dask multiprocessing (and it's the preferred solution in the dask docs), but it can be less reliable.

One example of this is that the network communications place extra constraints on how data is moved between workers. By default a distributed network runs over TCP, which transmits frames of bytes. Python objects in memory or on disk must be translated to bytes before being transmitted, and translated back from bytes on receipt. These processes are serialization and deserialization respectively, and it's this step that's failing here. The way that Python objects are serialized for TCP transmission is by first being pickled, and apparently the problem class here is one that cannot be pickled. Looking through the Iris source code, the problem class is only used for NetCDF save, so the scope of the problem is small - but still annoying if you can't save your cube!
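As an aside, a quick way to test whether a given object will survive that transport is a plain pickle round-trip (a minimal check; distributed's serialization has more layers, e.g. cloudpickle, but this catches the common failures):

```python
import pickle

def is_picklable(obj) -> bool:
    """Return True if obj survives a pickle round-trip."""
    try:
        pickle.loads(pickle.dumps(obj))
    except Exception:
        return False
    return True
```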
I think the longer-term solution to this will be to make the problem class serializable. This should be achievable by overriding the class's `__getstate__` and `__setstate__` methods - the trick will be ensuring that the correct state is captured by these methods. I'll have a go… (A sketch of the general pattern is below.)
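For illustration, a minimal sketch of that pattern, with hypothetical names (this is not the actual Iris fix):

```python
import pickle


class StoreTarget:
    """Hypothetical stand-in for a class that holds an unpicklable member."""

    def __init__(self, cf_var, fill_value=None):
        self.cf_var = cf_var          # e.g. an open netCDF variable handle
        self.fill_value = fill_value
        self.is_masked = False
        self.contains_value = False

    def __getstate__(self):
        # Copy the instance state, dropping the member that cannot be pickled.
        state = self.__dict__.copy()
        state["cf_var"] = None
        return state

    def __setstate__(self, state):
        # Restore the picklable state; a real fix would also need a strategy
        # for re-attaching cf_var on the receiving side.
        self.__dict__.update(state)


# A lambda stands in for an unpicklable handle; the round-trip now succeeds
# because __getstate__ drops it.
target = StoreTarget(cf_var=lambda: None, fill_value=-999)
restored = pickle.loads(pickle.dumps(target))
assert restored.fill_value == -999
```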
Thanks for this. Loading into local memory should be possible for me, so happy to have this as just an irritating edge-case.
Hi, thanks for the response.

This is running in EC2 (Cloud9 fwiw), but my understanding of that `Client(n_workers=4)` line is that the Dask "cluster" will just be on the Jupyter machine, maybe even in the same process, so the pickled objects won't really touch the cloud? Or, put another way, my ground is in the cloud!

I'm out for a couple of days, but happy to help next week, maybe at least check I can reproduce with other NetCDF files. I'll also speak with my clients about sharing the repo with the data and code.