Skip to content

Commit 25751c7

Browse files
committed
ref(eap): Start unifying EAP utils
We've got a couple redundant-implementations of EAP utils, most notably the `encode_value` function. This PR unifies these into `sentry.utils.eap`. Eventually, I'd like this package to handle all of the encoding/ details so users can easily write/query EAP without needing to worry about things.
1 parent ae0e505 commit 25751c7

File tree

10 files changed

+133
-201
lines changed

10 files changed

+133
-201
lines changed

src/sentry/eventstream/item_helpers.py

Lines changed: 4 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,11 @@
33

44
from google.protobuf.timestamp_pb2 import Timestamp
55
from sentry_protos.snuba.v1.request_common_pb2 import TRACE_ITEM_TYPE_OCCURRENCE
6-
from sentry_protos.snuba.v1.trace_item_pb2 import (
7-
AnyValue,
8-
ArrayValue,
9-
KeyValue,
10-
KeyValueList,
11-
TraceItem,
12-
)
6+
from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, TraceItem
137

148
from sentry.models.project import Project
159
from sentry.services.eventstore.models import Event, GroupEvent
10+
from sentry.utils.eap import encode_value
1611

1712

1813
def serialize_event_data_as_item(
@@ -35,36 +30,6 @@ def serialize_event_data_as_item(
3530
)
3631

3732

38-
def _encode_value(value: Any) -> AnyValue:
39-
if isinstance(value, str):
40-
return AnyValue(string_value=value)
41-
elif isinstance(value, bool):
42-
# Note: bool check must come before int check since bool is a subclass of int
43-
return AnyValue(bool_value=value)
44-
elif isinstance(value, int):
45-
return AnyValue(int_value=value)
46-
elif isinstance(value, float):
47-
return AnyValue(double_value=value)
48-
elif isinstance(value, list) or isinstance(value, tuple):
49-
# Not yet processed on EAP side
50-
return AnyValue(
51-
array_value=ArrayValue(values=[_encode_value(v) for v in value if v is not None])
52-
)
53-
elif isinstance(value, dict):
54-
# Not yet processed on EAP side
55-
return AnyValue(
56-
kvlist_value=KeyValueList(
57-
values=[
58-
KeyValue(key=str(kv[0]), value=_encode_value(kv[1]))
59-
for kv in value.items()
60-
if kv[1] is not None
61-
]
62-
)
63-
)
64-
else:
65-
raise NotImplementedError(f"encode not supported for {type(value)}")
66-
67-
6833
def encode_attributes(
6934
event: Event | GroupEvent, event_data: Mapping[str, Any], ignore_fields: set[str] | None = None
7035
) -> Mapping[str, AnyValue]:
@@ -76,14 +41,14 @@ def encode_attributes(
7641
continue
7742
if value is None:
7843
continue
79-
attributes[key] = _encode_value(value)
44+
attributes[key] = encode_value(value)
8045

8146
if event.group_id:
8247
attributes["group_id"] = AnyValue(int_value=event.group_id)
8348

8449
for key, value in event_data["tags"]:
8550
if value is None:
8651
continue
87-
attributes[f"tags[{key}]"] = _encode_value(value)
52+
attributes[f"tags[{key}]"] = encode_value(value)
8853

8954
return attributes

src/sentry/eventstream/kafka/backend.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,21 +10,19 @@
1010
from confluent_kafka import KafkaError
1111
from confluent_kafka import Message as KafkaMessage
1212
from confluent_kafka import Producer
13-
from sentry_kafka_schemas.codecs import Codec
1413
from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem
1514

1615
from sentry import options
17-
from sentry.conf.types.kafka_definition import Topic, get_topic_codec
16+
from sentry.conf.types.kafka_definition import Topic
1817
from sentry.eventstream.base import GroupStates
1918
from sentry.eventstream.snuba import KW_SKIP_SEMANTIC_PARTITIONING, SnubaProtocolEventStream
2019
from sentry.eventstream.types import EventStreamEventType
2120
from sentry.killswitches import killswitch_matches_context
2221
from sentry.utils import json
2322
from sentry.utils.confluent_producer import get_confluent_producer
23+
from sentry.utils.eap import EAP_ITEMS_CODEC
2424
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
2525

26-
EAP_ITEMS_CODEC: Codec[TraceItem] = get_topic_codec(Topic.SNUBA_ITEMS)
27-
2826
logger = logging.getLogger(__name__)
2927

3028
if TYPE_CHECKING:

src/sentry/replays/lib/eap/write.py

Lines changed: 3 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,11 @@
77
from arroyo.backends.kafka import KafkaPayload
88
from django.conf import settings
99
from google.protobuf.timestamp_pb2 import Timestamp
10-
from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, ArrayValue, KeyValue, KeyValueList
1110
from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem as EAPTraceItem
1211

1312
from sentry.conf.types.kafka_definition import Topic
1413
from sentry.replays.lib.eap.snuba_transpiler import TRACE_ITEM_TYPE_MAP, TRACE_ITEM_TYPES
15-
from sentry.replays.lib.kafka import EAP_ITEMS_CODEC, eap_producer
14+
from sentry.utils.eap import EAP_ITEMS_CODEC, eap_items_producer, encode_value
1615
from sentry.utils.kafka_config import get_topic_definition
1716

1817
Value = bool | bytes | str | int | float | Sequence["Value"] | MutableMapping[str, "Value"]
@@ -33,27 +32,6 @@ class TraceItem(TypedDict):
3332

3433

3534
def new_trace_item(trace_item: TraceItem) -> EAPTraceItem:
36-
def _anyvalue(value: Value) -> AnyValue:
37-
if isinstance(value, bool):
38-
return AnyValue(bool_value=value)
39-
elif isinstance(value, str):
40-
return AnyValue(string_value=value)
41-
elif isinstance(value, int):
42-
return AnyValue(int_value=value)
43-
elif isinstance(value, float):
44-
return AnyValue(double_value=value)
45-
elif isinstance(value, bytes):
46-
return AnyValue(bytes_value=value)
47-
elif isinstance(value, list):
48-
return AnyValue(array_value=ArrayValue(values=[_anyvalue(v) for v in value]))
49-
elif isinstance(value, dict):
50-
return AnyValue(
51-
kvlist_value=KeyValueList(
52-
values=[KeyValue(key=k, value=_anyvalue(v)) for k, v in value.items()]
53-
)
54-
)
55-
else:
56-
raise ValueError(f"Invalid value type for AnyValue: {type(value)}")
5735

5836
timestamp = Timestamp()
5937
timestamp.FromDatetime(trace_item["timestamp"])
@@ -70,7 +48,7 @@ def _anyvalue(value: Value) -> AnyValue:
7048
item_id=trace_item["trace_item_id"],
7149
received=received,
7250
retention_days=trace_item["retention_days"],
73-
attributes={k: _anyvalue(v) for k, v in trace_item["attributes"].items()},
51+
attributes={k: encode_value(v) for k, v in trace_item["attributes"].items()},
7452
client_sample_rate=trace_item["client_sample_rate"],
7553
server_sample_rate=trace_item["server_sample_rate"],
7654
)
@@ -99,4 +77,4 @@ def write_trace_items(trace_items: list[EAPTraceItem]) -> None:
9977
topic = get_topic_definition(Topic.SNUBA_ITEMS)["real_topic_name"]
10078
for trace_item in trace_items:
10179
payload = KafkaPayload(None, EAP_ITEMS_CODEC.encode(trace_item), [])
102-
eap_producer.produce(ArroyoTopic(topic), payload)
80+
eap_items_producer.produce(ArroyoTopic(topic), payload)

src/sentry/replays/lib/kafka.py

Lines changed: 1 addition & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,32 +1,11 @@
11
from arroyo.backends.kafka import KafkaPayload
22
from arroyo.types import Topic as ArroyoTopic
3-
from sentry_kafka_schemas.codecs import Codec
4-
from sentry_protos.snuba.v1.trace_item_pb2 import TraceItem
53

6-
from sentry.conf.types.kafka_definition import Topic, get_topic_codec
4+
from sentry.conf.types.kafka_definition import Topic
75
from sentry.utils.arroyo_producer import SingletonProducer, get_arroyo_producer
86
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
97
from sentry.utils.pubsub import KafkaPublisher
108

11-
#
12-
# EAP PRODUCER
13-
#
14-
15-
16-
EAP_ITEMS_CODEC: Codec[TraceItem] = get_topic_codec(Topic.SNUBA_ITEMS)
17-
18-
19-
def _get_eap_items_producer():
20-
"""Get a Kafka producer for EAP TraceItems."""
21-
return get_arroyo_producer(
22-
name="sentry.replays.lib.kafka.eap_items",
23-
topic=Topic.SNUBA_ITEMS,
24-
)
25-
26-
27-
eap_producer = SingletonProducer(_get_eap_items_producer)
28-
29-
309
#
3110
# REPLAY PRODUCER
3211
#

src/sentry/spans/consumers/process_segments/convert.py

Lines changed: 9 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
1-
from typing import Any, cast
1+
from typing import cast
22

3-
import orjson
43
import sentry_sdk
54
from google.protobuf.timestamp_pb2 import Timestamp
65
from sentry_kafka_schemas.schema_types.buffered_segments_v1 import SpanLink
76
from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType
87
from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, TraceItem
98

109
from sentry.spans.consumers.process_segments.types import CompatibleSpan
11-
12-
I64_MAX = 2**63 - 1
10+
from sentry.utils.eap import encode_value
1311

1412
FIELD_TO_ATTRIBUTE = {
1513
"end_timestamp": "sentry.end_timestamp_precise",
@@ -41,7 +39,7 @@ def convert_span_to_item(span: CompatibleSpan) -> TraceItem:
4139
continue
4240
try:
4341
# NOTE: This ignores the `type` field of the attribute itself
44-
attributes[k] = _anyvalue(value)
42+
attributes[k] = encode_value(value, True)
4543
except Exception:
4644
sentry_sdk.capture_exception()
4745
else:
@@ -58,12 +56,12 @@ def convert_span_to_item(span: CompatibleSpan) -> TraceItem:
5856

5957
# For `is_segment`, we trust the value written by `flush_segments` over a pre-existing attribute:
6058
if (is_segment := span.get("is_segment")) is not None:
61-
attributes["sentry.is_segment"] = _anyvalue(is_segment)
59+
attributes["sentry.is_segment"] = encode_value(is_segment, True)
6260

6361
for field_name, attribute_name in FIELD_TO_ATTRIBUTE.items():
6462
attribute = span.get(field_name) # type:ignore[assignment]
6563
if attribute is not None:
66-
attributes[attribute_name] = _anyvalue(attribute)
64+
attributes[attribute_name] = encode_value(attribute, True)
6765

6866
# Rename some attributes from their sentry-conventions name to what the product currently expects.
6967
# Eventually this should all be handled by deprecation policies in sentry-conventions.
@@ -83,14 +81,16 @@ def convert_span_to_item(span: CompatibleSpan) -> TraceItem:
8381
try:
8482
if attr in RENAME_ATTRIBUTES:
8583
attr = RENAME_ATTRIBUTES[attr]
86-
attributes[f"sentry._meta.fields.attributes.{attr}"] = _anyvalue({"meta": meta})
84+
attributes[f"sentry._meta.fields.attributes.{attr}"] = encode_value(
85+
{"meta": meta}, True
86+
)
8787
except Exception:
8888
sentry_sdk.capture_exception()
8989

9090
if links := span.get("links"):
9191
try:
9292
sanitized_links = [_sanitize_span_link(link) for link in links if link is not None]
93-
attributes["sentry.links"] = _anyvalue(sanitized_links)
93+
attributes["sentry.links"] = encode_value(sanitized_links, True)
9494
except Exception:
9595
sentry_sdk.capture_exception()
9696
attributes["sentry.dropped_links_count"] = AnyValue(int_value=len(links))
@@ -111,23 +111,6 @@ def convert_span_to_item(span: CompatibleSpan) -> TraceItem:
111111
)
112112

113113

114-
def _anyvalue(value: Any) -> AnyValue:
115-
if isinstance(value, str):
116-
return AnyValue(string_value=value)
117-
elif isinstance(value, bool):
118-
return AnyValue(bool_value=value)
119-
elif isinstance(value, int):
120-
if value > I64_MAX:
121-
return AnyValue(double_value=float(value))
122-
return AnyValue(int_value=value)
123-
elif isinstance(value, float):
124-
return AnyValue(double_value=value)
125-
elif isinstance(value, (list, dict)):
126-
return AnyValue(string_value=orjson.dumps(value).decode())
127-
128-
raise ValueError(f"Unknown value type: {type(value)}")
129-
130-
131114
def _timestamp(value: float) -> Timestamp:
132115
return Timestamp(
133116
seconds=int(value),

src/sentry/testutils/cases.py

Lines changed: 11 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@
150150
from sentry.users.models.useremail import UserEmail
151151
from sentry.utils import json
152152
from sentry.utils.auth import SsoSession
153+
from sentry.utils.eap import encode_value
153154
from sentry.utils.json import dumps_htmlsafe
154155
from sentry.utils.not_set import NOT_SET, NotSet, default_if_not_set
155156
from sentry.utils.samples import load_data
@@ -3316,20 +3317,6 @@ class _OptionalOurLogData(TypedDict, total=False):
33163317
item_id: int
33173318

33183319

3319-
def scalar_to_any_value(value: Any) -> AnyValue:
3320-
if isinstance(value, str):
3321-
return AnyValue(string_value=value)
3322-
if isinstance(value, int):
3323-
return AnyValue(int_value=value)
3324-
if isinstance(value, float):
3325-
return AnyValue(double_value=value)
3326-
if isinstance(value, bool):
3327-
return AnyValue(bool_value=value)
3328-
if isinstance(value, dict):
3329-
return AnyValue(**value)
3330-
raise Exception(f"cannot convert {value} of type {type(value)} to AnyValue")
3331-
3332-
33333320
def span_to_trace_item(span) -> TraceItem:
33343321
client_sample_rate = 1.0
33353322
server_sample_rate = 1.0
@@ -3339,15 +3326,15 @@ def span_to_trace_item(span) -> TraceItem:
33393326
for k, v in span.get(field, {}).items():
33403327
if v is None:
33413328
continue
3342-
attributes[k] = scalar_to_any_value(v)
3329+
attributes[k] = encode_value(v)
33433330

33443331
for k, v in span.get("sentry_tags", {}).items():
33453332
if v is None:
33463333
continue
33473334
if k == "description":
33483335
k = "normalized_description"
33493336

3350-
attributes[f"sentry.{k}"] = scalar_to_any_value(v)
3337+
attributes[f"sentry.{k}"] = encode_value(v)
33513338

33523339
for k, v in span.get("measurements", {}).items():
33533340
if v is None or v["value"] is None:
@@ -3357,10 +3344,10 @@ def span_to_trace_item(span) -> TraceItem:
33573344
elif k == "server_sample_rate":
33583345
server_sample_rate = v["value"]
33593346
else:
3360-
attributes[k] = scalar_to_any_value(float(v["value"]))
3347+
attributes[k] = encode_value(float(v["value"]))
33613348

33623349
if "description" in span and span["description"] is not None:
3363-
description = scalar_to_any_value(span["description"])
3350+
description = encode_value(span["description"])
33643351
attributes["sentry.raw_description"] = description
33653352

33663353
for field in {
@@ -3382,7 +3369,7 @@ def span_to_trace_item(span) -> TraceItem:
33823369
double_value=float(is_segment),
33833370
)
33843371
else:
3385-
value = scalar_to_any_value(span[field])
3372+
value = encode_value(span[field])
33863373
attributes[f"sentry.{field}"] = value
33873374

33883375
timestamp = Timestamp()
@@ -3453,10 +3440,10 @@ def create_ourlog(
34533440
attributes_proto = {}
34543441

34553442
for k, v in attributes.items():
3456-
attributes_proto[k] = scalar_to_any_value(v)
3443+
attributes_proto[k] = encode_value(v)
34573444

34583445
for k, v in extra_data.items():
3459-
attributes_proto[f"sentry.{k}"] = scalar_to_any_value(v)
3446+
attributes_proto[f"sentry.{k}"] = encode_value(v)
34603447

34613448
timestamp_proto = Timestamp()
34623449

@@ -3532,7 +3519,7 @@ def create_trace_metric(
35323519

35333520
if attributes:
35343521
for k, v in attributes.items():
3535-
attributes_proto[k] = scalar_to_any_value(v)
3522+
attributes_proto[k] = encode_value(v)
35363523

35373524
return TraceItem(
35383525
organization_id=organization.id,
@@ -3577,7 +3564,7 @@ def create_profile_function(
35773564

35783565
if attributes:
35793566
for k, v in attributes.items():
3580-
attributes_proto[k] = scalar_to_any_value(v)
3567+
attributes_proto[k] = encode_value(v)
35813568

35823569
return TraceItem(
35833570
organization_id=organization.id,
@@ -3892,7 +3879,7 @@ def create_eap_uptime_result(
38923879
attributes_proto = {}
38933880
for k, v in attributes_data.items():
38943881
if v is not None:
3895-
attributes_proto[k] = scalar_to_any_value(v)
3882+
attributes_proto[k] = encode_value(v)
38963883

38973884
timestamp_proto = Timestamp()
38983885
timestamp_proto.FromDatetime(scheduled_check_time)

0 commit comments

Comments
 (0)