Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 3 additions & 12 deletions airflow-core/src/airflow/dag_processing/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
from airflow.models.db_callback_request import DbCallbackRequest
from airflow.models.errors import ParseImportError
from airflow.observability.metrics import stats_utils
from airflow.observability.trace import DebugTrace
from airflow.sdk import SecretCache
from airflow.sdk.log import init_log_file, logging_processors
from airflow.typing_compat import assert_never
Expand Down Expand Up @@ -1222,17 +1221,9 @@ def emit_metrics(*, parse_time: float, stats: Sequence[DagFileStat]):
This is called once every time around the parsing "loop" - i.e. after
all files have been parsed.
"""
with DebugTrace.start_span(span_name="emit_metrics", component="DagFileProcessorManager") as span:
Stats.gauge("dag_processing.total_parse_time", parse_time)
Stats.gauge("dagbag_size", sum(stat.num_dags for stat in stats))
Stats.gauge("dag_processing.import_errors", sum(stat.import_errors for stat in stats))
span.set_attributes(
{
"total_parse_time": parse_time,
"dag_bag_size": sum(stat.num_dags for stat in stats),
"import_errors": sum(stat.import_errors for stat in stats),
}
)
Stats.gauge("dag_processing.total_parse_time", parse_time)
Stats.gauge("dagbag_size", sum(stat.num_dags for stat in stats))
Stats.gauge("dag_processing.import_errors", sum(stat.import_errors for stat in stats))


def process_parse_results(
Expand Down
36 changes: 1 addition & 35 deletions airflow-core/src/airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@
import pendulum

from airflow._shared.observability.metrics.stats import Stats
from airflow._shared.observability.traces import NO_TRACE_ID
from airflow.cli.cli_config import DefaultHelpParser
from airflow.configuration import conf
from airflow.executors import workloads
from airflow.executors.executor_loader import ExecutorLoader
from airflow.models import Log
from airflow.observability.metrics import stats_utils
from airflow.observability.trace import DebugTrace, Trace, add_debug_span
from airflow.observability.trace import Trace
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState
from airflow.utils.thread_safe_dict import ThreadSafeDict
Expand Down Expand Up @@ -263,7 +262,6 @@ def sync(self) -> None:
Executors should override this to gather statuses.
"""

