airflow: Airflow Scheduler liveness probe crashing (version 2.0)
Apache Airflow version: 2.0
Kubernetes version: 1.18.14
Environment: Azure - AKS
What happened:
I have just upgraded Airflow from 1.10.13 to 2.0. I am running it in Kubernetes (Azure AKS) with the Kubernetes Executor. Unfortunately, the scheduler gets killed every 15-20 minutes because the liveness probe fails, so the pod keeps restarting.
Liveness probe
import os
os.environ['AIRFLOW__CORE__LOGGING_LEVEL'] = 'ERROR'
os.environ['AIRFLOW__LOGGING__LOGGING_LEVEL'] = 'ERROR'
from airflow.jobs.scheduler_job import SchedulerJob
from airflow.utils.db import create_session
from airflow.utils.net import get_hostname
import sys
with create_session() as session:
    job = session.query(SchedulerJob).filter_by(hostname=get_hostname()).order_by(
        SchedulerJob.latest_heartbeat.desc()).limit(1).first()
sys.exit(0 if job.is_alive() else 1)
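For context, the probe exits non-zero whenever SchedulerJob.is_alive() returns False. As far as I can tell from the Airflow 2.0 code, that means the newest heartbeat row for this hostname is either not in the "running" state or is older than [scheduler] scheduler_health_check_threshold (30 seconds by default). Below is a rough sketch, not the probe's literal source, for printing that heartbeat age from inside the scheduler container; the threshold lookup and the staleness comparison are my reading of the 2.0 behaviour.

import sys

from airflow.configuration import conf
from airflow.jobs.scheduler_job import SchedulerJob
from airflow.utils import timezone
from airflow.utils.db import create_session
from airflow.utils.net import get_hostname
from airflow.utils.state import State

# 30s is the documented Airflow 2.0 default for [scheduler] scheduler_health_check_threshold
threshold = conf.getint('scheduler', 'scheduler_health_check_threshold', fallback=30)

with create_session() as session:
    # Same query as the probe: newest SchedulerJob heartbeat recorded for this hostname
    job = (
        session.query(SchedulerJob)
        .filter_by(hostname=get_hostname())
        .order_by(SchedulerJob.latest_heartbeat.desc())
        .first()
    )

if job is None:
    print("no SchedulerJob row for this hostname; the probe would fail here too")
    sys.exit(1)

age = (timezone.utcnow() - job.latest_heartbeat).total_seconds()
print(f"state={job.state} heartbeat_age={age:.0f}s threshold={threshold}s")
sys.exit(0 if job.state == State.RUNNING and age < threshold else 1)

If the printed age regularly drifts past the threshold while the scheduler process is still running, the probe will start failing even though the scheduler is only slow, not dead.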
Scheduler logs
[2021-02-16 12:18:22,422] {scheduler_job.py:933} DEBUG - No tasks to consider for execution.
[2021-02-16 12:18:22,426] {base_executor.py:147} DEBUG - 0 running task instances
[2021-02-16 12:18:22,426] {base_executor.py:148} DEBUG - 0 in queue
[2021-02-16 12:18:22,426] {base_executor.py:149} DEBUG - 32 open slots
[2021-02-16 12:18:22,427] {base_executor.py:158} DEBUG - Calling the <class 'airflow.executors.kubernetes_executor.KubernetesExecutor'> sync method
[2021-02-16 12:18:22,427] {kubernetes_executor.py:337} DEBUG - Syncing KubernetesExecutor
[2021-02-16 12:18:22,427] {kubernetes_executor.py:263} DEBUG - KubeJobWatcher alive, continuing
[2021-02-16 12:18:22,439] {scheduler_job.py:1751} INFO - Resetting orphaned tasks for active dag runs
[2021-02-16 12:18:22,452] {settings.py:290} DEBUG - Disposing DB connection pool (PID 12819)
[2021-02-16 12:18:22,460] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor490-Process' pid=12819 parent=9286 stopped exitcode=0>
[2021-02-16 12:18:23,009] {settings.py:290} DEBUG - Disposing DB connection pool (PID 12826)
[2021-02-16 12:18:23,017] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor491-Process' pid=12826 parent=9286 stopped exitcode=0>
[2021-02-16 12:18:23,594] {settings.py:290} DEBUG - Disposing DB connection pool (PID 12833)
... Many of these Disposing DB connection pool entries here
[2021-02-16 12:20:08,212] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor675-Process' pid=14146 parent=9286 stopped exitcode=0>
[2021-02-16 12:20:08,916] {settings.py:290} DEBUG - Disposing DB connection pool (PID 14153)
[2021-02-16 12:20:08,924] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor676-Process' pid=14153 parent=9286 stopped exitcode=0>
[2021-02-16 12:20:09,475] {settings.py:290} DEBUG - Disposing DB connection pool (PID 14160)
[2021-02-16 12:20:09,484] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor677-Process' pid=14160 parent=9286 stopped exitcode=0>
[2021-02-16 12:20:10,044] {settings.py:290} DEBUG - Disposing DB connection pool (PID 14167)
[2021-02-16 12:20:10,053] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor678-Process' pid=14167 parent=9286 stopped exitcode=0>
[2021-02-16 12:20:10,610] {settings.py:290} DEBUG - Disposing DB connection pool (PID 14180)
[2021-02-16 12:23:42,287] {scheduler_job.py:746} INFO - Exiting gracefully upon receiving signal 15
[2021-02-16 12:23:43,290] {process_utils.py:95} INFO - Sending Signals.SIGTERM to GPID 9286
[2021-02-16 12:23:43,494] {process_utils.py:201} INFO - Waiting up to 5 seconds for processes to exit...
[2021-02-16 12:23:43,503] {process_utils.py:61} INFO - Process psutil.Process(pid=14180, status='terminated', started='12:20:09') (14180) terminated with exit code None
[2021-02-16 12:23:43,503] {process_utils.py:61} INFO - Process psutil.Process(pid=9286, status='terminated', exitcode=0, started='12:13:35') (9286) terminated with exit code 0
[2021-02-16 12:23:43,506] {process_utils.py:95} INFO - Sending Signals.SIGTERM to GPID 9286
[2021-02-16 12:23:43,506] {scheduler_job.py:1296} INFO - Exited execute loop
[2021-02-16 12:23:43,523] {cli_action_loggers.py:84} DEBUG - Calling callbacks: []
[2021-02-16 12:23:43,525] {settings.py:290} DEBUG - Disposing DB connection pool (PID 7)
Scheduler deployment
---
################################
## Airflow Scheduler Deployment/StatefulSet
#################################
kind: Deployment
apiVersion: apps/v1
metadata:
  name: airflow-scheduler
  namespace: airflow
  labels:
    tier: airflow
    component: scheduler
spec:
  replicas: 1
  selector:
    matchLabels:
      tier: airflow
      component: scheduler
  template:
    metadata:
      labels:
        tier: airflow
        component: scheduler
      annotations:
        cluster-autoscaler.kubernetes.io/safe-to-evict: "true"
    spec:
      nodeSelector:
        {}
      affinity:
        {}
      tolerations:
        []
      restartPolicy: Always
      terminationGracePeriodSeconds: 10
      serviceAccountName: airflow-scheduler
      securityContext:
        runAsUser: 50000
        fsGroup: 50000
      initContainers:
        - name: run-airflow-migrations
          image: apache/airflow:2.0.0-python3.8
          imagePullPolicy: IfNotPresent
          # Support running against 1.10.x and 2.0.0dev/master
          args: ["bash", "-c", "airflow db upgrade"]
          env:
            # Dynamically created environment variables
            # Dynamically created secret envs
            # Hard Coded Airflow Envs
            - name: AIRFLOW__CORE__FERNET_KEY
              valueFrom:
                secretKeyRef:
                  name: fernet-key
                  key: fernet-key
            - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-airflow-metadata
                  key: connection
            - name: AIRFLOW_CONN_AIRFLOW_DB
              valueFrom:
                secretKeyRef:
                  name: airflow-airflow-metadata
                  key: connection
      containers:
        # Always run the main scheduler container.
        - name: scheduler
          image: apache/airflow:2.0.0-python3.8
          imagePullPolicy: Always
          args: ["bash", "-c", "exec airflow scheduler"]
          env:
            # Dynamically created environment variables
            # Dynamically created secret envs
            # Hard Coded Airflow Envs
            - name: AIRFLOW__CORE__FERNET_KEY
              valueFrom:
                secretKeyRef:
                  name: fernet-key
                  key: fernet-key
            - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
              valueFrom:
                secretKeyRef:
                  name: airflow-airflow-metadata
                  key: connection
            - name: AIRFLOW_CONN_AIRFLOW_DB
              valueFrom:
                secretKeyRef:
                  name: airflow-airflow-metadata
                  key: connection
            - name: DEPENDENCIES
              value: "/opt/airflow/dags/repo/dags/dependencies/"
          # If the scheduler stops heartbeating for 5 minutes (10*30s) kill the
          # scheduler and let Kubernetes restart it
          livenessProbe:
            failureThreshold: 10
            periodSeconds: 30
            exec:
              command:
                - python
                - -Wignore
                - -c
                - |
                  import os
                  os.environ['AIRFLOW__CORE__LOGGING_LEVEL'] = 'ERROR'
                  os.environ['AIRFLOW__LOGGING__LOGGING_LEVEL'] = 'ERROR'
                  from airflow.jobs.scheduler_job import SchedulerJob
                  from airflow.utils.db import create_session
                  from airflow.utils.net import get_hostname
                  import sys
                  with create_session() as session:
                      job = session.query(SchedulerJob).filter_by(hostname=get_hostname()).order_by(
                          SchedulerJob.latest_heartbeat.desc()).limit(1).first()
                  sys.exit(0 if job.is_alive() else 1)
          resources:
            {}
          volumeMounts:
            - name: config
              mountPath: /opt/airflow/pod_templates/pod_template_file.yaml
              subPath: pod_template_file.yaml
              readOnly: true
            - name: logs
              mountPath: "/opt/airflow/logs"
            - name: config
              mountPath: "/opt/airflow/airflow.cfg"
              subPath: airflow.cfg
              readOnly: true
            - name: dags
              mountPath: /opt/airflow/dags
            - name: logs-conf
              mountPath: "/opt/airflow/config/log_config.py"
              subPath: log_config.py
              readOnly: true
            - name: logs-conf-ini
              mountPath: "/opt/airflow/config/__init__.py"
              subPath: __init__.py
              readOnly: true
        - name: git-sync
          image: "k8s.gcr.io/git-sync:v3.1.6"
          securityContext:
            runAsUser: 65533
          env:
            - name: GIT_SYNC_REV
              value: "HEAD"
            - name: GIT_SYNC_BRANCH
              value: "master"
            - name: GIT_SYNC_REPO
              value: HIDDEN
            - name: GIT_SYNC_DEPTH
              value: "1"
            - name: GIT_SYNC_ROOT
              value: "/git"
            - name: GIT_SYNC_DEST
              value: "repo"
            - name: GIT_SYNC_ADD_USER
              value: "true"
            - name: GIT_SYNC_WAIT
              value: "60"
            - name: GIT_SYNC_MAX_SYNC_FAILURES
              value: "0"
            - name: GIT_SYNC_USERNAME
              valueFrom:
                secretKeyRef:
                  name: 'codecommit-key'
                  key: username
            - name: GIT_SYNC_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: 'codecommit-key'
                  key: password
          volumeMounts:
            - name: dags
              mountPath: /git
        # Always start the garbage collector sidecar.
        - name: scheduler-gc
          image: apache/airflow:2.0.0-python3.8
          imagePullPolicy: Always
          args: ["bash", "/clean-logs"]
          volumeMounts:
            - name: logs
              mountPath: "/opt/airflow/logs"
            - name: logs-conf
              mountPath: "/opt/airflow/config/log_config.py"
              subPath: log_config.py
              readOnly: true
            - name: logs-conf-ini
              mountPath: "/opt/airflow/config/__init__.py"
              subPath: __init__.py
              readOnly: true
      volumes:
        - name: config
          configMap:
            name: airflow-airflow-config
        - name: dags
          emptyDir: {}
        - name: logs
          emptyDir: {}
        - name: logs-conf
          configMap:
            name: airflow-airflow-config
        - name: logs-conf-ini
          configMap:
            name: airflow-airflow-config
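Note that the probe above only triggers a restart after failing failureThreshold times in a row (10 x 30s, so roughly 5 minutes), while the staleness limit inside the check itself comes from [scheduler] scheduler_health_check_threshold, which can also be set through the AIRFLOW__SCHEDULER__SCHEDULER_HEALTH_CHECK_THRESHOLD environment variable. The following is a small, hedged sketch for printing the effective values from inside the scheduler container; the config keys are standard Airflow 2.0 options, and the fallback defaults are assumptions rather than values read from this deployment.

# Prints the heartbeat/health settings that interact with the liveness probe above.
# Fallbacks are the documented 2.0 defaults, used only if a key is missing from airflow.cfg.
from airflow.configuration import conf

heartbeat_sec = conf.getint('scheduler', 'scheduler_heartbeat_sec', fallback=5)
health_threshold = conf.getint('scheduler', 'scheduler_health_check_threshold', fallback=30)
probe_window = 30 * 10  # periodSeconds * failureThreshold from the manifest above

print(f"scheduler heartbeats roughly every {heartbeat_sec}s")
print(f"probe reports 'not alive' once the newest heartbeat is older than {health_threshold}s")
print(f"kubelet restarts the pod after about {probe_window}s of consecutive probe failures")

Loosening either knob (the health check threshold, or the probe's periodSeconds/failureThreshold) gives a scheduler that is busy parsing DAGs more headroom before Kubernetes restarts it; whether that is the right fix depends on why the heartbeat stalls in the first place.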
About this issue
- State: closed
- Created 3 years ago
- Reactions: 3
- Comments: 23 (7 by maintainers)
I managed to stop the restarts by setting the following configs:
I have another Airflow instance running on Kubernetes in AWS. That one runs fine with any version, so I realized the problem is with Azure Kubernetes and the REST API calls to the API server.
We are facing the same issue (scheduler liveness probe always failing and restarting the scheduler). Details:
- Airflow: version 1.10.14 & 1.10.13
- Kubernetes: version 1.20.2 (DigitalOcean)
- Helm chart airflow-stable/airflow: version 7.16.0
And the logs are basically on a loop:
EDIT: Tried it on Airflow 1.10.13 and same thing. Updated versions above.
I’m seeing a similar issue when trying to run airflow on minikube.
I can always reproduce it with:
After running for a while, the scheduler restarts periodically:
We are experiencing this when deploying the chart into a local installation of k3d.
We also found some interesting WARNINGs when looking into the wait-for-airflow-migrations container… I don't think that Azure hooks should be interpreted by default…