Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
204 changes: 184 additions & 20 deletions ddtrace/_trace/trace_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
import ddtrace
from ddtrace import config
from ddtrace._trace._inferred_proxy import create_inferred_proxy_span_if_headers_exist
from ddtrace._trace._span_link import SpanLinkKind as _SpanLinkKind
from ddtrace._trace._span_pointer import _SpanPointerDescription
from ddtrace._trace._span_pointer import _SpanPointerDirection
from ddtrace._trace.span import Span
from ddtrace._trace.utils import extract_DD_context_from_messages
from ddtrace.constants import _SPAN_MEASURED_KEY
Expand All @@ -31,6 +33,7 @@
from ddtrace.contrib.internal.trace_utils import _set_url_tag
from ddtrace.ext import SpanKind
from ddtrace.ext import SpanLinkKind
from ddtrace.ext import SpanTypes
from ddtrace.ext import db
from ddtrace.ext import http
from ddtrace.ext import net
Expand All @@ -49,6 +52,8 @@
from ddtrace.internal.constants import MESSAGING_OPERATION
from ddtrace.internal.constants import MESSAGING_SYSTEM
from ddtrace.internal.constants import SPAN_LINK_KIND
from ddtrace.internal.constants import SPAN_POINTER_DOWN_DIRECTION
from ddtrace.internal.constants import SPAN_POINTER_UP_DIRECTION
from ddtrace.internal.logger import get_logger
from ddtrace.internal.sampling import _inherit_sampling_tags
from ddtrace.internal.schema.span_attribute_schema import SpanDirection
Expand Down Expand Up @@ -992,6 +997,63 @@ def _set_client_ip_tags(scope: Mapping[str, Any], span: Span):
log.debug("Could not validate client IP address for websocket send message: %s", str(e))


def _init_websocket_message_counters(scope: Dict[str, Any]) -> None:
if "datadog" not in scope:
scope["datadog"] = {}
if "websocket_receive_counter" not in scope["datadog"]:
scope["datadog"]["websocket_receive_counter"] = 0
if "websocket_send_counter" not in scope["datadog"]:
scope["datadog"]["websocket_send_counter"] = 0


def _increment_websocket_counter(scope: Dict[str, Any], counter_type: str) -> int:
"""
Increment and return websocket message counter (either websocket_receive_counter or websocket_send_counter)
"""
scope["datadog"][counter_type] += 1
return scope["datadog"][counter_type]


def _build_websocket_span_pointer_hash(
handshake_trace_id: int,
handshake_span_id: int,
counter: int,
is_server: bool,
is_incoming: bool,
) -> str:
"""
Build websocket span pointer hash.

Format: <prefix><128 bit hex trace id><64 bit hex span id><32 bit hex counter>
Prefix: 'S' for server outgoing or client incoming, 'C' for server incoming or client outgoing
"""
if (is_server and not is_incoming) or (not is_server and is_incoming):
prefix = "S"
else:
prefix = "C"

trace_id_hex = f"{handshake_trace_id:032x}"
span_id_hex = f"{handshake_span_id:016x}"
counter_hex = f"{counter:08x}"

return f"{prefix}{trace_id_hex}{span_id_hex}{counter_hex}"


def _has_distributed_tracing_context(span: Span) -> bool:
"""
Check if the handshake span has extracted distributed tracing context.

A websocket server must not set the span pointer if the handshake has not extracted a context

A span has distributed tracing context if it has a parent context that was
extracted from headers (_is_remote=True).
"""
if not span or not span._parent_context:
return False
# Check if the context was extracted from remote headers
return span._parent_context._is_remote


