diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dags.py index fc2fe3ed2d2aa..fb841c55480d4 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dags.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dags.py @@ -23,11 +23,18 @@ from airflow.api_fastapi.core_api.datamodels.ui.dag_runs import DAGRunLightResponse +class LatestRunStats(BaseModel): + """Stats for the latest DAG run.""" + + task_instance_counts: dict[str, int] + + class DAGWithLatestDagRunsResponse(DAGResponse): """DAG with latest dag runs response serializer.""" asset_expression: dict | None latest_dag_runs: list[DAGRunLightResponse] + latest_run_stats: dict[TaskInstanceState, int] | None = None pending_actions: list[HITLDetail] is_favorite: bool diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml index dbbbf43758cae..81e11c7ef8adc 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml @@ -1707,6 +1707,10 @@ components: $ref: '#/components/schemas/DAGRunLightResponse' type: array title: Latest Dag Runs + latest_run_stats: + anyOf: + - $ref: '#/components/schemas/LatestRunStats' + - type: 'null' pending_actions: items: $ref: '#/components/schemas/HITLDetail' @@ -2213,6 +2217,18 @@ components: - unixname title: JobResponse description: Job serializer for responses. + LatestRunStats: + properties: + task_instance_counts: + additionalProperties: + type: integer + type: object + title: Task Instance Counts + type: object + required: + - task_instance_counts + title: LatestRunStats + description: Stats for the latest DAG run. LightGridTaskInstanceSummary: properties: task_id: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py index 93914cca5c401..a693191340dc9 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py @@ -57,6 +57,7 @@ from airflow.api_fastapi.core_api.datamodels.ui.dags import ( DAGWithLatestDagRunsCollectionResponse, DAGWithLatestDagRunsResponse, + LatestRunStats, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.api_fastapi.core_api.security import ( @@ -211,6 +212,48 @@ def get_dags( recent_dag_runs = session.execute(recent_dag_runs_select) + # Fetch task instance summary stats for the latest DagRun per DAG + latest_run_per_dag_subquery = ( + select( + DagRun.dag_id, + DagRun.run_id, + func.rank() + .over( + partition_by=DagRun.dag_id, + order_by=DagRun.run_after.desc(), + ) + .label("rank"), + ) + .where(DagRun.dag_id.in_([dag.dag_id for dag in dags])) + .subquery() + ) + + task_instance_stats_select = ( + select( + latest_run_per_dag_subquery.c.dag_id, + TaskInstance.state, + func.count(TaskInstance.task_id).label("count"), + ) + .join( + TaskInstance, + and_( + TaskInstance.dag_id == latest_run_per_dag_subquery.c.dag_id, + TaskInstance.run_id == latest_run_per_dag_subquery.c.run_id, + ), + ) + .where(latest_run_per_dag_subquery.c.rank == 1) + .group_by(latest_run_per_dag_subquery.c.dag_id, TaskInstance.state) + ) + + task_instance_stats_rows = session.execute(task_instance_stats_select) + + # Normalize task instance stats by dag_id + latest_run_stats_by_dag: dict[str, dict[str, int]] = {dag.dag_id: {} for dag in dags} + + for dag_id, state, count in task_instance_stats_rows: + if state is not None: + latest_run_stats_by_dag[dag_id][state] = count + # Fetch pending HITL actions for each Dag if we are not certain whether some of the Dag might contain HITL actions pending_actions_by_dag_id: dict[str, list[HITLDetail]] = {dag.dag_id: [] for dag in dags} if has_pending_actions.value: @@ -241,6 +284,7 @@ def get_dags( **DAGResponse.model_validate(dag).model_dump(), "asset_expression": dag.asset_expression, "latest_dag_runs": [], + "latest_run_stats": LatestRunStats(task_instance_counts=latest_run_stats_by_dag.get(dag.dag_id, {})), "pending_actions": pending_actions_by_dag_id[dag.dag_id], "is_favorite": dag.dag_id in favorite_dag_ids, } diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index de34ac2f9be2a..8b1f665d6a352 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -7654,6 +7654,16 @@ export const $DAGWithLatestDagRunsResponse = { type: 'array', title: 'Latest Dag Runs' }, + latest_run_stats: { + anyOf: [ + { + '$ref': '#/components/schemas/LatestRunStats' + }, + { + type: 'null' + } + ] + }, pending_actions: { items: { '$ref': '#/components/schemas/HITLDetail' @@ -7941,6 +7951,22 @@ export const $HistoricalMetricDataResponse = { description: 'Historical Metric Data serializer for responses.' } as const; +export const $LatestRunStats = { + properties: { + task_instance_counts: { + additionalProperties: { + type: 'integer' + }, + type: 'object', + title: 'Task Instance Counts' + } + }, + type: 'object', + required: ['task_instance_counts'], + title: 'LatestRunStats', + description: 'Stats for the latest DAG run.' +} as const; + export const $LightGridTaskInstanceSummary = { properties: { task_id: { diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index ef22addbc4f45..21833ee2fa869 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1887,6 +1887,7 @@ export type DAGWithLatestDagRunsResponse = { [key: string]: unknown; } | null; latest_dag_runs: Array; + latest_run_stats?: LatestRunStats | null; pending_actions: Array; is_favorite: boolean; /** @@ -1965,6 +1966,15 @@ export type HistoricalMetricDataResponse = { task_instance_states: TaskInstanceStateCount; }; +/** + * Stats for the latest DAG run. + */ +export type LatestRunStats = { + task_instance_counts: { + [key: string]: (number); +}; +}; + /** * Task Instance Summary model for the Grid UI. */ diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json index 7821e6babf7c0..45c963cb5db14 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json @@ -263,6 +263,7 @@ }, "taskInstance_one": "Task Instance", "taskInstance_other": "Task Instances", + "taskInstanceSummary": "Task Instance Summary", "timeRange": { "last12Hours": "Last 12 Hours", "last24Hours": "Last 24 Hours", diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.test.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.test.tsx index 04fdb830636b0..c7bb5ad7682e6 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.test.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.test.tsx @@ -107,6 +107,7 @@ const mockDag = { pending_actions: [], relative_fileloc: "nested_task_groups.py", tags: [], + latest_run_stats: { task_instance_counts: { success: 2, failed: 1 } }, timetable_description: "Every minute", timetable_summary: "* * * * *", } satisfies DAGWithLatestDagRunsResponse; @@ -198,22 +199,14 @@ describe("DagCard", () => { expect(latestRunElement).toHaveTextContent("2025-09-19 19:22:00"); }); - it("DagCard should render next run section with timestamp", () => { - render(, { wrapper: GMTWrapper }); - const nextRunElement = screen.getByTestId("next-run"); - - expect(nextRunElement).toBeInTheDocument(); - // Should display the formatted next run timestamp (converted to GMT timezone) - expect(nextRunElement).toHaveTextContent("2024-08-22 19:00:00"); - }); - it("DagCard should render StateBadge as success", () => { render(, { wrapper: GMTWrapper }); - const stateBadge = screen.getByTestId("state-badge"); + const stateBadges = screen.getAllByTestId("state-badge"); - expect(stateBadge).toBeInTheDocument(); + // First badge is from DagRunInfo (latest run state) + expect(stateBadges[0]).toBeInTheDocument(); // Should have the success state from mockDag.latest_dag_runs[0].state - expect(stateBadge).toHaveAttribute("aria-label", "success"); + expect(stateBadges[0]).toHaveAttribute("aria-label", "success"); }); it("DagCard should render StateBadge as failed", () => { @@ -234,10 +227,30 @@ describe("DagCard", () => { } satisfies DAGWithLatestDagRunsResponse; render(, { wrapper: GMTWrapper }); - const stateBadge = screen.getByTestId("state-badge"); + const stateBadges = screen.getAllByTestId("state-badge"); - expect(stateBadge).toBeInTheDocument(); + // First badge is from DagRunInfo (latest run state) + expect(stateBadges[0]).toBeInTheDocument(); // Should have the failed state - expect(stateBadge).toHaveAttribute("aria-label", "failed"); + expect(stateBadges[0]).toHaveAttribute("aria-label", "failed"); + }); + + it("DagCard should render TaskInstanceSummary when DAG has latest_run_stats", () => { + render(, { wrapper: GMTWrapper }); + const taskInstanceSummary = screen.getByTestId("task-instance-summary"); + + expect(taskInstanceSummary).toBeInTheDocument(); + }); + + it("DagCard should not render TaskInstanceSummary when DAG has no latest_run_stats", () => { + const mockDagWithNoSummary = { + ...mockDag, + latest_run_stats: null, + } satisfies DAGWithLatestDagRunsResponse; + + render(, { wrapper: GMTWrapper }); + const taskInstanceSummary = screen.queryByTestId("task-instance-summary"); + + expect(taskInstanceSummary).toBeNull(); }); }); diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx index a43233feb9b10..dde00684d64e7 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx @@ -34,6 +34,7 @@ import { isStatePending, useAutoRefresh } from "src/utils"; import { DagTags } from "./DagTags"; import { RecentRuns } from "./RecentRuns"; import { Schedule } from "./Schedule"; +import { TaskInstanceSummary } from "./TaskInstanceSummary"; type Props = { readonly dag: DAGWithLatestDagRunsResponse; @@ -76,6 +77,7 @@ export const DagCard = ({ dag }: Props) => { + { timetableSummary={dag.timetable_summary} /> + + + {Boolean(dag.next_dagrun_run_after) ? ( + + ) : undefined} + + {latestRun ? ( @@ -104,14 +116,9 @@ export const DagCard = ({ dag }: Props) => { ) : undefined} - - {Boolean(dag.next_dagrun_run_after) ? ( - - ) : undefined} - + + + diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx new file mode 100644 index 0000000000000..0f71086b05685 --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx @@ -0,0 +1,54 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +import { HStack } from "@chakra-ui/react"; +import { useTranslation } from "react-i18next"; + +import type { LatestRunStats, TaskInstanceState } from "openapi/requests/types.gen"; +import { Stat } from "src/components/Stat"; +import { StateBadge } from "src/components/StateBadge"; + +type Props = { + readonly latestRunStats: LatestRunStats | null | undefined; +}; + +export const TaskInstanceSummary = ({ latestRunStats }: Props) => { + const { t: translate } = useTranslation("common"); + + if (!latestRunStats) { + return null; + } + + const stateEntries = Object.entries(latestRunStats.task_instance_counts); + + if (stateEntries.length === 0) { + return null; + } + + return ( + + + {stateEntries.map(([state, count]) => ( + + {count} + + ))} + + + ); +}; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py index 801f8754ed13c..baf42d56f976a 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py @@ -16,18 +16,19 @@ # under the License. from __future__ import annotations -from typing import TYPE_CHECKING from unittest import mock import pendulum import pytest from fastapi.testclient import TestClient +from sqlalchemy import insert from sqlalchemy.orm import Session from airflow.models import DagRun from airflow.models.dag import DagModel, DagTag from airflow.models.dag_favorite import DagFavorite from airflow.models.hitl import HITLDetail +from airflow.models.taskinstance import TaskInstance as TI from airflow.sdk.timezone import utcnow from airflow.utils.session import provide_session from airflow.utils.state import DagRunState, TaskInstanceState @@ -43,9 +44,6 @@ TestDagEndpoint as TestPublicDagEndpoint, ) -if TYPE_CHECKING: - from tests_common.pytest_plugin import TaskInstance - pytestmark = pytest.mark.db_test @@ -130,7 +128,7 @@ def test_should_return_200(self, test_client, query_params, expected_ids, expect previous_run_after = dag_run["run_after"] @pytest.fixture - def setup_hitl_data(self, create_task_instance: TaskInstance, session: Session): + def setup_hitl_data(self, create_task_instance: TI, session: Session): """Setup HITL test data for parametrized tests.""" # 3 Dags (test_dag0 created here and test_dag1, test_dag2 created in setup_dag_runs) # 5 task instances in test_dag0 @@ -384,3 +382,184 @@ def test_is_favorite_field_user_specific(self, test_client, session): # Verify that DAG1 is not marked as favorite for the test user dag1_data = next(dag for dag in body["dags"] if dag["dag_id"] == DAG1_ID) assert dag1_data["is_favorite"] is False + + def test_latest_run_stats_returns_aggregated_counts( + self, test_client, session + ): + """Test that latest_run_stats returns aggregated task instance counts by state.""" + dag_id = "test_dag_ti_summary" + + # Create a DAG + dag_model = DagModel( + dag_id=dag_id, + bundle_name="dag_maker", + fileloc=f"/tmp/{dag_id}.py", + is_stale=False, + ) + session.add(dag_model) + session.flush() + + # Create a DAG run + start_date = pendulum.datetime(2024, 1, 1, 0, 0, 0, tz="UTC") + dag_run = DagRun( + dag_id=dag_id, + run_id="test_run_1", + run_type=DagRunType.MANUAL, + start_date=start_date, + logical_date=start_date, + run_after=start_date, + state=DagRunState.RUNNING, + triggered_by=DagRunTriggeredByType.TEST, + ) + session.add(dag_run) + session.flush() + + # Create task instances with different states using raw insert + states = [ + TaskInstanceState.SUCCESS, + TaskInstanceState.SUCCESS, + TaskInstanceState.SUCCESS, + TaskInstanceState.FAILED, + TaskInstanceState.FAILED, + TaskInstanceState.RUNNING, + ] + for i, state in enumerate(states): + session.execute( + insert(TI).values( + dag_id=dag_id, + run_id="test_run_1", + task_id=f"task_{i}", + state=state, + map_index=-1, + pool="default_pool", + ) + ) + + session.commit() + + # Make request + response = test_client.get("/dags", params={"dag_ids": [dag_id]}) + assert response.status_code == 200 + body = response.json() + + # Verify latest_run_stats is present and correct + assert body["total_entries"] == 1 + dag_data = body["dags"][0] + assert "latest_run_stats" in dag_data + + summary = dag_data["latest_run_stats"]["task_instance_counts"] + assert summary.get("success") == 3 + assert summary.get("failed") == 2 + assert summary.get("running") == 1 + + def test_latest_run_stats_only_includes_latest_run( + self, test_client, session + ): + """Test that latest_run_stats only includes task instances from the latest DAG run.""" + dag_id = "test_dag_ti_summary_latest" + + # Create a DAG + dag_model = DagModel( + dag_id=dag_id, + bundle_name="dag_maker", + fileloc=f"/tmp/{dag_id}.py", + is_stale=False, + ) + session.add(dag_model) + session.flush() + + # Create an older DAG run + older_date = pendulum.datetime(2024, 1, 1, 0, 0, 0, tz="UTC") + older_run = DagRun( + dag_id=dag_id, + run_id="older_run", + run_type=DagRunType.MANUAL, + start_date=older_date, + logical_date=older_date, + run_after=older_date, + state=DagRunState.SUCCESS, + triggered_by=DagRunTriggeredByType.TEST, + ) + session.add(older_run) + session.flush() + + # Create task instances for older run (all failed) + for i in range(3): + session.execute( + insert(TI).values( + dag_id=dag_id, + run_id="older_run", + task_id=f"old_task_{i}", + state=TaskInstanceState.FAILED, + map_index=-1, + pool="default_pool", + ) + ) + + # Create a newer DAG run + newer_date = pendulum.datetime(2024, 2, 1, 0, 0, 0, tz="UTC") + newer_run = DagRun( + dag_id=dag_id, + run_id="newer_run", + run_type=DagRunType.MANUAL, + start_date=newer_date, + logical_date=newer_date, + run_after=newer_date, + state=DagRunState.RUNNING, + triggered_by=DagRunTriggeredByType.TEST, + ) + session.add(newer_run) + session.flush() + + # Create task instances for newer run (all success) + for i in range(2): + session.execute( + insert(TI).values( + dag_id=dag_id, + run_id="newer_run", + task_id=f"new_task_{i}", + state=TaskInstanceState.SUCCESS, + map_index=-1, + pool="default_pool", + ) + ) + + session.commit() + + # Make request + response = test_client.get("/dags", params={"dag_ids": [dag_id]}) + assert response.status_code == 200 + body = response.json() + + # Verify latest_run_stats only reflects the latest run + dag_data = body["dags"][0] + summary = dag_data["latest_run_stats"]["task_instance_counts"] + + # Should only have success states from the newer run + assert summary.get("success") == 2 + # Should NOT include failed states from older run + assert summary.get("failed") is None + + def test_latest_run_stats_empty_when_no_task_instances(self, test_client, session): + """Test that latest_run_stats is empty when DAG has no task instances.""" + dag_id = "test_dag_no_ti" + + # Create a DAG without any task instances + dag_model = DagModel( + dag_id=dag_id, + bundle_name="dag_maker", + fileloc=f"/tmp/{dag_id}.py", + is_stale=False, + ) + session.add(dag_model) + session.commit() + + # Make request + response = test_client.get("/dags", params={"dag_ids": [dag_id]}) + assert response.status_code == 200 + body = response.json() + + # Verify latest_run_stats has empty task_instance_counts + dag_data = body["dags"][0] + assert dag_data["latest_run_stats"]["task_instance_counts"] == {} +