astropy: Writing (distributed) Dask array fails

Description

Dask array fails to be written to FITS, at least when using dask.distributed. This doesn’t occur for me if I don’t create a dask.distributed Client. That is, the example given in the “What’s New in 4.1” page works for me.

Expected behavior

As per version 4.1, writing dask arrays is supported.

Actual behavior

The following error occurred:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
~/miniconda3/lib/python3.7/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
     48         buffers.clear()
---> 49         result = pickle.dumps(x, **dump_kwargs)
     50         if len(result) < 1000:

TypeError: can't pickle _thread.lock objects

During handling of the above exception, another exception occurred:

TypeError                                 Traceback (most recent call last)
<ipython-input-44-23f936e3e991> in <module>
      3 from astropy.io import fits
      4 hdu = fits.PrimaryHDU(data=array)
----> 5 hdu.writeto('test_dask.fits', overwrite=True)

~/miniconda3/lib/python3.7/site-packages/astropy/utils/decorators.py in wrapper(*args, **kwargs)
    533                     warnings.warn(message, warning_type, stacklevel=2)
    534 
--> 535             return function(*args, **kwargs)
    536 
    537         return wrapper

~/miniconda3/lib/python3.7/site-packages/astropy/io/fits/hdu/base.py in writeto(self, name, output_verify, overwrite, checksum)
    370         hdulist = HDUList([self])
    371         hdulist.writeto(name, output_verify, overwrite=overwrite,
--> 372                         checksum=checksum)
    373 
    374     @classmethod

~/miniconda3/lib/python3.7/site-packages/astropy/utils/decorators.py in wrapper(*args, **kwargs)
    533                     warnings.warn(message, warning_type, stacklevel=2)
    534 
--> 535             return function(*args, **kwargs)
    536 
    537         return wrapper

~/miniconda3/lib/python3.7/site-packages/astropy/io/fits/hdu/hdulist.py in writeto(self, fileobj, output_verify, overwrite, checksum)
    941             for hdu in self:
    942                 hdu._prewriteto(checksum=checksum)
--> 943                 hdu._writeto(hdulist._file)
    944                 hdu._postwriteto()
    945         hdulist.close(output_verify=output_verify, closed=closed)

~/miniconda3/lib/python3.7/site-packages/astropy/io/fits/hdu/base.py in _writeto(self, fileobj, inplace, copy)
    675 
    676         with _free_space_check(self, dirname):
--> 677             self._writeto_internal(fileobj, inplace, copy)
    678 
    679     def _writeto_internal(self, fileobj, inplace, copy):

~/miniconda3/lib/python3.7/site-packages/astropy/io/fits/hdu/base.py in _writeto_internal(self, fileobj, inplace, copy)
    681         if not inplace or self._new:
    682             header_offset, _ = self._writeheader(fileobj)
--> 683             data_offset, data_size = self._writedata(fileobj)
    684 
    685             # Set the various data location attributes on newly-written HDUs

~/miniconda3/lib/python3.7/site-packages/astropy/io/fits/hdu/base.py in _writedata(self, fileobj)
    613         if self._data_loaded or self._data_needs_rescale:
    614             if self.data is not None:
--> 615                 size += self._writedata_internal(fileobj)
    616             # pad the FITS data block
    617             if size > 0:

~/miniconda3/lib/python3.7/site-packages/astropy/io/fits/hdu/image.py in _writedata_internal(self, fileobj)
    621             return size
    622         elif isinstance(self.data, DaskArray):
--> 623             return self._writeinternal_dask(fileobj)
    624         else:
    625             # Based on the system type, determine the byteorders that

~/miniconda3/lib/python3.7/site-packages/astropy/io/fits/hdu/image.py in _writeinternal_dask(self, fileobj)
    707                                 buffer=outmmap)
    708 
--> 709             output.store(outarr, lock=True, compute=True)
    710         finally:
    711             if should_close:

~/miniconda3/lib/python3.7/site-packages/dask/array/core.py in store(self, target, **kwargs)
   1387     @wraps(store)
   1388     def store(self, target, **kwargs):
-> 1389         r = store([self], [target], **kwargs)
   1390 
   1391         if kwargs.get("return_stored", False):

~/miniconda3/lib/python3.7/site-packages/dask/array/core.py in store(sources, targets, lock, regions, compute, return_stored, **kwargs)
    943 
    944         if compute:
--> 945             result.compute(**kwargs)
    946             return None
    947         else:

