ray: [core] Possible memory leak in async actor implementation

@shrekris-anyscale and I have been trying to track down a memory leak that shows up across multiple Serve components. All of them are async actors, and memory only grows while actor calls are being made, which suggests the leak is related to async actor calls.

I ran a minimal script that sends large batches of async actor calls, and memory grows consistently. This is running on the nightly wheels, commit SHA 0b8fd1d6dd7e83656713b21bb408fa7f04244bfb.

Repro script (pinned to the head node):

import ray

@ray.remote
class A:
    async def hi(self):
        return "hi"

a = A.options(resources={"node:10.0.62.86": 0.1}).remote()

while True:
    ray.get([a.hi.remote() for _ in range(10000)])

Memory growth over 3 hours: [screenshot, 2023-08-17]

Link to the workspace where this is currently running (I plan to leave it running overnight; please don't touch it):

For reference, here is a run of the same script with a regular def instead of async def:

import ray

@ray.remote
class A:
    def hi(self):
        return "hi"

a = A.options(resources={"node:10.0.62.86": 0.1}).remote()

while True:
    ray.get([a.hi.remote() for _ in range(10000)])

[Screenshot, 2023-08-17]
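To compare the async and synchronous runs with a number rather than by eye, one could sample the actor process's resident memory at fixed intervals (for example with psutil's Process().memory_info().rss, which is an assumption about tooling, not something used in this issue) and fit a least-squares slope. The sketch below is pure Python; growth_rate, looks_like_leak, and the 1 KiB/s threshold are hypothetical names and choices for illustration only.

```python
from typing import Sequence


def growth_rate(timestamps: Sequence[float], rss_bytes: Sequence[float]) -> float:
    """Least-squares slope of memory over time, in bytes per second."""
    n = len(timestamps)
    if n != len(rss_bytes) or n < 2:
        raise ValueError("need at least two paired (time, rss) samples")
    mean_t = sum(timestamps) / n
    mean_m = sum(rss_bytes) / n
    # Covariance of (time, memory) divided by the variance of time.
    cov = sum((t - mean_t) * (m - mean_m) for t, m in zip(timestamps, rss_bytes))
    var = sum((t - mean_t) ** 2 for t in timestamps)
    if var == 0:
        raise ValueError("timestamps must not all be equal")
    return cov / var


def looks_like_leak(
    timestamps: Sequence[float],
    rss_bytes: Sequence[float],
    threshold_bytes_per_s: float = 1024.0,  # hypothetical cutoff, tune per workload
) -> bool:
    """Flag steady growth faster than the threshold."""
    return growth_rate(timestamps, rss_bytes) > threshold_bytes_per_s


if __name__ == "__main__":
    # Synthetic samples for illustration only, not measurements from this issue.
    t = [0.0, 60.0, 120.0, 180.0]
    growing = [100e6, 106e6, 112e6, 118e6]  # rises by 100,000 bytes per second
    flat = [100e6, 100.2e6, 99.9e6, 100.1e6]
    print(looks_like_leak(t, growing), looks_like_leak(t, flat))  # True False
```

A single linear fit is a deliberately crude check: it ignores warm-up and allocator caching, so it is most useful for comparing two runs of the same workload, such as the async and sync scripts above.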

About this issue

  • State: closed
  • Created 10 months ago
  • Comments: 15 (15 by maintainers)

Most upvoted comments

Reproduced: the async actor's memory keeps going up.

[Screenshot, 2023-08-24: left is with jemalloc, right is without jemalloc]

OK, I believe we've narrowed this down to an issue related to max_concurrency / concurrency groups. Here's the updated repro:

import asyncio
import random
import time

import ray


# hi() runs in a concurrency group that allows one call at a time.
@ray.remote(concurrency_groups={"TEST_CONCURRENCY_GROUP": 1})
class Executor:
    @ray.method(concurrency_group="TEST_CONCURRENCY_GROUP")
    async def hi(self):
        return "hi"


@ray.remote
class Caller:
    def __init__(self, executors):
        self._executors = executors
        # Start the call loop in the background as soon as the actor is created.
        self._task = asyncio.create_task(self.run())

    async def run(self):
        for i in range(1000000):
            start = time.time()
            print(f"Starting iteration {i}!")
            # 1,000 concurrent coroutines, each making 100 sequential calls.
            await asyncio.gather(*[self.do_calls() for _ in range(1000)])
            print(f"Iteration finished in {time.time() - start}s. Sleeping for 30s.")
            # asyncio.sleep avoids blocking the actor's event loop.
            await asyncio.sleep(30)

    async def do_calls(self):
        for _ in range(100):
            # Awaiting the ObjectRef returned by an actor call yields its result.
            await random.choice(self._executors).hi.remote()


executors = [Executor.remote() for _ in range(10)]
callers = [Caller.remote(executors) for _ in range(20)]

# Keep the driver alive so the actors keep running.
time.sleep(1000000000)

With this, there’s a clear leak in the Executor actors:

[Screenshot, 2023-08-23]
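One way to check whether the Executor's growth is visible at the Python level is to diff tracemalloc snapshots taken between batches. This is a hedged sketch: simulate_batch is a hypothetical stand-in for a batch of hi() calls, and running the diff inside the Executor would need an extra actor method that this issue does not define. tracemalloc only tracks allocations made through Python's allocators, so native growth in Ray's C++ core worker would not appear; a flat diff alongside growing RSS would point below the Python layer.

```python
import tracemalloc


def _drop_tracemalloc_frames(snapshot: tracemalloc.Snapshot) -> tracemalloc.Snapshot:
    # Ignore allocations made by tracemalloc itself while taking snapshots.
    return snapshot.filter_traces((tracemalloc.Filter(False, tracemalloc.__file__),))


def top_growth(before: tracemalloc.Snapshot, after: tracemalloc.Snapshot, limit: int = 5):
    """Return up to `limit` source lines whose traced memory grew the most."""
    stats = _drop_tracemalloc_frames(after).compare_to(
        _drop_tracemalloc_frames(before), "lineno"
    )
    # compare_to sorts by absolute size change; keep only lines that grew.
    return [s for s in stats if s.size_diff > 0][:limit]


def simulate_batch(sink: list) -> None:
    # Hypothetical stand-in for one batch of hi() calls: it retains about 1 MB
    # so that the diff has something to report.
    sink.extend(bytearray(1024) for _ in range(1000))


if __name__ == "__main__":
    tracemalloc.start()
    retained = []
    before = tracemalloc.take_snapshot()
    for _ in range(5):
        simulate_batch(retained)
    after = tracemalloc.take_snapshot()
    for stat in top_growth(before, after):
        print(stat)
    tracemalloc.stop()
```

Taking the "before" snapshot after a warm-up batch, rather than at startup, keeps one-time caches from showing up as growth.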