airflow: DagProcessor Performance Regression

Apache Airflow version

2.5.3

What happened

Upgrading from 2.4.3 to 2.5.3 caused a significant increase in DAG processing time on the standalone DAG processor (from ~1-2s to ~60s):

/opt/airflow/dags/ecco_airflow/dags/image_processing/product_image_load.py                                                0          -1  56.68s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/known_consumers/known_consumers.py                                                    0          -1  56.64s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/monitoring/row_counts.py                                                              0          -1  56.67s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/omnichannel/base.py                                                                   0          -1  56.66s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/omnichannel/oc_data.py                                                                0          -1  56.67s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/omnichannel/oc_stream.py                                                              0          -1  56.52s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/reporting/reporting_data_foundation.py                                                0          -1  56.63s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/retail_analysis/retail_analysis_dbt.py                                                0          -1  56.66s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/rfm_segments/rfm_segments.py                                                          0          -1  56.02s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/utils/airflow.py                                                                           0          -1  56.65s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/bronze/aad_users_listing.py                                                           1           0  55.51s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/bronze/funnel_io.py                                                                   1           0  56.13s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/bronze/iar_param.py                                                                   1           0  56.50s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/bronze/sfmc_copy.py                                                                   1           0  56.59s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/bronze/us_legacy_datawarehouse.py                                                     1           0  55.15s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/cdp/ecco_cdp_auditing.py                                                              1           0  56.54s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/cdp/ecco_cdp_budget_daily_phasing.py                                                  1           0  56.63s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/cdp/ecco_cdp_gold_rm_tests.py                                                         1           0  55.00s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/consumer_entity_matching/graph_entity_matching.py                                     1           0  56.67s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/data_backup/data_backup.py                                                            1           0  56.69s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/hive/adhoc_entity_publish.py                                                          1           0  55.33s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/image_regression/train.py                                                             1           0  56.63s          2023-04-26T12:56:15
/opt/airflow/dags/ecco_airflow/dags/maintenance/db_maintenance.py                                                         1           0  56.58s          2023-04-26T12:56:15

We are also seeing messages like these:

[2023-04-26T12:56:15.322+0000] {manager.py:979} DEBUG - Processor for /opt/airflow/dags/ecco_airflow/dags/bronze/us_legacy_datawarehouse.py finished
[2023-04-26T12:56:15.323+0000] {processor.py:296} DEBUG - Waiting for <ForkProcess name='DagFileProcessor68-Process' pid=116 parent=7 stopped exitcode=0>
[2023-04-26T12:56:15.323+0000] {manager.py:979} DEBUG - Processor for /opt/airflow/dags/ecco_airflow/dags/cdp/ecco_cdp_gold_rm_tests.py finished
[2023-04-26T12:56:15.323+0000] {processor.py:296} DEBUG - Waiting for <ForkProcess name='DagFileProcessor69-Process' pid=122 parent=7 stopped exitcode=0>
[2023-04-26T12:56:15.324+0000] {manager.py:979} DEBUG - Processor for /opt/airflow/dags/ecco_airflow/dags/bronze/streaming/sap_inventory_feed.py finished
[2023-04-26T12:56:15.324+0000] {processor.py:314} DEBUG - Waiting for <ForkProcess name='DagFileProcessor70-Process' pid=128 parent=7 stopped exitcode=-SIGKILL>
[2023-04-26T12:56:15.324+0000] {manager.py:986} ERROR - Processor for /opt/airflow/dags/ecco_airflow/dags/bronze/streaming/sap_inventory_feed.py exited with return code -9.

In 2.4.3:

/opt/airflow/dags/ecco_airflow/dags/image_regression/train.py                                                             1           0  1.34s           2023-04-26T14:19:08
/opt/airflow/dags/ecco_airflow/dags/known_consumers/known_consumers.py                                                    1           0  1.12s           2023-04-26T14:19:00
/opt/airflow/dags/ecco_airflow/dags/maintenance/db_maintenance.py                                                         1           0  0.63s           2023-04-26T14:18:27
/opt/airflow/dags/ecco_airflow/dags/monitoring/row_counts.py                                                              1           0  3.74s           2023-04-26T14:18:45
/opt/airflow/dags/ecco_airflow/dags/omnichannel/oc_data.py                                                                1           0  1.21s           2023-04-26T14:18:47
/opt/airflow/dags/ecco_airflow/dags/omnichannel/oc_stream.py                                                              1           0  1.22s           2023-04-26T14:18:30
/opt/airflow/dags/ecco_airflow/dags/reporting/reporting_data_foundation.py                                                1           0  1.39s           2023-04-26T14:19:08
/opt/airflow/dags/ecco_airflow/dags/retail_analysis/retail_analysis_dbt.py                                                1           0  1.32s           2023-04-26T14:18:51
/opt/airflow/dags/ecco_airflow/dags/rfm_segments/rfm_segments.py                                                          1           0  1.20s           2023-04-26T14:18:34

What you think should happen instead

DAG processing time should remain unchanged after the upgrade.

How to reproduce

Provision Airflow with the following settings:

Airflow 2.5.3

  • K8s 1.25.6
  • Kubernetes executor
  • Postgres backend (Postgres 11.0)
  • Deploy using Airflow Helm v1.9.0 with image 2.5.3-python3.9
    • pgbouncer enabled
    • standalone dag processor with 3500m cpu / 4000Mi memory, single replica
    • dags and logs mounted from RWM volume (Azure files)

Airflow 2.4.3

  • K8s 1.25.6
  • Kubernetes executor
  • Postgres backend (Postgres 11.0)
  • Deploy using Airflow Helm v1.7.0 with image 2.4.3-python3.9
    • pgbouncer enabled
    • standalone dag processor with 2500m cpu / 2000Mi memory, single replica
    • dags and logs mounted from RWM volume (Azure files)

Image modifications

We use an image built from apache/airflow:2.4.3-python3.9 (and apache/airflow:2.5.3-python3.9 for the upgrade), with some dependencies added or reinstalled with different versions.

Poetry dependency spec:

For 2.5.3:

[tool.poetry.dependencies]
python = ">=3.9,<3.11"
authlib = "~1.0.1"
adapta = { version = "==2.2.3", extras = ["azure", "storage"] }
numpy = "==1.23.3"
db-dtypes = "~1.0.4"
gevent = "^21.12.0"
sqlalchemy = ">=1.4,<2.0"
snowflake-sqlalchemy = ">=1.4,<2.0"
esd-services-api-client = "~0.6.0"
apache-airflow-providers-common-sql = "~1.3.1"
apache-airflow-providers-databricks = "~3.1.0"
apache-airflow-providers-google = "==8.4.0"
apache-airflow-providers-microsoft-azure = "~5.2.1"
apache-airflow-providers-datadog = "~3.0.0"
apache-airflow-providers-snowflake = "~3.3.0"
apache-airflow = "==2.5.3"
dataclasses-json = ">=0.5.7,<0.6"

For 2.4.3:

[tool.poetry.dependencies]
python = ">=3.9,<3.11"
authlib = "~1.0.1"
adapta = { version = "==2.2.3", extras = ["azure", "storage"] }
numpy = "==1.23.3"
db-dtypes = "~1.0.4"
gevent = "^21.12.0"
sqlalchemy = ">=1.4,<2.0"
snowflake-sqlalchemy = ">=1.4,<2.0"
esd-services-api-client = "~0.6.0"
apache-airflow-providers-common-sql = "~1.3.1"
apache-airflow-providers-databricks = "~3.1.0"
apache-airflow-providers-google = "==8.4.0"
apache-airflow-providers-microsoft-azure = "~5.2.1"
apache-airflow-providers-datadog = "~3.0.0"
apache-airflow-providers-snowflake = "~3.3.0"
apache-airflow = "==2.4.3"
dataclasses-json = ">=0.5.7,<0.6"

Operating System

Container OS: Debian GNU/Linux 11 (bullseye)

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==6.0.0
apache-airflow-providers-celery==3.0.0
apache-airflow-providers-cncf-kubernetes==4.4.0
apache-airflow-providers-common-sql==1.3.4
apache-airflow-providers-databricks==3.1.0
apache-airflow-providers-datadog==3.0.0
apache-airflow-providers-docker==3.2.0
apache-airflow-providers-elasticsearch==4.2.1
apache-airflow-providers-ftp==3.3.1
apache-airflow-providers-google==8.4.0
apache-airflow-providers-grpc==3.0.0
apache-airflow-providers-hashicorp==3.1.0
apache-airflow-providers-http==4.3.0
apache-airflow-providers-imap==3.1.1
apache-airflow-providers-microsoft-azure==5.2.1
apache-airflow-providers-mysql==3.2.1
apache-airflow-providers-odbc==3.1.2
apache-airflow-providers-postgres==5.2.2
apache-airflow-providers-redis==3.0.0
apache-airflow-providers-sendgrid==3.0.0
apache-airflow-providers-sftp==4.1.0
apache-airflow-providers-slack==6.0.0
apache-airflow-providers-snowflake==3.3.0
apache-airflow-providers-sqlite==3.3.2
apache-airflow-providers-ssh==3.2.0

Deployment

Official Apache Airflow Helm Chart

Deployment details

See the "How to reproduce" section above.

Anything else

Occurs when upgrading the Helm chart installation from 1.7.0 (Airflow 2.4.3) to 1.9.0 (Airflow 2.5.3).

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Most upvoted comments

Combining answers here - I'll try with the 2.6.0 RC, or maybe the released version if it gets released before I get an hour to do a test.

@george-zubrienko - I have some extra questions that might help to find out the root cause.

  1. Does it happen all the time for all dags parsed or some of the time for some of the dags? Can you please describe the pattern you see?

I can pinpoint a few things:

  • All dags parse much slower than usual
  • Increasing DAG_FILE_PROCESSOR_TIMEOUT doesn't affect anything
  • Increasing the number of parsing processes helps get the number of parsed/active dags up, but the overall process still goes very slowly, handling maybe one dag every 30s or so even with 128 parsing processes. Initially we see around 20-30 of our 65 dags going through
  • There doesn't seem to be any pattern related to the dag structure. Dags with a single task or with 100 tasks can both fail to process.
  2. Could you please tell us more about (and possibly share some anonymised examples of) the top-level code of the dags of yours that experience this behaviour, and whether you use any of those things:
  • excessive or unusual imports?

We always try to import only what we need in DAG files, but sometimes there are imports for type-hinting purposes only.
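
To illustrate what we mean by type-hint-only imports, here is a minimal sketch (the module and class names are hypothetical, not taken from our repository) of keeping such an import out of the parse-time path with a TYPE_CHECKING guard:

"""Hypothetical DAG helper module illustrating type-hint-only imports."""
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Imported only for static type checking; never executed while the
    # DAG file processor parses this module.
    from ecco_airflow.utils.k8s import ExecutorConfigBuilder  # hypothetical class


def build_config(builder: ExecutorConfigBuilder) -> dict:
    # Annotations stay lazy thanks to `from __future__ import annotations`,
    # so parsing this file never triggers the guarded import.
    return builder.to_dict()  # hypothetical method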

  • reaching out to external sources (HTTP/Similar) while parsing top-level code?

No, never.

  • accessing any kind of database while parsing top-level code?

The infamous Variable.get(..) is one thing we use in all dags that goes against Airflow best practices. However, our Airflow is set to run a parse only every 30s instead of every 5s, and we only read a single variable (the top-level JSON configuration for the pipeline).

In some dags we also read 1-2 connections stored in Airflow db. That’s it.

  • accessing secrets/variables/connections while parsing top-level code?

See notes above.
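
For comparison, a minimal sketch (the dag_id and callable are hypothetical; the variable name is the one used in the full DAG example below) of the templated alternative Airflow recommends, where the variable is resolved when the task runs rather than when the file is parsed:

"""Sketch: read the pipeline Variable at run time via Jinja instead of at parse time."""
from datetime import timedelta

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator


def _activate_stream(runtime_conf: dict, **_):
    # runtime_conf is resolved when the task executes, not while the DAG file is parsed.
    print(runtime_conf["project_name"])


with DAG(
    dag_id="ocLogStream_templated",  # hypothetical dag_id for this sketch
    schedule_interval="0 * * * *",
    start_date=pendulum.today().add(days=-2),
    # Needed so "{{ var.json.* }}" renders to a dict rather than its string representation.
    render_template_as_native_obj=True,
) as dag:
    PythonOperator(
        task_id="stream-oc-logs",
        python_callable=_activate_stream,
        retries=1,
        retry_delay=timedelta(minutes=5),
        # Jinja pulls the Variable at run time, so the DAG file processor
        # never opens a metadata-database session during parsing.
        op_kwargs={"runtime_conf": "{{ var.json.omni_channel_logs }}"},
    )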

  3. Do you see any excessive memory use and swapping while parsing happens for the DAG file processor?

Memory usage on the DAG file processor pod starts at around 180 MB, grows to around 1.6 GB on the initial parse, then stabilizes around 700 MB; CPU usage drops to ~2-3% from an initial ~50%.

  4. Do you use callbacks in your DAGs that experience the problem?

No; we also have SLA and email/error callbacks fully disabled.

Please find below a full example of a DAG that consists of one task only:

"""Manages stream lifecycle for OC logs"""
import json
from datetime import timedelta

import pendulum
from airflow import DAG
from airflow.models import Variable

from airflow.operators.python import PythonOperator
from airflow.providers.datadog.hooks.datadog import DatadogHook

from esd_services_api_client.beast import ArgumentValue, JobSocket

from ecco_airflow.dags.omnichannel.base import activate_log_stream
from ecco_airflow.utils.k8s import executor_config

default_args = {
    "owner": "data-engineering",
    "depends_on_past": False,
    "email": ["esdsupport@ecco.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="ocLogStream",
    default_args=default_args,
    description="Activates streaming of OC logs.",
    schedule_interval="0 * * * *",
    max_active_runs=1,
    start_date=pendulum.today().add(days=-2),
    tags=["streaming", "omni_channel", "reporting"],
) as dag:
    # pylint: disable=W0104

    dd_hook = DatadogHook(datadog_conn_id="esd-datadog")
    runtime_conf = Variable.get("omni_channel_logs", deserialize_json=True)

    PythonOperator(
        task_id="stream-oc-logs",
        python_callable=activate_log_stream,
        pool="omnichannel",
        op_kwargs={
            "checkpoint_location": f"'{runtime_conf['checkpoint_location']}'",
            "compact_after": f"'{runtime_conf['compact_after']}'",
            "datadog_config": ArgumentValue(
                value=json.dumps(
                    {
                        "api_key": dd_hook.api_key,
                        "app_key": dd_hook.app_key,
                        "site": "datadoghq.eu",
                    }
                ),
                encrypt=True,
                quote=True,
            ),
            "custom_config": ArgumentValue(
                value=runtime_conf["access_credentials"],
                encrypt=True,
                quote=True,
                is_env=True,
            ),
            "project_name": runtime_conf["project_name"],
            "project_version": runtime_conf["project_version"],
            "project_runnable": runtime_conf["project_runnable"],
            "stream_group": "oc_logs",
            "inputs": [JobSocket.from_dict(src) for src in runtime_conf["sources"]],
            "outputs": [JobSocket.from_dict(trg) for trg in runtime_conf["targets"]],
        },
        executor_config=executor_config(
            nodepool_names=["general"],
            secret_env_vars={
                "RUNTIME_ENCRYPTION_KEY": {
                    "secret_key": "RUNTIME_ENCRYPTION_KEY",
                    "secret_name": "hashicorp-vault-spark-encryption-key",
                },
                "OC_SPARK_CRYSTAL_ACCOUNT_ACCESS": {
                    "secret_key": "OC_SPARK_CRYSTAL_ACCOUNT_ACCESS",
                    "secret_name": "hashicorp-vault-oc-logs",
                },
            },
            cpu_memory_limit={"cpu": "100m", "memory": "500Mi"},
        ),
        retries=3,
        retry_delay=timedelta(seconds=300),
    )

I'd be happy to try out 2.6.0 when it is released then; I'm not so sure about changing that setting, at least at the moment, until I understand better how it affects task execution.

Cool. We will get RC3 soon

Then we found out that increasing that setting significantly reduces the number of database sessions, and our problems with dying tasks were resolved. I should probably have opened an issue on that as well, but we were so happy our models could be trained again that we sort of let it slip somewhere into the backlog.

I think this is actually a very good point and comment. It makes me think that we should have DIFFERENT heartbeat expectations for local tasks (generally for each job type we have; as mentioned above, historically they are the same).

I created an issue for that and I will implement it during some of the tasks I am working on soon as part of our AIP-44 work. Issue here https://github.com/apache/airflow/issues/30908

One more (really kind) request, not for now but in case we do not find anything before you have the chance to experiment: it almost looks like there is something that prevents Variable.get() from obtaining a connection to retrieve anything from the database, and that this single Variable.get() is blocking everything. That gives me some ideas of where to look (and of why it maybe has not been experienced by others who have no Variables in their top-level code). If we do not get there, just removing the Variable.get() and hardcoding its output might be a quick way to test that hypothesis.
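
As a rough sketch of what I mean (placeholder values only; the real JSON content would be copied by hand from the Variable), the top of the affected DAG file could temporarily look like this:

# Quick test of the "Variable.get() blocks parsing" hypothesis:
# comment out the metadata-database read ...
# runtime_conf = Variable.get("omni_channel_logs", deserialize_json=True)
# ... and replace it with a hardcoded copy of the variable's JSON value,
# so the DAG file processor makes no database call at parse time.
runtime_conf = {
    "checkpoint_location": "<copied by hand>",
    "compact_after": "<copied by hand>",
    "project_name": "<copied by hand>",
    "project_version": "<copied by hand>",
    "project_runnable": "<copied by hand>",
    "access_credentials": "<copied by hand>",
    "sources": [],
    "targets": [],
}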

This is the Dockerfile content for our current image - also the one we tried to upgrade from:

FROM apache/airflow:2.4.3-python3.9

COPY requirements.txt /tmp/requirements.txt

RUN pip3 install --user --no-cache -r /tmp/requirements.txt

USER 0

RUN rm /tmp/requirements.txt

USER airflow

I don't have the SHA unfortunately, and we do not rebuild images on a regular basis, so it might not be exactly the one available on Docker Hub right now.

Update to 2.5.3 is simple:

FROM apache/airflow:2.5.3-python3.9
...

And poetry dependency spec I have provided in the issue description.

If you want to build the exact same image, you can use this GH workflow:

name: Build Airflow Image

# trigger assumed for completeness; the original snippet omitted the `on:` block
on: [ push ]

jobs:
  build_image:
    name: Build Container Image
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v2
        with:
          fetch-depth: 0
      - uses: actions/setup-python@v2
        with:
          python-version: '3.9.x'
          architecture: 'x64'
      - name: Install Poetry and dependencies
        uses: SneaksAndData/github-actions/install_poetry@v0.0.21
        with:
          export_requirements: true
          requirements_path: .container/requirements.txt
          skip_dependencies: true
      - name: Build and Push Image (DEV)
        working-directory: .container
        env:
          AZCR_USER: ${{secrets.MY_ACR_USER}}
          AZCR_TOKEN: ${{secrets.MY_ACR_TOKEN}}
          AZCR_REPO: ${{secrets.MY_ACR_REPO}}
        run: |
          set -e
          echo "$AZCR_TOKEN" | docker login $AZCR_REPO.azurecr.io --username $AZCR_USER --password-stdin 
          version=$(git describe --tags --abbrev=7)
          airflow_version=$(cat Dockerfile | grep FROM | cut -d':' -f2)
          
          docker build . --tag=$AZCR_REPO.<my-cloud-registry.io or similar>/my-airflow:$airflow_version-esd-$version && docker push $AZCR_REPO.<my-cloud-registry.io or similar>/my-airflow:$airflow_version-esd-$version

I can provide the lock file if you want to go this way.

Oh whoa! That's what I call "deep" data (not big, but "deep") 😃. Thanks for that - it's super helpful. Quick question though - what was the version you had before 2.5.3 / rolled back to?

And just to explain why: I will (in the meantime) try to reproduce it locally and see if we can observe similar behaviour on our installation, but just knowing that this is the cause could allow us to remediate it by reverting that commit in the 2.6.0 RC while we can, even if we do not know exactly what the mechanism behind the slow-down is.

Adding more info:

  • Airflow configs that are different from the defaults:
resource "kubernetes_config_map" "airflow_additional_config" {
  metadata {
    name      = "${local.application_name}-additional-cfg"
    namespace = kubernetes_namespace.airflow_ns.metadata[0].name
  }
  data = {
    AIRFLOW__KUBERNETES__DELETE_WORKER_PODS                         = "True"
    AIRFLOW__KUBERNETES__DELETE_WORKER_PODS_ON_FAILURE              = terraform.workspace != "production" ? "False" : "True"
    AIRFLOW__KUBERNETES__WORKER_PODS_CREATION_BATCH_SIZE            = terraform.workspace != "production" ? 8 : 16
    AIRFLOW__KUBERNETES__WORKER_PODS_PENDING_TIMEOUT_CHECK_INTERVAL = 300
    AIRFLOW__KUBERNETES__DELETE_OPTION_KWARGS                       = "{\"grace_period_seconds\": 10}"
    AIRFLOW__KUBERNETES__ENABLE_TCP_KEEPALIVE                       = "True"
    AIRFLOW__KUBERNETES__TCP_KEEP_IDLE                              = 30
    AIRFLOW__KUBERNETES__TCP_KEEP_INTVL                             = 30
    AIRFLOW__KUBERNETES__TCP_KEEP_CNT                               = 30
    AIRFLOW__DATABASE__SQL_ALCHEMY_POOL_SIZE                        = 32
    AIRFLOW__DATABASE__SQL_ALCHEMY_MAX_OVERFLOW                     = 8
    AIRFLOW__DATABASE__MAX_DB_RETRIES                               = 16
    AIRFLOW__CORE__PARALLELISM                                      = 64
    AIRFLOW__CORE__MAX_ACTIVE_TASKS_PER_DAG                         = 512
    AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG                          = 7
    AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS                         = terraform.workspace != "production" ? "True" : "False"
    AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT                            = 60
    AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT                       = 90
    AIRFLOW__CORE__MAX_NUM_RENDERED_TI_FIELDS_PER_TASK              = 120
    AIRFLOW__CORE__CHECK_SLAS                                       = "False"
    AIRFLOW__SCHEDULER__PARSING_PROCESSES                           = 2
    AIRFLOW__SCHEDULER__CATCHUP_BY_DEFAULT                          = "False"
    AIRFLOW__SCHEDULER__STANDALONE_DAG_PROCESSOR                    = "True"
    AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTION               = "False"
    AIRFLOW__SCHEDULER__JOB_HEARTBEAT_SEC                           = 60
    AIRFLOW__LOGGING__REMOTE_LOGGING                                = "True"
    AIRFLOW__LOGGING__REMOTE_LOG_CONN_ID                            = "wasb_logstorage"
    AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER                        = "wasb-${azurerm_storage_container.airflow_logs.name}"
    AIRFLOW__API__AUTH_BACKENDS                                     = "airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session"
    AIRFLOW__METRICS__STATSD_DATADOG_ENABLED                        = "True"
    AIRFLOW__METRICS__STATSD_PREFIX                                 = "airflow"
    AIRFLOW__METRICS__STATSD_DATADOG_TAGS                           = "environment:${terraform.workspace}"
    AIRFLOW__METRICS__STATSD_ON                                     = "True"
    AIRFLOW__METRICS__STATSD_PORT                                   = "8125"
    AIRFLOW__METRICS__STATSD_HOST                                   = "datadog-statsd.${kubernetes_namespace.airflow_ns.metadata[0].name}.svc.cluster.local"
    AIRFLOW__WEBSERVER__AUTO_REFRESH_INTERVAL                       = 10
  }
}

Let me know if you want the Helm values we pass when deploying.

@george-zubrienko - I have some extra questions that might help to find out the root cause.

  1. Does it happen all the time for all dags parsed or some of the time for some of the dags? Can you please describe the pattern you see?

  2. Could you please tell us more about (and possibly share some anonymised examples of) the top-level code of the dags of yours that experience this behaviour, and whether you use any of those things:

  • excessive or unusual imports?
  • reaching out to external sources (HTTP/Similar) while parsing top-level code?
  • accessing any kind of database while parsing top-level code?
  • accessing secrets/variables/connections while parsing top-level code?

Generally speaking, I would like to know whether your top-level parsing violates any of the best practices described in https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#top-level-python-code

  3. Do you see any excessive memory use and swapping while parsing happens for the DAG file processor?

  4. Do you use callbacks in your DAGs that experience the problem?

Having those answers might help in investigating the root cause - the problem you describe is not easily reproducible, so we need to know what's so special about your DAGs or environment that triggers that behaviour. With the related #305903 it seemed that the reason might be MySQL, but knowing that you have Postgres suggests that either the problem is different or that it is something completely different from what we suspected before.