ray: Ray remote task + FastAPI memory leak

What happened + What you expected to happen

I started a FastAPI app, and one of its endpoints continuously created Ray remote tasks.

I used the memray tool to get a memory-leak report:

After 10,000+ requests to "/tasks": 0x1 56.3 MiB malloc 1239346 invocations at /usr/local/lib/python3.10/site-packages/ray/remote_function.py:411

After 20,000+ requests to "/tasks": 0x27 150.3 MiB malloc 3198029 invocations at /usr/local/lib/python3.10/site-packages/ray/remote_function.py:411
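For reference, a minimal sketch of how such a report can be collected with memray's Python API (the actual capture setup here may differ; running the script under `memray run` from the CLI is an alternative). The module name and output file name below are placeholders:

# Sketch only: wrap the server in memray's Tracker to capture allocations,
# then inspect the capture with `memray flamegraph fastapi_app.bin`.
# "fastapi_app" and "fastapi_app.bin" are placeholder names.
import memray
import uvicorn

from fastapi_app import app  # hypothetical module holding the app defined below

if __name__ == "__main__":
    with memray.Tracker("fastapi_app.bin"):
        uvicorn.run(app, host="0.0.0.0", port=8000)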


Versions / Dependencies

Ray version == 2.7.1; the Ray cluster runs in Docker.

Reproduction script

Here is my fastapi.py:

import json
import logging
import shlex
from subprocess import Popen
import ray
import uvicorn
from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class FopReq(BaseModel):  # request model reconstructed from the fields used below
    entrypoint: str
    tid: str
    metadata: dict
    entrypoint_num_cpus: float
    entrypoint_num_gpus: float
    runtime_env: dict

@ray.remote
def run_task(entrypoint, tid, metadata):
    entrypoint += f" --job='{json.dumps(metadata)}'"
    cmd = shlex.split(entrypoint)
    logging.info(f"run {cmd}")
    with Popen(args=cmd) as p:
        ...

@app.post("/tasks/")
async def fop_jobs(req: FopReq):
    run_task.options(
        num_cpus=req.entrypoint_num_cpus,
        num_gpus=req.entrypoint_num_gpus,
        runtime_env=req.runtime_env,
    ).remote(req.entrypoint, req.tid, req.metadata)
    return req

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
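For completeness, a minimal load driver in the spirit of the test.py mentioned in the comments below (a sketch only; the URL and all field values are placeholders, not the original workload):

# Sketch of a load generator that keeps hitting the /tasks/ endpoint.
import requests

payload = {
    "entrypoint": "python my_job.py",  # placeholder command
    "tid": "demo",
    "metadata": {},
    "entrypoint_num_cpus": 0.5,
    "entrypoint_num_gpus": 0,
    "runtime_env": {},
}

while True:
    requests.post("http://127.0.0.1:8000/tasks/", json=payload)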

Issue Severity

High: It blocks me from completing my task.

About this issue

  • Original URL
  • State: closed
  • Created 7 months ago
  • Comments: 20 (20 by maintainers)

Most upvoted comments

@jjyao Your PR works fine; it fixes the memory leak.

Simpler repro:

import ray

ray.init()

# max_calls=1 makes the worker exit after each task, so every call spawns a
# fresh worker process.
@ray.remote(max_calls=1)
def f():
    pass

while True:
    ray.get(f.remote())

@jjyao This is also a point of confusion for me. The problem is easy to reproduce on 2.8.0, but on the master branch I have to run fastapi.py and test.py at the same time, keeping the QPS in line with the total resource amount, to reproduce it. Either way, it can be reproduced. The key point seems to be options(num_cpus=0.5) rather than 1. As a concrete symptom, the output of ray list workers keeps growing.
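For reference, a sketch of watching the worker count via the Python state API (assuming Ray 2.x's ray.util.state is available and the script can reach the cluster, e.g. with RAY_ADDRESS set); this is equivalent to polling ray list workers from the CLI:

# Sketch: poll the number of workers the cluster reports; in this repro the
# count keeps increasing over time.
import time

from ray.util.state import list_workers

while True:
    print(f"workers: {len(list_workers(limit=10000))}")
    time.sleep(10)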

I added a debug log in core_worker_client_pool.cc, in CoreWorkerClientPool::GetOrConnect:

RAY_LOG(DEBUG) << "Connected to " << addr_proto.ip_address() << ":" << addr_proto.port() << " size: " << client_map_.size();

Here is the log from the driver worker on the head node, built from the master branch:

[2023-11-27 22:30:55,540 D 1182 1217] core_worker_client_pool.cc:42: Connected to 10.23.176.225:10004 size: 1
// ... 10 minutes later, the size has grown from 1 to 1914
[2023-11-27 22:41:54,014 D 1182 1217] core_worker_client_pool.cc:42: Connected to 10.23.176.225:13439 size: 1914

This client_map_ will continue to grow and will not be released.

At the same time, I also noticed your comment on my code; that was my mistake. I am still looking for a way for a worker node to notify the driver worker on the head node when one of its idle workers has exited, so that the corresponding client_map_ entry can be removed and the map size kept bounded.
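To make the shape of the problem concrete, here is a small conceptual model in Python (this is not Ray's actual C++ implementation; names are illustrative): the pool only ever adds entries in GetOrConnect, so without a hook on worker exit every short-lived worker leaves a permanent entry behind.

# Conceptual model only; Ray's real CoreWorkerClientPool is C++ and differs.
class ClientPoolModel:
    def __init__(self):
        self.client_map = {}  # (ip, port) -> client handle

    def get_or_connect(self, ip, port):
        # Entries are only ever added here, one per distinct worker address.
        key = (ip, port)
        if key not in self.client_map:
            self.client_map[key] = object()  # stands in for an RPC client
        return self.client_map[key]

    def on_worker_exit(self, ip, port):
        # The missing piece being discussed: evict the entry once the remote
        # worker is known to be gone, so the map stays bounded.
        self.client_map.pop((ip, port), None)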