airflow: Error: 'daemonic processes are not allowed to have children', after upgradeing to airflow:2.0.1
Apache Airflow version: 2.0.1
Kubernetes version (if you are using kubernetes) (use kubectl version
): /
Environment:
- OS (e.g. from /etc/os-release): Ubuntu 18.04
- Kernel (e.g.
uname -a
): Linux ivan-pc 5.4.0-66-generic #74~18.04.2-Ubuntu SMP Fri Feb 5 11:17:31 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux
What happened: I am using LocalExecutor, and I was using it on Apache/Airflow 1.10.12 the same way. I mean I had one PythonOperator which runs python method which runs multiprocessing job using ProcessPoolExecutor (concurrent.futures). And on earlier version it ran successfully without any problems, but now I get this error:
[2021-03-18 15:38:37,552] {taskinstance.py:1455} ERROR - daemonic processes are not allowed to have children
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
result = task_copy.execute(context=context)
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 117, in execute
return_value = self.execute_callable()
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 128, in execute_callable
return self.python_callable(*self.op_args, **self.op_kwargs)
File "/home/airflow/Edmond/edmond/backend/main_parts/run_algorithm_o_book.py", line 15, in run_algorithm_o_book
alg_o_output = run_o(k_output, capacity, OModelBook, config)
File "/home/airflow/Edmond/edmond/models/O/model.py", line 388, in run_o
for mid_result in executor.map(_run, args):
File "/usr/local/lib/python3.6/concurrent/futures/process.py", line 496, in map
timeout=timeout)
File "/usr/local/lib/python3.6/concurrent/futures/_base.py", line 575, in map
fs = [self.submit(fn, *args) for args in zip(*iterables)]
File "/usr/local/lib/python3.6/concurrent/futures/_base.py", line 575, in <listcomp>
fs = [self.submit(fn, *args) for args in zip(*iterables)]
File "/usr/local/lib/python3.6/concurrent/futures/process.py", line 466, in submit
self._start_queue_management_thread()
File "/usr/local/lib/python3.6/concurrent/futures/process.py", line 427, in _start_queue_management_thread
self._adjust_process_count()
File "/usr/local/lib/python3.6/concurrent/futures/process.py", line 446, in _adjust_process_count
p.start()
File "/usr/local/lib/python3.6/multiprocessing/process.py", line 103, in start
'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
What you expected to happen: I expected it to run as it was running on Airflow 1.10.12
How to reproduce it: Run airflow using docker-compose like this:
version: '3.8'
x-airflow-common:
&airflow-common
image: edmond_image
env_file:
- compose-services.env
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./scripts:/opt/airflow/scripts
- ./notebooks:/home/airflow/Edmond/notebooks
- ./data:/home/airflow/Edmond/data
depends_on:
- postgres
restart: always
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8090:8080
airflow-scheduler:
<<: *airflow-common
command: scheduler
airflow-init:
<<: *airflow-common
restart: on-failure
environment:
_AIRFLOW_DB_UPGRADE: 'true'
_AIRFLOW_WWW_USER_CREATE: 'true'
_AIRFLOW_WWW_USER_USERNAME: 'Admin'
_AIRFLOW_WWW_USER_PASSWORD: 'Admin'
And inside run python operator which runs ProcessPoolExecutor from concurrent.futures
Anything else we need to know: This problem occurs every time I run python operator with multiprocessing. I have searched everywhere without any luck. There seems to be a similar error when using Celery Executor but it doesn’t help (as I am using LocalExecutor) and there is no import collisions.
About this issue
- Original URL
- State: closed
- Created 3 years ago
- Comments: 25 (17 by maintainers)
There is a known solution to this problem. Please use
billiard
https://pypi.org/project/billiard/ instead ofmultiprocessing
. It is a fork of multiprocessing internally used in Celery and it was specifically forked by Celery developers to handle similar problems and limitations of the original multiprocessing library. Since it is used by Celery - you should have it installed already, and it is 1-1 replacement to multiprocessing (literally you can doimport billiard as multiprocessing
).I saw already quite a number of confirmations that it works for people with similar problems, and I am just considering making an entry in Best Practices of Airflow to cover that piece (@ashb @kaxil - you might also be interested in that 😃. This is an interesting finding I had by following up a number of issues and trying to help people.
@pmlewis, @damon09273 @ahazeemi @ivanrezic-maistra - if you can also confirm that this solution works, I’d be more confident in adding best practice for that one.
Well, I think we are talking about two different issues with “deamonic processes” and this is the source of confusion here.
The original stack trace here (by @ivanrezic-maistra ) was about using PythonOperator running “multiprocessing” using LocalExecutor - and this is the one which I wrote about in the
billiard
context. The issue was about using multiprocessing inside your own customised Python operators - which I saw several people solved by simply usingimport billiard as multiprocessing
in their own code.The second issue added in the same thread by @damon09273 and @ahazeemi also mentioned by @kaxil have been merged and solved and released in Airflow 2.1.2 (see milestone at https://github.com/apache/airflow/pull/16700) - but this one was about CeleryKubernetes Executor and that was not a “custom” code - it was Airflow itself that failed in this case.,
Even if the error message was the same, those two issues have very different root cause, and while they were somewhat hijacked here - the error 1) still requires the
billiard
importing (in the custom code) to be solved.So answering the question @damon09273 - it’s fine for you now. but if someone tries to use local executor and do multi-processing in their custom code within Python Operator (or writes a custom operator) then
biliard
instead ofmultiprocessing
used in the custom code should solve the problem. No need to change anything in celery executor.@potiuk Thanks!
import billiard as multiprocessing
works like a charm.You can also run your python code using python operator with virtualenv - that launches a new python interpreter and you can launch multiprocessing there.
@pmlewis Because
execute_tasks_new_python_interpreter = True
will create Python interpreter for each task so it will have performance implications – few seconds / ms atleast.@damon09273 @ahazeemi I posted the workaround and solution for
CeleryKubernetesExecutor
in https://github.com/apache/airflow/issues/16326#issuecomment-870140570It broke because of optimisations implemented in airflow that make use of multiprocessing. The best way for you to proceed will be to turn your multiprocessing jobs into separate Airflow tasks
You simply should not use multiprocessing inside Local Executor. This is likely going to break. The assertion is correct and you should likely change your approach
Ok, it works now, but what if I need assertions in my code? Why it worked in earlier versions but not in the current one? Is it going to be fixed?