airflow: "dag_id could not be found" when running airflow on KubernetesExecutor

Apache Airflow version: 2.0.0

Kubernetes version (if you are using kubernetes) (use kubectl version): v1.19.4

What happened: I get this error when I try to execute tasks using the KubernetesExecutor:

[2021-01-14 19:39:17,628] {dagbag.py:440} INFO - Filling up the DagBag from /opt/airflow/dags/repo/bash.py
Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 89, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 216, in task_run
    dag = get_dag(args.subdir, args.dag_id)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 189, in get_dag
    'parse.'.format(dag_id)
airflow.exceptions.AirflowException: dag_id could not be found: bash. Either the dag did not exist or it failed to parse.

What you expected to happen: The task should be executed and the worker pod should terminate.

How to reproduce it: Deploy the Airflow Helm chart using this values.yaml:

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
---
# Default values for airflow.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.

# User and group of airflow user
uid: 50000
gid: 50000

# Airflow home directory
# Used for mount paths
airflowHome: "/opt/airflow"

# Default airflow repository -- overrides all the specific images below
defaultAirflowRepository: apache/airflow

# Default airflow tag to deploy
defaultAirflowTag: 2.0.0


# Select certain nodes for airflow pods.
nodeSelector: { }
affinity: { }
tolerations: [ ]

# Add common labels to all objects and pods defined in this chart.
labels: { }

# Ingress configuration
ingress:
  # Enable ingress resource
  enabled: false

  # Configs for the Ingress of the web Service
  web:
    # Annotations for the web Ingress
    annotations: { }

    # The path for the web Ingress
    path: ""

    # The hostname for the web Ingress
    host: ""

    # configs for web Ingress TLS
    tls:
      # Enable TLS termination for the web Ingress
      enabled: false
      # the name of a pre-created Secret containing a TLS private key and certificate
      secretName: ""

    # HTTP paths to add to the web Ingress before the default path
    precedingPaths: [ ]

    # Http paths to add to the web Ingress after the default path
    succeedingPaths: [ ]

  # Configs for the Ingress of the flower Service
  flower:
    # Annotations for the flower Ingress
    annotations: { }

    # The path for the flower Ingress
    path: ""

    # The hostname for the flower Ingress
    host: ""

    # configs for web Ingress TLS
    tls:
      # Enable TLS termination for the flower Ingress
      enabled: false
      # the name of a pre-created Secret containing a TLS private key and certificate
      secretName: ""

    # HTTP paths to add to the flower Ingress before the default path
    precedingPaths: [ ]

    # Http paths to add to the flower Ingress after the default path
    succeedingPaths: [ ]

# Network policy configuration
networkPolicies:
  # Enabled network policies
  enabled: false


# Extra annotations to apply to all
# Airflow pods
airflowPodAnnotations: { }

# Enable RBAC (default on most clusters these days)
rbacEnabled: true

# Airflow executor
# Options: SequentialExecutor, LocalExecutor, CeleryExecutor, KubernetesExecutor
executor: "KubernetesExecutor"

# If this is true and using LocalExecutor/SequentialExecutor/KubernetesExecutor, the scheduler's
# service account will have access to communicate with the api-server and launch pods.
# If this is true and using the CeleryExecutor, the workers will be able to launch pods.
allowPodLaunching: true

# Images
images:
  airflow:
    repository: ~
    tag: ~
    pullPolicy: IfNotPresent
  pod_template:
    repository: ~
    tag: ~
    pullPolicy: IfNotPresent
  flower:
    repository: ~
    tag: ~
    pullPolicy: IfNotPresent
  statsd:
    repository: apache/airflow
    tag: airflow-statsd-exporter-2020.09.05-v0.17.0
    pullPolicy: IfNotPresent
  redis:
    repository: redis
    tag: 6-buster
    pullPolicy: IfNotPresent
  pgbouncer:
    repository: apache/airflow
    tag: airflow-pgbouncer-2020.09.05-1.14.0
    pullPolicy: IfNotPresent
  pgbouncerExporter:
    repository: apache/airflow
    tag: airflow-pgbouncer-exporter-2020.09.25-0.5.0
    pullPolicy: IfNotPresent
  gitSync:
    repository: k8s.gcr.io/git-sync
    tag: v3.1.6
    pullPolicy: IfNotPresent

# Environment variables for all airflow containers
env:
  - name: "AIRFLOW__KUBERNETES__GIT_SYNC_RUN_AS_USER"
    value: "65533"

# Secrets for all airflow containers
secret: [ ]
# - envName: ""
#   secretName: ""
#   secretKey: ""

# Extra secrets that will be managed by the chart
# (You can use them with extraEnv or extraEnvFrom or some of the extraVolumes values).
# The format is "key/value" where
#    * key (can be templated) is the name of the secret that will be created
#    * value: an object with the standard 'data' or 'stringData' key (or both).
#          The value associated with those keys must be a string (can be templated)
extraSecrets: { }
# eg:
# extraSecrets:
#   {{ .Release.Name }}-airflow-connections:
#     data: |
#       AIRFLOW_CONN_GCP: 'base64_encoded_gcp_conn_string'
#       AIRFLOW_CONN_AWS: 'base64_encoded_aws_conn_string'
#     stringData: |
#       AIRFLOW_CONN_OTHER: 'other_conn'
#   {{ .Release.Name }}-other-secret-name-suffix: |
#     data: |
#        ...

# Extra ConfigMaps that will be managed by the chart
# (You can use them with extraEnv or extraEnvFrom or some of the extraVolumes values).
# The format is "key/value" where
#    * key (can be templated) is the name of the configmap that will be created
#    * value: an object with the standard 'data' key.
#          The value associated with this key must be a string (can be templated)
extraConfigMaps: { }
# eg:
# extraConfigMaps:
#   {{ .Release.Name }}-airflow-variables:
#     data: |
#       AIRFLOW_VAR_HELLO_MESSAGE: "Hi!"
#       AIRFLOW_VAR_KUBERNETES_NAMESPACE: "{{ .Release.Namespace }}"

# Extra env 'items' that will be added to the definition of airflow containers
# a string is expected (can be templated).
extraEnv: ~
# eg:
# extraEnv: |
#   - name: PLATFORM
#     value: FR

# Extra envFrom 'items' that will be added to the definition of airflow containers
# A string is expected (can be templated).
extraEnvFrom: ~
# eg:
# extraEnvFrom: |
#   - secretRef:
#       name: '{{ .Release.Name }}-airflow-connections'
#   - configMapRef:
#       name: '{{ .Release.Name }}-airflow-variables'

# Airflow database config
data:
  # If secret names are provided, use those secrets
  metadataSecretName: ~
  resultBackendSecretName: ~

  # Otherwise pass connection values in
  metadataConnection:
    user: postgres
    pass: postgres
    host: ~
    port: 5432
    db: postgres
    sslmode: disable
  resultBackendConnection:
    user: postgres
    pass: postgres
    host: ~
    port: 5432
    db: postgres
    sslmode: disable

# Fernet key settings
fernetKey: ~
fernetKeySecretName: ~


# In order to use kerberos you need to create secret containing the keytab file
# The secret name should follow naming convention of the application where resources are
# name {{ .Release-name }}-<POSTFIX>. In case of the keytab file, the postfix is "kerberos-keytab"
# So if your release is named "my-release" the name of the secret should be "my-release-kerberos-keytab"
#
# The Keytab content should be available in the "kerberos.keytab" key of the secret.
#
#  apiVersion: v1
#  kind: Secret
#  data:
#    kerberos.keytab: <base64_encoded keytab file content>
#  type: Opaque
#
#
#  If you have such keytab file you can do it with similar
#
#  kubectl create secret generic {{ .Release.name }}-kerberos-keytab --from-file=kerberos.keytab
#
kerberos:
  enabled: false
  ccacheMountPath: '/var/kerberos-ccache'
  ccacheFileName: 'cache'
  configPath: '/etc/krb5.conf'
  keytabPath: '/etc/airflow.keytab'
  principal: 'airflow@FOO.COM'
  reinitFrequency: 3600
  config: |
    # This is an example config showing how you can use templating and how "example" config
    # might look like. It works with the test kerberos server that we are using during integration
    # testing at Apache Airflow (see `scripts/ci/docker-compose/integration-kerberos.yml` but in
    # order to make it production-ready you must replace it with your own configuration that
    # Matches your kerberos deployment. Administrators of your Kerberos instance should
    # provide the right configuration.

    [logging]
    default = "FILE:{{ template "airflow_logs_no_quote" . }}/kerberos_libs.log"
    kdc = "FILE:{{ template "airflow_logs_no_quote" . }}/kerberos_kdc.log"
    admin_server = "FILE:{{ template "airflow_logs_no_quote" . }}/kadmind.log"

    [libdefaults]
    default_realm = FOO.COM
    ticket_lifetime = 10h
    renew_lifetime = 7d
    forwardable = true

    [realms]
    FOO.COM = {
      kdc = kdc-server.foo.com
      admin_server = admin_server.foo.com
    }

# Airflow Worker Config
workers:
  # Number of airflow celery workers in StatefulSet
  replicas: 1

  # Allow KEDA autoscaling.
  # Persistence.enabled must be set to false to use KEDA.
  keda:
    enabled: false
    namespaceLabels: { }

    # How often KEDA polls the airflow DB to report new scale requests to the HPA
    pollingInterval: 5

    # How many seconds KEDA will wait before scaling to zero.
    # Note that HPA has a separate cooldown period for scale-downs
    cooldownPeriod: 30

    # Maximum number of workers created by keda
    maxReplicaCount: 10

  persistence:
    # Enable persistent volumes
    enabled: true
    # Volume size for worker StatefulSet
    size: 100Gi
    # If using a custom storageClass, pass name ref to all statefulSets here
    storageClassName:
    # Execute init container to chown log directory.
    # This is currently only needed in KinD, due to usage
    # of local-path provisioner.
    fixPermissions: false

  kerberosSidecar:
    # Enable kerberos sidecar
    enabled: false

  resources: { }
  #  limits:
  #   cpu: 100m
  #   memory: 128Mi
  #  requests:
  #   cpu: 100m
  #   memory: 128Mi

  # Grace period for tasks to finish after SIGTERM is sent from kubernetes
  terminationGracePeriodSeconds: 600

  # This setting tells kubernetes that its ok to evict
  # when it wants to scale a node down.
  safeToEvict: true
  # Annotations to add to worker kubernetes service account.
  serviceAccountAnnotations: { }
  # Mount additional volumes into worker.
  extraVolumes: [ ]
  extraVolumeMounts: [ ]

# Airflow scheduler settings
scheduler:
  # Airflow 2.0 allows users to run multiple schedulers,
  # However this feature is only recommended for MySQL 8+ and Postgres
  replicas: 1
  # Scheduler pod disruption budget
  podDisruptionBudget:
    enabled: false

    # PDB configuration
    config:
      maxUnavailable: 1

  resources: { }
  #  limits:
  #   cpu: 100m
  #   memory: 128Mi
  #  requests:
  #   cpu: 100m
  #   memory: 128Mi

  # This setting can overwrite
  # podMutation settings.
  airflowLocalSettings: ~

  # This setting tells kubernetes that its ok to evict
  # when it wants to scale a node down.
  safeToEvict: true

  # Annotations to add to scheduler kubernetes service account.
  serviceAccountAnnotations: { }

  # Mount additional volumes into scheduler.
  extraVolumes: [ ]
  extraVolumeMounts: [ ]

# Airflow webserver settings
webserver:
  allowPodLogReading: true
  livenessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 30
    failureThreshold: 20
    periodSeconds: 5

  readinessProbe:
    initialDelaySeconds: 15
    timeoutSeconds: 30
    failureThreshold: 20
    periodSeconds: 5

  # Number of webservers
  replicas: 1

  # Additional network policies as needed
  extraNetworkPolicies: [ ]

  resources: { }
  #   limits:
  #     cpu: 100m
  #     memory: 128Mi
  #   requests:
  #     cpu: 100m
  #     memory: 128Mi

  # Create initial user.
  defaultUser:
    enabled: true
    role: Admin
    username: admin
    email: admin@example.com
    firstName: admin
    lastName: user
    password: admin

  # Mount additional volumes into webserver.
  extraVolumes: [ ]
  #    - name: airflow-ui
  #      emptyDir: { }
  extraVolumeMounts: [ ]
  #    - name: airflow-ui
  #      mountPath: /opt/airflow

  # This will be mounted into the Airflow Webserver as a custom
  # webserver_config.py. You can bake a webserver_config.py in to your image
  # instead
  webserverConfig: ~
  # webserverConfig: |
  #   from airflow import configuration as conf

  #   # The SQLAlchemy connection string.
  #   SQLALCHEMY_DATABASE_URI = conf.get('core', 'SQL_ALCHEMY_CONN')

  #   # Flask-WTF flag for CSRF
  #   CSRF_ENABLED = True

  service:
    type: NodePort
    ## service annotations
    annotations: { }

  # Annotations to add to webserver kubernetes service account.
  serviceAccountAnnotations: { }

# Flower settings
flower:
  # Additional network policies as needed
  extraNetworkPolicies: [ ]
  resources: { }
  #   limits:
  #     cpu: 100m
  #     memory: 128Mi
  #   requests:
  #     cpu: 100m
  #     memory: 128Mi

  # A secret containing the connection
  secretName: ~

  # Else, if username and password are set, create secret from username and password
  username: ~
  password: ~

  service:
    type: ClusterIP

# Statsd settings
statsd:
  enabled: true
  # Additional network policies as needed
  extraNetworkPolicies: [ ]
  resources: { }
  #   limits:
  #     cpu: 100m
  #     memory: 128Mi
  #   requests:
  #     cpu: 100m
  #     memory: 128Mi

  service:
    extraAnnotations: { }

# Pgbouncer settings
pgbouncer:
  # Enable pgbouncer
  enabled: false
  # Additional network policies as needed
  extraNetworkPolicies: [ ]

  # Pool sizes
  metadataPoolSize: 10
  resultBackendPoolSize: 5

  # Maximum clients that can connect to pgbouncer (higher = more file descriptors)
  maxClientConn: 100

  # Pgbouncer pod disruption budget
  podDisruptionBudget:
    enabled: false

    # PDB configuration
    config:
      maxUnavailable: 1

  # Limit the resources to pgbouncerExported.
  # When you specify the resource request the scheduler uses this information to decide which node to place
  # the Pod on. When you specify a resource limit for a Container, the kubelet enforces those limits so
  # that the running container is not allowed to use more of that resource than the limit you set.
  # See: https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/
  # Example:
  #
  # resource:
  #   limits:
  #     cpu: 100m
  #     memory: 128Mi
  #   requests:
  #     cpu: 100m
  #     memory: 128Mi
  resources: { }

  service:
    extraAnnotations: { }

  # https://www.pgbouncer.org/config.html
  verbose: 0
  logDisconnections: 0
  logConnections: 0

  sslmode: "prefer"
  ciphers: "normal"

  ssl:
    ca: ~
    cert: ~
    key: ~

redis:
  terminationGracePeriodSeconds: 600

  persistence:
    # Enable persistent volumes
    enabled: true
    # Volume size for worker StatefulSet
    size: 1Gi
    # If using a custom storageClass, pass name ref to all statefulSets here
    storageClassName:

  resources: { }
  #  limits:
  #   cpu: 100m
  #   memory: 128Mi
  #  requests:
  #   cpu: 100m
  #   memory: 128Mi

  # If set use as redis secret
  passwordSecretName: ~
  brokerURLSecretName: ~

  # Else, if password is set, create secret with it,
  # else generate a new one on install
  password: ~

  # This setting tells kubernetes that its ok to evict
  # when it wants to scale a node down.
  safeToEvict: true

# Auth secret for a private registry
# This is used if pulling airflow images from a private registry
registry:
  secretName: ~

  # Example:
  # connection:
  #   user: ~
  #   pass: ~
  #   host: ~
  #   email: ~
  connection: { }

# Elasticsearch logging configuration
elasticsearch:
  # Enable elasticsearch task logging
  enabled: true
  # A secret containing the connection
  #  secretName: ~
  # Or an object representing the connection
  # Example:
  connection:
    #     user:
    #     pass:
    host: elasticsearch-master-headless.elk.svc.cluster.local
    port: 9200
#  connection: {}


# All ports used by chart
ports:
  flowerUI: 5555
  airflowUI: 8080
  workerLogs: 8793
  redisDB: 6379
  statsdIngest: 9125
  statsdScrape: 9102
  pgbouncer: 6543
  pgbouncerScrape: 9127

# Define any ResourceQuotas for namespace
quotas: { }

# Define default/max/min values for pods and containers in namespace
limits: [ ]

# This runs as a CronJob to cleanup old pods.
cleanup:
  enabled: false
  # Run every 15 minutes
  schedule: "*/15 * * * *"

# Configuration for postgresql subchart
# Not recommended for production
postgresql:
  enabled: true
  postgresqlPassword: postgres
  postgresqlUsername: postgres

# Config settings to go into the mounted airflow.cfg
#
# Please note that these values are passed through the `tpl` function, so are
# all subject to being rendered as go templates. If you need to include a
# literal `{{` in a value, it must be expressed like this:
#
#    a: '{{ "{{ not a template }}" }}'
#
# yamllint disable rule:line-length
config:
  core:
    dags_folder: '{{ include "airflow_dags" . }}'
    load_examples: 'False'
    executor: '{{ .Values.executor }}'
    # For Airflow 1.10, backward compatibility
    colored_console_log: 'True'
    remote_logging: '{{- ternary "True" "False" .Values.elasticsearch.enabled }}'
  # Authentication backend used for the experimental API
  api:
    auth_backend: airflow.api.auth.backend.deny_all
  logging:
    remote_logging: '{{- ternary "True" "False" .Values.elasticsearch.enabled }}'
    colored_console_log: 'True'
    logging_level: INFO
  metrics:
    statsd_on: '{{ ternary "True" "False" .Values.statsd.enabled }}'
    statsd_port: 9125
    statsd_prefix: airflow
    statsd_host: '{{ printf "%s-statsd" .Release.Name }}'
  webserver:
    enable_proxy_fix: 'True'
    expose_config: 'True'
    rbac: 'True'
  celery:
    default_queue: celery
  scheduler:
    scheduler_heartbeat_sec: 5
    # For Airflow 1.10, backward compatibility
    statsd_on: '{{ ternary "True" "False" .Values.statsd.enabled }}'
    statsd_port: 9125
    statsd_prefix: airflow
    statsd_host: '{{ printf "%s-statsd" .Release.Name }}'
    # Restart Scheduler every 41460 seconds (11 hours 31 minutes)
    # The odd time is chosen so it is not always restarting on the same "hour" boundary
    run_duration: 41460
  elasticsearch:
    json_format: 'True'
    log_id_template: "{dag_id}_{task_id}_{execution_date}_{try_number}"
  elasticsearch_configs:
    max_retries: 3
    timeout: 30
    retry_timeout: 'True'
  kerberos:
    keytab: '{{ .Values.kerberos.keytabPath }}'
    reinit_frequency: '{{ .Values.kerberos.reinitFrequency }}'
    principal: '{{ .Values.kerberos.principal }}'
    ccache: '{{ .Values.kerberos.ccacheMountPath }}/{{ .Values.kerberos.ccacheFileName }}'
  kubernetes:
    namespace: '{{ .Release.Namespace }}'
    airflow_configmap: '{{ include "airflow_config" . }}'
    airflow_local_settings_configmap: '{{ include "airflow_config" . }}'
    pod_template_file: '{{ include "airflow_pod_template_file" . }}/pod_template_file.yaml'
    worker_container_repository: '{{ .Values.images.airflow.repository | default .Values.defaultAirflowRepository }}'
    worker_container_tag: '{{ .Values.images.airflow.tag | default .Values.defaultAirflowTag }}'
    delete_worker_pods: 'False'
    multi_namespace_mode: '{{ if .Values.multiNamespaceMode }}True{{ else }}False{{ end }}'
# yamllint enable rule:line-length

multiNamespaceMode: false

podTemplate:

# Git sync
dags:
  persistence:
    # Enable persistent volume for storing dags
    enabled: false
    # Volume size for dags
    size: 1Gi
    # If using a custom storageClass, pass name here
    storageClassName: gp2
    # access mode of the persistent volume
    accessMode: ReadWriteMany
    ## the name of an existing PVC to use
    existingClaim: "airflow-dags"
  gitSync:
    enabled: true
    repo: git@github.com:Tikna-inc/airflow.git
    branch: main
    rev: HEAD
    root: "/git"
    dest: "repo"
    depth: 1
    maxFailures: 0
    subPath: ""
    sshKeySecret: airflow-ssh-secret
    wait: 60
    containerName: git-sync
    uid: 65533

And this is the DAG with its task:



import logging  # required by the logging.getLogger() call below
from datetime import timedelta

import requests
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

logging.getLogger().setLevel(level=logging.INFO)

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}


def get_active_customers():
    requests.get("http://localhost:8080")  # requests needs an explicit URL scheme


dag = DAG(
    'bash',
    default_args=default_args,
    description='A simple test DAG',
    schedule_interval='*/2 * * * *',
    start_date=days_ago(1),
    tags=['Test'],
    is_paused_upon_creation=False,
    catchup=False
)

t1 = BashOperator(
    task_id='print_date',
    bash_command='mkdir ./itsMe',
    dag=dag
)

t1
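
The error message says the DAG "did not exist or it failed to parse", so one thing worth ruling out is an exception raised while the file is imported. A minimal sketch, using only the standard library: check_dag_file is a hypothetical helper (not part of Airflow) that imports a file as a plain module and returns the traceback text if the import fails. It has to run in the same environment as the worker so that the airflow imports resolve.

```python
import importlib.util
import traceback
from typing import Optional


def check_dag_file(path: str) -> Optional[str]:
    """Import a DAG file as a plain module; return the traceback text on failure.

    Hypothetical helper for illustration only, not an Airflow API.
    Returns None when the file imports without raising.
    """
    spec = importlib.util.spec_from_file_location("dag_under_test", path)
    if spec is None or spec.loader is None:
        return f"cannot load {path}"
    module = importlib.util.module_from_spec(spec)
    try:
        # Executes the top-level code of the file, as any import would.
        spec.loader.exec_module(module)
    except Exception:
        return traceback.format_exc()
    return None
```

Inside the worker pod this could be called as check_dag_file("/opt/airflow/dags/repo/bash.py"). A missing file shows up as a FileNotFoundError in the returned text, and a broken file shows the exception that stopped the import.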

This is the airflow.cfg file:

[api]
auth_backend = airflow.api.auth.backend.deny_all

[celery]
default_queue = celery

[core]
colored_console_log = True
dags_folder = /opt/airflow/dags/repo/
executor = KubernetesExecutor
load_examples = False
remote_logging = False

[elasticsearch]
json_format = True
log_id_template = {dag_id}_{task_id}_{execution_date}_{try_number}

[elasticsearch_configs]
max_retries = 3
retry_timeout = True
timeout = 30

[kerberos]
ccache = /var/kerberos-ccache/cache
keytab = /etc/airflow.keytab
principal = airflow@FOO.COM
reinit_frequency = 3600

[kubernetes]
airflow_configmap = airflow-airflow-config
airflow_local_settings_configmap = airflow-airflow-config
dags_in_image = False
delete_worker_pods = False
multi_namespace_mode = False
namespace = airflow
pod_template_file = /opt/airflow/pod_templates/pod_template_file.yaml
worker_container_repository = apache/airflow
worker_container_tag = 2.0.0

[logging]
colored_console_log = True
logging_level = INFO
remote_logging = False

[metrics]
statsd_host = airflow-statsd
statsd_on = True
statsd_port = 9125
statsd_prefix = airflow

[scheduler]
run_duration = 41460
scheduler_heartbeat_sec = 5
statsd_host = airflow-statsd
statsd_on = True
statsd_port = 9125
statsd_prefix = airflow

[webserver]
enable_proxy_fix = True
expose_config = True
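
A quick sanity check on this config is whether the --subdir path from the traceback lies inside [core] dags_folder. A rough sketch, assuming the cfg text is available as a string; subdir_is_under_dags_folder is a made-up helper name, and only a fragment of the file is reproduced in CFG.

```python
import configparser
from pathlib import PurePosixPath


def subdir_is_under_dags_folder(cfg_text: str, subdir: str) -> bool:
    """Return True if subdir is dags_folder itself or lies inside it.

    Hypothetical helper for illustration only.
    """
    # interpolation=None keeps configparser from treating '%' specially.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    dags_folder = PurePosixPath(parser["core"]["dags_folder"])
    path = PurePosixPath(subdir)
    return path == dags_folder or dags_folder in path.parents


# Fragment of the airflow.cfg shown above.
CFG = """
[core]
dags_folder = /opt/airflow/dags/repo/
executor = KubernetesExecutor
"""

print(subdir_is_under_dags_folder(CFG, "/opt/airflow/dags/repo/bash.py"))
```

For the values in this issue the paths are consistent, which suggests the problem is whether the file actually exists (and parses) at that location in the worker pod, not how the config is wired.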

This is the pod YAML for the new tasks:

apiVersion: v1
kind: Pod
metadata:
  annotations:
    dag_id: bash2
    execution_date: "2021-01-14T20:16:00+00:00"
    kubernetes.io/psp: eks.privileged
    task_id: create_dir
    try_number: "2"
  labels:
    airflow-worker: "38"
    airflow_version: 2.0.0
    dag_id: bash2
    execution_date: 2021-01-14T20_16_00_plus_00_00
    kubernetes_executor: "True"
    task_id: create_dir
    try_number: "2"
  name: sss3
  namespace: airflow
spec:
  containers:
    - args:
        - airflow
        - tasks
        - run
        - bash2
        - create_dir
        - "2021-01-14T20:16:00+00:00"
        - --local
        - --pool
        - default_pool
        - --subdir
        - /opt/airflow/dags/repo/bash.py
      env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              key: fernet-key
              name: airflow-fernet-key
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              key: connection
              name: airflow-airflow-metadata
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              key: connection
              name: airflow-airflow-metadata
        - name: AIRFLOW_IS_K8S_EXECUTOR_POD
          value: "True"
      image: apache/airflow:2.0.0
      imagePullPolicy: IfNotPresent
      name: base
      resources: { }
      terminationMessagePath: /dev/termination-log
      terminationMessagePolicy: File
      volumeMounts:
        - mountPath: /opt/airflow/logs
          name: airflow-logs
        - mountPath: /opt/airflow/airflow.cfg
          name: config
          readOnly: true
          subPath: airflow.cfg
        - mountPath: /etc/git-secret/ssh
          name: git-sync-ssh-key
          subPath: ssh
        - mountPath: /opt/airflow/dags
          name: dags
          readOnly: true
        - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
          name: airflow-worker-token-7sdtr
          readOnly: true
  dnsPolicy: ClusterFirst
  enableServiceLinks: true
  initContainers:
    - env:
        - name: GIT_SSH_KEY_FILE
          value: /etc/git-secret/ssh
        - name: GIT_SYNC_SSH
          value: "true"
        - name: GIT_KNOWN_HOSTS
          value: "false"
        - name: GIT_SYNC_REV
          value: HEAD
        - name: GIT_SYNC_BRANCH
          value: main
        - name: GIT_SYNC_REPO
          value: git@github.com:Tikna-inc/airflow.git
        - name: GIT_SYNC_DEPTH
          value: "1"
        - name: GIT_SYNC_ROOT
          value: /git
        - name: GIT_SYNC_DEST
          value: repo
        - name: GIT_SYNC_ADD_USER
          value: "true"
        - name: GIT_SYNC_WAIT
          value: "60"
        - name: GIT_SYNC_MAX_SYNC_FAILURES
          value: "0"
        - name: GIT_SYNC_ONE_TIME
          value: "true"
      image: k8s.gcr.io/git-sync:v3.1.6
      imagePullPolicy: IfNotPresent
      name: git-sync
      resources: { }
      securityContext:
        runAsUser: 65533
      terminationMessagePath: /dev/termination-log
      terminationMessagePolicy: File
      volumeMounts:
        - mountPath: /git
          name: dags
        - mountPath: /etc/git-secret/ssh
          name: git-sync-ssh-key
          readOnly: true
          subPath: gitSshKey
        - mountPath: /var/run/secrets/kubernetes.io/serviceaccount
          name: airflow-worker-token-7sdtr
          readOnly: true
  nodeName: ip-172-31-41-37.eu-south-1.compute.internal
  priority: 0
  restartPolicy: Never
  schedulerName: default-scheduler
  securityContext:
    runAsUser: 50000
  serviceAccount: airflow-worker
  serviceAccountName: airflow-worker
  terminationGracePeriodSeconds: 30
  tolerations:
    - effect: NoExecute
      key: node.kubernetes.io/not-ready
      operator: Exists
      tolerationSeconds: 300
    - effect: NoExecute
      key: node.kubernetes.io/unreachable
      operator: Exists
      tolerationSeconds: 300
  volumes:
    - emptyDir: { }
      name: dags
    - name: git-sync-ssh-key
      secret:
        defaultMode: 288
        secretName: airflow-ssh-secret
    - emptyDir: { }
      name: airflow-logs
    - configMap:
        defaultMode: 420
        name: airflow-airflow-config
      name: config
    - name: airflow-worker-token-7sdtr
      secret:
        defaultMode: 420
        secretName: airflow-worker-token-7sdtr
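
In this pod spec the git-sync init container mounts the dags volume at /git and syncs into /git/repo, while the base container mounts the same volume at /opt/airflow/dags. A small sketch of how a path translates between the two mounts; path_in_other_mount is a hypothetical helper, and it assumes bash.py sits at the root of the repository.

```python
from pathlib import PurePosixPath


def path_in_other_mount(path: str, source_mount: str, target_mount: str) -> str:
    """Translate a path under one mount of a volume to another mount of the same volume.

    Hypothetical helper for illustration only.
    """
    # Path of the file relative to the volume root, e.g. "repo/bash.py".
    relative = PurePosixPath(path).relative_to(source_mount)
    return str(PurePosixPath(target_mount) / relative)


# Where the base container should see the file that git-sync wrote.
print(path_in_other_mount("/git/repo/bash.py", "/git", "/opt/airflow/dags"))
```

Under these assumptions the result matches the --subdir argument in the pod args, so the mount layout itself looks consistent.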

-----------------------Important----------------------------

Debugging

For debugging purposes, instead of running the task I changed the pod args to start the webserver:

spec:
  containers:
    - args:
        - airflow
        - webserver

Then I looked for the DAGs in that pod and found none. It seems that gitSync is not working for the pods launched by the KubernetesExecutor.
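
A lighter way to do the same check, without starting the webserver, is to list the Python files under the dags folder from inside the pod. A minimal sketch; list_dag_files is a hypothetical helper, and symlinks are followed on the assumption that the sync tool may publish the checkout through one.

```python
import os
from typing import List


def list_dag_files(dags_folder: str) -> List[str]:
    """Return the sorted .py files found under dags_folder, following symlinks.

    Hypothetical helper for illustration only. A missing folder yields [].
    """
    found = []
    for root, _dirs, files in os.walk(dags_folder, followlinks=True):
        for name in files:
            if name.endswith(".py"):
                found.append(os.path.join(root, name))
    return sorted(found)
```

Calling list_dag_files("/opt/airflow/dags") in the worker pod and getting an empty list would confirm that the volume is empty or not mounted where Airflow expects it.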

Any help, please?

About this issue

  • State: closed
  • Created 3 years ago
  • Reactions: 2
  • Comments: 22 (5 by maintainers)

Most upvoted comments

It turns out that as of 2.0.0, users are required to provide a pod_template_file (https://airflow.apache.org/docs/apache-airflow/stable/executor/kubernetes.html). This template replaces the functionality lost by the removal of the config values I mentioned (dags_volume_claim and dags_volume_subpath). I haven’t tested whether this solves my issue yet, but I suppose it should, since I can manually add the proper volume mounts to the template.

I had to create a pod template file, dev-airflow-worker.yaml:

apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
  containers:
  - env:
    - name: AIRFLOW__CORE__DAGS_FOLDER
      value: /opt/bitnami/airflow/dags/git-airflow-dags/repo/
    - name: AIRFLOW__CORE__EXECUTOR
      value: LocalExecutor
    - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
      value: xxxxxxxxxxxxxx
    - name: AIRFLOW__CORE__FERNET_KEY
      value: xxxxxxxxxxxxxxx
    image: bitnami/airflow-worker:2.0.0-debian-10-r5
    name: base
    volumeMounts:
    - mountPath: /opt/bitnami/airflow/dags/git-airflow-dags
      name: airflow-dags
      readOnly: true
      subPath: ./repo/
  initContainers:
  - env:
    - name: GIT_SYNC_REPO
      value: https://gitlab..........................
    - name: GIT_SYNC_BRANCH
      value: dev
    - name: GIT_SYNC_ROOT
      value: /dags-airflow-dags
    - name: GIT_SYNC_DEST
      value: repo
    - name: GIT_SYNC_DEPTH
      value: "1"
    - name: GIT_SYNC_ONE_TIME
      value: "true"
    - name: GIT_SYNC_REV
    - name: GIT_SYNC_USERNAME
      value: aleopold
    - name: GIT_SYNC_PASSWORD
      value: xxxxxxxxxxxxxxxxxxxxxxxxxxx
    - name: GIT_KNOWN_HOSTS
      value: "false"
    image: k8s.gcr.io/git-sync:v3.1.1
    imagePullPolicy: IfNotPresent
    name: git-sync-clone
    securityContext:
      runAsUser: 65533
    volumeMounts:
    - mountPath: /dags-airflow-dags
      name: airflow-dags
  securityContext:
    fsGroup: 50000
    runAsUser: 50000
  serviceAccount: airflow-deployement
  serviceAccountName: airflow-deployement
  volumes:
  - name: airflow-dags
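
Two things are easy to get wrong in a template like this: Airflow expects the first container to be named base, and every volumeMount has to refer to a volume declared under spec.volumes. A rough sketch of a check over the template loaded as a dict; check_pod_template is a hypothetical helper, not an Airflow API, and loading the YAML into a dict is left out.

```python
from typing import Any, Dict, List


def check_pod_template(template: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in a pod template given as a dict.

    Hypothetical helper for illustration only; an empty list means no
    problem was found by these two checks.
    """
    problems = []
    spec = template.get("spec", {})
    containers = spec.get("containers", [])
    if not containers or containers[0].get("name") != "base":
        problems.append("first container must be named 'base'")
    declared = {volume.get("name") for volume in spec.get("volumes", [])}
    for container in containers + spec.get("initContainers", []):
        for mount in container.get("volumeMounts", []):
            if mount.get("name") not in declared:
                problems.append(
                    f"{container.get('name')}: volume '{mount.get('name')}' is not declared"
                )
    return problems
```

Only names are inspected here; paths, subPath values and permissions are not validated.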

And in the Helm chart’s values.yaml (https://artifacthub.io/packages/helm/bitnami/airflow):

airflow:
  extraEnvVars:
    - name: AIRFLOW_EXECUTOR
      value: "KubernetesExecutor"
    - name: AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE
      value: "/opt/bitnami/airflow/dags/git-airflow-dags/dev-airflow-worker.yaml"
    - name: AIRFLOW__KUBERNETES__FS_GROUP
      value: "50000"
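
For reference, the AIRFLOW__KUBERNETES__... names follow Airflow's convention for overriding airflow.cfg options through the environment: AIRFLOW__{SECTION}__{KEY}, with double underscores. A tiny sketch; airflow_env_var is a hypothetical helper name.

```python
def airflow_env_var(section: str, key: str) -> str:
    """Build the environment variable name that overrides [section] key in airflow.cfg.

    Hypothetical helper for illustration only.
    """
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


print(airflow_env_var("kubernetes", "pod_template_file"))
```

AIRFLOW_EXECUTOR does not follow this pattern; it appears to be specific to the image used in this comment.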

Hope it helps.

I’m having this issue too. After upgrading to 2.0.0, while running with KubernetesExecutor in an Azure k8s cluster, the worker pods have been failing with the same error message:

Traceback (most recent call last):
  File "/home/airflow/.local/bin/airflow", line 8, in <module>
    sys.exit(main())
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/__main__.py", line 40, in main
    args.func(args)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/cli_parser.py", line 48, in command
    return func(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 89, in wrapper
    return f(*args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/cli/commands/task_command.py", line 216, in task_run
    dag = get_dag(args.subdir, args.dag_id)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/cli.py", line 189, in get_dag
    'parse.'.format(dag_id)
airflow.exceptions.AirflowException: dag_id could not be found: test. Either the dag did not exist or it failed to parse.

I access my DAGs via an Azure file store mounted as a volume using a PVC, and have found that the worker pods are not being created with the same mounted volume. It seems that the previously available airflow.cfg values

[kubernetes]
dags_volume_claim = 
dags_volume_subpath = 

no longer exist in version 2.0.0, and the worker pods therefore cannot be created with the proper volume mounts, leading to the above error.

Any way around this?
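
With a pod template, the two removed settings correspond roughly to a persistentVolumeClaim volume and a volumeMount (with an optional subPath) on the base container. A minimal sketch of that mapping on a template held as a dict; add_dags_pvc, the volume name "dags", and the claim name and mount path in the example call are all assumptions made for illustration.

```python
import copy
from typing import Any, Dict


def add_dags_pvc(
    template: Dict[str, Any], claim_name: str, mount_path: str, sub_path: str = ""
) -> Dict[str, Any]:
    """Return a copy of the pod template with a PVC-backed dags volume mounted in 'base'.

    Hypothetical helper for illustration only; the input is not modified.
    """
    result = copy.deepcopy(template)
    spec = result.setdefault("spec", {})
    # Stands in for the old dags_volume_claim setting.
    spec.setdefault("volumes", []).append(
        {"name": "dags", "persistentVolumeClaim": {"claimName": claim_name}}
    )
    for container in spec.get("containers", []):
        if container.get("name") == "base":
            mount = {"name": "dags", "mountPath": mount_path, "readOnly": True}
            if sub_path:
                # Stands in for the old dags_volume_subpath setting.
                mount["subPath"] = sub_path
            container.setdefault("volumeMounts", []).append(mount)
    return result


# Example call with a hypothetical claim name and mount path.
patched = add_dags_pvc(
    {"spec": {"containers": [{"name": "base"}]}}, "airflow-dags", "/opt/airflow/dags"
)
print(patched["spec"]["volumes"])
```

The resulting dict would still need to be written out as the YAML file that pod_template_file points to.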