airflow: Unable to start scheduler after stopped

Apache Airflow version: 2.0.0rc3

Kubernetes version (if you are using kubernetes) (use kubectl version):

Environment:

  • Cloud provider or hardware configuration: Linux
  • OS (e.g. from /etc/os-release): Ubuntu
  • Kernel (e.g. uname -a):
  • Install tools:
  • Others:

What happened:

After I shut down the scheduler while tasks were in the running state, restarting it fails with unique-key violations…

Traceback (most recent call last):
  File "/home/jcoder/git/airflow_2.0/pyenv/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/home/jcoder/git/airflow_2.0/pyenv/lib/python3.7/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "dag_run_dag_id_run_id_key"
DETAIL:  Key (dag_id, run_id)=(example_task_group, scheduled__2020-12-14T04:31:00+00:00) already exists.
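
For illustration only, here is a minimal sketch of how a unique constraint on (dag_id, run_id) rejects a second insert of the same run. It uses Python's built-in sqlite3 rather than PostgreSQL, and the simplified table layout and the `create_run` helper are assumptions made for the example, not Airflow's actual schema or code.

```python
import sqlite3

# Simplified stand-in for the dag_run table (assumption: only the columns
# named in the constraint from the traceback are modelled).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run ("
    " id INTEGER PRIMARY KEY,"
    " dag_id TEXT NOT NULL,"
    " run_id TEXT NOT NULL,"
    " UNIQUE (dag_id, run_id))"
)


def create_run(dag_id, run_id):
    """Hypothetical helper: insert a run, return False on a duplicate key."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO dag_run (dag_id, run_id) VALUES (?, ?)",
                (dag_id, run_id),
            )
        return True
    except sqlite3.IntegrityError:
        # SQLite's counterpart of psycopg2.errors.UniqueViolation
        return False


run_id = "scheduled__2020-12-14T04:31:00+00:00"
first = create_run("example_task_group", run_id)
second = create_run("example_task_group", run_id)  # same key again
row_count = conn.execute("SELECT COUNT(*) FROM dag_run").fetchone()[0]
```

The second call fails because the row already exists, which is the same situation the scheduler hits when it tries to create a run it has already recorded.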

What you expected to happen:

Scheduler restarts and picks up where it left off.

How to reproduce it:

Set an example DAG (I used example_task_group) to schedule_interval * * * * *, start the scheduler, and let it run for a few minutes. Shut down the scheduler, then attempt to restart it.

Anything else we need to know: I came across this while testing with the LocalExecutor in a virtual env. If no one else is able to reproduce it, I’ll try again in a clean virtual env.

About this issue

  • State: closed
  • Created 4 years ago
  • Comments: 41 (39 by maintainers)

Most upvoted comments

of course, sorry about that. I

[2020-12-15 22:43:15,642] {scheduler_job.py:181} INFO - Started process (PID=46978) to work on /home/jcoder/git/airflow_2.0/macs-dags/macs-dags/dags/example_task_group.py
[2020-12-15 22:43:15,644] {scheduler_job.py:629} INFO - Processing file /home/jcoder/git/airflow_2.0/macs-dags/macs-dags/dags/example_task_group.py for tasks to queue
[2020-12-15 22:43:15,645] {logging_mixin.py:103} INFO - [2020-12-15 22:43:15,644] {dagbag.py:440} INFO - Filling up the DagBag from /home/jcoder/git/airflow_2.0/macs-dags/macs-dags/dags/example_task_group.py
[2020-12-15 22:43:15,656] {scheduler_job.py:639} INFO - DAG(s) dict_keys(['example_task_group']) retrieved from /home/jcoder/git/airflow_2.0/macs-dags/macs-dags/dags/example_task_group.py
[2020-12-15 22:43:15,658] {logging_mixin.py:103} INFO - [2020-12-15 22:43:15,658] {dag.py:1813} INFO - Sync 1 DAGs
[2020-12-15 22:43:15,690] {logging_mixin.py:103} INFO - [2020-12-15 22:43:15,690] {dag.py:2266} INFO - Setting next_dagrun for example_task_group to 2020-12-14T04:21:00+00:00
[2020-12-15 22:43:15,708] {scheduler_job.py:189} INFO - Processing /home/jcoder/git/airflow_2.0/macs-dags/macs-dags/dags/example_task_group.py took 0.070 seconds
[2020-12-15 22:43:16,449] {scheduler_job.py:181} INFO - Started process (PID=47049) to work on /home/jcoder/git/airflow_2.0/macs-dags/macs-dags/dags/example_task_group.py
[2020-12-15 22:43:16,450] {scheduler_job.py:629} INFO - Processing file /home/jcoder/git/airflow_2.0/macs-dags/macs-dags/dags/example_task_group.py for tasks to queue
[2020-12-15 22:43:16,451] {logging_mixin.py:103} INFO - [2020-12-15 22:43:16,451] {dagbag.py:440} INFO - Filling up the DagBag from /home/jcoder/git/airflow_2.0/macs-dags/macs-dags/dags/example_task_group.py
[2020-12-15 22:43:16,460] {scheduler_job.py:639} INFO - DAG(s) dict_keys(['example_task_group']) retrieved from /home/jcoder/git/airflow_2.0/macs-dags/macs-dags/dags/example_task_group.py
[2020-12-15 22:43:16,481] {logging_mixin.py:103} INFO - [2020-12-15 22:43:16,481] {dag.py:1813} INFO - Sync 1 DAGs
[2020-12-15 22:43:16,501] {logging_mixin.py:103} INFO - [2020-12-15 22:43:16,501] {dag.py:2266} INFO - Setting next_dagrun for example_task_group to 2020-12-14T04:21:00+00:00
[2020-12-15 22:43:16,522] {scheduler_job.py:189} INFO - Processing /home/jcoder/git/airflow_2.0/macs-dags/macs-dags/dags/example_task_group.py took 0.078 seconds
[2020-12-15 22:43:16,524] {scheduler_job.py:181} INFO - Started process (PID=47075) to work on /home/jcoder/git/airflow_2.0/macs-dags/macs-dags/dags/example_task_group.py
[2020-12-15 22:43:16,526] {scheduler_job.py:629} INFO - Processing file /home/jcoder/git/airflow_2.0/macs-dags/macs-dags/dags/example_task_group.py for tasks to queue
[2020-12-15 22:43:16,527] {logging_mixin.py:103} INFO - [2020-12-15 22:43:16,527] {dagbag.py:440} INFO - Filling up the DagBag from /home/jcoder/git/airflow_2.0/macs-dags/macs-dags/dags/example_task_group.py
[2020-12-15 22:43:16,538] {scheduler_job.py:639} INFO - DAG(s) dict_keys(['example_task_group']) retrieved from /home/jcoder/git/airflow_2.0/macs-dags/macs-dags/dags/example_task_group.py
[2020-12-15 22:43:16,580] {logging_mixin.py:103} INFO - [2020-12-15 22:43:16,580] {dag.py:1813} INFO - Sync 1 DAGs
[2020-12-15 22:43:16,606] {logging_mixin.py:103} INFO - [2020-12-15 22:43:16,606] {dag.py:2266} INFO - Setting next_dagrun for example_task_group to 2020-12-14T04:21:00+00:00
[2020-12-15 22:43:16,628] {scheduler_job.py:189} INFO - Processing /home/jcoder/git/airflow_2.0/macs-dags/macs-dags/dags/example_task_group.py took 0.108 seconds
[2020-12-15 22:43:18,346] {scheduler_job.py:181} INFO - Started process (PID=47308) to work on /home/jcoder/git/airflow_2.0/macs-dags/macs-dags/dags/example_task_group.py
[2020-12-15 22:43:18,348] {scheduler_job.py:629} INFO - Processing file /home/jcoder/git/airflow_2.0/macs-dags/macs-dags/dags/example_task_group.py for tasks to queue
[2020-12-15 22:43:18,348] {logging_mixin.py:103} INFO - [2020-12-15 22:43:18,348] {dagbag.py:440} INFO - Filling up the DagBag from /home/jcoder/git/airflow_2.0/macs-dags/macs-dags/dags/example_task_group.py
[2020-12-15 22:43:18,363] {scheduler_job.py:639} INFO - DAG(s) dict_keys(['example_task_group']) retrieved from /home/jcoder/git/airflow_2.0/macs-dags/macs-dags/dags/example_task_group.py
[2020-12-15 22:43:18,365] {logging_mixin.py:103} INFO - [2020-12-15 22:43:18,365] {dag.py:1813} INFO - Sync 1 DAGs
[2020-12-15 22:43:18,402] {logging_mixin.py:103} INFO - [2020-12-15 22:43:18,401] {dag.py:2266} INFO - Setting next_dagrun for example_task_group to 2020-12-14T04:30:00+00:00
[2020-12-15 22:43:18,427] {scheduler_job.py:189} INFO - Processing /home/jcoder/git/airflow_2.0/macs-dags/macs-dags/dags/example_task_group.py took 0.085 seconds

Happy New Year, @ashb. There is a little bit of both. There is an outstanding issue where using Variables (or probably anything that interacts with the backend db in the global scope) in a plugin raises the UNEXPECTED COMMIT runtime error. The problem I reported with being unable to restart the scheduler seems to have been caused by wrapping my Variable.get in an overly generous try/except, which hid the runtime error and let things in the db get out of sync. Running the below should recreate the issue; I did this on Python 3.7.

mkdir airflow_scheduler_test
cd airflow_scheduler_test
export AIRFLOW_HOME=$(pwd)
virtualenv pyenv 
source pyenv/bin/activate
pip install apache-airflow==2.0.0

airflow db init
airflow variables set test TEST
mkdir plugins
cat << EOF > plugins/test.py
from airflow.models.variable import Variable

print(Variable.get('test'))
EOF

airflow dags unpause example_bash_operator
airflow scheduler
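
The "overly generous try/except" mentioned above can be sketched roughly as follows. `FakeVariable` is a hypothetical stand-in written for this example, not Airflow's `Variable` class, and the `in_scheduler` flag is an invented switch that simulates the runtime error. The point is only that `except Exception` swallows the error, while a narrow `except KeyError` lets it surface.

```python
class FakeVariable:
    """Hypothetical stand-in for a variable store (not Airflow's class)."""

    _store = {"test": "TEST"}

    @classmethod
    def get(cls, key, in_scheduler=False):
        if in_scheduler:
            # Simulates the runtime error described in the comment above.
            raise RuntimeError("UNEXPECTED COMMIT")
        return cls._store[key]  # raises KeyError when the key is missing


def get_broad(key, default=None, **kwargs):
    """Too generous: hides every failure, including the RuntimeError."""
    try:
        return FakeVariable.get(key, **kwargs)
    except Exception:
        return default


def get_narrow(key, default=None, **kwargs):
    """Handles only the 'missing variable' case; other errors propagate."""
    try:
        return FakeVariable.get(key, **kwargs)
    except KeyError:
        return default


hidden = get_broad("test", in_scheduler=True)  # error silently swallowed

try:
    get_narrow("test", in_scheduler=True)
    surfaced = False
except RuntimeError:
    surfaced = True  # the real problem is visible to the caller
```

With the narrow handler the failure shows up immediately instead of leaving the caller with a silent default.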

Marking this as can’t reproduce for now – it may be a race condition, but… 🤷🏻