airflow: Airflow can't import DAG in the UI and webserver logs, but manual DAG trigger works

Hi, I am seeing very strange and specific behaviour from Airflow on an AWS EKS cluster after deploying Calico to enforce network policies. I have also opened an AWS support case, but I need help from the Airflow team as well, and would really appreciate it.

What happened: My Airflow setup runs as 2 k8s pods (Airflow webserver and scheduler). Both Airflow pods use a git-sync sidecar container to pull DAGs from git and store them in a k8s emptyDir volume. Everything works fine on a fresh EKS cluster, without errors. But as soon as Calico (https://docs.aws.amazon.com/eks/latest/userguide/calico.html) is deployed to the EKS cluster, all DAGs with local imports break. Airflow has the default k8s NetworkPolicy, which allows all ingress/egress traffic without restrictions, and the Airflow UI is accessible. But the UI shows the message DAG "helloWorld" seems to be missing. and the Airflow webserver starts logging this error:

[2020-07-08 14:43:38,784] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-07-08 14:43:38,784] {dagbag.py:396} INFO - Filling up the DagBag from /usr/local/airflow/dags/repo
[2020-07-08 14:43:38,785] {dagbag.py:225} DEBUG - Importing /usr/local/airflow/dags/repo/airflow_dags/dag_test.py
[2020-07-08 14:43:39,016] {dagbag.py:239} ERROR - Failed to import: /usr/local/airflow/dags/repo/airflow_dags/dag_test.py
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dagbag.py", line 236, in process_file
    m = imp.load_source(mod_name, filepath)
  File "/usr/local/lib/python3.7/imp.py", line 171, in load_source
    module = _load(spec)
  File "<frozen importlib._bootstrap>", line 696, in _load
  File "<frozen importlib._bootstrap>", line 677, in _load_unlocked
  File "<frozen importlib._bootstrap_external>", line 728, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/usr/local/airflow/dags/repo/airflow_dags/dag_test.py", line 5, in <module>
    from airflow_dags.common import DEFAULT_ARGS
ModuleNotFoundError: No module named 'airflow_dags'

The DAG itself consists of two files, dag_test.py and common.py. Their contents are:

common.py

from datetime import datetime, timedelta

DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2020, 3, 26),
    'retry_delay': timedelta(minutes=1),
}

dag_test.py

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

from airflow_dags.common import DEFAULT_ARGS

dag = DAG('helloWorld', schedule_interval='*/5 * * * *', default_args=DEFAULT_ARGS)

t1 = BashOperator(
    task_id='task_1',
    bash_command='echo "Hello World from Task 1"; sleep 30',
    dag=dag
)
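
Note: the absolute import in dag_test.py assumes a layout roughly like the sketch below, with the repo root resolvable via sys.path (through the PYTHONPATH mentioned further down). The __init__.py is my assumption; a namespace package would also work.

# Assumed layout (sketch, not verified against the actual repo):
#
#   /usr/local/airflow/dags/repo/       <- must be resolvable via sys.path
#       airflow_dags/
#           __init__.py                 <- assumption; a namespace package also works
#           common.py
#           dag_test.py
import sys

# Hypothetical sanity check one could run inside the webserver container:
assert "/usr/local/airflow/dags/repo" in sys.path
from airflow_dags.common import DEFAULT_ARGS  # the import that fails in the UI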

What I have already tried on the webserver and scheduler pods:

  • ssh into the Airflow pod and open a Python shell; all imports work fine, for example:
airflow@airflow-webserver-78bc695cc7-l7z9s:~$ pwd
/usr/local/airflow
airflow@airflow-webserver-78bc695cc7-l7z9s:~$ python
Python 3.7.4 (default, Oct 17 2019, 06:10:02)
[GCC 8.3.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> from airflow_dags.common import DEFAULT_ARGS
>>> print(DEFAULT_ARGS)
{'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime.datetime(2020, 3, 26, 0, 0), 'retry_delay': datetime.timedelta(seconds=60)}
>>>
  • from the pod's bash shell, I can run airflow commands such as list_tasks and trigger_dag, and the DAG is not broken:
airflow@airflow-webserver-78bc695cc7-l7z9s:~$ airflow list_tasks helloWorld
[2020-07-08 15:37:24,309] {settings.py:212} DEBUG - Setting up DB connection pool (PID 275)
[2020-07-08 15:37:24,310] {settings.py:253} DEBUG - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=275
[2020-07-08 15:37:24,366] {cli_action_loggers.py:42} DEBUG - Adding <function default_action_log at 0x7fb9b5a4f710> to pre execution callback
[2020-07-08 15:37:24,817] {cli_action_loggers.py:68} DEBUG - Calling callbacks: [<function default_action_log at 0x7fb9b5a4f710>]
[2020-07-08 15:37:24,847] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-07-08 15:37:24,848] {dagbag.py:396} INFO - Filling up the DagBag from /usr/local/airflow/dags/repo
[2020-07-08 15:37:24,849] {dagbag.py:225} DEBUG - Importing /usr/local/airflow/dags/repo/airflow_dags/dag_test.py
[2020-07-08 15:37:25,081] {dagbag.py:363} DEBUG - Loaded DAG <DAG: helloWorld>
[2020-07-08 15:37:25,082] {dagbag.py:225} DEBUG - Importing /usr/local/airflow/dags/repo/airflow_dags/dagbg_add.py
task_1
[2020-07-08 15:37:25,083] {cli_action_loggers.py:86} DEBUG - Calling callbacks: []
[2020-07-08 15:37:25,083] {settings.py:278} DEBUG - Disposing DB connection pool (PID 275)

airflow@airflow-webserver-78bc695cc7-l7z9s:~$ airflow trigger_dag helloWorld
[2020-07-08 15:50:25,446] {settings.py:212} DEBUG - Setting up DB connection pool (PID 717)
[2020-07-08 15:50:25,446] {settings.py:253} DEBUG - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=717
[2020-07-08 15:50:25,502] {cli_action_loggers.py:42} DEBUG - Adding <function default_action_log at 0x7fe05c254710> to pre execution callback
[2020-07-08 15:50:25,986] {cli_action_loggers.py:68} DEBUG - Calling callbacks: [<function default_action_log at 0x7fe05c254710>]
[2020-07-08 15:50:26,024] {__init__.py:51} INFO - Using executor SequentialExecutor
[2020-07-08 15:50:26,024] {dagbag.py:396} INFO - Filling up the DagBag from /usr/local/airflow/dags/repo/airflow_dags/dag_test.py
[2020-07-08 15:50:26,024] {dagbag.py:225} DEBUG - Importing /usr/local/airflow/dags/repo/airflow_dags/dag_test.py
[2020-07-08 15:50:26,253] {dagbag.py:363} DEBUG - Loaded DAG <DAG: helloWorld>
Created <DagRun helloWorld @ 2020-07-08 15:50:26+00:00: manual__2020-07-08T15:50:26+00:00, externally triggered: True>
[2020-07-08 15:50:26,289] {cli_action_loggers.py:86} DEBUG - Calling callbacks: []
[2020-07-08 15:50:26,289] {settings.py:278} DEBUG - Disposing DB connection pool (PID 717)

To summarise: on an EKS cluster with Calico network policies, Airflow DAGs that have local imports are reported as broken in the UI and in the webserver logs, but can still be executed via a manual trigger.

Please help me understand why the DAG imports break in the UI.

Apache Airflow version: 1.10.10

Kubernetes version (if you are using kubernetes) (use kubectl version):

Server Version: version.Info{Major:"1", Minor:"15+", GitVersion:"v1.15.11-eks-af3caf", GitCommit:"af3caf6136cd355f467083651cc1010a499f59b1", GitTreeState:"clean", BuildDate:"2020-03-27T21:51:36Z", GoVersion:"go1.12.17", Compiler:"gc", Platform:"linux/amd64"}

Environment:

  • Cloud provider or hardware configuration: AWS, EKS
  • OS (e.g. from /etc/os-release): EKS worker nodes (EC2 instances):
NAME="Amazon Linux"
VERSION="2"
ID="amzn"
ID_LIKE="centos rhel fedora"
VERSION_ID="2"
PRETTY_NAME="Amazon Linux 2"
ANSI_COLOR="0;33"
CPE_NAME="cpe:2.3:o:amazon:amazon_linux:2"
HOME_URL="https://amazonlinux.com/"

Docker image with Airflow installed:

PRETTY_NAME="Debian GNU/Linux 10 (buster)"
NAME="Debian GNU/Linux"
VERSION_ID="10"
VERSION="10 (buster)"
VERSION_CODENAME=buster
ID=debian
HOME_URL="https://www.debian.org/"
SUPPORT_URL="https://www.debian.org/support"
BUG_REPORT_URL="https://bugs.debian.org/"
  • Kernel (e.g. uname -a):
Linux airflow-webserver-78bc695cc7-dmzh2 4.14.181-140.257.amzn2.x86_64 #1 SMP Wed May 27 02:17:36 UTC 2020 x86_64 GNU/Linux
  • Install tools: we use pipenv to install Airflow into the system: pipenv install --system --deploy --clear
  • Others:

How to reproduce it: Create an EKS cluster and deploy Calico. Use a DAG with local imports.

Anything else we need to know: I have all the required environment variables set, such as AIRFLOW_HOME=/usr/local/airflow, AIRFLOW_DAGS_FOLDER=/usr/local/airflow/dags/repo and PYTHONPATH=/usr/local/airflow/dags/repo, and on an EKS cluster without network policies everything works fine.

About this issue

  • State: closed
  • Created 4 years ago
  • Comments: 20 (9 by maintainers)

Most upvoted comments

@BobasB Hi, I ran into the same situation and want to share some information. I did some experiments:

  • a new module, imported by an existing DAG, in a directory that existed before Airflow started -> import succeeds
  • a new module, imported by an existing DAG, in a directory created after Airflow started -> import fails
  • a new DAG file in a directory that existed before Airflow started -> import succeeds
  • a new DAG file in a directory created after Airflow started -> import succeeds

All of these directories and files are under {AIRFLOW_HOME}/dags.

In my opinion, DAG files can be scanned as standalone Python scripts by Airflow, but imported modules cannot be; the sketch below illustrates the distinction.

Hopefully this helps you.
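
A minimal, self-contained sketch of that distinction, using a hypothetical temp-directory layout rather than the real Airflow code: loading a file directly by its absolute path (roughly what the DagBag's imp.load_source in the traceback does, shown here with the importlib equivalent) works without touching sys.path, while importing the same code by package name only works once the package's parent directory is resolvable via sys.path.

import importlib.util
import os
import sys
import tempfile

# Build a tiny fake "dags repo" in a fresh temp directory (hypothetical layout).
base = tempfile.mkdtemp()
pkg = os.path.join(base, "airflow_dags")
os.makedirs(pkg)
open(os.path.join(pkg, "__init__.py"), "w").close()
with open(os.path.join(pkg, "common.py"), "w") as f:
    f.write("DEFAULT_ARGS = {'owner': 'airflow'}\n")

# "DagBag-style": load the file directly by its path; sys.path is irrelevant here.
spec = importlib.util.spec_from_file_location("common_by_path", os.path.join(pkg, "common.py"))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)
print(mod.DEFAULT_ARGS)  # works

# Regular import by package name: fails until the parent directory is on sys.path.
try:
    from airflow_dags.common import DEFAULT_ARGS
except ModuleNotFoundError:
    sys.path.insert(0, base)
    from airflow_dags.common import DEFAULT_ARGS
print(DEFAULT_ARGS)  # works once the repo root is importable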

Hi, for those interested in this issue: I have solved it for myself and, in short, the problem was the missing git-sync init container for the webserver. In my deployment, the scheduler had both an init container and a sidecar container for git-sync, while the webserver had only the git-sync sidecar container.

  • Without network policies, the git-sync container starts before the Airflow webserver, so the DAGs path already exists when Airflow starts and the imports work.
  • When network policies are applied, Airflow starts before the git-sync sidecar, so the path does NOT exist yet and is never seen for imports. (Also, if I start another Airflow webserver with another PID/k8s port inside the already running container, it works correctly.)

By adding a git-sync init container to both the webserver and the scheduler, the dags volume is always initialized before Airflow starts.

Maybe it is an Airflow bug, but the problem is: when Airflow starts and the path doesn't exist, it never becomes visible for imports later.
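
This matches a property of Python's import machinery that I assume is at play here: if a sys.path entry does not exist the first time an import is attempted, a negative entry (None) is cached in sys.path_importer_cache for that path, and the entry is skipped on every later import in that process until the cache is cleared. A minimal standalone sketch under that assumption (hypothetical temp directory, not Airflow code):

import os
import sys
import tempfile

# A sys.path entry that does NOT exist yet when the interpreter first tries it.
path = os.path.join(tempfile.mkdtemp(), "late_dags")
sys.path.insert(0, path)

try:
    import late_module                   # missing directory -> None cached for this path entry
except ModuleNotFoundError:
    pass

# Now the directory and the module appear, like DAGs arriving after a late git-sync.
os.makedirs(path)
with open(os.path.join(path, "late_module.py"), "w") as f:
    f.write("VALUE = 42\n")

try:
    import late_module                   # still fails: the stale negative cache entry wins
except ModuleNotFoundError:
    print("directory exists now, but the import still fails")

sys.path_importer_cache.pop(path, None)  # drop the stale entry (importlib.invalidate_caches() may also do this on newer Pythons)
import late_module                       # now it works
print(late_module.VALUE)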

@fernhtls The issue is fixed by PR#16339, which is already merged and waiting to be released.

Thanks for the reply @potiuk. I'm sure that there are no issues with permissions. The root cause is how Airflow imports DAGs and modules; it looks like a race between the git-sync and scheduler containers. I added a delay to the scheduler container and it solved the issue:

args: ["bash", "-c", "sleep 60 && exec airflow scheduler"]

Of course, this is a very dirty hack and the issue should be fixed in some other way, for example by adding the git-sync container to initContainers.