ray: ray.put() slows down over time.

ray.put slows down over time.

I have a simple setup with 2 actors. First Actor places raw and preprocessed images in shared memory and second actor runs predictions on preprocessed images.

import ray
import time
import numpy as np
from tqdm import tqdm
from ray.util import ActorPool
from ray.util.metrics import Count, Histogram, Gauge

# Number of simulated cameras whose frames make up one frame set.
CAMERA_COUNT = 3

@ray.remote
class Camera:
    """Simulates acquiring and preprocessing one frame set per call and
    publishes both the raw and the preprocessed arrays to the object store.

    Per-frame and per-set timings are exported via ray.util.metrics so the
    latency of each stage (acquire, preprocess, ray.put) can be graphed.
    """

    def __init__(self):
        # Cumulative counter: one increment per acquired frame set.
        self.camera_acquired_frame_counter = Count(name="camera_acquired_count")
        # f-prefixes removed: these name strings contain no placeholders.
        self.camera_acquire_time = Gauge(name='camera_acquire', tag_keys=("frame_order",))
        self.preprocess_time = Gauge(name='camera_preprocess', tag_keys=("frame_order",))
        self.ray_frame_set_put_time = Gauge(name='camera_frame_ray_put')
        self.ray_preprocessed_frame_set_put_time = Gauge(name='camera_preprocessed_ray_put')

    def get(self):
        """Acquire + preprocess one frame set.

        Returns:
            (frame_ref, preprocessed_frame_ref): ObjectRefs to the raw
            (3, 3000, 5000, 1) and preprocessed (3, 512, 512, 1) arrays.
        """
        frame_set_acquire_start_time = time.time()
        for cam_idx in range(CAMERA_COUNT):
            frame_acquire_start_time = time.time()
            time.sleep(0.01)  # stand-in for real camera acquisition
            self.camera_acquire_time.record(time.time() - frame_acquire_start_time,
                                            tags={"frame_order": str(cam_idx)})
        self.camera_acquired_frame_counter.record(1.0)
        self.camera_acquire_time.record(time.time() - frame_set_acquire_start_time,
                                        tags={"frame_order": 'frame_set'})

        frame_set_preprocess_start_time = time.time()
        for cam_idx in range(CAMERA_COUNT):
            frame_preprocess_start_time = time.time()
            time.sleep(0.01)  # stand-in for real preprocessing
            self.preprocess_time.record(time.time() - frame_preprocess_start_time,
                                        tags={"frame_order": str(cam_idx)})
        # BUG FIX: the whole-set preprocess duration was previously recorded
        # on camera_acquire_time (copy-paste error); record it on
        # preprocess_time so the acquire gauge isn't polluted.
        self.preprocess_time.record(time.time() - frame_set_preprocess_start_time,
                                    tags={"frame_order": 'frame_set'})

        ray_frame_set_put_stime = time.time()
        frame_ref = ray.put(np.zeros((3, 3000, 5000, 1)))
        self.ray_frame_set_put_time.record(time.time() - ray_frame_set_put_stime)
        ray_preprocessed_frame_set_put_stime = time.time()
        preprocessed_frame_ref = ray.put(np.zeros((3, 512, 512, 1)))
        self.ray_preprocessed_frame_set_put_time.record(time.time() - ray_preprocessed_frame_set_put_stime)

        return frame_ref, preprocessed_frame_ref

@ray.remote
class Classifier:
    """Runs a (simulated) model inference on a preprocessed frame set."""

    def __init__(self):
        # Gauge tracking the latency of each predict() call.
        self.classifier_infer_time = Gauge(name=f'classifier_infer')

    def predict(self, preprocesed_frame_ref):
        """Pretend to run inference and return dummy logits of length 1000."""
        started = time.time()
        time.sleep(0.03)  # stand-in for real model inference
        elapsed = time.time() - started
        self.classifier_infer_time.record(elapsed)
        return np.zeros(1000)


if __name__ == '__main__':
    # Start ray with a fixed metrics port so Prometheus can scrape it.
    init_info = ray.init(_metrics_export_port=58391)
    print(init_info)
    print(ray.cluster_resources())
    camera = Camera.remote()
    classifier = Classifier.remote()
    # Round-trip: acquire a frame set, then classify its preprocessed half.
    for ridx in tqdm(range(1000000), desc="Main loop"):
        frame_ref, preprocesed_frame_ref = ray.get(camera.get.remote())
        prediction = ray.get(classifier.predict.remote(preprocesed_frame_ref))

I notice that ray.put() call in Camera#get

frame_ref = ray.put(np.zeros((3, 3000, 5000, 1)))

starts to slow down over time.

screenshot

This trend looks a bit concerning. Any ideas about what’s happening here?

About this issue

  • Original URL
  • State: open
  • Created 3 years ago
  • Comments: 20 (8 by maintainers)

Most upvoted comments

My apologies. I wasn’t able to spend time on this sooner.

