mpire: enable_insights=True failed to work on UNIX when start method is not fork
Hi Mr. Jansen @sybrenjansen, I'm currently having a serious issue with the `enable_insights=True` parameter when initializing the pool object as `mpire.WorkerPool(n_jobs=25, daemon=False, start_method='forkserver')`. Within the `with mpire.WorkerPool() as pool:` block, I followed the mpire documentation with `insights = pool.get_insights()` and `print(insights)`. The error message is really weird: one time I got `cannot pickle 'weakref.ReferenceType' object`, but most of the time the whole Python program just quits very quickly (even when I put the `mpire.WorkerPool()` statement inside a `try...except` clause to catch the potential exception, nothing was caught).
Could you please help investigate this bug? The system where I ran the code with `enable_insights=True` is UNIX on Jenkins, and if this is indeed a bug, I'm surprised at how misleading the documentation is.
About this issue
- State: closed
- Created 8 months ago
- Comments: 34 (14 by maintainers)
Using fork with fork should be absolutely fine. The documentation doesn't advise against it, does it? And you cannot use threading in the parent process and then fork. Believe me, I tested it. Creating processes is not thread-safe and it will break everything 😃
It shouldn’t as far as I’m aware.
You can use a manager, no problem. That is usually the way to go when you need your processes to write results to the same object. However, managers are really slow. Data needs to be serialized and sent to a server process, which deserializes it and sequentially adds the data. So there's also a lot of locking required, even if you only read from it. If you don't need writing, I would advise using a normal list/dict and passing it on to the processes using mpire's `shared_objects`.

So, to get this straight: you have your main process in which you created 10 workers (using fork), and each worker creates 6 threads. If you get `Child 9 fails`, then that should mean the exception is happening within one of the fork-workers. When an error is triggered from one of the threads, it is passed on to the parent process, which will then raise it on behalf of the child process. But the `StopWorker` one is happening in the forked process.

The `timeout` parameter shouldn't matter. Also, `daemon` is ignored when using threading as start method; it only applies to the other start methods. If pulp has trouble with regular multiprocessing, it will likely have the same issues with mpire, as mpire uses parts of Python's multiprocessing library under the hood.
Please don’t ping random people on GitHub
Have a look at https://stackoverflow.com/questions/8640367/manager-dict-in-multiprocessing.
What I suspect is that you’re updating the dict values like so:
This doesn’t trigger a data update on the Manager’s side, because you’re updating a mutable object inside the dict and the Manager has no way of knowing you updated that object (list in this case). What you need to do instead is:
This will trigger a change because `__setitem__` will now be called on the dict object. However, notice that, in the output, some values may still be missing. This is because we have now introduced a race condition: if multiple workers are reading/writing to the same idx at the same time, this can happen. You can circumvent that with a lock.
Now it works, yay! But the downside of this approach is that this is now slower than without multiprocessing because of the locking.
This issue of assigning/locking only happens when you’re updating mutable dictionary values. You can remove the locking if you can guarantee two workers won’t read/write to the same key at the same time. Or, if that’s not possible, create as many locks as keys in your dictionary and only use the lock for the specific key. Then you should only have a small overhead of acquiring a lock, but multiple workers should still be able to update different keys at the same time.
I haven’t experimented with that. Daemon does the following (taken from the multiprocessing docs):
So choose based on what you need.
You can use `os.getpid()` to get the current process ID. You can also get the parent process ID by using `os.getppid()`. This should give you unique identifiers for each worker.

It still shouldn't happen though, so I'm still curious to see a working example 😃
If you want that you should use a manager. You can import it yourself and pass it on to the workers like so:
This outputs `[0, 1, 4, 2, 3, 5, 8, 6, 7, 9]` on my machine, but the order will be different each run.

I cannot convert every list/dict someone passes on as shared objects to a manager object, as I can't infer whether the user wants it to be writable or not. As managers are slow, I will also not simply assume it is.
No, the exception you saw shouldn’t happen. So I’m still curious to see the example and try it for myself to see what’s going on
The timeout is in seconds, yes. However, you should never see a `StopWorker` exception in your terminal, as that is used internally only.

You can email me at [github-username]@gmail.com
Can you provide a minimal working example that gives this error? Without exactly knowing what your code looks like it’s really hard to debug
No problem, happy to help.
To answer your question, as long as you are sure all locks are released before starting forking processes, it shouldn’t be a problem.
The problems start when you have a thread in your main process that holds a lock and then start forking. A child process then inherits an acquired lock which cannot be released anymore because the thread that holds it is not inherited. So whenever you do something with that lock again it will deadlock because it cannot acquire an already acquired lock.
Using fork is only unsafe when using threading/locks before starting the child processes with fork. So if you don’t have a thread running in your main process that holds a lock, you should be fine to use fork in the main process and create child processes with it. Using threading after fork should be fine.
MPIRE first creates all workers and starts them (i.e. calls `process.start()`), but it doesn't wait for the workers to finish starting. One worker can already start a task while another is still starting. It could be that your console buffers `stdout` and flushes it every so often, giving the impression that mpire waits/syncs.
The order of shared objects doesn't matter; each worker will get the same shared objects. However, modifying shared objects might not work like you think it would: with the process-based start methods they are effectively per-process copies, so when one process makes modifications, it does so on its own copy.
For example, if each worker appends to a shared list, only the shared objects of the worker that made the change are updated (worker `0`, say); the other workers and the main process keep their original copies. When using threading as start method, however, this will work, since threads share memory and every worker sees the same update.
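A minimal stdlib sketch of this copy-on-modify behavior (assumed names; mpire's `shared_objects` behaves analogously for the process-based start methods):

```python
from multiprocessing import Process

shared = {'counter': 0}

def job():
    # This mutates the worker's own copy of `shared`;
    # the parent process never sees the change
    shared['counter'] += 1

if __name__ == '__main__':
    p = Process(target=job)
    p.start()
    p.join()
    print(shared['counter'])  # 0 in the parent
```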
If a process should see the changes another one is making (and you're not using threading), you need to make use of some process communication primitive; depending on your data type, either `multiprocessing.Value`, `multiprocessing.Array`, or `multiprocessing.Manager`.

Child processes are created on the same machine, so they can access the same disks/files no problem. The difference between fork/forkserver/spawn is in the memory (RAM). Fork processes can access stuff that is already loaded in RAM by the main process at the point the forking happened. Forkserver/spawn processes cannot, and you have to reload stuff into memory.
For example, if the main process loads data into a module-level variable before creating workers, a `fork` worker will see that data, while `forkserver` and `spawn` workers will not. This is because forked processes reuse the parent's memory; with forkserver and spawn, you get a fresh start.
Hope this helps.
I haven't used nested pools that often. But generally, `fork` > `forkserver` > `spawn` when talking about the speed of creating child processes. Spawning fresh new processes is slow, hence `spawn` is slow. `fork` is fast because it simply reuses a lot of things that are already loaded in memory. `forkserver` is faster than `spawn` because you only spawn one new process and then fork from there (so it's a hybrid).

However, `fork` can also have its downsides when doing stuff with locks and threading. I.e., it's safer to use `forkserver` or `spawn`, but if you don't do any of that tricky stuff then `fork` is usually faster and more convenient.

This works fine on my end in an ipython terminal.
Note that I've used mpire with `start_method='threading'`. You won't have to use a `ThreadPoolExecutor` from `concurrent.futures` if you have mpire 😉 But you can if you want, by changing the `job` function.

Regarding the progress bar, newlines can appear when you have not closed a previous progress bar correctly.
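A hedged sketch of what such a `job` function could look like, fanning one task's work out over a few threads (the function and its workload are assumptions, not the original code):

```python
from concurrent.futures import ThreadPoolExecutor

def job(x):
    # Run a small batch of sub-tasks for this one job on threads;
    # executor.map preserves the input order in its results
    with ThreadPoolExecutor(max_workers=3) as executor:
        return list(executor.map(lambda i: x * i, range(3)))

print(job(2))  # [0, 2, 4]
```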
I get the same issue when using forkserver or spawn on Linux. It works fine with fork though.
I see that it has to do with the sync manager that's not picklable. I will need to update the `WorkerInsights` class and implement get/set state accordingly.

I'll work on it when I have the time. In the meantime, if you can, use `fork`, which will fix your issue. If not, then this temporary workaround should work (although it will disable showing the arguments passed on to the longest-running tasks): make mpire run as if it were running on Windows. On that platform it disables the use of the sync manager, which should make your problem go away.