~/miniconda3/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
    164         dask.base.compute
    165         """
--> 166         (result,) = compute(self, traverse=False, **kwargs)
    167         return result
    168 

~/miniconda3/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
    442         postcomputes.append(x.__dask_postcompute__())
    443 
--> 444     results = schedule(dsk, keys, **kwargs)
    445     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    446 

~/miniconda3/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2712             retries=retries,
   2713             user_priority=priority,
-> 2714             actors=actors,
   2715         )
   2716         packed = pack_data(keys, futures)

~/miniconda3/lib/python3.7/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
   2639                 {
   2640                     "op": "update-graph",
-> 2641                     "tasks": valmap(dumps_task, dsk),
   2642                     "dependencies": dependencies,
   2643                     "keys": list(map(tokey, keys)),

~/miniconda3/lib/python3.7/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()

~/miniconda3/lib/python3.7/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()

~/miniconda3/lib/python3.7/site-packages/distributed/worker.py in dumps_task(task)
   3356             return d
   3357         elif not any(map(_maybe_complex, task[1:])):
-> 3358             return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}
   3359     return to_serialize(task)
   3360 

~/miniconda3/lib/python3.7/site-packages/distributed/worker.py in warn_dumps(obj, dumps, limit)
   3365 def warn_dumps(obj, dumps=pickle.dumps, limit=1e6):
   3366     """ Dump an object to bytes, warn if those bytes are large """
-> 3367     b = dumps(obj, protocol=4)
   3368     if not _warn_dumps_warned[0] and len(b) > limit:
   3369         _warn_dumps_warned[0] = True

~/miniconda3/lib/python3.7/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
     58         try:
     59             buffers.clear()
---> 60             result = cloudpickle.dumps(x, **dump_kwargs)
     61         except Exception as e:
     62             logger.info("Failed to serialize %s. Exception: %s", x, e)

~/miniconda3/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py in dumps(obj, protocol)
    100         with io.BytesIO() as file:
    101             cp = CloudPickler(file, protocol=protocol)
--> 102             cp.dump(obj)
    103             return file.getvalue()
    104 

~/miniconda3/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py in dump(self, obj)
    561     def dump(self, obj):
    562         try:
--> 563             return Pickler.dump(self, obj)
    564         except RuntimeError as e:
    565             if "recursion" in e.args[0]:

~/miniconda3/lib/python3.7/pickle.py in dump(self, obj)
    435         if self.proto >= 4:
    436             self.framer.start_framing()
--> 437         self.save(obj)
    438         self.write(STOP)
    439         self.framer.end_framing()

~/miniconda3/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    502         f = self.dispatch.get(t)
    503         if f is not None:
--> 504             f(self, obj) # Call unbound method with explicit self
    505             return
    506 

~/miniconda3/lib/python3.7/pickle.py in save_tuple(self, obj)
    787         write(MARK)
    788         for element in obj:
--> 789             save(element)
    790 
    791         if id(obj) in memo:

~/miniconda3/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
    522             reduce = getattr(obj, "__reduce_ex__", None)
    523             if reduce is not None:
--> 524                 rv = reduce(self.proto)
    525             else:
    526                 reduce = getattr(obj, "__reduce__", None)

TypeError: can't pickle _thread.lock objects

Steps to Reproduce

Following the ‘What’s New in 4.1’:

from dask.distributed import Client
client = Client()
import dask.array as da
array = da.random.random((1000, 1000))
from astropy.io import fits
hdu = fits.PrimaryHDU(data=array)
hdu.writeto('test_dask.fits', overwrite=True)

System Details

Darwin-19.6.0-x86_64-i386-64bit
Python 3.7.6 | packaged by conda-forge | (default, Jun  1 2020, 18:33:30) 
[Clang 9.0.1 ]
Numpy 1.19.2
astropy 4.2
Scipy 1.3.1
Matplotlib 3.3.1
Dask 2.17.2

About this issue

  • Original URL
  • State: open
  • Created 4 years ago
  • Reactions: 1
  • Comments: 16 (14 by maintainers)

Most upvoted comments

@pllim have a look at the conversation upstream, in particular https://github.com/dask/dask/pull/1881#issuecomment-287196379 , and see if you think that this is something that can be handled upstream.

As it is, astropy.io.fits doesn’t support dask when used with dask.distributed, which IMO is the main reason to use dask at all. I’ve been maintaining my own wrapper classes to work around other serialization-unfriendly features of fits.HDUList and friends.

No, never mind, it seems to be a Dask bug: https://github.com/dask/distributed/issues/780