tensorflow: Session hang issue with python multiprocessing

Issue summary

I am having trouble allocating GPU devices for a multiprocessing pool. Please see the short code reproduction below. I would like to understand why I am getting the CUDA_ERROR_NOT_INITIALIZED error in case 4, where the program hangs and I have to stop my Docker container to exit.

Minimal reproducible example

Core code:

import tensorflow as tf

def run_session(device):
    gpu_options = tf.GPUOptions(allow_growth=True, visible_device_list=device)
    sess = tf.Session(config=tf.ConfigProto(gpu_options=gpu_options))
    print('Using device #%s' % device)
    a = tf.placeholder(tf.int16, name='a')
    y = tf.identity(a, name='y')
    print(sess.run(y, feed_dict={a: 3}))
    sess.close()
    print('Done.')

Case 1 (this works fine):

run_session('0')
I tensorflow/core/common_runtime/gpu/gpu_device.cc:885] Found device 0 with properties:
name: GeForce GTX 980 Ti
major: 5 minor: 2 memoryClockRate (GHz) 1.076
pciBusID 0000:08:00.0
Total memory: 5.97GiB
Free memory: 5.86GiB
I tensorflow/core/common_runtime/gpu/gpu_device.cc:906] DMA: 0
I tensorflow/core/common_runtime/gpu/gpu_device.cc:916] 0:   Y
I tensorflow/core/common_runtime/gpu/gpu_device.cc:975] Creating TensorFlow device (/gpu:0) -> (device: 0, name: GeForce GTX 980 Ti, pci bus id: 0000:08:00.0)
Using device #0
3
Done.

Case 2 (this works fine):

run_session('0')
run_session('1')
I tensorflow/core/common_runtime/gpu/gpu_device.cc:885] Found device 0 with properties:
name: GeForce GTX 980 Ti
major: 5 minor: 2 memoryClockRate (GHz) 1.076
pciBusID 0000:08:00.0
Total memory: 5.97GiB
Free memory: 5.86GiB
I tensorflow/core/common_runtime/gpu/gpu_device.cc:906] DMA: 0
I tensorflow/core/common_runtime/gpu/gpu_device.cc:916] 0:   Y
I tensorflow/core/common_runtime/gpu/gpu_device.cc:975] Creating TensorFlow device (/gpu:0) -> (device: 0, name: GeForce GTX 980 Ti, pci bus id: 0000:08:00.0)
Using device #0
3
Done.
W tensorflow/stream_executor/cuda/cuda_driver.cc:590] creating context when one is currently active; existing: 0x24cbbe0
I tensorflow/core/common_runtime/gpu/gpu_device.cc:885] Found device 0 with properties:
name: GeForce GTX 980 Ti
major: 5 minor: 2 memoryClockRate (GHz) 1.076
pciBusID 0000:84:00.0
Total memory: 5.97GiB
Free memory: 5.86GiB
I tensorflow/core/common_runtime/gpu/gpu_device.cc:906] DMA: 1
I tensorflow/core/common_runtime/gpu/gpu_device.cc:916] 1:   Y
I tensorflow/core/common_runtime/gpu/gpu_device.cc:975] Creating TensorFlow device (/gpu:0) -> (device: 1, name: GeForce GTX 980 Ti, pci bus id: 0000:84:00.0)
Using device #1
3
Done.

Case 3 (this works fine):

import multiprocessing as mp

p = mp.Pool(2)
p.map(run_session, ['0', '1'])
p.close()
p.join()
I tensorflow/core/common_runtime/gpu/gpu_device.cc:885] Found device 0 with properties:
name: GeForce GTX 980 Ti
major: 5 minor: 2 memoryClockRate (GHz) 1.076
pciBusID 0000:84:00.0
Total memory: 5.97GiB
Free memory: 5.86GiB
I tensorflow/core/common_runtime/gpu/gpu_device.cc:906] DMA: 1
I tensorflow/core/common_runtime/gpu/gpu_device.cc:916] 1:   Y
I tensorflow/core/common_runtime/gpu/gpu_device.cc:975] Creating TensorFlow device (/gpu:0) -> (device: 1, name: GeForce GTX 980 Ti, pci bus id: 0000:84:00.0)
I tensorflow/core/common_runtime/gpu/gpu_device.cc:885] Found device 0 with properties:
name: GeForce GTX 980 Ti
major: 5 minor: 2 memoryClockRate (GHz) 1.076
pciBusID 0000:08:00.0
Total memory: 5.97GiB
Free memory: 5.86GiB
I tensorflow/core/common_runtime/gpu/gpu_device.cc:906] DMA: 0
I tensorflow/core/common_runtime/gpu/gpu_device.cc:916] 0:   Y
I tensorflow/core/common_runtime/gpu/gpu_device.cc:975] Creating TensorFlow device (/gpu:0) -> (device: 0, name: GeForce GTX 980 Ti, pci bus id: 0000:08:00.0)
Using device #1
Using device #0
3
Done.
3
Done.

Case 4 (here, the program hangs):

import multiprocessing as mp

run_session('0')
p = mp.Pool(2)
p.map(run_session, ['0', '1'])
p.close()
p.join()
I tensorflow/core/common_runtime/gpu/gpu_device.cc:885] Found device 0 with properties:
name: GeForce GTX 980 Ti
major: 5 minor: 2 memoryClockRate (GHz) 1.076
pciBusID 0000:08:00.0
Total memory: 5.97GiB
Free memory: 5.86GiB
I tensorflow/core/common_runtime/gpu/gpu_device.cc:906] DMA: 0
I tensorflow/core/common_runtime/gpu/gpu_device.cc:916] 0:   Y
I tensorflow/core/common_runtime/gpu/gpu_device.cc:975] Creating TensorFlow device (/gpu:0) -> (device: 0, name: GeForce GTX 980 Ti, pci bus id: 0000:08:00.0)
Using device #0
3
Done.
E tensorflow/stream_executor/cuda/cuda_driver.cc:1368] could not retrieve CUDA device count: CUDA_ERROR_NOT_INITIALIZED
Using device #0
E tensorflow/stream_executor/cuda/cuda_driver.cc:1368] could not retrieve CUDA device count: CUDA_ERROR_NOT_INITIALIZED
Using device #1

Environment info

Operating System: Ubuntu 14.04.4 LTS (GNU/Linux 3.19.0-25-generic x86_64)
Docker container: gcr.io/tensorflow/tensorflow:latest-devel-gpu
CUDA version: 8.0.61
cuDNN version: 5.1.10

Related GitHub issues

#1578

About this issue

  • Original URL
  • State: closed
  • Created 7 years ago
  • Reactions: 49
  • Comments: 23 (1 by maintainers)

Most upvoted comments

@suharshs Python multiprocessing works fine with tensorflow. The only thing to note is that tensorflow must be imported independently inside each process (you must use multiprocessing instead of multithreading, since tensorflow will take over the entire process). Below is how I achieved multi-GPU, multi-process inference; I hope it helps:

import os
import multiprocessing


class Predictor(multiprocessing.Process):
    def __init__(self, input_queue, gpu_id):
        multiprocessing.Process.__init__(self)
        self.input_queue = input_queue
        self.gpu_id = gpu_id
    def run(self):
        #set GPU id before importing tensorflow!!!!!!!!!!!!!
        os.environ["CUDA_VISIBLE_DEVICES"] = "{}".format(self.gpu_id)
        #import tensorflow here
        import tensorflow as tf
        sess = tf.Session()
        print('Using device #%s' % self.gpu_id)
        a = tf.placeholder(tf.int16, name='a')
        y = tf.identity(a, name='y')
        while True:
            input = self.input_queue.get()
            if input is None:
                self.input_queue.task_done()
                print("Exiting Process %d" % self.gpu_id)
                break
            else:
                print(sess.run(y, feed_dict={a: input}))
                self.input_queue.task_done()
        sess.close()
        return

if __name__ == "__main__":
    jobs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    num_gpus = 2
    p_list = []
    input_queue = multiprocessing.JoinableQueue()

    for i in range(num_gpus):
        p = Predictor(input_queue, i)
        p_list.append(p)
    for p in p_list:
        p.start()
    for job in jobs:
        input_queue.put(job)
    for i in range(num_gpus):
        input_queue.put(None)

    input_queue.join()
    for p in p_list:
        p.join()

The python multiprocessing package seems to just call fork() when creating a child process. This cannot work when the child process calls async code (i.e., TensorFlow is multithreaded). From the POSIX spec for fork():

If a multi-threaded process calls fork(), the new process shall contain a replica of the calling thread and its entire address space, possibly including the states of mutexes and other resources. Consequently, to avoid errors, the child process may only execute async-signal-safe operations until such time as one of the exec functions is called.

So long story short, don’t use python multiprocessing for anything non-trivial and expect it to work 😃
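
For reference, one workaround consistent with the fork() explanation above (not mentioned in this thread, and only available on Python 3.4+) is to have multiprocessing spawn fresh interpreters instead of forking the CUDA-initialized parent. A minimal sketch, reusing run_session from the issue summary:

import multiprocessing as mp

if __name__ == '__main__':
    # Sketch only: assumes Python 3.4+ and the run_session() helper defined above.
    # A 'spawn' context starts clean child interpreters, so they do not inherit
    # the parent's CUDA/driver state the way fork()ed pool workers do.
    run_session('0')                  # parent initializes CUDA first
    ctx = mp.get_context('spawn')
    p = ctx.Pool(2)
    p.map(run_session, ['0', '1'])    # the previously hanging case 4
    p.close()
    p.join()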

Hi, I had the same issue today, but the problem can be resolved by putting import tensorflow as tf inside your worker function (and the result is well parallelised).
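
As a rough illustration of that suggestion (this sketch is not from the thread), the idea is to keep import tensorflow out of the parent module's top level and do it inside the worker, so each forked child initializes the CUDA driver on its own. Per the later comments, this only helps if the parent process never runs a session of its own:

import multiprocessing as mp

def run_session(device):
    # TensorFlow is imported inside the worker, after the fork, so the CUDA
    # driver is initialized independently in each child process.
    import tensorflow as tf
    gpu_options = tf.GPUOptions(allow_growth=True, visible_device_list=device)
    with tf.Session(config=tf.ConfigProto(gpu_options=gpu_options)) as sess:
        a = tf.placeholder(tf.int16, name='a')
        y = tf.identity(a, name='y')
        print(sess.run(y, feed_dict={a: 3}))

if __name__ == '__main__':
    p = mp.Pool(2)
    p.map(run_session, ['0', '1'])
    p.close()
    p.join()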

Update: I have looked into this a bit more, and have a couple more interesting repro cases 😃 Works:

run_session('0')
run_session('0')

Hangs:

run_session('0')
p = mp.Process(target=run_session, args=('0',))
p.start()
p.join()

It looks like there is some shared python/tensorflow state that interferes when a new python process is created (multiprocessing creates a new python process whose state separation I am not too clear on). I plan to look into it very soon, but just wanted to provide an update in case that gives you any workarounds.

I am also running into this issue. Multiprocessing works unless I first run a session in the parent process. I’ve tried moving the “import tensorflow” statement into the function as @Lancerchiang suggested, with no luck. Below is my minimal repro with 4 test cases.

import os
import tensorflow
from multiprocessing.pool import Pool

def runInSubprocess(somearg):
    print('Training model on process id {}.'.format(os.getpid()))
    with tensorflow.Session() as sess:
        sess.run(tensorflow.global_variables_initializer())

# This Hangs:
runInSubprocess(2)
Pool(processes=2).map(runInSubprocess, [1,2])

# This works:
runInSubprocess(2)
runInSubprocess(2)

# This works:
Pool(processes=2).map(runInSubprocess, [1,2])
Pool(processes=2).map(runInSubprocess, [1,2])

# This works:
Pool(processes=2).map(runInSubprocess, [1,2])
runInSubprocess(2)

I wonder: if I want to get a return value from every process, how should I do it? Return it in the ‘run’ method? And how can I retrieve this return value?

import os
import multiprocessing


class Predictor(multiprocessing.Process):
    def __init__(self, input_queue, output_queue, gpu_id):
        multiprocessing.Process.__init__(self)
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.gpu_id = gpu_id

    def run(self):
        #set GPU id before importing tensorflow!!!!!!!!!!!!!
        os.environ["CUDA_VISIBLE_DEVICES"] = "{}".format(self.gpu_id)
        #import tensorflow here
        import tensorflow as tf
        sess = tf.Session()
        print('Using device #%s' % self.gpu_id)
        a = tf.placeholder(tf.int16, name='a')
        y = tf.identity(a, name='y')
        while True:
            input = self.input_queue.get()
            if input is None:
                self.input_queue.task_done()
                print("Exiting Process %d" % self.gpu_id)
                break
            else:
                res = sess.run(y, feed_dict={a: input})
                self.input_queue.task_done()
                self.output_queue.put(res)
        sess.close()
        return

if __name__ == "__main__":
    jobs = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    num_gpus = 2
    p_list = []
    input_queue = multiprocessing.JoinableQueue()
    output_queue = multiprocessing.Queue()
    for i in range(num_gpus):
        p = Predictor(input_queue, output_queue, i)
        p_list.append(p)

    for p in p_list:
        p.start()

    for job in jobs:
        input_queue.put(job)

    for i in range(num_gpus):
        input_queue.put(None)

    for _ in range(len(jobs)):  # collect one result per job, not just one per GPU
        print(output_queue.get())

    input_queue.join()
    
    for p in p_list:
        p.join()

@breckuh If you really need to run a tensorflow session in your parent process, my advice is to launch explicit child processes like I did above instead of using pool mapping, and to import tensorflow in your parent process only after you have done so in your child processes.

import os
import multiprocessing
import time

class Predictor(multiprocessing.Process):
    def __init__(self, input_queue, gpu_id):
        multiprocessing.Process.__init__(self)
        self.input_queue = input_queue
        self.gpu_id = gpu_id
    def run(self):
        #set GPU id before importing tensorflow!!
        #os.environ["CUDA_VISIBLE_DEVICES"] = "{}".format(self.gpu_id)
        import tensorflow as tf
        sess = tf.Session()
        print('Using device #%s' % self.gpu_id)
        a = tf.placeholder(tf.int16, name='a')
        y = tf.identity(a, name='y')
        while True:
            input = self.input_queue.get()
            if input is None:
                self.input_queue.task_done()
                print("Exiting Process %d" % self.gpu_id)
                break
            else:
                print(sess.run(y, feed_dict={a: input}))
                self.input_queue.task_done()
        sess.close()
        return

if __name__ == "__main__":
    works = [4,5]
    num_gpus = 2
    p_list = []
    input_queue = multiprocessing.JoinableQueue()

    for i in range(num_gpus):
        p = Predictor(input_queue, i)
        p_list.append(p)
    for p in p_list:
        p.start()

    time.sleep(2)

    import tensorflow as tf

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())

    for work in works:
        input_queue.put(work)
    for i in range(num_gpus):
        input_queue.put(None)

    input_queue.join()
    for p in p_list:
        p.join()

It would give:

2018-05-11 11:01:57.844637: I tensorflow/core/platform/cpu_feature_guard.cc:137] Your CPU supports instructions that this TensorFlow binary was not compiled to use: SSE4.2 AVX AVX2 FMA
2018-05-11 11:01:57.844638: I tensorflow/core/platform/cpu_feature_guard.cc:137] Your CPU supports instructions that this TensorFlow binary was not compiled to use: SSE4.2 AVX AVX2 FMA
Using device #1
Using device #0
2018-05-11 11:01:59.207167: I tensorflow/core/platform/cpu_feature_guard.cc:137] Your CPU supports instructions that this TensorFlow binary was not compiled to use: SSE4.2 AVX AVX2 FMA
4
5
Exiting Process 1
Exiting Process 0

You can see that all three tensorflow sessions finished successfully.

@mrry I am facing the problem “could not retrieve CUDA device count: CUDA_ERROR_NOT_INITIALIZED” due to a memory issue while using django celery. Is there any workaround for python multiprocessing or celery in the case of distributed tensorflow with GPU?

I just hit the same issue when using a celery worker to run tensorflow on GPU. Is this issue solved?

(Quoting the multi-GPU Predictor example from the earlier comment above.)

That would be very slow for inference, since importing TensorFlow and loading the model take seconds.

Thanks @Lancerchiang, that makes sense. I don’t actually know if we’ll ever have this use case in practice; it only came up because our test suite was failing when certain tests were run in different orders. Then we fell down a rabbit hole isolating this 😃. In the end we just used the workaround of specifically arranging our suite to run the tests in the child processes first and the tests in the parent process after. Not ideal, but good enough for now. What I would like to do is add a line or two to check whether this hang might hit and then throw/alert the user, so no one is left hanging.