From ce8b66c83401e8e2e4bd11a0622b87691b2c5f59 Mon Sep 17 00:00:00 2001 From: Rachel Chen Date: Tue, 24 Feb 2026 15:55:37 -0800 Subject: [PATCH 1/2] fix(gdpr): 128-bit item_id --- .../physical/hexint_column_processor.py | 3 ++ .../processors/physical/type_converters.py | 30 +++++++++++++ .../v1/test_endpoint_export_trace_items.py | 45 +++++++++++++++++++ 3 files changed, 78 insertions(+) diff --git a/snuba/query/processors/physical/hexint_column_processor.py b/snuba/query/processors/physical/hexint_column_processor.py index 61f3ed2e70..db5354069b 100644 --- a/snuba/query/processors/physical/hexint_column_processor.py +++ b/snuba/query/processors/physical/hexint_column_processor.py @@ -26,6 +26,9 @@ def __init__(self, columns: Set[str], size: int = 16) -> None: def _translate_literal(self, exp: Literal) -> Literal: try: + if isinstance(exp.value, int): + return exp + assert isinstance(exp.value, str) # 128 bit integers in clickhouse need to be referenced as strings if self._size == 32: diff --git a/snuba/query/processors/physical/type_converters.py b/snuba/query/processors/physical/type_converters.py index 5c4005de5c..7a458096ba 100644 --- a/snuba/query/processors/physical/type_converters.py +++ b/snuba/query/processors/physical/type_converters.py @@ -201,6 +201,36 @@ def assert_literal(lit: Expression) -> Literal: ), ) + if ( + isinstance(exp, FunctionCall) + and exp.function_name + in { + ConditionFunctions.GT, + ConditionFunctions.GTE, + ConditionFunctions.LT, + ConditionFunctions.LTE, + ConditionFunctions.EQ, + ConditionFunctions.NEQ, + } + and len(exp.parameters) == 2 + and isinstance(exp.parameters[0], FunctionCall) + and exp.parameters[0].function_name == "tuple" + and isinstance(exp.parameters[1], FunctionCall) + and exp.parameters[1].function_name == "tuple" + ): + lhs_params = exp.parameters[0].parameters + rhs_params = list(exp.parameters[1].parameters) + for i, (lhs_elem, rhs_elem) in enumerate(zip(lhs_params, rhs_params)): + # check if this LHS element references a tracked column + if any( + isinstance(node, Column) and node.column_name in self.columns + for node in lhs_elem + ): + if isinstance(rhs_elem, Literal): + rhs_params[i] = self._translate_literal(rhs_elem) + new_rhs = FunctionCall(exp.parameters[1].alias, "tuple", tuple(rhs_params)) + return FunctionCall(exp.alias, exp.function_name, (exp.parameters[0], new_rhs)) + return exp @abstractmethod diff --git a/tests/web/rpc/v1/test_endpoint_export_trace_items.py b/tests/web/rpc/v1/test_endpoint_export_trace_items.py index a481e99942..5db04cbf9c 100644 --- a/tests/web/rpc/v1/test_endpoint_export_trace_items.py +++ b/tests/web/rpc/v1/test_endpoint_export_trace_items.py @@ -128,6 +128,51 @@ def test_with_pagination(self, setup_teardown: Any) -> None: assert len(items) == _SPAN_COUNT + _LOG_COUNT + def test_pagination_with_128_bit_item_id(self, eap: Any, redis_db: Any) -> None: + num_items = 120 + trace_id = uuid.uuid4().hex + items_data = [ + gen_item_message( + start_timestamp=BASE_TIME, + trace_id=trace_id, + item_id=uuid.uuid4().int.to_bytes(16, byteorder="little"), + project_id=1, + ) + for i in range(num_items) + ] + items_storage = get_storage(StorageKey("eap_items")) + write_raw_unprocessed_events(items_storage, items_data) # type: ignore + + message = ExportTraceItemsRequest( + meta=RequestMeta( + project_ids=[1], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())), + end_timestamp=Timestamp( + seconds=int((BASE_TIME + timedelta(seconds=1)).timestamp()) + ), + ), + limit=1, + ) + items: list[TraceItem] = [] + seen_item_ids: set[bytes] = set() + while True: + response = EndpointExportTraceItems().execute(message) + for item in response.trace_items: + assert item.item_id not in seen_item_ids, ( + f"item_id {item.item_id.hex()} returned more than once, " + f"pagination is not making progress" + ) + seen_item_ids.add(item.item_id) + items.extend(response.trace_items) + if response.page_token.end_pagination: + break + message.page_token.CopyFrom(response.page_token) + + assert len(items) == num_items + def test_no_transformation_on_order_by(self, setup_teardown: Any, monkeypatch: Any) -> None: # Wrap the real run_query to capture the actual QueryResult while still hitting ClickHouse. captured: dict[str, Any] = {} From 01eef19c9e0e76e8e55f705b3653bea31825f60b Mon Sep 17 00:00:00 2001 From: Rachel Chen Date: Tue, 24 Feb 2026 15:58:41 -0800 Subject: [PATCH 2/2] no tuples --- .../physical/hexint_column_processor.py | 3 - .../processors/physical/type_converters.py | 30 ----- .../web/rpc/v1/endpoint_export_trace_items.py | 118 ++++++++++++++---- 3 files changed, 94 insertions(+), 57 deletions(-) diff --git a/snuba/query/processors/physical/hexint_column_processor.py b/snuba/query/processors/physical/hexint_column_processor.py index db5354069b..61f3ed2e70 100644 --- a/snuba/query/processors/physical/hexint_column_processor.py +++ b/snuba/query/processors/physical/hexint_column_processor.py @@ -26,9 +26,6 @@ def __init__(self, columns: Set[str], size: int = 16) -> None: def _translate_literal(self, exp: Literal) -> Literal: try: - if isinstance(exp.value, int): - return exp - assert isinstance(exp.value, str) # 128 bit integers in clickhouse need to be referenced as strings if self._size == 32: diff --git a/snuba/query/processors/physical/type_converters.py b/snuba/query/processors/physical/type_converters.py index 7a458096ba..5c4005de5c 100644 --- a/snuba/query/processors/physical/type_converters.py +++ b/snuba/query/processors/physical/type_converters.py @@ -201,36 +201,6 @@ def assert_literal(lit: Expression) -> Literal: ), ) - if ( - isinstance(exp, FunctionCall) - and exp.function_name - in { - ConditionFunctions.GT, - ConditionFunctions.GTE, - ConditionFunctions.LT, - ConditionFunctions.LTE, - ConditionFunctions.EQ, - ConditionFunctions.NEQ, - } - and len(exp.parameters) == 2 - and isinstance(exp.parameters[0], FunctionCall) - and exp.parameters[0].function_name == "tuple" - and isinstance(exp.parameters[1], FunctionCall) - and exp.parameters[1].function_name == "tuple" - ): - lhs_params = exp.parameters[0].parameters - rhs_params = list(exp.parameters[1].parameters) - for i, (lhs_elem, rhs_elem) in enumerate(zip(lhs_params, rhs_params)): - # check if this LHS element references a tracked column - if any( - isinstance(node, Column) and node.column_name in self.columns - for node in lhs_elem - ): - if isinstance(rhs_elem, Literal): - rhs_params[i] = self._translate_literal(rhs_elem) - new_rhs = FunctionCall(exp.parameters[1].alias, "tuple", tuple(rhs_params)) - return FunctionCall(exp.alias, exp.function_name, (exp.parameters[0], new_rhs)) - return exp @abstractmethod diff --git a/snuba/web/rpc/v1/endpoint_export_trace_items.py b/snuba/web/rpc/v1/endpoint_export_trace_items.py index 9f8d57976a..c5a66d327a 100644 --- a/snuba/web/rpc/v1/endpoint_export_trace_items.py +++ b/snuba/web/rpc/v1/endpoint_export_trace_items.py @@ -20,7 +20,7 @@ from snuba.query import OrderBy, OrderByDirection, SelectedExpression from snuba.query.data_source.simple import Entity from snuba.query.dsl import Functions as f -from snuba.query.dsl import column, literal +from snuba.query.dsl import and_cond, column, literal, or_cond from snuba.query.expressions import FunctionCall from snuba.query.logical import Query from snuba.query.query_settings import HTTPQuerySettings @@ -121,7 +121,8 @@ def to_protobuf(self) -> PageToken: TraceItemFilter( comparison_filter=ComparisonFilter( key=AttributeKey( - name="last_seen_project_id", type=AttributeKey.Type.TYPE_INT + name="last_seen_project_id", + type=AttributeKey.Type.TYPE_INT, ), op=ComparisonFilter.OP_EQUALS, value=AttributeValue(val_int=self.last_seen_project_id), @@ -130,7 +131,8 @@ def to_protobuf(self) -> PageToken: TraceItemFilter( comparison_filter=ComparisonFilter( key=AttributeKey( - name="last_seen_item_type", type=AttributeKey.Type.TYPE_INT + name="last_seen_item_type", + type=AttributeKey.Type.TYPE_INT, ), op=ComparisonFilter.OP_EQUALS, value=AttributeValue(val_int=self.last_seen_item_type), @@ -139,7 +141,8 @@ def to_protobuf(self) -> PageToken: TraceItemFilter( comparison_filter=ComparisonFilter( key=AttributeKey( - name="last_seen_timestamp", type=AttributeKey.Type.TYPE_DOUBLE + name="last_seen_timestamp", + type=AttributeKey.Type.TYPE_DOUBLE, ), op=ComparisonFilter.OP_EQUALS, value=AttributeValue(val_double=self.last_seen_timestamp), @@ -148,7 +151,8 @@ def to_protobuf(self) -> PageToken: TraceItemFilter( comparison_filter=ComparisonFilter( key=AttributeKey( - name="last_seen_trace_id", type=AttributeKey.Type.TYPE_STRING + name="last_seen_trace_id", + type=AttributeKey.Type.TYPE_STRING, ), op=ComparisonFilter.OP_EQUALS, value=AttributeValue(val_str=self.last_seen_trace_id), @@ -157,7 +161,8 @@ def to_protobuf(self) -> PageToken: TraceItemFilter( comparison_filter=ComparisonFilter( key=AttributeKey( - name="last_seen_item_id", type=AttributeKey.Type.TYPE_STRING + name="last_seen_item_id", + type=AttributeKey.Type.TYPE_STRING, ), op=ComparisonFilter.OP_EQUALS, value=AttributeValue(val_str=hex(self.last_seen_item_id)), @@ -170,7 +175,9 @@ def to_protobuf(self) -> PageToken: def _build_query( - in_msg: ExportTraceItemsRequest, limit: int, page_token: ExportTraceItemsPageToken | None = None + in_msg: ExportTraceItemsRequest, + limit: int, + page_token: ExportTraceItemsPageToken | None = None, ) -> Query: selected_columns = [ SelectedExpression("timestamp", f.toUnixTimestamp(column("timestamp"), alias="timestamp")), @@ -190,10 +197,12 @@ def _build_query( SelectedExpression("project_id", column("project_id", alias="project_id")), SelectedExpression("item_type", column("item_type", alias="item_type")), SelectedExpression( - "client_sample_rate", column("client_sample_rate", alias="client_sample_rate") + "client_sample_rate", + column("client_sample_rate", alias="client_sample_rate"), ), SelectedExpression( - "server_sample_rate", column("server_sample_rate", alias="server_sample_rate") + "server_sample_rate", + column("server_sample_rate", alias="server_sample_rate"), ), SelectedExpression("sampling_weight", column("sampling_weight", alias="sampling_weight")), SelectedExpression("sampling_factor", column("sampling_factor", alias="sampling_factor")), @@ -227,20 +236,79 @@ def _build_query( page_token_filter = ( [ - f.greater( - f.tuple( - column("project_id"), - column("item_type"), - column("timestamp"), - column("trace_id"), - f.reinterpretAsUInt128(f.reverse(f.unhex(column("item_id")))), - ), - f.tuple( - literal(page_token.last_seen_project_id), - literal(page_token.last_seen_item_type), - literal(page_token.last_seen_timestamp), - literal(page_token.last_seen_trace_id), - literal(page_token.last_seen_item_id), + or_cond( + # (project_id > page_token.last_seen_project_id) + f.greater(column("project_id"), literal(page_token.last_seen_project_id)), + or_cond( + # (project_id = page_token.last_seen_project_id AND item_type > page_token.last_seen_item_type) + and_cond( + f.equals( + column("project_id"), + literal(page_token.last_seen_project_id), + ), + f.greater(column("item_type"), literal(page_token.last_seen_item_type)), + ), + or_cond( + # (project_id = page_token.last_seen_project_id AND item_type = page_token.last_seen_item_type AND timestamp > page_token.last_seen_timestamp) + and_cond( + f.equals( + column("project_id"), + literal(page_token.last_seen_project_id), + ), + f.equals( + column("item_type"), + literal(page_token.last_seen_item_type), + ), + f.greater( + column("timestamp"), + literal(page_token.last_seen_timestamp), + ), + ), + or_cond( + # (project_id = page_token.last_seen_project_id AND item_type = page_token.last_seen_item_type AND timestamp = page_token.last_seen_timestamp AND trace_id > page_token.last_seen_trace_id) + and_cond( + f.equals( + column("project_id"), + literal(page_token.last_seen_project_id), + ), + f.equals( + column("item_type"), + literal(page_token.last_seen_item_type), + ), + f.equals( + column("timestamp"), + literal(page_token.last_seen_timestamp), + ), + f.greater( + column("trace_id"), + literal(page_token.last_seen_trace_id), + ), + ), + # (project_id = page_token.last_seen_project_id AND item_type = page_token.last_seen_item_type AND timestamp = page_token.last_seen_timestamp AND trace_id = page_token.last_seen_trace_id AND item_id > page_token.last_seen_item_id) + and_cond( + f.equals( + column("project_id"), + literal(page_token.last_seen_project_id), + ), + f.equals( + column("item_type"), + literal(page_token.last_seen_item_type), + ), + f.equals( + column("timestamp"), + literal(page_token.last_seen_timestamp), + ), + f.equals( + column("trace_id"), + literal(page_token.last_seen_trace_id), + ), + f.greater( + f.reinterpretAsUInt128(f.reverse(f.unhex(column("item_id")))), + literal(page_token.last_seen_item_id), + ), + ), + ), + ), ), ) ] @@ -269,7 +337,9 @@ def _build_query( def _build_snuba_request( - in_msg: ExportTraceItemsRequest, limit: int, page_token: ExportTraceItemsPageToken | None = None + in_msg: ExportTraceItemsRequest, + limit: int, + page_token: ExportTraceItemsPageToken | None = None, ) -> SnubaRequest: query_settings = setup_trace_query_settings() if in_msg.meta.debug else HTTPQuerySettings() query_settings.set_skip_transform_order_by(True)