diff --git a/snuba/web/rpc/v1/endpoint_export_trace_items.py b/snuba/web/rpc/v1/endpoint_export_trace_items.py index 9f8d57976a8..c5a66d327ae 100644 --- a/snuba/web/rpc/v1/endpoint_export_trace_items.py +++ b/snuba/web/rpc/v1/endpoint_export_trace_items.py @@ -20,7 +20,7 @@ from snuba.query import OrderBy, OrderByDirection, SelectedExpression from snuba.query.data_source.simple import Entity from snuba.query.dsl import Functions as f -from snuba.query.dsl import column, literal +from snuba.query.dsl import and_cond, column, literal, or_cond from snuba.query.expressions import FunctionCall from snuba.query.logical import Query from snuba.query.query_settings import HTTPQuerySettings @@ -121,7 +121,8 @@ def to_protobuf(self) -> PageToken: TraceItemFilter( comparison_filter=ComparisonFilter( key=AttributeKey( - name="last_seen_project_id", type=AttributeKey.Type.TYPE_INT + name="last_seen_project_id", + type=AttributeKey.Type.TYPE_INT, ), op=ComparisonFilter.OP_EQUALS, value=AttributeValue(val_int=self.last_seen_project_id), @@ -130,7 +131,8 @@ def to_protobuf(self) -> PageToken: TraceItemFilter( comparison_filter=ComparisonFilter( key=AttributeKey( - name="last_seen_item_type", type=AttributeKey.Type.TYPE_INT + name="last_seen_item_type", + type=AttributeKey.Type.TYPE_INT, ), op=ComparisonFilter.OP_EQUALS, value=AttributeValue(val_int=self.last_seen_item_type), @@ -139,7 +141,8 @@ def to_protobuf(self) -> PageToken: TraceItemFilter( comparison_filter=ComparisonFilter( key=AttributeKey( - name="last_seen_timestamp", type=AttributeKey.Type.TYPE_DOUBLE + name="last_seen_timestamp", + type=AttributeKey.Type.TYPE_DOUBLE, ), op=ComparisonFilter.OP_EQUALS, value=AttributeValue(val_double=self.last_seen_timestamp), @@ -148,7 +151,8 @@ def to_protobuf(self) -> PageToken: TraceItemFilter( comparison_filter=ComparisonFilter( key=AttributeKey( - name="last_seen_trace_id", type=AttributeKey.Type.TYPE_STRING + name="last_seen_trace_id", + type=AttributeKey.Type.TYPE_STRING, ), op=ComparisonFilter.OP_EQUALS, value=AttributeValue(val_str=self.last_seen_trace_id), @@ -157,7 +161,8 @@ def to_protobuf(self) -> PageToken: TraceItemFilter( comparison_filter=ComparisonFilter( key=AttributeKey( - name="last_seen_item_id", type=AttributeKey.Type.TYPE_STRING + name="last_seen_item_id", + type=AttributeKey.Type.TYPE_STRING, ), op=ComparisonFilter.OP_EQUALS, value=AttributeValue(val_str=hex(self.last_seen_item_id)), @@ -170,7 +175,9 @@ def to_protobuf(self) -> PageToken: def _build_query( - in_msg: ExportTraceItemsRequest, limit: int, page_token: ExportTraceItemsPageToken | None = None + in_msg: ExportTraceItemsRequest, + limit: int, + page_token: ExportTraceItemsPageToken | None = None, ) -> Query: selected_columns = [ SelectedExpression("timestamp", f.toUnixTimestamp(column("timestamp"), alias="timestamp")), @@ -190,10 +197,12 @@ def _build_query( SelectedExpression("project_id", column("project_id", alias="project_id")), SelectedExpression("item_type", column("item_type", alias="item_type")), SelectedExpression( - "client_sample_rate", column("client_sample_rate", alias="client_sample_rate") + "client_sample_rate", + column("client_sample_rate", alias="client_sample_rate"), ), SelectedExpression( - "server_sample_rate", column("server_sample_rate", alias="server_sample_rate") + "server_sample_rate", + column("server_sample_rate", alias="server_sample_rate"), ), SelectedExpression("sampling_weight", column("sampling_weight", alias="sampling_weight")), SelectedExpression("sampling_factor", column("sampling_factor", alias="sampling_factor")), @@ -227,20 +236,79 @@ def _build_query( page_token_filter = ( [ - f.greater( - f.tuple( - column("project_id"), - column("item_type"), - column("timestamp"), - column("trace_id"), - f.reinterpretAsUInt128(f.reverse(f.unhex(column("item_id")))), - ), - f.tuple( - literal(page_token.last_seen_project_id), - literal(page_token.last_seen_item_type), - literal(page_token.last_seen_timestamp), - literal(page_token.last_seen_trace_id), - literal(page_token.last_seen_item_id), + or_cond( + # (project_id > page_token.last_seen_project_id) + f.greater(column("project_id"), literal(page_token.last_seen_project_id)), + or_cond( + # (project_id = page_token.last_seen_project_id AND item_type > page_token.last_seen_item_type) + and_cond( + f.equals( + column("project_id"), + literal(page_token.last_seen_project_id), + ), + f.greater(column("item_type"), literal(page_token.last_seen_item_type)), + ), + or_cond( + # (project_id = page_token.last_seen_project_id AND item_type = page_token.last_seen_item_type AND timestamp > page_token.last_seen_timestamp) + and_cond( + f.equals( + column("project_id"), + literal(page_token.last_seen_project_id), + ), + f.equals( + column("item_type"), + literal(page_token.last_seen_item_type), + ), + f.greater( + column("timestamp"), + literal(page_token.last_seen_timestamp), + ), + ), + or_cond( + # (project_id = page_token.last_seen_project_id AND item_type = page_token.last_seen_item_type AND timestamp = page_token.last_seen_timestamp AND trace_id > page_token.last_seen_trace_id) + and_cond( + f.equals( + column("project_id"), + literal(page_token.last_seen_project_id), + ), + f.equals( + column("item_type"), + literal(page_token.last_seen_item_type), + ), + f.equals( + column("timestamp"), + literal(page_token.last_seen_timestamp), + ), + f.greater( + column("trace_id"), + literal(page_token.last_seen_trace_id), + ), + ), + # (project_id = page_token.last_seen_project_id AND item_type = page_token.last_seen_item_type AND timestamp = page_token.last_seen_timestamp AND trace_id = page_token.last_seen_trace_id AND item_id > page_token.last_seen_item_id) + and_cond( + f.equals( + column("project_id"), + literal(page_token.last_seen_project_id), + ), + f.equals( + column("item_type"), + literal(page_token.last_seen_item_type), + ), + f.equals( + column("timestamp"), + literal(page_token.last_seen_timestamp), + ), + f.equals( + column("trace_id"), + literal(page_token.last_seen_trace_id), + ), + f.greater( + f.reinterpretAsUInt128(f.reverse(f.unhex(column("item_id")))), + literal(page_token.last_seen_item_id), + ), + ), + ), + ), ), ) ] @@ -269,7 +337,9 @@ def _build_query( def _build_snuba_request( - in_msg: ExportTraceItemsRequest, limit: int, page_token: ExportTraceItemsPageToken | None = None + in_msg: ExportTraceItemsRequest, + limit: int, + page_token: ExportTraceItemsPageToken | None = None, ) -> SnubaRequest: query_settings = setup_trace_query_settings() if in_msg.meta.debug else HTTPQuerySettings() query_settings.set_skip_transform_order_by(True) diff --git a/tests/web/rpc/v1/test_endpoint_export_trace_items.py b/tests/web/rpc/v1/test_endpoint_export_trace_items.py index a481e999427..5db04cbf9ca 100644 --- a/tests/web/rpc/v1/test_endpoint_export_trace_items.py +++ b/tests/web/rpc/v1/test_endpoint_export_trace_items.py @@ -128,6 +128,51 @@ def test_with_pagination(self, setup_teardown: Any) -> None: assert len(items) == _SPAN_COUNT + _LOG_COUNT + def test_pagination_with_128_bit_item_id(self, eap: Any, redis_db: Any) -> None: + num_items = 120 + trace_id = uuid.uuid4().hex + items_data = [ + gen_item_message( + start_timestamp=BASE_TIME, + trace_id=trace_id, + item_id=uuid.uuid4().int.to_bytes(16, byteorder="little"), + project_id=1, + ) + for i in range(num_items) + ] + items_storage = get_storage(StorageKey("eap_items")) + write_raw_unprocessed_events(items_storage, items_data) # type: ignore + + message = ExportTraceItemsRequest( + meta=RequestMeta( + project_ids=[1], + organization_id=1, + cogs_category="something", + referrer="something", + start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())), + end_timestamp=Timestamp( + seconds=int((BASE_TIME + timedelta(seconds=1)).timestamp()) + ), + ), + limit=1, + ) + items: list[TraceItem] = [] + seen_item_ids: set[bytes] = set() + while True: + response = EndpointExportTraceItems().execute(message) + for item in response.trace_items: + assert item.item_id not in seen_item_ids, ( + f"item_id {item.item_id.hex()} returned more than once, " + f"pagination is not making progress" + ) + seen_item_ids.add(item.item_id) + items.extend(response.trace_items) + if response.page_token.end_pagination: + break + message.page_token.CopyFrom(response.page_token) + + assert len(items) == num_items + def test_no_transformation_on_order_by(self, setup_teardown: Any, monkeypatch: Any) -> None: # Wrap the real run_query to capture the actual QueryResult while still hitting ClickHouse. captured: dict[str, Any] = {}