ray: [object_store] Object Spilling does not work properly
What is the problem?
Ray version and other system information (Python version, TensorFlow version, OS):
- Ray: nightly (3/18/2021) via https://s3-us-west-2.amazonaws.com/ray-wheels/latest/ray-2.0.0.dev0-cp38-cp38-manylinux2014_x86_64.whl
- Python 3.8.5
- Linux ip-172-31-61-153 5.4.0-1038-aws #40~18.04.1-Ubuntu SMP Sat Feb 6 01:56:56 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux
Reproduction (REQUIRED)
Please provide a short code snippet (less than 50 lines if possible) that can be copy-pasted to reproduce the issue. The snippet should have no external library dependencies (i.e., use fake or mock data / environments):
I ran this on an m5.xlarge instance with 4 CPUs and 16 GB of RAM.
First, start Ray with the following command:
ray stop && ray start --head --system-config='{"automatic_object_spilling_enabled":true,"max_io_workers":4,"min_spilling_size":104857600,"object_spilling_config":"{\"type\":\"filesystem\",\"params\":{\"directory_path\":\"/tmp/spill\"}}"}' --resources='{"worker":1}'
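The quoting in the --system-config flag is easy to get wrong. object_spilling_config is itself a JSON string embedded inside the outer JSON object, so it is encoded twice. Here is a minimal sketch that rebuilds the exact command above using only Python's json and shlex modules. The helpers build_system_config and build_start_command are hypothetical and not part of Ray.

```python
import json
import shlex

COMPACT = (",", ":")  # no-whitespace JSON, matching the command above


def build_system_config(spill_dir="/tmp/spill", max_io_workers=4,
                        min_spilling_size=100 * 1024 * 1024):
    """Hypothetical helper: returns the JSON string passed to --system-config."""
    spilling = {"type": "filesystem", "params": {"directory_path": spill_dir}}
    config = {
        "automatic_object_spilling_enabled": True,
        "max_io_workers": max_io_workers,
        "min_spilling_size": min_spilling_size,  # 100 MiB = 104857600 bytes
        # object_spilling_config is a JSON *string* inside the outer JSON:
        # serialized once here, then its quotes are escaped by the outer dumps().
        "object_spilling_config": json.dumps(spilling, separators=COMPACT),
    }
    return json.dumps(config, separators=COMPACT)


def build_start_command(system_config, resources):
    """Hypothetical helper: assembles the shell command, single-quoting each JSON argument."""
    return (
        "ray stop && ray start --head"
        " --system-config=" + shlex.quote(system_config)
        + " --resources=" + shlex.quote(json.dumps(resources, separators=COMPACT))
    )


cmd = build_start_command(build_system_config(), {"worker": 1})
print(cmd)
```

Printing cmd should reproduce the command above character for character. A helper like this also makes it easier to vary settings such as the spill directory without escaping quotes by hand.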
Second, run the repro script (see below):
Observe:
- The two mappers start, printing “Mapper 0” and “Mapper 1”
- Objects seem to be spilled onto disk as expected
- The system then hangs
If BUF_SIZE (presumably MAPPER_PART_SIZE in the edited script below) is set to something smaller, e.g. 1e9, the program finishes normally.
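To put a number on "objects seem to be spilled onto disk", you can track how many bytes accumulate under the spill directory while the job runs. Below is a minimal standard-library sketch. The function name spilled_bytes is hypothetical. It walks the directory recursively, so it makes no assumption about how Ray names or nests its spill files.

```python
import os


def spilled_bytes(directory="/tmp/spill"):
    """Hypothetical helper: total size in bytes of all files under `directory`."""
    total = 0
    # os.walk yields nothing for a missing directory, so this returns 0 before any spilling.
    for root, _dirs, files in os.walk(directory):
        for name in files:
            path = os.path.join(root, name)
            try:
                total += os.path.getsize(path)
            except OSError:
                # The file may have been removed between listing and stat; skip it.
                pass
    return total
```

Call it periodically from a second Python session while the repro runs. That shows whether spilling stalls or keeps making progress once the hang begins.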
- I have verified my script runs in a clean environment and reproduces the issue.
- I have verified the issue also occurs with the latest wheels.
Repro script code (edited):
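import numpy as np
import ray

# Each mapper produces MAPPER_PART_SIZE bytes in total (2 GiB), split into R chunks.
MAPPER_PART_SIZE = 2 * 1024 * 1024 * 1024
M = 4  # number of mappers
R = 4  # number of reducers


# Mappers require the custom "worker" resource; the cluster has worker:1, so at most one runs at a time.
@ray.remote(resources={"worker": 1})
def mapper(mapper_id):
    print("Mapper", mapper_id)
    part_size = int(MAPPER_PART_SIZE / R)
    # Returned as R separate objects (one per reducer) because of num_returns=R at the call site.
    ret = [b"0" * part_size for _ in range(R)]
    return ret


@ray.remote
def reducer(reducer_id, *chunks):
    print("Reducer", reducer_id)
    print([len(c) for c in chunks])
    return 0


ray.init(address="auto")  # connect to the cluster started with `ray start --head`

# mapper_results[m, r] holds the ObjectRef of chunk r produced by mapper m.
mapper_results = np.empty((M, R), dtype=object)
for m in range(M):
    mapper_results[m, :] = mapper.options(num_returns=R).remote(m)

# Reducer r receives column r: one chunk from every mapper (an all-to-all shuffle).
reducer_results = []
for r in range(R):
    chunks = mapper_results[:, r].tolist()
    ret = reducer.remote(r, *chunks)
    reducer_results.append(ret)

ray.get(reducer_results)
print("OK")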
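The amount of data the repro moves follows directly from its parameters. The sketch below uses a hypothetical helper, shuffle_sizes, that mirrors the script's M x R layout without Ray. It computes three sizes:

- the chunk size,
- the total bytes produced by all mappers,
- the bytes each reducer receives as arguments.

It does this for the 2 GiB setting that hangs and for the 1e9 setting reported to finish.

```python
GiB = 1024 ** 3


def shuffle_sizes(mapper_part_size, num_mappers, num_reducers):
    """Hypothetical helper mirroring the repro's M x R shuffle, sizes only."""
    part_size = int(mapper_part_size / num_reducers)  # same rounding as mapper()
    # chunk_sizes[m][r]: size of the r-th object returned by mapper m
    chunk_sizes = [[part_size] * num_reducers for _ in range(num_mappers)]
    total = sum(sum(row) for row in chunk_sizes)
    # Reducer r takes column r, i.e. one chunk from every mapper.
    per_reducer = [sum(row[r] for row in chunk_sizes) for r in range(num_reducers)]
    return part_size, total, per_reducer


for label, size in [("2 GiB (hangs, as reported)", 2 * GiB), ("1e9 (finishes, as reported)", 1e9)]:
    part, total, per_reducer = shuffle_sizes(size, num_mappers=4, num_reducers=4)
    print(f"{label}: chunk={part / GiB:.3f} GiB, total={total / GiB:.2f} GiB, "
          f"per reducer={per_reducer[0] / GiB:.2f} GiB")
```

With the failing setting, the mappers produce 8 GiB in total, and each reducer needs 2 GiB of arguments at once. The instance has 16 GB of RAM, so this scale is consistent with spilling being triggered. The arithmetic only shows the size of the workload; it does not by itself explain the hang.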
About this issue
- State: closed
- Created 3 years ago
- Comments: 15 (11 by maintainers)
Let me assign P1 temporarily
I think this is likely fixed by https://github.com/ray-project/ray/pull/14821 which will merge shortly