airflow: celery_executor becomes stuck if child process receives signal before reset_signals is called
Apache Airflow version: 1.10.13 onwards (Any version that picked up #11278, including Airflow 2.0.* and 2.1.*)
Environment:
- Cloud provider or hardware configuration: Any
- OS (e.g. from /etc/os-release): Only tested on Debian Linux, but others may be affected too
- Kernel (e.g.
uname -a
): Any - Install tools: Any
- Others: Only celery_executor is affected
What happened: This was first reported here. airflow-scheduler sometimes stops heartbeating and stops scheduling any tasks with this last line in the log. This happen at random times, a few times a week. Happens more often if the scheduler machine is slow.
{scheduler_job.py:746} INFO - Exiting gracefully upon receiving signal 15
The problem is that sometimes the machine is slow, reset_signals()
of one or more slow child processes is not yet called before other child processes send SIGTERM
when they exit. As a result, the slow child processes respond to the SIGTERM
as if they are the main scheduler process. Thus we see the Exiting gracefully upon receiving signal 15
in the scheduler log. Since the probability of this happening is very low, this issue is really difficult to reproduce reliably in production.
Related to #7935 Most likely caused by #11278
What you expected to happen: Scheduler should not become stuck
How to reproduce it:
Here’s a small reproducing example of the problem. There’s roughly 1/25 chance it will be stuck. Run it many times to see it happen.
#!/usr/bin/env python3.8
import os
import random
import signal
import time
from multiprocessing import Pool
def send_task_to_executor(arg):
pass
def _exit_gracefully(signum, frame):
print(f"{os.getpid()} Exiting gracefully upon receiving signal {signum}")
def register_signals():
print(f"{os.getpid()} register_signals()")
signal.signal(signal.SIGINT, _exit_gracefully)
signal.signal(signal.SIGTERM, _exit_gracefully)
signal.signal(signal.SIGUSR2, _exit_gracefully)
def reset_signals():
if random.randint(0, 500) == 0:
# This sleep statement here simulates the machine being busy
print(f"{os.getpid()} is slow")
time.sleep(0.1)
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
signal.signal(signal.SIGUSR2, signal.SIG_DFL)
if __name__ == "__main__":
register_signals()
task_tuples_to_send = list(range(20))
sync_parallelism = 15
chunksize = 5
with Pool(processes=sync_parallelism, initializer=reset_signals) as pool:
pool.map(
send_task_to_executor,
task_tuples_to_send,
chunksize=chunksize,
)
The reproducing example above can become stuck with a py-spy dump
that looks exactly like what airflow scheduler does:
py-spy dump
for the parent airflow scheduler
process
Python v3.8.7
Thread 0x7FB54794E740 (active): "MainThread"
poll (multiprocessing/popen_fork.py:27)
wait (multiprocessing/popen_fork.py:47)
join (multiprocessing/process.py:149)
_terminate_pool (multiprocessing/pool.py:729)
__call__ (multiprocessing/util.py:224)
terminate (multiprocessing/pool.py:654)
__exit__ (multiprocessing/pool.py:736)
_send_tasks_to_celery (airflow/executors/celery_executor.py:331)
_process_tasks (airflow/executors/celery_executor.py:272)
trigger_tasks (airflow/executors/celery_executor.py:263)
heartbeat (airflow/executors/base_executor.py:158)
_run_scheduler_loop (airflow/jobs/scheduler_job.py:1388)
_execute (airflow/jobs/scheduler_job.py:1284)
run (airflow/jobs/base_job.py:237)
scheduler (airflow/cli/commands/scheduler_command.py:63)
wrapper (airflow/utils/cli.py:89)
command (airflow/cli/cli_parser.py:48)
main (airflow/__main__.py:40)
<module> (airflow:8)
py-spy dump
for the child airflow scheduler
process
Python v3.8.7
Thread 16232 (idle): "MainThread"
__enter__ (multiprocessing/synchronize.py:95)
get (multiprocessing/queues.py:355)
worker (multiprocessing/pool.py:114)
run (multiprocessing/process.py:108)
_bootstrap (multiprocessing/process.py:315)
_launch (multiprocessing/popen_fork.py:75)
__init__ (multiprocessing/popen_fork.py:19)
_Popen (multiprocessing/context.py:277)
start (multiprocessing/process.py:121)
_repopulate_pool_static (multiprocessing/pool.py:326)
_repopulate_pool (multiprocessing/pool.py:303)
__init__ (multiprocessing/pool.py:212)
Pool (multiprocessing/context.py:119)
_send_tasks_to_celery (airflow/executors/celery_executor.py:330)
_process_tasks (airflow/executors/celery_executor.py:272)
trigger_tasks (airflow/executors/celery_executor.py:263)
heartbeat (airflow/executors/base_executor.py:158)
_run_scheduler_loop (airflow/jobs/scheduler_job.py:1388)
_execute (airflow/jobs/scheduler_job.py:1284)
run (airflow/jobs/base_job.py:237)
scheduler (airflow/cli/commands/scheduler_command.py:63)
wrapper (airflow/utils/cli.py:89)
command (airflow/cli/cli_parser.py:48)
main (airflow/__main__.py:40)
<module> (airflow:8)
About this issue
- Original URL
- State: closed
- Created 3 years ago
- Reactions: 6
- Comments: 16 (11 by maintainers)
Commits related to this issue
- Fix Celery executor getting stuck randomly because of reset_signals in multiprocessing (#15989) Fixes #15938 multiprocessing.Pool is known to often become stuck. It causes celery_executor to hang ... — committed to apache/airflow by yuqian90 3 years ago
- Fix Celery executor getting stuck randomly because of reset_signals in multiprocessing (#15989) Fixes #15938 multiprocessing.Pool is known to often become stuck. It causes celery_executor to hang ... — committed to apache/airflow by yuqian90 3 years ago
- Fix Celery executor getting stuck randomly because of reset_signals in multiprocessing (#15989) Fixes #15938 multiprocessing.Pool is known to often become stuck. It causes celery_executor to hang ra... — committed to astronomer/airflow by yuqian90 3 years ago
Hi, @ashb it’s not clear to me how slow it must be exactly for this to happen. It looks like as long as some child processes are a fraction of a second slower than the others, they easily get into a deadlock when a SIGTERM is received. So even a transient slowness of a beefy machine can cause this to happen.
Here’s what I tried so far. Only the last method seems to fix the issue completely (i.e. we have to stop using
multiprocessing.Pool
):signal.SIG_DFL
inregister_signals
if the current process is a child process. This doesn’t help because the child process inherits the parent’s signal handler when it’s forked. Still hangs occasionally._exit_gracefully
a no-op if the current process is a child process. This isn’t sufficient. Still hangs occasionally.multiprocessing.Pool
withconcurrent.futures.process.ProcessPoolExecutor
. Once this is done, the reproducing example no longer hangs even after running it tens of thousands times.. So I put up PR #15989 which fixes the issue using this method.From experience,
multiprocessing.Pool
is notorious for causing mysterious hangs like these. UsingProcessPoolExecutor
does not cause the same problems. It has similar interface and uses similar underlying libraries. I don’t understand exactly why it fixes the issue, but in practice it always seems to help.