I modified my code to keep track of timing metrics for older versions of ray.

import ray
import numpy as np
from tqdm import tqdm
import time
try:
    from ray.util.metrics import Count, Gauge
except ImportError:
    # Older ray releases do not ship ray.util.metrics; fall back to
    # file-backed stand-ins so the same timing code runs on every version.
    process_start_time = time.time()
    print(f"Count and Gauge are not available in ray version {ray.__version__}.")
    import pandas as pd

    class Gauge:
        """Minimal Gauge substitute that appends each sample to a pickle file."""

        def __init__(self, name):
            self.name = name
            self.file_name = f'/tmp/{self.name}.pkl'
            # Start with an empty frame so record() can always read the file.
            pd.DataFrame([], columns=['name', 'time', 'value']).to_pickle(self.file_name)

        def record(self, value: float):
            # We read the DataFrame from the file each time so we don't add
            # the overhead of maintaining a large DataFrame in memory.
            df = pd.read_pickle(self.file_name)
            # BUG FIX: elapsed seconds since start is now - start; the
            # original computed start - now, which is always negative.
            df.at[len(df), ['name', 'time', 'value']] = [
                self.name, int(time.time() - process_start_time), value]
            df.to_pickle(self.file_name)

    class Count(Gauge):
        """Counter substitute; recording behavior is identical to Gauge,
        so the duplicated class body is replaced by inheritance."""
        pass

@ray.remote
class Actor1:
    """Repro actor: repeatedly ray.put()s a large array and records latency."""

    def __init__(self):
        self.put_count = Count(name="repro_put_count")
        self.put_gauge = Gauge(name="repro_put_gauge")

    def do_stuff(self):
        """ray.put a (3, 3000, 5000, 1) float64 zeros array; record the
        call latency in milliseconds and bump the put counter."""
        started = time.time()
        ref = ray.put(np.zeros((3, 3000, 5000, 1)))
        elapsed_ms = (time.time() - started) * 1000
        self.put_gauge.record(elapsed_ms)
        self.put_count.record(1.)
        return ref


def run():
    """Drive the repro loop: synchronously round-trip do_stuff() forever."""
    actor1 = Actor1.remote()
    for _ in tqdm(range(1000000000), desc="Main loop"):
        ray.get(actor1.do_stuff.remote())

if __name__ == '__main__':
    # Fixed metrics port so Prometheus can scrape the exported gauges.
    init_info = ray.init(_metrics_export_port=58391)
    print(init_info)
    print(ray.cluster_resources())
    run()

image

I’m seeing a similar pattern with 1.0.0. I’ll run this code again with 0.8.6

import ray
import numpy as np
from tqdm import tqdm
import time
from ray.util.metrics import Count, Gauge

@ray.remote
class Actor1:
    """Repro actor: repeatedly ray.put()s a large array and records latency."""

    def __init__(self):
        self.put_count = Count(name="repro_put_count")
        self.put_gauge = Gauge(name="repro_put_gauge")

    def do_stuff(self):
        """ray.put a (3, 3000, 5000, 1) float64 zeros array; record the
        call latency in milliseconds and bump the put counter."""
        started = time.time()
        ref = ray.put(np.zeros((3, 3000, 5000, 1)))
        elapsed_ms = (time.time() - started) * 1000
        self.put_gauge.record(elapsed_ms)
        self.put_count.record(1.)
        return ref


def run():
    """Drive the repro loop: synchronously round-trip do_stuff() forever."""
    actor1 = Actor1.remote()
    for _ in tqdm(range(1000000000), desc="Main loop"):
        ray.get(actor1.do_stuff.remote())

if __name__ == '__main__':
    # Fixed metrics port so Prometheus can scrape the exported gauges.
    init_info = ray.init(_metrics_export_port=58391)
    print(init_info)
    print(ray.cluster_resources())
    run()

I ran this code for ~20 hours and I see similar spikes here.

image

P.S.: There are a few periods with no metrics. That was because the Prometheus server was unable to scrape metrics during that time. However, the code above was running the entire time.

This looks bad. Can you check if this is reproducible when you purely keep calling ray.put?

Sure. I’ll let this code run overnight and post results.

import ray
import numpy as np
from tqdm import tqdm
import time
from ray.util.metrics import Count, Gauge

def run():
    """Actor-free repro: ray.put a large array in a tight loop from the
    driver process, recording each call's latency in milliseconds."""
    put_count = Count(name="repro_put_count")
    put_gauge = Gauge(name="repro_put_gauge")
    for _ in tqdm(range(1000000000), desc="Main loop"):
        started = time.time()
        object_ref = ray.put(np.zeros((3, 3000, 5000, 1)))
        put_gauge.record((time.time() - started) * 1000)
        put_count.record(1.)

if __name__ == '__main__':
    # Fixed metrics port so Prometheus can scrape the exported gauges.
    init_info = ray.init(_metrics_export_port=58391)
    print(init_info)
    print(ray.cluster_resources())
    run()