ray: [Core] Using multiprocessing to start a child process in a ray worker and starting a grandchild process in the child process again causes a hang

What happened + What you expected to happen

The bug

Hi everyone. In my code, a Ray worker needs to start child processes with multiprocessing, and those child processes may in turn start grandchild processes (some third-party libraries used by the child process create their own subprocesses). I’ve provided a simplified reproduction script below. It runs correctly for a while (roughly 300-1000 iterations) and then hangs. I use eventlet as a timer in my code, and I found that after iterating for some time, the child process times out while waiting for the grandchild process to exit, and an error is reported: "direct_actor_transport.cc:163: Check failed: objects_valid 0 1".

Logs

(ForkTest pid=27120) grand_child_process pid:28889, N.6101 start
(ForkTest pid=27120) grand_child_process pid:28893, N.6201 start
(ForkTest pid=27120) child_process: Pid:28859, N.60 end
(ForkTest pid=27120) child_process: Pid:28867, N.61 end
(ForkTest pid=27120) child_process: Pid:28873, N.62 end
(ForkTest pid=27120) Traceback (most recent call last):
(ForkTest pid=27120)   File "python/ray/_raylet.pyx", line 799, in ray._raylet.task_execution_handler
(ForkTest pid=27120)   File "python/ray/_raylet.pyx", line 618, in ray._raylet.execute_task
(ForkTest pid=27120)   File "python/ray/_raylet.pyx", line 658, in ray._raylet.execute_task
(ForkTest pid=27120)   File "python/ray/_raylet.pyx", line 665, in ray._raylet.execute_task
(ForkTest pid=27120)   File "python/ray/_raylet.pyx", line 669, in ray._raylet.execute_task
(ForkTest pid=27120)   File "python/ray/_raylet.pyx", line 616, in ray._raylet.execute_task.function_executor
(ForkTest pid=27120)   File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/site-packages/ray/_private/function_manager.py", line 675, in actor_method_executor
(ForkTest pid=27120)     return method(__ray_actor, *args, **kwargs)
(ForkTest pid=27120)   File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/site-packages/ray/util/tracing/tracing_helper.py", line 462, in _resume_span
(ForkTest pid=27120)     return method(self, *_args, **_kwargs)
(ForkTest pid=27120)   File "/root/wgt/ray_test/small_code.py", line 25, in father_process
(ForkTest pid=27120)     plist[i].join()
(ForkTest pid=27120)   File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/multiprocessing/process.py", line 149, in join
(ForkTest pid=27120)     res = self._popen.wait(timeout)
(ForkTest pid=27120)   File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/multiprocessing/popen_fork.py", line 43, in wait
(ForkTest pid=27120)     return self.poll(os.WNOHANG if timeout == 0.0 else 0)
(ForkTest pid=27120)   File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/multiprocessing/popen_fork.py", line 27, in poll
(ForkTest pid=27120)     pid, sts = os.waitpid(self.pid, flag)
(ForkTest pid=27120)   File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/site-packages/eventlet/green/os.py", line 96, in waitpid
(ForkTest pid=27120)     greenthread.sleep(0.01)
(ForkTest pid=27120)   File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/site-packages/eventlet/greenthread.py", line 36, in sleep
(ForkTest pid=27120)     hub.switch()
(ForkTest pid=27120)   File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/site-packages/eventlet/hubs/hub.py", line 313, in switch
(ForkTest pid=27120)     return self.greenlet.switch()
(ForkTest pid=27120) eventlet.timeout.Timeout: 10 seconds
(ForkTest pid=27120) Exception ignored in: 'ray._raylet.task_execution_handler'
(ForkTest pid=27120) Traceback (most recent call last):
(ForkTest pid=27120)   File "python/ray/_raylet.pyx", line 799, in ray._raylet.task_execution_handler
(ForkTest pid=27120)   File "python/ray/_raylet.pyx", line 618, in ray._raylet.execute_task
(ForkTest pid=27120)   File "python/ray/_raylet.pyx", line 658, in ray._raylet.execute_task
(ForkTest pid=27120)   File "python/ray/_raylet.pyx", line 665, in ray._raylet.execute_task
(ForkTest pid=27120)   File "python/ray/_raylet.pyx", line 669, in ray._raylet.execute_task
(ForkTest pid=27120)   File "python/ray/_raylet.pyx", line 616, in ray._raylet.execute_task.function_executor
(ForkTest pid=27120)   File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/site-packages/ray/_private/function_manager.py", line 675, in actor_method_executor
(ForkTest pid=27120)     return method(__ray_actor, *args, **kwargs)
(ForkTest pid=27120)   File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/site-packages/ray/util/tracing/tracing_helper.py", line 462, in _resume_span
(ForkTest pid=27120)     return method(self, *_args, **_kwargs)
(ForkTest pid=27120)   File "/root/wgt/ray_test/small_code.py", line 25, in father_process
(ForkTest pid=27120)     plist[i].join()
(ForkTest pid=27120)   File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/multiprocessing/process.py", line 149, in join
(ForkTest pid=27120)     res = self._popen.wait(timeout)
(ForkTest pid=27120)   File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/multiprocessing/popen_fork.py", line 43, in wait
(ForkTest pid=27120)     return self.poll(os.WNOHANG if timeout == 0.0 else 0)
(ForkTest pid=27120)   File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/multiprocessing/popen_fork.py", line 27, in poll
(ForkTest pid=27120)     pid, sts = os.waitpid(self.pid, flag)
(ForkTest pid=27120)   File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/site-packages/eventlet/green/os.py", line 96, in waitpid
(ForkTest pid=27120)     greenthread.sleep(0.01)
(ForkTest pid=27120)   File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/site-packages/eventlet/greenthread.py", line 36, in sleep
(ForkTest pid=27120)     hub.switch()
(ForkTest pid=27120)   File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/site-packages/eventlet/hubs/hub.py", line 313, in switch
(ForkTest pid=27120)     return self.greenlet.switch()
(ForkTest pid=27120) eventlet.timeout.Timeout: 10 seconds
(ForkTest pid=27120) [2022-12-21 14:18:11,129 C 27120 27120] direct_actor_transport.cc:163:  Check failed: objects_valid 0  1
(ForkTest pid=27120) *** StackTrace Information ***
(ForkTest pid=27120)     ray::SpdLogMessage::Flush()
(ForkTest pid=27120)     ray::RayLog::~RayLog()
(ForkTest pid=27120)     ray::core::CoreWorkerDirectTaskReceiver::HandleTask()::{lambda()#1}::operator()()
(ForkTest pid=27120)     std::_Function_handler<>::_M_invoke()
(ForkTest pid=27120)     ray::core::InboundRequest::Accept()
(ForkTest pid=27120)     ray::core::ActorSchedulingQueue::ScheduleRequests()
(ForkTest pid=27120)     ray::core::ActorSchedulingQueue::Add()
(ForkTest pid=27120)     ray::core::CoreWorkerDirectTaskReceiver::HandleTask()
(ForkTest pid=27120)     std::_Function_handler<>::_M_invoke()
(ForkTest pid=27120)     EventTracker::RecordExecution()
(ForkTest pid=27120)     std::_Function_handler<>::_M_invoke()
(ForkTest pid=27120)     boost::asio::detail::completion_handler<>::do_complete()
(ForkTest pid=27120)     boost::asio::detail::scheduler::do_run_one()
(ForkTest pid=27120)     boost::asio::detail::scheduler::run()
(ForkTest pid=27120)     boost::asio::io_context::run()
(ForkTest pid=27120)     ray::core::CoreWorker::RunTaskExecutionLoop()
(ForkTest pid=27120)     ray::core::CoreWorkerProcessImpl::RunWorkerTaskExecutionLoop()
(ForkTest pid=27120)     ray::core::CoreWorkerProcess::RunTaskExecutionLoop()
(ForkTest pid=27120)     __pyx_pw_3ray_7_raylet_10CoreWorker_7run_task_loop()
(ForkTest pid=27120)     method_vectorcall_NOARGS
(ForkTest pid=27120) 

My use case

In my business scenario, I need to use PyTorch’s DataLoader inside a Ray worker to load training data, and I generally use multi-process data loading for performance. However, the subprocesses started by PyTorch’s DataLoader may themselves start subprocesses (for example, via the TurboJPEG library). I’ve read a few Ray issues related to fork, but I’m still not sure what the root cause of this issue is. I’d be very grateful if someone could give some pointers on how to use multiprocessing inside Ray workers.

Versions / Dependencies

OS

CentOS Linux 7 (Core)

Hardware info

252 GB RAM
2 CPUs, 16 cores per CPU, 64 logical processors in total

Ray, Python, and other libraries used

ray                       1.13.0                   pypi_0    pypi
python                    3.9.13               haa1d7c7_2    defaults
eventlet                  0.33.2                   pypi_0    pypi
conda 4.12.0

Reproduction script

import ray
import os
import time
import multiprocessing as mp
import eventlet


@ray.remote(num_cpus=32)
class ForkTest:

    def father_process(self, test_count):
        eventlet.monkey_patch()
        ctx = mp.get_context("fork")
        print("++++++++++++++++ father_process [{}] start [{}] iter ++++++++++++++++".format(os.getpid(), test_count))

        plist = []
        for i in range(64):
            p = ctx.Process(target=ForkTest.child_process, args=(i,))
            plist.append(p)
            p.start()

        try:
            for i in range(64):
                with eventlet.Timeout(10, True):
                    plist[i].join()
                    plist[i].close()
        except Exception as e:
            print(e)
            return False

        print("++++++++++++++++ father_process [{}] end [{}] iter ++++++++++++++++".format(os.getpid(), test_count))
        return True

    @staticmethod
    def child_process(rank):
        try:
            eventlet.monkey_patch()
            plist = []
            for i in range(2):
                p = mp.Process(target=ForkTest.grand_child_process, args=(i + 100 * rank,))
                plist.append(p)
                p.start()

            for i in range(2):
                try:
                    with eventlet.Timeout(10, True):
                        plist[i].join()
                        plist[i].close()
                except Exception as e:
                    print(e)
                    raise e
        except Exception as e:
            print("child_process has exception! {}".format(e))
            raise e

        print("child_process: Pid:{}, N.{} end".format(os.getpid(), rank))

    @staticmethod
    def grand_child_process(i):
        a_list = [n for n in range(10000)]      # dummy work
        b_list = [n * 2 for n in range(10000)]  # dummy work
        print("grand_child_process pid:{}, N.{} start".format(os.getpid(), i))


if __name__ == "__main__":
    ray.init()
    assert ray.is_initialized()
    test_count = 0
    while True:
        actor = ForkTest.remote()
        re = ray.get(actor.father_process.remote(test_count))
        if re is False:
            print("Test failed!")
            break
        test_count += 1
        time.sleep(0.03)
    ray.shutdown()

Issue Severity

High: It blocks me from completing my task.

About this issue

  • Original URL
  • State: open
  • Created 2 years ago
  • Reactions: 1
  • Comments: 17 (12 by maintainers)

Most upvoted comments

Looks like an issue around the core worker forking subprocesses.

  • 38490 is the parent process (the actor process).
  • 385100 is the child process (forked from the actor process); it already receives RPC requests.
  • The two processes listen on the same RPC port.
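The shared-port observation above can be demonstrated outside Ray. This POSIX-only sketch (not Ray’s actual RPC code; `forked_child_sees_same_port` is a hypothetical helper) shows that a fork()ed child inherits the parent’s listening socket, bound to the same port:

```python
import os
import socket


def forked_child_sees_same_port():
    # Open a listening TCP socket; the kernel assigns a free port.
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(("127.0.0.1", 0))
    server.listen()
    port = server.getsockname()[1]

    # fork() duplicates every open file descriptor, so the child
    # holds the *same* listening socket -- mirroring the observation
    # that the actor process and its forked child both sit on one
    # RPC port, and an incoming request may reach either of them.
    pid = os.fork()
    if pid == 0:
        same = server.getsockname()[1] == port
        os._exit(0 if same else 1)

    _, status = os.waitpid(pid, 0)
    server.close()
    return os.WEXITSTATUS(status) == 0


if __name__ == "__main__":
    print(forked_child_sees_same_port())  # True on POSIX
```

This is why a forked child that does not immediately exec() can steal or confuse RPC traffic intended for the actor process.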