Skip to content

Commit 5c65e68

Browse files
authored
feat(span-first): Support before_send_span (#6239)
### Description

Add support for `before_send_span` in span streaming mode.

`before_send_span` is different from `before_send_metric` and `before_send_log` in that:

- it doesn't allow users to drop a span (i.e., return `None`)
- it only allows modifying specific parts of the span

To that end, we're now serializing the span earlier, and exposing the serialized dictionary in the `before_send` callback. This is consistent with metrics and logs. It also means we're now queuing dictionaries instead of `StreamedSpan` instances in the span batcher, which should also decrease our memory footprint.

This aligns our implementation with JS. See https://develop.sentry.dev/sdk/telemetry/spans/scrubbing-data/ for spec.

#### Issues

* resolves: #5388
* resolves: https://linear.app/getsentry/issue/PY-2057/add-before-send-span
1 parent 3512246 commit 5c65e68

9 files changed

Lines changed: 262 additions & 44 deletions

File tree

sentry_sdk/_span_batcher.py

Lines changed: 20 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414
if TYPE_CHECKING:
1515
from typing import Any, Callable, Optional
1616

17-
from sentry_sdk.traces import StreamedSpan
17+
from sentry_sdk._types import SpanJSON
1818

1919

20-
class SpanBatcher(Batcher["StreamedSpan"]):
20+
class SpanBatcher(Batcher["SpanJSON"]):
2121
# MAX_BEFORE_FLUSH should be lower than MAX_BEFORE_DROP, so that there is
2222
# a bit of a buffer for spans that appear between the trigger to flush
2323
# and actually flushing the buffer.
@@ -43,7 +43,7 @@ def __init__(
4343
# by trace_id, so that we can then send the buckets each in its own
4444
# envelope.
4545
# trace_id -> span buffer
46-
self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list)
46+
self._span_buffer: dict[str, list["SpanJSON"]] = defaultdict(list)
4747
self._running_size: dict[str, int] = defaultdict(lambda: 0)
4848
self._capture_func = capture_func
4949
self._record_lost_func = record_lost_func
@@ -100,7 +100,7 @@ def _flush_loop(self) -> None:
100100
self._flush()
101101
self._last_full_flush = time.monotonic()
102102

103-
def add(self, span: "StreamedSpan") -> None:
103+
def add(self, span: "SpanJSON") -> None:
104104
# Bail out if the current thread is already executing batcher code.
105105
# This prevents deadlocks when code running inside the batcher (e.g.
106106
# _add_to_envelope during flush, or _flush_event.wait/set) triggers
@@ -116,7 +116,7 @@ def add(self, span: "StreamedSpan") -> None:
116116
return None
117117

118118
with self._lock:
119-
size = len(self._span_buffer[span.trace_id])
119+
size = len(self._span_buffer[span["trace_id"]])
120120
if size >= self.MAX_BEFORE_DROP:
121121
self._record_lost_func(
122122
reason="queue_overflow",
@@ -125,14 +125,15 @@ def add(self, span: "StreamedSpan") -> None:
125125
)
126126
return None
127127

128-
self._span_buffer[span.trace_id].append(span)
129-
self._running_size[span.trace_id] += self._estimate_size(span)
128+
self._span_buffer[span["trace_id"]].append(span)
129+
self._running_size[span["trace_id"]] += self._estimate_size(span)
130130

131131
if (
132132
size + 1 >= self.MAX_BEFORE_FLUSH
133-
or self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH
133+
or self._running_size[span["trace_id"]]
134+
>= self.MAX_BYTES_BEFORE_FLUSH
134135
):
135-
self._pending_flush.add(span.trace_id)
136+
self._pending_flush.add(span["trace_id"])
136137
notify = True
137138
else:
138139
notify = False
@@ -143,12 +144,12 @@ def add(self, span: "StreamedSpan") -> None:
143144
self._active.flag = False
144145

145146
@staticmethod
146-
def _estimate_size(item: "StreamedSpan") -> int:
147+
def _estimate_size(item: "SpanJSON") -> int:
147148
# Rough estimate of serialized span size that's quick to compute.
148149
# 210 is the rough size of the payload without attributes, and then we
149150
# estimate the attributes separately.
150151
estimate = 210
151-
for value in item._attributes.values():
152+
for value in (item.get("attributes") or {}).values():
152153
estimate += 50
153154

154155
if isinstance(value, str):
@@ -159,26 +160,15 @@ def _estimate_size(item: "StreamedSpan") -> int:
159160
return estimate
160161

161162
@staticmethod
162-
def _to_transport_format(item: "StreamedSpan") -> "Any":
163-
res: "dict[str, Any]" = {
164-
"trace_id": item.trace_id,
165-
"span_id": item.span_id,
166-
"name": item._name if item._name is not None else "<unlabeled span>",
167-
"status": item._status,
168-
"is_segment": item._is_segment(),
169-
"start_timestamp": item._start_timestamp.timestamp(),
170-
}
171-
172-
if item._end_timestamp:
173-
res["end_timestamp"] = item._end_timestamp.timestamp()
174-
175-
if item._parent_span_id:
176-
res["parent_span_id"] = item._parent_span_id
177-
178-
if item._attributes:
163+
def _to_transport_format(item: "SpanJSON") -> "Any":
164+
res = {k: v for k, v in item.items() if k not in ("_segment_span",)}
165+
166+
if item.get("attributes"):
179167
res["attributes"] = {
180-
k: serialize_attribute(v) for (k, v) in item._attributes.items()
168+
k: serialize_attribute(v) for (k, v) in item["attributes"].items()
181169
}
170+
else:
171+
del res["attributes"]
182172

183173
return res
184174

@@ -202,7 +192,7 @@ def _flush(self, only_pending: bool = False) -> None:
202192
if not spans:
203193
continue
204194

205-
dsc = spans[0]._dynamic_sampling_context()
195+
dsc = spans[0]["_segment_span"]._dynamic_sampling_context()
206196

207197
# Max per envelope is 1000, so if we happen to have more than
208198
# 1000 spans in one bucket, we'll need to separate them.

sentry_sdk/_types.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@ def substituted_because_contains_sensitive_data(cls) -> "AnnotatedValue":
145145

146146
from typing_extensions import Literal, TypedDict
147147

148+
import sentry_sdk
149+
148150
class SDKInfo(TypedDict):
149151
name: str
150152
version: str
@@ -318,6 +320,22 @@ class SDKInfo(TypedDict):
318320

319321
MetricProcessor = Callable[[Metric, Hint], Optional[Metric]]
320322

323+
SpanJSON = TypedDict(
324+
"SpanJSON",
325+
{
326+
"trace_id": str,
327+
"span_id": str,
328+
"parent_span_id": NotRequired[str],
329+
"name": str,
330+
"status": str,
331+
"is_segment": bool,
332+
"start_timestamp": float,
333+
"end_timestamp": NotRequired[float],
334+
"attributes": NotRequired[Attributes],
335+
"_segment_span": NotRequired["sentry_sdk.traces.StreamedSpan"],
336+
},
337+
)
338+
321339
# TODO: Make a proper type definition for this (PRs welcome!)
322340
Breadcrumb = Dict[str, Any]
323341

sentry_sdk/client.py

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
from sentry_sdk.scrubber import EventScrubber
3535
from sentry_sdk.serializer import serialize
3636
from sentry_sdk.sessions import SessionFlusher
37-
from sentry_sdk.traces import SpanStatus
37+
from sentry_sdk.traces import SpanStatus, StreamedSpan
3838
from sentry_sdk.tracing import trace
3939
from sentry_sdk.tracing_utils import has_span_streaming_enabled
4040
from sentry_sdk.transport import (
@@ -52,6 +52,7 @@
5252
format_timestamp,
5353
get_before_send_log,
5454
get_before_send_metric,
55+
get_before_send_span,
5556
get_default_release,
5657
get_sdk_name,
5758
get_type_name,
@@ -1169,34 +1170,72 @@ def _capture_telemetry(
11691170
ty: str,
11701171
scope: "Scope",
11711172
) -> None:
1172-
# Capture attributes-based telemetry (logs, metrics, spansV2)
1173+
"""
1174+
Capture attributes-based telemetry (logs, metrics, streamed spans).
1175+
1176+
Apply any attributes set on the scope to it, and run the user's
1177+
before_send_{telemetry} on it, if applicable.
1178+
"""
11731179
if telemetry is None:
11741180
return
11751181

11761182
scope.apply_to_telemetry(telemetry)
11771183

11781184
before_send = None
1185+
11791186
if ty == "log":
11801187
before_send = get_before_send_log(self.options)
1188+
serialized = telemetry
1189+
11811190
elif ty == "metric":
11821191
before_send = get_before_send_metric(self.options)
1192+
serialized = telemetry
1193+
1194+
elif ty == "span":
1195+
before_send = get_before_send_span(self.options)
1196+
serialized = telemetry._to_json() # type: ignore[union-attr]
11831197

11841198
if before_send is not None:
1185-
telemetry = before_send(telemetry, {}) # type: ignore
1199+
serialized = before_send(serialized, {}) # type: ignore[arg-type]
1200+
1201+
if ty in ("log", "metric"):
1202+
# Logs and metrics can be dropped in their respective
1203+
# before_send, so if we get None, don't queue them for sending.
1204+
if serialized is None:
1205+
return
1206+
1207+
elif ty == "span" and isinstance(telemetry, StreamedSpan):
1208+
# Spans can't be dropped in before_send_span by design. They can
1209+
# be altered though (e.g. to sanitize). Only allow changes to
1210+
# name and attributes.
1211+
if isinstance(serialized, dict) and "name" in serialized:
1212+
telemetry.name = serialized["name"] # type: ignore[typeddict-item]
1213+
telemetry._attributes = {}
1214+
for k, v in (serialized.get("attributes") or {}).items():
1215+
telemetry.set_attribute(k, v)
1216+
1217+
else:
1218+
logger.debug(
1219+
"[Tracing] Invalid return value from before_send_span. Keeping original span."
1220+
)
11861221

1187-
if telemetry is None:
1188-
return
1222+
serialized = telemetry._to_json()
11891223

11901224
batcher = None
11911225
if ty == "log":
11921226
batcher = self.log_batcher
1227+
11931228
elif ty == "metric":
11941229
batcher = self.metrics_batcher
1230+
11951231
elif ty == "span":
1232+
# We need a reference to the segment span in the batcher to populate
1233+
# the dynamic sampling context (DSC)
1234+
serialized["_segment_span"] = telemetry._segment # type: ignore
11961235
batcher = self.span_batcher
11971236

11981237
if batcher is not None:
1199-
batcher.add(telemetry) # type: ignore
1238+
batcher.add(serialized) # type: ignore
12001239

12011240
def _capture_log(self, log: "Optional[Log]", scope: "Scope") -> None:
12021241
self._capture_telemetry(log, "log", scope)

sentry_sdk/consts.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class CompressionAlgo(Enum):
5656
Log,
5757
Metric,
5858
ProfilerMode,
59+
SpanJSON,
5960
TracesSampler,
6061
TransactionProcessor,
6162
)
@@ -85,6 +86,9 @@ class CompressionAlgo(Enum):
8586
"before_send_metric": Optional[Callable[[Metric, Hint], Optional[Metric]]],
8687
"trace_lifecycle": Optional[Literal["static", "stream"]],
8788
"ignore_spans": Optional[IgnoreSpansConfig],
89+
"before_send_span": Optional[
90+
Callable[[SpanJSON, Hint], Optional[SpanJSON]]
91+
],
8892
"suppress_asgi_chained_exceptions": Optional[bool],
8993
},
9094
total=False,

