mpire: enable_insights=True failed to work on UNIX when start method is not fork

Hi Mr. Jansen @sybrenjansen, I’m currently having a serious issue with the enable_insights=True parameter when initializing the pool object as mpire.WorkerPool(n_jobs=25, daemon=False, start_method='forkserver'). Inside the with mpire.WorkerPool() as pool: block, I followed the mpire documentation and called insights = pool.get_insights() and print(insights). The failure is really weird: one time I got cannot pickle 'weakref.ReferenceType' object, but most of the time the whole Python program just quits very quickly (even when I put the mpire.WorkerPool() statement inside a try...except clause to catch a potential Exception, nothing was caught).

Could you please help investigate this bug? The system I ran the enable_insights=True code on is UNIX (on Jenkins), and I’m surprised at how misleading the documentation is if this turns out to be a bug.

About this issue

  • State: closed
  • Created 8 months ago
  • Comments: 34 (14 by maintainers)

Most upvoted comments

I know this is impossible based on mpire’s documentation, but is it possible at this point to use fork in both the parent and child processes? Or threading in the parent process and then fork in the child processes?

Using fork with fork should be absolutely fine. The documentation doesn’t advise against it, does it? And you cannot use threading in the parent process and then fork. Believe me, I tested it. Creating processes is not thread-safe and it will break everything 😃

Also, if I potentially encounter maximum recursion depth exceeded, would that invoke the ExceptionError above?

It shouldn’t as far as I’m aware.

If I want to have a shared object among all the processes, can I do that by specifying those objects via multiprocessing.Manager().list()/dict(), or do I have to use shared_objects=([list of parameters]) in mpire.WorkerPool()?

You can use a manager, no problem. That is usually the way to go when you need your processes to write results to the same object. However, managers are really slow. Data needs to be serialized and sent to a server process, which deserializes it and sequentially adds the data. So there’s also a lot of locking involved, even if you only read from it. If you don’t need writing, I would advise using a normal list/dict and passing it on to the processes using mpire’s shared_objects.
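
For the read-only case, a minimal sketch of what I mean (the lookup dict and the job function are made up for illustration; on UNIX the default fork start method means the dict is simply inherited by the workers, no serialization involved):

from mpire import WorkerPool

# Plain dict used as read-only shared data
lookup = {i: i * 10 for i in range(5)}

def job(shared_lookup, x):
    # shared_lookup is the object passed via shared_objects
    return shared_lookup[x]

if __name__ == "__main__":
    with WorkerPool(n_jobs=4, shared_objects=lookup) as pool:
        print(pool.map(job, range(5)))  # [0, 10, 20, 30, 40]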

When I tried printing out the Worker_ID to see which worker caused that error, I got a very strange number, as I only used 6 threads for each parent process (and I have 10 of them).

So, to get this straight: you have your main process in which you created 10 workers (using fork), and each worker creates 6 threads. If you get Child 9 fails, then that should mean the exception is happening within one of the fork workers.

When an error is triggered from one of the threads, it is passed on to the parent process, which will then raise it on behalf of the child process. But the StopWorker one is happening in the forked process.

I think there are only two possible causes: the first is the parameter timeout=0.1, and the second is the fact that daemon=True was the default value when applying threading for the child processes (the parent process has daemon=False). Do you think I should set daemon=False for the child processes, or should I change timeout=0.1 to something else?

The timeout parameter shouldn’t matter. Also, daemon is ignored when using threading as start method. It only applies to the other start methods.

If pulp has trouble with regular multiprocessing, it will likely have the same issues with mpire as mpire is using parts of Python’s multiprocessing library under the hood

Please don’t ping random people on GitHub

Have a look at https://stackoverflow.com/questions/8640367/manager-dict-in-multiprocessing.

What I suspect is that you’re updating the dict values like so:

x[idx].append(some_value)

This doesn’t trigger a data update on the Manager’s side, because you’re updating a mutable object inside the dict and the Manager has no way of knowing you updated that object (list in this case). What you need to do instead is:

x[idx] = x[idx] + [some_value]

This will trigger a change because __setitem__ will now be called on the dict object, and you will get output similar to the following (here m is a multiprocessing Manager and x = m.dict() is the shared dict):

In [19]: def opt_month(worker_id, shared_dict, t):
    ...:     print(worker_id, t)
    ...:     shared_dict[0] = shared_dict[0] + [t]
    ...: 

In [20]: x = m.dict()

In [21]: x[0] = []

In [22]: with WorkerPool(n_jobs=4, shared_objects=x, pass_worker_id=True, start_method='fork', daemon=False) as pool:
    ...:     pool.map(opt_month, range(10))
    ...: 
0 0
1 1
2 2
0 4
0 8
2 6
1 5
2 9
3 3
3 7

In [23]: x[0]
Out[23]: [0, 4, 1, 6, 5, 9, 3, 7]

However, notice that we’re missing some values here. This is because we’ve now introduced a race condition: if multiple workers are reading/writing to the same idx at the same time, this can happen. You can circumvent that with a lock:

In [33]: def opt_month(worker_id, shared_objects, t):
    ...:     shared_dict, shared_lock = shared_objects
    ...:     with shared_lock:
    ...:         shared_dict[0] = shared_dict[0] + [t]
    ...: 

In [34]: l = m.Lock()

In [35]: x[0] = []

In [36]: with WorkerPool(n_jobs=4, shared_objects=(x, l), pass_worker_id=True, start_method='fork', daemon=False) as pool:
    ...:     pool.map(opt_month, range(10))
    ...: 

In [37]: x[0]
Out[37]: [0, 1, 2, 4, 5, 8, 6, 9, 3, 7]

Now it works, yay! But the downside of this approach is that this is now slower than without multiprocessing because of the locking.

This issue of assigning/locking only happens when you’re updating mutable dictionary values. You can remove the locking if you can guarantee two workers won’t read/write to the same key at the same time. Or, if that’s not possible, create as many locks as keys in your dictionary and only use the lock for the specific key. Then you should only have a small overhead of acquiring a lock, but multiple workers should still be able to update different keys at the same time.
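
To illustrate the one-lock-per-key idea, a rough sketch (the append_value function and the task-to-key mapping are made up; I’m assuming two keys and one manager lock per key):

import multiprocessing as mp
from mpire import WorkerPool

def append_value(worker_id, shared, t):
    shared_dict, locks = shared
    key = t % 2  # hypothetical mapping of tasks to dictionary keys
    with locks[key]:  # only lock the key we're updating
        shared_dict[key] = shared_dict[key] + [t]

if __name__ == "__main__":
    m = mp.Manager()
    shared_dict = m.dict({0: [], 1: []})
    locks = {key: m.Lock() for key in shared_dict.keys()}  # one lock per key
    with WorkerPool(n_jobs=4, shared_objects=(shared_dict, locks), pass_worker_id=True,
                    start_method='fork', daemon=False) as pool:
        pool.map(append_value, range(10))
    print(dict(shared_dict))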

Do you recommend setting daemon=False for both pools when using fork with fork?

I haven’t experimented with that. Daemon does the following (taken from the multiprocessing docs):

When a process exits, it attempts to terminate all of its daemonic child processes.

Note that a daemonic process is not allowed to create child processes. Otherwise a daemonic process would leave its children orphaned if it gets terminated when its parent process exits.

So choose based on what you need.

Is there a way to know which worker belongs to a parent process and which worker belongs to a child process, mainly for debugging purposes, given that they might have the same Worker_ID?

You can use os.getpid() to get the current process ID. You can also get the parent process ID by using os.getppid(). This should give you unique identifiers for each worker.
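
Something along these lines (just a sketch):

import os
from mpire import WorkerPool

def job(worker_id, x):
    # worker_id can repeat across nested pools, but the (pid, ppid) pair tells you
    # which process the worker lives in and which process spawned it
    print(f"worker_id={worker_id} pid={os.getpid()} ppid={os.getppid()}")
    return x

if __name__ == "__main__":
    with WorkerPool(n_jobs=4, pass_worker_id=True) as pool:
        pool.map(job, range(8))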

I got a workaround solution for mpire.Exception.StopWorker by using the try…except clause.

It still shouldn’t happen though. So I’m still curious to see a working example 😃

Is it possible to add functionality to mpire so that a shared_objects list/dict/tuple can be written to over time by multiple processes?

If you want that you should use a manager. You can import it yourself and pass it on to the workers like so:

import multiprocessing as mp
from mpire import WorkerPool

def job(shared_list, x):
    shared_list.append(x)

def main():
    shared_list = mp.Manager().list()
    with WorkerPool(4, shared_objects=shared_list) as pool:
        pool.map(job, range(10))
    print(shared_list)

if __name__ == "__main__":
    main()

This outputs [0, 1, 4, 2, 3, 5, 8, 6, 7, 9] on my machine. But the order will be different each run.

I cannot convert every list/dict someone passes on as shared objects to a manager object, as I can’t infer whether the user wants it to be writable or not. And since managers are slow, I won’t simply assume that it is.

No, the exception you saw shouldn’t happen. So I’m still curious to see the example and try it for myself to see what’s going on

The timeout is in seconds, yes. However, you should never see a StopWorker exception in your terminal, as that is used internally only.

You can email me at [github-username]@gmail.com

Can you provide a minimal working example that gives this error? Without exactly knowing what your code looks like it’s really hard to debug

No problem, happy to help.

To answer your question, as long as you are sure all locks are released before starting forking processes, it shouldn’t be a problem.

The problems start when you have a thread in your main process that holds a lock and then start forking. A child process then inherits an acquired lock which cannot be released anymore because the thread that holds it is not inherited. So whenever you do something with that lock again it will deadlock because it cannot acquire an already acquired lock.
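
If you want to see this for yourself, here’s a minimal sketch using plain threading/multiprocessing (UNIX only; the sleep durations are arbitrary and only there to make the timing predictable):

import multiprocessing as mp
import threading
import time

lock = threading.Lock()

def hold_lock_for_a_while():
    with lock:
        time.sleep(5)  # the lock is held while the fork below happens

def child():
    # The forked child inherits the lock in its acquired state, but not the thread
    # that holds it, so nothing in the child will ever release it
    with lock:
        print("never reached")

if __name__ == "__main__":
    threading.Thread(target=hold_lock_for_a_while).start()
    time.sleep(0.5)  # make sure the lock is acquired before forking
    p = mp.get_context("fork").Process(target=child)
    p.start()
    p.join(timeout=2)
    print("child still blocked on the inherited lock:", p.is_alive())  # True
    p.terminate()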

Since I need to use multi-threading across multiple cores, I think my best option is forkserver then?

Using fork is only unsafe when using threading/locks before starting the child processes with fork. So if you don’t have a thread running in your main process that holds a lock, you should be fine to use fork in the main process and create child processes with it. Using threading after fork should be fine.

Also, I noticed something interesting when tracking the Console Output on Jenkins: the messages seem to be printed out at roughly 1-1.5 hour intervals. Does that mean the workers are waiting for each other to complete the task at hand before they start communicating with each other and printing out stuff? I was thinking that they would not wait for each other and just do the task assigned to them independently.

MPIRE first creates all workers and starts them (i.e. calls process.start()), but it doesn’t wait for the workers to have started. One worker can already start a task while another is still starting up. It could be that your console output buffers stdout and only flushes every so often, giving the impression that mpire waits/syncs.

See the following output:

from mpire import WorkerPool

def w_init(worker_id):
    print(worker_id, "started")

def job(worker_id, x):
    print(worker_id, "got task", x)
    return x * 2

with WorkerPool(10, pass_worker_id=True) as pool:
    pool.map(job, range(10), worker_init=w_init)

On my machine this prints:

0 started
1 started
2 started
3 started
0 got task 0
1 got task 1
4 started
5 started
2 got task 2
6 started
3 got task 3
4 got task 4
5 got task 5
7 started
6 got task 6
8 started
9 started
7 got task 7
8 got task 8
9 got task 9

Also, if I need to have some tuples of shared_objects that would be modified by one parent process and reused by another parent worker, could I just simply turn on shared_objects=([args_1, args_2]) in the with WorkerPool clause? Does the order of the shared_objects parameter relative to the other parameters of WorkerPool() matter?

The order of shared objects doesn’t matter, each worker will get the same shared objects. However, modifying shared objects might not work like you think it would. They are not mutable in a multiprocessing context. When one process makes modifications, it does so on a copy.

E.g., see the following example:

from mpire import WorkerPool

def job(worker_id, shared_objects, x):
    args_1, args_2 = shared_objects
    print(worker_id, args_1)

    # Other processes won't notice any changes in args
    if worker_id == 0:
        args_1.append(3)

if __name__ == "__main__":
    args_1, args_2 = [1], [2]
    with WorkerPool(4, pass_worker_id=True, shared_objects=[args_1, args_2]) as pool:
        pool.map(job, range(10))

This will output:

0 [1]
1 [1]
2 [1]
0 [1, 3]
3 [1]
1 [1]
2 [1]
0 [1, 3, 3]
3 [1]
1 [1]

As you can see, only the shared objects for one worker are updated (worker 0 in this case).

When using threading as start method, this will work, however. The output, with the same code (but changing start method) is then:

0 [1]
1 [1, 3]
3 [1, 3]
0 [1, 3]
2 [1, 3, 3]
1 [1, 3, 3]
3 [1, 3, 3]
0 [1, 3, 3]
2 [1, 3, 3, 3]
1 [1, 3, 3, 3]

If a process should see the changes another one is making (and you’re not using threading), you need to make use of some process communication primitive. Depending on your data type, either use multiprocessing.Value, multiprocessing.Array, or a multiprocessing manager.
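
For example, a rough sketch with multiprocessing.Value used as a shared counter (the job function and the choice of fork as start method are just for illustration):

import multiprocessing as mp
from mpire import WorkerPool

def job(counter, x):
    # Value comes with an internal lock; get_lock() makes the read-modify-write atomic
    with counter.get_lock():
        counter.value += x

if __name__ == "__main__":
    counter = mp.Value('i', 0)  # shared integer, visible to all workers
    with WorkerPool(n_jobs=4, shared_objects=counter, start_method='fork') as pool:
        pool.map(job, range(10))
    print(counter.value)  # 45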

Does the forkserver method in mpire actually allow workers to access (i.e., read & write) the same local files that are saved on disk? If not, can you elaborate on what it can actually copy?

Child processes are created on the same machine, so they can access the same disks/files no problem. The difference with fork/forkserver/spawn is with the memory (RAM). Fork processes can access stuff that is already loaded in RAM by the main process at the point the forking happened. Forkserver/spawn processes can not and you have to reload stuff in memory.

Example:

from mpire import WorkerPool

DATA = None

def job(x):
    print(DATA)

if __name__ == "__main__":
    DATA = "some string"
    with WorkerPool(4, start_method="forkserver") as pool:
        pool.map(job, range(5))

This will output:

None
None
None
None
None

For spawn the output will be the same. However, for fork, you get:

some string
some string
some string
some string
some string

This is because forked processes reuse memory. With forkserver and spawn you get a fresh start.

Hope this helps.

I haven’t used nested pools that often. But generally, fork > forkserver > spawn when talking about the speed of creating child processes. Spawning fresh new processes is slow, hence spawn is slow. fork is fast because it simply reuses a lot of things that are already loaded in memory. forkserver is faster than spawn because you only spawn one new process and then fork from there (so it’s a hybrid).

However, fork can also have its downsides when doing stuff with locks and threading. I.e., it’s safer to use forkserver or spawn, but if you don’t do any of that tricky stuff, then fork is usually faster and more convenient.
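
If you want to get a feeling for the difference on your own machine, a quick-and-dirty sketch (exact numbers depend heavily on your system; forkserver/spawn need the mapped function to be importable, hence the __main__ guard):

import time
from mpire import WorkerPool

def noop(x):
    return x

if __name__ == "__main__":
    for start_method in ("fork", "forkserver", "spawn"):
        start = time.time()
        with WorkerPool(n_jobs=4, start_method=start_method) as pool:
            pool.map(noop, range(4))
        print(f"{start_method}: {time.time() - start:.2f}s")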

This works fine on my end in an ipython terminal:

from mpire import WorkerPool

def foo(x, y):
    return x + y * 2

def job(x):
    with WorkerPool(n_jobs=4, start_method='threading') as pool:
        return pool.map(foo, [(x, y) for y in range(20)])

if __name__ == "__main__":
    with WorkerPool(4, start_method="forkserver", daemon=False) as pool:
        results = pool.map(job, range(20), progress_bar=True)
        for result in results:
            print(result)

This prints

[0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38]
[1, 3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39]
[2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40]
[3, 5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41]
[4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42]
[5, 7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41, 43]
[6, 8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44]
[7, 9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41, 43, 45]
[8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46]
[9, 11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41, 43, 45, 47]
[10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48]
[11, 13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41, 43, 45, 47, 49]
[12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50]
[13, 15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41, 43, 45, 47, 49, 51]
[14, 16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52]
[15, 17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41, 43, 45, 47, 49, 51, 53]
[16, 18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54]
[17, 19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41, 43, 45, 47, 49, 51, 53, 55]
[18, 20, 22, 24, 26, 28, 30, 32, 34, 36, 38, 40, 42, 44, 46, 48, 50, 52, 54, 56]
[19, 21, 23, 25, 27, 29, 31, 33, 35, 37, 39, 41, 43, 45, 47, 49, 51, 53, 55, 57]

Note that I’ve used mpire with start_method='threading'. You won’t have to use a ThreadPoolExecutor from concurrent.futures if you have mpire 😉

But you can if you want, by changing the job function:

from concurrent.futures import ThreadPoolExecutor

def job(x):
    with ThreadPoolExecutor(4) as executor:
        futures = [executor.submit(foo, x, y) for y in range(20)]
        return [future.result() for future in futures]

Regarding the progress bar, newlines can appear when you have not closed a previous progress bar correctly.

I get the same issue when using forkserver or spawn on Linux. It works fine with fork though.

I see that it has to do with the syncmanager that’s not pickleable. I will need to update the WorkerInsights class and implement get/set state accordingly.

I’ll work on it when I have the time. In the meantime, if you can, use fork, which will fix your issue. If not, then this temporary workaround should work (although it will disable showing the arguments passed on to the longest running tasks):

from mpire import WorkerPool
from unittest.mock import patch
import time


def foo(i):
    time.sleep(1)
    return i * 2

if __name__ == "__main__":
    with patch('mpire.insights.RUNNING_WINDOWS', new=True), WorkerPool(25, daemon=False, start_method="forkserver", enable_insights=True) as pool:
        results = pool.map(foo, range(100), progress_bar=True)
        pool.print_insights()

This will make mpire run as if it would run on Windows. On that platform it disables the use of the syncmanager and should make your problem go away.