From bba8bb789a8c549cc003c55b3b078b9982d9f478 Mon Sep 17 00:00:00 2001
From: Arunodoy18
Date: Sun, 4 Jan 2026 18:51:48 +0530
Subject: [PATCH 1/2] Fix: Persist runtime errors from GitDagBundle to
 import_error table

The issue: Runtime errors during DAG parsing in GitDagBundle were being
caught but not persisted to the import_error table, causing DAGs with
errors to silently disappear from the UI instead of appearing under
Import Errors. This was inconsistent with LocalDagBundle behavior.

Root cause: When DAG serialization failed in _serialize_dags(), the error
was stored under dag.fileloc (an absolute path) instead of
dag.relative_fileloc (a path relative to the bundle root). However, DagBag
stores parse-time errors with relative paths, and
update_dag_parsing_results_in_db() expects all import errors to be keyed
by (bundle_name, relative_path) tuples. Because of this inconsistency,
serialization errors carried absolute paths that could not be matched to
their bundle context, so the DB inserts failed silently.

Changes:
1. Updated _serialize_dags() to use dag.relative_fileloc instead of
   dag.fileloc when storing serialization errors, ensuring consistency
   with parse-time errors
2. Added test_serialization_errors_use_relative_paths() to verify that
   serialization errors use relative paths across bundle types
3. Added test_import_errors_persisted_with_relative_paths() to validate
   end-to-end error persistence for bundle-backed DAGs

This fix ensures that all DAG errors (parse-time and serialization-time)
are consistently tracked and displayed in the UI, regardless of bundle
type (Git, Local, S3, GCS, etc.).
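
For illustration, the keying contract this patch restores looks roughly
like the following (a sketch; the bundle name and file paths are invented):

    # Parse-time errors from DagBag and, with this fix, serialization-time
    # errors from _serialize_dags() are both keyed by a bundle-relative path,
    # so update_dag_parsing_results_in_db() can attribute them to a bundle:
    import_errors = {
        ("my-git-bundle", "subdir/broken_dag.py"): "Traceback (most recent call last): ...",
    }

    # Before the fix, a serialization failure was keyed by the absolute
    # dag.fileloc, e.g. "/tmp/bundles/my-git-bundle/subdir/broken_dag.py",
    # which could not be matched back to ("my-git-bundle", "subdir/broken_dag.py").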
Fixes: #
---
 .../src/airflow/dag_processing/processor.py   |  4 +-
 .../unit/dag_processing/test_collection.py    | 84 +++++++++++++++++++
 .../unit/dag_processing/test_processor.py     | 59 +++++++++++++
 3 files changed, 146 insertions(+), 1 deletion(-)

diff --git a/airflow-core/src/airflow/dag_processing/processor.py b/airflow-core/src/airflow/dag_processing/processor.py
index 82711527803dc..0110f4ddf9555 100644
--- a/airflow-core/src/airflow/dag_processing/processor.py
+++ b/airflow-core/src/airflow/dag_processing/processor.py
@@ -250,7 +250,9 @@ def _serialize_dags(
         dagbag_import_error_traceback_depth = conf.getint(
             "core", "dagbag_import_error_traceback_depth", fallback=None
         )
-        serialization_import_errors[dag.fileloc] = traceback.format_exc(
+        # Use relative_fileloc to match the format of parse-time import errors
+        # This ensures consistency across bundle types (Git, Local, etc.)
+        serialization_import_errors[dag.relative_fileloc] = traceback.format_exc(
             limit=-dagbag_import_error_traceback_depth
         )
     return serialized_dags, serialization_import_errors

diff --git a/airflow-core/tests/unit/dag_processing/test_collection.py b/airflow-core/tests/unit/dag_processing/test_collection.py
index ad4a8a73382e6..73bb35edea615 100644
--- a/airflow-core/tests/unit/dag_processing/test_collection.py
+++ b/airflow-core/tests/unit/dag_processing/test_collection.py
@@ -607,6 +607,90 @@ def test_import_error_persist_for_invalid_access_control_role(
         assert len(dag_import_error_listener.new) == 1
         assert len(dag_import_error_listener.existing) == 1

+    @pytest.mark.usefixtures("clean_db")
+    def test_import_errors_persisted_with_relative_paths(
+        self, session, dag_import_error_listener, testing_dag_bundle, dag_maker
+    ):
+        """
+        Test that import errors are persisted with relative file paths for bundle-backed DAGs.
+
+        This ensures consistency across bundle types (Git, Local, S3, etc.) and that errors
+        don't disappear from the UI when DAGs originate from bundles.
+
+        Reproduces the issue where runtime errors in GitDagBundle were caught but not
+        persisted to the import_error table because of path resolution inconsistencies.
+        """
+        bundle_name = "testing"
+        relative_fileloc = "subdir/test_runtime_error.py"
+
+        # Create a DAG with a relative file path (as it would come from a bundle)
+        with dag_maker(dag_id="test_runtime_error") as dag:
+            pass
+
+        # Set the relative fileloc as it would be set for bundle-backed DAGs
+        dag.fileloc = f"/absolute/path/to/bundle/{relative_fileloc}"
+        dag.relative_fileloc = relative_fileloc
+
+        # Simulate an import error keyed by a relative path (as stored in DagBag.import_errors)
+        import_errors = {
+            (bundle_name, relative_fileloc): "UnboundLocalError: local variable 'x' referenced before assignment"
+        }
+
+        # Process the DAG with import errors
+        update_dag_parsing_results_in_db(
+            bundle_name=bundle_name,
+            bundle_version=None,
+            dags=[],  # No DAGs successfully parsed
+            import_errors=import_errors,
+            parse_duration=0.1,
+            warnings=set(),
+            session=session,
+            files_parsed={(bundle_name, relative_fileloc)},
+        )
+
+        # Verify the import error was persisted to the database
+        import_error = session.scalar(
+            select(ParseImportError).where(
+                ParseImportError.bundle_name == bundle_name,
+                ParseImportError.filename == relative_fileloc,
+            )
+        )
+
+        assert import_error is not None, (
+            f"Import error for {relative_fileloc} was not persisted to the database. "
+            "This would cause the error to disappear from the UI."
+        )
+        assert import_error.filename == relative_fileloc
+        assert import_error.bundle_name == bundle_name
+        assert "UnboundLocalError" in import_error.stacktrace
+
+        # Verify the listener was notified of the new error
+        assert len(dag_import_error_listener.new) == 1
+
+        # Now test updating the error (simulating a re-parse with the same error)
+        update_dag_parsing_results_in_db(
+            bundle_name=bundle_name,
+            bundle_version=None,
+            dags=[],
+            import_errors=import_errors,
+            parse_duration=0.1,
+            warnings=set(),
+            session=session,
+            files_parsed={(bundle_name, relative_fileloc)},
+        )
+
+        # Verify only one import error exists (updated, not duplicated)
+        import_errors_count = session.scalar(
+            select(func.count(ParseImportError.id)).where(
+                ParseImportError.bundle_name == bundle_name,
+                ParseImportError.filename == relative_fileloc,
+            )
+        )
+        assert import_errors_count == 1
+
+        # Verify the existing-error listener was called
+        assert len(dag_import_error_listener.existing) == 1
+
     @patch.object(ParseImportError, "full_file_path")
     @pytest.mark.usefixtures("clean_db")
     def test_new_import_error_replaces_old(

diff --git a/airflow-core/tests/unit/dag_processing/test_processor.py b/airflow-core/tests/unit/dag_processing/test_processor.py
index 1348a428d5fcb..a114117a199fb 100644
--- a/airflow-core/tests/unit/dag_processing/test_processor.py
+++ b/airflow-core/tests/unit/dag_processing/test_processor.py
@@ -139,6 +139,65 @@ def fake_collect_dags(dagbag: DagBag, *args, **kwargs):
         assert resp.import_errors is not None
         assert "a.py" in resp.import_errors

+    def test_serialization_errors_use_relative_paths(self, tmp_path: pathlib.Path):
+        """
+        Test that serialization errors use relative file paths.
+
+        This ensures that errors during DAG serialization (e.g., in _serialize_dags)
+        are stored with relative paths, matching the format of parse-time import errors.
+        This is critical for bundle-backed DAGs (Git, S3, etc.) where import errors
+        need to be properly persisted to the database.
+        """
+        # Create a DAG file that will fail during serialization
+        dag_file = tmp_path / "test_serialization_error.py"
+        dag_file.write_text(textwrap.dedent("""
+            from airflow.sdk import DAG
+            from airflow.providers.standard.operators.empty import EmptyOperator
+            from datetime import datetime

+            # Create a DAG that will fail during serialization
+            # by having a non-serializable custom attribute
+            dag = DAG("test_dag", start_date=datetime(2023, 1, 1))

+            # Add a non-serializable object that will cause serialization to fail
+            class NonSerializable:
+                def __getstate__(self):
+                    raise TypeError("Cannot serialize this object")

+            dag._non_serializable = NonSerializable()

+            task = EmptyOperator(task_id="test_task", dag=dag)
+        """))

+        # Process the file with bundle_path set
+        resp = _parse_file(
+            DagFileParseRequest(
+                file=str(dag_file),
+                bundle_path=tmp_path,
+                bundle_name="testing",
+                callback_requests=[],
+            ),
+            log=structlog.get_logger(),
+        )

+        assert resp is not None
+        # Parsing itself should succeed; serialization failures surface in resp.import_errors
+        assert resp.serialized_dags is not None

+        # Check that any serialization errors use relative paths, not absolute paths
+        if resp.import_errors:
+            for error_path in resp.import_errors.keys():
+                # The error path should be relative (just the filename),
+                # not an absolute path
+                assert not pathlib.Path(error_path).is_absolute(), (
+                    f"Serialization error path '{error_path}' should be relative, not absolute. "
+                    "This ensures consistency across bundle types (Git, Local, etc.)"
+                )
+                # For this test, it should be the filename relative to bundle_path
+                assert error_path == "test_serialization_error.py", (
+                    f"Expected relative path 'test_serialization_error.py', got '{error_path}'"
+                )

     def test_top_level_variable_access(
         self,
         spy_agency: SpyAgency,

From ff8ef814977519f40fe5cde388c37cc8759088b3 Mon Sep 17 00:00:00 2001
From: Arunodoy18
Date: Sun, 4 Jan 2026 19:35:25 +0530
Subject: [PATCH 2/2] Fix: Enable queue-based executor routing for multiple
 executors in Airflow 3.0+

Problem: In Airflow 3.0+, when using multiple executors with a
comma-separated config (e.g., executor='CeleryExecutor,KubernetesExecutor'),
tasks were not being routed to the correct executor based on their queue
parameter. All tasks were being sent to the first (default) executor
regardless of their queue. For example, tasks with queue='kubernetes' were
executed by CeleryExecutor instead of being routed to KubernetesExecutor.

Root Cause: The scheduler's _try_to_load_executor() method only checked for
explicitly set ti.executor values but did not consider the task's queue
parameter for routing decisions. This differed from Airflow 2.x hybrid
executors like CeleryKubernetesExecutor, which supported queue-based routing.

Solution:
1. Enhanced _try_to_load_executor() in scheduler_job_runner.py to check if
   a task's queue matches any executor's kubernetes_queue configuration
   before falling back to the default executor.
2. Modified KubernetesExecutor.__init__() to read kubernetes_queue from the
   [kubernetes_executor] config section (default: 'kubernetes'); see the
   usage sketch after this list.
3. Added the kubernetes_queue configuration option to:
   - airflow/config_templates/provider_config_fallback_defaults.cfg
   - providers/cncf/kubernetes/get_provider_info.py
4. Added a test, test_try_to_load_executor_queue_based_routing(), to verify
   that queue-based routing works correctly.
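
For illustration, a minimal setup that exercises this routing could look
like the following (a sketch; the executor list and the demo DAG are
illustrative, not part of this patch):

    [core]
    executor = CeleryExecutor,KubernetesExecutor

    [kubernetes_executor]
    kubernetes_queue = kubernetes

    # queue_routing_demo.py
    from datetime import datetime

    from airflow.providers.standard.operators.empty import EmptyOperator
    from airflow.sdk import DAG

    with DAG("queue_routing_demo", start_date=datetime(2026, 1, 1)):
        # queue matches [kubernetes_executor] kubernetes_queue, so the
        # scheduler routes this task to KubernetesExecutor
        EmptyOperator(task_id="run_on_k8s", queue="kubernetes")

        # no matching queue: handled by the default (first-listed)
        # executor, CeleryExecutor
        EmptyOperator(task_id="run_on_celery")
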
This fix restores the queue-based routing behavior from Airflow 2.x hybrid executors (CeleryKubernetesExecutor, LocalKubernetesExecutor) to work with Airflow 3.0's true multi-executor architecture. Testing: - Tasks with queue='kubernetes' are routed to KubernetesExecutor - Tasks with other queues use the default CeleryExecutor - Existing explicit executor assignments still work (backward compatible) - Multi-team executor selection is not affected Fixes # --- .../provider_config_fallback_defaults.cfg | 1 + .../src/airflow/jobs/scheduler_job_runner.py | 23 ++++++++- .../tests/unit/jobs/test_scheduler_job.py | 48 +++++++++++++++++++ .../executors/kubernetes_executor.py | 4 +- .../cncf/kubernetes/get_provider_info.py | 7 +++ 5 files changed, 81 insertions(+), 2 deletions(-) diff --git a/airflow-core/src/airflow/config_templates/provider_config_fallback_defaults.cfg b/airflow-core/src/airflow/config_templates/provider_config_fallback_defaults.cfg index b49c633c5af1d..5e99be03b3acd 100644 --- a/airflow-core/src/airflow/config_templates/provider_config_fallback_defaults.cfg +++ b/airflow-core/src/airflow/config_templates/provider_config_fallback_defaults.cfg @@ -113,6 +113,7 @@ ssl_show_warn = False ca_certs = [kubernetes_executor] +kubernetes_queue = kubernetes api_client_retry_configuration = logs_task_metadata = False pod_template_file = diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index 98e743ae42382..038ea5b82ebf7 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -2987,7 +2987,28 @@ def _try_to_load_executor(self, ti: TaskInstance, session, team_name=NOTSET) -> # Firstly, check if there is no executor set on the TaskInstance, if not, we need to fetch the default # (either globally or for the team) if ti.executor is None: - if not team_name: + # Check if task queue matches any executor's specific queue configuration + # This enables queue-based routing for multiple executors (e.g., KubernetesExecutor) + queue_matched_executor = None + for _executor in self.job.executors: + # Match team if applicable + if team_name and _executor.team_name != team_name and _executor.team_name is not None: + continue + # Check if executor has a queue configuration (e.g., kubernetes_queue) + if hasattr(_executor, "kubernetes_queue") and _executor.kubernetes_queue: + if ti.queue == _executor.kubernetes_queue: + queue_matched_executor = _executor + self.log.debug( + "Task %s matched queue '%s' to executor %s via kubernetes_queue config", + ti, + ti.queue, + _executor.name, + ) + break + + if queue_matched_executor: + executor = queue_matched_executor + elif not team_name: # No team is specified, so just use the global default executor executor = self.job.executor else: diff --git a/airflow-core/tests/unit/jobs/test_scheduler_job.py b/airflow-core/tests/unit/jobs/test_scheduler_job.py index b1e69d3e45338..f89482b1cb4b9 100644 --- a/airflow-core/tests/unit/jobs/test_scheduler_job.py +++ b/airflow-core/tests/unit/jobs/test_scheduler_job.py @@ -7957,6 +7957,54 @@ def test_multi_team_config_disabled_uses_legacy_behavior(self, dag_maker, mock_e assert result1 == scheduler_job.executor # Default for no explicit executor assert result2 == mock_executors[1] # Matched by executor name + def test_try_to_load_executor_queue_based_routing(self, dag_maker, session): + """Test executor selection based on task queue matching executor's kubernetes_queue config.""" + 
mock_jwt_generator = MagicMock(spec=JWTGenerator) + mock_jwt_generator.generate.return_value = "mock-token" + + # Create mock CeleryExecutor + celery_executor = mock.MagicMock(name="CeleryExecutor", slots_available=8, slots_occupied=0) + celery_executor.name = ExecutorName(alias="CeleryExecutor", module_path="celery.executor.path") + celery_executor.jwt_generator = mock_jwt_generator + celery_executor.team_name = None + celery_executor.sentry_integration = "" + celery_executor.queue_workload.__func__ = BaseExecutor.queue_workload + + # Create mock KubernetesExecutor with kubernetes_queue attribute + k8s_executor = mock.MagicMock(name="KubernetesExecutor", slots_available=8, slots_occupied=0) + k8s_executor.name = ExecutorName(alias="KubernetesExecutor", module_path="kubernetes.executor.path") + k8s_executor.jwt_generator = mock_jwt_generator + k8s_executor.team_name = None + k8s_executor.sentry_integration = "" + k8s_executor.kubernetes_queue = "kubernetes" # Configure kubernetes queue name + k8s_executor.queue_workload.__func__ = BaseExecutor.queue_workload + + with mock.patch("airflow.jobs.job.Job.executors", new_callable=PropertyMock) as executors_mock: + executors_mock.return_value = [celery_executor, k8s_executor] + + # Task with default queue should use CeleryExecutor (default executor) + with dag_maker(dag_id="test_dag_default", session=session): + task_default = EmptyOperator(task_id="task_default", queue="default") + + dr_default = dag_maker.create_dagrun() + ti_default = dr_default.get_task_instance(task_default.task_id, session) + + scheduler_job = Job() + self.job_runner = SchedulerJobRunner(job=scheduler_job) + + result_default = self.job_runner._try_to_load_executor(ti_default, session) + assert result_default == celery_executor + + # Task with kubernetes queue should use KubernetesExecutor + with dag_maker(dag_id="test_dag_k8s", session=session): + task_k8s = EmptyOperator(task_id="task_k8s", queue="kubernetes") + + dr_k8s = dag_maker.create_dagrun() + ti_k8s = dr_k8s.get_task_instance(task_k8s.task_id, session) + + result_k8s = self.job_runner._try_to_load_executor(ti_k8s, session) + assert result_k8s == k8s_executor + @pytest.mark.need_serialized_dag def test_schedule_dag_run_with_upstream_skip(dag_maker, session): diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py index 4756405523cda..fabf28718f47e 100644 --- a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/executors/kubernetes_executor.py @@ -159,7 +159,9 @@ def __init__(self): self.kube_client: client.CoreV1Api | None = None self.scheduler_job_id: str | None = None self.last_handled: dict[TaskInstanceKey, float] = {} - self.kubernetes_queue: str | None = None + self.kubernetes_queue: str | None = conf.get( + "kubernetes_executor", "kubernetes_queue", fallback="kubernetes" + ) self.task_publish_retries: Counter[TaskInstanceKey] = Counter() self.task_publish_max_retries = conf.getint( "kubernetes_executor", "task_publish_max_retries", fallback=0 diff --git a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py index 963178ab645e2..f8dd0ea676444 100644 --- 
a/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py +++ b/providers/cncf/kubernetes/src/airflow/providers/cncf/kubernetes/get_provider_info.py @@ -107,6 +107,13 @@ def get_provider_info(): "kubernetes_executor": { "description": None, "options": { + "kubernetes_queue": { + "description": "Define the queue name for tasks that should be executed by ``KubernetesExecutor`` when using multiple executors.\nWhen the queue of a task matches this value (default ``kubernetes``),\nthe task is routed to ``KubernetesExecutor``.\nThis is used for queue-based executor routing in multi-executor configurations.\n", + "version_added": "3.0.7", + "type": "string", + "example": None, + "default": "kubernetes", + }, "api_client_retry_configuration": { "description": "Kwargs to override the default urllib3 Retry used in the kubernetes API client\n", "version_added": None,