ray: Ray starts too many workers (and may crash) when using nested remote functions.

This is very similar to the earlier issue #231. One proposed solution was implemented by @stephanie-wang in https://github.com/ray-project/ray/pull/425.

Users sometimes encounter variants of the following bug and have no idea what is going wrong.

Running the following workload requires about 500 workers to be started (to execute all of the g tasks, which are blocked in the call to ray.get, before the f tasks start getting executed).

import ray
ray.init()

@ray.remote
def f():
    return 1

@ray.remote
def g():
    return sum(ray.get([f.remote() for _ in range(10)]))

ray.get([g.remote() for _ in range(500)])

Workarounds:

  • Start fewer g tasks
  • Divide g into two parts, e.g.,
    @ray.remote
    def g_part_a():
        return [f.remote() for _ in range(10)]
    
    @ray.remote
    def g_part_b(*results):
        return sum(results)
    
    intermediate = ray.get([g_part_a.remote() for _ in range(500)])
    ray.get([g_part_b.remote(*ids) for ids in intermediate])
    
  • Use custom resources to constrain the number of g tasks running concurrently (suggested by @ericl).

Potential Solutions:

  • Make the scheduler prioritize the f tasks over the g tasks (e.g., the strategy in #425 or some sort of LIFO policy).

In the meantime, we can easily detect that we’ve started way too many workers and push a warning to the user with a link to some possible workaround.

cc @stephanie-wang @ericl

About this issue

  • Original URL
  • State: closed
  • Created 6 years ago
  • Reactions: 1
  • Comments: 19 (2 by maintainers)

Most upvoted comments

Hi, has this issue been improved in the latest Ray versions (e.g., 1.1.0)?

@nmayhall-vt there is something a little subtle happening here.

If you are doing something like

x_id = ray.put(x)
y_id = ray.put(y)

@ray.remote
def process(data):
    x = ray.get(data[0])
    y = ray.get(data[1])

    # Use x and y

process.remote([x_id, y_id])

Then you should be able to avoid the issue by doing

x_id = ray.put(x)
y_id = ray.put(y)

@ray.remote
def process(x, y):
    # Use x and y

process.remote(x_id, y_id)

The reason is that when you call ray.get inside of a remote function, Ray will treat the task as “not using any resources” until ray.get returns, and so will potentially schedule additional tasks, which may require additional workers to be created.

Does that make sense?

Also, is it giving the warning and then crashing? Or is it giving the warning and then succeeding?

I’m looking into using Ray to simplify the parallelization of the training/predicting of a multilevel model in Python. I’ve encountered this warning in what seems like a fairly simple example as shown below:

import ray


ray.init(num_cpus=6)


@ray.remote
class Foo:
    def __init__(self, x):
        self.x = x

    def calc(self, y):
        return self.x + y

    def adjust_x(self, new_x):
        self.x = new_x

class MultiFoo:
    def __init__(self, num):
        self.foos = [Foo.remote(i) for i in range(num)]

    def calc(self, y):
        return ray.get([foo.calc.remote(y) for foo in self.foos])

    def adjust_foos(self, new_xs):
        results = [
          foo.adjust_x.remote(new_x)
          for foo, new_x in zip(self.foos, new_xs)
        ]
        ray.get(results)

Now if I instantiate some examples of MultiFoo, I can trigger the warning mentioned above:

In [4]: multifoos = MultiFoo(6)

In [5]: multifoos = MultiFoo(5)

In [6]: multifoos = MultiFoo(8)

2019-02-18 16:49:57,841	ERROR worker.py:1632 -- WARNING: 18 workers have been started. This could be a result of using a large number of actors, or it could be a consequence of using nested tasks (see https://github.com/ray-project/ray/issues/3644) for some discussion of workarounds.

However, I can sometimes instantiate many (e.g., 10) MultiFoo(18) objects in quick succession, binding them to the same variable, without any such warning popping up.

As this is the issue linked in the warning, I’m posting my question here. Am I doing something incorrectly? Is this something I can safely ignore?

Is this warning actually a problem if I intend to create a bunch of workers? I’m using ray to parallelize writing something to disk frequently, and don’t care about the return value. The calls seem to be succeeding, but the warning message keeps appearing so I’m worried that there is something I’m missing (e.g., maybe some of the processes aren’t actually succeeding and I’m just missing them).

Thanks!

To the best of my knowledge, there is no argument you can pass into ray.remote() to control the total number of workers, like what we do with tune.run(..., config={'num_worker': ...}).

Am I right?

@PMende, the reason you might not be seeing the error when you create many MultiFoo objects and bind them to the same variable is that the previous MultiFoo objects are going out of scope and destructing the Foo actors that they created (the destructed actor processes are killed and so there are never too many workers started at one time).

When all of the MultiFoo objects are still in scope, then all of the actors must be kept alive simultaneously. Each actor is a separate process, and there are limits to how many processes you can start simultaneously. For example, if you check ulimit -n, that is one upper bound. For this reason, you can’t start an arbitrarily large number of actors on a single machine. So if your script is running fine, then yes, you can ignore this warning, but if things crash, it will probably be because too many processes were started simultaneously.