astropy: Writing (distributed) Dask array fails
Description
A Dask array fails to be written to FITS, at least when using dask.distributed. This doesn’t occur for me if I don’t create a dask.distributed Client; that is, the example given in ‘What’s New in 4.1’ works for me.
Expected behavior
As of version 4.1, writing dask arrays is supported.
Actual behavior
The following error occurred:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
~/miniconda3/lib/python3.7/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
48 buffers.clear()
---> 49 result = pickle.dumps(x, **dump_kwargs)
50 if len(result) < 1000:
TypeError: can't pickle _thread.lock objects
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
<ipython-input-44-23f936e3e991> in <module>
3 from astropy.io import fits
4 hdu = fits.PrimaryHDU(data=array)
----> 5 hdu.writeto('test_dask.fits', overwrite=True)
~/miniconda3/lib/python3.7/site-packages/astropy/utils/decorators.py in wrapper(*args, **kwargs)
533 warnings.warn(message, warning_type, stacklevel=2)
534
--> 535 return function(*args, **kwargs)
536
537 return wrapper
~/miniconda3/lib/python3.7/site-packages/astropy/io/fits/hdu/base.py in writeto(self, name, output_verify, overwrite, checksum)
370 hdulist = HDUList([self])
371 hdulist.writeto(name, output_verify, overwrite=overwrite,
--> 372 checksum=checksum)
373
374 @classmethod
~/miniconda3/lib/python3.7/site-packages/astropy/utils/decorators.py in wrapper(*args, **kwargs)
533 warnings.warn(message, warning_type, stacklevel=2)
534
--> 535 return function(*args, **kwargs)
536
537 return wrapper
~/miniconda3/lib/python3.7/site-packages/astropy/io/fits/hdu/hdulist.py in writeto(self, fileobj, output_verify, overwrite, checksum)
941 for hdu in self:
942 hdu._prewriteto(checksum=checksum)
--> 943 hdu._writeto(hdulist._file)
944 hdu._postwriteto()
945 hdulist.close(output_verify=output_verify, closed=closed)
~/miniconda3/lib/python3.7/site-packages/astropy/io/fits/hdu/base.py in _writeto(self, fileobj, inplace, copy)
675
676 with _free_space_check(self, dirname):
--> 677 self._writeto_internal(fileobj, inplace, copy)
678
679 def _writeto_internal(self, fileobj, inplace, copy):
~/miniconda3/lib/python3.7/site-packages/astropy/io/fits/hdu/base.py in _writeto_internal(self, fileobj, inplace, copy)
681 if not inplace or self._new:
682 header_offset, _ = self._writeheader(fileobj)
--> 683 data_offset, data_size = self._writedata(fileobj)
684
685 # Set the various data location attributes on newly-written HDUs
~/miniconda3/lib/python3.7/site-packages/astropy/io/fits/hdu/base.py in _writedata(self, fileobj)
613 if self._data_loaded or self._data_needs_rescale:
614 if self.data is not None:
--> 615 size += self._writedata_internal(fileobj)
616 # pad the FITS data block
617 if size > 0:
~/miniconda3/lib/python3.7/site-packages/astropy/io/fits/hdu/image.py in _writedata_internal(self, fileobj)
621 return size
622 elif isinstance(self.data, DaskArray):
--> 623 return self._writeinternal_dask(fileobj)
624 else:
625 # Based on the system type, determine the byteorders that
~/miniconda3/lib/python3.7/site-packages/astropy/io/fits/hdu/image.py in _writeinternal_dask(self, fileobj)
707 buffer=outmmap)
708
--> 709 output.store(outarr, lock=True, compute=True)
710 finally:
711 if should_close:
~/miniconda3/lib/python3.7/site-packages/dask/array/core.py in store(self, target, **kwargs)
1387 @wraps(store)
1388 def store(self, target, **kwargs):
-> 1389 r = store([self], [target], **kwargs)
1390
1391 if kwargs.get("return_stored", False):
~/miniconda3/lib/python3.7/site-packages/dask/array/core.py in store(sources, targets, lock, regions, compute, return_stored, **kwargs)
943
944 if compute:
--> 945 result.compute(**kwargs)
946 return None
947 else:
~/miniconda3/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
164 dask.base.compute
165 """
--> 166 (result,) = compute(self, traverse=False, **kwargs)
167 return result
168
~/miniconda3/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
442 postcomputes.append(x.__dask_postcompute__())
443
--> 444 results = schedule(dsk, keys, **kwargs)
445 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
446
~/miniconda3/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2712 retries=retries,
2713 user_priority=priority,
-> 2714 actors=actors,
2715 )
2716 packed = pack_data(keys, futures)
~/miniconda3/lib/python3.7/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
2639 {
2640 "op": "update-graph",
-> 2641 "tasks": valmap(dumps_task, dsk),
2642 "dependencies": dependencies,
2643 "keys": list(map(tokey, keys)),
~/miniconda3/lib/python3.7/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()
~/miniconda3/lib/python3.7/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()
~/miniconda3/lib/python3.7/site-packages/distributed/worker.py in dumps_task(task)
3356 return d
3357 elif not any(map(_maybe_complex, task[1:])):
-> 3358 return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}
3359 return to_serialize(task)
3360
~/miniconda3/lib/python3.7/site-packages/distributed/worker.py in warn_dumps(obj, dumps, limit)
3365 def warn_dumps(obj, dumps=pickle.dumps, limit=1e6):
3366 """ Dump an object to bytes, warn if those bytes are large """
-> 3367 b = dumps(obj, protocol=4)
3368 if not _warn_dumps_warned[0] and len(b) > limit:
3369 _warn_dumps_warned[0] = True
~/miniconda3/lib/python3.7/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
58 try:
59 buffers.clear()
---> 60 result = cloudpickle.dumps(x, **dump_kwargs)
61 except Exception as e:
62 logger.info("Failed to serialize %s. Exception: %s", x, e)
~/miniconda3/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py in dumps(obj, protocol)
100 with io.BytesIO() as file:
101 cp = CloudPickler(file, protocol=protocol)
--> 102 cp.dump(obj)
103 return file.getvalue()
104
~/miniconda3/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py in dump(self, obj)
561 def dump(self, obj):
562 try:
--> 563 return Pickler.dump(self, obj)
564 except RuntimeError as e:
565 if "recursion" in e.args[0]:
~/miniconda3/lib/python3.7/pickle.py in dump(self, obj)
435 if self.proto >= 4:
436 self.framer.start_framing()
--> 437 self.save(obj)
438 self.write(STOP)
439 self.framer.end_framing()
~/miniconda3/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
~/miniconda3/lib/python3.7/pickle.py in save_tuple(self, obj)
787 write(MARK)
788 for element in obj:
--> 789 save(element)
790
791 if id(obj) in memo:
~/miniconda3/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
522 reduce = getattr(obj, "__reduce_ex__", None)
523 if reduce is not None:
--> 524 rv = reduce(self.proto)
525 else:
526 reduce = getattr(obj, "__reduce__", None)
TypeError: can't pickle _thread.lock objects
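The final TypeError is the crux: dask.distributed has to pickle the task graph to ship it to the scheduler, and the lock that astropy’s write path puts into that graph via output.store(outarr, lock=True, compute=True) is (as far as I can tell) backed by a plain threading lock. A minimal sketch of the same error, with no astropy involved:

import pickle
import threading

# threading.Lock objects cannot cross a pickle boundary, which is
# exactly what distributed requires of everything in the task graph.
pickle.dumps(threading.Lock())
# TypeError: can't pickle _thread.lock objects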
Steps to Reproduce
Following the ‘What’s New in 4.1’ example:
from dask.distributed import Client
client = Client()
import dask.array as da
array = da.random.random((1000, 1000))
from astropy.io import fits
hdu = fits.PrimaryHDU(data=array)
hdu.writeto('test_dask.fits', overwrite=True)
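Until this is fixed, one workaround (mine, not an official one) is to materialize the array before constructing the HDU, so that astropy’s dask-specific write path, and the lock it creates, is never reached:

from dask.distributed import Client
client = Client()

import dask.array as da
array = da.random.random((1000, 1000))

from astropy.io import fits
# Workaround sketch: .compute() collapses the dask array into a plain
# numpy array, which astropy writes without calling dask's store().
hdu = fits.PrimaryHDU(data=array.compute())
hdu.writeto('test_dask.fits', overwrite=True)

The obvious trade-off is that the whole array is loaded into memory, which defeats the chunked writing that the dask support in 4.1 is meant to provide.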
System Details
Darwin-19.6.0-x86_64-i386-64bit
Python 3.7.6 | packaged by conda-forge | (default, Jun 1 2020, 18:33:30)
[Clang 9.0.1 ]
Numpy 1.19.2
astropy 4.2
Scipy 1.3.1
Matplotlib 3.3.1
Dask 2.17.2
Comments
@pllim have a look at the conversation upstream, in particular https://github.com/dask/dask/pull/1881#issuecomment-287196379, and see if you think that this is something that can be handled upstream.
As it is, astropy.io.fits doesn’t support dask when used with dask.distributed, which IMO is the main reason to use dask at all. I’ve been maintaining my own wrapper classes to work around other serialization-unfriendly features of fits.HDUList and friends.

No, never mind, it seems to be a Dask bug: https://github.com/dask/distributed/issues/780
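For what it’s worth, dask ships a picklable lock (dask.utils.SerializableLock) that is meant for cases like this. Whether swapping it into astropy’s store() call would actually make the mmap-based write safe across distributed worker processes is a separate question, so treat the following only as a sketch of the direction:

import pickle
from dask.utils import SerializableLock

# Unlike threading.Lock, SerializableLock pickles by token and
# rebuilds the underlying lock on the receiving side, so it can
# travel through distributed's task-graph serialization.
lock = SerializableLock()
restored = pickle.loads(pickle.dumps(lock))

# Hypothetical change in astropy's _writeinternal_dask:
# output.store(outarr, lock=SerializableLock(), compute=True)

The caveat is that a rebuilt lock only guarantees mutual exclusion within a single process, so on multi-process distributed workers this alone may not make concurrent writes to one memory-mapped file safe.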