Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 94 additions & 24 deletions snuba/web/rpc/v1/endpoint_export_trace_items.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from snuba.query import OrderBy, OrderByDirection, SelectedExpression
from snuba.query.data_source.simple import Entity
from snuba.query.dsl import Functions as f
from snuba.query.dsl import column, literal
from snuba.query.dsl import and_cond, column, literal, or_cond
from snuba.query.expressions import FunctionCall
from snuba.query.logical import Query
from snuba.query.query_settings import HTTPQuerySettings
Expand Down Expand Up @@ -121,7 +121,8 @@ def to_protobuf(self) -> PageToken:
TraceItemFilter(
comparison_filter=ComparisonFilter(
key=AttributeKey(
name="last_seen_project_id", type=AttributeKey.Type.TYPE_INT
name="last_seen_project_id",
type=AttributeKey.Type.TYPE_INT,
),
op=ComparisonFilter.OP_EQUALS,
value=AttributeValue(val_int=self.last_seen_project_id),
Expand All @@ -130,7 +131,8 @@ def to_protobuf(self) -> PageToken:
TraceItemFilter(
comparison_filter=ComparisonFilter(
key=AttributeKey(
name="last_seen_item_type", type=AttributeKey.Type.TYPE_INT
name="last_seen_item_type",
type=AttributeKey.Type.TYPE_INT,
),
op=ComparisonFilter.OP_EQUALS,
value=AttributeValue(val_int=self.last_seen_item_type),
Expand All @@ -139,7 +141,8 @@ def to_protobuf(self) -> PageToken:
TraceItemFilter(
comparison_filter=ComparisonFilter(
key=AttributeKey(
name="last_seen_timestamp", type=AttributeKey.Type.TYPE_DOUBLE
name="last_seen_timestamp",
type=AttributeKey.Type.TYPE_DOUBLE,
),
op=ComparisonFilter.OP_EQUALS,
value=AttributeValue(val_double=self.last_seen_timestamp),
Expand All @@ -148,7 +151,8 @@ def to_protobuf(self) -> PageToken:
TraceItemFilter(
comparison_filter=ComparisonFilter(
key=AttributeKey(
name="last_seen_trace_id", type=AttributeKey.Type.TYPE_STRING
name="last_seen_trace_id",
type=AttributeKey.Type.TYPE_STRING,
),
op=ComparisonFilter.OP_EQUALS,
value=AttributeValue(val_str=self.last_seen_trace_id),
Expand All @@ -157,7 +161,8 @@ def to_protobuf(self) -> PageToken:
TraceItemFilter(
comparison_filter=ComparisonFilter(
key=AttributeKey(
name="last_seen_item_id", type=AttributeKey.Type.TYPE_STRING
name="last_seen_item_id",
type=AttributeKey.Type.TYPE_STRING,
),
op=ComparisonFilter.OP_EQUALS,
value=AttributeValue(val_str=hex(self.last_seen_item_id)),
Expand All @@ -170,7 +175,9 @@ def to_protobuf(self) -> PageToken:


def _build_query(
in_msg: ExportTraceItemsRequest, limit: int, page_token: ExportTraceItemsPageToken | None = None
in_msg: ExportTraceItemsRequest,
limit: int,
page_token: ExportTraceItemsPageToken | None = None,
) -> Query:
selected_columns = [
SelectedExpression("timestamp", f.toUnixTimestamp(column("timestamp"), alias="timestamp")),
Expand All @@ -190,10 +197,12 @@ def _build_query(
SelectedExpression("project_id", column("project_id", alias="project_id")),
SelectedExpression("item_type", column("item_type", alias="item_type")),
SelectedExpression(
"client_sample_rate", column("client_sample_rate", alias="client_sample_rate")
"client_sample_rate",
column("client_sample_rate", alias="client_sample_rate"),
),
SelectedExpression(
"server_sample_rate", column("server_sample_rate", alias="server_sample_rate")
"server_sample_rate",
column("server_sample_rate", alias="server_sample_rate"),
),
SelectedExpression("sampling_weight", column("sampling_weight", alias="sampling_weight")),
SelectedExpression("sampling_factor", column("sampling_factor", alias="sampling_factor")),
Expand Down Expand Up @@ -227,20 +236,79 @@ def _build_query(

page_token_filter = (
[
f.greater(
f.tuple(
column("project_id"),
column("item_type"),
column("timestamp"),
column("trace_id"),
f.reinterpretAsUInt128(f.reverse(f.unhex(column("item_id")))),
),
f.tuple(
literal(page_token.last_seen_project_id),
literal(page_token.last_seen_item_type),
literal(page_token.last_seen_timestamp),
literal(page_token.last_seen_trace_id),
literal(page_token.last_seen_item_id),
or_cond(
# (project_id > page_token.last_seen_project_id)
f.greater(column("project_id"), literal(page_token.last_seen_project_id)),
or_cond(
# (project_id = page_token.last_seen_project_id AND item_type > page_token.last_seen_item_type)
and_cond(
f.equals(
column("project_id"),
literal(page_token.last_seen_project_id),
),
f.greater(column("item_type"), literal(page_token.last_seen_item_type)),
),
or_cond(
# (project_id = page_token.last_seen_project_id AND item_type = page_token.last_seen_item_type AND timestamp > page_token.last_seen_timestamp)
and_cond(
f.equals(
column("project_id"),
literal(page_token.last_seen_project_id),
),
f.equals(
column("item_type"),
literal(page_token.last_seen_item_type),
),
f.greater(
column("timestamp"),
literal(page_token.last_seen_timestamp),
),
),
or_cond(
# (project_id = page_token.last_seen_project_id AND item_type = page_token.last_seen_item_type AND timestamp = page_token.last_seen_timestamp AND trace_id > page_token.last_seen_trace_id)
and_cond(
f.equals(
column("project_id"),
literal(page_token.last_seen_project_id),
),
f.equals(
column("item_type"),
literal(page_token.last_seen_item_type),
),
f.equals(
column("timestamp"),
literal(page_token.last_seen_timestamp),
),
f.greater(
column("trace_id"),
literal(page_token.last_seen_trace_id),
),
),
# (project_id = page_token.last_seen_project_id AND item_type = page_token.last_seen_item_type AND timestamp = page_token.last_seen_timestamp AND trace_id = page_token.last_seen_trace_id AND item_id > page_token.last_seen_item_id)
and_cond(
f.equals(
column("project_id"),
literal(page_token.last_seen_project_id),
),
f.equals(
column("item_type"),
literal(page_token.last_seen_item_type),
),
f.equals(
column("timestamp"),
literal(page_token.last_seen_timestamp),
),
f.equals(
column("trace_id"),
literal(page_token.last_seen_trace_id),
),
f.greater(
f.reinterpretAsUInt128(f.reverse(f.unhex(column("item_id")))),
literal(page_token.last_seen_item_id),
),
),
),
),
),
)
]
Expand Down Expand Up @@ -269,7 +337,9 @@ def _build_query(


def _build_snuba_request(
in_msg: ExportTraceItemsRequest, limit: int, page_token: ExportTraceItemsPageToken | None = None
in_msg: ExportTraceItemsRequest,
limit: int,
page_token: ExportTraceItemsPageToken | None = None,
) -> SnubaRequest:
query_settings = setup_trace_query_settings() if in_msg.meta.debug else HTTPQuerySettings()
query_settings.set_skip_transform_order_by(True)
Expand Down
45 changes: 45 additions & 0 deletions tests/web/rpc/v1/test_endpoint_export_trace_items.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,51 @@ def test_with_pagination(self, setup_teardown: Any) -> None:

assert len(items) == _SPAN_COUNT + _LOG_COUNT

def test_pagination_with_128_bit_item_id(self, eap: Any, redis_db: Any) -> None:
num_items = 120
trace_id = uuid.uuid4().hex
items_data = [
gen_item_message(
start_timestamp=BASE_TIME,
trace_id=trace_id,
item_id=uuid.uuid4().int.to_bytes(16, byteorder="little"),
project_id=1,
)
for i in range(num_items)
]
items_storage = get_storage(StorageKey("eap_items"))
write_raw_unprocessed_events(items_storage, items_data) # type: ignore

message = ExportTraceItemsRequest(
meta=RequestMeta(
project_ids=[1],
organization_id=1,
cogs_category="something",
referrer="something",
start_timestamp=Timestamp(seconds=int(BASE_TIME.timestamp())),
end_timestamp=Timestamp(
seconds=int((BASE_TIME + timedelta(seconds=1)).timestamp())
),
),
limit=1,
)
items: list[TraceItem] = []
seen_item_ids: set[bytes] = set()
while True:
response = EndpointExportTraceItems().execute(message)
for item in response.trace_items:
assert item.item_id not in seen_item_ids, (
f"item_id {item.item_id.hex()} returned more than once, "
f"pagination is not making progress"
)
seen_item_ids.add(item.item_id)
items.extend(response.trace_items)
if response.page_token.end_pagination:
break
message.page_token.CopyFrom(response.page_token)

assert len(items) == num_items

def test_no_transformation_on_order_by(self, setup_teardown: Any, monkeypatch: Any) -> None:
# Wrap the real run_query to capture the actual QueryResult while still hitting ClickHouse.
captured: dict[str, Any] = {}
Expand Down
Loading