ray: [Core] Using multiprocessing to start a child process in a Ray worker, and then starting a grandchild process from that child, causes a hang
What happened + What you expected to happen
The bug
Hi everyone. In my code, a child process needs to be started inside the Ray worker, and that child process may in turn start grandchild processes (some third-party libraries used by the child process create subprocesses of their own). I've provided a simplified reproduction script below; a condensed sketch of the process tree it builds follows this paragraph. The script runs correctly for a while (about 300-1000 iterations) and then hangs. I use eventlet as a timer in my code, and after iterating for some time the child process times out while waiting for the grandchild process to close, and the following error is reported: "direct_actor_transport.cc:163: Check failed: objects_valid 0 1".
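Condensed, the process tree the reproduction script builds looks like this (a minimal sketch of the same pattern, with the eventlet timers and logging stripped out; the full script is under "Reproduction script" below):

import multiprocessing as mp
import ray


def grandchild():
    pass  # does a little work and exits


def child():
    # Grandchildren stand in for third-party libraries that fork again.
    procs = [mp.Process(target=grandchild) for _ in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()  # in the full script this join is guarded by an eventlet.Timeout


@ray.remote
class ForkTest:
    def father_process(self):
        ctx = mp.get_context("fork")
        procs = [ctx.Process(target=child) for _ in range(64)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()  # occasionally hangs here after a few hundred iterations
        return True


if __name__ == "__main__":
    ray.init()
    actor = ForkTest.remote()
    ray.get(actor.father_process.remote())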
Logs
(ForkTest pid=27120) grand_child_process pid:28889, N.6101 start
(ForkTest pid=27120) grand_child_process pid:28893, N.6201 start
(ForkTest pid=27120) child_process: Pid:28859, N.60 end
(ForkTest pid=27120) child_process: Pid:28867, N.61 end
(ForkTest pid=27120) child_process: Pid:28873, N.62 end
(ForkTest pid=27120) Traceback (most recent call last):
(ForkTest pid=27120) File "python/ray/_raylet.pyx", line 799, in ray._raylet.task_execution_handler
(ForkTest pid=27120) File "python/ray/_raylet.pyx", line 618, in ray._raylet.execute_task
(ForkTest pid=27120) File "python/ray/_raylet.pyx", line 658, in ray._raylet.execute_task
(ForkTest pid=27120) File "python/ray/_raylet.pyx", line 665, in ray._raylet.execute_task
(ForkTest pid=27120) File "python/ray/_raylet.pyx", line 669, in ray._raylet.execute_task
(ForkTest pid=27120) File "python/ray/_raylet.pyx", line 616, in ray._raylet.execute_task.function_executor
(ForkTest pid=27120) File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/site-packages/ray/_private/function_manager.py", line 675, in actor_method_executor
(ForkTest pid=27120) return method(__ray_actor, *args, **kwargs)
(ForkTest pid=27120) File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/site-packages/ray/util/tracing/tracing_helper.py", line 462, in _resume_span
(ForkTest pid=27120) return method(self, *_args, **_kwargs)
(ForkTest pid=27120) File "/root/wgt/ray_test/small_code.py", line 25, in father_process
(ForkTest pid=27120) plist[i].join()
(ForkTest pid=27120) File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/multiprocessing/process.py", line 149, in join
(ForkTest pid=27120) res = self._popen.wait(timeout)
(ForkTest pid=27120) File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/multiprocessing/popen_fork.py", line 43, in wait
(ForkTest pid=27120) return self.poll(os.WNOHANG if timeout == 0.0 else 0)
(ForkTest pid=27120) File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/multiprocessing/popen_fork.py", line 27, in poll
(ForkTest pid=27120) pid, sts = os.waitpid(self.pid, flag)
(ForkTest pid=27120) File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/site-packages/eventlet/green/os.py", line 96, in waitpid
(ForkTest pid=27120) greenthread.sleep(0.01)
(ForkTest pid=27120) File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/site-packages/eventlet/greenthread.py", line 36, in sleep
(ForkTest pid=27120) hub.switch()
(ForkTest pid=27120) File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/site-packages/eventlet/hubs/hub.py", line 313, in switch
(ForkTest pid=27120) return self.greenlet.switch()
(ForkTest pid=27120) eventlet.timeout.Timeout: 10 seconds
(ForkTest pid=27120) Exception ignored in: 'ray._raylet.task_execution_handler'
(ForkTest pid=27120) Traceback (most recent call last):
(ForkTest pid=27120) File "python/ray/_raylet.pyx", line 799, in ray._raylet.task_execution_handler
(ForkTest pid=27120) File "python/ray/_raylet.pyx", line 618, in ray._raylet.execute_task
(ForkTest pid=27120) File "python/ray/_raylet.pyx", line 658, in ray._raylet.execute_task
(ForkTest pid=27120) File "python/ray/_raylet.pyx", line 665, in ray._raylet.execute_task
(ForkTest pid=27120) File "python/ray/_raylet.pyx", line 669, in ray._raylet.execute_task
(ForkTest pid=27120) File "python/ray/_raylet.pyx", line 616, in ray._raylet.execute_task.function_executor
(ForkTest pid=27120) File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/site-packages/ray/_private/function_manager.py", line 675, in actor_method_executor
(ForkTest pid=27120) return method(__ray_actor, *args, **kwargs)
(ForkTest pid=27120) File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/site-packages/ray/util/tracing/tracing_helper.py", line 462, in _resume_span
(ForkTest pid=27120) return method(self, *_args, **_kwargs)
(ForkTest pid=27120) File "/root/wgt/ray_test/small_code.py", line 25, in father_process
(ForkTest pid=27120) plist[i].join()
(ForkTest pid=27120) File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/multiprocessing/process.py", line 149, in join
(ForkTest pid=27120) res = self._popen.wait(timeout)
(ForkTest pid=27120) File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/multiprocessing/popen_fork.py", line 43, in wait
(ForkTest pid=27120) return self.poll(os.WNOHANG if timeout == 0.0 else 0)
(ForkTest pid=27120) File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/multiprocessing/popen_fork.py", line 27, in poll
(ForkTest pid=27120) pid, sts = os.waitpid(self.pid, flag)
(ForkTest pid=27120) File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/site-packages/eventlet/green/os.py", line 96, in waitpid
(ForkTest pid=27120) greenthread.sleep(0.01)
(ForkTest pid=27120) File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/site-packages/eventlet/greenthread.py", line 36, in sleep
(ForkTest pid=27120) hub.switch()
(ForkTest pid=27120) File "/usr/local/anaconda3/envs/uniscale-py39/lib/python3.9/site-packages/eventlet/hubs/hub.py", line 313, in switch
(ForkTest pid=27120) return self.greenlet.switch()
(ForkTest pid=27120) eventlet.timeout.Timeout: 10 seconds
(ForkTest pid=27120) [2022-12-21 14:18:11,129 C 27120 27120] direct_actor_transport.cc:163: Check failed: objects_valid 0 1
(ForkTest pid=27120) *** StackTrace Information ***
(ForkTest pid=27120) ray::SpdLogMessage::Flush()
(ForkTest pid=27120) ray::RayLog::~RayLog()
(ForkTest pid=27120) ray::core::CoreWorkerDirectTaskReceiver::HandleTask()::{lambda()#1}::operator()()
(ForkTest pid=27120) std::_Function_handler<>::_M_invoke()
(ForkTest pid=27120) ray::core::InboundRequest::Accept()
(ForkTest pid=27120) ray::core::ActorSchedulingQueue::ScheduleRequests()
(ForkTest pid=27120) ray::core::ActorSchedulingQueue::Add()
(ForkTest pid=27120) ray::core::CoreWorkerDirectTaskReceiver::HandleTask()
(ForkTest pid=27120) std::_Function_handler<>::_M_invoke()
(ForkTest pid=27120) EventTracker::RecordExecution()
(ForkTest pid=27120) std::_Function_handler<>::_M_invoke()
(ForkTest pid=27120) boost::asio::detail::completion_handler<>::do_complete()
(ForkTest pid=27120) boost::asio::detail::scheduler::do_run_one()
(ForkTest pid=27120) boost::asio::detail::scheduler::run()
(ForkTest pid=27120) boost::asio::io_context::run()
(ForkTest pid=27120) ray::core::CoreWorker::RunTaskExecutionLoop()
(ForkTest pid=27120) ray::core::CoreWorkerProcessImpl::RunWorkerTaskExecutionLoop()
(ForkTest pid=27120) ray::core::CoreWorkerProcess::RunTaskExecutionLoop()
(ForkTest pid=27120) __pyx_pw_3ray_7_raylet_10CoreWorker_7run_task_loop()
(ForkTest pid=27120) method_vectorcall_NOARGS
(ForkTest pid=27120)
My use case
In my business scenario, I need to use PyTorch's DataLoader inside a Ray worker to load training data, and I generally use multi-process data loading for performance. However, the subprocesses started by PyTorch's DataLoader may themselves start further subprocesses, for example via the TurboJPEG library. I've read a few Ray issues related to fork, but I'm still not sure what the root cause of this issue is. I'd be very grateful if someone could give some pointers on how to use multiprocessing in Ray workers; a rough picture of the use case is sketched below.
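For context, here is an illustrative sketch of that use case (not from the issue; the dataset, actor name, and sizes are made up). The point is that each DataLoader worker is a forked child of the Ray worker, and libraries used inside it (e.g. TurboJPEG) may fork again, producing grandchild processes:

import ray
import torch
from torch.utils.data import Dataset, DataLoader


class DummyDataset(Dataset):
    def __len__(self):
        return 1024

    def __getitem__(self, idx):
        # In the real workload this would decode a JPEG (possibly via
        # TurboJPEG, which may spawn its own subprocesses).
        return torch.randn(3, 224, 224), idx % 10


@ray.remote(num_cpus=4)
class TrainWorker:
    def run_epoch(self):
        loader = DataLoader(
            DummyDataset(),
            batch_size=32,
            num_workers=4,  # multi-process loading: each worker is a child of this Ray worker
        )
        for images, labels in loader:
            pass  # training step omitted
        return True


if __name__ == "__main__":
    ray.init()
    worker = TrainWorker.remote()
    assert ray.get(worker.run_epoch.remote())
    ray.shutdown()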
Versions / Dependencies
OS
CentOS Linux 7 (Core)
Hardware info
252G Mem
2 CPUs with 16 cores each (64 logical processors in total)
Ray, Python, and other libraries used:
ray 1.13.0 pypi_0 pypi
python 3.9.13 haa1d7c7_2 defaults
eventlet 0.33.2 pypi_0 pypi
conda 4.12.0
Reproduction script
import ray
import os
import time
import multiprocessing as mp
import eventlet


@ray.remote(num_cpus=32)
class ForkTest:
    def father_process(self, test_count):
        eventlet.monkey_patch()
        ctx = mp.get_context("fork")
        print("++++++++++++++++ father_process [{}] start [{}] iter ++++++++++++++++".format(os.getpid(), test_count))
        plist = []
        for i in range(64):
            p = ctx.Process(target=ForkTest.child_process, args=(i,))
            plist.append(p)
            p.start()
        try:
            for i in range(64):
                with eventlet.Timeout(10, True):
                    plist[i].join()
                    plist[i].close()
        except Exception as e:
            print(e)
            return False
        print("++++++++++++++++ father_process [{}] end [{}] iter ++++++++++++++++".format(os.getpid(), test_count))
        return True

    @staticmethod
    def child_process(rank):
        try:
            eventlet.monkey_patch()
            plist = []
            for i in range(2):
                p = mp.Process(target=ForkTest.grand_child_process, args=(i + 100 * rank,))
                plist.append(p)
                p.start()
            for i in range(2):
                try:
                    with eventlet.Timeout(10, True):
                        plist[i].join()
                        plist[i].close()
                except Exception as e:
                    print(e)
                    raise e
        except Exception as e:
            print("child_process has exception! {}".format(e))
            raise e
        print("child_process: Pid:{}, N.{} end".format(os.getpid(), rank))

    @staticmethod
    def grand_child_process(i):
        a_list = [i for i in range(10000)]
        b_list = [i * 2 for i in range(10000)]
        print("grand_child_process pid:{}, N.{} start".format(os.getpid(), i))


if __name__ == "__main__":
    ray.init()
    assert ray.is_initialized() == True
    test_count = 0
    while True:
        actor = ForkTest.remote()
        re = ray.get(actor.father_process.remote(test_count))
        if re is False:
            print("Test failed!")
            break
        test_count += 1
        time.sleep(0.03)
    ray.shutdown()
Issue Severity
High: It blocks me from completing my task.
About this issue
- State: open
- Created 2 years ago
- Reactions: 1
- Comments: 17 (12 by maintainers)
Looks like an issue around the core worker forking subprocesses.