From 42249d3b706e92a4bd6bb93e5c8e00ac144d302a Mon Sep 17 00:00:00 2001 From: Saad Madni Date: Sun, 4 Jan 2026 21:35:52 +0530 Subject: [PATCH 01/13] UI: add task instance summary to DAG cards --- .../airflow/ui/src/pages/DagsList/DagCard.tsx | 87 +++++++++++++++++-- 1 file changed, 80 insertions(+), 7 deletions(-) diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx index a43233feb9b10..5f991a7f07b35 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx @@ -16,7 +16,18 @@ * specific language governing permissions and limitations * under the License. */ -import { Box, Flex, HStack, SimpleGrid, Link, Spinner } from "@chakra-ui/react"; + +import { + Box, + Flex, + HStack, + SimpleGrid, + Link, + Spinner, + Badge, + Wrap, + WrapItem, +} from "@chakra-ui/react"; import { useTranslation } from "react-i18next"; import { Link as RouterLink } from "react-router-dom"; @@ -30,6 +41,7 @@ import { TogglePause } from "src/components/TogglePause"; import TriggerDAGButton from "src/components/TriggerDag/TriggerDAGButton"; import { Tooltip } from "src/components/ui"; import { isStatePending, useAutoRefresh } from "src/utils"; +import { useTaskInstances } from "src/queries/useTaskInstances"; import { DagTags } from "./DagTags"; import { RecentRuns } from "./RecentRuns"; @@ -39,6 +51,65 @@ type Props = { readonly dag: DAGWithLatestDagRunsResponse; }; +/* ------------------------------- + * Task Instance Summary Component + * ------------------------------- */ +type TaskSummaryProps = { + dagId: string; + runId: string; +}; + +const TaskInstanceSummary = ({ dagId, runId }: TaskSummaryProps) => { + const { data, isLoading } = useTaskInstances({ + dagId, + dagRunId: runId, + limit: 0, + }); + + if (isLoading) { + return ; + } + + if (!data?.task_instances?.length) { + return null; + } + + const counts = data.task_instances.reduce>((acc, ti) => { + if (ti.state) { + acc[ti.state] = (acc[ti.state] ?? 0) + 1; + } + return acc; + }, {}); + + const STATE_COLORS: Record = { + success: "green", + failed: "red", + running: "yellow", + skipped: "gray", + upstream_failed: "red", + }; + + return ( + + {Object.entries(counts).map(([state, count]) => ( + + + + {state}: {count} + + + + ))} + + ); +}; + +/* ------------------------------- + * DAG Card + * ------------------------------- */ export const DagCard = ({ dag }: Props) => { const { t: translate } = useTranslation(["common", "dag"]); const [latestRun] = dag.latest_dag_runs; @@ -76,6 +147,7 @@ export const DagCard = ({ dag }: Props) => { + { timetableSummary={dag.timetable_summary} /> + {latestRun ? ( @@ -104,14 +177,14 @@ export const DagCard = ({ dag }: Props) => { ) : undefined} - - {Boolean(dag.next_dagrun_run_after) ? ( - + + {/* 🔽 NEW: Task Instance Summary */} + + {latestRun ? ( + ) : undefined} + From 45136b2255d632295ddd2f248baa6d6c64401801 Mon Sep 17 00:00:00 2001 From: Saad Madni Date: Tue, 6 Jan 2026 20:42:31 +0530 Subject: [PATCH 02/13] Fix task instance summary rendering during local testing --- .../ui/public/i18n/locales/en/common.json | 1 + .../airflow/ui/src/pages/DagsList/DagCard.tsx | 84 ++----------------- .../pages/DagsList/TaskInstanceSummary.tsx | 82 ++++++++++++++++++ 3 files changed, 90 insertions(+), 77 deletions(-) create mode 100644 airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json index 7821e6babf7c0..45c963cb5db14 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json @@ -263,6 +263,7 @@ }, "taskInstance_one": "Task Instance", "taskInstance_other": "Task Instances", + "taskInstanceSummary": "Task Instance Summary", "timeRange": { "last12Hours": "Last 12 Hours", "last24Hours": "Last 24 Hours", diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx index 5f991a7f07b35..a42d29b954bd9 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx @@ -17,17 +17,7 @@ * under the License. */ -import { - Box, - Flex, - HStack, - SimpleGrid, - Link, - Spinner, - Badge, - Wrap, - WrapItem, -} from "@chakra-ui/react"; +import { Box, Flex, HStack, SimpleGrid, Link, Spinner } from "@chakra-ui/react"; import { useTranslation } from "react-i18next"; import { Link as RouterLink } from "react-router-dom"; @@ -41,75 +31,16 @@ import { TogglePause } from "src/components/TogglePause"; import TriggerDAGButton from "src/components/TriggerDag/TriggerDAGButton"; import { Tooltip } from "src/components/ui"; import { isStatePending, useAutoRefresh } from "src/utils"; -import { useTaskInstances } from "src/queries/useTaskInstances"; import { DagTags } from "./DagTags"; import { RecentRuns } from "./RecentRuns"; import { Schedule } from "./Schedule"; +import { TaskInstanceSummary } from "./TaskInstanceSummary"; type Props = { readonly dag: DAGWithLatestDagRunsResponse; }; -/* ------------------------------- - * Task Instance Summary Component - * ------------------------------- */ -type TaskSummaryProps = { - dagId: string; - runId: string; -}; - -const TaskInstanceSummary = ({ dagId, runId }: TaskSummaryProps) => { - const { data, isLoading } = useTaskInstances({ - dagId, - dagRunId: runId, - limit: 0, - }); - - if (isLoading) { - return ; - } - - if (!data?.task_instances?.length) { - return null; - } - - const counts = data.task_instances.reduce>((acc, ti) => { - if (ti.state) { - acc[ti.state] = (acc[ti.state] ?? 0) + 1; - } - return acc; - }, {}); - - const STATE_COLORS: Record = { - success: "green", - failed: "red", - running: "yellow", - skipped: "gray", - upstream_failed: "red", - }; - - return ( - - {Object.entries(counts).map(([state, count]) => ( - - - - {state}: {count} - - - - ))} - - ); -}; - -/* ------------------------------- - * DAG Card - * ------------------------------- */ export const DagCard = ({ dag }: Props) => { const { t: translate } = useTranslation(["common", "dag"]); const [latestRun] = dag.latest_dag_runs; @@ -178,12 +109,11 @@ export const DagCard = ({ dag }: Props) => { ) : undefined} - {/* 🔽 NEW: Task Instance Summary */} - - {latestRun ? ( - - ) : undefined} - + {latestRun ? ( + + ) : ( + — + )} diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx new file mode 100644 index 0000000000000..5367533155def --- /dev/null +++ b/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx @@ -0,0 +1,82 @@ +/*! + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import { HStack, Spinner, Text } from "@chakra-ui/react"; +import { useTranslation } from "react-i18next"; + +import { useTaskInstanceServiceGetTaskInstances } from "openapi/queries"; +import type { TaskInstanceState } from "openapi/requests/types.gen"; +import { StateBadge } from "src/components/StateBadge"; +import { Stat } from "src/components/Stat"; + +type Props = { + readonly dagId: string; + readonly runId: string; +}; + +export const TaskInstanceSummary = ({ dagId, runId }: Props) => { + const { t: translate } = useTranslation("common"); + + const { data, isLoading } = useTaskInstanceServiceGetTaskInstances({ + dagId, + dagRunId: runId, + limit: 1000, + }); + + if (isLoading) { + return ( + + + + ); + } + + // Count task instances by state + const stateCounts: Record = {}; + + data?.task_instances?.forEach((ti) => { + const state = ti.state ?? "no_status"; + + stateCounts[state] = (stateCounts[state] ?? 0) + 1; + }); + + const stateEntries = Object.entries(stateCounts); + + if (stateEntries.length === 0) { + return ( + + + — + + + ); + } + + return ( + + + {stateEntries.map(([state, count]) => ( + + {count} + + ))} + + + ); +}; From 8ba0a24559ea5f174a62612d41f62c46d9d5c743 Mon Sep 17 00:00:00 2001 From: Saad Madni Date: Wed, 7 Jan 2026 01:40:39 +0530 Subject: [PATCH 03/13] UI: Render TaskInstanceSummary only when latest run exists --- .../ui/src/pages/DagsList/DagCard.test.tsx | 61 ++++++++++++++----- .../airflow/ui/src/pages/DagsList/DagCard.tsx | 4 +- .../pages/DagsList/TaskInstanceSummary.tsx | 10 +-- 3 files changed, 49 insertions(+), 26 deletions(-) diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.test.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.test.tsx index 04fdb830636b0..192047a400e36 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.test.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.test.tsx @@ -46,6 +46,25 @@ vi.mock("src/context/timezone", async () => { }; }); +// Mock the task instances API to return sample data +vi.mock("openapi/queries", async () => { + const actual = await vi.importActual("openapi/queries"); + + return { + ...actual, + useTaskInstanceServiceGetTaskInstances: () => ({ + data: { + task_instances: [ + { state: "success" }, + { state: "success" }, + { state: "failed" }, + ], + }, + isLoading: false, + }), + }; +}); + // Custom wrapper that uses GMT timezone const GMTWrapper = ({ children }: PropsWithChildren) => ( @@ -198,22 +217,14 @@ describe("DagCard", () => { expect(latestRunElement).toHaveTextContent("2025-09-19 19:22:00"); }); - it("DagCard should render next run section with timestamp", () => { - render(, { wrapper: GMTWrapper }); - const nextRunElement = screen.getByTestId("next-run"); - - expect(nextRunElement).toBeInTheDocument(); - // Should display the formatted next run timestamp (converted to GMT timezone) - expect(nextRunElement).toHaveTextContent("2024-08-22 19:00:00"); - }); - it("DagCard should render StateBadge as success", () => { render(, { wrapper: GMTWrapper }); - const stateBadge = screen.getByTestId("state-badge"); + const stateBadges = screen.getAllByTestId("state-badge"); - expect(stateBadge).toBeInTheDocument(); + // First badge is from DagRunInfo (latest run state) + expect(stateBadges[0]).toBeInTheDocument(); // Should have the success state from mockDag.latest_dag_runs[0].state - expect(stateBadge).toHaveAttribute("aria-label", "success"); + expect(stateBadges[0]).toHaveAttribute("aria-label", "success"); }); it("DagCard should render StateBadge as failed", () => { @@ -234,10 +245,30 @@ describe("DagCard", () => { } satisfies DAGWithLatestDagRunsResponse; render(, { wrapper: GMTWrapper }); - const stateBadge = screen.getByTestId("state-badge"); + const stateBadges = screen.getAllByTestId("state-badge"); - expect(stateBadge).toBeInTheDocument(); + // First badge is from DagRunInfo (latest run state) + expect(stateBadges[0]).toBeInTheDocument(); // Should have the failed state - expect(stateBadge).toHaveAttribute("aria-label", "failed"); + expect(stateBadges[0]).toHaveAttribute("aria-label", "failed"); + }); + + it("DagCard should render TaskInstanceSummary when DAG has a latest run", () => { + render(, { wrapper: GMTWrapper }); + const taskInstanceSummary = screen.getByTestId("task-instance-summary"); + + expect(taskInstanceSummary).toBeInTheDocument(); + }); + + it("DagCard should not render TaskInstanceSummary when DAG has no latest run", () => { + const mockDagWithNoRuns = { + ...mockDag, + latest_dag_runs: [], + } satisfies DAGWithLatestDagRunsResponse; + + render(, { wrapper: GMTWrapper }); + const taskInstanceSummary = screen.queryByTestId("task-instance-summary"); + + expect(taskInstanceSummary).toBeNull(); }); }); diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx index a42d29b954bd9..484f7d5f068f0 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx @@ -111,9 +111,7 @@ export const DagCard = ({ dag }: Props) => { {latestRun ? ( - ) : ( - — - )} + ) : undefined} diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx index 5367533155def..b07aabdb5d754 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx @@ -17,7 +17,7 @@ * under the License. */ -import { HStack, Spinner, Text } from "@chakra-ui/react"; +import { HStack, Spinner } from "@chakra-ui/react"; import { useTranslation } from "react-i18next"; import { useTaskInstanceServiceGetTaskInstances } from "openapi/queries"; @@ -59,13 +59,7 @@ export const TaskInstanceSummary = ({ dagId, runId }: Props) => { const stateEntries = Object.entries(stateCounts); if (stateEntries.length === 0) { - return ( - - - — - - - ); + return null; } return ( From cbed3703cc44ba15676966be179ebc2d7756bc68 Mon Sep 17 00:00:00 2001 From: Saad Madni Date: Wed, 7 Jan 2026 03:23:24 +0530 Subject: [PATCH 04/13] UI: restore next DAG run info in DagCard --- .../src/airflow/ui/src/pages/DagsList/DagCard.tsx | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx index 484f7d5f068f0..33fafa62965bd 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx @@ -90,6 +90,15 @@ export const DagCard = ({ dag }: Props) => { /> + + {Boolean(dag.next_dagrun_run_after) ? ( + + ) : undefined} + + {latestRun ? ( From 771ad16a59c0869ead9975bb75263f8ee18a6b5d Mon Sep 17 00:00:00 2001 From: Saad Madni Date: Wed, 7 Jan 2026 04:11:57 +0530 Subject: [PATCH 05/13] UI: guard against undefined task instance data --- .../airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx index b07aabdb5d754..99141a7a26563 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx @@ -47,12 +47,15 @@ export const TaskInstanceSummary = ({ dagId, runId }: Props) => { ); } + if (!data) { + return null; + } + // Count task instances by state const stateCounts: Record = {}; - data?.task_instances?.forEach((ti) => { + data.task_instances.forEach((ti) => { const state = ti.state ?? "no_status"; - stateCounts[state] = (stateCounts[state] ?? 0) + 1; }); From 6fbe911b6dd8c26470a46bd2c4c9a19cc71f7142 Mon Sep 17 00:00:00 2001 From: Saad Madni Date: Fri, 9 Jan 2026 14:25:05 +0530 Subject: [PATCH 06/13] Fix lint issue in TaskInstanceSummary --- .../src/airflow/ui/src/pages/DagsList/DagCard.test.tsx | 6 +----- airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx | 5 +---- .../airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx | 4 ++-- 3 files changed, 4 insertions(+), 11 deletions(-) diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.test.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.test.tsx index 192047a400e36..3c543c9f51ad0 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.test.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.test.tsx @@ -54,11 +54,7 @@ vi.mock("openapi/queries", async () => { ...actual, useTaskInstanceServiceGetTaskInstances: () => ({ data: { - task_instances: [ - { state: "success" }, - { state: "success" }, - { state: "failed" }, - ], + task_instances: [{ state: "success" }, { state: "success" }, { state: "failed" }], }, isLoading: false, }), diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx index 33fafa62965bd..b7a9ca395988c 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - import { Box, Flex, HStack, SimpleGrid, Link, Spinner } from "@chakra-ui/react"; import { useTranslation } from "react-i18next"; import { Link as RouterLink } from "react-router-dom"; @@ -118,9 +117,7 @@ export const DagCard = ({ dag }: Props) => { ) : undefined} - {latestRun ? ( - - ) : undefined} + {latestRun ? : undefined} diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx index 99141a7a26563..be717a9670ae5 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx @@ -16,14 +16,13 @@ * specific language governing permissions and limitations * under the License. */ - import { HStack, Spinner } from "@chakra-ui/react"; import { useTranslation } from "react-i18next"; import { useTaskInstanceServiceGetTaskInstances } from "openapi/queries"; import type { TaskInstanceState } from "openapi/requests/types.gen"; -import { StateBadge } from "src/components/StateBadge"; import { Stat } from "src/components/Stat"; +import { StateBadge } from "src/components/StateBadge"; type Props = { readonly dagId: string; @@ -56,6 +55,7 @@ export const TaskInstanceSummary = ({ dagId, runId }: Props) => { data.task_instances.forEach((ti) => { const state = ti.state ?? "no_status"; + stateCounts[state] = (stateCounts[state] ?? 0) + 1; }); From 804ed04e4f9c250640808dd8db5ace9b80c67ed2 Mon Sep 17 00:00:00 2001 From: Saad Madni Date: Fri, 9 Jan 2026 23:58:16 +0530 Subject: [PATCH 07/13] Add task instance summary stats to UI DAGs endpoint - Add SQL query to compute task instance counts by state for the latest DAG run (using window function) - Add task_instance_summary field to DAGWithLatestDagRunsResponse - Simplify TaskInstanceSummary component to use pre-computed data - Update DagCard to pass task_instance_summary prop - Update tests to reflect new API structure This addresses reviewer feedback to aggregate task instance stats in SQL rather than fetching individual task instances. --- .../core_api/datamodels/ui/dags.py | 1 + .../api_fastapi/core_api/routes/ui/dags.py | 40 +++++++++++++++++++ .../ui/src/pages/DagsList/DagCard.test.tsx | 26 +++--------- .../airflow/ui/src/pages/DagsList/DagCard.tsx | 2 +- .../pages/DagsList/TaskInstanceSummary.tsx | 35 +++------------- 5 files changed, 53 insertions(+), 51 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dags.py index fc2fe3ed2d2aa..93bfca4296f44 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dags.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dags.py @@ -28,6 +28,7 @@ class DAGWithLatestDagRunsResponse(DAGResponse): asset_expression: dict | None latest_dag_runs: list[DAGRunLightResponse] + task_instance_summary: dict[str, int] | None = None pending_actions: list[HITLDetail] is_favorite: bool diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py index 93914cca5c401..c5c1a5d76d694 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py @@ -211,6 +211,45 @@ def get_dags( recent_dag_runs = session.execute(recent_dag_runs_select) + # Fetch task instance summary stats for the latest DagRun per DAG + latest_run_per_dag_subquery = ( + select( + DagRun.dag_id, + DagRun.id.label("dag_run_id"), + func.rank() + .over( + partition_by=DagRun.dag_id, + order_by=DagRun.run_after.desc(), + ) + .label("rank"), + ) + .where(DagRun.dag_id.in_([dag.dag_id for dag in dags])) + .subquery() + ) + + task_instance_stats_select = ( + select( + latest_run_per_dag_subquery.c.dag_id, + TaskInstance.state, + func.count(TaskInstance.task_id).label("count"), + ) + .join( + TaskInstance, + TaskInstance.dag_run_id == latest_run_per_dag_subquery.c.dag_run_id, + ) + .where(latest_run_per_dag_subquery.c.rank == 1) + .group_by(latest_run_per_dag_subquery.c.dag_id, TaskInstance.state) + ) + + task_instance_stats_rows = session.execute(task_instance_stats_select) + + # Normalize task instance stats by dag_id + task_instance_summary_by_dag: dict[str, dict[str, int]] = {dag.dag_id: {} for dag in dags} + + for dag_id, state, count in task_instance_stats_rows: + if state is not None: + task_instance_summary_by_dag[dag_id][state] = count + # Fetch pending HITL actions for each Dag if we are not certain whether some of the Dag might contain HITL actions pending_actions_by_dag_id: dict[str, list[HITLDetail]] = {dag.dag_id: [] for dag in dags} if has_pending_actions.value: @@ -241,6 +280,7 @@ def get_dags( **DAGResponse.model_validate(dag).model_dump(), "asset_expression": dag.asset_expression, "latest_dag_runs": [], + "task_instance_summary": task_instance_summary_by_dag.get(dag.dag_id, {}), "pending_actions": pending_actions_by_dag_id[dag.dag_id], "is_favorite": dag.dag_id in favorite_dag_ids, } diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.test.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.test.tsx index 3c543c9f51ad0..6e194af0ccf1c 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.test.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.test.tsx @@ -46,21 +46,6 @@ vi.mock("src/context/timezone", async () => { }; }); -// Mock the task instances API to return sample data -vi.mock("openapi/queries", async () => { - const actual = await vi.importActual("openapi/queries"); - - return { - ...actual, - useTaskInstanceServiceGetTaskInstances: () => ({ - data: { - task_instances: [{ state: "success" }, { state: "success" }, { state: "failed" }], - }, - isLoading: false, - }), - }; -}); - // Custom wrapper that uses GMT timezone const GMTWrapper = ({ children }: PropsWithChildren) => ( @@ -122,6 +107,7 @@ const mockDag = { pending_actions: [], relative_fileloc: "nested_task_groups.py", tags: [], + task_instance_summary: { success: 2, failed: 1 }, timetable_description: "Every minute", timetable_summary: "* * * * *", } satisfies DAGWithLatestDagRunsResponse; @@ -249,20 +235,20 @@ describe("DagCard", () => { expect(stateBadges[0]).toHaveAttribute("aria-label", "failed"); }); - it("DagCard should render TaskInstanceSummary when DAG has a latest run", () => { + it("DagCard should render TaskInstanceSummary when DAG has task_instance_summary", () => { render(, { wrapper: GMTWrapper }); const taskInstanceSummary = screen.getByTestId("task-instance-summary"); expect(taskInstanceSummary).toBeInTheDocument(); }); - it("DagCard should not render TaskInstanceSummary when DAG has no latest run", () => { - const mockDagWithNoRuns = { + it("DagCard should not render TaskInstanceSummary when DAG has no task_instance_summary", () => { + const mockDagWithNoSummary = { ...mockDag, - latest_dag_runs: [], + task_instance_summary: null, } satisfies DAGWithLatestDagRunsResponse; - render(, { wrapper: GMTWrapper }); + render(, { wrapper: GMTWrapper }); const taskInstanceSummary = screen.queryByTestId("task-instance-summary"); expect(taskInstanceSummary).toBeNull(); diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx index b7a9ca395988c..e2ba903ab4106 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx @@ -117,7 +117,7 @@ export const DagCard = ({ dag }: Props) => { ) : undefined} - {latestRun ? : undefined} + diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx index be717a9670ae5..f3f3d1966241e 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx @@ -16,50 +16,25 @@ * specific language governing permissions and limitations * under the License. */ -import { HStack, Spinner } from "@chakra-ui/react"; +import { HStack } from "@chakra-ui/react"; import { useTranslation } from "react-i18next"; -import { useTaskInstanceServiceGetTaskInstances } from "openapi/queries"; import type { TaskInstanceState } from "openapi/requests/types.gen"; import { Stat } from "src/components/Stat"; import { StateBadge } from "src/components/StateBadge"; type Props = { - readonly dagId: string; - readonly runId: string; + readonly taskInstanceSummary: Record | null | undefined; }; -export const TaskInstanceSummary = ({ dagId, runId }: Props) => { +export const TaskInstanceSummary = ({ taskInstanceSummary }: Props) => { const { t: translate } = useTranslation("common"); - const { data, isLoading } = useTaskInstanceServiceGetTaskInstances({ - dagId, - dagRunId: runId, - limit: 1000, - }); - - if (isLoading) { - return ( - - - - ); - } - - if (!data) { + if (!taskInstanceSummary) { return null; } - // Count task instances by state - const stateCounts: Record = {}; - - data.task_instances.forEach((ti) => { - const state = ti.state ?? "no_status"; - - stateCounts[state] = (stateCounts[state] ?? 0) + 1; - }); - - const stateEntries = Object.entries(stateCounts); + const stateEntries = Object.entries(taskInstanceSummary); if (stateEntries.length === 0) { return null; From 1d456b7bfdaf09812f6187eda4e3f1e26eff6b0f Mon Sep 17 00:00:00 2001 From: Saad Madni Date: Sat, 10 Jan 2026 02:48:21 +0530 Subject: [PATCH 08/13] Add backend tests for task_instance_summary aggregation Tests cover: - Aggregated task instance counts by state - Only latest DAG run is included in summary - Empty summary when no task instances exist --- .../core_api/routes/ui/test_dags.py | 175 ++++++++++++++++++ 1 file changed, 175 insertions(+) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py index 801f8754ed13c..4486766c25baf 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py @@ -384,3 +384,178 @@ def test_is_favorite_field_user_specific(self, test_client, session): # Verify that DAG1 is not marked as favorite for the test user dag1_data = next(dag for dag in body["dags"] if dag["dag_id"] == DAG1_ID) assert dag1_data["is_favorite"] is False + + def test_task_instance_summary_returns_aggregated_counts( + self, test_client, create_task_instance, session + ): + """Test that task_instance_summary returns aggregated task instance counts by state.""" + dag_id = "test_dag_ti_summary" + + # Create a DAG + dag_model = DagModel( + dag_id=dag_id, + bundle_name="dag_maker", + fileloc=f"/tmp/{dag_id}.py", + is_stale=False, + ) + session.add(dag_model) + session.flush() + + # Create a DAG run + start_date = pendulum.datetime(2024, 1, 1, 0, 0, 0, tz="UTC") + dag_run = DagRun( + dag_id=dag_id, + run_id="test_run_1", + run_type=DagRunType.MANUAL, + start_date=start_date, + logical_date=start_date, + run_after=start_date, + state=DagRunState.RUNNING, + triggered_by=DagRunTriggeredByType.TEST, + ) + session.add(dag_run) + session.flush() + + # Create task instances with different states + states = [ + TaskInstanceState.SUCCESS, + TaskInstanceState.SUCCESS, + TaskInstanceState.SUCCESS, + TaskInstanceState.FAILED, + TaskInstanceState.FAILED, + TaskInstanceState.RUNNING, + ] + for i, state in enumerate(states): + ti = create_task_instance( + dag_id=dag_id, + run_id="test_run_1", + task_id=f"task_{i}", + session=session, + state=state, + ) + session.add(ti) + + session.commit() + + # Make request + response = test_client.get("/dags", params={"dag_ids": [dag_id]}) + assert response.status_code == 200 + body = response.json() + + # Verify task_instance_summary is present and correct + assert body["total_entries"] == 1 + dag_data = body["dags"][0] + assert "task_instance_summary" in dag_data + + summary = dag_data["task_instance_summary"] + assert summary.get("success") == 3 + assert summary.get("failed") == 2 + assert summary.get("running") == 1 + + def test_task_instance_summary_only_includes_latest_run( + self, test_client, create_task_instance, session + ): + """Test that task_instance_summary only includes task instances from the latest DAG run.""" + dag_id = "test_dag_ti_summary_latest" + + # Create a DAG + dag_model = DagModel( + dag_id=dag_id, + bundle_name="dag_maker", + fileloc=f"/tmp/{dag_id}.py", + is_stale=False, + ) + session.add(dag_model) + session.flush() + + # Create an older DAG run + older_date = pendulum.datetime(2024, 1, 1, 0, 0, 0, tz="UTC") + older_run = DagRun( + dag_id=dag_id, + run_id="older_run", + run_type=DagRunType.MANUAL, + start_date=older_date, + logical_date=older_date, + run_after=older_date, + state=DagRunState.SUCCESS, + triggered_by=DagRunTriggeredByType.TEST, + ) + session.add(older_run) + session.flush() + + # Create task instances for older run (all failed) + for i in range(3): + ti = create_task_instance( + dag_id=dag_id, + run_id="older_run", + task_id=f"old_task_{i}", + session=session, + state=TaskInstanceState.FAILED, + ) + session.add(ti) + + # Create a newer DAG run + newer_date = pendulum.datetime(2024, 2, 1, 0, 0, 0, tz="UTC") + newer_run = DagRun( + dag_id=dag_id, + run_id="newer_run", + run_type=DagRunType.MANUAL, + start_date=newer_date, + logical_date=newer_date, + run_after=newer_date, + state=DagRunState.RUNNING, + triggered_by=DagRunTriggeredByType.TEST, + ) + session.add(newer_run) + session.flush() + + # Create task instances for newer run (all success) + for i in range(2): + ti = create_task_instance( + dag_id=dag_id, + run_id="newer_run", + task_id=f"new_task_{i}", + session=session, + state=TaskInstanceState.SUCCESS, + ) + session.add(ti) + + session.commit() + + # Make request + response = test_client.get("/dags", params={"dag_ids": [dag_id]}) + assert response.status_code == 200 + body = response.json() + + # Verify task_instance_summary only reflects the latest run + dag_data = body["dags"][0] + summary = dag_data["task_instance_summary"] + + # Should only have success states from the newer run + assert summary.get("success") == 2 + # Should NOT include failed states from older run + assert summary.get("failed") is None + + def test_task_instance_summary_empty_when_no_task_instances(self, test_client, session): + """Test that task_instance_summary is empty when DAG has no task instances.""" + dag_id = "test_dag_no_ti" + + # Create a DAG without any task instances + dag_model = DagModel( + dag_id=dag_id, + bundle_name="dag_maker", + fileloc=f"/tmp/{dag_id}.py", + is_stale=False, + ) + session.add(dag_model) + session.commit() + + # Make request + response = test_client.get("/dags", params={"dag_ids": [dag_id]}) + assert response.status_code == 200 + body = response.json() + + # Verify task_instance_summary is empty + dag_data = body["dags"][0] + assert dag_data["task_instance_summary"] == {} + From 332fb1b08197f687d6e9201668a5f07084f4f18d Mon Sep 17 00:00:00 2001 From: Saad Madni Date: Sun, 18 Jan 2026 01:10:29 +0530 Subject: [PATCH 09/13] Update OpenAPI and react-query codegens for task_instance_summary --- .../api_fastapi/core_api/openapi/_private_ui.yaml | 7 +++++++ .../airflow/ui/openapi-gen/requests/schemas.gen.ts | 14 ++++++++++++++ .../airflow/ui/openapi-gen/requests/types.gen.ts | 3 +++ 3 files changed, 24 insertions(+) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml index dbbbf43758cae..757fea73782b6 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml @@ -1707,6 +1707,13 @@ components: $ref: '#/components/schemas/DAGRunLightResponse' type: array title: Latest Dag Runs + task_instance_summary: + anyOf: + - additionalProperties: + type: integer + type: object + - type: 'null' + title: Task Instance Summary pending_actions: items: $ref: '#/components/schemas/HITLDetail' diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index de34ac2f9be2a..0473d53b7c3c8 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -7654,6 +7654,20 @@ export const $DAGWithLatestDagRunsResponse = { type: 'array', title: 'Latest Dag Runs' }, + task_instance_summary: { + anyOf: [ + { + additionalProperties: { + type: 'integer' + }, + type: 'object' + }, + { + type: 'null' + } + ], + title: 'Task Instance Summary' + }, pending_actions: { items: { '$ref': '#/components/schemas/HITLDetail' diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index ef22addbc4f45..d2bee0ea1653e 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1887,6 +1887,9 @@ export type DAGWithLatestDagRunsResponse = { [key: string]: unknown; } | null; latest_dag_runs: Array; + task_instance_summary?: { + [key: string]: (number); +} | null; pending_actions: Array; is_favorite: boolean; /** From 0d65739e59591680e296f7fb5f3a4b375d8b396b Mon Sep 17 00:00:00 2001 From: Saad Madni Date: Sun, 18 Jan 2026 01:21:20 +0530 Subject: [PATCH 10/13] Fix task_instance_summary query join and improve backend tests - Fix query to use composite key (dag_id, run_id) instead of non-existent dag_run_id - Update tests to use raw SQL inserts for TaskInstance creation - Remove unused TYPE_CHECKING import --- .../api_fastapi/core_api/routes/ui/dags.py | 7 ++- .../core_api/routes/ui/test_dags.py | 61 ++++++++++--------- 2 files changed, 38 insertions(+), 30 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py index c5c1a5d76d694..bb87907aced6a 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py @@ -215,7 +215,7 @@ def get_dags( latest_run_per_dag_subquery = ( select( DagRun.dag_id, - DagRun.id.label("dag_run_id"), + DagRun.run_id, func.rank() .over( partition_by=DagRun.dag_id, @@ -235,7 +235,10 @@ def get_dags( ) .join( TaskInstance, - TaskInstance.dag_run_id == latest_run_per_dag_subquery.c.dag_run_id, + and_( + TaskInstance.dag_id == latest_run_per_dag_subquery.c.dag_id, + TaskInstance.run_id == latest_run_per_dag_subquery.c.run_id, + ), ) .where(latest_run_per_dag_subquery.c.rank == 1) .group_by(latest_run_per_dag_subquery.c.dag_id, TaskInstance.state) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py index 4486766c25baf..5f84f0edfc02f 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py @@ -16,7 +16,6 @@ # under the License. from __future__ import annotations -from typing import TYPE_CHECKING from unittest import mock import pendulum @@ -28,11 +27,14 @@ from airflow.models.dag import DagModel, DagTag from airflow.models.dag_favorite import DagFavorite from airflow.models.hitl import HITLDetail +from airflow.models.taskinstance import TaskInstance as TI from airflow.sdk.timezone import utcnow from airflow.utils.session import provide_session from airflow.utils.state import DagRunState, TaskInstanceState from airflow.utils.types import DagRunTriggeredByType, DagRunType +from sqlalchemy import insert + from tests_common.test_utils.asserts import count_queries from unit.api_fastapi.core_api.routes.public.test_dags import ( DAG1_ID, @@ -43,9 +45,6 @@ TestDagEndpoint as TestPublicDagEndpoint, ) -if TYPE_CHECKING: - from tests_common.pytest_plugin import TaskInstance - pytestmark = pytest.mark.db_test @@ -386,7 +385,7 @@ def test_is_favorite_field_user_specific(self, test_client, session): assert dag1_data["is_favorite"] is False def test_task_instance_summary_returns_aggregated_counts( - self, test_client, create_task_instance, session + self, test_client, session ): """Test that task_instance_summary returns aggregated task instance counts by state.""" dag_id = "test_dag_ti_summary" @@ -416,7 +415,7 @@ def test_task_instance_summary_returns_aggregated_counts( session.add(dag_run) session.flush() - # Create task instances with different states + # Create task instances with different states using raw insert states = [ TaskInstanceState.SUCCESS, TaskInstanceState.SUCCESS, @@ -426,14 +425,16 @@ def test_task_instance_summary_returns_aggregated_counts( TaskInstanceState.RUNNING, ] for i, state in enumerate(states): - ti = create_task_instance( - dag_id=dag_id, - run_id="test_run_1", - task_id=f"task_{i}", - session=session, - state=state, + session.execute( + insert(TI).values( + dag_id=dag_id, + run_id="test_run_1", + task_id=f"task_{i}", + state=state, + map_index=-1, + pool="default_pool", + ) ) - session.add(ti) session.commit() @@ -453,7 +454,7 @@ def test_task_instance_summary_returns_aggregated_counts( assert summary.get("running") == 1 def test_task_instance_summary_only_includes_latest_run( - self, test_client, create_task_instance, session + self, test_client, session ): """Test that task_instance_summary only includes task instances from the latest DAG run.""" dag_id = "test_dag_ti_summary_latest" @@ -485,14 +486,16 @@ def test_task_instance_summary_only_includes_latest_run( # Create task instances for older run (all failed) for i in range(3): - ti = create_task_instance( - dag_id=dag_id, - run_id="older_run", - task_id=f"old_task_{i}", - session=session, - state=TaskInstanceState.FAILED, + session.execute( + insert(TI).values( + dag_id=dag_id, + run_id="older_run", + task_id=f"old_task_{i}", + state=TaskInstanceState.FAILED, + map_index=-1, + pool="default_pool", + ) ) - session.add(ti) # Create a newer DAG run newer_date = pendulum.datetime(2024, 2, 1, 0, 0, 0, tz="UTC") @@ -511,14 +514,16 @@ def test_task_instance_summary_only_includes_latest_run( # Create task instances for newer run (all success) for i in range(2): - ti = create_task_instance( - dag_id=dag_id, - run_id="newer_run", - task_id=f"new_task_{i}", - session=session, - state=TaskInstanceState.SUCCESS, + session.execute( + insert(TI).values( + dag_id=dag_id, + run_id="newer_run", + task_id=f"new_task_{i}", + state=TaskInstanceState.SUCCESS, + map_index=-1, + pool="default_pool", + ) ) - session.add(ti) session.commit() From a4b8be6d569abd1a2b281156de5e28a6eb6e6c1b Mon Sep 17 00:00:00 2001 From: Saad Madni Date: Tue, 20 Jan 2026 10:28:15 +0530 Subject: [PATCH 11/13] refactor: rename task_instance_summary to latest_run_stats with LatestRunStats model - Add new LatestRunStats Pydantic model with task_instance_counts field - Rename field from task_instance_summary to latest_run_stats - Update backend response construction to use LatestRunStats model - Update OpenAPI spec with new schema - Update TypeScript types and schemas - Update UI components (DagCard, TaskInstanceSummary) - Update backend and frontend tests Addresses reviewer feedback from @jason810496 --- .../core_api/datamodels/ui/dags.py | 8 +++++- .../core_api/openapi/_private_ui.yaml | 19 ++++++++++---- .../api_fastapi/core_api/routes/ui/dags.py | 7 ++--- .../ui/openapi-gen/requests/schemas.gen.ts | 26 ++++++++++++++----- .../ui/openapi-gen/requests/types.gen.ts | 13 +++++++--- .../ui/src/pages/DagsList/DagCard.test.tsx | 8 +++--- .../airflow/ui/src/pages/DagsList/DagCard.tsx | 2 +- .../pages/DagsList/TaskInstanceSummary.tsx | 10 +++---- .../core_api/routes/ui/test_dags.py | 26 +++++++++---------- 9 files changed, 77 insertions(+), 42 deletions(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dags.py index 93bfca4296f44..3c2d80cb05ff2 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dags.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dags.py @@ -23,12 +23,18 @@ from airflow.api_fastapi.core_api.datamodels.ui.dag_runs import DAGRunLightResponse +class LatestRunStats(BaseModel): + """Stats for the latest DAG run.""" + + task_instance_counts: dict[str, int] + + class DAGWithLatestDagRunsResponse(DAGResponse): """DAG with latest dag runs response serializer.""" asset_expression: dict | None latest_dag_runs: list[DAGRunLightResponse] - task_instance_summary: dict[str, int] | None = None + latest_run_stats: LatestRunStats | None = None pending_actions: list[HITLDetail] is_favorite: bool diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml index 757fea73782b6..81e11c7ef8adc 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/_private_ui.yaml @@ -1707,13 +1707,10 @@ components: $ref: '#/components/schemas/DAGRunLightResponse' type: array title: Latest Dag Runs - task_instance_summary: + latest_run_stats: anyOf: - - additionalProperties: - type: integer - type: object + - $ref: '#/components/schemas/LatestRunStats' - type: 'null' - title: Task Instance Summary pending_actions: items: $ref: '#/components/schemas/HITLDetail' @@ -2220,6 +2217,18 @@ components: - unixname title: JobResponse description: Job serializer for responses. + LatestRunStats: + properties: + task_instance_counts: + additionalProperties: + type: integer + type: object + title: Task Instance Counts + type: object + required: + - task_instance_counts + title: LatestRunStats + description: Stats for the latest DAG run. LightGridTaskInstanceSummary: properties: task_id: diff --git a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py index bb87907aced6a..a693191340dc9 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py @@ -57,6 +57,7 @@ from airflow.api_fastapi.core_api.datamodels.ui.dags import ( DAGWithLatestDagRunsCollectionResponse, DAGWithLatestDagRunsResponse, + LatestRunStats, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc from airflow.api_fastapi.core_api.security import ( @@ -247,11 +248,11 @@ def get_dags( task_instance_stats_rows = session.execute(task_instance_stats_select) # Normalize task instance stats by dag_id - task_instance_summary_by_dag: dict[str, dict[str, int]] = {dag.dag_id: {} for dag in dags} + latest_run_stats_by_dag: dict[str, dict[str, int]] = {dag.dag_id: {} for dag in dags} for dag_id, state, count in task_instance_stats_rows: if state is not None: - task_instance_summary_by_dag[dag_id][state] = count + latest_run_stats_by_dag[dag_id][state] = count # Fetch pending HITL actions for each Dag if we are not certain whether some of the Dag might contain HITL actions pending_actions_by_dag_id: dict[str, list[HITLDetail]] = {dag.dag_id: [] for dag in dags} @@ -283,7 +284,7 @@ def get_dags( **DAGResponse.model_validate(dag).model_dump(), "asset_expression": dag.asset_expression, "latest_dag_runs": [], - "task_instance_summary": task_instance_summary_by_dag.get(dag.dag_id, {}), + "latest_run_stats": LatestRunStats(task_instance_counts=latest_run_stats_by_dag.get(dag.dag_id, {})), "pending_actions": pending_actions_by_dag_id[dag.dag_id], "is_favorite": dag.dag_id in favorite_dag_ids, } diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index 0473d53b7c3c8..8b1f665d6a352 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -7654,19 +7654,15 @@ export const $DAGWithLatestDagRunsResponse = { type: 'array', title: 'Latest Dag Runs' }, - task_instance_summary: { + latest_run_stats: { anyOf: [ { - additionalProperties: { - type: 'integer' - }, - type: 'object' + '$ref': '#/components/schemas/LatestRunStats' }, { type: 'null' } - ], - title: 'Task Instance Summary' + ] }, pending_actions: { items: { @@ -7955,6 +7951,22 @@ export const $HistoricalMetricDataResponse = { description: 'Historical Metric Data serializer for responses.' } as const; +export const $LatestRunStats = { + properties: { + task_instance_counts: { + additionalProperties: { + type: 'integer' + }, + type: 'object', + title: 'Task Instance Counts' + } + }, + type: 'object', + required: ['task_instance_counts'], + title: 'LatestRunStats', + description: 'Stats for the latest DAG run.' +} as const; + export const $LightGridTaskInstanceSummary = { properties: { task_id: { diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index d2bee0ea1653e..21833ee2fa869 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -1887,9 +1887,7 @@ export type DAGWithLatestDagRunsResponse = { [key: string]: unknown; } | null; latest_dag_runs: Array; - task_instance_summary?: { - [key: string]: (number); -} | null; + latest_run_stats?: LatestRunStats | null; pending_actions: Array; is_favorite: boolean; /** @@ -1968,6 +1966,15 @@ export type HistoricalMetricDataResponse = { task_instance_states: TaskInstanceStateCount; }; +/** + * Stats for the latest DAG run. + */ +export type LatestRunStats = { + task_instance_counts: { + [key: string]: (number); +}; +}; + /** * Task Instance Summary model for the Grid UI. */ diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.test.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.test.tsx index 6e194af0ccf1c..c7bb5ad7682e6 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.test.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.test.tsx @@ -107,7 +107,7 @@ const mockDag = { pending_actions: [], relative_fileloc: "nested_task_groups.py", tags: [], - task_instance_summary: { success: 2, failed: 1 }, + latest_run_stats: { task_instance_counts: { success: 2, failed: 1 } }, timetable_description: "Every minute", timetable_summary: "* * * * *", } satisfies DAGWithLatestDagRunsResponse; @@ -235,17 +235,17 @@ describe("DagCard", () => { expect(stateBadges[0]).toHaveAttribute("aria-label", "failed"); }); - it("DagCard should render TaskInstanceSummary when DAG has task_instance_summary", () => { + it("DagCard should render TaskInstanceSummary when DAG has latest_run_stats", () => { render(, { wrapper: GMTWrapper }); const taskInstanceSummary = screen.getByTestId("task-instance-summary"); expect(taskInstanceSummary).toBeInTheDocument(); }); - it("DagCard should not render TaskInstanceSummary when DAG has no task_instance_summary", () => { + it("DagCard should not render TaskInstanceSummary when DAG has no latest_run_stats", () => { const mockDagWithNoSummary = { ...mockDag, - task_instance_summary: null, + latest_run_stats: null, } satisfies DAGWithLatestDagRunsResponse; render(, { wrapper: GMTWrapper }); diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx index e2ba903ab4106..dde00684d64e7 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx @@ -117,7 +117,7 @@ export const DagCard = ({ dag }: Props) => { ) : undefined} - + diff --git a/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx b/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx index f3f3d1966241e..0f71086b05685 100644 --- a/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx +++ b/airflow-core/src/airflow/ui/src/pages/DagsList/TaskInstanceSummary.tsx @@ -19,22 +19,22 @@ import { HStack } from "@chakra-ui/react"; import { useTranslation } from "react-i18next"; -import type { TaskInstanceState } from "openapi/requests/types.gen"; +import type { LatestRunStats, TaskInstanceState } from "openapi/requests/types.gen"; import { Stat } from "src/components/Stat"; import { StateBadge } from "src/components/StateBadge"; type Props = { - readonly taskInstanceSummary: Record | null | undefined; + readonly latestRunStats: LatestRunStats | null | undefined; }; -export const TaskInstanceSummary = ({ taskInstanceSummary }: Props) => { +export const TaskInstanceSummary = ({ latestRunStats }: Props) => { const { t: translate } = useTranslation("common"); - if (!taskInstanceSummary) { + if (!latestRunStats) { return null; } - const stateEntries = Object.entries(taskInstanceSummary); + const stateEntries = Object.entries(latestRunStats.task_instance_counts); if (stateEntries.length === 0) { return null; diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py index 5f84f0edfc02f..014f9838c7fa1 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py @@ -384,10 +384,10 @@ def test_is_favorite_field_user_specific(self, test_client, session): dag1_data = next(dag for dag in body["dags"] if dag["dag_id"] == DAG1_ID) assert dag1_data["is_favorite"] is False - def test_task_instance_summary_returns_aggregated_counts( + def test_latest_run_stats_returns_aggregated_counts( self, test_client, session ): - """Test that task_instance_summary returns aggregated task instance counts by state.""" + """Test that latest_run_stats returns aggregated task instance counts by state.""" dag_id = "test_dag_ti_summary" # Create a DAG @@ -443,20 +443,20 @@ def test_task_instance_summary_returns_aggregated_counts( assert response.status_code == 200 body = response.json() - # Verify task_instance_summary is present and correct + # Verify latest_run_stats is present and correct assert body["total_entries"] == 1 dag_data = body["dags"][0] - assert "task_instance_summary" in dag_data + assert "latest_run_stats" in dag_data - summary = dag_data["task_instance_summary"] + summary = dag_data["latest_run_stats"]["task_instance_counts"] assert summary.get("success") == 3 assert summary.get("failed") == 2 assert summary.get("running") == 1 - def test_task_instance_summary_only_includes_latest_run( + def test_latest_run_stats_only_includes_latest_run( self, test_client, session ): - """Test that task_instance_summary only includes task instances from the latest DAG run.""" + """Test that latest_run_stats only includes task instances from the latest DAG run.""" dag_id = "test_dag_ti_summary_latest" # Create a DAG @@ -532,17 +532,17 @@ def test_task_instance_summary_only_includes_latest_run( assert response.status_code == 200 body = response.json() - # Verify task_instance_summary only reflects the latest run + # Verify latest_run_stats only reflects the latest run dag_data = body["dags"][0] - summary = dag_data["task_instance_summary"] + summary = dag_data["latest_run_stats"]["task_instance_counts"] # Should only have success states from the newer run assert summary.get("success") == 2 # Should NOT include failed states from older run assert summary.get("failed") is None - def test_task_instance_summary_empty_when_no_task_instances(self, test_client, session): - """Test that task_instance_summary is empty when DAG has no task instances.""" + def test_latest_run_stats_empty_when_no_task_instances(self, test_client, session): + """Test that latest_run_stats is empty when DAG has no task instances.""" dag_id = "test_dag_no_ti" # Create a DAG without any task instances @@ -560,7 +560,7 @@ def test_task_instance_summary_empty_when_no_task_instances(self, test_client, s assert response.status_code == 200 body = response.json() - # Verify task_instance_summary is empty + # Verify latest_run_stats has empty task_instance_counts dag_data = body["dags"][0] - assert dag_data["task_instance_summary"] == {} + assert dag_data["latest_run_stats"]["task_instance_counts"] == {} From 066f5ce4db2fbf4535f22603260e9a15c474d64a Mon Sep 17 00:00:00 2001 From: Saad Madni Date: Wed, 21 Jan 2026 01:11:13 +0530 Subject: [PATCH 12/13] Fix mypy type hint and restrict latest_run_stats to latest DagRun --- .../tests/unit/api_fastapi/core_api/routes/ui/test_dags.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py index 014f9838c7fa1..baf42d56f976a 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/ui/test_dags.py @@ -21,6 +21,7 @@ import pendulum import pytest from fastapi.testclient import TestClient +from sqlalchemy import insert from sqlalchemy.orm import Session from airflow.models import DagRun @@ -33,8 +34,6 @@ from airflow.utils.state import DagRunState, TaskInstanceState from airflow.utils.types import DagRunTriggeredByType, DagRunType -from sqlalchemy import insert - from tests_common.test_utils.asserts import count_queries from unit.api_fastapi.core_api.routes.public.test_dags import ( DAG1_ID, @@ -129,7 +128,7 @@ def test_should_return_200(self, test_client, query_params, expected_ids, expect previous_run_after = dag_run["run_after"] @pytest.fixture - def setup_hitl_data(self, create_task_instance: TaskInstance, session: Session): + def setup_hitl_data(self, create_task_instance: TI, session: Session): """Setup HITL test data for parametrized tests.""" # 3 Dags (test_dag0 created here and test_dag1, test_dag2 created in setup_dag_runs) # 5 task instances in test_dag0 From 9d7a43157470ade0d4653dbbb2586df491d8f816 Mon Sep 17 00:00:00 2001 From: Saad Madni Date: Wed, 21 Jan 2026 14:06:53 +0530 Subject: [PATCH 13/13] Update airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dags.py Co-authored-by: Jason(Zhe-You) Liu <68415893+jason810496@users.noreply.github.com> --- .../src/airflow/api_fastapi/core_api/datamodels/ui/dags.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dags.py b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dags.py index 3c2d80cb05ff2..fb841c55480d4 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dags.py +++ b/airflow-core/src/airflow/api_fastapi/core_api/datamodels/ui/dags.py @@ -34,7 +34,7 @@ class DAGWithLatestDagRunsResponse(DAGResponse): asset_expression: dict | None latest_dag_runs: list[DAGRunLightResponse] - latest_run_stats: LatestRunStats | None = None + latest_run_stats: dict[TaskInstanceState, int] | None = None pending_actions: list[HITLDetail] is_favorite: bool