xarray: Dataset to zarr not working with newest s3fs Storage (s3fs > 0.5.0)

What happened: Writing a Dataset to a zarr store on a MinIO instance fails with the newest s3fs (s3fs > 0.5.0), while it works perfectly with s3fs == 0.4.0. This might be due to s3fs's reliance on aiobotocore, as the traceback suggests.

Traceback (most recent call last):
Exception in callback <TaskStepMethWrapper object at 0x7fbe34300350>()
handle: <Handle <TaskStepMethWrapper object at 0x7fbe34300350>()>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
RuntimeError: Leaving task <Task finished coro=<sync.<locals>.f() done, defined at /usr/local/lib/python3.7/site-packages/fsspec/asyn.py:43> result=None> does not match the current task None.
  File "test_without.py", line 27, in <module>
    array.to_dataset().to_zarr(store=s3store, mode="w", consolidated=True, compute=True)
  File "/usr/local/lib/python3.7/site-packages/xarray/core/dataset.py", line 1662, in to_zarr
    append_dim=append_dim,
  File "/usr/local/lib/python3.7/site-packages/xarray/backends/api.py", line 1369, in to_zarr
    writes = writer.sync(compute=compute)
  File "/usr/local/lib/python3.7/site-packages/xarray/backends/common.py", line 161, in sync
    regions=self.regions,
  File "/usr/local/lib/python3.7/site-packages/dask/array/core.py", line 981, in store
    result.compute(**kwargs)
  File "/usr/local/lib/python3.7/site-packages/dask/base.py", line 167, in compute
    (result,) = compute(self, traverse=False, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/dask/base.py", line 452, in compute
    results = schedule(dsk, keys, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/dask/threaded.py", line 84, in get
    **kwargs
  File "/usr/local/lib/python3.7/site-packages/dask/local.py", line 486, in get_async
    raise_exception(exc, tb)
  File "/usr/local/lib/python3.7/site-packages/dask/local.py", line 316, in reraise
    raise exc
  File "/usr/local/lib/python3.7/site-packages/dask/local.py", line 222, in execute_task
    result = _execute_task(task, data)
  File "/usr/local/lib/python3.7/site-packages/dask/core.py", line 121, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "/usr/local/lib/python3.7/site-packages/dask/array/core.py", line 3722, in store_chunk
    return load_store_chunk(x, out, index, lock, return_stored, False)
  File "/usr/local/lib/python3.7/site-packages/dask/array/core.py", line 3711, in load_store_chunk
    out[index] = np.asanyarray(x)
  File "/usr/local/lib/python3.7/site-packages/zarr/core.py", line 1115, in __setitem__
    self.set_basic_selection(selection, value, fields=fields)
  File "/usr/local/lib/python3.7/site-packages/zarr/core.py", line 1210, in set_basic_selection
    return self._set_basic_selection_nd(selection, value, fields=fields)
  File "/usr/local/lib/python3.7/site-packages/zarr/core.py", line 1501, in _set_basic_selection_nd
    self._set_selection(indexer, value, fields=fields)
  File "/usr/local/lib/python3.7/site-packages/zarr/core.py", line 1550, in _set_selection
    self._chunk_setitem(chunk_coords, chunk_selection, chunk_value, fields=fields)
  File "/usr/local/lib/python3.7/site-packages/zarr/core.py", line 1665, in _chunk_setitem
    fields=fields)
  File "/usr/local/lib/python3.7/site-packages/zarr/core.py", line 1729, in _chunk_setitem_nosync
    self.chunk_store[ckey] = cdata
  File "/usr/local/lib/python3.7/site-packages/fsspec/mapping.py", line 150, in __setitem__
    self.fs.mkdirs(self.fs._parent(key), exist_ok=True)
  File "/usr/local/lib/python3.7/site-packages/fsspec/spec.py", line 1066, in mkdirs
    return self.makedirs(path, exist_ok=exist_ok)
  File "/usr/local/lib/python3.7/site-packages/s3fs/core.py", line 488, in makedirs
    self.mkdir(path, create_parents=True)
  File "/usr/local/lib/python3.7/site-packages/fsspec/asyn.py", line 118, in wrapper
    return maybe_sync(func, self, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/fsspec/asyn.py", line 97, in maybe_sync
    return sync(loop, func, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/fsspec/asyn.py", line 68, in sync
    raise exc.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/fsspec/asyn.py", line 52, in f
    result[0] = await future
  File "/usr/local/lib/python3.7/site-packages/s3fs/core.py", line 462, in _mkdir
    if not key or (create_parents and not self.exists(bucket)):
  File "/usr/local/lib/python3.7/site-packages/fsspec/asyn.py", line 118, in wrapper
    return maybe_sync(func, self, *args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/fsspec/asyn.py", line 90, in maybe_sync
    return _run_until_done(func(*args, **kwargs))
  File "/usr/local/lib/python3.7/site-packages/fsspec/asyn.py", line 28, in _run_until_done
    loop._run_once()
  File "/usr/local/lib/python3.7/asyncio/base_events.py", line 1771, in _run_once
    handle = self._ready.popleft()
IndexError: pop from an empty deque
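
Reading the traceback, the async `_mkdir` appears to call the synchronous `exists` wrapper while the event loop is already running it, so the loop is driven re-entrantly. The sketch below reproduces only that general pattern with plain asyncio; `exists`, `exists_sync`, and `mkdir` are hypothetical stand-ins, not the s3fs or fsspec implementation, and the error it triggers is the standard asyncio one rather than the `IndexError` above.

```python
import asyncio


async def exists():
    # Stand-in for an async existence check (hypothetical, not s3fs code).
    await asyncio.sleep(0)
    return True


def exists_sync(loop):
    # Blocking wrapper: tries to drive the loop until the coroutine finishes.
    coro = exists()
    try:
        return loop.run_until_complete(coro)
    except RuntimeError:
        coro.close()  # the coroutine never started; close it to avoid a warning
        raise


async def mkdir():
    # Calling the blocking wrapper from inside a running coroutine asks the
    # same event loop to run again, which asyncio refuses to do.
    loop = asyncio.get_running_loop()
    try:
        exists_sync(loop)
    except RuntimeError as exc:
        return f"RuntimeError: {exc}"
    return "ok"


result = asyncio.run(mkdir())
print(result)
```

If this reading is right, the usual remedy is for the async method to await the async variant of the check instead of calling the blocking wrapper.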

What you expected to happen:

The write to the store succeeds without raising an exception.

Minimal Complete Verifiable Example:

import dask.array as da
import s3fs
import xarray

bucket = "zarr"
name = "sample"

# Dask-backed array, so the write goes through dask's threaded scheduler
nana = xarray.DataArray(da.zeros((1023, 1023, 3)))

s3_path = f"{bucket}/{name}"
# Endpoint and credentials are test values for a local MinIO instance
s3 = s3fs.S3FileSystem(client_kwargs={"endpoint_url": "s3://minio:9000"}, username="weak_access_key", password="weak_secret_key")
s3store = s3.get_mapper(s3_path)


print("Storing")
nana.to_dataset().to_zarr(store=s3store, mode="w", consolidated=True, compute=True)
print("Getting")

Anything else we need to know?:

Thanks and keep up the great work!

Environment:

dask==2.28.0 xarray==0.16.1 aiobotocore==1.1.1 fsspec==0.8.3 zarr==2.4.0 s3fs==0.5.1

Output of xr.show_versions()

INSTALLED VERSIONS

commit: None
python: 3.7.8 (default, Jun 30 2020, 18:27:23) [GCC 8.3.0]
python-bits: 64
OS: Linux
OS-release: 4.19.104-microsoft-standard
machine: x86_64
processor:
byteorder: little
LC_ALL: None
LANG: C.UTF-8
LOCALE: en_US.UTF-8
libhdf5: None
libnetcdf: None

xarray: 0.16.1
pandas: 1.1.2
numpy: 1.19.2
scipy: 1.5.2
netCDF4: None
pydap: None
h5netcdf: None
h5py: None
Nio: None
zarr: 2.4.0
cftime: None
nc_time_axis: None
PseudoNetCDF: None
rasterio: None
cfgrib: None
iris: None
bottleneck: None
dask: 2.28.0
distributed: 2.28.0
matplotlib: 3.3.2
cartopy: None
seaborn: 0.11.0
numbagg: None
pint: None
setuptools: 47.3.1
pip: 20.1.1
conda: None
pytest: 6.1.0
IPython: 7.18.1
sphinx: None

About this issue

  • State: closed
  • Created 4 years ago
  • Comments: 15 (10 by maintainers)

Most upvoted comments

Can you confirm that this works ok with fsspec and s3fs master?
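
When retesting against development installs, it can help to record which versions are actually picked up in the environment. A minimal sketch, assuming Python 3.8+ for `importlib.metadata` (on 3.7, as in this report, the `importlib_metadata` backport offers the same API); the helper name `installed_versions` is made up for illustration:

```python
from importlib import metadata


def installed_versions(packages):
    """Map each package name to its installed version, or None if absent."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


# Packages listed in the Environment section of this report.
report = installed_versions(
    ["fsspec", "s3fs", "aiobotocore", "zarr", "xarray", "dask"]
)
for name, version in report.items():
    print(f"{name}=={version}" if version else f"{name}: not installed")
```

Posting this output alongside the result of the MCVE makes it clear whether the released or the development versions were tested.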