ray: [Core] Slow Embedded Object Ref Counting
What is the problem?
ray.get() shows unexpectedly high latencies when fetching object refs that contain embedded object refs. I ran the repro script below to measure the object ref counting latency, which produced the following ray.get() latency results:
1000 object refs with 600 embedded refs: ~10 seconds w/o ref hiding, ~0.5 seconds w/ ref hiding
2000 object refs with 600 embedded refs: ~20 seconds w/o ref hiding, ~1 second w/ ref hiding
2000 object refs with 1200 embedded refs: ~40 seconds w/o ref hiding, ~1 second w/ ref hiding
500 object refs with 600 embedded refs: ~5 seconds w/o ref hiding, ~0.5 seconds w/ ref hiding
500 object refs with 1200 embedded refs: ~10 seconds w/o ref hiding, ~0.5 seconds w/ ref hiding
500 object refs with 2400 embedded refs: ~20 seconds w/o ref hiding, ~0.5 seconds w/ ref hiding
Here, “ref hiding” refers to using cloudpickle.dumps(obj_ref) to hide embedded object references from Ray’s reference counter. The above figures also show that the runtime is approximately O(obj_refs * embedded_refs_per_obj) when embedded object references are not hidden, and roughly O(obj_refs) (with an apparent lower bound of ~0.5 seconds) when hiding them.
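For clarity, the two paths being compared boil down to the following (a minimal sketch; the variable names are illustrative, and the hidden variant requires keeping the underlying object alive through some other reference):

import ray
from ray import cloudpickle

obj_ref = ray.put("some value")

# Normal path: when the containing dict is serialized, Ray's serializer
# sees the embedded ObjectRef and registers it with the reference
# counter -- the slow path measured above.
visible = {0: obj_ref}

# "Ref hiding": the ObjectRef is pickled to bytes out-of-band, so Ray's
# reference counter never sees it. It must later be recovered with
# cloudpickle.loads() before it can be passed to ray.get().
hidden = {0: cloudpickle.dumps(obj_ref)}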
Ray version and other system information (Python version, TensorFlow version, OS): Ray 1.5.1 on AWS Deep Learning AMI (Ubuntu 18.04) V30.0
Reproduction (REQUIRED)
You can configure the OBJ_REFS, EMBEDDED_OBJ_REFS, and REF_HIDING variables in the script below to reproduce the above results, and uncomment the profiler code to trace the issue back to its root cause.
Python Script:
import cProfile  # needed if the profiler code below is uncommented
import datetime
import time

import ray
from ray import ray_constants
from ray import cloudpickle

OBJ_REFS = 1000  # number of put_object tasks to run, and number of top-level object refs input to ray.get()
EMBEDDED_OBJ_REFS = 600  # number of embedded object refs per input object
REF_HIDING = False  # whether to hide embedded object refs via cloudpickle.dumps(obj_ref)


def run():
    cluster_resources = ray.cluster_resources()
    next_node_idx = 0
    node_idx_to_id = {}
    for resource_name in cluster_resources.keys():
        if resource_name.startswith("node:"):
            node_idx_to_id[next_node_idx] = resource_name
            next_node_idx += 1
    input_objects = list(range(OBJ_REFS))
    tasks_pending = []
    i = 0
    for input_object in input_objects:
        node_id = node_idx_to_id[i % len(node_idx_to_id)]
        resources = {node_id: ray_constants.MIN_RESOURCE_GRANULARITY}
        promise = put_object.options(resources=resources).remote(
            input_object,
        )
        tasks_pending.append(promise)
        i += 1
    print(f"getting {len(tasks_pending)} task results...")
    results = ray.get(tasks_pending)
    # uncomment for profiling
    # prof = cProfile.Profile()
    # prof = prof.runctx(
    #     "ray.get(tasks_pending)",
    #     {"ray": ray},
    #     {"tasks_pending": tasks_pending},
    # )
    # prof.dump_stats("/home/ubuntu/output.pstats")
    print(f"got {len(results)} task results")
    print(f"retrieved all results at: {datetime.datetime.now()}")


@ray.remote
def put_object(input_object):
    start = time.perf_counter()
    obj_idx_to_obj_id = {}
    for obj_idx in range(EMBEDDED_OBJ_REFS):
        obj_ref = ray.put(input_object)
        obj_idx_to_obj_id[obj_idx] = (
            obj_ref if not REF_HIDING else cloudpickle.dumps(obj_ref)
        )
    stop = time.perf_counter()
    print("put object latency: ", stop - start)
    print(f"put object end time: {datetime.datetime.now()}")
    return obj_idx_to_obj_id


if __name__ == '__main__':
    print(f"start time: {datetime.datetime.now()}")
    ray.init(address="auto")
    start_e2e = time.perf_counter()
    run()
    stop_e2e = time.perf_counter()
    print("total latency: ", stop_e2e - start_e2e)
    print(f"end time: {datetime.datetime.now()}")
Autoscaler Config YAML:
cluster_name: ref_counter_repro
max_workers: 1
provider:
    type: aws
    region: us-east-1
    availability_zone: us-east-1a, us-east-1b, us-east-1c, us-east-1d, us-east-1f
auth:
    ssh_user: ubuntu
available_node_types:
    ray.head.default:
        max_workers: 0
        resources: {}
        node_config:
            InstanceType: r5n.8xlarge
            ImageId: latest_dlami
    ray.worker.default:
        min_workers: 1
        max_workers: 1
        resources: {}
        node_config:
            InstanceType: r5n.8xlarge
            ImageId: latest_dlami
head_node_type: ray.head.default
head_start_ray_commands:
    - ray stop
    # r5n.8xlarge nodes provide 256GiB of memory spread across 32 CPUs
    # (i.e. an average of 256GiB/32CPU = 8GiB/CPU).
    # The head node reserves 96GiB for the object store and leaves 160GiB for
    # everything else (e.g. gcs, driver, tasks, other OS processes, etc.).
    - "ulimit -n 65536; ray start --head --port=6379 --object-manager-port=8076 --object-store-memory=103079215104 --autoscaling-config=~/ray_bootstrap_config.yaml"
worker_start_ray_commands:
    - ray stop
    # r5n.8xlarge worker nodes provide 256GiB of memory spread across 32 CPUs
    # (i.e. an average of 256GiB/32CPU = 8GiB/CPU).
    # Worker nodes reserve 96GiB for the object store and leave 160GiB for
    # everything else (e.g. raylet, tasks, other OS processes, etc.).
    # 32 CPUs on r5n.8xlarge instances means that we receive an average of
    # 96GiB/32CPU = 3GiB/CPU of object store memory and
    # 160GiB/32CPU = 5GiB/CPU of memory for everything else.
    - "ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --object-store-memory=103079215104 --object-manager-port=8076"
setup_commands:
    - pip install ray
If the code snippet cannot be run by itself, the issue will be closed with “needs-repro-script”.
- I have verified my script runs in a clean environment and reproduces the issue.
- I have verified the issue also occurs with the latest wheels.
About this issue
- State: closed
- Created 3 years ago
- Comments: 20 (13 by maintainers)
Thanks for the detailed reproduction. I was able to reproduce the behavior with a modified script that waits for all tasks to finish. The slow fetching and deallocation of objects with embedded refs is reproducible on a single local node too.
In the last example, 500 * 2400 = 1.2e6 embedded object refs are created in ~20s, i.e. each ObjectRef creation took ~17 microseconds. When an embedded ObjectRef is created, the reference count for the object needs to be incremented, and this is where the script spent the majority of its walltime. I think there are opportunities to reduce the time spent here, and I will check with others to see if anything can be optimized at a higher level.

Btw, @pdames how many embedded object refs are you using inside an outer object or in total, and what would be an acceptable latency for calling ray.get() on them?

Got it. In that case the only workaround I can think of is parallelizing the deserialization by using multiple tasks to do the ref-shuffle instead of just 1.
Right now it sounds like the driver is deserializing |num_mappers| * |num_groups| refs. But this could be parallelized with multiple tasks doing the reference grouping (suppose you had K tasks doing the driver's work; then each task could handle some fraction of the mapper outputs). So basically: add a mini “reference-only” shuffle phase to parallelize that bottlenecked portion.
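A rough sketch of that suggestion (regroup, reference_only_shuffle, and mapper_outputs are hypothetical names, not part of the repro script; mapper_outputs is a list of {group_id: hidden_ref_bytes} dicts already fetched from the mappers):

import ray
from ray import cloudpickle

@ray.remote
def regroup(mapper_output_slice):
    # Each of the K tasks deserializes (and thereby registers) only its
    # slice of the hidden refs, instead of the driver handling all
    # |num_mappers| * |num_groups| of them serially.
    groups = {}
    for mapper_output in mapper_output_slice:
        for group_id, hidden_ref in mapper_output.items():
            groups.setdefault(group_id, []).append(cloudpickle.loads(hidden_ref))
    return groups

def reference_only_shuffle(mapper_outputs, k):
    # Split the mapper outputs into k slices and regroup them in parallel.
    chunk = max(1, len(mapper_outputs) // k)
    slices = [mapper_outputs[i:i + chunk]
              for i in range(0, len(mapper_outputs), chunk)]
    return ray.get([regroup.remote(s) for s in slices])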
I think the two layers of grouping can still help a lot here. Concretely, I would have each grouping task return {group_id: ray.put(refs_for_this_group)}.

Also, is there a reason you need to ray.put() every individual object in put_groups? Batching multiple objects together into a single ray.put() would reduce the number of references considerably, if you have enough memory to buffer them.
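A sketch of the batching idea (put_batched, records, and BATCH_SIZE are illustrative, not from the repro script):

import ray

BATCH_SIZE = 100  # illustrative; bounded by how many records fit in memory

def put_batched(records):
    # One ray.put() -- and therefore one ObjectRef to count -- per batch
    # of records, instead of one per record.
    return [ray.put(records[i:i + BATCH_SIZE])
            for i in range(0, len(records), BATCH_SIZE)]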
Hmm, Patrick, in that case why not ray.put() each group of IDs as a separate object (i.e. have two layers of serialized objects)? I think that would remove most of the deserialization overhead.
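A sketch of that two-layer idea (put_groups_two_layer and groups are hypothetical names; groups maps each group id to the list of refs belonging to it):

import ray

def put_groups_two_layer(groups):
    # Outer layer: one ObjectRef per group. The inner refs of a group are
    # serialized inside that single object, so a consumer only pays the
    # deserialization cost for the groups it actually fetches.
    return {group_id: ray.put(group_refs)
            for group_id, group_refs in groups.items()}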
@mwtian and I synced offline, and I think some steps to try are:
- deserialize_and_register_object_ref

Also, I think the script is not an apples-to-apples comparison, because the version that hides the refs doesn’t fully deserialize the ObjectRefs. If you deserialize the refs after ray.get(), it actually has about the same latency as normally embedded ObjectRefs (note: on my laptop). Here’s the modified script that I used. We could definitely get this run time down through the optimizations @mwtian is looking into, but I just want to make sure we understand the actual performance gap.
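(The modified script itself was not captured above; the fragment below is a minimal sketch of the idea, assuming the repro script’s tasks_pending and REF_HIDING variables: deserialize every hidden ref right after ray.get(), so that both variants pay the full ObjectRef deserialization and registration cost.)

from ray import cloudpickle

results = ray.get(tasks_pending)
if REF_HIDING:
    # Recover real ObjectRefs from the hidden (pickled) refs, which
    # registers each one with the reference counter.
    results = [
        {obj_idx: cloudpickle.loads(hidden_ref)
         for obj_idx, hidden_ref in obj_idx_to_obj_id.items()}
        for obj_idx_to_obj_id in results
    ]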
@mwtian I’m currently using an average of ~1000 embedded object references per outer object, and a max of ~10,000 embedded object references. In total across all outer objects, I’m producing an average of ~2,000,000 embedded object references, and a max of ~30,000,000 embedded object references.
An acceptable latency for a common scenario of 625 outer object references with 3520 embedded object references each would be <5 seconds, and ideally <2 seconds.
For a relatively common large-scale scenario of 10,000 outer object references with 8,000 embedded object references each, an acceptable latency would be <20 seconds, and ideally <10 seconds.
cc @mwtian