Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
d76a11d
chore(internal): add process_tags to first span of each payload
dubloom Nov 4, 2025
a3643d8
tests(process_tags): add tests
dubloom Nov 5, 2025
a52e8cc
lint
dubloom Nov 5, 2025
f943f2a
fix: suitespec
dubloom Nov 5, 2025
660bd64
fix: telemetry test
dubloom Nov 5, 2025
ace7fae
Merge branch 'main' into dubloom/process-tags-collection
dubloom Nov 5, 2025
78dd521
fix telemetry 2
dubloom Nov 5, 2025
dd58490
simplify process_tags (brett review)
dubloom Nov 6, 2025
f47539e
Merge branch 'main' into dubloom/process-tags-collection
dubloom Nov 6, 2025
184ef53
update python version
dubloom Nov 6, 2025
be2973e
put tests within internal suite
dubloom Nov 6, 2025
c6b4d7f
remove sys hack
dubloom Nov 6, 2025
c6cb1be
make tests compatible with CI
dubloom Nov 7, 2025
974b474
Merge branch 'main' into dubloom/process-tags-collection
dubloom Nov 7, 2025
7416466
lint
dubloom Nov 7, 2025
0428dcd
brett review
dubloom Nov 10, 2025
b66d6a4
Merge branch 'main' into dubloom/process-tags-collection
dubloom Nov 10, 2025
71b5ed7
Merge branch 'main' into dubloom/process-tags-collection
dubloom Nov 12, 2025
a123350
Merge branch 'main' into dubloom/process-tags-collection
dubloom Nov 12, 2025
32ddf35
improve tag normalization
dubloom Nov 14, 2025
ee77b0e
Merge branch 'main' into dubloom/process-tags-collection
dubloom Nov 14, 2025
f5c3eee
Merge branch 'main' into dubloom/process-tags-collection
dubloom Nov 17, 2025
7cf4143
gab review
dubloom Nov 17, 2025
ae20207
improving normalization
dubloom Nov 18, 2025
f70b7ed
Merge branch 'main' into dubloom/process-tags-collection
dubloom Nov 18, 2025
efe28fa
remove print
dubloom Nov 18, 2025
f957552
Update tests/internal/test_process_tags.py
dubloom Nov 18, 2025
86f04db
Merge branch 'main' into dubloom/process-tags-collection
dubloom Nov 19, 2025
914d52a
add a test that activates the feature with env variable
dubloom Nov 19, 2025
fdd6480
chore(di): add process_tags (#15225)
dubloom Nov 19, 2025
fa3dbb8
Merge branch 'main' into dubloom/process-tags-collection
dubloom Nov 19, 2025
aa23d07
lint
dubloom Nov 19, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions .riot/requirements/1645326.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# This file is autogenerated by pip-compile with Python 3.13
# by the following command:
#
# pip-compile --allow-unsafe --no-annotate .riot/requirements/1645326.in
#
attrs==25.4.0
coverage[toml]==7.11.0
hypothesis==6.45.0
iniconfig==2.3.0
mock==5.2.0
opentracing==2.4.0
packaging==25.0
pluggy==1.6.0
pygments==2.19.2
pytest==8.4.2
pytest-cov==7.0.0
pytest-mock==3.15.1
sortedcontainers==2.4.0
4 changes: 4 additions & 0 deletions ddtrace/_trace/processor/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@
from ddtrace.internal.constants import HIGHER_ORDER_TRACE_ID_BITS
from ddtrace.internal.constants import LAST_DD_PARENT_ID_KEY
from ddtrace.internal.constants import MAX_UINT_64BITS
from ddtrace.internal.constants import PROCESS_TAGS
from ddtrace.internal.constants import SAMPLING_DECISION_TRACE_TAG_KEY
from ddtrace.internal.constants import SamplingMechanism
from ddtrace.internal.logger import get_logger
from ddtrace.internal.process_tags import process_tags
from ddtrace.internal.rate_limiter import RateLimiter
from ddtrace.internal.sampling import SpanSamplingRule
from ddtrace.internal.sampling import get_span_sampling_rules
Expand Down Expand Up @@ -250,6 +252,8 @@ def process_trace(self, trace: List[Span]) -> Optional[List[Span]]:
span._update_tags_from_context()
self._set_git_metadata(span)
span._set_tag_str("language", "python")
if serialized_process_tags := process_tags.get_serialized_process_tags():
span._set_tag_str(PROCESS_TAGS, serialized_process_tags)
# for 128 bit trace ids
if span.trace_id > MAX_UINT_64BITS:
trace_id_hob = _get_64_highest_order_bits_as_hex(span.trace_id)
Expand Down
1 change: 1 addition & 0 deletions ddtrace/internal/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
HTTP_REQUEST_PATH_PARAMETER = "http.request.path.parameter"
REQUEST_PATH_PARAMS = "http.request.path_params"
STATUS_403_TYPE_AUTO = {"status_code": 403, "type": "auto"}
PROCESS_TAGS = "_dd.tags.process"

CONTAINER_ID_HEADER_NAME = "Datadog-Container-Id"

Expand Down
79 changes: 79 additions & 0 deletions ddtrace/internal/process_tags/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import os
from pathlib import Path
import re
import sys
from typing import Callable
from typing import Dict
from typing import Optional

from ddtrace.internal.forksafe import Lock
from ddtrace.internal.logger import get_logger
from ddtrace.settings._config import config

from .constants import ENTRYPOINT_BASEDIR_TAG
from .constants import ENTRYPOINT_NAME_TAG
from .constants import ENTRYPOINT_TYPE_SCRIPT
from .constants import ENTRYPOINT_TYPE_TAG
from .constants import ENTRYPOINT_WORKDIR_TAG


log = get_logger(__name__)


# outside of ProcessTags class for test purpose
def normalize_tag(value: str) -> str:
return re.sub(r"[^a-z0-9/._-]", "_", value.lower())


class ProcessTags:
def __init__(self) -> None:
self._lock = Lock()
self._serialized: Optional[str] = None
self._enabled = config._process_tags_enabled
self._process_tags: Dict[str, str] = {}
self.reload()

def _serialize_process_tags(self) -> Optional[str]:
if self._process_tags and not self._serialized:
serialized_tags = ",".join(f"{key}:{value}" for key, value in self._process_tags.items())
return serialized_tags
return None

def get_serialized_process_tags(self) -> Optional[str]:
if not self._enabled:
return None

with self._lock:
if not self._serialized:
self._serialized = self._serialize_process_tags()
return self._serialized

def add_process_tag(self, key: str, value: Optional[str] = None, compute: Optional[Callable[[], str]] = None):
if not self._enabled:
return

if compute:
try:
value = compute()
except Exception as e:
log.debug("failed to set %s process_tag: %s", key, e)

if value:
with self._lock:
self._process_tags[key] = normalize_tag(value)
self._serialized = None

def reload(self):
if not self._enabled:
return

with self._lock:
self._process_tags = {}

self.add_process_tag(ENTRYPOINT_WORKDIR_TAG, compute=lambda: os.path.basename(os.getcwd()))
self.add_process_tag(ENTRYPOINT_BASEDIR_TAG, compute=lambda: Path(sys.argv[0]).resolve().parent.name)
self.add_process_tag(ENTRYPOINT_NAME_TAG, compute=lambda: os.path.splitext(os.path.basename(sys.argv[0]))[0])
self.add_process_tag(ENTRYPOINT_TYPE_TAG, value=ENTRYPOINT_TYPE_SCRIPT)


process_tags = ProcessTags()
7 changes: 7 additions & 0 deletions ddtrace/internal/process_tags/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
ENTRYPOINT_NAME_TAG = "entrypoint.name"
ENTRYPOINT_WORKDIR_TAG = "entrypoint.workdir"

ENTRYPOINT_TYPE_TAG = "entrypoint.type"
ENTRYPOINT_TYPE_SCRIPT = "script"

ENTRYPOINT_BASEDIR_TAG = "entrypoint.basedir"
3 changes: 3 additions & 0 deletions ddtrace/settings/_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,9 @@ def __init__(self):
self._trace_resource_renaming_always_simplified_endpoint = _get_config(
"DD_TRACE_RESOURCE_RENAMING_ALWAYS_SIMPLIFIED_ENDPOINT", default=False, modifier=asbool
)
self._process_tags_enabled = _get_config(
"DD_EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED", default=False, modifier=asbool
)

# Long-running span interval configurations
# Only supported for Ray spans for now
Expand Down
1 change: 1 addition & 0 deletions riotfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ def select_pys(min_version: str = MIN_PYTHON_VERSION, max_version: str = MAX_PYT
command="pytest --no-cov {cmdargs} tests/coverage -s",
pys=select_pys(max_version="3.12"),
),
Venv(name="process_tags", command="pytest -v {cmdargs} tests/process_tags/", pys=select_pys(min_version="3.9")),
Venv(
name="internal",
env={
Expand Down
Empty file added tests/process_tags/__init__.py
Empty file.
130 changes: 130 additions & 0 deletions tests/process_tags/test_process_tags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import os
from pathlib import Path
import sys

from ddtrace.internal.constants import PROCESS_TAGS
from ddtrace.internal.process_tags import normalize_tag
from ddtrace.internal.process_tags import process_tags
from ddtrace.internal.process_tags.constants import ENTRYPOINT_BASEDIR_TAG
from ddtrace.internal.process_tags.constants import ENTRYPOINT_NAME_TAG
from ddtrace.internal.process_tags.constants import ENTRYPOINT_TYPE_SCRIPT
from ddtrace.internal.process_tags.constants import ENTRYPOINT_TYPE_TAG
from ddtrace.internal.process_tags.constants import ENTRYPOINT_WORKDIR_TAG
from tests.utils import TracerTestCase
from tests.utils import override_env


def test_normalize_tag():
assert normalize_tag("HelloWorld") == "helloworld"
assert normalize_tag("Hello@World!") == "hello_world_"
assert normalize_tag("HeLLo123") == "hello123"
assert normalize_tag("hello world") == "hello_world"
assert normalize_tag("a/b.c_d-e") == "a/b.c_d-e"
assert normalize_tag("héllø") == "h_ll_"
assert normalize_tag("") == ""
assert normalize_tag("💡⚡️") == "___"
assert normalize_tag("!foo@") == "_foo_"
assert normalize_tag("123_abc.DEF-ghi/jkl") == "123_abc.def-ghi/jkl"
assert normalize_tag("Env:Prod-Server#1") == "env_prod-server_1"


class TestProcessTags(TracerTestCase):
def test_process_tags_deactivated(self):
with self.tracer.trace("test"):
pass

span = self.get_spans()[0]
assert span is not None
assert PROCESS_TAGS not in span._meta

def test_process_tags_activated_with_override_env(self):
"""Test process tags using override_env instead of run_in_subprocess"""
with override_env(dict(DD_EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED="True")):
process_tags._enabled = True
process_tags.reload()

with self.tracer.trace("test"):
pass

process_tags._enabled = False

span = self.get_spans()[0]
assert span is not None
assert PROCESS_TAGS in span._meta

expected_name = "pytest"
expected_type = ENTRYPOINT_TYPE_SCRIPT
expected_basedir = Path(sys.argv[0]).resolve().parent.name
expected_workdir = os.path.basename(os.getcwd())

serialized_tags = span._meta[PROCESS_TAGS]
expected_raw = (
f"{ENTRYPOINT_WORKDIR_TAG}:{expected_workdir},"
f"{ENTRYPOINT_BASEDIR_TAG}:{expected_basedir},"
f"{ENTRYPOINT_NAME_TAG}:{expected_name},"
f"{ENTRYPOINT_TYPE_TAG}:{expected_type}"
)
assert serialized_tags == expected_raw

tags_dict = dict(tag.split(":", 1) for tag in serialized_tags.split(","))
assert tags_dict[ENTRYPOINT_NAME_TAG] == expected_name
assert tags_dict[ENTRYPOINT_TYPE_TAG] == expected_type
assert tags_dict[ENTRYPOINT_BASEDIR_TAG] == expected_basedir
assert tags_dict[ENTRYPOINT_WORKDIR_TAG] == expected_workdir

def test_process_tags_only_on_local_root_span(self):
"""Test that only local root spans get process tags, not children"""
with override_env(dict(DD_EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED="True")):
process_tags._enabled = True
process_tags.reload()

with self.tracer.trace("parent"):
with self.tracer.trace("child"):
pass

process_tags._enabled = False

spans = self.get_spans()
assert len(spans) == 2

parent = [s for s in spans if s.name == "parent"][0]
assert PROCESS_TAGS in parent._meta

child = [s for s in spans if s.name == "child"][0]
assert PROCESS_TAGS not in child._meta

def test_add_process_tag_compute_exception(self):
"""Test error handling when compute raises exception"""
with override_env(dict(DD_EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED="True")):
process_tags._enabled = True
process_tags.reload()

def failing_compute():
raise ValueError("Test exception")

process_tags.add_process_tag("test.tag", compute=failing_compute)

assert "test.tag" not in process_tags._process_tags

process_tags.add_process_tag("test.working", value="value")
assert "test.working" in process_tags._process_tags
assert process_tags._process_tags["test.working"] == "value"

process_tags._enabled = False

def test_serialization_caching(self):
"""Test that serialization is cached and invalidated properly"""
with override_env(dict(DD_EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED="True")):
process_tags._enabled = True
process_tags.reload()

process_tags.get_serialized_process_tags()
assert process_tags._serialized is not None

process_tags.add_process_tag("custom.tag", value="test")
assert process_tags._serialized is None

result3 = process_tags.get_serialized_process_tags()
assert "custom.tag:test" in result3

process_tags._enabled = False
10 changes: 10 additions & 0 deletions tests/suitespec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ components:
- ddtrace/internal/opentelemetry/*
opentracer:
- ddtrace/opentracer/*
process_tags:
- ddtrace/internal/process_tags/*
- tests/process_tags/*
profiling:
- ddtrace/profiling/*
- ddtrace/internal/datadog/profiling/*
Expand Down Expand Up @@ -242,6 +245,13 @@ suites:
- '@core'
runner: riot
pattern: ^openfeature$
process_tags:
parallelism: 1
paths:
- '@process_tags'
- '@core'
- '@tracing'
runner: riot
telemetry:
parallelism: 1
paths:
Expand Down
1 change: 1 addition & 0 deletions tests/telemetry/test_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ def test_app_started_event_configuration_override(test_agent_session, run_python
{"name": "DD_ERROR_TRACKING_HANDLED_ERRORS_INCLUDE", "origin": "default", "value": ""},
{"name": "DD_EXCEPTION_REPLAY_CAPTURE_MAX_FRAMES", "origin": "default", "value": 8},
{"name": "DD_EXCEPTION_REPLAY_ENABLED", "origin": "env_var", "value": True},
{"name": "DD_EXPERIMENTAL_PROPAGATE_PROCESS_TAGS_ENABLED", "origin": "default", "value": False},
{"name": "DD_FASTAPI_ASYNC_BODY_TIMEOUT_SECONDS", "origin": "default", "value": 0.1},
{"name": "DD_IAST_DEDUPLICATION_ENABLED", "origin": "default", "value": True},
{"name": "DD_IAST_ENABLED", "origin": "default", "value": False},
Expand Down
Loading