ray: [object_store] Object Spilling does not work properly

What is the problem?

Ray version and other system information (Python version, TensorFlow version, OS):

Reproduction (REQUIRED)

Please provide a short code snippet (less than 50 lines if possible) that can be copy-pasted to reproduce the issue. The snippet should have no external library dependencies (i.e., use fake or mock data / environments):

I ran this on an m5.xlarge instance with 4 CPUs and 16 GB RAM.

First, start Ray with the following command:

ray stop && ray start --head --system-config='{"automatic_object_spilling_enabled":true,"max_io_workers":4,"min_spilling_size":104857600,"object_spilling_config":"{\"type\":\"filesystem\",\"params\":{\"directory_path\":\"/tmp/spill\"}}"}' --resources='{"worker":1}'
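The `object_spilling_config` value is a JSON string nested inside the outer JSON, which is why the command needs the `\"` escaping. As a rough sketch (not part of the original report), the same argument can be built with Python's `json` and `shlex` modules to avoid hand-escaping. The variable names here are only illustrative; the keys and values are copied from the command above.

```python
import json
import shlex

# Inner spilling config. It is serialized to a string first, because the
# outer config carries it as a JSON *string*, not as a nested object.
spilling_config = {
    "type": "filesystem",
    "params": {"directory_path": "/tmp/spill"},
}

system_config = {
    "automatic_object_spilling_enabled": True,
    "max_io_workers": 4,
    "min_spilling_size": 100 * 1024 * 1024,  # 104857600 bytes
    "object_spilling_config": json.dumps(spilling_config, separators=(",", ":")),
}

# Compact separators reproduce the exact text used in the command above.
system_config_arg = json.dumps(system_config, separators=(",", ":"))
resources_arg = json.dumps({"worker": 1}, separators=(",", ":"))

command = (
    "ray stop && ray start --head"
    " --system-config=" + shlex.quote(system_config_arg)
    + " --resources=" + shlex.quote(resources_arg)
)
print(command)
```

Decoding `system_config_arg` twice (once for the outer object, once for `object_spilling_config`) is a quick way to check that the escaping survived.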

Second, run the repro script (at the end of this issue).

Observe:

  • The two mappers start, printing “Mapper 0” and “Mapper 1”
  • Objects seem to be spilled to disk as expected
  • The system then hangs

If you change BUF_SIZE (named MAPPER_PART_SIZE in the edited script below) to something smaller (e.g. 1e9), the program finishes normally.
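For a sense of scale, here is a small sketch of the byte counts involved in the hanging and the passing configuration. It assumes that BUF_SIZE refers to the constant called `MAPPER_PART_SIZE` in the edited repro script, and that M = R = 4 in both runs. The helper `shuffle_sizes` is hypothetical and only does the arithmetic.

```python
GIB = 1024 ** 3


def shuffle_sizes(part_size, num_mappers, num_reducers):
    """Return byte counts for one run of the map/reduce shuffle."""
    chunk = part_size // num_reducers  # size of one mapper return value
    return {
        "chunk": chunk,
        "total": num_mappers * part_size,      # all mapper output combined
        "per_reducer": num_mappers * chunk,    # input fetched by one reducer
    }


# 2 GiB per mapper: the configuration that hangs.
hanging = shuffle_sizes(2 * 1024 * 1024 * 1024, 4, 4)
# 1e9 bytes per mapper: the configuration reported to finish normally.
passing = shuffle_sizes(int(1e9), 4, 4)

for name, sizes in (("hanging", hanging), ("passing", passing)):
    print(name, {k: f"{v / GIB:.2f} GiB" for k, v in sizes.items()})
```

The hanging run produces 8 GiB of mapper output in 512 MiB objects, while the passing run produces about 3.7 GiB. Which of those totals exceeds the object store depends on its configured size, which the report does not state.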

cc @stephanie-wang @rkooo567

If the code snippet cannot be run by itself, the issue will be closed with “needs-repro-script”.

  • I have verified my script runs in a clean environment and reproduces the issue.
  • I have verified the issue also occurs with the latest wheels.

Repro script code (edited):

import numpy as np
import ray

MAPPER_PART_SIZE = 2 * 1024 * 1024 * 1024  # bytes produced by each mapper (2 GiB)
M = 4  # number of mappers
R = 4  # number of reducers


@ray.remote(resources={"worker": 1})
def mapper(mapper_id):
    print("Mapper", mapper_id)
    part_size = MAPPER_PART_SIZE // R  # integer division avoids a float round trip
    ret = [b"0" * part_size for _ in range(R)]
    return ret


@ray.remote
def reducer(reducer_id, *chunks):
    print("Reducer", reducer_id)
    print([len(c) for c in chunks])
    return 0


ray.init(address="auto")

mapper_results = np.empty((M, R), dtype=object)

for m in range(M):
    mapper_results[m, :] = mapper.options(num_returns=R).remote(m)

reducer_results = []
for r in range(R):
    chunks = mapper_results[:, r].tolist()
    ret = reducer.remote(r, *chunks)
    reducer_results.append(ret)

ray.get(reducer_results)
print("OK")
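The shuffle pattern in the script is easier to see without Ray in the way. The sketch below is an illustration rather than part of the original report: it swaps the object references for plain labelled strings and uses `zip` in place of the NumPy column slice `mapper_results[:, r]`. The name `fake_mapper` is hypothetical.

```python
M = 4  # number of mappers
R = 4  # number of reducers


def fake_mapper(m):
    """Stand-in for mapper.remote(m): returns R labelled parts, one per reducer."""
    return [f"m{m}-part{r}" for r in range(R)]


# Row m holds the R parts produced by mapper m, like mapper_results[m, :].
rows = [fake_mapper(m) for m in range(M)]

# Column r holds part r from every mapper, like mapper_results[:, r].tolist().
reducer_inputs = [list(column) for column in zip(*rows)]

for r, chunks in enumerate(reducer_inputs):
    print("Reducer", r, chunks)
```

Each reducer depends on one part from every mapper, so no reducer can run until all M mappers have produced their output.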

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 15 (11 by maintainers)

Most upvoted comments

Let me assign P1 temporarily

I think this is likely fixed by https://github.com/ray-project/ray/pull/14821, which will merge shortly.