Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,18 @@
from airflow.api_fastapi.core_api.datamodels.ui.dag_runs import DAGRunLightResponse


class LatestRunStats(BaseModel):
"""Stats for the latest DAG run."""

task_instance_counts: dict[str, int]


class DAGWithLatestDagRunsResponse(DAGResponse):
"""DAG with latest dag runs response serializer."""

asset_expression: dict | None
latest_dag_runs: list[DAGRunLightResponse]
latest_run_stats: dict[TaskInstanceState, int] | None = None
pending_actions: list[HITLDetail]
is_favorite: bool

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1707,6 +1707,10 @@ components:
$ref: '#/components/schemas/DAGRunLightResponse'
type: array
title: Latest Dag Runs
latest_run_stats:
anyOf:
- $ref: '#/components/schemas/LatestRunStats'
- type: 'null'
pending_actions:
items:
$ref: '#/components/schemas/HITLDetail'
Expand Down Expand Up @@ -2213,6 +2217,18 @@ components:
- unixname
title: JobResponse
description: Job serializer for responses.
LatestRunStats:
properties:
task_instance_counts:
additionalProperties:
type: integer
type: object
title: Task Instance Counts
type: object
required:
- task_instance_counts
title: LatestRunStats
description: Stats for the latest DAG run.
LightGridTaskInstanceSummary:
properties:
task_id:
Expand Down
44 changes: 44 additions & 0 deletions airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
from airflow.api_fastapi.core_api.datamodels.ui.dags import (
DAGWithLatestDagRunsCollectionResponse,
DAGWithLatestDagRunsResponse,
LatestRunStats,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.api_fastapi.core_api.security import (
Expand Down Expand Up @@ -211,6 +212,48 @@ def get_dags(

recent_dag_runs = session.execute(recent_dag_runs_select)

# Fetch task instance summary stats for the latest DagRun per DAG
latest_run_per_dag_subquery = (
select(
DagRun.dag_id,
DagRun.run_id,
func.rank()
.over(
partition_by=DagRun.dag_id,
order_by=DagRun.run_after.desc(),
)
.label("rank"),
)
.where(DagRun.dag_id.in_([dag.dag_id for dag in dags]))
.subquery()
)

task_instance_stats_select = (
select(
latest_run_per_dag_subquery.c.dag_id,
TaskInstance.state,
func.count(TaskInstance.task_id).label("count"),
)
.join(
TaskInstance,
and_(
TaskInstance.dag_id == latest_run_per_dag_subquery.c.dag_id,
TaskInstance.run_id == latest_run_per_dag_subquery.c.run_id,
),
)
.where(latest_run_per_dag_subquery.c.rank == 1)
.group_by(latest_run_per_dag_subquery.c.dag_id, TaskInstance.state)
)

task_instance_stats_rows = session.execute(task_instance_stats_select)

# Normalize task instance stats by dag_id
latest_run_stats_by_dag: dict[str, dict[str, int]] = {dag.dag_id: {} for dag in dags}

for dag_id, state, count in task_instance_stats_rows:
if state is not None:
latest_run_stats_by_dag[dag_id][state] = count

# Fetch pending HITL actions for each Dag if we are not certain whether some of the Dag might contain HITL actions
pending_actions_by_dag_id: dict[str, list[HITLDetail]] = {dag.dag_id: [] for dag in dags}
if has_pending_actions.value:
Expand Down Expand Up @@ -241,6 +284,7 @@ def get_dags(
**DAGResponse.model_validate(dag).model_dump(),
"asset_expression": dag.asset_expression,
"latest_dag_runs": [],
"latest_run_stats": LatestRunStats(task_instance_counts=latest_run_stats_by_dag.get(dag.dag_id, {})),
"pending_actions": pending_actions_by_dag_id[dag.dag_id],
"is_favorite": dag.dag_id in favorite_dag_ids,
}
Expand Down
26 changes: 26 additions & 0 deletions airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7654,6 +7654,16 @@ export const $DAGWithLatestDagRunsResponse = {
type: 'array',
title: 'Latest Dag Runs'
},
latest_run_stats: {
anyOf: [
{
'$ref': '#/components/schemas/LatestRunStats'
},
{
type: 'null'
}
]
},
pending_actions: {
items: {
'$ref': '#/components/schemas/HITLDetail'
Expand Down Expand Up @@ -7941,6 +7951,22 @@ export const $HistoricalMetricDataResponse = {
description: 'Historical Metric Data serializer for responses.'
} as const;

export const $LatestRunStats = {
properties: {
task_instance_counts: {
additionalProperties: {
type: 'integer'
},
type: 'object',
title: 'Task Instance Counts'
}
},
type: 'object',
required: ['task_instance_counts'],
title: 'LatestRunStats',
description: 'Stats for the latest DAG run.'
} as const;

export const $LightGridTaskInstanceSummary = {
properties: {
task_id: {
Expand Down
10 changes: 10 additions & 0 deletions airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1887,6 +1887,7 @@ export type DAGWithLatestDagRunsResponse = {
[key: string]: unknown;
} | null;
latest_dag_runs: Array<DAGRunLightResponse>;
latest_run_stats?: LatestRunStats | null;
pending_actions: Array<HITLDetail>;
is_favorite: boolean;
/**
Expand Down Expand Up @@ -1965,6 +1966,15 @@ export type HistoricalMetricDataResponse = {
task_instance_states: TaskInstanceStateCount;
};

/**
* Stats for the latest DAG run.
*/
export type LatestRunStats = {
task_instance_counts: {
[key: string]: (number);
};
};

/**
* Task Instance Summary model for the Grid UI.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@
},
"taskInstance_one": "Task Instance",
"taskInstance_other": "Task Instances",
"taskInstanceSummary": "Task Instance Summary",
"timeRange": {
"last12Hours": "Last 12 Hours",
"last24Hours": "Last 24 Hours",
Expand Down
43 changes: 28 additions & 15 deletions airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ const mockDag = {
pending_actions: [],
relative_fileloc: "nested_task_groups.py",
tags: [],
latest_run_stats: { task_instance_counts: { success: 2, failed: 1 } },
timetable_description: "Every minute",
timetable_summary: "* * * * *",
} satisfies DAGWithLatestDagRunsResponse;
Expand Down Expand Up @@ -198,22 +199,14 @@ describe("DagCard", () => {
expect(latestRunElement).toHaveTextContent("2025-09-19 19:22:00");
});

it("DagCard should render next run section with timestamp", () => {
render(<DagCard dag={mockDag} />, { wrapper: GMTWrapper });
const nextRunElement = screen.getByTestId("next-run");

expect(nextRunElement).toBeInTheDocument();
// Should display the formatted next run timestamp (converted to GMT timezone)
expect(nextRunElement).toHaveTextContent("2024-08-22 19:00:00");
});

it("DagCard should render StateBadge as success", () => {
render(<DagCard dag={mockDag} />, { wrapper: GMTWrapper });
const stateBadge = screen.getByTestId("state-badge");
const stateBadges = screen.getAllByTestId("state-badge");

expect(stateBadge).toBeInTheDocument();
// First badge is from DagRunInfo (latest run state)
expect(stateBadges[0]).toBeInTheDocument();
// Should have the success state from mockDag.latest_dag_runs[0].state
expect(stateBadge).toHaveAttribute("aria-label", "success");
expect(stateBadges[0]).toHaveAttribute("aria-label", "success");
});

it("DagCard should render StateBadge as failed", () => {
Expand All @@ -234,10 +227,30 @@ describe("DagCard", () => {
} satisfies DAGWithLatestDagRunsResponse;

render(<DagCard dag={mockDagWithFailedRun} />, { wrapper: GMTWrapper });
const stateBadge = screen.getByTestId("state-badge");
const stateBadges = screen.getAllByTestId("state-badge");

expect(stateBadge).toBeInTheDocument();
// First badge is from DagRunInfo (latest run state)
expect(stateBadges[0]).toBeInTheDocument();
// Should have the failed state
expect(stateBadge).toHaveAttribute("aria-label", "failed");
expect(stateBadges[0]).toHaveAttribute("aria-label", "failed");
});

it("DagCard should render TaskInstanceSummary when DAG has latest_run_stats", () => {
render(<DagCard dag={mockDag} />, { wrapper: GMTWrapper });
const taskInstanceSummary = screen.getByTestId("task-instance-summary");

expect(taskInstanceSummary).toBeInTheDocument();
});

it("DagCard should not render TaskInstanceSummary when DAG has no latest_run_stats", () => {
const mockDagWithNoSummary = {
...mockDag,
latest_run_stats: null,
} satisfies DAGWithLatestDagRunsResponse;

render(<DagCard dag={mockDagWithNoSummary} />, { wrapper: GMTWrapper });
const taskInstanceSummary = screen.queryByTestId("task-instance-summary");

expect(taskInstanceSummary).toBeNull();
});
});
23 changes: 15 additions & 8 deletions airflow-core/src/airflow/ui/src/pages/DagsList/DagCard.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import { isStatePending, useAutoRefresh } from "src/utils";
import { DagTags } from "./DagTags";
import { RecentRuns } from "./RecentRuns";
import { Schedule } from "./Schedule";
import { TaskInstanceSummary } from "./TaskInstanceSummary";

type Props = {
readonly dag: DAGWithLatestDagRunsResponse;
Expand Down Expand Up @@ -76,6 +77,7 @@ export const DagCard = ({ dag }: Props) => {
<DeleteDagButton dagDisplayName={dag.dag_display_name} dagId={dag.dag_id} withText={false} />
</HStack>
</Flex>

<SimpleGrid columns={4} gap={1} height={20} px={3} py={1}>
<Stat data-testid="schedule" label={translate("dagDetails.schedule")}>
<Schedule
Expand All @@ -86,6 +88,16 @@ export const DagCard = ({ dag }: Props) => {
timetableSummary={dag.timetable_summary}
/>
</Stat>

<Stat data-testid="next-run" label={translate("dagDetails.nextRun")}>
{Boolean(dag.next_dagrun_run_after) ? (
<DagRunInfo
logicalDate={dag.next_dagrun_logical_date}
runAfter={dag.next_dagrun_run_after as string}
/>
) : undefined}
</Stat>

<Stat data-testid="latest-run" label={translate("dagDetails.latestRun")}>
{latestRun ? (
<Link asChild color="fg.info">
Expand All @@ -104,14 +116,9 @@ export const DagCard = ({ dag }: Props) => {
</Link>
) : undefined}
</Stat>
<Stat data-testid="next-run" label={translate("dagDetails.nextRun")}>
{Boolean(dag.next_dagrun_run_after) ? (
<DagRunInfo
logicalDate={dag.next_dagrun_logical_date}
runAfter={dag.next_dagrun_run_after as string}
/>
) : undefined}
</Stat>

<TaskInstanceSummary latestRunStats={dag.latest_run_stats} />

<RecentRuns latestRuns={dag.latest_dag_runs} />
</SimpleGrid>
</Box>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*!
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import { HStack } from "@chakra-ui/react";
import { useTranslation } from "react-i18next";

import type { LatestRunStats, TaskInstanceState } from "openapi/requests/types.gen";
import { Stat } from "src/components/Stat";
import { StateBadge } from "src/components/StateBadge";

type Props = {
readonly latestRunStats: LatestRunStats | null | undefined;
};

export const TaskInstanceSummary = ({ latestRunStats }: Props) => {
const { t: translate } = useTranslation("common");

if (!latestRunStats) {
return null;
}

const stateEntries = Object.entries(latestRunStats.task_instance_counts);

if (stateEntries.length === 0) {
return null;
}

return (
<Stat data-testid="task-instance-summary" label={translate("taskInstanceSummary")}>
<HStack flexWrap="wrap" gap={1}>
{stateEntries.map(([state, count]) => (
<StateBadge key={state} state={state as TaskInstanceState}>
{count}
</StateBadge>
))}
</HStack>
</Stat>
);
};
Loading
Loading