Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
64a8ca0
Remove the unnecessary OTEL tracing abstractions
dstandish Feb 27, 2026
a9f48e4
add dagrun span
dstandish Feb 27, 2026
10adf1e
remove the base tracer class
dstandish Feb 27, 2026
eb93f08
simplify carrier
dstandish Feb 27, 2026
18513d0
add task span
dstandish Mar 2, 2026
3e1e172
guard around force flush
dstandish Mar 2, 2026
575bce7
get it working again
dstandish Mar 3, 2026
176e565
revert removals and deprecations of old otel code
dstandish Mar 3, 2026
10bedbc
remove todo
dstandish Mar 3, 2026
37b0e36
handle backcompat
dstandish Mar 3, 2026
e6da739
revert comment
dstandish Mar 3, 2026
f25aa1c
revert change
dstandish Mar 3, 2026
0f3a5dd
add comment about backcompat config
dstandish Mar 3, 2026
eee3202
small fixes
dstandish Mar 3, 2026
6e748c9
remove old testing
dstandish Mar 3, 2026
0a58fc7
add dag run tracing tests
dstandish Mar 3, 2026
988ecfc
add task runner span tests
dstandish Mar 3, 2026
c3e2d1a
review nits
dstandish Mar 3, 2026
3b3a75d
make configure_otel a no-op when otel_on is off
dstandish Mar 3, 2026
41e0df3
review nits
dstandish Mar 3, 2026
d5a5558
simplify id gen
dstandish Mar 3, 2026
0552f8f
can set the context carrier earlier
dstandish Mar 3, 2026
20e78cc
fix test
dstandish Mar 3, 2026
8e3608d
fixes
dstandish Mar 3, 2026
9a84061
type hint for carrier
dstandish Mar 3, 2026
69cfa73
Integration test fix
dstandish Mar 3, 2026
81a40d9
fixes
dstandish Mar 5, 2026
cdfc661
don't override span id
dstandish Mar 6, 2026
773d324
Revert "don't override span id"
dstandish Mar 6, 2026
a90b062
use non-recording span to avoid memory leak, and move to helper function
dstandish Mar 6, 2026
477aa33
fixes
dstandish Mar 6, 2026
e1e55f7
fix off by 1
dstandish Mar 6, 2026
d7fbb4a
update span names
dstandish Mar 6, 2026
504ac3e
add config for flush timeout millis
dstandish Mar 6, 2026
58279c6
tweak attr names
dstandish Mar 6, 2026
c33482a
fix tests
dstandish Mar 6, 2026
76f122b
fix test
dstandish Mar 6, 2026
b909fdd
spelling
dstandish Mar 6, 2026
b4166e7
christos suggestion
dstandish Mar 9, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1414,9 +1414,21 @@ traces:
description: |
If True, then traces from Airflow internal methods are exported. Defaults to False.
version_added: 3.1.0
version_deprecated: 3.2.0
deprecation_reason: |
This parameter is no longer used.
type: string
example: ~
default: "False"
otel_task_runner_span_flush_timeout_millis:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In another config we use the full word “milliseconds” so this should match. That’s the only place we use milliseconds though; elsewhere we always use seconds instead.

description: |
Timeout in milliseconds to wait for the OpenTelemetry span exporter to flush pending spans
when a task runner process exits. If the exporter does not finish within this time, any
buffered spans may be dropped.
version_added: 3.1.0
type: integer
example: ~
default: "30000"
secrets:
description: ~
options:
Expand Down
44 changes: 0 additions & 44 deletions airflow-core/src/airflow/executors/base_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,11 @@
from airflow.configuration import conf
from airflow.executors import workloads
from airflow.executors.executor_loader import ExecutorLoader
from airflow.executors.workloads.task import TaskInstanceDTO
from airflow.models import Log
from airflow.models.callback import CallbackKey
from airflow.observability.metrics import stats_utils
from airflow.observability.trace import Trace
from airflow.utils.log.logging_mixin import LoggingMixin
from airflow.utils.state import TaskInstanceState
from airflow.utils.thread_safe_dict import ThreadSafeDict

PARALLELISM: int = conf.getint("core", "PARALLELISM")

Expand Down Expand Up @@ -143,8 +140,6 @@ class BaseExecutor(LoggingMixin):
:param parallelism: how many jobs should run at one time.
"""

active_spans = ThreadSafeDict()

supports_ad_hoc_ti_run: bool = False
supports_callbacks: bool = False
supports_multi_team: bool = False
Expand Down Expand Up @@ -217,10 +212,6 @@ def __repr__(self):
_repr += ")"
return _repr

@classmethod
def set_active_spans(cls, active_spans: ThreadSafeDict):
cls.active_spans = active_spans

def start(self): # pragma: no cover
"""Executors may need to get things started."""

Expand Down Expand Up @@ -340,17 +331,6 @@ def _emit_metrics(self, open_slots, num_running_tasks, num_queued_tasks):
queued_tasks_metric_name = self._get_metric_name("executor.queued_tasks")
running_tasks_metric_name = self._get_metric_name("executor.running_tasks")

span = Trace.get_current_span()
if span.is_recording():
span.add_event(
name="executor",
attributes={
open_slots_metric_name: open_slots,
queued_tasks_metric_name: num_queued_tasks,
running_tasks_metric_name: num_running_tasks,
},
)

self.log.debug("%s running task instances for executor %s", num_running_tasks, name)
self.log.debug("%s in queue for executor %s", num_queued_tasks, name)
if open_slots == 0:
Expand Down Expand Up @@ -415,30 +395,6 @@ def trigger_tasks(self, open_slots: int) -> None:
if key in self.attempts:
del self.attempts[key]

if isinstance(workload, workloads.ExecuteTask) and hasattr(workload, "ti"):
ti = workload.ti

# If it's None, then the span for the current id hasn't been started.
if self.active_spans is not None and self.active_spans.get("ti:" + str(ti.id)) is None:
if isinstance(ti, TaskInstanceDTO):
parent_context = Trace.extract(ti.parent_context_carrier)
else:
parent_context = Trace.extract(ti.dag_run.context_carrier)
# Start a new span using the context from the parent.
# Attributes will be set once the task has finished so that all
# values will be available (end_time, duration, etc.).

span = Trace.start_child_span(
span_name=f"{ti.task_id}",
parent_context=parent_context,
component="task",
start_as_current=False,
)
self.active_spans.set("ti:" + str(ti.id), span)
# Inject the current context into the carrier.
carrier = Trace.inject()
ti.context_carrier = carrier

workload_list.append(workload)

if workload_list:
Expand Down
2 changes: 1 addition & 1 deletion airflow-core/src/airflow/executors/workloads/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def make(
from airflow.utils.helpers import log_filename_template_renderer

ser_ti = TaskInstanceDTO.model_validate(ti, from_attributes=True)
ser_ti.parent_context_carrier = ti.dag_run.context_carrier
ser_ti.context_carrier = ti.dag_run.context_carrier
if not bundle_info:
bundle_info = BundleInfo(
name=ti.dag_model.bundle_name,
Expand Down
Loading
Loading