astropy: Writing (distributed) Dask array fails
Description
Writing a Dask array to FITS fails, at least when using dask.distributed. The error does not occur if I don't create a dask.distributed Client; that is, the example given in the 'What's New in 4.1' notes works for me.
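For reference, a sketch of the case that works for me, i.e. the What's New in 4.1 example with no Client created (dask presumably falls back to its local threaded scheduler):

import dask.array as da
from astropy.io import fits

# No dask.distributed Client here, so the local scheduler is used
array = da.random.random((1000, 1000))
hdu = fits.PrimaryHDU(data=array)
hdu.writeto('test_dask.fits', overwrite=True)  # succeeds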
Expected behavior
As of version 4.1, writing Dask arrays is supported, so the write should succeed.
Actual behavior
The following error occurred:
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
~/miniconda3/lib/python3.7/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
48 buffers.clear()
---> 49 result = pickle.dumps(x, **dump_kwargs)
50 if len(result) < 1000:
TypeError: can't pickle _thread.lock objects
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
<ipython-input-44-23f936e3e991> in <module>
3 from astropy.io import fits
4 hdu = fits.PrimaryHDU(data=array)
----> 5 hdu.writeto('test_dask.fits', overwrite=True)
~/miniconda3/lib/python3.7/site-packages/astropy/utils/decorators.py in wrapper(*args, **kwargs)
533 warnings.warn(message, warning_type, stacklevel=2)
534
--> 535 return function(*args, **kwargs)
536
537 return wrapper
~/miniconda3/lib/python3.7/site-packages/astropy/io/fits/hdu/base.py in writeto(self, name, output_verify, overwrite, checksum)
370 hdulist = HDUList([self])
371 hdulist.writeto(name, output_verify, overwrite=overwrite,
--> 372 checksum=checksum)
373
374 @classmethod
~/miniconda3/lib/python3.7/site-packages/astropy/utils/decorators.py in wrapper(*args, **kwargs)
533 warnings.warn(message, warning_type, stacklevel=2)
534
--> 535 return function(*args, **kwargs)
536
537 return wrapper
~/miniconda3/lib/python3.7/site-packages/astropy/io/fits/hdu/hdulist.py in writeto(self, fileobj, output_verify, overwrite, checksum)
941 for hdu in self:
942 hdu._prewriteto(checksum=checksum)
--> 943 hdu._writeto(hdulist._file)
944 hdu._postwriteto()
945 hdulist.close(output_verify=output_verify, closed=closed)
~/miniconda3/lib/python3.7/site-packages/astropy/io/fits/hdu/base.py in _writeto(self, fileobj, inplace, copy)
675
676 with _free_space_check(self, dirname):
--> 677 self._writeto_internal(fileobj, inplace, copy)
678
679 def _writeto_internal(self, fileobj, inplace, copy):
~/miniconda3/lib/python3.7/site-packages/astropy/io/fits/hdu/base.py in _writeto_internal(self, fileobj, inplace, copy)
681 if not inplace or self._new:
682 header_offset, _ = self._writeheader(fileobj)
--> 683 data_offset, data_size = self._writedata(fileobj)
684
685 # Set the various data location attributes on newly-written HDUs
~/miniconda3/lib/python3.7/site-packages/astropy/io/fits/hdu/base.py in _writedata(self, fileobj)
613 if self._data_loaded or self._data_needs_rescale:
614 if self.data is not None:
--> 615 size += self._writedata_internal(fileobj)
616 # pad the FITS data block
617 if size > 0:
~/miniconda3/lib/python3.7/site-packages/astropy/io/fits/hdu/image.py in _writedata_internal(self, fileobj)
621 return size
622 elif isinstance(self.data, DaskArray):
--> 623 return self._writeinternal_dask(fileobj)
624 else:
625 # Based on the system type, determine the byteorders that
~/miniconda3/lib/python3.7/site-packages/astropy/io/fits/hdu/image.py in _writeinternal_dask(self, fileobj)
707 buffer=outmmap)
708
--> 709 output.store(outarr, lock=True, compute=True)
710 finally:
711 if should_close:
~/miniconda3/lib/python3.7/site-packages/dask/array/core.py in store(self, target, **kwargs)
1387 @wraps(store)
1388 def store(self, target, **kwargs):
-> 1389 r = store([self], [target], **kwargs)
1390
1391 if kwargs.get("return_stored", False):
~/miniconda3/lib/python3.7/site-packages/dask/array/core.py in store(sources, targets, lock, regions, compute, return_stored, **kwargs)
943
944 if compute:
--> 945 result.compute(**kwargs)
946 return None
947 else:
~/miniconda3/lib/python3.7/site-packages/dask/base.py in compute(self, **kwargs)
164 dask.base.compute
165 """
--> 166 (result,) = compute(self, traverse=False, **kwargs)
167 return result
168
~/miniconda3/lib/python3.7/site-packages/dask/base.py in compute(*args, **kwargs)
442 postcomputes.append(x.__dask_postcompute__())
443
--> 444 results = schedule(dsk, keys, **kwargs)
445 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
446
~/miniconda3/lib/python3.7/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2712 retries=retries,
2713 user_priority=priority,
-> 2714 actors=actors,
2715 )
2716 packed = pack_data(keys, futures)
~/miniconda3/lib/python3.7/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, restrictions, loose_restrictions, priority, user_priority, resources, retries, fifo_timeout, actors)
2639 {
2640 "op": "update-graph",
-> 2641 "tasks": valmap(dumps_task, dsk),
2642 "dependencies": dependencies,
2643 "keys": list(map(tokey, keys)),
~/miniconda3/lib/python3.7/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()
~/miniconda3/lib/python3.7/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()
~/miniconda3/lib/python3.7/site-packages/distributed/worker.py in dumps_task(task)
3356 return d
3357 elif not any(map(_maybe_complex, task[1:])):
-> 3358 return {"function": dumps_function(task[0]), "args": warn_dumps(task[1:])}
3359 return to_serialize(task)
3360
~/miniconda3/lib/python3.7/site-packages/distributed/worker.py in warn_dumps(obj, dumps, limit)
3365 def warn_dumps(obj, dumps=pickle.dumps, limit=1e6):
3366 """ Dump an object to bytes, warn if those bytes are large """
-> 3367 b = dumps(obj, protocol=4)
3368 if not _warn_dumps_warned[0] and len(b) > limit:
3369 _warn_dumps_warned[0] = True
~/miniconda3/lib/python3.7/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
58 try:
59 buffers.clear()
---> 60 result = cloudpickle.dumps(x, **dump_kwargs)
61 except Exception as e:
62 logger.info("Failed to serialize %s. Exception: %s", x, e)
~/miniconda3/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py in dumps(obj, protocol)
100 with io.BytesIO() as file:
101 cp = CloudPickler(file, protocol=protocol)
--> 102 cp.dump(obj)
103 return file.getvalue()
104
~/miniconda3/lib/python3.7/site-packages/cloudpickle/cloudpickle_fast.py in dump(self, obj)
561 def dump(self, obj):
562 try:
--> 563 return Pickler.dump(self, obj)
564 except RuntimeError as e:
565 if "recursion" in e.args[0]:
~/miniconda3/lib/python3.7/pickle.py in dump(self, obj)
435 if self.proto >= 4:
436 self.framer.start_framing()
--> 437 self.save(obj)
438 self.write(STOP)
439 self.framer.end_framing()
~/miniconda3/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
502 f = self.dispatch.get(t)
503 if f is not None:
--> 504 f(self, obj) # Call unbound method with explicit self
505 return
506
~/miniconda3/lib/python3.7/pickle.py in save_tuple(self, obj)
787 write(MARK)
788 for element in obj:
--> 789 save(element)
790
791 if id(obj) in memo:
~/miniconda3/lib/python3.7/pickle.py in save(self, obj, save_persistent_id)
522 reduce = getattr(obj, "__reduce_ex__", None)
523 if reduce is not None:
--> 524 rv = reduce(self.proto)
525 else:
526 reduce = getattr(obj, "__reduce__", None)
TypeError: can't pickle _thread.lock objects
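The failing call appears to be output.store(outarr, lock=True, compute=True) in astropy's _writeinternal_dask (visible in the traceback): with the distributed scheduler, the whole task graph, lock included, has to be pickled to reach the scheduler, and a plain threading.Lock is not picklable. A minimal sketch of that distinction; dask.utils.SerializableLock is shown only for contrast, as a lock type designed to survive pickling:

import pickle
import threading

from dask.utils import SerializableLock

# A plain threading.Lock cannot cross process boundaries...
try:
    pickle.dumps(threading.Lock())
except TypeError as exc:
    print(exc)  # "can't pickle _thread.lock objects" on Python 3.7

# ...while SerializableLock pickles to a token and is re-created
# on the other side, so it round-trips intact.
restored = pickle.loads(pickle.dumps(SerializableLock()))
with restored:
    pass  # usable as an ordinary lock after unpickling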
Steps to Reproduce
Following the 'What's New in 4.1' example:
from dask.distributed import Client
client = Client()  # creating a distributed client is what triggers the failure

import dask.array as da
array = da.random.random((1000, 1000))

from astropy.io import fits
hdu = fits.PrimaryHDU(data=array)
hdu.writeto('test_dask.fits', overwrite=True)  # raises the TypeError above
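Until this is resolved, one workaround sketch is to materialize the array before handing it to astropy; this sidesteps the distributed code path entirely, at the cost of loading the whole array into memory:

import dask.array as da
from astropy.io import fits

array = da.random.random((1000, 1000))
# array.compute() returns a plain numpy ndarray, so no task graph
# (and no lock) ever has to be shipped to the scheduler.
hdu = fits.PrimaryHDU(data=array.compute())
hdu.writeto('test_dask.fits', overwrite=True)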
System Details
Darwin-19.6.0-x86_64-i386-64bit
Python 3.7.6 | packaged by conda-forge | (default, Jun 1 2020, 18:33:30)
[Clang 9.0.1 ]
Numpy 1.19.2
astropy 4.2
Scipy 1.3.1
Matplotlib 3.3.1
Dask 2.17.2
About this issue
- Original URL
- State: open
- Created 4 years ago
- Reactions: 1
- Comments: 16 (14 by maintainers)
@pllim, have a look at the upstream conversation, in particular https://github.com/dask/dask/pull/1881#issuecomment-287196379, and see whether you think this is something that can be handled upstream.
As it is, astropy.io.fits doesn't support dask when used with dask.distributed, which IMO is the main reason to use dask at all. I've been maintaining my own wrapper classes to work around other serialization-unfriendly features of fits.HDUList and friends.

No, never mind, it seems to be a Dask bug: https://github.com/dask/distributed/issues/780
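If it helps triage upstream, here is a dask-only sketch that should exercise the same code path with no astropy involved. This assumes that lock=True on this dask version wraps the store in a plain threading.Lock; that assumption is based on the traceback above and may not hold on other releases:

import numpy as np
import dask.array as da
from dask.distributed import Client

client = Client()
source = da.random.random((100, 100))
target = np.empty((100, 100))
# Expected to fail while serializing the graph:
# TypeError: can't pickle _thread.lock objects
da.store(source, target, lock=True)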