sentry_sdk/traces.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@
4343
overload,
4444
)
4545

46-
from sentry_sdk._types import Attributes, AttributeValue
46+
from sentry_sdk._types import Attributes, AttributeValue, SpanJSON
4747
from sentry_sdk.profiler.continuous_profiler import ContinuousProfile
4848

4949
P = ParamSpec("P")
@@ -574,6 +574,26 @@ def _set_segment_attributes(self) -> None:
574574

575575
self.set_attribute("process.command_args", sys.argv)
576576

577+
def _to_json(self) -> "SpanJSON":
578+
res: "SpanJSON" = {
579+
"trace_id": self.trace_id,
580+
"span_id": self.span_id,
581+
"name": self._name if self._name is not None else "<unlabeled span>",
582+
"status": self._status,
583+
"is_segment": self._is_segment(),
584+
"start_timestamp": self._start_timestamp.timestamp(),
585+
}
586+
587+
if self._end_timestamp:
588+
res["end_timestamp"] = self._end_timestamp.timestamp()
589+
590+
if self._parent_span_id:
591+
res["parent_span_id"] = self._parent_span_id
592+
593+
res["attributes"] = {k: v for k, v in self._attributes.items()}
594+
595+
return res
596+
577597

578598
class NoOpStreamedSpan(StreamedSpan):
579599
__slots__ = (

sentry_sdk/utils.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
Log,
7777
Metric,
7878
SerializedAttributeValue,
79+
SpanJSON,
7980
)
8081