@add_debug_span
def heartbeat(self) -> None:
"""Heartbeat sent to trigger new jobs."""
open_slots = self.parallelism - len(self.running)
Expand Down Expand Up @@ -350,7 +348,6 @@ def order_queued_tasks_by_priority(self) -> list[tuple[TaskInstanceKey, workload
reverse=False,
)

@add_debug_span
def trigger_tasks(self, open_slots: int) -> None:
"""
Initiate async execution of the queued tasks, up to the number of available slots.
Expand Down Expand Up @@ -433,22 +430,6 @@ def fail(self, key: TaskInstanceKey, info=None) -> None:
:param info: Executor information for the task instance
:param key: Unique key for the task instance
"""
trace_id = Trace.get_current_span().get_span_context().trace_id
if trace_id != NO_TRACE_ID:
with DebugTrace.start_child_span(
span_name="fail",
component="BaseExecutor",
) as span:
span.set_attributes(
{
"dag_id": key.dag_id,
"run_id": key.run_id,
"task_id": key.task_id,
"try_number": key.try_number,
"error": True,
}
)

self.change_state(key, TaskInstanceState.FAILED, info)

def success(self, key: TaskInstanceKey, info=None) -> None:
Expand All @@ -458,21 +439,6 @@ def success(self, key: TaskInstanceKey, info=None) -> None:
:param info: Executor information for the task instance
:param key: Unique key for the task instance
"""
trace_id = Trace.get_current_span().get_span_context().trace_id
if trace_id != NO_TRACE_ID:
with DebugTrace.start_child_span(
span_name="success",
component="BaseExecutor",
) as span:
span.set_attributes(
{
"dag_id": key.dag_id,
"run_id": key.run_id,
"task_id": key.task_id,
"try_number": key.try_number,
}
)

self.change_state(key, TaskInstanceState.SUCCESS, info)

def queued(self, key: TaskInstanceKey, info=None) -> None:
Expand Down
115 changes: 53 additions & 62 deletions airflow-core/src/airflow/jobs/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
from airflow.exceptions import AirflowException
from airflow.listeners.listener import get_listener_manager
from airflow.models.base import ID_LEN, Base
from airflow.observability.trace import DebugTrace, add_debug_span
from airflow.utils.helpers import convert_camel_to_snake
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.net import get_hostname
Expand Down Expand Up @@ -210,68 +209,61 @@ def heartbeat(
:param session: the session to use for saving the job
"""
previous_heartbeat = self.latest_heartbeat
with DebugTrace.start_span(span_name="heartbeat", component="Job") as span:
try:
span.set_attribute("heartbeat", str(self.latest_heartbeat))
# This will cause it to load from the db
try:
# This will cause it to load from the db
session.merge(self)
previous_heartbeat = self.latest_heartbeat

if self.state == JobState.RESTARTING:
self.kill()

# Figure out how long to sleep for
sleep_for: float = 0
if self.latest_heartbeat:
seconds_remaining = (
self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
)
sleep_for = max(0, seconds_remaining)
sleep(sleep_for)
# Update last heartbeat time
with create_session() as session:
# Make the session aware of this object
session.merge(self)
self.latest_heartbeat = timezone.utcnow()
session.commit()
time_since_last_heartbeat: float = (
0
if previous_heartbeat is None
else (timezone.utcnow() - previous_heartbeat).total_seconds()
)
health_check_threshold_value = health_check_threshold(self.job_type, self.heartrate)
if time_since_last_heartbeat > health_check_threshold_value:
self.log.info("Heartbeat recovered after %.2f seconds", time_since_last_heartbeat)
# At this point, the DB has updated.
previous_heartbeat = self.latest_heartbeat

if self.state == JobState.RESTARTING:
self.kill()

# Figure out how long to sleep for
sleep_for: float = 0
if self.latest_heartbeat:
seconds_remaining = (
self.heartrate - (timezone.utcnow() - self.latest_heartbeat).total_seconds()
)
sleep_for = max(0, seconds_remaining)
if span.is_recording():
span.add_event(name="sleep", attributes={"sleep_for": sleep_for})
sleep(sleep_for)
# Update last heartbeat time
with create_session() as session:
# Make the session aware of this object
session.merge(self)
self.latest_heartbeat = timezone.utcnow()
session.commit()
time_since_last_heartbeat: float = (
0
if previous_heartbeat is None
else (timezone.utcnow() - previous_heartbeat).total_seconds()
)
health_check_threshold_value = health_check_threshold(self.job_type, self.heartrate)
if time_since_last_heartbeat > health_check_threshold_value:
self.log.info("Heartbeat recovered after %.2f seconds", time_since_last_heartbeat)
# At this point, the DB has updated.
previous_heartbeat = self.latest_heartbeat
heartbeat_callback(session)
self.log.debug("[heartbeat]")
self.heartbeat_failed = False
except OperationalError:
Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1)
if not self.heartbeat_failed:
self.log.exception("%s heartbeat failed with error", self.__class__.__name__)
self.heartbeat_failed = True
msg = f"{self.__class__.__name__} heartbeat got an exception"
if span.is_recording():
span.add_event(name="error", attributes={"message": msg})
if self.is_alive():
self.log.error(
"%s heartbeat failed with error. Scheduler may go into unhealthy state",
self.__class__.__name__,
)
msg = f"{self.__class__.__name__} heartbeat failed with error. Scheduler may go into unhealthy state"
if span.is_recording():
span.add_event(name="error", attributes={"message": msg})
else:
msg = f"{self.__class__.__name__} heartbeat failed with error. Scheduler is in unhealthy state"
self.log.error(msg)
if span.is_recording():
span.add_event(name="error", attributes={"message": msg})
# We didn't manage to heartbeat, so make sure that the timestamp isn't updated
self.latest_heartbeat = previous_heartbeat
heartbeat_callback(session)
self.log.debug("[heartbeat]")
self.heartbeat_failed = False
except OperationalError:
Stats.incr(convert_camel_to_snake(self.__class__.__name__) + "_heartbeat_failure", 1, 1)
if not self.heartbeat_failed:
self.log.exception("%s heartbeat failed with error", self.__class__.__name__)
self.heartbeat_failed = True
msg = f"{self.__class__.__name__} heartbeat got an exception"
self.log.error(msg)
if self.is_alive():
self.log.error(
"%s heartbeat failed with error. Scheduler may go into unhealthy state",
self.__class__.__name__,
)
msg = f"{self.__class__.__name__} heartbeat failed with error. Scheduler may go into unhealthy state"
else:
msg = (
f"{self.__class__.__name__} heartbeat failed with error. Scheduler is in unhealthy state"
)
self.log.error(msg)
# We didn't manage to heartbeat, so make sure that the timestamp isn't updated
self.latest_heartbeat = previous_heartbeat

@provide_session
def prepare_for_execution(self, session: Session = NEW_SESSION):
Expand Down Expand Up @@ -401,7 +393,6 @@ def execute_job(job: Job, execute_callable: Callable[[], int | None]) -> int | N
return ret


@add_debug_span
def perform_heartbeat(
job: Job, heartbeat_callback: Callable[[Session], None], only_if_necessary: bool
) -> None:
Expand Down
Loading
Loading