def _on_asgi_websocket_receive_message(ctx, scope, message):
"""
Handle websocket receive message events.
Expand All @@ -1011,16 +1073,40 @@ def _on_asgi_websocket_receive_message(ctx, scope, message):
span.set_metric(websocket.MESSAGE_FRAMES, 1)

if hasattr(ctx, "parent") and ctx.parent.span:
handshake_span = ctx.parent.span
link_attributes = {SPAN_LINK_KIND: SpanLinkKind.EXECUTED}
# Add span pointer attributes if distributed tracing is enabled and context was extracted
if integration_config.distributed_tracing and _has_distributed_tracing_context(handshake_span):
counter = _increment_websocket_counter(scope, "websocket_receive_counter")

ptr_hash = _build_websocket_span_pointer_hash(
handshake_trace_id=handshake_span.trace_id,
handshake_span_id=handshake_span.span_id,
counter=counter,
is_server=True,
is_incoming=True,
)

link_attributes.update(
{
"link.name": SPAN_POINTER_UP_DIRECTION,
"dd.kind": _SpanLinkKind.SPAN_POINTER.value,
"ptr.kind": SpanTypes.WEBSOCKET,
"ptr.dir": _SpanPointerDirection.UPSTREAM,
"ptr.hash": ptr_hash,
}
)

span.set_link(
trace_id=ctx.parent.span.trace_id,
span_id=ctx.parent.span.span_id,
attributes={SPAN_LINK_KIND: SpanLinkKind.EXECUTED},
trace_id=handshake_span.trace_id,
span_id=handshake_span.span_id,
attributes=link_attributes,
)

if getattr(integration_config, "asgi_websocket_messages_inherit_sampling", True):
_inherit_sampling_tags(span, ctx.parent.span._local_root)
_inherit_sampling_tags(span, handshake_span._local_root)

_copy_trace_level_tags(span, ctx.parent.span)
_copy_trace_level_tags(span, handshake_span)


def _on_asgi_websocket_send_message(ctx, scope, message):
Expand All @@ -1041,10 +1127,35 @@ def _on_asgi_websocket_send_message(ctx, scope, message):
span.set_metric(websocket.MESSAGE_FRAMES, 1)

if hasattr(ctx, "parent") and ctx.parent.span:
handshake_span = ctx.parent.span
link_attributes = {SPAN_LINK_KIND: SpanLinkKind.RESUMING}

# Add span pointer attributes if distributed tracing is enabled and context was extracted
if integration_config.distributed_tracing and _has_distributed_tracing_context(handshake_span):
counter = _increment_websocket_counter(scope, "websocket_send_counter")

ptr_hash = _build_websocket_span_pointer_hash(
handshake_trace_id=handshake_span.trace_id,
handshake_span_id=handshake_span.span_id,
counter=counter,
is_server=True,
is_incoming=False,
)

link_attributes.update(
{
"link.name": SPAN_POINTER_DOWN_DIRECTION,
"dd.kind": _SpanLinkKind.SPAN_POINTER.value,
"ptr.kind": SpanTypes.WEBSOCKET,
"ptr.dir": _SpanPointerDirection.DOWNSTREAM,
"ptr.hash": ptr_hash,
}
)

span.set_link(
trace_id=ctx.parent.span.trace_id,
span_id=ctx.parent.span.span_id,
attributes={SPAN_LINK_KIND: SpanLinkKind.RESUMING},
trace_id=handshake_span.trace_id,
span_id=handshake_span.span_id,
attributes=link_attributes,
)


Expand All @@ -1068,13 +1179,39 @@ def _on_asgi_websocket_close_message(ctx, scope, message):
_set_websocket_close_tags(span, message)

if hasattr(ctx, "parent") and ctx.parent.span:
handshake_span = ctx.parent.span

link_attributes = {SPAN_LINK_KIND: SpanLinkKind.RESUMING}

# Add span pointer attributes if distributed tracing is enabled and context was extracted
if integration_config.distributed_tracing and _has_distributed_tracing_context(handshake_span):
counter = _increment_websocket_counter(scope, "websocket_send_counter")

ptr_hash = _build_websocket_span_pointer_hash(
handshake_trace_id=handshake_span.trace_id,
handshake_span_id=handshake_span.span_id,
counter=counter,
is_server=True,
is_incoming=False,
)

link_attributes.update(
{
"link.name": SPAN_POINTER_DOWN_DIRECTION,
"dd.kind": _SpanLinkKind.SPAN_POINTER.value,
"ptr.kind": SpanTypes.WEBSOCKET,
"ptr.dir": _SpanPointerDirection.DOWNSTREAM,
"ptr.hash": ptr_hash,
}
)

span.set_link(
trace_id=ctx.parent.span.trace_id,
span_id=ctx.parent.span.span_id,
attributes={SPAN_LINK_KIND: SpanLinkKind.RESUMING},
trace_id=handshake_span.trace_id,
span_id=handshake_span.span_id,
attributes=link_attributes,
)

_copy_trace_level_tags(span, ctx.parent.span)
_copy_trace_level_tags(span, handshake_span)


def _on_asgi_websocket_disconnect_message(ctx, scope, message):
Expand All @@ -1093,16 +1230,41 @@ def _on_asgi_websocket_disconnect_message(ctx, scope, message):
_set_websocket_close_tags(span, message)

if hasattr(ctx, "parent") and ctx.parent.span:
handshake_span = ctx.parent.span
link_attributes = {SPAN_LINK_KIND: SpanLinkKind.EXECUTED}

# Add span pointer attributes if distributed tracing is enabled and context was extracted
if integration_config.distributed_tracing and _has_distributed_tracing_context(handshake_span):
counter = _increment_websocket_counter(scope, "websocket_receive_counter")

ptr_hash = _build_websocket_span_pointer_hash(
handshake_trace_id=handshake_span.trace_id,
handshake_span_id=handshake_span.span_id,
counter=counter,
is_server=True,
is_incoming=True,
)

link_attributes.update(
{
"link.name": SPAN_POINTER_UP_DIRECTION,
"dd.kind": _SpanLinkKind.SPAN_POINTER.value,
"ptr.kind": SpanTypes.WEBSOCKET,
"ptr.dir": _SpanPointerDirection.UPSTREAM,
"ptr.hash": ptr_hash,
}
)

span.set_link(
trace_id=ctx.parent_span.trace_id,
span_id=ctx.parent_span.span_id,
attributes={SPAN_LINK_KIND: SpanLinkKind.EXECUTED},
trace_id=handshake_span.trace_id,
span_id=handshake_span.span_id,
attributes=link_attributes,
)

if getattr(integration_config, "asgi_websocket_messages_inherit_sampling", True):
_inherit_sampling_tags(span, ctx.parent.span._local_root)
_inherit_sampling_tags(span, handshake_span._local_root)

_copy_trace_level_tags(span, ctx.parent.span)
_copy_trace_level_tags(span, handshake_span)


def _on_asgi_request(ctx: core.ExecutionContext) -> None:
Expand All @@ -1115,14 +1277,16 @@ def _on_asgi_request(ctx: core.ExecutionContext) -> None:
span = _start_span(ctx)
ctx.set_item("req_span", span)

if scope["type"] == "websocket":
span._set_tag_str("http.upgraded", "websocket")

if "datadog" not in scope:
scope["datadog"] = {"request_spans": [span]}
else:
scope["datadog"]["request_spans"].append(span)

if scope["type"] == "websocket":
span._set_tag_str("http.upgraded", SpanTypes.WEBSOCKET)
# Initialize websocket message counters for span pointer tracking
_init_websocket_message_counters(scope)


def listen():
core.on("wsgi.request.prepare", _on_request_prepare)
Expand Down
2 changes: 2 additions & 0 deletions ddtrace/internal/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
SAMPLING_DECISION_MAKER_RESOURCE = "_dd.dm.resource"
SPAN_LINK_KIND = "dd.kind"
SPAN_LINKS_KEY = "_dd.span_links"
SPAN_POINTER_DOWN_DIRECTION = "span-pointer-down"
SPAN_POINTER_UP_DIRECTION = "span-pointer-up"
SPAN_EVENTS_KEY = "events"
SPAN_API_DATADOG = "datadog"
SPAN_API_OTEL = "otel"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
features:
- |
Adds span pointers for WebSocket message tracing to enable distributed context propagation across client-server boundaries.
Loading
Loading