-
Notifications
You must be signed in to change notification settings - Fork 16.4k
Closed
Labels
area:APIAirflow's REST/HTTP APIAirflow's REST/HTTP APIarea:corekind:bugThis is a clearly a bugThis is a clearly a bugneeds-triagelabel for new issues that we didn't triage yetlabel for new issues that we didn't triage yet
Description
Apache Airflow version
Other Airflow 2/3 version (please specify below)
If "Other Airflow 2/3 version" selected, which one?
3.1.3
What happened?
We recently upgraded Airflow from v3.0.4 to v3.1.3 and since the upgrade, we are seeing that if you manually trigger a DAG with schedule set, the run ID doesn't contain the current timestamp anymore and in fact uses the schedule intervals. This means if you want to trigger the DAG manually twice, it uses the same run-id and fails with 409 conflict error.
If it helps, this error doesn't come if schedule=None.
Example 1:
- schedule="@daily",
- Trigger DAG manually at 4 PM UTC - The run id would be
manual__2025-12-11T00:00:00+00:00 - Trigger DAG manually at 4:01 PM UTC - The run id would be same and fails with below error
409 Conflict
reason: Unique constraint violationstatement: INSERT INTO dag_run (dag_id, queued_at, logical_date, start_date, end_date, state, run_id, creating_job_id, run_type, triggered_by, triggering_user_name, conf, data_interval_start, data_interval_end, run_after, last_scheduling_decision, log_template_id, updated_at, clear_number, backfill_id, bundle_version, scheduled_by_job_id, context_carrier, created_dag_version_id) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)orig_error: (1062, "Duplicate entry '<dag-name>-manual__2025-12-11T00:00:00+00:00' for key 'dag_run.dag_run_dag_id_run_id_key'")message: Error with id gQSY9sM9
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/_exception_handler.py", line 42, in wrapped_app
await app(scope, receive, sender)
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/routing.py", line 75, in app
response = await f(request)
^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/fastapi/routing.py", line 308, in app
raw_response = await run_endpoint_function(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/fastapi/routing.py", line 221, in run_endpoint_function
return await run_in_threadpool(dependant.call, **values)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/starlette/concurrency.py", line 38, in run_in_threadpool
return await anyio.to_thread.run_sync(func)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/anyio/to_thread.py", line 56, in run_sync
return await get_async_backend().run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 2485, in run_sync_in_worker_thread
return await future
^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/anyio/_backends/_asyncio.py", line 976, in run
result = context.run(func, *args)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/api_fastapi/core_api/routes/public/dag_run.py", line 445, in trigger_dag_run
dag_run = dag.create_dagrun(
^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 98, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py", line 3293, in create_dagrun
orm_dagrun = _create_orm_dagrun(
^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/utils/session.py", line 98, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/airflow/serialization/serialized_objects.py", line 2353, in _create_orm_dagrun
session.flush()
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3449, in flush
self._flush(objects)
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3588, in _flush
with util.safe_reraise():
^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
compat.raise_(
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 3549, in _flush
flush_context.execute()
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 456, in execute
rec.execute(self)
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/unitofwork.py", line 630, in execute
util.preloaded.orm_persistence.save_obj(
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 245, in save_obj
_emit_insert_statements(
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/orm/persistence.py", line 1238, in _emit_insert_statements
result = connection._execute_20(
^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1710, in _execute_20
return meth(self, args_10style, kwargs_10style, execution_options)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/sql/elements.py", line 334, in _execute_on_connection
return connection._execute_clauseelement(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1577, in _execute_clauseelement
ret = self._execute_context(
^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1953, in _execute_context
self._handle_dbapi_exception(
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 2134, in _handle_dbapi_exception
util.raise_(
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/util/compat.py", line 211, in raise_
raise exception
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/base.py", line 1910, in _execute_context
self.dialect.do_execute(
File "/home/airflow/.local/lib/python3.12/site-packages/sqlalchemy/engine/default.py", line 736, in do_execute
cursor.execute(statement, parameters)
File "/home/airflow/.local/lib/python3.12/site-packages/MySQLdb/cursors.py", line 179, in execute
res = self._query(mogrified_query)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/airflow/.local/lib/python3.12/site-packages/MySQLdb/cursors.py", line 330, in _query
db.query(q)
File "/home/airflow/.local/lib/python3.12/site-packages/MySQLdb/connections.py", line 280, in query
_mysql.connection.query(self, query)
Example 2:
- schedule=None
- Trigger DAG manually - The run id would be
manual__2025-12-11T20:26:02+00:00(as expected)
What you think should happen instead?
In Example 1 above, the run ID should be different for each manual trigger regardless of the configured schedule.
How to reproduce
- Manually trigger a DAG with a configured schedule
- Manually trigger the second time the same DAG and it would fail with 409 error
Operating System
Linux
Versions of Apache Airflow Providers
No response
Deployment
Other Docker-based deployment
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- Yes I am willing to submit a PR!
Code of Conduct
- I agree to follow this project's Code of Conduct
hnomichith, Zingeryo, amit-mittal and spokelse1
Metadata
Metadata
Assignees
Labels
area:APIAirflow's REST/HTTP APIAirflow's REST/HTTP APIarea:corekind:bugThis is a clearly a bugThis is a clearly a bugneeds-triagelabel for new issues that we didn't triage yetlabel for new issues that we didn't triage yet