ray: [Core] Slow Embedded Object Ref Counting

What is the problem?

ray.get() shows unexpectedly high latencies when getting object refs whose values contain embedded object refs. I ran the repro script below to measure the object ref counting latency, which produced the following ray.get() latency results:

  • 1000 object refs with 600 embedded refs: ~10 seconds w/o ref hiding, ~0.5 seconds w/ ref hiding
  • 2000 object refs with 600 embedded refs: ~20 seconds w/o ref hiding, ~1 second w/ ref hiding
  • 2000 object refs with 1200 embedded refs: ~40 seconds w/o ref hiding, ~1 second w/ ref hiding
  • 500 object refs with 600 embedded refs: ~5 seconds w/o ref hiding, ~0.5 seconds w/ ref hiding
  • 500 object refs with 1200 embedded refs: ~10 seconds w/o ref hiding, ~0.5 seconds w/ ref hiding
  • 500 object refs with 2400 embedded refs: ~20 seconds w/o ref hiding, ~0.5 seconds w/ ref hiding

Here, “ref hiding” refers to using cloudpickle.dumps(obj_ref) to hide embedded object references from Ray’s reference counter. The figures above also show that the runtime is approximately O(obj_refs * embedded_refs_per_obj) when embedded object references are not hidden, and roughly O(obj_refs) (with an apparent lower bound of ~0.5 seconds) when they are hidden.
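For reference, a minimal illustration of the two forms being compared (not part of the repro script below; assumes a running Ray session):

import ray
from ray import cloudpickle

ray.init()

obj_ref = ray.put("some value")
visible = obj_ref                     # tracked by Ray's distributed reference counter
hidden = cloudpickle.dumps(obj_ref)   # opaque bytes; the reference counter does not see this
restored = cloudpickle.loads(hidden)  # deserializing re-registers the reference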

Ray version and other system information (Python version, TensorFlow version, OS): Ray 1.5.1 on AWS Deep Learning AMI (Ubuntu 18.04) V30.0

Reproduction (REQUIRED)

You can configure the OBJ_REFS, EMBEDDED_OBJ_REFS, and REF_HIDING variables in the script below to reproduce the above results, and uncomment the profiler code to trace the issue back to its root cause.

Python Script:

import cProfile  # only needed if the profiling block below is uncommented
import datetime
import time
import ray
from ray import ray_constants
from ray import cloudpickle


OBJ_REFS = 1000  # number of put_object tasks to run, and number of top-level object refs input to ray.get()
EMBEDDED_OBJ_REFS = 600  # number of embedded object refs per input object
REF_HIDING = False  # whether to hide embedded object refs via cloudpickle.dumps(ray.put(input_object))


def run():
    cluster_resources = ray.cluster_resources()
    next_node_idx = 0
    node_idx_to_id = {}
    for resource_name in cluster_resources.keys():
        if resource_name.startswith("node:"):
            node_idx_to_id[next_node_idx] = resource_name
            next_node_idx += 1

    input_objects = list(range(OBJ_REFS))
    tasks_pending = []
    i = 0
    for input_object in input_objects:
        node_id = node_idx_to_id[i % len(node_idx_to_id)]
        resources = {node_id: ray_constants.MIN_RESOURCE_GRANULARITY}
        promise = put_object.options(resources=resources).remote(
            input_object,
        )
        tasks_pending.append(promise)
        i += 1
    print(f"getting {len(tasks_pending)} task results...")
    results = ray.get(tasks_pending)
# uncomment for profiling
#   prof = cProfile.Profile()
#   prof = prof.runctx(
#       "ray.get(tasks_pending)",
#       {"ray": ray},
#       {"tasks_pending": tasks_pending}
#   )
#   prof.dump_stats("/home/ubuntu/output.pstats")
    print(f"got {len(results)} task results")
    print(f"retrieved all results at: {datetime.datetime.now()}")


@ray.remote
def put_object(input_object):
    start = time.perf_counter()
    obj_idx_to_obj_id = {}
    for obj_idx in range(EMBEDDED_OBJ_REFS):
        obj_ref = ray.put(
            input_object
        )
        obj_idx_to_obj_id[obj_idx] = obj_ref \
            if not REF_HIDING \
            else cloudpickle.dumps(obj_ref)
    stop = time.perf_counter()
    print("put object latency: ", stop - start)
    print(f"put object end time: {datetime.datetime.now()}")
    return obj_idx_to_obj_id


if __name__ == '__main__':
    print(f"start time: {datetime.datetime.now()}")
    ray.init(address="auto")
    start_e2e = time.perf_counter()
    run()
    stop_e2e = time.perf_counter()
    print("total latency: ", stop_e2e - start_e2e)
    print(f"end time: {datetime.datetime.now()}")

Autoscaler Config YAML:

cluster_name: ref_counter_repro

max_workers: 1

provider:
    type: aws
    region: us-east-1
    availability_zone: us-east-1a, us-east-1b, us-east-1c, us-east-1d, us-east-1f

auth:
    ssh_user: ubuntu

available_node_types:
  ray.head.default:
    max_workers: 0
    resources: {}
    node_config:
      InstanceType: r5n.8xlarge
      ImageId: latest_dlami

  ray.worker.default:
    min_workers: 1
    max_workers: 1
    resources: {}
    node_config:
      InstanceType: r5n.8xlarge
      ImageId: latest_dlami

head_node_type: ray.head.default

head_start_ray_commands:
    - ray stop
    # r5n.8xlarge nodes provide 256GiB of memory spread across 32 CPUs
    # (i.e. an average of 256GiB/32CPU = 8GiB/CPU).
    # The head node reserves 96GiB for the object store and leaves 160GiB for
    # everything else (e.g. gcs, driver, tasks, other OS processes, etc.).
    - "ulimit -n 65536; ray start --head --port=6379 --object-manager-port=8076 --object-store-memory=103079215104 --autoscaling-config=~/ray_bootstrap_config.yaml"

worker_start_ray_commands:
    - ray stop
    # r5n.8xlarge worker nodes provide 256GiB of memory spread across 32 CPUs
    # (i.e. an average of 256GiB/32CPU = 8GiB/CPU).
    # Worker nodes reserve 96GiB for the object store and leave 160GiB for
    # everything else (e.g. raylet, tasks, other OS processes, etc.).
    # 32 CPUs on r5n.8xlarge instances means that we receive an average of
    # 96GiB/32CPU = 3GiB/CPU of object store memory and
    # 160GiB/32CPU = 5GiB/CPU of memory for everything else.
    - "ulimit -n 65536; ray start --address=$RAY_HEAD_IP:6379 --object-store-memory=103079215104 --object-manager-port=8076"

setup_commands:
  - pip install ray

If the code snippet cannot be run by itself, the issue will be closed with “needs-repro-script”.

  • I have verified my script runs in a clean environment and reproduces the issue.
  • I have verified the issue also occurs with the latest wheels.

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 20 (13 by maintainers)

Most upvoted comments

Thanks for the detailed reproduction. I was able to reproduce the behavior with a modified script that waits for all tasks to finish. The slow retrieval and deallocation of objects with embedded refs is reproducible on a single local node too.

In the last example, 500 * 2400 = 1.2e6 embedded object refs are created in ~20s, i.e. each ObjectRef creation took ~17 microseconds. When an embedded ObjectRef is created, the reference count for the object needs to be incremented, and this is where the script spent the majority of the wall time. I think there are opportunities to reduce the time spent here, and I will check with others to see if anything can be optimized at a higher level.

Btw, @pdames how many embedded object refs are you using inside an outer object or in total, and what would be an acceptable latency for calling ray.get() on them?

Got it. In that case, the only workaround I can think of is parallelizing the deserialization by using multiple tasks to do the ref shuffle instead of just one.

Right now it sounds like the driver is deserializing |num_mappers| * |num_groups| refs. But this could be parallelized with multiple tasks doing the reference grouping (suppose you had K tasks doing the driver work; each task could then handle some fraction of the mapper outputs).

So basically adding a mini “reference-only” shuffle phase to parallelize that bottlenecked portion.
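A minimal sketch of that reference-only shuffle, assuming the mapper outputs are dicts of pickled (hidden) refs as in the workaround discussed in this thread; the names regroup_refs, parallel_regroup, and NUM_REGROUPERS are hypothetical:

from collections import defaultdict

import ray

NUM_REGROUPERS = 8  # K tasks splitting the driver-side regrouping work


@ray.remote
def regroup_refs(mapper_output_refs):
    # Fetch and deserialize only this slice of mapper outputs, off the driver,
    # and gather the (still pickled) refs by group ID.
    grouped = defaultdict(list)
    for output in ray.get(mapper_output_refs):
        for group_id, pickled_refs in output.items():
            grouped[group_id].append(pickled_refs)
    return dict(grouped)


def parallel_regroup(mapper_output_refs):
    # Split the mapper outputs across NUM_REGROUPERS tasks instead of
    # deserializing every embedded ref on the driver.
    chunk = max(1, len(mapper_output_refs) // NUM_REGROUPERS)
    futures = [
        regroup_refs.remote(mapper_output_refs[i:i + chunk])
        for i in range(0, len(mapper_output_refs), chunk)
    ]
    merged = defaultdict(list)
    for partial in ray.get(futures):
        for group_id, batches in partial.items():
            merged[group_id].extend(batches)
    return dict(merged)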

I think the two layers of grouping can still help a lot here. Concretely, I would:

  • in put_groups, return a batched dict of {group_id: ray.put(refs_for_this_group)}
  • in the driver, gather together all the batched refs for each group, and then,
  • pass a list of ref batches to reduce_group instead of a list of refs (you could also launch an intermediate task to combine the batches into a flat list)

Also, is there a reason you need to ray.put() every individual object in put_groups? Batching together multiple objects into ray.put() would reduce the number of references considerably if you have enough memory to buffer them.
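A minimal sketch of this two-layer approach, under the assumption that the real put_groups/reduce_group signatures differ; drive, NUM_GROUPS, and the hash-based grouping are purely illustrative:

from collections import defaultdict

import ray

NUM_GROUPS = 4  # hypothetical group count for illustration


@ray.remote
def put_groups(records):
    # Shuffle (hashable) records into groups, put each record individually as
    # before, then wrap each group's list of refs in a single ray.put(). The
    # returned dict embeds one ref per group rather than one ref per record.
    refs_by_group = defaultdict(list)
    for record in records:
        refs_by_group[hash(record) % NUM_GROUPS].append(ray.put(record))
    return {gid: ray.put(refs) for gid, refs in refs_by_group.items()}


@ray.remote
def reduce_group(ref_batches):
    # Receives one batch ref per mapper for this group; each batch resolves to
    # that mapper's list of record refs for the group.
    record_refs = []
    for batch in ray.get(ref_batches):
        record_refs.extend(batch)
    return ray.get(record_refs)


def drive(mapper_outputs):
    # Driver side: only one ref per (mapper, group) pair passes through the
    # driver; the per-record refs stay inside the batch objects.
    batches_by_group = defaultdict(list)
    for output in ray.get(mapper_outputs):
        for gid, batch_ref in output.items():
            batches_by_group[gid].append(batch_ref)
    return {gid: reduce_group.remote(batches)
            for gid, batches in batches_by_group.items()}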

Hmm, Patrick, in that case why not ray.put() each group of IDs as separate objects (i.e., have two layers of serialized objects)? I think that would remove most of the deserialization overhead.

On Thu, Aug 26, 2021, 11:25 PM Patrick Ames wrote:

Although I typically use all of the embedded object refs, I still think that some form of lazy deserialization could be useful for my use case, since I lazily distribute deserialization of embedded object refs to a parallel task worker launched for each outer object. I use the manual ref-hiding workaround to form a dictionary mapping group ID keys to pickled object reference values containing that group’s members, as output from shuffle tasks run on the driver.

So, I generally don’t want a call like ray.get(group_ids_to_object_refs) to run a deep deserialization of all embedded object references on the driver if possible (especially if it incurs significant latency). Instead, I’d like to deserialize only the group ID keys on the driver, while distributing deserialization of all values for a given group (i.e. the embedded object references) to the task assigned to process that group.

Also, I’d be happy to run some tests against the proposed changes once they’re merged to measure the end-to-end latency impact for my use-case.


@mwtian and I synced offline and I think some steps to try are:

  1. Disable recording the call stack entirely (see the sketch after this list).
  2. Batch the calls to deserialize_and_register_object_ref.
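For step 1, one possible knob is sketched below. This assumes the RAY_record_ref_creation_sites environment variable (present in newer Ray releases for toggling ref-creation call-site recording) applies here; whether it exists in Ray 1.5.1 should be verified:

import os

# Assumption: disable call-site recording for ObjectRef creation before Ray
# is imported/initialized, so the setting is picked up at import time.
os.environ["RAY_record_ref_creation_sites"] = "0"

import ray  # imported after setting the env var on purpose

ray.init()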

Also, I think the script is not an apples-to-apples comparison, because the version that hides the refs doesn’t fully deserialize the ObjectRefs. If you deserialize the refs after ray.get, it actually has about the same latency as normally embedded ObjectRefs (measured on my laptop). Here’s the modified script that I used:

import cProfile  # only needed if the profiling block below is uncommented
import datetime
import time
import ray
from ray import ray_constants
from ray import cloudpickle


OBJ_REFS = 100  # number of put_object tasks to run, and number of top-level object refs input to ray.get()
EMBEDDED_OBJ_REFS = 600  # number of embedded object refs per input object
REF_HIDING = True  # whether to hide embedded object refs via cloudpickle.dumps(ray.put(input_object))
DESERIALIZE = True  # whether to deserialize the hidden embedded object refs


def run():
    cluster_resources = ray.cluster_resources()
    next_node_idx = 0
    node_idx_to_id = {}
    for resource_name in cluster_resources.keys():
        if resource_name.startswith("node:"):
            node_idx_to_id[next_node_idx] = resource_name
            next_node_idx += 1

    input_objects = list(range(OBJ_REFS))
    tasks_pending = []
    i = 0
    for input_object in input_objects:
        node_id = node_idx_to_id[i % len(node_idx_to_id)]
        resources = {node_id: ray_constants.MIN_RESOURCE_GRANULARITY}
        promise = put_object.options(resources=resources).remote(
            input_object,
        )
        tasks_pending.append(promise)
        i += 1
    print(f"getting {len(tasks_pending)} task results...")
    results = ray.get(tasks_pending)
# uncomment for profiling
#   prof = cProfile.Profile()
#   prof = prof.runctx(
#       "ray.get(tasks_pending)",
#       {"ray": ray},
#       {"tasks_pending": tasks_pending}
#   )
#   prof.dump_stats("/home/ubuntu/output.pstats")
    print(f"got {len(results)} task results")
    print(f"retrieved all results at: {datetime.datetime.now()}")

    del results
    start = time.time()
    results = ray.get(tasks_pending)
    if REF_HIDING and DESERIALIZE:
        results = [cloudpickle.loads(result) for result in results]
    end = time.time()
    print("ray.get in", end - start)


@ray.remote
def put_object(input_object):
    start = time.perf_counter()
    obj_idx_to_obj_id = {}
    for obj_idx in range(EMBEDDED_OBJ_REFS):
        obj_ref = ray.put(
            input_object
        )
        obj_idx_to_obj_id[obj_idx] = obj_ref
        #obj_idx_to_obj_id[obj_idx] = obj_ref \
        #    if not REF_HIDING \
        #    else cloudpickle.dumps(obj_ref)
    if REF_HIDING:
        obj_idx_to_obj_id = cloudpickle.dumps(obj_idx_to_obj_id)
    stop = time.perf_counter()
    print("put object latency: ", stop - start)
    print(f"put object end time: {datetime.datetime.now()}")
    return obj_idx_to_obj_id


if __name__ == '__main__':
    print(f"start time: {datetime.datetime.now()}")
    ray.init()
    start_e2e = time.perf_counter()
    run()
    stop_e2e = time.perf_counter()
    print("total latency: ", stop_e2e - start_e2e)
    print(f"end time: {datetime.datetime.now()}")

We could definitely get this runtime down through the optimizations @mwtian is looking into, but I just want to make sure we understand the actual performance gap.

@mwtian I’m currently using an average of ~1000 embedded object references per outer object, and a max of ~10,000 embedded object references. In total across all outer objects, I’m producing an average of ~2,000,000 embedded object references, and a max of ~30,000,000 embedded object references.

An acceptable latency for a common scenario of 625 outer object references with 3520 embedded object references each would be <5 seconds, and ideally <2 seconds.

For a relatively common large-scale scenario of 10,000 outer object references with 8,000 embedded object references each, an acceptable latency would be <20 seconds, and ideally <10 seconds.