airflow: Duplicate key value violates unique constraint "task_instance_pkey" error

I’m using Airflow 1.10.9 with the Celery executor and Postgres, and I need a nested loop where the first level has 2 options and the second level loops through 1800 options.

I found that once I go above roughly 600 options in the second loop, I get a duplicate key value error. It seems that either two Postgres sessions, or something else, is triggering a duplicate insert of my first task.
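
To illustrate what I suspect is happening, here is a minimal, standalone sketch (not Airflow's actual code; the connection string is made up) of how two concurrent check-then-insert transactions can both pass the existence check and then collide on the primary key, matching the scheduler traceback below:

import threading

import psycopg2

DSN = "dbname=airflow user=airflow host=localhost"  # hypothetical connection string
KEY = ("run_this_first", "branch_demo", "2020-04-07 18:16:14+00")

def setup():
    # table with the same primary key shape as task_instance
    with psycopg2.connect(DSN) as conn, conn.cursor() as cur:
        cur.execute("DROP TABLE IF EXISTS demo_ti")
        cur.execute(
            "CREATE TABLE demo_ti ("
            "  task_id text, dag_id text, execution_date timestamptz,"
            "  PRIMARY KEY (task_id, dag_id, execution_date))"
        )

barrier = threading.Barrier(2)  # force both workers to check before either inserts

def check_then_insert(name):
    conn = psycopg2.connect(DSN)
    try:
        with conn, conn.cursor() as cur:
            cur.execute(
                "SELECT 1 FROM demo_ti WHERE task_id = %s AND dag_id = %s "
                "AND execution_date = %s", KEY)
            missing = cur.fetchone() is None
            barrier.wait()  # both transactions have now seen "row missing"
            if missing:
                # the slower committer raises UniqueViolation, as in the logs
                cur.execute("INSERT INTO demo_ti VALUES (%s, %s, %s)", KEY)
    except psycopg2.errors.UniqueViolation as exc:
        print(name, "lost the race:", exc.pgerror)
    finally:
        conn.close()

setup()
workers = [threading.Thread(target=check_then_insert, args=("worker-%d" % i,))
           for i in range(2)]
for w in workers:
    w.start()
for w in workers:
    w.join()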

Here is an example DAG that exhibits the issue once the range for j is raised from 10 to 1800; the code below should reproduce it.

The setup runs in Docker, using this docker-compose file: https://github.com/puckel/docker-airflow/blob/master/docker-compose-CeleryExecutor.yml

from datetime import timedelta
import random

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.utils.dates import days_ago


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}



def return_branch(**kwargs):
    # Pick one of the branch tasks created in the loop below; only
    # branch_0 and branch_1 exist, since the outer loop is range(2).
    branches = ['branch_0', 'branch_1']

    return random.choice(branches)

with DAG(
    dag_id='branch_demo',
    default_args=default_args,
    dagrun_timeout=timedelta(hours=2),
    schedule_interval='0 11 * * *'
) as dag:
    kick_off_dag = BashOperator(task_id='run_this_first', bash_command='echo "first"')

    branching = BranchPythonOperator(
        task_id='branching',
        python_callable=return_branch,
        provide_context=True)

    kick_off_dag >> branching

    for i in range(2):
        d = BashOperator(task_id='branch_{0}'.format(i), bash_command='echo "job"')
        for j in range(1800):
            m = BashOperator(task_id='branch_{0}_{1}'.format(i, j), bash_command='echo "done"')

            d >> m

        branching >> d

Postgres log error:

ERROR:  duplicate key value violates unique constraint "task_instance_pkey"
DETAIL:  Key (task_id, dag_id, execution_date)=(run_this_first, branch_demo, 2020-04-07 18:16:14.566623+00) already exists.
STATEMENT:  INSERT INTO task_instance (task_id, dag_id, execution_date, start_date, end_date, duration, state, try_number, max_tries, hostname, unixname, job_id, pool, queue, priority_weight, operator, queued_dttm, pid, executor_config) VALUES ('run_this_first', 'branch_demo', '2020-04-07T18:16:14.566623+00:00'::timestamptz, NULL, NULL, NULL, NULL, 0, 1, '', 'root', NULL, 'default_pool', 'default', 3604, NULL, NULL, NULL, '\x80047d942e'::bytea)

Traceback from scheduler logs:

Process DagFileProcessor102313-Process:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1226, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 854, in do_executemany
    cursor.executemany(statement, parameters)
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "task_instance_pkey"
DETAIL:  Key (task_id, dag_id, execution_date)=(create_cluster, predict_model_training_combined-2, 2020-04-07 13:59:38.550927+00) already exists.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
    self.run()
  File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 157, in _run_file_processor
    pickle_dags)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1609, in process_file
    self._process_dags(dagbag, dags, ti_keys_to_schedule)
  File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1293, in _process_dags
    self._process_task_instances(dag, tis_out)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 778, in _process_task_instances
    run.verify_integrity(session=session)
  File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 70, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 400, in verify_integrity
    session.commit()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1036, in commit
    self.transaction.commit()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 503, in commit
    self._prepare_impl()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 482, in _prepare_impl
    self.session.flush()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2479, in flush
    self._flush(objects)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2617, in _flush
    transaction.rollback(_capture_exception=True)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
    raise value
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2577, in _flush
    flush_context.execute()
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
    rec.execute(self)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
    uow,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
    insert,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1084, in _emit_insert_statements
    c = cached_connections[connection].execute(statement, multiparams)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 982, in execute
    return meth(self, multiparams, params)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 293, in _execute_on_connection
    return connection._execute_clauseelement(self, multiparams, params)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1101, in _execute_clauseelement
    distilled_params,
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1250, in _execute_context
    e, statement, parameters, cursor, context
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1476, in _handle_dbapi_exception
    util.raise_from_cause(sqlalchemy_exception, exc_info)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 152, in reraise
    raise value.with_traceback(tb)
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1226, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 854, in do_executemany
    cursor.executemany(statement, parameters)
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "task_instance_pkey"
DETAIL:  Key (task_id, dag_id, execution_date)=(create_cluster, predict_model_training_combined-2, 2020-04-07 13:59:38.550927+00) already exists.
[SQL: INSERT INTO task_instance (task_id, dag_id, execution_date, start_date, end_date, duration, state, try_number, max_tries, hostname, unixname, job_id, pool, queue, priority_weight, operator, queued_dttm, pid, executor_config) VALUES (%(task_id)s, %(dag_id)s, %(execution_date)s, %(start_date)s, %(end_date)s, %(duration)s, %(state)s, %(try_number)s, %(max_tries)s, %(hostname)s, %(unixname)s, %(job_id)s, %(pool)s, %(queue)s, %(priority_weight)s, %(operator)s, %(queued_dttm)s, %(pid)s, %(executor_config)s)]
[parameters: ({'task_id': 'create_cluster', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 7119, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd653458570>}, {'task_id': 'before_training', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 7118, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd6534582d0>}, {'task_id': 'after_training', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 1, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd653458540>}, {'task_id': 'remove_cluster', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 1, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd653458330>}, {'task_id': 'train_model_AGGH_eon', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 3, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd653458180>}, {'task_id': 'watch_training_AGGH_eon', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 2, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd653458360>}, {'task_id': 'train_model_AGGH_ein', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 
'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 3, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd653458630>}, {'task_id': 'watch_training_AGGH_ein', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 2, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd6534586f0>}  ... displaying 10 of 7120 total bound parameter sets ...  {'task_id': 'train_model_ZYYJ_ein', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 3, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd652758660>}, {'task_id': 'watch_training_ZYYJ_ein', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 2, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd6527586f0>})]
(Background on this error at: http://sqlalche.me/e/gkpj)
Process DagFileProcessor104982-Process:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1226, in _execute_context
    cursor, statement, parameters, context
  File "/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 854, in do_executemany
    cursor.executemany(statement, parameters)
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "task_instance_pkey"
DETAIL:  Key (task_id, dag_id, execution_date)=(create_cluster, predict_model_training_combined-4, 2020-04-07 14:41:02.111727+00) already exists.

About this issue

  • State: closed
  • Created 4 years ago
  • Reactions: 2
  • Comments: 23 (8 by maintainers)

Most upvoted comments

We noticed this issue as well in Airflow 2.3.3:

psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "task_instance_pkey"
DETAIL:  Key (dag_id, task_id, run_id, map_index)=(ingestion-scheduler, check_for_success_flag, manual__2022-10-03T15:56:03.241451+00:00, 0) already exists.

Same issue here, is there a solution?

As soon as someone provides more information, ideally from the latest version of Airflow, including logs, circumstances, versions, etc. (or, even better, opens a new issue with all those details), we can try to diagnose and fix it; without that, I doubt there will be a solution. @JasperSui @LucaSoato @tnyz

I suggest updating to the latest released version of Airflow and, if the issue is still there, reporting it (again, ideally as a new issue with stack traces and all details, linked to this issue as likely similar). Adding “I also have the same problem” is good information, but without extra details and stack traces from the latest version it does not bring us any closer to finding a solution.

Please try to help the community by providing those details; this is the only way we can move closer to a fix.

We implement hundreds of fixes in each minor release, and it’s possible the problem has already been fixed there. We ONLY release fixes on the latest active branch, so you will have to upgrade to the latest 2.* version anyway to get a fix (if there is one); upgrading now and reporting whether the error is still there is the most efficient way to speed up solving the issue.

@JasperSui @LucaSoato @tnyz - can we count on your help to the community with this one?

> We noticed this issue as well in Airflow 2.3.3 …

Same here.

As an update: the latest release, 1.10.10, is still affected by this problem.

We hit another, similar problem when using the REST API (POST /api/experimental/dags/<DAG_ID>/dag_runs) to trigger a DAG run.

There is a unique key on (dag_id, execution_date), and execution_date has only timestamp (i.e. millisecond-level) resolution. When triggering multiple DAG runs programmatically, it is easy to end up with two DAG runs that get the same execution_date, and the duplicate key error occurs.
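
For what it’s worth, here is a sketch of one possible workaround: give each programmatic trigger its own explicit execution_date, so two rapid calls cannot collide on the (dag_id, execution_date) unique key. The host and DAG id below are made up, and it assumes the experimental endpoint accepts an execution_date field in the request body.

from datetime import datetime, timedelta, timezone

import requests

AIRFLOW_URL = "http://localhost:8080"  # hypothetical webserver address
DAG_ID = "example_dag"                 # hypothetical DAG id

def trigger_runs(n):
    base = datetime.now(timezone.utc)
    for i in range(n):
        # spread the runs one second apart so each execution_date is unique
        execution_date = (base + timedelta(seconds=i)).isoformat()
        resp = requests.post(
            "%s/api/experimental/dags/%s/dag_runs" % (AIRFLOW_URL, DAG_ID),
            json={"conf": {}, "execution_date": execution_date},
        )
        resp.raise_for_status()

trigger_runs(3)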

I think execution_date is a workable but not a good way to identify a DAG run’s uniqueness. 🤔

On the other hand, an auto-increment id (as in MySQL) is more reasonable in certain scenarios.

> I suggest updating to the latest released version of Airflow and, if the issue is still there, reporting it …

Yeah, I upgraded to 2.4.1 and it hasn’t happened again; I believe upgrading to 2.4.* will fix this.
