airflow: Duplicate key value violates unique constraint "task_instance_pkey" error
I’m using Airflow 1.10.9 with the Celery executor and Postgres, and I need a nested loop: the first level has 2 options and the second level loops through 1800 options.
I found that if I go above roughly 600 options in the second loop, I get a duplicate key value error. It seems that either two Postgres sessions connect at once, or something else triggers a duplicate insert of my first task.
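To illustrate the suspected mechanism, here is a minimal standalone sketch (not Airflow's actual code path) in which two Postgres sessions race to insert the same composite primary key that task_instance uses, (task_id, dag_id, execution_date); the second insert fails with exactly the error shown further down. The ti_demo table and the connection string are hypothetical stand-ins.

import psycopg2

# Hypothetical connection string - adjust for your environment
DSN = "dbname=airflow user=airflow password=airflow host=localhost"

# ti_demo stands in for Airflow's task_instance table, which has the
# same composite primary key: (task_id, dag_id, execution_date)
setup = psycopg2.connect(DSN)
with setup, setup.cursor() as cur:
    cur.execute("""
        CREATE TABLE IF NOT EXISTS ti_demo (
            task_id TEXT, dag_id TEXT, execution_date TIMESTAMPTZ,
            PRIMARY KEY (task_id, dag_id, execution_date)
        )
    """)
setup.close()

row = ("run_this_first", "branch_demo", "2020-04-07 18:16:14.566623+00")
insert = "INSERT INTO ti_demo VALUES (%s, %s, %s)"

conn_a = psycopg2.connect(DSN)  # e.g. one DagFileProcessor session
conn_b = psycopg2.connect(DSN)  # e.g. a second, overlapping one
with conn_a, conn_a.cursor() as cur:
    cur.execute(insert, row)        # first session commits the row
try:
    with conn_b, conn_b.cursor() as cur:
        cur.execute(insert, row)    # second session violates the primary key
except psycopg2.errors.UniqueViolation as exc:
    print(exc)  # duplicate key value violates unique constraint "ti_demo_pkey"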
Here is an example DAG that exhibits the issue when you adjust the range for j from 10 up to 1800; the code below should reproduce it.
The setup runs in Docker, using this docker-compose file: https://github.com/puckel/docker-airflow/blob/master/docker-compose-CeleryExecutor.yml
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.models import DAG
from airflow.utils.dates import days_ago
from datetime import timedelta
import random

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(2),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

def return_branch(**kwargs):
    # pick one of the two branch tasks defined in the DAG below
    branches = ['branch_{0}'.format(i) for i in range(2)]
    return random.choice(branches)

with DAG(
    dag_id='branch_demo',
    default_args=default_args,
    dagrun_timeout=timedelta(hours=2),
    schedule_interval='0 11 * * *'
) as dag:
    kick_off_dag = BashOperator(task_id='run_this_first', bash_command='echo "first"')

    branching = BranchPythonOperator(
        task_id='branching',
        python_callable=return_branch,
        provide_context=True)

    kick_off_dag >> branching

    for i in range(2):
        d = BashOperator(task_id='branch_{0}'.format(i), bash_command='echo "job"')
        for j in range(1800):
            m = BashOperator(task_id='branch_{0}_{1}'.format(i, j), bash_command='echo "done"')
            d >> m
        branching >> d
Postgres log error:
ERROR: duplicate key value violates unique constraint "task_instance_pkey"
DETAIL: Key (task_id, dag_id, execution_date)=(run_this_first, branch_demo, 2020-04-07 18:16:14.566623+00) already exists.
STATEMENT: INSERT INTO task_instance (task_id, dag_id, execution_date, start_date, end_date, duration, state, try_number, max_tries, hostname, unixname, job_id, pool, queue, priority_weight, operator, queued_dttm, pid, executor_config) VALUES ('run_this_first', 'branch_demo', '2020-04-07T18:16:14.566623+00:00'::timestamptz, NULL, NULL, NULL, NULL, 0, 1, '', 'root', NULL, 'default_pool', 'default', 3604, NULL, NULL, NULL, '\x80047d942e'::bytea)
Traceback from scheduler logs:
Process DagFileProcessor102313-Process:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1226, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 854, in do_executemany
cursor.executemany(statement, parameters)
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "task_instance_pkey"
DETAIL: Key (task_id, dag_id, execution_date)=(create_cluster, predict_model_training_combined-2, 2020-04-07 13:59:38.550927+00) already exists.
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/multiprocessing/process.py", line 297, in _bootstrap
self.run()
File "/usr/local/lib/python3.7/multiprocessing/process.py", line 99, in run
self._target(*self._args, **self._kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 157, in _run_file_processor
pickle_dags)
File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1609, in process_file
self._process_dags(dagbag, dags, ti_keys_to_schedule)
File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 1293, in _process_dags
self._process_task_instances(dag, tis_out)
File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/jobs/scheduler_job.py", line 778, in _process_task_instances
run.verify_integrity(session=session)
File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 70, in wrapper
return func(*args, **kwargs)
File "/usr/local/lib/python3.7/site-packages/airflow/models/dagrun.py", line 400, in verify_integrity
session.commit()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 1036, in commit
self.transaction.commit()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 503, in commit
self._prepare_impl()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 482, in _prepare_impl
self.session.flush()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2479, in flush
self._flush(objects)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2617, in _flush
transaction.rollback(_capture_exception=True)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/langhelpers.py", line 68, in __exit__
compat.reraise(exc_type, exc_value, exc_tb)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 153, in reraise
raise value
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/session.py", line 2577, in _flush
flush_context.execute()
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 422, in execute
rec.execute(self)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/unitofwork.py", line 589, in execute
uow,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
insert,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/orm/persistence.py", line 1084, in _emit_insert_statements
c = cached_connections[connection].execute(statement, multiparams)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 982, in execute
return meth(self, multiparams, params)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/sql/elements.py", line 293, in _execute_on_connection
return connection._execute_clauseelement(self, multiparams, params)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1101, in _execute_clauseelement
distilled_params,
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1250, in _execute_context
e, statement, parameters, cursor, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1476, in _handle_dbapi_exception
util.raise_from_cause(sqlalchemy_exception, exc_info)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 398, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb, cause=cause)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/util/compat.py", line 152, in reraise
raise value.with_traceback(tb)
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1226, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 854, in do_executemany
cursor.executemany(statement, parameters)
sqlalchemy.exc.IntegrityError: (psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "task_instance_pkey"
DETAIL: Key (task_id, dag_id, execution_date)=(create_cluster, predict_model_training_combined-2, 2020-04-07 13:59:38.550927+00) already exists.
[SQL: INSERT INTO task_instance (task_id, dag_id, execution_date, start_date, end_date, duration, state, try_number, max_tries, hostname, unixname, job_id, pool, queue, priority_weight, operator, queued_dttm, pid, executor_config) VALUES (%(task_id)s, %(dag_id)s, %(execution_date)s, %(start_date)s, %(end_date)s, %(duration)s, %(state)s, %(try_number)s, %(max_tries)s, %(hostname)s, %(unixname)s, %(job_id)s, %(pool)s, %(queue)s, %(priority_weight)s, %(operator)s, %(queued_dttm)s, %(pid)s, %(executor_config)s)]
[parameters: ({'task_id': 'create_cluster', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 7119, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd653458570>}, {'task_id': 'before_training', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 7118, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd6534582d0>}, {'task_id': 'after_training', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 1, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd653458540>}, {'task_id': 'remove_cluster', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 1, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd653458330>}, {'task_id': 'train_model_AGGH_eon', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 3, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd653458180>}, {'task_id': 'watch_training_AGGH_eon', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 2, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd653458360>}, {'task_id': 'train_model_AGGH_ein', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 
'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 3, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd653458630>}, {'task_id': 'watch_training_AGGH_ein', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 2, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd6534586f0>} ... displaying 10 of 7120 total bound parameter sets ... {'task_id': 'train_model_ZYYJ_ein', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 3, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd652758660>}, {'task_id': 'watch_training_ZYYJ_ein', 'dag_id': 'predict_model_training_combined-2', 'execution_date': datetime.datetime(2020, 4, 7, 13, 59, 38, 550927, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 'start_date': None, 'end_date': None, 'duration': None, 'state': None, 'try_number': 0, 'max_tries': 1, 'hostname': '', 'unixname': 'root', 'job_id': None, 'pool': 'default_pool', 'queue': 'default', 'priority_weight': 2, 'operator': None, 'queued_dttm': None, 'pid': None, 'executor_config': <psycopg2.extensions.Binary object at 0x7fd6527586f0>})]
(Background on this error at: http://sqlalche.me/e/gkpj)
Process DagFileProcessor104982-Process:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/engine/base.py", line 1226, in _execute_context
cursor, statement, parameters, context
File "/usr/local/lib/python3.7/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py", line 854, in do_executemany
cursor.executemany(statement, parameters)
psycopg2.errors.UniqueViolation: duplicate key value violates unique constraint "task_instance_pkey"
DETAIL: Key (task_id, dag_id, execution_date)=(create_cluster, predict_model_training_combined-4, 2020-04-07 14:41:02.111727+00) already exists.
About this issue: closed, created 4 years ago; 2 reactions; 23 comments (8 by maintainers).
We noticed this issue as well in Airflow 2.3.3.
As soon as someone provides more information - ideally from the latest version of Airflow, including logs, circumstances, versions, etc. (or ideally opens a new issue with all those details) - we can try to diagnose and fix it, @JasperSui @LucaSoato @tnyz. Without that, I doubt there will be a solution.
I suggest updating to the latest released version of Airflow and, if the issue is still there, reporting it (again, ideally as a new issue with stack traces and all details, linked to this issue as likely similar). Adding “I also have the same problem” is good information, but without extra details and stack traces from the latest version it does not bring us any closer to finding a solution.
Please try to help the community by providing those details - this is the only way we can get closer to a fix.
We implement hundreds of fixes in each minor release, and it’s possible the problem has already been fixed there. We ONLY release fixes in the latest active branch - so you will have to upgrade to the latest 2.* version anyway to get a fix (if there is one). Upgrading now and reporting whether the error is still there is the most efficient way to speed up solving the issue.
@JasperSui @LucaSoato @tnyz - can we count on you to help the community with this one?
Same here.
As an update: the latest 1.10.10 is still affected by this problem.
We met another, similar problem when using the REST API
POST /api/experimental/dags/<DAG_ID>/dag_runs
to trigger a DAG run. There is a unique key on (dag_id, execution_date), and execution_date only has timestamp (i.e. ms) resolution. When triggering multiple DAG runs programmatically, it’s likely that two DAG runs end up with the same execution_date, and the duplicate key error occurs. I think execution_date is a decent but not a good design for identifying a DAG run’s uniqueness 🤔 On the other hand, an auto-increment id like MySQL’s is more reasonable in some scenarios.
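A possible workaround sketch for that collision (my own, not from this thread): pass an explicit, client-generated execution_date with each trigger call instead of letting the server default to "now". The webserver URL, DAG id, and the requests client are assumptions; the replace_microseconds payload field exists in newer 1.10.x releases but may not be honored by every version, so treat it as a hedge.

import time
from datetime import datetime, timezone

import requests  # assumed HTTP client; any client works

AIRFLOW_URL = "http://localhost:8080"  # assumption: local webserver
DAG_ID = "branch_demo"                 # assumption: the demo DAG above

for i in range(3):
    # isoformat() keeps microseconds, so back-to-back triggers differ
    execution_date = datetime.now(timezone.utc).isoformat()
    resp = requests.post(
        "{0}/api/experimental/dags/{1}/dag_runs".format(AIRFLOW_URL, DAG_ID),
        json={
            "execution_date": execution_date,
            # present in newer 1.10.x releases; asks the server not to
            # truncate the microseconds we just generated
            "replace_microseconds": "false",
            "conf": {"batch": i},
        },
    )
    resp.raise_for_status()
    time.sleep(0.001)  # ensure successive timestamps differ even on a fast loop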
Yeah, I upgraded to 2.4.1 and it doesn’t happen again; I believe upgrading to 2.4.* will fix this.
Same issue here, is there a solution?