gcsfs: GCSFileSystem() hangs when called from multiple processes

What happened: In the two most recent versions of gcsfs (2021.04.0 and 0.8.0), calling gcsfs.GCSFileSystem() from multiple processes hangs with no error message if gcsfs.GCSFileSystem() has already been called in the same Python interpreter session.

This bug was not present in gcsfs version 0.7.2 (with fsspec 0.8.7); all the code examples below work perfectly with that combination.

Minimal Complete Verifiable Example:

The examples below assume either gcsfs version 2021.04.0 (with fsspec 2021.04.0) or gcsfs version 0.8.0 (with fsspec 0.9.0) is installed.

Create a fresh conda environment: conda create --name test_gcsfs python=3.8 gcsfs ipykernel

The last block of this code hangs:

from concurrent.futures import ProcessPoolExecutor
import gcsfs

# This line works fine!  (And it's fine to repeat this line multiple times.)
gcs = gcsfs.GCSFileSystem() 

# This block hangs, with no error messages:
with ProcessPoolExecutor() as executor:
    for i in range(8):
        future = executor.submit(gcsfs.GCSFileSystem)

But if we don’t do gcs = gcsfs.GCSFileSystem(), the code works fine. The next code example works perfectly if run in a fresh Python interpreter. The only difference from the previous example is that I’ve removed gcs = gcsfs.GCSFileSystem().

from concurrent.futures import ProcessPoolExecutor
import gcsfs

# This works fine:
with ProcessPoolExecutor() as executor:
    for i in range(8):
        future = executor.submit(gcsfs.GCSFileSystem)

Likewise, running the ProcessPoolExecutor block multiple times works the first time but hangs on subsequent attempts:

from concurrent.futures import ProcessPoolExecutor
import gcsfs

def process_pool():
    with ProcessPoolExecutor(max_workers=1) as executor:
        for i in range(8):
            future = executor.submit(gcsfs.GCSFileSystem)

# The first attempt works fine:
process_pool()

# This second attempt hangs:
process_pool()

Anything else we should know:

Thank you so much for all your hard work on gcsfs - it’s a hugely useful tool! Sorry to be reporting a bug!

I tested all this code in a Jupyter Lab notebook.

This issue might be related to this Stack Overflow question: https://stackoverflow.com/questions/66283634/use-gcsfilesystem-with-multiprocessing

Environment:

  • Dask version: Not installed
  • Python version: 3.8
  • Operating System: Ubuntu 20.10
  • Install method: conda, from conda-forge, using a fresh conda environment.

Most upvoted comments

Maybe call gcs.clear_instance_cache() before the block instead of at the end, or include skip_instance_cache=True in the constructor; but this still doesn’t clear the reference to the loop and thread. You could do that with

fsspec.asyn.iothread[0] = None
fsspec.asyn.loop[0] = None

and that is what any fork-detecting code should be doing.
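
As a rough sketch of what such fork-detecting code might look like (the hook name _clear_fsspec_loop is illustrative, not part of fsspec), the reset could be registered with os.register_at_fork so it runs in every forked child:

import os
import fsspec.asyn

def _clear_fsspec_loop():
    # Runs in each forked child: drop the inherited references to the
    # parent's IO thread and event loop so the child creates its own.
    fsspec.asyn.iothread[0] = None
    fsspec.asyn.loop[0] = None

os.register_at_fork(after_in_child=_clear_fsspec_loop)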

I’ve done a few more experiments, in the hope that they might be useful to other people in a similar situation, or help clarify what’s going on.

It turns out that fsspec.asyn.iothread[0] = None; fsspec.asyn.loop[0] = None needs to be run in every worker process. It’s not sufficient to just do this in the parent process.

It doesn’t matter if the code does fsspec.asyn.iothread[0] = None; fsspec.asyn.loop[0] = None before or after gcs = gcsfs.GCSFileSystem().

When using fsspec.asyn.iothread[0] = None; fsspec.asyn.loop[0] = None, it’s no longer necessary to do skip_instance_cache=True or gcs.clear_instance_cache().

Each worker process has to open the Zarr store. If I try lazily opening the Zarr store in the main process and passing that object into each worker process, fsspec throws an error saying it’s not thread safe. That’s fine; it’s no problem for my code to open the Zarr store in each worker process.
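
For concreteness, here is a minimal sketch of one way to run that reset in every worker process, using the initializer argument of ProcessPoolExecutor (available since Python 3.7); the function name reset_fsspec_loop is just illustrative:

from concurrent.futures import ProcessPoolExecutor

import fsspec.asyn
import gcsfs

def reset_fsspec_loop():
    # Runs once in each worker process when it starts, before any task.
    fsspec.asyn.iothread[0] = None
    fsspec.asyn.loop[0] = None

gcs = gcsfs.GCSFileSystem()

with ProcessPoolExecutor(initializer=reset_fsspec_loop) as executor:
    for i in range(8):
        future = executor.submit(gcsfs.GCSFileSystem)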

I came across the same issue. I’m running FastAPI on Gunicorn with four Uvicorn workers. What’s the recommended way to use GCSFileSystem in such a setup? I use fsspec.filesystem("gs") to initialize, in case that’s relevant.

You could try this (assuming that the deadlocking worker processes are created by forking):

import os
import fsspec.asyn

os.register_at_fork(
    after_in_child=fsspec.asyn.reset_lock,
)

No difference, unfortunately. I’m using gunicorn with --preload, so putting that line at the start of my app.py should be picked up before the fork happens, but I’m not certain how gunicorn handles its forking.

If you are not sure whether the registered hook gets picked up before the fork, you could also just call fsspec.asyn.reset_lock() directly at the beginning of your code. This should release the lock that is causing the deadlock.

This did the trick for me! As you say, I had to find the right place to put it (there was some ThreadPoolExecutor code pre-fork as well, which was interfering). By making sure I called reset_lock() in the parent process after fsspec.filesystem(...) had been called there, things worked nicely in all the forks.
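
For reference, a minimal sketch of that ordering in a hypothetical pre-fork module (e.g. the app.py that gunicorn --preload imports in the parent process):

import fsspec
import fsspec.asyn

# Initialise the filesystem in the parent process first...
fs = fsspec.filesystem("gs")

# ...then release fsspec's internal lock so that the workers forked
# afterwards do not inherit it in a held state.
fsspec.asyn.reset_lock()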