airflow: Airflow Triggerer facing frequent restarts

Apache Airflow version

Other Airflow 2 version (please specify below)

What happened

We are using airflow version: 2.6.3

We have the metastore in aws.

At around 12:00 AM PST, we have around 150+ async sensor starting at same time. They act as our sensors to wait for upstream data. We have them waiting for around 6-12 hours daily. Now after the upgrade, after running for 4-5 days we see that triggerer get restarts automatically.

On investigation wee found that the query used by triggerer to get list of trigger is taking lot of time, causing the triggerer to kill python code and hence restart.

We are able to resolve it after doing analyze command on task_instance table.

Query used

select
    `trigger`.id as trigger_id,
    `trigger`.classpath as trigger_classpath,
    `trigger`.kwargs as trigger_kwargs,
    `trigger`.created_date as trigger_created_date,
    `trigger`.triggerer_id as trigger_triggerer_id,
    task_instance_1.try_number as task_instance_1_try_number,
    job_1.id as job_1_id,
    job_1.dag_id as job_1_dag_id,
    job_1.state as job_1_state,
    job_1.job_type as job_1_job_type,
    job_1.start_date as job_1_start_date,
    job_1.end_date as job_1_end_date,
    job_1.latest_heartbeat as job_1_latest_heartbeat,
    job_1.executor_class as job_1_executor_class,
    job_1.hostname as job_1_hostname,
    job_1.unixname as job_1_unixname,
    trigger_1.id as trigger_1_id,
    trigger_1.classpath as trigger_1_classpath,
    trigger_1.kwargs as trigger_1_kwargs,
    trigger_1.created_date as trigger_1_created_date,
    trigger_1.triggerer_id as trigger_1_triggerer_id,
    dag_run_1.state as dag_run_1_state,
    dag_run_1.id as dag_run_1_id,
    dag_run_1.dag_id as dag_run_1_dag_id,
    dag_run_1.queued_at as dag_run_1_queued_at,
    dag_run_1.execution_date as dag_run_1_execution_date,
    dag_run_1.start_date as dag_run_1_start_date,
    dag_run_1.end_date as dag_run_1_end_date,
    dag_run_1.run_id as dag_run_1_run_id,
    dag_run_1.creating_job_id as dag_run_1_creating_job_id,
    dag_run_1.external_trigger as dag_run_1_external_trigger,
    dag_run_1.run_type as dag_run_1_run_type,
    dag_run_1.conf as dag_run_1_conf,
    dag_run_1.data_interval_start as dag_run_1_data_interval_start,
    dag_run_1.data_interval_end as dag_run_1_data_interval_end,
    dag_run_1.last_scheduling_decision as dag_run_1_last_scheduling_decision,
    dag_run_1.dag_hash as dag_run_1_dag_hash,
    dag_run_1.log_template_id as dag_run_1_log_template_id,
    dag_run_1.updated_at as dag_run_1_updated_at,
    task_instance_1.task_id as task_instance_1_task_id,
    task_instance_1.dag_id as task_instance_1_dag_id,
    task_instance_1.run_id as task_instance_1_run_id,
    task_instance_1.map_index as task_instance_1_map_index,
    task_instance_1.start_date as task_instance_1_start_date,
    task_instance_1.end_date as task_instance_1_end_date,
    task_instance_1.duration as task_instance_1_duration,
    task_instance_1.state as task_instance_1_state,
    task_instance_1.max_tries as task_instance_1_max_tries,
    task_instance_1.hostname as task_instance_1_hostname,
    task_instance_1.unixname as task_instance_1_unixname,
    task_instance_1.job_id as task_instance_1_job_id,
    task_instance_1.pool as task_instance_1_pool,
    task_instance_1.pool_slots as task_instance_1_pool_slots,
    task_instance_1.queue as task_instance_1_queue,
    task_instance_1.priority_weight as task_instance_1_priority_weight,
    task_instance_1.operator as task_instance_1_operator,
    task_instance_1.queued_dttm as task_instance_1_queued_dttm,
    task_instance_1.queued_by_job_id as task_instance_1_queued_by_job_id,
    task_instance_1.pid as task_instance_1_pid,
    task_instance_1.executor_config as task_instance_1_executor_config,
    task_instance_1.updated_at as task_instance_1_updated_at,
    task_instance_1.external_executor_id as task_instance_1_external_executor_id,
    task_instance_1.trigger_id as task_instance_1_trigger_id,
    task_instance_1.trigger_timeout as task_instance_1_trigger_timeout,
    task_instance_1.next_method as task_instance_1_next_method,
    task_instance_1.next_kwargs as task_instance_1_next_kwargs
from
    `trigger`
    left outer join (task_instance as task_instance_1 inner join dag_run as dag_run_1
                     on dag_run_1.dag_id = task_instance_1.dag_id and dag_run_1.run_id = task_instance_1.run_id) on `trigger`.id = task_instance_1.trigger_id
    left outer join `trigger` as trigger_1 on trigger_1.id = task_instance_1.trigger_id
    left outer join job as job_1 on job_1.id = trigger_1.triggerer_id
where
        `trigger`.id in
        (); -- list of ids

It takes around 5-6 mins and when run analyze command it takes less than 1 second.

The number of sensor is same before and after upgrade.

What you think should happen instead

No response

How to reproduce

  • Upgrade to V2.6.3
  • Swapn 150 sensors and wait (It doesnt happen every time but only some days

Operating System

NAME=“Debian GNU/Linux” VERSION_ID=“11” VERSION=“11 (bullseye)”

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

About this issue

  • Original URL
  • State: open
  • Created 10 months ago
  • Reactions: 1
  • Comments: 33 (13 by maintainers)

Most upvoted comments

For what it’s worth, I managed to fix this issue by cleaning up some old records from the trigger table. The table only had 88 rows, of which none were old than ±2 months. After cleaning up 27 records older than 1 month, the triggerers started heartbeating reliably again and are no longer restarting.

From 2.8.0 onwards airflow db clean has the option to clean that table (see https://github.com/apache/airflow/pull/34908). I just did it manually in a postgres shell.