diff --git a/airflow-core/src/airflow/dag_processing/manager.py b/airflow-core/src/airflow/dag_processing/manager.py index ba4715a8ae7a7..2d1c57b71f7b3 100644 --- a/airflow-core/src/airflow/dag_processing/manager.py +++ b/airflow-core/src/airflow/dag_processing/manager.py @@ -62,7 +62,6 @@ from airflow.models.db_callback_request import DbCallbackRequest from airflow.models.errors import ParseImportError from airflow.observability.metrics import stats_utils -from airflow.observability.trace import DebugTrace from airflow.sdk import SecretCache from airflow.sdk.log import init_log_file, logging_processors from airflow.typing_compat import assert_never @@ -1222,17 +1221,9 @@ def emit_metrics(*, parse_time: float, stats: Sequence[DagFileStat]): This is called once every time around the parsing "loop" - i.e. after all files have been parsed. """ - with DebugTrace.start_span(span_name="emit_metrics", component="DagFileProcessorManager") as span: - Stats.gauge("dag_processing.total_parse_time", parse_time) - Stats.gauge("dagbag_size", sum(stat.num_dags for stat in stats)) - Stats.gauge("dag_processing.import_errors", sum(stat.import_errors for stat in stats)) - span.set_attributes( - { - "total_parse_time": parse_time, - "dag_bag_size": sum(stat.num_dags for stat in stats), - "import_errors": sum(stat.import_errors for stat in stats), - } - ) + Stats.gauge("dag_processing.total_parse_time", parse_time) + Stats.gauge("dagbag_size", sum(stat.num_dags for stat in stats)) + Stats.gauge("dag_processing.import_errors", sum(stat.import_errors for stat in stats)) def process_parse_results( diff --git a/airflow-core/src/airflow/executors/base_executor.py b/airflow-core/src/airflow/executors/base_executor.py index a57790744b77b..3bb8a70fa2712 100644 --- a/airflow-core/src/airflow/executors/base_executor.py +++ b/airflow-core/src/airflow/executors/base_executor.py @@ -28,14 +28,13 @@ import pendulum from airflow._shared.observability.metrics.stats import Stats -from airflow._shared.observability.traces import NO_TRACE_ID from airflow.cli.cli_config import DefaultHelpParser from airflow.configuration import conf from airflow.executors import workloads from airflow.executors.executor_loader import ExecutorLoader from airflow.models import Log from airflow.observability.metrics import stats_utils -from airflow.observability.trace import DebugTrace, Trace, add_debug_span +from airflow.observability.trace import Trace from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.state import TaskInstanceState from airflow.utils.thread_safe_dict import ThreadSafeDict @@ -263,7 +262,6 @@ def sync(self) -> None: Executors should override this to perform gather statuses. """ - @add_debug_span def heartbeat(self) -> None: """Heartbeat sent to trigger new jobs.""" open_slots = self.parallelism - len(self.running) @@ -350,7 +348,6 @@ def order_queued_tasks_by_priority(self) -> list[tuple[TaskInstanceKey, workload reverse=False, ) - @add_debug_span def trigger_tasks(self, open_slots: int) -> None: """ Initiate async execution of the queued tasks, up to the number of available slots. @@ -433,22 +430,6 @@ def fail(self, key: TaskInstanceKey, info=None) -> None: :param info: Executor information for the task instance :param key: Unique key for the task instance """ - trace_id = Trace.get_current_span().get_span_context().trace_id - if trace_id != NO_TRACE_ID: - with DebugTrace.start_child_span( - span_name="fail", - component="BaseExecutor", - ) as span: - span.set_attributes( - { - "dag_id": key.dag_id, - "run_id": key.run_id, - "task_id": key.task_id, - "try_number": key.try_number, - "error": True, - } - ) - self.change_state(key, TaskInstanceState.FAILED, info) def success(self, key: TaskInstanceKey, info=None) -> None: @@ -458,21 +439,6 @@ def success(self, key: TaskInstanceKey, info=None) -> None: :param info: Executor information for the task instance :param key: Unique key for the task instance """ - trace_id = Trace.get_current_span().get_span_context().trace_id - if trace_id != NO_TRACE_ID: - with DebugTrace.start_child_span( - span_name="success", - component="BaseExecutor", - ) as span: - span.set_attributes( - { - "dag_id": key.dag_id, - "run_id": key.run_id, - "task_id": key.task_id, - "try_number": key.try_number, - } - ) - self.change_state(key, TaskInstanceState.SUCCESS, info) def queued(self, key: TaskInstanceKey, info=None) -> None: diff --git a/airflow-core/src/airflow/jobs/job.py b/airflow-core/src/airflow/jobs/job.py index 00b8299dee4b9..682a3d52574ce 100644 --- a/airflow-core/src/airflow/jobs/job.py +++ b/airflow-core/src/airflow/jobs/job.py @@ -35,7 +35,6 @@ from airflow.exceptions import AirflowException from airflow.listeners.listener import get_listener_manager from airflow.models.base import ID_LEN, Base -from airflow.observability.trace import DebugTrace, add_debug_span from airflow.utils.helpers import convert_camel_to_snake from airflow.utils.log.logging_mixin import LoggingMixin from airflow.utils.net import get_hostname @@ -210,68 +209,61 @@ def heartbeat( :param session to use for saving the job """ previous_heartbeat = self.latest_heartbeat - with DebugTrace.start_span(span_name="heartbeat", component="Job") as span: - try: - span.set_attribute("heartbeat", str(self.latest_heartbeat)) - # This will cause it to load from the db + try: + # This will cause it to load from the db + session.merge(self) + previous_heartbeat = self.latest_heartbeat + + if self.state == JobState.RESTARTING: + self.kill() + + # Figure out how long to sleep for + sleep_for: float = 0 + if self.latest_heartbeat: + seconds_remaining = ( + self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds() + ) + sleep_for = max(0, seconds_remaining) + sleep(sleep_for) + # Update last heartbeat time + with create_session() as session: + # Make the session aware of this object session.merge(self) + self.latest_heartbeat = timezone.utcnow() + session.commit() + time_since_last_heartbeat: float = ( + 0 + if previous_heartbeat is None + else (timezone.utcnow() - previous_heartbeat).total_seconds() + ) + health_check_threshold_value = health_check_threshold(self.job_type, self.heartrate) + if time_since_last_heartbeat > health_check_threshold_value: + self.log.info("Heartbeat recovered after %.2f seconds", time_since_last_heartbeat) + # At this point, the DB has updated. previous_heartbeat = self.latest_heartbeat - - if self.state == JobState.RESTARTING: - self.kill() - - # Figure out how long to sleep for - sleep_for: float = 0 - if self.latest_heartbeat: - seconds_remaining = ( - self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds() - ) - sleep_for = max(0, seconds_remaining) - if span.is_recording(): - span.add_event(name="sleep", attributes={"sleep_for": sleep_for}) - sleep(sleep_for) - # Update last heartbeat time - with create_session() as session: - # Make the session aware of this object - session.merge(self) - self.latest_heartbeat = timezone.utcnow() - session.commit() - time_since_last_heartbeat: float = ( - 0 - if previous_heartbeat is None - else (timezone.utcnow() - previous_heartbeat).total_seconds() - ) - health_check_threshold_value = health_check_threshold(self.job_type, self.heartrate) - if time_since_last_heartbeat > health_check_threshold_value: - self.log.info("Heartbeat recovered after %.2f seconds", time_since_last_heartbeat) - # At this point, the DB has updated. - previous_heartbeat = self.latest_heartbeat - heartbeat_callback(session) - self.log.debug("[heartbeat]") - self.heartbeat_failed = False - except OperationalError: - Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1) - if not self.heartbeat_failed: - self.log.exception("%s heartbeat failed with error", self.__class__.__name__) - self.heartbeat_failed = True - msg = f"{self.__class__.__name__} heartbeat got an exception" - if span.is_recording(): - span.add_event(name="error", attributes={"message": msg}) - if self.is_alive(): - self.log.error( - "%s heartbeat failed with error. Scheduler may go into unhealthy state", - self.__class__.__name__, - ) - msg = f"{self.__class__.__name__} heartbeat failed with error. Scheduler may go into unhealthy state" - if span.is_recording(): - span.add_event(name="error", attributes={"message": msg}) - else: - msg = f"{self.__class__.__name__} heartbeat failed with error. Scheduler is in unhealthy state" - self.log.error(msg) - if span.is_recording(): - span.add_event(name="error", attributes={"message": msg}) - # We didn't manage to heartbeat, so make sure that the timestamp isn't updated - self.latest_heartbeat = previous_heartbeat + heartbeat_callback(session) + self.log.debug("[heartbeat]") + self.heartbeat_failed = False + except OperationalError: + Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1) + if not self.heartbeat_failed: + self.log.exception("%s heartbeat failed with error", self.__class__.__name__) + self.heartbeat_failed = True + msg = f"{self.__class__.__name__} heartbeat got an exception" + self.log.error(msg) + if self.is_alive(): + self.log.error( + "%s heartbeat failed with error. Scheduler may go into unhealthy state", + self.__class__.__name__, + ) + msg = f"{self.__class__.__name__} heartbeat failed with error. Scheduler may go into unhealthy state" + else: + msg = ( + f"{self.__class__.__name__} heartbeat failed with error. Scheduler is in unhealthy state" + ) + self.log.error(msg) + # We didn't manage to heartbeat, so make sure that the timestamp isn't updated + self.latest_heartbeat = previous_heartbeat @provide_session def prepare_for_execution(self, session: Session = NEW_SESSION): @@ -401,7 +393,6 @@ def execute_job(job: Job, execute_callable: Callable[[], int | None]) -> int | N return ret -@add_debug_span def perform_heartbeat( job: Job, heartbeat_callback: Callable[[Session], None], only_if_necessary: bool ) -> None: diff --git a/airflow-core/src/airflow/jobs/scheduler_job_runner.py b/airflow-core/src/airflow/jobs/scheduler_job_runner.py index c221d04cb1833..4117153d4e7cd 100644 --- a/airflow-core/src/airflow/jobs/scheduler_job_runner.py +++ b/airflow-core/src/airflow/jobs/scheduler_job_runner.py @@ -98,7 +98,7 @@ from airflow.models.team import Team from airflow.models.trigger import TRIGGER_FAIL_REPR, Trigger, TriggerFailureReason from airflow.observability.metrics import stats_utils -from airflow.observability.trace import DebugTrace, Trace, add_debug_span +from airflow.observability.trace import Trace from airflow.serialization.definitions.assets import SerializedAssetUniqueKey from airflow.serialization.definitions.notset import NOTSET from airflow.ti_deps.dependencies_states import EXECUTION_STATES @@ -1616,17 +1616,7 @@ def _run_scheduler_loop(self) -> None: ) for loop_count in itertools.count(start=1): - with ( - DebugTrace.start_span(span_name="scheduler_job_loop", component="SchedulerJobRunner") as span, - Stats.timer("scheduler.scheduler_loop_duration") as timer, - ): - span.set_attributes( - { - "category": "scheduler", - "loop_count": loop_count, - } - ) - + with Stats.timer("scheduler.scheduler_loop_duration") as timer: with create_session() as session: if self._is_tracing_enabled(): self._end_spans_of_externally_ended_ops(session) @@ -1677,13 +1667,6 @@ def _run_scheduler_loop(self) -> None: self.log.debug("Next timed event is in %f", next_event) self.log.debug("Ran scheduling loop in %.2f ms", timer.duration) - if span.is_recording(): - span.add_event( - name="Ran scheduling loop", - attributes={ - "duration in ms": timer.duration, - }, - ) if not is_unit_test and not num_queued_tis and not num_finished_events: # If the scheduler is doing things, don't sleep. This means when there is work to do, the @@ -1697,8 +1680,6 @@ def _run_scheduler_loop(self) -> None: self.num_runs, loop_count, ) - if span.is_recording(): - span.add_event("Exiting scheduler loop as requested number of runs has been reached") break def _do_scheduling(self, session: Session) -> int: @@ -1908,7 +1889,6 @@ def _mark_backfills_complete(self, session: Session = NEW_SESSION) -> None: for b in backfills: b.completed_at = now - @add_debug_span def _create_dag_runs(self, dag_models: Collection[DagModel], session: Session) -> None: """Create a DAG run and update the dag_model to control if/when the next DAGRun should be created.""" # Bulk Fetch DagRuns with dag_id and logical_date same @@ -2145,7 +2125,6 @@ def _lock_backfills(self, dag_runs: Collection[DagRun], session: Session) -> dic return locked_backfills - @add_debug_span def _start_queued_dagruns(self, session: Session) -> None: """Find DagRuns in queued state and decide moving them to running state.""" dag_runs: Collection[DagRun] = list(DagRun.get_queued_dag_runs_to_set_running(session)) @@ -2164,7 +2143,6 @@ def _start_queued_dagruns(self, session: Session) -> None: ) active_runs_of_dags = Counter({(dag_id, br_id): num for dag_id, br_id, num in session.execute(query)}) - @add_debug_span def _update_state(dag: SerializedDAG, dag_run: DagRun): span = Trace.get_current_span() span.set_attributes( @@ -2279,150 +2257,121 @@ def _schedule_dag_run( :param dag_run: The DagRun to schedule :return: Callback that needs to be executed """ - with DebugTrace.start_root_span( - span_name="_schedule_dag_run", component="SchedulerJobRunner" - ) as span: - span.set_attributes( - { - "dag_id": dag_run.dag_id, - "run_id": dag_run.run_id, - "run_type": dag_run.run_type, - } - ) - callback: DagCallbackRequest | None = None + callback: DagCallbackRequest | None = None - dag = dag_run.dag = self.scheduler_dag_bag.get_dag_for_run(dag_run=dag_run, session=session) - dag_model = DM.get_dagmodel(dag_run.dag_id, session) - if not dag_model: - self.log.error("Couldn't find DAG model %s in database!", dag_run.dag_id) - return callback + dag = dag_run.dag = self.scheduler_dag_bag.get_dag_for_run(dag_run=dag_run, session=session) + dag_model = DM.get_dagmodel(dag_run.dag_id, session) + if not dag_model: + self.log.error("Couldn't find DAG model %s in database!", dag_run.dag_id) + return callback - if not dag: - self.log.error("Couldn't find DAG %s in DAG bag!", dag_run.dag_id) - return callback + if not dag: + self.log.error("Couldn't find DAG %s in DAG bag!", dag_run.dag_id) + return callback - if ( - dag_run.start_date - and dag.dagrun_timeout - and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout + if ( + dag_run.start_date + and dag.dagrun_timeout + and dag_run.start_date < timezone.utcnow() - dag.dagrun_timeout + ): + dag_run.set_state(DagRunState.FAILED) + unfinished_task_instances = session.scalars( + select(TI) + .where(TI.dag_id == dag_run.dag_id) + .where(TI.run_id == dag_run.run_id) + .where(TI.state.in_(State.unfinished)) + ) + for task_instance in unfinished_task_instances: + task_instance.state = TaskInstanceState.SKIPPED + session.merge(task_instance) + session.flush() + self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id) + + if dag_run.state in State.finished_dr_states and dag_run.run_type in ( + DagRunType.SCHEDULED, + DagRunType.MANUAL, + DagRunType.ASSET_TRIGGERED, ): - dag_run.set_state(DagRunState.FAILED) - unfinished_task_instances = session.scalars( - select(TI) - .where(TI.dag_id == dag_run.dag_id) - .where(TI.run_id == dag_run.run_id) - .where(TI.state.in_(State.unfinished)) - ) - for task_instance in unfinished_task_instances: - task_instance.state = TaskInstanceState.SKIPPED - session.merge(task_instance) - session.flush() - self.log.info("Run %s of %s has timed-out", dag_run.run_id, dag_run.dag_id) - - if dag_run.state in State.finished_dr_states and dag_run.run_type in ( - DagRunType.SCHEDULED, - DagRunType.MANUAL, - DagRunType.ASSET_TRIGGERED, - ): - self._set_exceeds_max_active_runs(dag_model=dag_model, session=session) + self._set_exceeds_max_active_runs(dag_model=dag_model, session=session) - dag_run_reloaded = session.scalar( - select(DagRun) - .where(DagRun.id == dag_run.id) - .options( - selectinload(DagRun.consumed_asset_events).selectinload(AssetEvent.asset), - selectinload(DagRun.consumed_asset_events).selectinload(AssetEvent.source_aliases), - ) - ) - if dag_run_reloaded is None: - # This should never happen since we just had the dag_run - self.log.error("DagRun %s was deleted unexpectedly", dag_run.id) - return None - dag_run = dag_run_reloaded - callback_to_execute = DagCallbackRequest( - filepath=dag_model.relative_fileloc or "", - dag_id=dag.dag_id, - run_id=dag_run.run_id, - bundle_name=dag_model.bundle_name, - bundle_version=dag_run.bundle_version, - context_from_server=DagRunContext( - dag_run=dag_run, - last_ti=dag_run.get_last_ti(dag=dag, session=session), - ), - is_failure_callback=True, - msg="timed_out", + dag_run_reloaded = session.scalar( + select(DagRun) + .where(DagRun.id == dag_run.id) + .options( + selectinload(DagRun.consumed_asset_events).selectinload(AssetEvent.asset), + selectinload(DagRun.consumed_asset_events).selectinload(AssetEvent.source_aliases), ) + ) + if dag_run_reloaded is None: + # This should never happen since we just had the dag_run + self.log.error("DagRun %s was deleted unexpectedly", dag_run.id) + return None + dag_run = dag_run_reloaded + callback_to_execute = DagCallbackRequest( + filepath=dag_model.relative_fileloc or "", + dag_id=dag.dag_id, + run_id=dag_run.run_id, + bundle_name=dag_model.bundle_name, + bundle_version=dag_run.bundle_version, + context_from_server=DagRunContext( + dag_run=dag_run, + last_ti=dag_run.get_last_ti(dag=dag, session=session), + ), + is_failure_callback=True, + msg="timed_out", + ) - dag_run.notify_dagrun_state_changed(msg="timed_out") - if dag_run.end_date and dag_run.start_date: - duration = dag_run.end_date - dag_run.start_date - DualStatsManager.timing( - "dagrun.duration.failed", - duration, - tags={}, - extra_tags={"dag_id": dag_run.dag_id}, - ) - span.set_attribute("error", True) - if span.is_recording(): - span.add_event( - name="error", - attributes={ - "message": f"Run {dag_run.run_id} of {dag_run.dag_id} has timed-out", - "duration": str(duration), - }, - ) - return callback_to_execute + dag_run.notify_dagrun_state_changed(msg="timed_out") + if dag_run.end_date and dag_run.start_date: + duration = dag_run.end_date - dag_run.start_date + DualStatsManager.timing( + "dagrun.duration.failed", + duration, + tags={}, + extra_tags={"dag_id": dag_run.dag_id}, + ) + return callback_to_execute - if dag_run.logical_date and dag_run.logical_date > timezone.utcnow(): - self.log.error("Logical date is in future: %s", dag_run.logical_date) - return callback + if dag_run.logical_date and dag_run.logical_date > timezone.utcnow(): + self.log.error("Logical date is in future: %s", dag_run.logical_date) + return callback - if not dag_run.bundle_version and not self._verify_integrity_if_dag_changed( - dag_run=dag_run, session=session - ): - self.log.warning( - "The DAG disappeared before verifying integrity: %s. Skipping.", dag_run.dag_id - ) - return callback + if not dag_run.bundle_version and not self._verify_integrity_if_dag_changed( + dag_run=dag_run, session=session + ): + self.log.warning("The DAG disappeared before verifying integrity: %s. Skipping.", dag_run.dag_id) + return callback - if ( - self._is_tracing_enabled() - and dag_run.scheduled_by_job_id is not None - and dag_run.scheduled_by_job_id != self.job.id - and self.active_spans.get("dr:" + str(dag_run.id)) is None - ): - # If the dag_run has been previously scheduled by another job and there is no active span, - # then check if the job is still healthy. - # If it's not healthy, then recreate the spans. - self._recreate_unhealthy_scheduler_spans_if_needed(dag_run, session) + if ( + self._is_tracing_enabled() + and dag_run.scheduled_by_job_id is not None + and dag_run.scheduled_by_job_id != self.job.id + and self.active_spans.get("dr:" + str(dag_run.id)) is None + ): + # If the dag_run has been previously scheduled by another job and there is no active span, + # then check if the job is still healthy. + # If it's not healthy, then recreate the spans. + self._recreate_unhealthy_scheduler_spans_if_needed(dag_run, session) - dag_run.scheduled_by_job_id = self.job.id + dag_run.scheduled_by_job_id = self.job.id - # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else? - schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False) + # TODO[HA]: Rename update_state -> schedule_dag_run, ?? something else? + schedulable_tis, callback_to_run = dag_run.update_state(session=session, execute_callbacks=False) - if dag_run.state in State.finished_dr_states and dag_run.run_type in ( - DagRunType.SCHEDULED, - DagRunType.MANUAL, - DagRunType.ASSET_TRIGGERED, - ): - self._set_exceeds_max_active_runs(dag_model=dag_model, session=session) + if dag_run.state in State.finished_dr_states and dag_run.run_type in ( + DagRunType.SCHEDULED, + DagRunType.MANUAL, + DagRunType.ASSET_TRIGGERED, + ): + self._set_exceeds_max_active_runs(dag_model=dag_model, session=session) - # This will do one query per dag run. We "could" build up a complex - # query to update all the TIs across all the logical dates and dag - # IDs in a single query, but it turns out that can be _very very slow_ - # see #11147/commit ee90807ac for more details - if span.is_recording(): - span.add_event( - name="schedule_tis", - attributes={ - "message": "dag_run scheduling its tis", - "schedulable_tis": [_ti.task_id for _ti in schedulable_tis], - }, - ) - dag_run.schedule_tis(schedulable_tis, session, max_tis_per_query=self.job.max_tis_per_query) + # This will do one query per dag run. We "could" build up a complex + # query to update all the TIs across all the logical dates and dag + # IDs in a single query, but it turns out that can be _very very slow_ + # see #11147/commit ee90807ac for more details + dag_run.schedule_tis(schedulable_tis, session, max_tis_per_query=self.job.max_tis_per_query) - return callback_to_run + return callback_to_run def _verify_integrity_if_dag_changed(self, dag_run: DagRun, session: Session) -> bool: """ @@ -2688,51 +2637,39 @@ def _emit_running_dags_metric(self, session: Session = NEW_SESSION) -> None: def _emit_pool_metrics(self, session: Session = NEW_SESSION) -> None: from airflow.models.pool import Pool - with DebugTrace.start_span(span_name="emit_pool_metrics", component="SchedulerJobRunner") as span: - pools = Pool.slots_stats(session=session) - for pool_name, slot_stats in pools.items(): - normalized_pool_name = normalize_pool_name_for_stats(pool_name) - DualStatsManager.gauge( - "pool.open_slots", - slot_stats["open"], - tags={}, - extra_tags={"pool_name": normalized_pool_name}, - ) - DualStatsManager.gauge( - "pool.queued_slots", - slot_stats["queued"], - tags={}, - extra_tags={"pool_name": normalized_pool_name}, - ) - DualStatsManager.gauge( - "pool.running_slots", - slot_stats["running"], - tags={}, - extra_tags={"pool_name": normalized_pool_name}, - ) - DualStatsManager.gauge( - "pool.deferred_slots", - slot_stats["deferred"], - tags={}, - extra_tags={"pool_name": normalized_pool_name}, - ) - DualStatsManager.gauge( - "pool.scheduled_slots", - slot_stats["scheduled"], - tags={}, - extra_tags={"pool_name": normalized_pool_name}, - ) - - span.set_attributes( - { - "category": "scheduler", - f"pool.open_slots.{normalized_pool_name}": slot_stats["open"], - f"pool.queued_slots.{normalized_pool_name}": slot_stats["queued"], - f"pool.running_slots.{normalized_pool_name}": slot_stats["running"], - f"pool.deferred_slots.{normalized_pool_name}": slot_stats["deferred"], - f"pool.scheduled_slots.{normalized_pool_name}": slot_stats["scheduled"], - } - ) + pools = Pool.slots_stats(session=session) + for pool_name, slot_stats in pools.items(): + normalized_pool_name = normalize_pool_name_for_stats(pool_name) + DualStatsManager.gauge( + "pool.open_slots", + slot_stats["open"], + tags={}, + extra_tags={"pool_name": normalized_pool_name}, + ) + DualStatsManager.gauge( + "pool.queued_slots", + slot_stats["queued"], + tags={}, + extra_tags={"pool_name": normalized_pool_name}, + ) + DualStatsManager.gauge( + "pool.running_slots", + slot_stats["running"], + tags={}, + extra_tags={"pool_name": normalized_pool_name}, + ) + DualStatsManager.gauge( + "pool.deferred_slots", + slot_stats["deferred"], + tags={}, + extra_tags={"pool_name": normalized_pool_name}, + ) + DualStatsManager.gauge( + "pool.scheduled_slots", + slot_stats["scheduled"], + tags={}, + extra_tags={"pool_name": normalized_pool_name}, + ) @provide_session def adopt_or_reset_orphaned_tasks(self, session: Session = NEW_SESSION) -> int: diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index ba083b8fad3cb..be8213c423fd6 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -49,7 +49,7 @@ from airflow.jobs.job import perform_heartbeat from airflow.models.trigger import Trigger from airflow.observability.metrics import stats_utils -from airflow.observability.trace import DebugTrace, Trace, add_debug_span +from airflow.observability.trace import Trace from airflow.sdk.api.datamodels._generated import HITLDetailResponse from airflow.sdk.execution_time.comms import ( CommsDecoder, @@ -545,18 +545,17 @@ def run(self) -> None: if not self.is_alive(): log.error("Trigger runner process has died! Exiting.") break - with DebugTrace.start_span(span_name="triggerer_job_loop", component="TriggererJobRunner"): - self.load_triggers() + self.load_triggers() - # Wait for up to 1 second for activity - self._service_subprocess(1) + # Wait for up to 1 second for activity + self._service_subprocess(1) - self.handle_events() - self.handle_failed_triggers() - self.clean_unused() - self.heartbeat() + self.handle_events() + self.handle_failed_triggers() + self.clean_unused() + self.heartbeat() - self.emit_metrics() + self.emit_metrics() def heartbeat(self): perform_heartbeat(self.job, heartbeat_callback=self.heartbeat_callback, only_if_necessary=True) @@ -564,7 +563,6 @@ def heartbeat(self): def heartbeat_callback(self, session: Session | None = None) -> None: Stats.incr("triggerer_heartbeat", 1, 1) - @add_debug_span def load_triggers(self): """Query the database for the triggers we're supposed to be running and update the runner.""" Trigger.assign_unassigned( @@ -576,7 +574,6 @@ def load_triggers(self): ids = Trigger.ids_for_triggerer(self.job.id, queues=self.queues) self.update_triggers(set(ids)) - @add_debug_span def handle_events(self): """Dispatch outbound events to the Trigger model which pushes them to the relevant task instances.""" while self.events: @@ -587,12 +584,10 @@ def handle_events(self): # Emit stat event Stats.incr("triggers.succeeded") - @add_debug_span def clean_unused(self): """Clean out unused or finished triggers.""" Trigger.clean_unused() - @add_debug_span def handle_failed_triggers(self): """ Handle "failed" triggers. - ones that errored or exited before they sent an event. diff --git a/airflow-core/src/airflow/observability/trace.py b/airflow-core/src/airflow/observability/trace.py index bb074be0152b7..6033502c1dcd7 100644 --- a/airflow-core/src/airflow/observability/trace.py +++ b/airflow-core/src/airflow/observability/trace.py @@ -18,7 +18,6 @@ import logging from collections.abc import Callable -from functools import wraps from socket import socket from typing import TYPE_CHECKING @@ -28,30 +27,11 @@ log = logging.getLogger(__name__) -def add_debug_span(func): - """Decorate a function with span.""" - func_name = func.__name__ - qual_name = func.__qualname__ - module_name = func.__module__ - component = qual_name.rsplit(".", 1)[0] if "." in qual_name else module_name - - @wraps(func) - def wrapper(*args, **kwargs): - with DebugTrace.start_span(span_name=func_name, component=component): - return func(*args, **kwargs) - - return wrapper - - class _TraceMeta(type): factory: Callable[[], Tracer] | None = None instance: Tracer | EmptyTrace | None = None def __new__(cls, name, bases, attrs): - # Read the debug flag from the class body. - if "check_debug_traces_flag" not in attrs: - raise TypeError(f"Class '{name}' must define 'check_debug_traces_flag'.") - return super().__new__(cls, name, bases, attrs) def __getattr__(cls, name: str): @@ -80,15 +60,7 @@ def configure_factory(cls): """Configure the trace factory based on settings.""" otel_on = conf.getboolean("traces", "otel_on") - if cls.check_debug_traces_flag: - debug_traces_on = conf.getboolean("traces", "otel_debug_traces_on") - else: - # Set to true so that it will be ignored during the evaluation for the factory instance. - # If this is true, then (otel_on and debug_traces_on) will always evaluate to - # whatever value 'otel_on' has and therefore it will be ignored. - debug_traces_on = True - - if otel_on and debug_traces_on: + if otel_on: from airflow.observability.traces import otel_tracer cls.factory = staticmethod( @@ -108,15 +80,7 @@ def get_constant_tags(cls) -> str | None: if TYPE_CHECKING: Trace: EmptyTrace - DebugTrace: EmptyTrace else: class Trace(metaclass=_TraceMeta): """Empty class for Trace - we use metaclass to inject the right one.""" - - check_debug_traces_flag = False - - class DebugTrace(metaclass=_TraceMeta): - """Empty class for Trace and in case the debug traces flag is enabled.""" - - check_debug_traces_flag = True diff --git a/airflow-core/tests/unit/observability/traces/test_otel_tracer.py b/airflow-core/tests/unit/observability/traces/test_otel_tracer.py index 3b6207aefe7fa..3ca8e07f5d55f 100644 --- a/airflow-core/tests/unit/observability/traces/test_otel_tracer.py +++ b/airflow-core/tests/unit/observability/traces/test_otel_tracer.py @@ -28,7 +28,7 @@ from airflow._shared.observability.traces.base_tracer import EmptyTrace from airflow._shared.observability.traces.otel_tracer import OtelTrace from airflow._shared.observability.traces.utils import datetime_to_nano -from airflow.observability.trace import DebugTrace, Trace +from airflow.observability.trace import Trace from airflow.observability.traces import otel_tracer from tests_common.test_utils.config import env_vars @@ -63,26 +63,6 @@ def test_get_otel_tracer_from_trace_metaclass(self): task_tracer.get_otel_tracer_provider() assert task_tracer.use_simple_processor is True - @env_vars( - { - "AIRFLOW__TRACES__OTEL_ON": "True", - "OTEL_EXPORTER_OTLP_ENDPOINT": "http://localhost:4318", - "OTEL_TRACES_EXPORTER": "otlp", - } - ) - def test_debug_trace_metaclass(self): - """Test that `DebugTrace.some_method()`, uses the correct instance when the debug_traces flag is configured.""" - assert DebugTrace.check_debug_traces_flag is True - - # Factory hasn't been configured, it defaults to EmptyTrace. - assert not isinstance(DebugTrace.factory(), OtelTrace) - assert isinstance(DebugTrace.factory(), EmptyTrace) - - DebugTrace.configure_factory() - # Factory has been configured, it should still be EmptyTrace. - assert not isinstance(DebugTrace.factory(), OtelTrace) - assert isinstance(DebugTrace.factory(), EmptyTrace) - @patch("opentelemetry.sdk.trace.export.ConsoleSpanExporter") @patch("airflow._shared.observability.otel_env_config.OtelEnvConfig") @env_vars( diff --git a/task-sdk/src/airflow/sdk/observability/trace.py b/task-sdk/src/airflow/sdk/observability/trace.py index 7f3d44710592d..71dfa7a4f3e3a 100644 --- a/task-sdk/src/airflow/sdk/observability/trace.py +++ b/task-sdk/src/airflow/sdk/observability/trace.py @@ -18,7 +18,6 @@ import logging from collections.abc import Callable -from functools import wraps from socket import socket from typing import TYPE_CHECKING @@ -28,30 +27,11 @@ log = logging.getLogger(__name__) -def add_debug_span(func): - """Decorate a function with span.""" - func_name = func.__name__ - qual_name = func.__qualname__ - module_name = func.__module__ - component = qual_name.rsplit(".", 1)[0] if "." in qual_name else module_name - - @wraps(func) - def wrapper(*args, **kwargs): - with DebugTrace.start_span(span_name=func_name, component=component): - return func(*args, **kwargs) - - return wrapper - - class _TraceMeta(type): factory: Callable[[], Tracer] | None = None instance: Tracer | EmptyTrace | None = None def __new__(cls, name, bases, attrs): - # Read the debug flag from the class body. - if "check_debug_traces_flag" not in attrs: - raise TypeError(f"Class '{name}' must define 'check_debug_traces_flag'.") - return super().__new__(cls, name, bases, attrs) def __getattr__(cls, name: str): @@ -80,15 +60,7 @@ def configure_factory(cls): """Configure the trace factory based on settings.""" otel_on = conf.getboolean("traces", "otel_on") - if cls.check_debug_traces_flag: - debug_traces_on = conf.getboolean("traces", "otel_debug_traces_on") - else: - # Set to true so that it will be ignored during the evaluation for the factory instance. - # If this is true, then (otel_on and debug_traces_on) will always evaluate to - # whatever value 'otel_on' has and therefore it will be ignored. - debug_traces_on = True - - if otel_on and debug_traces_on: + if otel_on: from airflow.sdk.observability.traces import otel_tracer cls.factory = staticmethod( @@ -108,15 +80,7 @@ def get_constant_tags(cls) -> str | None: if TYPE_CHECKING: Trace: EmptyTrace - DebugTrace: EmptyTrace else: class Trace(metaclass=_TraceMeta): """Empty class for Trace - we use metaclass to inject the right one.""" - - check_debug_traces_flag = False - - class DebugTrace(metaclass=_TraceMeta): - """Empty class for Trace and in case the debug traces flag is enabled.""" - - check_debug_traces_flag = True