airflow: celery_executor becomes stuck if child process receives signal before reset_signals is called

Apache Airflow version: 1.10.13 onwards (Any version that picked up #11278, including Airflow 2.0.* and 2.1.*)

Environment:

  • Cloud provider or hardware configuration: Any
  • OS (e.g. from /etc/os-release): Only tested on Debian Linux, but others may be affected too
  • Kernel (e.g. uname -a): Any
  • Install tools: Any
  • Others: Only celery_executor is affected

What happened: This was first reported here. airflow-scheduler sometimes stops heartbeating and stops scheduling any tasks with this last line in the log. This happen at random times, a few times a week. Happens more often if the scheduler machine is slow.

{scheduler_job.py:746} INFO - Exiting gracefully upon receiving signal 15

The problem is that sometimes the machine is slow, reset_signals() of one or more slow child processes is not yet called before other child processes send SIGTERM when they exit. As a result, the slow child processes respond to the SIGTERM as if they are the main scheduler process. Thus we see the Exiting gracefully upon receiving signal 15 in the scheduler log. Since the probability of this happening is very low, this issue is really difficult to reproduce reliably in production.

Related to #7935 Most likely caused by #11278

What you expected to happen: Scheduler should not become stuck

How to reproduce it:

Here’s a small reproducing example of the problem. There’s roughly 1/25 chance it will be stuck. Run it many times to see it happen.

#!/usr/bin/env python3.8
import os
import random
import signal
import time
from multiprocessing import Pool


def send_task_to_executor(arg):
    pass


def _exit_gracefully(signum, frame):
    print(f"{os.getpid()} Exiting gracefully upon receiving signal {signum}")


def register_signals():
    print(f"{os.getpid()} register_signals()")
    signal.signal(signal.SIGINT, _exit_gracefully)
    signal.signal(signal.SIGTERM, _exit_gracefully)
    signal.signal(signal.SIGUSR2, _exit_gracefully)


def reset_signals():
    if random.randint(0, 500) == 0:
        # This sleep statement here simulates the machine being busy
        print(f"{os.getpid()} is slow")
        time.sleep(0.1)
    signal.signal(signal.SIGINT, signal.SIG_DFL)
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    signal.signal(signal.SIGUSR2, signal.SIG_DFL)


if __name__ == "__main__":
    register_signals()

    task_tuples_to_send = list(range(20))
    sync_parallelism = 15
    chunksize = 5

    with Pool(processes=sync_parallelism, initializer=reset_signals) as pool:
        pool.map(
            send_task_to_executor,
            task_tuples_to_send,
            chunksize=chunksize,
        )


The reproducing example above can become stuck with a py-spy dump that looks exactly like what airflow scheduler does:

py-spy dump for the parent airflow scheduler process

Python v3.8.7

Thread 0x7FB54794E740 (active): "MainThread"
    poll (multiprocessing/popen_fork.py:27)
    wait (multiprocessing/popen_fork.py:47)
    join (multiprocessing/process.py:149)
    _terminate_pool (multiprocessing/pool.py:729)
    __call__ (multiprocessing/util.py:224)
    terminate (multiprocessing/pool.py:654)
    __exit__ (multiprocessing/pool.py:736)
    _send_tasks_to_celery (airflow/executors/celery_executor.py:331)
    _process_tasks (airflow/executors/celery_executor.py:272)
    trigger_tasks (airflow/executors/celery_executor.py:263)
    heartbeat (airflow/executors/base_executor.py:158)
    _run_scheduler_loop (airflow/jobs/scheduler_job.py:1388)
    _execute (airflow/jobs/scheduler_job.py:1284)
    run (airflow/jobs/base_job.py:237)
    scheduler (airflow/cli/commands/scheduler_command.py:63)
    wrapper (airflow/utils/cli.py:89)
    command (airflow/cli/cli_parser.py:48)
    main (airflow/__main__.py:40)
    <module> (airflow:8)

py-spy dump for the child airflow scheduler process

Python v3.8.7

Thread 16232 (idle): "MainThread"
    __enter__ (multiprocessing/synchronize.py:95)
    get (multiprocessing/queues.py:355)
    worker (multiprocessing/pool.py:114)
    run (multiprocessing/process.py:108)
    _bootstrap (multiprocessing/process.py:315)
    _launch (multiprocessing/popen_fork.py:75)
    __init__ (multiprocessing/popen_fork.py:19)
    _Popen (multiprocessing/context.py:277)
    start (multiprocessing/process.py:121)
    _repopulate_pool_static (multiprocessing/pool.py:326)
    _repopulate_pool (multiprocessing/pool.py:303)
    __init__ (multiprocessing/pool.py:212)
    Pool (multiprocessing/context.py:119)
    _send_tasks_to_celery (airflow/executors/celery_executor.py:330)
    _process_tasks (airflow/executors/celery_executor.py:272)
    trigger_tasks (airflow/executors/celery_executor.py:263)
    heartbeat (airflow/executors/base_executor.py:158)
    _run_scheduler_loop (airflow/jobs/scheduler_job.py:1388)
    _execute (airflow/jobs/scheduler_job.py:1284)
    run (airflow/jobs/base_job.py:237)
    scheduler (airflow/cli/commands/scheduler_command.py:63)
    wrapper (airflow/utils/cli.py:89)
    command (airflow/cli/cli_parser.py:48)
    main (airflow/__main__.py:40)
    <module> (airflow:8)

About this issue

  • Original URL
  • State: closed
  • Created 3 years ago
  • Reactions: 6
  • Comments: 16 (11 by maintainers)

Commits related to this issue

Most upvoted comments

Just how slow does it have to be to happen? We can probably guard this by closing of the current pid when we register them, and checking that the signal is received by the same pid

Hi, @ashb it’s not clear to me how slow it must be exactly for this to happen. It looks like as long as some child processes are a fraction of a second slower than the others, they easily get into a deadlock when a SIGTERM is received. So even a transient slowness of a beefy machine can cause this to happen.

Here’s what I tried so far. Only the last method seems to fix the issue completely (i.e. we have to stop using multiprocessing.Pool):

  • Tried to reset the signal handler to signal.SIG_DFL in register_signals if the current process is a child process. This doesn’t help because the child process inherits the parent’s signal handler when it’s forked. Still hangs occasionally.
  • Tried to make _exit_gracefully a no-op if the current process is a child process. This isn’t sufficient. Still hangs occasionally.
  • Tried to change multiprocessing to use “spawn” instead of “fork” like some people suggested on the internet, it greatly reduced the chance of this issue happening. However, after running the reproducing example about 8000 times, it still happened. So it doesn’t fix the issue completely.
  • Replace multiprocessing.Pool with concurrent.futures.process.ProcessPoolExecutor. Once this is done, the reproducing example no longer hangs even after running it tens of thousands times.. So I put up PR #15989 which fixes the issue using this method.

From experience, multiprocessing.Pool is notorious for causing mysterious hangs like these. Using ProcessPoolExecutor does not cause the same problems. It has similar interface and uses similar underlying libraries. I don’t understand exactly why it fixes the issue, but in practice it always seems to help.