Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion airflow-core/src/airflow/jobs/scheduler_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2400,7 +2400,13 @@ def _schedule_all_dag_runs(
session: Session,
) -> list[tuple[DagRun, DagCallbackRequest | None]]:
"""Make scheduling decisions for all `dag_runs`."""
callback_tuples = [(run, self._schedule_dag_run(run, session=session)) for run in dag_runs]
callback_tuples = []
for run in dag_runs:
try:
callback = self._schedule_dag_run(run, session=session)
callback_tuples.append((run, callback))
except Exception:
self.log.exception("Error scheduling DAG run %s of %s", run.run_id, run.dag_id)
guard.commit()
return callback_tuples

Expand Down
58 changes: 58 additions & 0 deletions airflow-core/tests/unit/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -5319,6 +5319,64 @@ def test_scheduler_create_dag_runs_does_not_crash_on_deserialization_error(self,
f"Expected deserialization error log, got: {scheduler_messages}"
)

def test_schedule_all_dag_runs_does_not_crash_on_single_dag_run_error(self, dag_maker, caplog, session):
    """Verify per-dag-run fault isolation in _schedule_all_dag_runs.

    Before the fix, _schedule_all_dag_runs built its result with a list
    comprehension, so one raising _schedule_dag_run call aborted the whole
    batch, crashing the scheduler and halting scheduling for ALL DAGs.

    The originally observed trigger (a TaskInstance with state=UP_FOR_RETRY
    and end_date=NULL) is nearly impossible in normal operation, but without
    per-run isolation any unexpected exception from any run is equally fatal.
    """
    # Two DAGs with running DAG runs: one scheduled normally, one failing.
    with dag_maker(dag_id="good_dag", schedule="@once"):
        EmptyOperator(task_id="good_task")
    good_run = dag_maker.create_dagrun(state=DagRunState.RUNNING)

    with dag_maker(dag_id="bad_dag", schedule="@once"):
        EmptyOperator(task_id="bad_task")
    bad_run = dag_maker.create_dagrun(state=DagRunState.RUNNING)

    session.flush()

    scheduler_job = Job()
    self.job_runner = SchedulerJobRunner(job=scheduler_job, executors=[self.null_exec])

    # First call (bad_run) raises; second call (good_run) succeeds.
    simulated_results = [
        TypeError("simulated crash from corrupted task instance"),  # bad_run
        None,  # good_run
    ]

    caplog.clear()
    with (
        caplog.at_level("ERROR", logger="airflow.jobs.scheduler_job_runner"),
        patch.object(
            self.job_runner,
            "_schedule_dag_run",
            side_effect=simulated_results,
        ) as mock_schedule,
    ):
        from airflow.utils.sqlalchemy import prohibit_commit

        with prohibit_commit(session) as guard:
            result = self.job_runner._schedule_all_dag_runs(guard, [bad_run, good_run], session=session)

    # Only the good run produced a callback tuple; the failing run was skipped.
    assert len(result) == 1
    assert result[0][0] == good_run

    # Scheduling was attempted for both runs despite the first one failing.
    assert mock_schedule.call_count == 2

    # The failure was logged at ERROR with the expected interpolated message.
    expected_message = f"Error scheduling DAG run {bad_run.run_id} of {bad_run.dag_id}"
    logged_errors = [rec.message for rec in caplog.records if rec.levelno >= logging.ERROR]
    assert expected_message in logged_errors

def test_bulk_write_to_db_external_trigger_dont_skip_scheduled_run(self, dag_maker, testing_dag_bundle):
"""
Test that externally triggered Dag Runs should not affect (by skipping) next
Expand Down