airflow: Error: 'daemonic processes are not allowed to have children', after upgrading to airflow:2.0.1

Apache Airflow version: 2.0.1

Kubernetes version (if you are using kubernetes) (use kubectl version): /

Environment:

  • OS (e.g. from /etc/os-release): Ubuntu 18.04
  • Kernel (e.g. uname -a): Linux ivan-pc 5.4.0-66-generic #74~18.04.2-Ubuntu SMP Fri Feb 5 11:17:31 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux

What happened: I am using LocalExecutor, the same way I did on Apache Airflow 1.10.12: a single PythonOperator calls a Python method that runs a multiprocessing job via ProcessPoolExecutor (concurrent.futures). On the earlier version it ran without any problems, but now I get this error:

[2021-03-18 15:38:37,552] {taskinstance.py:1455} ERROR - daemonic processes are not allowed to have children
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1112, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1285, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/taskinstance.py", line 1315, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 117, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 128, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/home/airflow/Edmond/edmond/backend/main_parts/run_algorithm_o_book.py", line 15, in run_algorithm_o_book
    alg_o_output = run_o(k_output, capacity, OModelBook, config)
  File "/home/airflow/Edmond/edmond/models/O/model.py", line 388, in run_o
    for mid_result in executor.map(_run, args):
  File "/usr/local/lib/python3.6/concurrent/futures/process.py", line 496, in map
    timeout=timeout)
  File "/usr/local/lib/python3.6/concurrent/futures/_base.py", line 575, in map
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
  File "/usr/local/lib/python3.6/concurrent/futures/_base.py", line 575, in <listcomp>
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
  File "/usr/local/lib/python3.6/concurrent/futures/process.py", line 466, in submit
    self._start_queue_management_thread()
  File "/usr/local/lib/python3.6/concurrent/futures/process.py", line 427, in _start_queue_management_thread
    self._adjust_process_count()
  File "/usr/local/lib/python3.6/concurrent/futures/process.py", line 446, in _adjust_process_count
    p.start()
  File "/usr/local/lib/python3.6/multiprocessing/process.py", line 103, in start
    'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children

What you expected to happen: I expected it to run as it did on Airflow 1.10.12.

How to reproduce it: Run airflow using docker-compose like this:

version: '3.8'
x-airflow-common:
  &airflow-common
  image: edmond_image
  env_file:
    - compose-services.env
  volumes:
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./scripts:/opt/airflow/scripts
    - ./notebooks:/home/airflow/Edmond/notebooks
    - ./data:/home/airflow/Edmond/data
  depends_on:
    - postgres
  restart: always

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8090:8080

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler

  airflow-init:
    <<: *airflow-common
    restart: on-failure
    environment:
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: 'Admin'
      _AIRFLOW_WWW_USER_PASSWORD: 'Admin'

Then, inside a DAG, run a PythonOperator whose callable uses ProcessPoolExecutor from concurrent.futures.
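A minimal DAG along these lines reproduces the error on 2.0.1 with LocalExecutor (a hedged sketch: the DAG, task, and worker names below are illustrative, not the actual Edmond code):

# Minimal reproduction sketch; names are illustrative.
from concurrent.futures import ProcessPoolExecutor
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def _square(x):
    return x * x


def run_pool():
    # On 2.0.1 + LocalExecutor the task runs in a daemonic process, so the
    # executor cannot start its worker processes and this raises the error above.
    with ProcessPoolExecutor(max_workers=4) as executor:
        return list(executor.map(_square, range(10)))


with DAG("multiprocessing_repro", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    PythonOperator(task_id="run_pool", python_callable=run_pool)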

Anything else we need to know: This problem occurs every time I run a PythonOperator with multiprocessing. I have searched everywhere without any luck. A similar error is reported for the CeleryExecutor, but that doesn't help here (I am using LocalExecutor), and there are no import collisions.

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Comments: 25 (17 by maintainers)

Most upvoted comments

There is a known solution to this problem. Please use billiard (https://pypi.org/project/billiard/) instead of multiprocessing. It is a fork of multiprocessing used internally by Celery; it was forked by the Celery developers specifically to handle such problems and limitations of the original multiprocessing library. Since it is used by Celery, you should have it installed already, and it is a 1-to-1 replacement for multiprocessing (literally, you can do import billiard as multiprocessing).
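A minimal sketch of that drop-in swap inside a task callable (the worker function here is illustrative):

import billiard as multiprocessing  # drop-in replacement for the stdlib module


def _work(x):
    return x * x


def run_pool():
    # billiard was forked by the Celery developers to lift this limitation,
    # so the pool can be created even from within a LocalExecutor task process.
    pool = multiprocessing.Pool(processes=4)
    try:
        return pool.map(_work, range(10))
    finally:
        pool.close()
        pool.join()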

I have already seen quite a number of confirmations that it works for people with similar problems, and I am considering adding an entry to the Airflow Best Practices to cover this (@ashb @kaxil - you might also be interested in that 😃). This is an interesting finding I made by following up on a number of issues and trying to help people.

@pmlewis, @damon09273 @ahazeemi @ivanrezic-maistra - if you can also confirm that this solution works, I'd be more confident in adding a best practice for this one.

Well, I think we are talking about two different issues with "daemonic processes", and this is the source of confusion here.

  1. The original stack trace here (by @ivanrezic-maistra) was about a PythonOperator running "multiprocessing" under the LocalExecutor - this is the one I wrote about in the billiard context. The issue was about using multiprocessing inside your own customised Python operators, which I saw several people solve by simply using import billiard as multiprocessing in their own code.

  2. The second issue, added in the same thread by @damon09273 and @ahazeemi and also mentioned by @kaxil, has been fixed and released in Airflow 2.1.2 (see the milestone at https://github.com/apache/airflow/pull/16700) - but that one was about the CeleryKubernetesExecutor, and it was not about "custom" code; it was Airflow itself that failed in this case.

Even if the error message is the same, those two issues have very different root causes, and while this thread got somewhat hijacked, issue 1) still requires importing billiard (in the custom code) to be solved.

So, answering @damon09273's question - it's fine for you now. But if someone uses the LocalExecutor and does multiprocessing in their custom code within a PythonOperator (or writes a custom operator), then using billiard instead of multiprocessing in that custom code should solve the problem. No need to change anything in the Celery executor.

@potiuk Thanks! import billiard as multiprocessing works like a charm.

You can also run your Python code using the Python operator with virtualenv - that launches a new Python interpreter, and you can launch multiprocessing there.
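If that fits your case, a hedged sketch of this approach with PythonVirtualenvOperator could look like the following (the DAG, task, and worker names are illustrative):

# Illustrative sketch: PythonVirtualenvOperator runs the callable in a fresh
# Python interpreter, so multiprocessing can be started there (per the suggestion above).
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator


def run_pool_in_venv():
    # Imports live inside the callable: it is serialized and executed in the new interpreter.
    import math
    from concurrent.futures import ProcessPoolExecutor

    with ProcessPoolExecutor(max_workers=4) as executor:
        # math.factorial stands in for the real per-item worker function.
        return list(executor.map(math.factorial, range(10)))


with DAG("venv_multiprocessing", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    PythonVirtualenvOperator(
        task_id="run_pool_in_venv",
        python_callable=run_pool_in_venv,
        system_site_packages=True,  # reuse already-installed packages rather than building a new env
    )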

@pmlewis Because execute_tasks_new_python_interpreter = True will create a new Python interpreter for each task, it will have performance implications - a few seconds / milliseconds per task at least.
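For reference, the option under discussion lives in the [core] section of airflow.cfg; assuming the standard environment-variable mapping, it can be toggled like this (shown only to illustrate what is being discussed, not as a recommendation):

[core]
execute_tasks_new_python_interpreter = True

or, equivalently, by setting AIRFLOW__CORE__EXECUTE_TASKS_NEW_PYTHON_INTERPRETER=True in the environment.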

@damon09273 @ahazeemi I posted the workaround and solution for CeleryKubernetesExecutor in https://github.com/apache/airflow/issues/16326#issuecomment-870140570

It broke because of optimisations implemented in Airflow that make use of multiprocessing. The best way for you to proceed would be to turn your multiprocessing jobs into separate Airflow tasks.
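One hedged sketch of that approach - fanning out the per-item work as parallel tasks, so the LocalExecutor provides the parallelism instead of a process pool (names below are illustrative):

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def process_item(item):
    # Stand-in for the per-item work previously done by a pool worker.
    return item * item


with DAG("fan_out_tasks", start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    for item in range(10):
        PythonOperator(
            task_id=f"process_item_{item}",
            python_callable=process_item,
            op_kwargs={"item": item},
        )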

You simply should not use multiprocessing inside the LocalExecutor. It is likely going to break. The assertion is correct, and you should likely change your approach.

OK, it works now, but what if I need assertions in my code? Why did it work in earlier versions but not in the current one? Is it going to be fixed?