Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11085,6 +11085,7 @@ components:
enum:
- asset conflict
- non-existent pool
- runtime varying value
title: DagWarningType
description: 'Enum for DAG warning types.

Expand Down
16 changes: 16 additions & 0 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2645,6 +2645,22 @@ dag_processor:
type: boolean
example: ~
default: "True"
dag_version_inflation_check_level:
description: |
Controls the behavior of the Dag stability check performed before Dag parsing in the Dag processor.
The check detects potential issues, such as runtime-varying values in Dag/Task constructors,
that could cause Dag version inflation.

* ``off``: Disables Dag stability checks entirely. No errors or warnings are generated.
* ``warning``: Dags load normally but warnings are displayed in the UI when issues are detected.
* ``error``: Treats Dag stability failures as Dag import errors, preventing the Dag from loading.

Default is "warning" to alert users of potential issues without blocking Dag execution.
version_added: 3.2.0
type: string
example: ~
default: "warning"

profiling:
description: |
Configuration for memory profiling in Airflow components.
Expand Down
10 changes: 8 additions & 2 deletions airflow-core/src/airflow/dag_processing/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,10 @@ def _sync_dag_perms(dag: LazyDeserializedDAG, session: Session):


def _update_dag_warnings(
dag_ids: list[str], warnings: set[DagWarning], warning_types: tuple[DagWarningType], session: Session
dag_ids: list[str],
warnings: set[DagWarning],
warning_types: tuple[DagWarningType, ...],
session: Session,
):
from airflow.models.dagwarning import DagWarning

Expand Down Expand Up @@ -371,7 +374,10 @@ def update_dag_parsing_results_in_db(
warnings: set[DagWarning],
session: Session,
*,
warning_types: tuple[DagWarningType] = (DagWarningType.NONEXISTENT_POOL,),
warning_types: tuple[DagWarningType, ...] = (
DagWarningType.NONEXISTENT_POOL,
DagWarningType.RUNTIME_VARYING_VALUE,
),
files_parsed: set[tuple[str, str]] | None = None,
):
"""
Expand Down
5 changes: 4 additions & 1 deletion airflow-core/src/airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -1211,13 +1211,16 @@ def process_parse_results(
files_parsed = {(bundle_name, relative_fileloc)}
files_parsed.update(import_errors.keys())

if (warnings := parsing_result.warnings) and isinstance(warnings[0], dict):
warnings = [DagWarning(**warn) for warn in warnings]

update_dag_parsing_results_in_db(
bundle_name=bundle_name,
bundle_version=bundle_version,
dags=parsing_result.serialized_dags,
import_errors=import_errors,
parse_duration=run_duration,
warnings=set(parsing_result.warnings or []),
warnings=set(warnings or []),
session=session,
files_parsed=files_parsed,
)
Expand Down
18 changes: 15 additions & 3 deletions airflow-core/src/airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
from airflow.sdk.execution_time.supervisor import WatchedSubprocess
from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance, _send_error_email_notification
from airflow.serialization.serialized_objects import DagSerialization, LazyDeserializedDAG
from airflow.utils.dag_version_inflation_checker import check_dag_file_stability
from airflow.utils.file import iter_airflow_imports
from airflow.utils.state import TaskInstanceState

Expand Down Expand Up @@ -198,21 +199,33 @@ def _parse_file_entrypoint():
log = structlog.get_logger(logger_name="task")

result = _parse_file(msg, log)

if result is not None:
comms_decoder.send(result)


def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileParsingResult | None:
# TODO: Set known_pool names on DagBag!

stability_check_result = check_dag_file_stability(os.fspath(msg.file))

if stability_check_error_dict := stability_check_result.get_error_format_dict(msg.file, msg.bundle_path):
# If the Dag stability check level is "error", skip parsing the Dags and return the result early
return DagFileParsingResult(
fileloc=msg.file,
serialized_dags=[],
import_errors=stability_check_error_dict,
)

bag = BundleDagBag(
dag_folder=msg.file,
bundle_path=msg.bundle_path,
bundle_name=msg.bundle_name,
load_op_links=False,
)

if msg.callback_requests:
# If the request is for callback, we shouldn't serialize the DAGs
# If the request is for callback, we shouldn't serialize the Dags
_execute_callbacks(bag, msg.callback_requests, log)
return None

Expand All @@ -222,8 +235,7 @@ def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileP
fileloc=msg.file,
serialized_dags=serialized_dags,
import_errors=bag.import_errors,
# TODO: Make `bag.dag_warnings` not return SQLA model objects
warnings=[],
warnings=stability_check_result.get_formatted_warnings(bag.dag_ids),
)
return result

Expand Down
1 change: 1 addition & 0 deletions airflow-core/src/airflow/models/dagwarning.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,4 @@ class DagWarningType(str, Enum):

ASSET_CONFLICT = "asset conflict"
NONEXISTENT_POOL = "non-existent pool"
RUNTIME_VARYING_VALUE = "runtime varying value"
Original file line number Diff line number Diff line change
Expand Up @@ -3433,7 +3433,7 @@ export const $DagVersionResponse = {

export const $DagWarningType = {
type: 'string',
enum: ['asset conflict', 'non-existent pool'],
enum: ['asset conflict', 'non-existent pool', 'runtime varying value'],
title: 'DagWarningType',
description: `Enum for DAG warning types.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,7 @@ export type DagVersionResponse = {
* This is the set of allowable values for the ``warning_type`` field
* in the DagWarning model.
*/
export type DagWarningType = 'asset conflict' | 'non-existent pool';
export type DagWarningType = 'asset conflict' | 'non-existent pool' | 'runtime varying value';

/**
* Backfill collection serializer for responses in dry-run mode.
Expand Down
Loading
Loading