8182
P = ParamSpec("P")
@@ -2111,6 +2112,15 @@ def get_before_send_metric(
21112112
)
21122113

21132114

2115+
def get_before_send_span(
2116+
options: "Optional[dict[str, Any]]",
2117+
) -> "Optional[Callable[[SpanJSON, Hint], Optional[SpanJSON]]]":
2118+
if options is None:
2119+
return None
2120+
2121+
return options["_experiments"].get("before_send_span")
2122+
2123+
21142124
def format_attribute(val: "Any") -> "AttributeValue":
21152125
"""
21162126
Turn unsupported attribute value types into an AttributeValue.

tests/integrations/sqlalchemy/test_sqlalchemy.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1079,19 +1079,19 @@ class Person(Base):
10791079

10801080
class fake_record_sql_queries: # noqa: N801
10811081
def __init__(self, *args, **kwargs):
1082-
with record_sql_queries_supporting_streaming(
1082+
self._ctx_mgr = record_sql_queries_supporting_streaming(
10831083
*args, **kwargs
1084-
) as span:
1085-
self.span = span
1084+
)
10861085

1086+
def __enter__(self):
1087+
self.span = self._ctx_mgr.__enter__()
10871088
self.span._start_timestamp = datetime(2024, 1, 1, microsecond=0)
10881089
self.span._end_timestamp = datetime(2024, 1, 1, microsecond=101000)
1089-
1090-
def __enter__(self):
10911090
return self.span
10921091

10931092
def __exit__(self, type, value, traceback):
1094-
pass
1093+
self.span._end_timestamp = None
1094+
self._ctx_mgr.__exit__(type, value, traceback)
10951095

10961096
with mock.patch(
10971097
"sentry_sdk.integrations.sqlalchemy.record_sql_queries_supporting_streaming",

tests/tracing/test_span_batcher.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ def test_weight_based_flushing_by_attribute_size(
236236
with sentry_sdk.traces.start_span(name="small span") as bare_span:
237237
pass
238238

239-
bare_span_size = SpanBatcher._estimate_size(bare_span)
239+
bare_span_size = SpanBatcher._estimate_size(bare_span._to_json())
240240
big_attr = "x" * bare_span_size
241241

242242
monkeypatch.setattr(SpanBatcher, "MAX_BYTES_BEFORE_FLUSH", bare_span_size * 3)

0 commit comments

Comments
 (0)