dask: Dask cluster hangs while processing file using s3fs

I have a function that loads NetCDF (.nc) files from S3 using s3fs, selects a specific region from each file, and pushes the result to a different S3 bucket. Here’s my function. I’m doing everything in JupyterLab.

import pandas as pd
import s3fs
import xarray as xr

# `fs` (an s3fs.S3FileSystem with write access) and `bucket` (the destination
# prefix) are not shown in the post; they are assumed to be notebook globals.

def get_records(rec):
    # rec[-1] appears to be a timestamp string laid out as YYYYMMDD?HHMM
    stamp = rec[-1]
    yr, mo, da = stamp[0:4], stamp[4:6], stamp[6:8]
    hr, mn = stamp[9:11], stamp[11:13]  # mn is currently unused

    ps = s3fs.S3FileSystem(anon=True)

    # Day of year, zero-padded to three digits, as used in the bucket layout
    period = pd.Period(yr + '-' + mo + '-' + da, freq='D')
    dy = period.dayofyear
    print(dy)
    dy = "{0:0=3d}".format(dy)

    cc = [7, 8, 9, 10, 11, 12, 13, 14, 15, 16]  # look at the IR channels only for now

    # Loop over the 10 channels
    for c in range(10):
        ch = "{0:0=2d}".format(cc[c])

        # Open the last two time slices of this hour for the given record.
        # hr is already a two-character string, so it is used as-is
        # (applying a 'd' format spec to a str raises ValueError).
        keys = ps.glob('s3://noaa-goes16/ABI-L1b-RadF/' + yr + '/' + dy + '/' + hr + '/' + 'OR_ABI-L1b-RadF-M3C' + ch + '*')
        F1 = xr.open_dataset(ps.open(keys[-2]))[['Rad']]
        F2 = xr.open_dataset(ps.open(keys[-1]))[['Rad']]

        # Select data within +/-0.005 of the record's x (rec[0]) and y (rec[1])
        G1 = F1.where((F1.x >= (rec[0]-0.005)) & (F1.x <= (rec[0]+0.005)) & (F1.y >= (rec[1]-0.005)) & (F1.y <= (rec[1]+0.005)), drop=True)
        G2 = F2.where((F2.x >= (rec[0]-0.005)) & (F2.x <= (rec[0]+0.005)) & (F2.y >= (rec[1]-0.005)) & (F2.y <= (rec[1]+0.005)), drop=True)

        # Concatenate the two time slices
        G = xr.concat([G1, G2], dim='time')

        # Concatenate the channels
        if c == 0:
            T = G
        else:
            T = xr.concat([T, G], dim='channel')

    # Save to a NetCDF file and upload it to S3
    path = rec[-1] + '.nc'
    T.to_netcdf(path)
    fs.put(path, bucket + path)

Now I want to use a Dask cluster to run this function in parallel, like this:

files = []
for i in range(0, 100):
    s3_ds = dask.delayed(get_records)(records[i])
    files.append(s3_ds)

files = dask.compute(*files)

This was running perfectly fine last month, but now when I try it again the cluster stops processing files after a while. For example, if I give 100 files to 10 workers, they stop processing after 60-70 files and sit idle even though they have memory left. When I tried with just 50 files, they processed around 30 and then sat idle, again with memory to spare. I don’t know whether I’m doing something wrong or whether it’s a bug in a library. I upgraded all the libraries I was using (dask, s3fs, and fsspec), but it still doesn’t work.

The main thing is that all of this was working perfectly fine a few weeks ago, but now the same code no longer works.

Environment:

  • Dask version: 2021.2.0
  • Python version: 3.8.8
  • S3fs version: 0.6.0
  • Fsspec: 0.9.0

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 21 (10 by maintainers)

Most upvoted comments

We’re making some progress on this in https://github.com/h5py/h5py/issues/2019, https://github.com/h5netcdf/h5netcdf/pull/117, and linked issues.

There might be some fixes possible at the fsspec level, but for now it seems like updating libraries to avoid circular references if possible is the best path forward. Either way, I don’t think there’s anything Dask can do, so I’m closing this.

@sharkinsspatial below are the steps I used to reproduce this on a single machine. I’m running in AWS us-east-1, the same region as the S3 dataset. It’s not consistent; for me the worker process deadlocks about 50% of the time.

First start a scheduler and single worker with s3fs debugging enabled:

export S3FS_LOGGING_LEVEL=DEBUG
dask-scheduler &   
dask-worker --nthreads 1 localhost:8786

Then run the following Python script in a different terminal:

import xarray as xr
import s3fs
import dask
import time
import json
from dask.distributed import Client

@dask.delayed
def s3open(path):
    fs = s3fs.S3FileSystem(anon=True, default_fill_cache=False)
    return fs.open(path)

client = Client('localhost:8786')

# Opening a single file is usually enough
files_mapper = [s3open('s3://era5-pds/2020/12/data/sea_surface_temperature.nc')]
ds = xr.open_mfdataset(files_mapper, engine='h5netcdf',
                       chunks={'lon':200,'lat':200,'time0':720},
                       concat_dim='time0',
                       combine='nested',
                       coords='minimal',
                       compat='override',
                       parallel=True)

# A short pause here makes the deadlock more likely for some reason
time.sleep(1)
# Read dataset into worker memory
ds = client.persist(ds)

# Output the stats and worker call stack every 5 seconds to monitor progress
while True:
    print(json.dumps(client.scheduler_info(),indent=2))
    print(json.dumps(client.call_stack(),indent=2))
    time.sleep(5)

Let it run for a little while. If the worker stops displaying s3fs debug logs while there are still queued tasks, you’ve likely got it deadlocked. You may need to run it a few times before it happens.

I’ve tracked this down, at least in my case, to global locking performed in the h5py library which is used by the h5netcdf engine in xarray.

I managed to reproduce the issue semi-reliably on a single worker and attached gdb to the hung fsspecIO thread. The stack trace shows the Python garbage collector attempting to acquire a lock during clean-up of an h5py object.
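When gdb is not at hand, a rough first look at a hung process can come from Python itself. The sketch below is not what was used for the trace above; it is a minimal standard-library helper (the name `dump_thread_stacks` is made up for illustration) that collects the Python-level stack of every live thread. It only shows Python frames, so a thread blocked inside C code, such as a `__dealloc__` waiting on a lock, will appear stopped at the last Python call it made.

```python
import sys
import threading
import traceback


def dump_thread_stacks() -> dict:
    """Return {thread name: formatted Python stack} for every live thread."""
    # Map thread identifiers to names so the output is readable.
    names = {t.ident: t.name for t in threading.enumerate()}
    stacks = {}
    # sys._current_frames() gives the topmost Python frame of each thread.
    for ident, frame in sys._current_frames().items():
        name = names.get(ident, f"unknown-{ident}")
        stacks[name] = "".join(traceback.format_stack(frame))
    return stacks
```

Calling this from a spare thread in the worker and printing the entry whose name starts with `fsspecIO` would show where that thread last executed Python code.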

This is the sequence of events:

  1. Dask worker thread acquires the global h5py lock through h5py Dataset.__getitem__ as you can see in the call stack from my earlier comment (although sometimes it occurs via a different method, they all acquire the lock)
  2. The request is passed to fsspec and s3fs as a read
  3. An S3 range request is submitted as a coroutine to the fsspecIO thread’s event loop in fsspec.sync.
  4. Sometimes Python garbage collection occurs during the fsspecIO thread’s execution
  5. If the garbage collector running in the fsspecIO thread decides to clean up an h5py object, it calls ObjectID.__dealloc__ and attempts to acquire the global h5py lock
  6. The two threads are now deadlocked and the worker hangs indefinitely
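The sequence above can be modelled without h5py or fsspec. The following is a toy sketch, not the real code path: `global_lock` stands in for the h5py global lock, the single-thread pool stands in for the fsspecIO thread, and `io_task` plays the part of a finalizer that runs on that thread and needs the lock. The timeouts exist only so the demo terminates; in the real case both waits are unbounded.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, TimeoutError

# Stand-in for h5py's global lock (an assumption for illustration only).
global_lock = threading.RLock()


def io_task() -> str:
    """Runs on the I/O thread; models a finalizer that needs the global lock."""
    # Timeout added so the demo ends; the real acquire would block forever.
    if not global_lock.acquire(timeout=0.5):
        return "io thread could not get the lock"
    global_lock.release()
    return "read finished"


def worker_read(io_pool: ThreadPoolExecutor) -> str:
    """Runs on the worker thread; holds the lock while waiting on the I/O thread."""
    with global_lock:
        future = io_pool.submit(io_task)
        try:
            return future.result(timeout=2.0)
        except TimeoutError:
            return "worker timed out"


def run_demo() -> str:
    # One I/O thread, mirroring the single fsspecIO event-loop thread.
    with ThreadPoolExecutor(max_workers=1, thread_name_prefix="io") as io_pool:
        return worker_read(io_pool)
```

`run_demo()` reports that the I/O thread could not get the lock: the worker thread holds it for the whole time it waits for the I/O thread, which is the circular wait described in steps 1 to 6.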

I’m not sure what the best fix might be, but a few ideas that came to mind:

  • A single-threaded mode for fsspec, that could be enabled through config when h5py is in use
  • As more of a workaround, disabling garbage collection during fsspec.sync() execution, or perhaps making an explicit call to gc.collect() before handing off to the coroutine (I haven’t tested this)
  • Set timeouts on all fsspec.sync() calls
  • More granular object-level locking in the h5py library (likely infeasible)
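As a rough illustration of the garbage-collection workaround in the list above, here is an untested-in-production sketch using only the standard `gc` module. The names `gc_paused` and `blocking_read` are hypothetical, and `read_fn` is a stand-in for whatever blocking call ends up in fsspec's sync machinery; nothing here is part of fsspec or Dask. Two caveats: `gc.disable()` is process-wide rather than per-thread, and it only pauses the cycle collector, so objects freed by reference counting are still finalized immediately.

```python
import gc
from contextlib import contextmanager


@contextmanager
def gc_paused():
    """Temporarily disable the cyclic garbage collector (process-wide)."""
    was_enabled = gc.isenabled()
    gc.collect()   # collect pending cycles up front, on the calling thread
    gc.disable()
    try:
        yield
    finally:
        # Restore the previous state rather than enabling unconditionally.
        if was_enabled:
            gc.enable()


def blocking_read(read_fn, *args, **kwargs):
    """Call a blocking read (hypothetical stand-in) with cycle collection paused."""
    with gc_paused():
        return read_fn(*args, **kwargs)
```

Because the switch is global, overlapping use from several threads could re-enable collection earlier than another thread expects, so this would need more care in a multi-threaded worker.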

Is there anything else that could be done within dask or fsspec to avoid the deadlock?

Here’s the stack trace: fsspecIO-stacktrace.txt