Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,7 @@ def validate_context(self, dag: SerializedDAG) -> dict:
end=timezone.coerce_datetime(self.data_interval_end),
)
else:
data_interval = dag.timetable.infer_manual_data_interval(
run_after=coerced_logical_date or timezone.coerce_datetime(run_after)
)
run_after = data_interval.end
data_interval = dag.timetable.infer_manual_data_interval(run_after=coerced_logical_date)

run_id = self.dag_run_id or dag.timetable.generate_run_id(
run_type=DagRunType.MANUAL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1883,6 +1883,41 @@ def test_should_respond_200_with_null_logical_date(self, test_client):
"partition_key": None,
}

@pytest.mark.usefixtures("configure_git_connection_for_dag_bundle")
def test_should_generate_unique_run_id_for_scheduled_dag(self, dag_maker, test_client, session):
"Ensure manual triggers on scheduled DAGs don't conflict on run_id"
scheduled_dag_id = "test_scheduled_dag"
with dag_maker(
dag_id=scheduled_dag_id,
schedule="@daily",
start_date=START_DATE1,
session=session,
serialized=True,
):
EmptyOperator(task_id="test_task")

session.commit()

response_1 = test_client.post(
f"/dags/{scheduled_dag_id}/dagRuns",
json={
"logical_date": "2025-12-11T16:00:00+00:00",
"run_after": "2025-12-11T16:00:00+00:00",
},
)
assert response_1.status_code == 200

response_2 = test_client.post(
f"/dags/{scheduled_dag_id}/dagRuns",
json={
"logical_date": "2025-12-11T16:01:00+00:00",
"run_after": "2025-12-11T16:01:00+00:00",
},
)
assert response_2.status_code == 200

assert response_1.json()["dag_run_id"] != response_2.json()["dag_run_id"]

@time_machine.travel("2025-10-02 12:00:00", tick=False)
@pytest.mark.usefixtures("custom_timetable_plugin")
def test_custom_timetable_generate_run_id_for_manual_trigger(self, dag_maker, test_client, session):
Expand Down
Loading