xarray: Dataset to zarr not working with newest s3fs Storage (s3fs > 0.5.0)
What happened: Writing a Dataset to zarr storage on a MinIO instance fails with the newest s3fs (s3fs > 0.5.0), while the same call works perfectly with s3fs == 0.4.0. Judging by the traceback, this might be due to s3fs' reliance on aiobotocore.
Traceback (most recent call last):
Exception in callback <TaskStepMethWrapper object at 0x7fbe34300350>()
handle: <Handle <TaskStepMethWrapper object at 0x7fbe34300350>()>
Traceback (most recent call last):
File "/usr/local/lib/python3.7/asyncio/events.py", line 88, in _run
self._context.run(self._callback, *self._args)
RuntimeError: Leaving task <Task finished coro=<sync.<locals>.f() done, defined at /usr/local/lib/python3.7/site-packages/fsspec/asyn.py:43> result=None> does not match the current task None.
File "test_without.py", line 27, in <module>
array.to_dataset().to_zarr(store=s3store, mode="w", consolidated=True, compute=True)
File "/usr/local/lib/python3.7/site-packages/xarray/core/dataset.py", line 1662, in to_zarr
append_dim=append_dim,
File "/usr/local/lib/python3.7/site-packages/xarray/backends/api.py", line 1369, in to_zarr
writes = writer.sync(compute=compute)
File "/usr/local/lib/python3.7/site-packages/xarray/backends/common.py", line 161, in sync
regions=self.regions,
File "/usr/local/lib/python3.7/site-packages/dask/array/core.py", line 981, in store
result.compute(**kwargs)
File "/usr/local/lib/python3.7/site-packages/dask/base.py", line 167, in compute
(result,) = compute(self, traverse=False, **kwargs)
File "/usr/local/lib/python3.7/site-packages/dask/base.py", line 452, in compute
results = schedule(dsk, keys, **kwargs)
File "/usr/local/lib/python3.7/site-packages/dask/threaded.py", line 84, in get
**kwargs
File "/usr/local/lib/python3.7/site-packages/dask/local.py", line 486, in get_async
raise_exception(exc, tb)
File "/usr/local/lib/python3.7/site-packages/dask/local.py", line 316, in reraise
raise exc
File "/usr/local/lib/python3.7/site-packages/dask/local.py", line 222, in execute_task
result = _execute_task(task, data)
File "/usr/local/lib/python3.7/site-packages/dask/core.py", line 121, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "/usr/local/lib/python3.7/site-packages/dask/array/core.py", line 3722, in store_chunk
return load_store_chunk(x, out, index, lock, return_stored, False)
File "/usr/local/lib/python3.7/site-packages/dask/array/core.py", line 3711, in load_store_chunk
out[index] = np.asanyarray(x)
File "/usr/local/lib/python3.7/site-packages/zarr/core.py", line 1115, in __setitem__
self.set_basic_selection(selection, value, fields=fields)
File "/usr/local/lib/python3.7/site-packages/zarr/core.py", line 1210, in set_basic_selection
return self._set_basic_selection_nd(selection, value, fields=fields)
File "/usr/local/lib/python3.7/site-packages/zarr/core.py", line 1501, in _set_basic_selection_nd
self._set_selection(indexer, value, fields=fields)
File "/usr/local/lib/python3.7/site-packages/zarr/core.py", line 1550, in _set_selection
self._chunk_setitem(chunk_coords, chunk_selection, chunk_value, fields=fields)
File "/usr/local/lib/python3.7/site-packages/zarr/core.py", line 1665, in _chunk_setitem
fields=fields)
File "/usr/local/lib/python3.7/site-packages/zarr/core.py", line 1729, in _chunk_setitem_nosync
self.chunk_store[ckey] = cdata
File "/usr/local/lib/python3.7/site-packages/fsspec/mapping.py", line 150, in __setitem__
self.fs.mkdirs(self.fs._parent(key), exist_ok=True)
File "/usr/local/lib/python3.7/site-packages/fsspec/spec.py", line 1066, in mkdirs
return self.makedirs(path, exist_ok=exist_ok)
File "/usr/local/lib/python3.7/site-packages/s3fs/core.py", line 488, in makedirs
self.mkdir(path, create_parents=True)
File "/usr/local/lib/python3.7/site-packages/fsspec/asyn.py", line 118, in wrapper
return maybe_sync(func, self, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/fsspec/asyn.py", line 97, in maybe_sync
return sync(loop, func, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/fsspec/asyn.py", line 68, in sync
raise exc.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/fsspec/asyn.py", line 52, in f
result[0] = await future
File "/usr/local/lib/python3.7/site-packages/s3fs/core.py", line 462, in _mkdir
if not key or (create_parents and not self.exists(bucket)):
File "/usr/local/lib/python3.7/site-packages/fsspec/asyn.py", line 118, in wrapper
return maybe_sync(func, self, *args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/fsspec/asyn.py", line 90, in maybe_sync
return _run_until_done(func(*args, **kwargs))
File "/usr/local/lib/python3.7/site-packages/fsspec/asyn.py", line 28, in _run_until_done
loop._run_once()
File "/usr/local/lib/python3.7/asyncio/base_events.py", line 1771, in _run_once
handle = self._ready.popleft()
IndexError: pop from an empty deque
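Reading the traceback from the bottom up, the coroutine _mkdir in s3fs/core.py calls self.exists(bucket), which goes through the synchronous wrapper in fsspec/asyn.py and tries to drive the event loop again while a task is already running on it. The sketch below reproduces that general pattern with plain asyncio only. It is not fsspec's or s3fs's code: the names exists_async, exists_blocking, mkdir_reentrant and mkdir_awaiting are hypothetical, and asyncio.run stands in for fsspec's own loop handling, so the error raised here is a RuntimeError rather than the IndexError shown above.

```python
import asyncio


async def exists_async(bucket):
    """Hypothetical async check; a real one would query the object store."""
    await asyncio.sleep(0)
    return True


def exists_blocking(bucket):
    """Hypothetical blocking wrapper that starts an event loop of its own."""
    coro = exists_async(bucket)
    try:
        return asyncio.run(coro)
    except RuntimeError:
        # asyncio.run refuses to start inside a running loop; close the
        # unused coroutine so it does not trigger a "never awaited" warning.
        coro.close()
        raise


async def mkdir_reentrant(bucket):
    """Problematic: calls the blocking wrapper from inside a coroutine."""
    try:
        return exists_blocking(bucket)
    except RuntimeError:
        return "re-entrant call rejected"


async def mkdir_awaiting(bucket):
    """Safer: awaits the async variant directly on the running loop."""
    return await exists_async(bucket)


if __name__ == "__main__":
    print(asyncio.run(mkdir_reentrant("zarr")))
    print(asyncio.run(mkdir_awaiting("zarr")))
```

The blocking wrapper is fine when called from ordinary synchronous code; it only breaks when a coroutine that is already running on a loop calls it, which is the situation the traceback suggests.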
What you expected to happen:
The write to storage succeeds without raising an exception.
Minimal Complete Verifiable Example:
import dask.array as da
import s3fs
import xarray
bucket = "zarr"
name = "sample"
nana = xarray.DataArray(da.zeros((1023,1023,3)))
s3_path = f"{bucket}/{name}"
s3 = s3fs.S3FileSystem(client_kwargs={"endpoint_url": "s3://minio:9000"}, username="weak_access_key", password="weak_secret_key")
s3store = s3.get_mapper(s3_path)
print("Storing")
nana.to_dataset().to_zarr(store=s3store, mode="w", consolidated=True, compute=True)
print("Getting")
Anything else we need to know?:
Thanks and keep up the great work!
Environment:
dask==2.28.0
xarray==0.16.1
aiobotocore==1.1.1
fsspec==0.8.3
zarr==2.4.0
s3fs==0.5.1
Output of xr.show_versions()
INSTALLED VERSIONS
commit: None
python: 3.7.8 (default, Jun 30 2020, 18:27:23) [GCC 8.3.0]
python-bits: 64
OS: Linux
OS-release: 4.19.104-microsoft-standard
machine: x86_64
processor:
byteorder: little
LC_ALL: None
LANG: C.UTF-8
LOCALE: en_US.UTF-8
libhdf5: None
libnetcdf: None
xarray: 0.16.1
pandas: 1.1.2
numpy: 1.19.2
scipy: 1.5.2
netCDF4: None
pydap: None
h5netcdf: None
h5py: None
Nio: None
zarr: 2.4.0
cftime: None
nc_time_axis: None
PseudoNetCDF: None
rasterio: None
cfgrib: None
iris: None
bottleneck: None
dask: 2.28.0
distributed: 2.28.0
matplotlib: 3.3.2
cartopy: None
seaborn: 0.11.0
numbagg: None
pint: None
setuptools: 47.3.1
pip: 20.1.1
conda: None
pytest: 6.1.0
IPython: 7.18.1
sphinx: None
About this issue
- State: closed
- Created 4 years ago
- Comments: 15 (10 by maintainers)
Commits related to this issue
- Experiment to fix s3fs/pull/374 Fixes https://github.com/pydata/xarray/issues/4478 Fixes https://github.com/dask/s3fs/pull/374 — committed to martindurant/filesystem_spec by deleted user 4 years ago
Can you confirm that this works ok with fsspec and s3fs master?
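When testing against master, it helps to record which versions are actually installed before and after the switch. The following is a minimal sketch using only the standard library; installed_versions is a hypothetical helper name, and importlib.metadata requires Python 3.8 or newer (the Python 3.7.8 environment in the report would need the importlib_metadata backport instead).

```python
from importlib import metadata


def installed_versions(packages):
    """Map each package name to its installed version, or None if absent."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    # Package names taken from the Environment section of the report.
    names = ["dask", "xarray", "aiobotocore", "fsspec", "zarr", "s3fs"]
    for name, version in installed_versions(names).items():
        print(f"{name}: {version if version is not None else 'not installed'}")
```

Running it once with the released packages and once after installing fsspec and s3fs from master gives two listings that can be pasted into the issue next to the result of the reproduction script.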