zarr-python: Python multiprocessing writes get deadlocked on Linux systems

Hi everyone,

Our team is trying to write to zarr arrays on AWS S3 using Python’s built-in multiprocessing (mp) tools.

Once we start an mp.Pool() context and try to write to a zarr.Array, our tool deadlocks in certain environments.

Code sample and details below.

Code sample

Assuming S3 credentials are set up in your environment.

from itertools import repeat
import multiprocessing as mp

import numpy as np
import zarr


def worker(zarr_array, idx):
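    # Each call writes one (1, 1024, 1024) slab, i.e. exactly one chunk of the array.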
    zarr_array[idx, ...] = np.random.randn(1024, 1024).astype('float32')


if __name__ == "__main__":
    zarr_root_path = 's3://XXXXX/benchmarks/debug/test.zarr'  # Sorry, have to blank out S3 bucket.

    root = zarr.open_group(zarr_root_path, mode='w')

    dataset = root.create_dataset(
        shape=(5, 1024, 1024),
        chunks=(1, -1, -1),
        name='test1',
        overwrite=True,
        synchronizer=zarr.ProcessSynchronizer('.lock'),
    )

    # dataset[:] = np.random.randn(*dataset.shape)

    iterable = zip(
        repeat(dataset),
        range(5),
    )

    with mp.Pool() as pool:
        pool.starmap(worker, iterable)

Problem description

We ran into this issue on AWS EC2 instances and on our on-prem Linux workstations; from my Windows laptop (x86_64) to S3, the same code works without any issues. The data is chunked by rows, so as you can see, our parallel writes are aligned with chunk boundaries. We also use a zarr.ProcessSynchronizer() just in case; however, the behavior is the same without one.

In the Windows-to-S3 environment, this code completes as expected. On the Linux machines, however, it never completes:

  • It does not create any chunk files for the array on S3.
  • When we kill the process, the traceback shows it stuck waiting in threading (see below).

If you comment out the iterable, the mp.Pool, and the code associated with them, and instead uncomment dataset[:] = np.random.randn(*dataset.shape), everything runs as expected on all systems. The issue only appears with multiprocessing.

Things we have tried:

  • Older s3fs and fsspec versions
  • Different Python versions
  • Adding/removing ProcessSynchronizer and ThreadSynchronizer
  • Setting blosc.use_threads = False and turning off compression
  • More things I can’t remember…

Here is the traceback after a keyboard interrupt.

Traceback (most recent call last):
  File "/home/ubuntu/develop/scratch/parallel_zarr.py", line 33, in <module>
    pool.starmap(worker, iterable)
  File "/home/ubuntu/miniconda3/envs/my_env/lib/python3.9/multiprocessing/pool.py", line 372, in starmap
    return self._map_async(func, iterable, starmapstar, chunksize).get()
  File "/home/ubuntu/miniconda3/envs/my_env/lib/python3.9/multiprocessing/pool.py", line 765, in get
    self.wait(timeout)
  File "/home/ubuntu/miniconda3/envs/my_env/lib/python3.9/multiprocessing/pool.py", line 762, in wait
    self._event.wait(timeout)
  File "/home/ubuntu/miniconda3/envs/my_env/lib/python3.9/threading.py", line 574, in wait
    signaled = self._cond.wait(timeout)
  File "/home/ubuntu/miniconda3/envs/my_env/lib/python3.9/threading.py", line 312, in wait
    waiter.acquire()
KeyboardInterrupt

Version and installation information

  • AWS c6gd.12xlarge instance.
  • ARM 48 core CPU (aarch64); however on-prem boxes are x86_64.
  • Python 3.9.5 (also tested 3.7, and 3.8)
  • conda environment (conda 4.10.1). Everything is installed via conda except one specialized package we built from source.
  • zarr version 2.8.1
  • s3fs version 0.6.0
  • fsspec version 2021.05.0
  • numcodecs version 0.7.3

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Comments: 17 (17 by maintainers)

Most upvoted comments

Have you tried the multiprocessing "spawn" context? Does this work with dask-distributed and processes (which has a different serialisation model) instead of the pool?
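
For reference, here is a minimal sketch (our reading of that suggestion, not a maintainer-provided patch) of how the reproduction script above could use a "spawn" context: only the pool creation changes, everything else stays the same.

import multiprocessing as mp

# "spawn" starts each worker from a fresh interpreter, so workers do not
# inherit locks or event loops created in the parent before the fork
# (s3fs/fsspec run an asyncio event loop in a helper thread, which does
# not survive a fork).
ctx = mp.get_context("spawn")
with ctx.Pool() as pool:
    pool.starmap(worker, iterable)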

Is it more beneficial to use “forkserver” over “spawn” if performance is critical in a Unix environment?

For long-lived processes, it won't matter; this is a one-time cost of starting the process (and spawn happens to be the only option on Windows). Dask uses forkserver on Linux by default, which ought to be OK, but…

If the existing context was "fork", then nothing more is needed: always use spawn and be happy - fork should never be used in an application with event loops or threads. If it was "forkserver", then we need to fix this.
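
As a follow-up note (our own addition, not from the thread): you can check which start method is currently in effect and force "spawn" globally before creating any pools, for example:

import multiprocessing as mp

if __name__ == "__main__":
    # Default is "fork" on Linux, "spawn" on Windows and on macOS since Python 3.8.
    print(mp.get_start_method())

    # Make every subsequent Pool()/Process() use "spawn".
    mp.set_start_method("spawn", force=True)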