diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 367447968c2e0..22a3abcad426b 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2400,7 +2400,13 @@ def _schedule_all_dag_runs( session: Session, ) -> list[tuple[DagRun, DagCallbackRequest | None]]: """Make scheduling decisions for all `dag_runs`.""" - callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs] + callback_tuples = [] + for run in dag_runs: + try: + callback = self._schedule_dag_run(run, session=session) + callback_tuples.append((run, callback)) + except Exception: + self.log.exception("Error scheduling DAG run %s of %s", run.run_id, run.dag_id) guard.commit() return callback_tuples diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index ae6388f1b8a01..1ebc3529efee7 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -5319,6 +5319,64 @@ def test_scheduler_create_dag_runs_does_not_crash_on_deserialization_error(self, f"Expected deserialization error log, got: {scheduler_messages}" ) + def test_schedule_all_dag_runs_does_not_crash_on_single_dag_run_error(self, dag_maker, caplog, session): + """Test that _schedule_all_dag_runs continues processing other DAG runs + when one DAG run raises an exception during scheduling. + + Previously, _schedule_all_dag_runs used a list comprehension that would + abort entirely if any single _schedule_dag_run call raised, crashing + the entire scheduler and stopping scheduling for ALL DAGs.
+ + While the specific scenario used to reproduce this (a TaskInstance with + state=UP_FOR_RETRY and end_date=NULL) is nearly impossible under normal + operation, the lack of per-dag-run fault isolation means ANY unexpected + exception from ANY dag run would have the same catastrophic effect. + """ + # Create two DAGs with running DAG runs + with dag_maker(dag_id="good_dag", schedule="@once"): + EmptyOperator(task_id="good_task") + good_run = dag_maker.create_dagrun(state=DagRunState.RUNNING) + + with dag_maker(dag_id="bad_dag", schedule="@once"): + EmptyOperator(task_id="bad_task") + bad_run = dag_maker.create_dagrun(state=DagRunState.RUNNING) + + session.flush() + + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec]) + + caplog.clear() + with ( + caplog.at_level("ERROR", logger="airflow.jobs.scheduler_job_runner"), + patch.object( + self.job_runner, + "_schedule_dag_run", + side_effect=[ + TypeError("simulated crash from corrupted task instance"), # bad_run + None, # good_run + ], + ) as mock_schedule, + ): + from airflow.utils.sqlalchemy import prohibit_commit + + with prohibit_commit(session) as guard: + result = self.job_runner._schedule_all_dag_runs(guard, [bad_run, good_run], session=session) + + # The good DAG run should have been processed despite the bad one failing + assert len(result) == 1 + assert result[0][0] == good_run + + # Both dag runs should have been attempted + assert mock_schedule.call_count == 2 + + # The error should have been logged + error_messages = [r.message for r in caplog.records if r.levelno >= logging.ERROR] + assert any( + msg == f"Error scheduling DAG run {bad_run.run_id} of {bad_run.dag_id}" + for msg in error_messages + ) + def test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run(self, dag_maker, testing_dag_bundle): """ Test that externally triggered Dag Runs should not affect (by skipping) next