Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import inspect
from collections.abc import Iterable, Mapping
from datetime import datetime, timedelta
from functools import cache
from typing import TYPE_CHECKING, Any

from itsdangerous import URLSafeSerializer
Expand All @@ -42,6 +43,19 @@
if TYPE_CHECKING:
from airflow.serialization.definitions.param import SerializedParamsDict


@cache
def _get_file_token_serializer() -> URLSafeSerializer:
"""
Return a cached URLSafeSerializer instance.

Uses @cache for lazy initialization - the serializer is created on first
call rather than at module import time. This avoids issues if the module
is imported before configuration is fully loaded.
"""
return URLSafeSerializer(conf.get_mandatory_value("api", "secret_key"))


DAG_ALIAS_MAPPING: dict[str, str] = {
# The keys are the names in the response, the values are the original names in the model
# This is used to map the names in the response to the names in the model
Expand Down Expand Up @@ -118,12 +132,11 @@ def get_timetable_summary(cls, tts: str | None) -> str | None:
@property
def file_token(self) -> str:
"""Return file token."""
serializer = URLSafeSerializer(conf.get_mandatory_value("api", "secret_key"))
payload = {
"bundle_name": self.bundle_name,
"relative_fileloc": self.relative_fileloc,
}
return serializer.dumps(payload)
return _get_file_token_serializer().dumps(payload)


class DAGPatchBody(StrictBaseModel):
Expand Down
19 changes: 13 additions & 6 deletions airflow-core/src/airflow/api_fastapi/core_api/routes/ui/dags.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@
filter_param_factory,
)
from airflow.api_fastapi.common.router import AirflowRouter
from airflow.api_fastapi.core_api.datamodels.dags import DAGResponse
from airflow.api_fastapi.core_api.datamodels.ui.dag_runs import DAGRunLightResponse
from airflow.api_fastapi.core_api.datamodels.dags import DAG_ALIAS_MAPPING, DAGResponse
from airflow.api_fastapi.core_api.datamodels.ui.dags import (
DAGWithLatestDagRunsCollectionResponse,
DAGWithLatestDagRunsResponse,
Expand Down Expand Up @@ -234,18 +234,25 @@ def get_dags(
pending_actions_by_dag_id[dag_id].append(hitl_detail)

# aggregate rows by dag_id
dag_runs_by_dag_id: dict[str, DAGWithLatestDagRunsResponse] = {
dag.dag_id: DAGWithLatestDagRunsResponse.model_validate(
# Build the dict dynamically from DAGResponse.model_fields so that new fields
# added to DAGResponse are picked up automatically without code changes here.
dag_runs_by_dag_id: dict[str, DAGWithLatestDagRunsResponse] = {}
for dag in dags:
dag_data = {
DAG_ALIAS_MAPPING.get(field_name, field_name): getattr(
dag, DAG_ALIAS_MAPPING.get(field_name, field_name)
)
for field_name in DAGResponse.model_fields
}
dag_data.update(
{
**DAGResponse.model_validate(dag).model_dump(),
"asset_expression": dag.asset_expression,
"latest_dag_runs": [],
"pending_actions": pending_actions_by_dag_id[dag.dag_id],
"is_favorite": dag.dag_id in favorite_dag_ids,
}
)
for dag in dags
}
dag_runs_by_dag_id[dag.dag_id] = DAGWithLatestDagRunsResponse.model_validate(dag_data)

for row in recent_dag_runs:
dag_run_response = DAGRunLightResponse.model_validate(row)
Expand Down