Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions ddtrace/internal/opentelemetry/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ def attach(self, otel_context):
if ddcontext:
ddcontext.remove_all_baggage_items()
if otel_baggage:
for key, value in otel_baggage.items():
for key, value in list(otel_baggage.items()):
ddcontext._baggage[key] = value # potentially convert to json

# A return value with the type `object` is required by the otel api to remove/deactivate spans.
Expand Down Expand Up @@ -85,7 +85,7 @@ def get_current(self):
else:
dd_baggage = {}

for key, value in dd_baggage.items():
for key, value in list(dd_baggage.items()):
context = set_baggage(key, value, context)

return context
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
fixes:
- |
opentelemetry: Fixes ``RuntimeError: dictionary changed size during iteration`` in
``DDRuntimeContext.get_current()`` and ``DDRuntimeContext.attach()`` when multiple
threads concurrently read and mutate baggage dicts
60 changes: 60 additions & 0 deletions tests/opentelemetry/test_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,66 @@ def target(parent_context):
t.join()


def test_otel_get_current_thread_safety(oteltracer):
"""Test that get_current() does not raise RuntimeError when baggage is mutated concurrently.

Regression test for https://github.com/DataDog/dd-trace-py/issues/16523

The real-world scenario: multiple threads share the same DD span context
(via OTel context propagation). Some threads call get_current() which iterates
dd_baggage.items(), while others call attach() which mutates the same dict.
"""
import concurrent.futures

from opentelemetry.baggage import set_baggage
from opentelemetry.context import attach as otel_attach
from opentelemetry.context import get_current

errors = []

with oteltracer.start_as_current_span("root"):
# Pre-populate baggage to increase iteration time and race window
ctx = get_current()
for i in range(50):
ctx = set_baggage(f"key{i}", f"value{i}", ctx)
otel_attach(ctx)

# Capture the parent context (contains the span + baggage) to propagate to workers
parent_ctx = get_current()

def reader():
"""Attach parent context then repeatedly call get_current() which iterates dd_baggage.items()"""
# Propagate parent context to this thread (standard OTel pattern for thread pools)
otel_attach(parent_ctx)
for _ in range(200):
try:
get_current()
except RuntimeError as e:
if "dictionary changed size during iteration" in str(e):
errors.append(e)
return

def writer():
"""Attach parent context then repeatedly mutate baggage on the shared DD context."""
# Propagate parent context to this thread
otel_attach(parent_ctx)
for i in range(200):
# Build a new OTel context with additional baggage based on the parent
# (which contains the span), then attach it — this writes to dd_baggage
new_ctx = set_baggage(f"writer_key{i}", f"writer_value{i}", parent_ctx)
otel_attach(new_ctx)

with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
futures = []
for _ in range(4):
futures.append(executor.submit(reader))
for _ in range(4):
futures.append(executor.submit(writer))
concurrent.futures.wait(futures)

assert not errors, f"Thread-safety regression: {errors[0]}"


def _subprocess_task(parent_span_context, errors):
import ddtrace.auto # noqa

Expand Down
Loading