diff --git a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml index 1651927427f96..299cbbbf0a254 100644 --- a/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml +++ b/airflow-core/src/airflow/api_fastapi/core_api/openapi/v2-rest-api-generated.yaml @@ -11085,6 +11085,7 @@ components: enum: - asset conflict - non-existent pool + - runtime varying value title: DagWarningType description: 'Enum for DAG warning types. diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index dac144f54cbf0..c3c5863a11e3d 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -2645,6 +2645,22 @@ dag_processor: type: boolean example: ~ default: "True" + dag_version_inflation_check_level: + description: | + Controls the behavior of the Dag stability checker performed before Dag parsing in the Dag processor. + The check detects potential issues such as runtime-varying values in Dag/Task constructors + that could cause Dag version inflation. + + * ``off``: Disables Dag stability checks entirely. No errors or warnings are generated. + * ``warning``: Dags load normally but warnings are displayed in the UI when issues are detected. + * ``error``: Treats Dag stability failures as Dag import errors, preventing the Dag from loading. + + Default is "warning" to alert users of potential issues without blocking Dag execution. + version_added: 3.2.0 + type: string + example: ~ + default: "warning" + profiling: description: | Configuration for memory profiling in Airflow component. 
diff --git a/airflow-core/src/airflow/dag_processing/collection.py b/airflow-core/src/airflow/dag_processing/collection.py index fd09df8fd36ec..72404b58c7c7b 100644 --- a/airflow-core/src/airflow/dag_processing/collection.py +++ b/airflow-core/src/airflow/dag_processing/collection.py @@ -257,7 +257,10 @@ def _sync_dag_perms(dag: LazyDeserializedDAG, session: Session): def _update_dag_warnings( - dag_ids: list[str], warnings: set[DagWarning], warning_types: tuple[DagWarningType], session: Session + dag_ids: list[str], + warnings: set[DagWarning], + warning_types: tuple[DagWarningType, ...], + session: Session, ): from airflow.models.dagwarning import DagWarning @@ -371,7 +374,10 @@ def update_dag_parsing_results_in_db( warnings: set[DagWarning], session: Session, *, - warning_types: tuple[DagWarningType] = (DagWarningType.NONEXISTENT_POOL,), + warning_types: tuple[DagWarningType, ...] = ( + DagWarningType.NONEXISTENT_POOL, + DagWarningType.RUNTIME_VARYING_VALUE, + ), files_parsed: set[tuple[str, str]] | None = None, ): """ diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index bddeedd8ac134..07279f72b267a 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -1211,13 +1211,16 @@ def process_parse_results( files_parsed = {(bundle_name, relative_fileloc)} files_parsed.update(import_errors.keys()) + if (warnings := parsing_result.warnings) and isinstance(warnings[0], dict): + warnings = [DagWarning(**warn) for warn in warnings] + update_dag_parsing_results_in_db( bundle_name=bundle_name, bundle_version=bundle_version, dags=parsing_result.serialized_dags, import_errors=import_errors, parse_duration=run_duration, - warnings=set(parsing_result.warnings or []), + warnings=set(warnings or []), session=session, files_parsed=files_parsed, ) diff --git a/airflow-core/src/airflow/dag_processing/processor.py 
b/airflow-core/src/airflow/dag_processing/processor.py index 77bc71df3c464..628e94df61500 100644 --- a/airflow-core/src/airflow/dag_processing/processor.py +++ b/airflow-core/src/airflow/dag_processing/processor.py @@ -68,6 +68,7 @@ from airflow.sdk.execution_time.supervisor import WatchedSubprocess from airflow.sdk.execution_time.task_runner import RuntimeTaskInstance, _send_error_email_notification from airflow.serialization.serialized_objects import DagSerialization, LazyDeserializedDAG +from airflow.utils.dag_version_inflation_checker import check_dag_file_stability from airflow.utils.file import iter_airflow_imports from airflow.utils.state import TaskInstanceState @@ -198,6 +199,7 @@ def _parse_file_entrypoint(): log = structlog.get_logger(logger_name="task") result = _parse_file(msg, log) + if result is not None: comms_decoder.send(result) @@ -205,14 +207,25 @@ def _parse_file_entrypoint(): def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileParsingResult | None: # TODO: Set known_pool names on DagBag! 
+ stability_check_result = check_dag_file_stability(os.fspath(msg.file)) + + if stability_check_error_dict := stability_check_result.get_error_format_dict(msg.file, msg.bundle_path): + # If Dag stability check level is error, we shouldn't parse the Dags and return the result early + return DagFileParsingResult( + fileloc=msg.file, + serialized_dags=[], + import_errors=stability_check_error_dict, + ) + bag = BundleDagBag( dag_folder=msg.file, bundle_path=msg.bundle_path, bundle_name=msg.bundle_name, load_op_links=False, ) + if msg.callback_requests: - # If the request is for callback, we shouldn't serialize the DAGs + # If the request is for callback, we shouldn't serialize the Dags _execute_callbacks(bag, msg.callback_requests, log) return None @@ -222,8 +235,7 @@ def _parse_file(msg: DagFileParseRequest, log: FilteringBoundLogger) -> DagFileP fileloc=msg.file, serialized_dags=serialized_dags, import_errors=bag.import_errors, - # TODO: Make `bag.dag_warnings` not return SQLA model objects - warnings=[], + warnings=stability_check_result.get_formatted_warnings(bag.dag_ids), ) return result diff --git a/airflow-core/src/airflow/models/dagwarning.py b/airflow-core/src/airflow/models/dagwarning.py index eaf5945857108..032f0fa6f6adf 100644 --- a/airflow-core/src/airflow/models/dagwarning.py +++ b/airflow-core/src/airflow/models/dagwarning.py @@ -103,3 +103,4 @@ class DagWarningType(str, Enum): ASSET_CONFLICT = "asset conflict" NONEXISTENT_POOL = "non-existent pool" + RUNTIME_VARYING_VALUE = "runtime varying value" diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts index b44c2883a9551..20984479a8439 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -3433,7 +3433,7 @@ export const $DagVersionResponse = { export const $DagWarningType = { type: 'string', - enum: ['asset conflict', 
'non-existent pool'], + enum: ['asset conflict', 'non-existent pool', 'runtime varying value'], title: 'DagWarningType', description: `Enum for DAG warning types. diff --git a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts index af14ba786e646..478be8af1c0f1 100644 --- a/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow-core/src/airflow/ui/openapi-gen/requests/types.gen.ts @@ -877,7 +877,7 @@ export type DagVersionResponse = { * This is the set of allowable values for the ``warning_type`` field * in the DagWarning model. */ -export type DagWarningType = 'asset conflict' | 'non-existent pool'; +export type DagWarningType = 'asset conflict' | 'non-existent pool' | 'runtime varying value'; /** * Backfill collection serializer for responses in dry-run mode. diff --git a/airflow-core/src/airflow/utils/dag_version_inflation_checker.py b/airflow-core/src/airflow/utils/dag_version_inflation_checker.py new file mode 100644 index 0000000000000..ca7f58b26026f --- /dev/null +++ b/airflow-core/src/airflow/utils/dag_version_inflation_checker.py @@ -0,0 +1,529 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import ast +from dataclasses import dataclass +from enum import Enum +from pathlib import Path + +RUNTIME_VARYING_CALLS = [ + ("datetime", "now"), + ("datetime", "today"), + ("datetime", "utcnow"), + ("date", "today"), + ("time", "time"), + ("time", "localtime"), + ("random", "random"), + ("random", "randint"), + ("random", "choice"), + ("random", "uniform"), + ("uuid", "uuid4"), + ("uuid", "uuid1"), + ("pendulum", "now"), + ("pendulum", "today"), + ("pendulum", "yesterday"), + ("pendulum", "tomorrow"), +] + + +class DagVersionInflationCheckLevel(Enum): + """enum class for Dag version inflation check level.""" + + off = "off" + warning = "warning" + error = "error" + + +class DagVersionInflationCheckResult: + """ + Represents the result of stability analysis on a Dag file. + + Stores detected warnings and formats them appropriately based on the configured check level + (warning or error). + """ + + def __init__(self, check_level: DagVersionInflationCheckLevel): + self.check_level: DagVersionInflationCheckLevel = check_level + self.warnings: list[RuntimeVaryingValueWarning] = [] + self.runtime_varying_values: dict = {} + + def format_warnings(self) -> str | None: + """Return formatted string of warning list.""" + if not self.warnings: + return None + + lines = [ + "This Dag uses runtime-variable values in Dag construction.", + "It causes the Dag version to increase as values change on every Dag parse.", + "", + ] + for w in self.warnings: + lines.extend( + [ + f"Line {w.line}, Col {w.col}", + f"Code: {w.code}", + f"Issue: {w.message}", + "", + ] + ) + + if self.runtime_varying_values: + lines.extend( + [ + "️Don't use the variables as arguments in Dag/Task constructors:", + *( + f" Line {line}: '{var_name}' related '{source}'" + for var_name, (line, source) in sorted( + self.runtime_varying_values.items(), + key=lambda x: x[1][0], + ) + ), + "", + ] + ) + + return "\n".join(lines) + + def get_formatted_warnings(self, dag_ids: 
list[str]) -> list[dict[str, str | None]]: + """Convert warning statement to Dag warning format.""" + from airflow.models.dagwarning import DagWarningType + + if not self.warnings or self.check_level != DagVersionInflationCheckLevel.warning: + return [] + return [ + { + "dag_id": dag_id, + "warning_type": DagWarningType.RUNTIME_VARYING_VALUE.value, + "message": self.format_warnings(), + } + for dag_id in dag_ids + ] + + def get_error_format_dict(self, file_path, bundle_path): + if not self.warnings or self.check_level != DagVersionInflationCheckLevel.error: + return None + + relative_file_path = str(Path(file_path).relative_to(bundle_path)) if bundle_path else file_path + return {relative_file_path: self.format_warnings()} + + +@dataclass +class RuntimeVaryingValueWarning: + """Warning information for runtime-varying value detection.""" + + line: int + col: int + code: str + message: str + + +class WarningContext(str, Enum): + """Context types for warnings.""" + + TASK_CONSTRUCTOR = "Task constructor" + DAG_CONSTRUCTOR = "Dag constructor" + + +class RuntimeVaryingValueAnalyzer: + """ + Analyzer dedicated to tracking and detecting runtime-varying values. + + This analyzer is responsible for identifying if a given AST node + contains values that change on every execution (datetime.now(), random(), etc.). + """ + + def __init__( + self, + varying_vars: dict[str, tuple[int, str]], + imports: dict[str, str], + from_imports: dict[str, tuple[str, str]], + ): + self.varying_vars = varying_vars + self.imports = imports + self.from_imports = from_imports + + def get_varying_source(self, node: ast.expr) -> str | None: + """ + Check if an AST node contains runtime-varying values and return the source. + + Checks: + - Runtime-varying function calls (datetime.now(), etc.) + - Runtime-varying variable references + - Runtime-varying values in f-strings + - Runtime-varying values in expressions/collections + """ + if isinstance(node, ast.Call): + # 1. 
Direct runtime-varying call + if self.is_runtime_varying_call(node): + return ast.unparse(node) + + # 2. Method call chain + if isinstance(node.func, ast.Attribute): + return self.get_varying_source(node.func.value) + + # 3. Runtime-varying variable reference + if isinstance(node, ast.Name) and node.id in self.varying_vars: + _, source = self.varying_vars[node.id] + return source + + # 4. f-string + if isinstance(node, ast.JoinedStr): + return self.get_varying_fstring(node) + + # 5. Binary operation + if isinstance(node, ast.BinOp): + return self.get_varying_source(node.left) or self.get_varying_source(node.right) + + # 6. Collections (list/tuple/set) + if isinstance(node, (ast.List, ast.Tuple, ast.Set)): + return self.get_varying_collection(node.elts) + + # 7. List comprehension + if isinstance(node, ast.ListComp): + return self.get_varying_source(node.elt) + + # 8. Dictionary + if isinstance(node, ast.Dict): + return self.get_varying_dict(node) + + return None + + def get_varying_fstring(self, node: ast.JoinedStr) -> str | None: + """Check for runtime-varying values inside f-strings.""" + for value in node.values: + if isinstance(value, ast.FormattedValue) and (source := self.get_varying_source(value.value)): + return source + return None + + def get_varying_collection(self, elements: list) -> str | None: + """Check for runtime-varying values in collection elements.""" + for elt in elements: + if source := self.get_varying_source(elt): + return source + return None + + def get_varying_dict(self, node: ast.Dict) -> str | None: + """Check for runtime-varying values in dictionary keys/values.""" + for key, value in zip(node.keys, node.values): + if key and (source := self.get_varying_source(key)): + return source + if value and (source := self.get_varying_source(value)): + return source + return None + + def is_runtime_varying_call(self, node: ast.Call) -> bool: + """ + Check if a call is runtime-varying. + + 1. Is the function itself runtime-varying? + 2. 
Do the arguments contain runtime-varying values? + """ + # Check if the function itself is runtime-varying + if isinstance(node.func, ast.Attribute) and self.is_runtime_varying_attribute_call(node.func): + return True + + if isinstance(node.func, ast.Name) and self.is_runtime_varying_name_call(node.func): + return True + + # Check if arguments contain runtime-varying values + return self.has_varying_arguments(node) + + def has_varying_arguments(self, node: ast.Call) -> bool: + """Check if function arguments contain runtime-varying values.""" + for arg in node.args: + if self.get_varying_source(arg): + return True + + for kw in node.keywords: + if self.get_varying_source(kw.value): + return True + + return False + + def is_runtime_varying_attribute_call(self, attr: ast.Attribute) -> bool: + """Check for runtime-varying calls like datetime.now().""" + method_name = attr.attr + + if isinstance(attr.value, ast.Name): + module_or_alias = attr.value.id + actual_module = self.imports.get(module_or_alias, module_or_alias) + + # If imported via "from import" + if module_or_alias in self.from_imports: + _, original_name = self.from_imports[module_or_alias] + actual_module = original_name + + return (actual_module, method_name) in RUNTIME_VARYING_CALLS + + # Nested attribute (e.g., datetime.datetime.now) + if isinstance(attr.value, ast.Attribute): + inner_attr = attr.value + if isinstance(inner_attr.value, ast.Name): + return (inner_attr.attr, method_name) in RUNTIME_VARYING_CALLS + + return False + + def is_runtime_varying_name_call(self, func: ast.Name) -> bool: + """Check for runtime-varying calls like now() (when imported via 'from import').""" + func_name = func.id + + if func_name in self.from_imports: + module, original_name = self.from_imports[func_name] + module_parts = module.split(".") + + for part in module_parts: + if (part, original_name) in RUNTIME_VARYING_CALLS: + return True + + return False + + +class DagTaskDetector: + """ + Detector dedicated to 
identifying Dag and Task constructors. + + This detector identifies when code is creating Dag or Task objects + in Airflow. It needs to handle both traditional class instantiation and decorator styles. + """ + + def __init__(self, from_imports: dict[str, tuple[str, str]]): + self.from_imports: dict[str, tuple[str, str]] = from_imports + self.dag_instances: set[str] = set() + self.is_in_dag_context: bool = False + + def is_dag_constructor(self, node: ast.Call) -> bool: + """Check if a call is a Dag constructor.""" + if not isinstance(node.func, ast.Name): + return False + + func_name = node.func.id + + # "from airflow import DAG" form or "from airflow.decorator import dag" + if func_name in self.from_imports: + module, original = self.from_imports[func_name] + if (module == "airflow" or module.startswith("airflow.")) and original in ("DAG", "dag"): + return True + + return False + + def is_task_constructor(self, node: ast.Call) -> bool: + """ + Check if a call is a Task constructor. + + Criteria: + 1. All calls within a Dag with block + 2. Calls that receive a Dag instance as an argument (dag=...) + """ + # Inside Dag with block + if self.is_in_dag_context: + return True + + # Passing Dag instance as argument + for arg in node.args: + if isinstance(arg, ast.Name) and arg.id in self.dag_instances: + return True + + for keyword in node.keywords: + if keyword.value and isinstance(keyword.value, ast.Name): + if keyword.value.id in self.dag_instances: + return True + + return False + + def register_dag_instance(self, var_name: str): + """Register a Dag instance variable name.""" + self.dag_instances.add(var_name) + + def enter_dag_context(self): + """Enter a Dag with block.""" + self.is_in_dag_context = True + + def exit_dag_context(self): + """Exit a Dag with block.""" + self.is_in_dag_context = False + + +class AirflowRuntimeVaryingValueChecker(ast.NodeVisitor): + """ + Main visitor class to detect runtime-varying value usage in Airflow Dag/Task. 
+ + Main responsibilities: + - Traverse AST and visit nodes + - Detect Dag/Task creation + - Track runtime-varying values and generate warnings + """ + + def __init__(self, check_level: DagVersionInflationCheckLevel = DagVersionInflationCheckLevel.warning): + self.static_check_result: DagVersionInflationCheckResult = DagVersionInflationCheckResult( + check_level=check_level + ) + self.imports: dict[str, str] = {} + self.from_imports: dict[str, tuple[str, str]] = {} + self.varying_vars: dict[str, tuple[int, str]] = {} + self.check_level = check_level + + # Helper objects + self.value_analyzer = RuntimeVaryingValueAnalyzer(self.varying_vars, self.imports, self.from_imports) + self.dag_detector = DagTaskDetector(self.from_imports) + + def visit_Import(self, node: ast.Import): + """Process import statements.""" + for alias in node.names: + name = alias.asname or alias.name + self.imports[name] = alias.name + + def visit_ImportFrom(self, node: ast.ImportFrom): + """Process from ... import statements.""" + if node.module: + for alias in node.names: + name = alias.asname or alias.name + self.from_imports[name] = (node.module, alias.name) + + def visit_Assign(self, node: ast.Assign): + """ + Process variable assignments. + + Checks: + 1. Dag instance assignment + 2. Task instance assignment + 3. Runtime-varying value assignment + """ + value = node.value + + # Dag constructor + if isinstance(value, ast.Call) and self.dag_detector.is_dag_constructor(value): + self._register_dag_instances(node.targets) + self._check_and_warn(value, WarningContext.DAG_CONSTRUCTOR) + + # Task constructor + elif isinstance(value, ast.Call) and self.dag_detector.is_task_constructor(value): + self._check_and_warn(value, WarningContext.TASK_CONSTRUCTOR) + + # Track runtime-varying values + else: + self._track_varying_assignment(node) + + def visit_Call(self, node: ast.Call): + """ + Process function calls. + + Check not assign but just call the function or Dag definition via decorator. 
+ """ + if self.dag_detector.is_dag_constructor(node): + self._check_and_warn(node, WarningContext.DAG_CONSTRUCTOR) + + elif self.dag_detector.is_task_constructor(node): + self._check_and_warn(node, WarningContext.TASK_CONSTRUCTOR) + + def visit_For(self, node: ast.For): + """ + Process for statements. + + Check if iteration target contains runtime-varying values. + """ + # check the iterator value is runtime-varying + # iter is runtime-varying : for iter in [datetime.now(), 3] + if varying_source := self.value_analyzer.get_varying_source(node.iter): + if isinstance(node.target, ast.Name): + self.varying_vars[node.target.id] = (node.lineno, varying_source) + + for body in node.body: + self.visit(body) + + if varying_source: + if isinstance(node.target, ast.Name): + self.varying_vars.pop(node.target.id) + + def visit_With(self, node: ast.With): + """ + Process with statements. + + Detect Dag context manager. + """ + is_with_dag_context = False + for item in node.items: + # check if the Dag instance exists in with context + self.visit(item) + if isinstance(item.context_expr, ast.Call): + if self.dag_detector.is_dag_constructor(item.context_expr): + # check the value defined in with statement to detect entering Dag with block + is_with_dag_context = True + + if is_with_dag_context: + self.dag_detector.enter_dag_context() + + for body in node.body: + self.visit(body) + + # Exit Dag with block + self.dag_detector.exit_dag_context() + + def _register_dag_instances(self, targets: list): + """Register Dag instance variable names.""" + for target in targets: + if isinstance(target, ast.Name): + self.dag_detector.register_dag_instance(target.id) + + def _track_varying_assignment(self, node: ast.Assign): + """Track variable assignments with runtime-varying values.""" + if varying_source := self.value_analyzer.get_varying_source(node.value): + for target in node.targets: + if isinstance(target, ast.Name): + self.varying_vars[target.id] = (node.lineno, varying_source) + + def 
_check_and_warn(self, call: ast.Call, context: WarningContext): + """Check function call arguments and generate warnings.""" + if self.value_analyzer.get_varying_source(call): + self.static_check_result.warnings.append( + RuntimeVaryingValueWarning( + line=call.lineno, + col=call.col_offset, + code=ast.unparse(call), + message=self._get_warning_message(context), + ) + ) + + def _get_warning_message(self, context: WarningContext) -> str: + """Get appropriate warning message based on context.""" + if self.dag_detector.is_in_dag_context and context == WarningContext.TASK_CONSTRUCTOR: + return "Don't use runtime-varying values as function arguments within with Dag block" + return f"Don't use runtime-varying value as argument in {context.value}" + + +def check_dag_file_stability(file_path) -> DagVersionInflationCheckResult: + from airflow.configuration import conf + + try: + check_level = DagVersionInflationCheckLevel( + conf.get("dag_processor", "dag_version_inflation_check_level") + ) + except ValueError: + check_level = DagVersionInflationCheckLevel.warning + + if check_level == DagVersionInflationCheckLevel.off: + return DagVersionInflationCheckResult(check_level=check_level) + + try: + parsed = ast.parse(Path(file_path).read_bytes()) + except (SyntaxError, ValueError, TypeError, FileNotFoundError): + return DagVersionInflationCheckResult(check_level=check_level) + + checker = AirflowRuntimeVaryingValueChecker(check_level) + checker.visit(parsed) + checker.static_check_result.runtime_varying_values = checker.varying_vars + return checker.static_check_result diff --git a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_warning.py b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_warning.py index 287245127f8cc..0d9eb862e29f6 100644 --- a/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_warning.py +++ b/airflow-core/tests/unit/api_fastapi/core_api/routes/public/test_dag_warning.py @@ -114,4 +114,7 @@ def 
test_get_dag_warnings_bad_request(self, test_client): response = test_client.get("/dagWarnings", params={"warning_type": "invalid"}) response_json = response.json() assert response.status_code == 422 - assert response_json["detail"][0]["msg"] == "Input should be 'asset conflict' or 'non-existent pool'" + assert ( + response_json["detail"][0]["msg"] + == "Input should be 'asset conflict', 'non-existent pool' or 'runtime varying value'" + ) diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py index 960b8440902de..76d13c99f958b 100644 --- a/airflow-core/tests/unit/dag_processing/test_processor.py +++ b/airflow-core/tests/unit/dag_processing/test_processor.py @@ -609,6 +609,41 @@ def fake_collect_dags(self, *args, **kwargs): assert called is True +@conf_vars({("dag_processor", "dag_version_inflation_check_level"): "error"}) +def test_parse_file_static_check_with_error(): + result = _parse_file( + DagFileParseRequest( + file=f"{TEST_DAG_FOLDER}/test_dag_version_inflation_check.py", + bundle_path=TEST_DAG_FOLDER, + bundle_name="testing", + ), + log=structlog.get_logger(), + ) + + assert result.serialized_dags == [] + assert list(result.import_errors.keys()) == ["test_dag_version_inflation_check.py"] + assert result.warnings is None + assert "Don't use the variables as arguments" in next(iter(result.import_errors.values())) + + +def test_parse_file_static_check_with_default_warning(): + result = _parse_file( + DagFileParseRequest( + file=f"{TEST_DAG_FOLDER}/test_dag_version_inflation_check.py", + bundle_path=TEST_DAG_FOLDER, + bundle_name="testing", + ), + log=structlog.get_logger(), + ) + + assert len(result.serialized_dags) > 0 + assert len(result.warnings) == len(result.serialized_dags) + assert all( + warning.get("dag_id") and warning.get("warning_type") and warning.get("message") + for warning in result.warnings + ) + + def test_callback_processing_does_not_update_timestamps(session): 
"""Callback processing should not update last_finish_time to prevent stale DAG detection.""" stat = process_parse_results( diff --git a/airflow-core/tests/unit/dags/test_dag_version_inflation_check.py b/airflow-core/tests/unit/dags/test_dag_version_inflation_check.py new file mode 100644 index 0000000000000..96715b50308e0 --- /dev/null +++ b/airflow-core/tests/unit/dags/test_dag_version_inflation_check.py @@ -0,0 +1,97 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from __future__ import annotations + +import random +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.providers.standard.operators.bash import BashOperator +from airflow.providers.standard.operators.python import PythonOperator + + +def print_hello(): + print("Hello from Airflow!") + return "Success" + + +def print_date(): + print(f"Current date: {datetime.now()}") + return datetime.now() + + +def calculate_sum(): + result = sum(range(1, 11)) + print(f"Sum of 1 to 10: {result}") + return result + + +default_args = { + "owner": "airflow", + "depends_on_past": False, + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=5), +} + +random_task_id = random.randint(1, 10000) + +for i in range(1, 10): + dag_id = f"dynamic_dag_{i:03d}" + + dag = DAG( + dag_id=dag_id, + default_args=default_args, + description=f"number {i}", + schedule=timedelta(minutes=1), + start_date=datetime(2024, 1, 1), + catchup=False, + is_paused_upon_creation=False, + ) + + task1 = BashOperator( + task_id=f"{str(random_task_id)}", + bash_command=f'echo "Hello from Dag {i}!"', + dag=dag, + ) + + task2 = PythonOperator( + task_id="print_python_hello", + python_callable=print_hello, + dag=dag, + ) + + task3 = PythonOperator( + task_id="print_current_date", + python_callable=print_date, + dag=dag, + ) + + task4 = PythonOperator( + task_id="calculate_sum", + python_callable=calculate_sum, + dag=dag, + ) + + task5 = BashOperator( + task_id="final_task", + bash_command=f'echo "Dag {i} - All tasks completed!"', + dag=dag, + ) + + task1 >> [task2, task3] >> task4 >> task5 diff --git a/airflow-core/tests/unit/utils/test_dag_version_inflation_checker.py b/airflow-core/tests/unit/utils/test_dag_version_inflation_checker.py new file mode 100644 index 0000000000000..fdc2160328ffe --- /dev/null +++ b/airflow-core/tests/unit/utils/test_dag_version_inflation_checker.py @@ -0,0 +1,735 @@ +# Licensed to the Apache Software 
Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import ast + +from airflow.utils.dag_version_inflation_checker import ( + AirflowRuntimeVaryingValueChecker, + DagTaskDetector, + RuntimeVaryingValueAnalyzer, + RuntimeVaryingValueWarning, + WarningContext, +) + + +class TestRuntimeVaryingValueAnalyzer: + def setup_method(self): + """Each test gets a fresh analyzer instance.""" + self.varying_vars = {} + self.imports = {} + self.from_imports = {} + self.analyzer = RuntimeVaryingValueAnalyzer(self.varying_vars, self.imports, self.from_imports) + + def test_is_runtime_varying_attribute_call__detects_datetime_now(self): + """datetime.now() should be recognized as runtime-varying.""" + code = "datetime.now()" + call_node = ast.parse(code, mode="eval").body + self.imports["datetime"] = "datetime" + + # The func is an Attribute node: datetime.now + assert isinstance(call_node.func, ast.Attribute) + result = self.analyzer.is_runtime_varying_attribute_call(call_node.func) + + assert result is True + + def test_is_runtime_varying_attribute_call__ignores_static_method(self): + """Static methods like str.upper() should NOT be detected.""" + code = "str.upper('hello')" + call_node = ast.parse(code, mode="eval").body + + assert 
isinstance(call_node.func, ast.Attribute) + result = self.analyzer.is_runtime_varying_attribute_call(call_node.func) + + assert result is False + + def test_is_runtime_varying_attribute_call__handles_aliased_imports(self): + """ + Should detect runtime-varying calls even with import aliases. + + Example: import datetime as dt; dt.now() + """ + code = "dt.now()" + call_node = ast.parse(code, mode="eval").body + self.imports["dt"] = "datetime" # dt is alias for datetime + + assert isinstance(call_node.func, ast.Attribute) + result = self.analyzer.is_runtime_varying_attribute_call(call_node.func) + + assert result is True + + def test_is_runtime_varying_name_call__detects_uuid4(self): + """Detect uuid4() when imported as "from uuid import uuid4.""" + code = "uuid4()" + call_node = ast.parse(code, mode="eval").body + self.from_imports["uuid4"] = ("uuid", "uuid4") + + assert isinstance(call_node.func, ast.Name) + result = self.analyzer.is_runtime_varying_name_call(call_node.func) + + assert result is True + + def test_is_runtime_varying_name_call__ignores_regular_function(self): + code = "my_function()" + call_node = ast.parse(code, mode="eval").body + + assert isinstance(call_node.func, ast.Name) + result = self.analyzer.is_runtime_varying_name_call(call_node.func) + + assert result is False + + def test_has_varying_arguments__detects_varying_positional_arg(self): + """ + Detect when a positional argument is runtime-varying. + + Example: print(datetime.now()) + """ + code = "print(datetime.now())" + call_node = ast.parse(code, mode="eval").body + self.imports["datetime"] = "datetime" + + result = self.analyzer.has_varying_arguments(call_node) + + assert result is True + + def test_has_varying_arguments__detects_varying_keyword_arg(self): + """ + Detect when a keyword argument is runtime-varying. 
+ + Example: func(param=func1(random.randint(1, 10))) + """ + code = "func(param=func1(random.randint(1, 10)))" + call_node = ast.parse(code, mode="eval").body + self.imports["random"] = "random" + + result = self.analyzer.has_varying_arguments(call_node) + + assert result is True + + def test_has_varying_arguments__returns_false_for_static_args(self): + """ + Static arguments should return False. + + Example: print("hello", 123) + """ + code = 'print("hello", 123)' + call_node = ast.parse(code, mode="eval").body + + result = self.analyzer.has_varying_arguments(call_node) + + assert result is False + + def test_is_runtime_varying_call__true_when_function_itself_varies(self): + """ + Return True when the function call itself is runtime-varying. + + Example: datetime.now() - the function is the varying part + """ + code = "datetime.now()" + call_node = ast.parse(code, mode="eval").body + self.imports["datetime"] = "datetime" + + result = self.analyzer.is_runtime_varying_call(call_node) + + assert result is True + + def test_is_runtime_varying_call__true_when_argument_varies(self): + """ + Return True when arguments contain runtime-varying values. 
+ + Example: print(datetime.now()) - print is static but arg varies + """ + code = "print(datetime.now())" + call_node = ast.parse(code, mode="eval").body + self.imports["datetime"] = "datetime" + + result = self.analyzer.is_runtime_varying_call(call_node) + + assert result is True + + def test_is_runtime_varying_call__false_when_completely_static(self): + """Return False when both function and arguments are static.""" + code = 'print("hello")' + call_node = ast.parse(code, mode="eval").body + + result = self.analyzer.is_runtime_varying_call(call_node) + + assert result is False + + def test_get_varying_source__detects_direct_call(self): + """Detect direct runtime-varying function calls.""" + code = "datetime.now()" + node = ast.parse(code, mode="eval").body + self.imports["datetime"] = "datetime" + + result = self.analyzer.get_varying_source(node) + + assert result == "datetime.now()" + + def test_get_varying_source__detects_variable_reference(self): + """ + Detect when a variable holds a runtime-varying value. + + Example: current_time = datetime.now(); + """ + code = "current_time" + node = ast.parse(code, mode="eval").body + self.varying_vars["current_time"] = (10, "datetime.now()") + + result = self.analyzer.get_varying_source(node) + + assert result == "datetime.now()" + + def test_get_varying_source__detects_in_fstring(self): + """ + Detect runtime-varying values embedded in f-strings. + + Example: f"dag_{datetime.now()}" + """ + code = 'f"dag_{datetime.now()}"' + node = ast.parse(code, mode="eval").body + self.imports["datetime"] = "datetime" + + result = self.analyzer.get_varying_source(node) + + assert result == "datetime.now()" + + def test_get_varying_source__detects_in_list(self): + """ + Detect runtime-varying values inside list literals. 
+ + Example: [1, 2, datetime.now()] + """ + code = "[1, 2, datetime.now()]" + node = ast.parse(code, mode="eval").body + self.imports["datetime"] = "datetime" + + result = self.analyzer.get_varying_source(node) + + assert result == "datetime.now()" + + def test_get_varying_source__detects_in_dict_value(self): + """ + Detect runtime-varying values in dictionary values. + + Example: {"key": datetime.now()} + """ + code = '{"key": datetime.now()}' + node = ast.parse(code, mode="eval").body + self.imports["datetime"] = "datetime" + + result = self.analyzer.get_varying_source(node) + + assert result == "datetime.now()" + + def test_get_varying_source__detects_in_binary_operation(self): + """ + Detect runtime-varying values in binary operations. + + Example: "prefix_" + str(datetime.now()) + """ + code = '"prefix_" + str(datetime.now())' + node = ast.parse(code, mode="eval").body + self.imports["datetime"] = "datetime" + + result = self.analyzer.get_varying_source(node) + + assert result is not None + assert "datetime.now()" in result + + def test_get_varying_source__returns_none_for_static_values(self): + """ + Return None for completely static values. + + Example: "static_string", 123, [1, 2, 3] + """ + static_values = [ + '"static_string"', + "123", + "[1, 2, 3]", + '{"key": "value"}', + ] + + for code in static_values: + node = ast.parse(code, mode="eval").body + result = self.analyzer.get_varying_source(node) + assert result is None, f"Expected None for static value: {code}" + + +class TestDAGTaskDetector: + def setup_method(self): + """Each test gets a fresh detector instance""" + self.from_imports = {} + self.detector = DagTaskDetector(self.from_imports) + + def test_is_dag_constructor__detects_traditional_dag_call_uppercase(self): + """ + Detect uppercase DAG() when imported. 
+ + Usage: dag = DAG(dag_id="my_dag") + """ + code = 'DAG(dag_id="my_dag")' + call_node = ast.parse(code, mode="eval").body + self.from_imports["DAG"] = ("airflow", "DAG") + + result = self.detector.is_dag_constructor(call_node) + + assert result is True + + def test_is_dag_constructor__detects_dag_generated_by_decorator(self): + """ + Detect Dag generated by decorator. + + Usage: @dag(dag_id="my_dag") + """ + code = 'dag(dag_id="my_dag")' + call_node = ast.parse(code, mode="eval").body + self.from_imports["dag"] = ("airflow.decorators", "dag") + + result = self.detector.is_dag_constructor(call_node) + + assert result is True + + def test_is_dag_constructor__ignores_non_dag_functions(self): + """Regular function calls should not be detected as Dag constructors.""" + code = "my_function()" + call_node = ast.parse(code, mode="eval").body + + result = self.detector.is_dag_constructor(call_node) + + assert result is False + + def test_is_task_constructor__true_when_inside_dag_context(self): + """ + Any function call inside a Dag with-block is considered a task. + + Example: + with DAG() as dag: + PythonOperator() # <- This is a task + """ + code = "PythonOperator(task_id='my_task')" + call_node = ast.parse(code, mode="eval").body + + self.detector.enter_dag_context() + result = self.detector.is_task_constructor(call_node) + + assert result is True + + def test_is_task_constructor__false_when_outside_dag_context(self): + """Same call outside Dag context is NOT automatically a task.""" + code = "PythonOperator(task_id='my_task')" + call_node = ast.parse(code, mode="eval").body + + result = self.detector.is_task_constructor(call_node) + assert result is False + + def test_is_task_constructor__true_when_dag_passed_as_argument(self): + """ + Detect task when dag= parameter references a Dag instance. 
+ + Example: my_dag = DAG(dag_id='dag'); task = PythonOperator(dag=my_dag) + """ + code = "PythonOperator(task_id='task', dag=my_dag)" + call_node = ast.parse(code, mode="eval").body + self.detector.register_dag_instance("my_dag") + + result = self.detector.is_task_constructor(call_node) + assert result is True + + def test_is_task_constructor__true_when_dag_in_positional_args(self): + """ + Detect task even when Dag is passed as positional argument. + + Example: my_dag = DAG(dag_id='dag'); task = PythonOperator('task_id', my_dag) + """ + code = "PythonOperator('task_id', my_dag)" + call_node = ast.parse(code, mode="eval").body + self.detector.register_dag_instance("my_dag") + + result = self.detector.is_task_constructor(call_node) + assert result is True + + def test_enter_and_exit_dag_context(self): + """Properly track entering and exiting Dag with-blocks.""" + assert self.detector.is_in_dag_context is False + + self.detector.enter_dag_context() + assert self.detector.is_in_dag_context is True + + self.detector.exit_dag_context() + assert self.detector.is_in_dag_context is False + + def test_register_dag_instance(self): + """Remember variable names that hold Dag instances.""" + assert "my_dag" not in self.detector.dag_instances + + self.detector.register_dag_instance("my_dag") + + assert "my_dag" in self.detector.dag_instances + + +class TestAirflowRuntimeVaryingValueChecker: + """Tests for AirflowRuntimeVaryingValueChecker (Main Visitor).""" + + def setup_method(self): + """Each test gets a fresh checker instance""" + self.checker = AirflowRuntimeVaryingValueChecker() + + def test_visit_import__tracks_simple_import(self): + """Remember simple imports like 'import datetime'.""" + code = "import datetime" + tree = ast.parse(code) + + self.checker.visit(tree) + + assert "datetime" in self.checker.imports + assert self.checker.imports["datetime"] == "datetime" + + def test_visit_import__tracks_aliased_import(self): + """Remember import aliases like 'import datetime as 
dt'.""" + code = "import datetime as dt" + tree = ast.parse(code) + + self.checker.visit(tree) + + assert "dt" in self.checker.imports + assert self.checker.imports["dt"] == "datetime" + + def test_visit_importfrom__tracks_from_import(self): + """Remember 'from X import Y' style imports.""" + code = "from datetime import now" + tree = ast.parse(code) + + self.checker.visit(tree) + + assert "now" in self.checker.from_imports + assert self.checker.from_imports["now"] == ("datetime", "now") + + def test_visit_importfrom__tracks_aliased_from_import(self): + """Remember aliases in from imports.""" + code = "from datetime import now as current_time" + tree = ast.parse(code) + + self.checker.visit(tree) + + assert "current_time" in self.checker.from_imports + assert self.checker.from_imports["current_time"] == ("datetime", "now") + + def test_visit_assign__registers_dag_instance(self): + """When assigning DAG(), remember the variable name.""" + code = """ +from airflow import DAG +my_dag = DAG(dag_id="test") +""" + tree = ast.parse(code) + + self.checker.visit(tree) + + assert "my_dag" in self.checker.dag_detector.dag_instances + + def test_visit_assign__tracks_varying_variable(self): + """When assigning a runtime-varying value, track the variable.""" + code = """ +from datetime import datetime +current_time = datetime.now() +""" + tree = ast.parse(code) + + self.checker.visit(tree) + + assert "current_time" in self.checker.varying_vars + line, source = self.checker.varying_vars["current_time"] + assert "datetime.now()" in source + + def test_visit_assign__warns_on_dag_with_varying_value(self): + """Warn when Dag constructor uses runtime-varying values.""" + code = """ +from airflow import DAG +from datetime import datetime +dag = DAG(dag_id=f"dag_{datetime.now()}") +""" + tree = ast.parse(code) + + self.checker.visit(tree) + + assert len(self.checker.static_check_result.warnings) == 1 + assert any("Dag constructor" in w.message for w in 
self.checker.static_check_result.warnings) + + def test_visit_call__detects_task_in_dag_context(self): + """Detect task creation inside Dag with block.""" + code = """ +from airflow import DAG +from airflow.operators.python import PythonOperator +from datetime import datetime + +with DAG(dag_id="test") as dag: + task = PythonOperator(task_id=f"task_{datetime.now()}") # !problem +""" + tree = ast.parse(code) + + self.checker.visit(tree) + + assert len(self.checker.static_check_result.warnings) == 1 + assert any("PythonOperator" in w.code for w in self.checker.static_check_result.warnings) + + def test_visit_for__warns_on_varying_range(self): + """Warn when for-loop range is runtime-varying.""" + code = """ +from airflow import DAG +from airflow.operators.bash import BashOperator +from datetime import datetime + +with DAG( + dag_id=dag_id, + schedule_interval='@daily', +) as dag: + for i in [datetime.now(), "3"]: + task = BashOperator( + task_id='print_bash_hello_{i}', + bash_command=f'echo "Hello from DAG {i}!"', # !problem + dag=dag, + ) +""" + tree = ast.parse(code) + + self.checker.visit(tree) + warnings = self.checker.static_check_result.warnings + + assert len(warnings) == 1 + assert any("BashOperator" in w.code for w in warnings) + + def test_check_and_warn__creates_warning_for_varying_arg(self): + """Create a warning when detecting varying positional argument.""" + code = 'DAG(f"dag_{datetime.now()}")' + call_node = ast.parse(code, mode="eval").body + self.checker.from_imports["DAG"] = ("airflow", "DAG") + self.checker.imports["datetime"] = "datetime" + + self.checker._check_and_warn(call_node, WarningContext.DAG_CONSTRUCTOR) + + assert len(self.checker.static_check_result.warnings) == 1 + warning = self.checker.static_check_result.warnings[0] + assert WarningContext.DAG_CONSTRUCTOR.value in warning.message + assert "datetime.now()" in warning.code + + def test_check_and_warn__creates_warning_for_varying_kwarg(self): + """Create a warning when detecting 
varying keyword argument""" + code = "DAG(dag_id=datetime.now())" + call_node = ast.parse(code, mode="eval").body + self.checker.from_imports["DAG"] = ("airflow", "DAG") + self.checker.imports["datetime"] = "datetime" + + self.checker._check_and_warn(call_node, WarningContext.TASK_CONSTRUCTOR) + + assert len(self.checker.static_check_result.warnings) == 1 + warning = self.checker.static_check_result.warnings[0] + assert "dag_id" in warning.code + assert "datetime.now()" in warning.code + + +class TestIntegrationScenarios: + """ + Integration tests showing real-world Airflow patterns. + Demonstrate actual use cases and why they're problematic. + """ + + def _check_code(self, code: str) -> list[RuntimeVaryingValueWarning]: + """Helper to parse and check code""" + tree = ast.parse(code) + checker = AirflowRuntimeVaryingValueChecker() + checker.visit(tree) + return checker.static_check_result.warnings + + def test_antipattern__dynamic_dag_id_with_timestamp(self): + """ANTI-PATTERN: Using timestamps in Dag IDs.""" + code = """ +from airflow import DAG +from datetime import datetime + +# BAD: Dag ID includes current timestamp +dag = DAG(dag_id=f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}") +""" + warnings = self._check_code(code) + + assert len(warnings) == 1 + assert any("datetime.now()" in w.code for w in warnings) + + def test_define_dag_with_block(self): + code = """ +from airflow import DAG +from airflow.operators.python import PythonOperator +from airflow.operators.bash import BashOperator +import uuid +from datetime import datetime as dt + +start_date = dt.now() + +default_args = { + 'start_date': start_date +} + +with DAG( + dag_id="my_dag", + default_args=default_args # !problem +) as dag, Test(default_args=default_args) as test: + task1 = PythonOperator( + task_id=f"task_{uuid.uuid4()}", # !problem + python_callable=lambda: None + ) + + task2 = BashOperator( + task_id=f"task_{dt.now()}" # !problem + ) + + task3 = BashOperator( + 
task_id="task_for_normal_case" + ) + + task1 >> task2 >> task3 +""" + warnings = self._check_code(code) + + assert len(warnings) == 3 + assert any("uuid.uuid4()" in w.code for w in warnings) + assert any("dt.now()" in w.code for w in warnings) + assert any("default_args" in w.code for w in warnings) + + def test_correct_pattern__static_dag_with_runtime_context(self): + code = """ +from airflow import DAG +from airflow.models.param import Param +from airflow.operators.bash import BashOperator +from datetime import datetime +from mydule import test_function + +import time + +current_timestamp = time.time() +local_time = time.localtime() + +dag = DAG( + dag_id='time_module_dag', + start_date=datetime(2024, 1, 1), + schedule_interval='@daily', + params={ + "execution_date": Param( + default=f"manual_run_{datetime.now().isoformat()}", # !problem + description="Unique identifier for the run", + type="string", + minLength=10, + ) + }, +) + +b = test_function(time=current_timestamp) + +task1 = BashOperator( + task_id='time_task', + bash_command=f'echo "Timestamp: {current_timestamp}"', # !problem + dag=dag, +) + +task2 = BashOperator( + task_id='time_task2', + dag=dag, +) + +task1 >> task2 +""" + warnings = self._check_code(code) + + assert len(warnings) == 2 + assert any("Param(default=f'manual_run_{datetime.now().isoformat()}'" in w.code for w in warnings) + assert any("current_timestamp" in w.code for w in warnings) + + def test_dag_decorator_pattern__currently_not_detected(self): + """ + PATTERN: @dag decorator usage + """ + code = """ +from airflow.decorators import dag, task +from datetime import datetime + +@dag(dag_id=f"my_dag_{datetime.now()}") # !problem +def my_dag_function(): + + @task + def my_task(): + return "hello" + + my_task() +""" + warnings = self._check_code(code) + assert len(warnings) == 1 + + def test_dag_generated_in_for_or_function_statement(self): + code = """ +from airflow import DAG +from airflow.operators.bash import BashOperator +from 
datetime import datetime +import pendulum + +def create_dag(dag_id, task_id): + default_args = { + "depends_on_past": False, + "start_date": datetime.now() + } + + with DAG( + dag_id, + default_args=default_args, # !problem + ) as dag: + task1 = BashOperator( + task_id=task_id + ) + + return dag + +now = pendulum.now() +seoul = now.in_timezone('Asia/Seoul') + +for i in [datetime.now(), "3"]: + dag_id = f"dag_{i}_{random.randint(1, 1000)}" + + dag = DAG( + dag_id=dag_id, # !problem + schedule_interval='@daily', + tags=[f"iteration_{i}"], + ) + + task1 = BashOperator( + task_id='print_bash_hello', + bash_command=f'echo "Hello from DAG {i}!"', # !problem + dag=dag, + ) + + task2 = BashOperator( + task_id=f'random_task_{random.randint(1, 100)}', # !problem + bash_command='echo "World"', + dag=dag, + ) + + task3 = BashOperator( + task_id=f'random_task_in_{seoul}', # !problem + bash_command='echo "World"', + dag=dag, + ) + + task1 >> task2 >> task3 +""" + warnings = self._check_code(code) + assert len(warnings) == 5 diff --git a/airflow-ctl/src/airflowctl/api/datamodels/generated.py b/airflow-ctl/src/airflowctl/api/datamodels/generated.py index 6472a722397df..e00fb0cda0ef7 100644 --- a/airflow-ctl/src/airflowctl/api/datamodels/generated.py +++ b/airflow-ctl/src/airflowctl/api/datamodels/generated.py @@ -453,6 +453,7 @@ class DagWarningType(str, Enum): ASSET_CONFLICT = "asset conflict" NON_EXISTENT_POOL = "non-existent pool" + RUNTIME_VARYING_VALUE = "runtime varying value" class DryRunBackfillResponse(BaseModel):