diff --git a/quickwit/quickwit-jaeger/src/lib.rs b/quickwit/quickwit-jaeger/src/lib.rs index 340ce7eb582..1b6dfc27d0c 100644 --- a/quickwit/quickwit-jaeger/src/lib.rs +++ b/quickwit/quickwit-jaeger/src/lib.rs @@ -18,7 +18,6 @@ use std::ops::{Bound, RangeInclusive}; use std::sync::Arc; use std::time::Instant; -use async_trait::async_trait; use itertools::{Either, Itertools}; use prost::Message; use prost_types::{Duration as WellKnownDuration, Timestamp as WellKnownTimestamp}; @@ -26,13 +25,11 @@ use quickwit_config::JaegerConfig; use quickwit_opentelemetry::otlp::{ Event as QwEvent, Link as QwLink, OTEL_TRACES_INDEX_ID, Span as QwSpan, SpanFingerprint, SpanId, SpanKind as QwSpanKind, SpanStatus as QwSpanStatus, TraceId, - extract_otel_traces_index_id_patterns_from_metadata, }; use quickwit_proto::jaeger::api_v2::{ KeyValue as JaegerKeyValue, Log as JaegerLog, Process as JaegerProcess, Span as JaegerSpan, SpanRef as JaegerSpanRef, SpanRefType as JaegerSpanRefType, ValueType, }; -use quickwit_proto::jaeger::storage::v1::span_reader_plugin_server::SpanReaderPlugin; use quickwit_proto::jaeger::storage::v1::{ FindTraceIDsRequest, FindTraceIDsResponse, FindTracesRequest, GetOperationsRequest, GetOperationsResponse, GetServicesRequest, GetServicesResponse, GetTraceRequest, Operation, @@ -50,22 +47,27 @@ use time::OffsetDateTime; use time::format_description::well_known::Rfc3339; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; -use tonic::{Request, Response, Status}; +use tonic::Status; use tracing::field::Empty; use tracing::{Span as RuntimeSpan, debug, error, instrument, warn}; -use crate::metrics::JAEGER_SERVICE_METRICS; +pub(crate) use crate::metrics::JAEGER_SERVICE_METRICS; mod metrics; +mod v1; +mod v2; // OpenTelemetry to Jaeger Transformation // type TimeIntervalSecs = RangeInclusive; -type JaegerResult = Result; +pub(crate) type JaegerResult = Result; -type SpanStream = ReceiverStream>; +pub(crate) type SpanStream = ReceiverStream>; + +pub(crate) type TracesDataStream = + ReceiverStream>; #[derive(Clone)] pub struct JaegerService { @@ -88,34 +90,16 @@ impl JaegerService { #[instrument("get_services", skip_all)] pub async fn get_services_for_indexes( &self, - request: GetServicesRequest, + _request: GetServicesRequest, index_id_patterns: Vec, ) -> JaegerResult { - debug!(request=?request, index_ids=?index_id_patterns, "`get_services` request"); - - let max_hits = Some(1_000); - let start_timestamp = - Some(OffsetDateTime::now_utc().unix_timestamp() - self.lookback_period_secs); - - let search_request = ListTermsRequest { + let services = get_services_impl( + self.search_service.clone(), + self.lookback_period_secs, index_id_patterns, - field: "service_name".to_string(), - max_hits, - start_timestamp, - end_timestamp: None, - start_key: None, - end_key: None, - }; - let search_response = self.search_service.root_list_terms(search_request).await?; - let services: Vec = search_response - .terms - .into_iter() - .map(|term_bytes| extract_term(&term_bytes)) - .sorted() - .collect(); - let response = GetServicesResponse { services }; - debug!(response=?response, "`get_services` response"); - Ok(response) + ) + .await?; + Ok(GetServicesResponse { services }) } #[instrument("get_operations", skip_all, fields(service=%request.service, span_kind=%request.span_kind))] @@ -124,38 +108,18 @@ impl JaegerService { request: GetOperationsRequest, index_id_patterns: Vec, ) -> JaegerResult { - debug!(request=?request, request=?request, index_ids=?index_id_patterns, "`get_operations` request"); - - let max_hits = Some(1_000); - let start_timestamp = - Some(OffsetDateTime::now_utc().unix_timestamp() - self.lookback_period_secs); - - let span_kind_opt = request.span_kind.parse().ok(); - let start_key = SpanFingerprint::start_key(&request.service, span_kind_opt.clone()); - let end_key = SpanFingerprint::end_key(&request.service, span_kind_opt); - - let search_request = ListTermsRequest { + let operations = get_operations_impl( + self.search_service.clone(), + self.lookback_period_secs, + request.service, + request.span_kind, index_id_patterns, - field: "span_fingerprint".to_string(), - max_hits, - start_timestamp, - end_timestamp: None, - start_key, - end_key, - }; - let search_response = self.search_service.root_list_terms(search_request).await?; - let operations: Vec = search_response - .terms - .into_iter() - .map(|term_json| extract_operation(&term_json)) - .sorted() - .collect(); - debug!(operations=?operations, "`get_operations` response"); - let response = GetOperationsResponse { + ) + .await?; + Ok(GetOperationsResponse { operations, operation_names: Vec::new(), // `operation_names` is deprecated. - }; - Ok(response) + }) } // Instrumentation happens in `find_trace_ids`. @@ -247,50 +211,81 @@ impl JaegerService { trace_query: TraceQueryParameters, index_id_patterns: Vec, ) -> Result<(Vec, TimeIntervalSecs), Status> { - let span_kind_opt = None; - let min_span_start_timestamp_secs_opt = trace_query.start_time_min.map(|ts| ts.seconds); - let max_span_start_timestamp_secs_opt = trace_query.start_time_max.map(|ts| ts.seconds); - let min_span_duration_millis_opt = trace_query + let min_start_secs = trace_query.start_time_min.map(|ts| ts.seconds); + let max_start_secs = trace_query.start_time_max.map(|ts| ts.seconds); + let min_duration_millis = trace_query .duration_min .and_then(|d| to_duration_millis(&d)); - let max_span_duration_millis_opt = trace_query + let max_duration_millis = trace_query .duration_max .and_then(|d| to_duration_millis(&d)); - let query = build_search_query( + + find_trace_ids_common( + self.search_service.clone(), &trace_query.service_name, - span_kind_opt, &trace_query.operation_name, trace_query.tags, - min_span_start_timestamp_secs_opt, - max_span_start_timestamp_secs_opt, - min_span_duration_millis_opt, - max_span_duration_millis_opt, - ); - let query_ast = - serde_json::to_string(&query).map_err(|err| Status::internal(err.to_string()))?; - let aggregation_query = build_aggregations_query(trace_query.num_traces as usize); - let max_hits = 0; - let search_request = SearchRequest { + min_start_secs, + max_start_secs, + min_duration_millis, + max_duration_millis, + trace_query.num_traces as usize, index_id_patterns, - query_ast, - aggregation_request: Some(aggregation_query), - max_hits, - start_timestamp: min_span_start_timestamp_secs_opt, - end_timestamp: max_span_start_timestamp_secs_opt, - count_hits: CountHits::Underestimate.into(), - ..Default::default() - }; - let search_response = self.search_service.root_search(search_request).await?; - - let Some(agg_result_postcard) = search_response.aggregation_postcard else { - debug!("the query matched no traces"); - return Ok((Vec::new(), 0..=0)); - }; - let trace_ids = collect_trace_ids(&agg_result_postcard)?; - debug!("the query matched {} traces.", trace_ids.0.len()); - Ok(trace_ids) + ) + .await } +} +#[instrument("find_trace_ids_common", skip_all)] +#[allow(clippy::too_many_arguments)] +pub(crate) async fn find_trace_ids_common( + search_service: Arc, + service_name: &str, + operation_name: &str, + tags: HashMap, + min_start_secs: Option, + max_start_secs: Option, + min_duration_millis: Option, + max_duration_millis: Option, + num_traces: usize, + index_id_patterns: Vec, +) -> Result<(Vec, TimeIntervalSecs), Status> { + let query_ast = build_search_query( + service_name, + None, + operation_name, + tags, + min_start_secs, + max_start_secs, + min_duration_millis, + max_duration_millis, + ); + + let search_request = SearchRequest { + index_id_patterns, + query_ast: serde_json::to_string(&query_ast) + .map_err(|err| Status::internal(err.to_string()))?, + aggregation_request: Some(build_aggregations_query(num_traces)), + max_hits: 0, + start_timestamp: min_start_secs, + end_timestamp: max_start_secs, + count_hits: CountHits::Underestimate.into(), + ..Default::default() + }; + + let search_response = search_service.root_search(search_request).await?; + + let Some(agg_result_postcard) = search_response.aggregation_postcard else { + debug!("the query matched no traces"); + return Ok((Vec::new(), 0..=0)); + }; + + let trace_ids = collect_trace_ids(&agg_result_postcard)?; + debug!("the query matched {} traces.", trace_ids.0.len()); + Ok(trace_ids) +} + +impl JaegerService { #[instrument("stream_spans", skip_all, fields(num_traces=%trace_ids.len(), num_spans=Empty, num_bytes=Empty))] async fn stream_spans( &self, @@ -435,29 +430,7 @@ impl JaegerService { } } -macro_rules! metrics { - ($expr:expr, [$operation:ident, $($label:expr),*]) => { - let start = std::time::Instant::now(); - let labels = [stringify!($operation), $($label,)*]; - JAEGER_SERVICE_METRICS.requests_total.with_label_values(labels).inc(); - let (res, is_error) = match $expr { - ok @ Ok(_) => { - (ok, "false") - }, - err @ Err(_) => { - JAEGER_SERVICE_METRICS.request_errors_total.with_label_values(labels).inc(); - (err, "true") - }, - }; - let elapsed = start.elapsed().as_secs_f64(); - let labels = [stringify!($operation), $($label,)* is_error]; - JAEGER_SERVICE_METRICS.request_duration_seconds.with_label_values(labels).observe(elapsed); - - return res.map(Response::new); - }; -} - -fn record_error(operation_name: &'static str, request_start: Instant) { +pub(crate) fn record_error(operation_name: &'static str, request_start: Instant) { JAEGER_SERVICE_METRICS .request_errors_total .with_label_values([operation_name, OTEL_TRACES_INDEX_ID]) @@ -470,7 +443,7 @@ fn record_error(operation_name: &'static str, request_start: Instant) { .observe(elapsed); } -fn record_send(operation_name: &'static str, num_spans: usize, num_bytes: usize) { +pub(crate) fn record_send(operation_name: &'static str, num_spans: usize, num_bytes: usize) { JAEGER_SERVICE_METRICS .fetched_spans_total .with_label_values([operation_name, OTEL_TRACES_INDEX_ID]) @@ -481,87 +454,6 @@ fn record_send(operation_name: &'static str, num_spans: usize, num_bytes: usize) .inc_by(num_bytes as u64); } -#[async_trait] -impl SpanReaderPlugin for JaegerService { - type GetTraceStream = SpanStream; - - type FindTracesStream = SpanStream; - - async fn get_services( - &self, - request: Request, - ) -> Result, Status> { - let index_id_patterns = - extract_otel_traces_index_id_patterns_from_metadata(request.metadata())?; - metrics!( - self.get_services_for_indexes(request.into_inner(), index_id_patterns) - .await, - [get_services, OTEL_TRACES_INDEX_ID] - ); - } - - async fn get_operations( - &self, - request: Request, - ) -> Result, Status> { - let index_id_patterns = - extract_otel_traces_index_id_patterns_from_metadata(request.metadata())?; - metrics!( - self.get_operations_for_indexes(request.into_inner(), index_id_patterns) - .await, - [get_operations, OTEL_TRACES_INDEX_ID] - ); - } - - async fn find_trace_i_ds( - &self, - request: Request, - ) -> Result, Status> { - let index_id_patterns = - extract_otel_traces_index_id_patterns_from_metadata(request.metadata())?; - metrics!( - self.find_trace_ids_for_indexes(request.into_inner(), index_id_patterns) - .await, - [find_trace_ids, OTEL_TRACES_INDEX_ID] - ); - } - - async fn find_traces( - &self, - request: Request, - ) -> Result, Status> { - let index_id_patterns = - extract_otel_traces_index_id_patterns_from_metadata(request.metadata())?; - self.find_traces_for_indexes( - request.into_inner(), - "find_traces", - Instant::now(), - index_id_patterns, - false, /* if we use true, Jaeger will display "1 Span", and display an empty trace - * when clicking on the ui (but display the full trace after reloading the - * page) */ - ) - .await - .map(Response::new) - } - - async fn get_trace( - &self, - request: Request, - ) -> Result, Status> { - let index_id_patterns = - extract_otel_traces_index_id_patterns_from_metadata(request.metadata())?; - self.get_trace_for_indexes( - request.into_inner(), - "get_trace", - Instant::now(), - index_id_patterns, - ) - .await - .map(Response::new) - } -} - #[allow(deprecated)] fn extract_term(term_bytes: &[u8]) -> String { tantivy::Term::wrap(term_bytes) @@ -589,9 +481,77 @@ fn extract_operation(term_bytes: &[u8]) -> Operation { } } +#[instrument("get_services", skip_all)] +pub(crate) async fn get_services_impl( + search_service: Arc, + lookback_period_secs: i64, + index_id_patterns: Vec, +) -> Result, Status> { + debug!(index_ids=?index_id_patterns, "`get_services` request"); + + let max_hits = Some(1_000); + let start_timestamp = Some(OffsetDateTime::now_utc().unix_timestamp() - lookback_period_secs); + + let search_request = ListTermsRequest { + index_id_patterns, + field: "service_name".to_string(), + max_hits, + start_timestamp, + end_timestamp: None, + start_key: None, + end_key: None, + }; + let search_response = search_service.root_list_terms(search_request).await?; + let services: Vec = search_response + .terms + .into_iter() + .map(|term_bytes| extract_term(&term_bytes)) + .sorted() + .collect(); + debug!(services=?services, "`get_services` response"); + Ok(services) +} + +#[instrument("get_operations", skip_all, fields(service=%service, span_kind=%span_kind))] +pub(crate) async fn get_operations_impl( + search_service: Arc, + lookback_period_secs: i64, + service: String, + span_kind: String, + index_id_patterns: Vec, +) -> Result, Status> { + debug!(service=%service, span_kind=%span_kind, index_ids=?index_id_patterns, "`get_operations` request"); + + let max_hits = Some(1_000); + let start_timestamp = Some(OffsetDateTime::now_utc().unix_timestamp() - lookback_period_secs); + + let span_kind_opt = span_kind.parse().ok(); + let start_key = SpanFingerprint::start_key(&service, span_kind_opt.clone()); + let end_key = SpanFingerprint::end_key(&service, span_kind_opt); + + let search_request = ListTermsRequest { + index_id_patterns, + field: "span_fingerprint".to_string(), + max_hits, + start_timestamp, + end_timestamp: None, + start_key, + end_key, + }; + let search_response = search_service.root_list_terms(search_request).await?; + let operations: Vec = search_response + .terms + .into_iter() + .map(|term_json| extract_operation(&term_json)) + .sorted() + .collect(); + debug!(operations=?operations, "`get_operations` response"); + Ok(operations) +} + // TODO: builder pattern #[allow(clippy::too_many_arguments)] -fn build_search_query( +pub(crate) fn build_search_query( service_name: &str, span_kind_opt: Option, span_name: &str, @@ -746,7 +706,7 @@ fn build_search_query( } } -fn build_aggregations_query(num_traces: usize) -> String { +pub(crate) fn build_aggregations_query(num_traces: usize) -> String { let query = serde_json::to_string(&FindTraceIdsCollector { num_traces, trace_id_field_name: "trace_id".to_string(), @@ -807,7 +767,7 @@ fn qw_span_to_jaeger_span(qw_span_json: &str) -> Result { Ok(span) } -fn to_duration_millis(duration: &WellKnownDuration) -> Option { +pub(crate) fn to_duration_millis(duration: &WellKnownDuration) -> Option { let duration_millis = duration.seconds * 1_000 + (duration.nanos as i64) / 1_000_000; if duration_millis == 0 { None @@ -1144,6 +1104,7 @@ where T: Deserialize<'a> { mod tests { use quickwit_opentelemetry::otlp::{OTEL_TRACES_INDEX_ID_PATTERN, OtelSignal}; use quickwit_proto::jaeger::api_v2::ValueType; + use quickwit_proto::jaeger::storage::v1::span_reader_plugin_server::SpanReaderPlugin; use quickwit_search::{MockSearchService, QuickwitAggregations, encode_term_for_test}; use serde_json::json; @@ -2632,4 +2593,210 @@ mod tests { let response = jaeger.get_services(request).await.unwrap().into_inner(); assert_eq!(response.services, &["service1", "service2", "service3"]); } + + #[tokio::test] + async fn test_v2_get_services() { + let mut service = MockSearchService::new(); + service + .expect_root_list_terms() + .withf(|req| { + req.index_id_patterns == vec![OTEL_TRACES_INDEX_ID_PATTERN] + && req.field == "service_name" + && req.start_timestamp.is_some() + }) + .return_once(|_| { + Ok(quickwit_proto::search::ListTermsResponse { + num_hits: 3, + terms: vec![ + encode_term_for_test!("service1"), + encode_term_for_test!("service2"), + encode_term_for_test!("service3"), + ], + elapsed_time_micros: 0, + errors: Vec::new(), + }) + }); + + let service = Arc::new(service); + let jaeger = JaegerService::new(JaegerConfig::default(), service); + + let request = + tonic::Request::new(quickwit_proto::jaeger::storage::v2::GetServicesRequest {}); + let response = + quickwit_proto::jaeger::storage::v2::trace_reader_server::TraceReader::get_services( + &jaeger, request, + ) + .await + .unwrap() + .into_inner(); + assert_eq!(response.services, &["service1", "service2", "service3"]); + } + + #[tokio::test] + async fn test_v2_get_operations() { + let mut service = MockSearchService::new(); + service + .expect_root_list_terms() + .withf(|req| { + req.index_id_patterns == vec![OTEL_TRACES_INDEX_ID_PATTERN] + && req.field == "span_fingerprint" + && req.start_timestamp.is_some() + }) + .return_once(|_| { + let fingerprint1 = + SpanFingerprint::new("test-service", QwSpanKind::from(2), "GET /api"); + let fingerprint2 = + SpanFingerprint::new("test-service", QwSpanKind::from(3), "POST /data"); + + Ok(quickwit_proto::search::ListTermsResponse { + num_hits: 2, + terms: vec![ + encode_term_for_test!(fingerprint1.as_str()), + encode_term_for_test!(fingerprint2.as_str()), + ], + elapsed_time_micros: 0, + errors: Vec::new(), + }) + }); + + let service = Arc::new(service); + let jaeger = JaegerService::new(JaegerConfig::default(), service); + + let request = + tonic::Request::new(quickwit_proto::jaeger::storage::v2::GetOperationsRequest { + service: "test-service".to_string(), + span_kind: String::new(), + }); + let response = + quickwit_proto::jaeger::storage::v2::trace_reader_server::TraceReader::get_operations( + &jaeger, request, + ) + .await + .unwrap() + .into_inner(); + assert_eq!(response.operations.len(), 2); + assert_eq!(response.operations[0].name, "GET /api"); + assert_eq!(response.operations[0].span_kind, "server"); + assert_eq!(response.operations[1].name, "POST /data"); + assert_eq!(response.operations[1].span_kind, "client"); + } + + #[tokio::test] + async fn test_v2_find_trace_ids() { + let mut service = MockSearchService::new(); + service + .expect_root_search() + .withf(|req| { + req.index_id_patterns == vec![OTEL_TRACES_INDEX_ID_PATTERN] + && req.start_timestamp.is_some() + && req.end_timestamp.is_some() + }) + .return_once(|_| { + use quickwit_search::Span as TraceSpan; + use tantivy::DateTime; + + let trace_id_1 = + TraceId::new([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]); + let trace_id_2 = TraceId::new([ + 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, + ]); + + let spans = vec![ + TraceSpan { + trace_id: trace_id_1, + span_timestamp: DateTime::from_timestamp_secs(1500), + }, + TraceSpan { + trace_id: trace_id_2, + span_timestamp: DateTime::from_timestamp_secs(1600), + }, + ]; + + let aggregation_postcard = postcard::to_allocvec(&spans).unwrap(); + + Ok(quickwit_proto::search::SearchResponse { + num_hits: 2, + hits: vec![], + elapsed_time_micros: 100, + errors: Vec::new(), + aggregation_postcard: Some(aggregation_postcard), + scroll_id: None, + failed_splits: Vec::new(), + num_successful_splits: 1, + }) + }); + + let service = Arc::new(service); + let jaeger = JaegerService::new(JaegerConfig::default(), service); + + let request = tonic::Request::new(quickwit_proto::jaeger::storage::v2::FindTracesRequest { + query: Some(quickwit_proto::jaeger::storage::v2::TraceQueryParameters { + service_name: "test-service".to_string(), + operation_name: String::new(), + attributes: vec![], + start_time_min: Some(prost_types::Timestamp { + seconds: 1000, + nanos: 0, + }), + start_time_max: Some(prost_types::Timestamp { + seconds: 2000, + nanos: 0, + }), + duration_min: None, + duration_max: None, + search_depth: 10, + }), + }); + let response = + quickwit_proto::jaeger::storage::v2::trace_reader_server::TraceReader::find_trace_i_ds( + &jaeger, request, + ) + .await + .unwrap() + .into_inner(); + assert_eq!(response.trace_ids.len(), 2); + assert_eq!(response.trace_ids[0].trace_id.len(), 16); + assert_eq!(response.trace_ids[1].trace_id.len(), 16); + } + + #[test] + fn test_convert_v2_attributes_to_v1_tags() { + let attributes = vec![ + quickwit_proto::jaeger::storage::v2::KeyValue { + key: "http.method".to_string(), + value: Some(quickwit_proto::jaeger::storage::v2::AnyValue { + value: Some( + quickwit_proto::jaeger::storage::v2::any_value::Value::StringValue( + "GET".to_string(), + ), + ), + }), + }, + quickwit_proto::jaeger::storage::v2::KeyValue { + key: "http.status_code".to_string(), + value: Some(quickwit_proto::jaeger::storage::v2::AnyValue { + value: Some( + quickwit_proto::jaeger::storage::v2::any_value::Value::IntValue(200), + ), + }), + }, + quickwit_proto::jaeger::storage::v2::KeyValue { + key: "error".to_string(), + value: Some(quickwit_proto::jaeger::storage::v2::AnyValue { + value: Some( + quickwit_proto::jaeger::storage::v2::any_value::Value::BoolValue(false), + ), + }), + }, + ]; + + let tags = crate::v2::convert_v2_attributes_to_v1_tags(attributes); + assert_eq!(tags.len(), 3); + assert_eq!(tags.get("http.method"), Some(&"GET".to_string())); + assert_eq!(tags.get("http.status_code"), Some(&"200".to_string())); + assert_eq!(tags.get("error"), Some(&"false".to_string())); + } + + // Note: test_spans_to_otel_traces_data was removed as v2 now works directly with + // native OpenTelemetry format (QwSpan) instead of converting from Jaeger v1 format } diff --git a/quickwit/quickwit-jaeger/src/v1.rs b/quickwit/quickwit-jaeger/src/v1.rs new file mode 100644 index 00000000000..11d6935db4e --- /dev/null +++ b/quickwit/quickwit-jaeger/src/v1.rs @@ -0,0 +1,134 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Jaeger v1 API implementation (SpanReaderPlugin) + +use std::time::Instant; + +use async_trait::async_trait; +use quickwit_opentelemetry::otlp::{ + OTEL_TRACES_INDEX_ID, extract_otel_traces_index_id_patterns_from_metadata, +}; +use quickwit_proto::jaeger::storage::v1::span_reader_plugin_server::SpanReaderPlugin; +use quickwit_proto::jaeger::storage::v1::{ + FindTraceIDsRequest, FindTraceIDsResponse, FindTracesRequest, GetOperationsRequest, + GetOperationsResponse, GetServicesRequest, GetServicesResponse, GetTraceRequest, +}; +use tonic::{Request, Response, Status}; + +use crate::metrics::JAEGER_SERVICE_METRICS; +use crate::{JaegerService, SpanStream}; + +macro_rules! metrics { + ($expr:expr, [$operation:ident, $($label:expr),*]) => { + let start = std::time::Instant::now(); + let labels = [stringify!($operation), $($label,)*]; + JAEGER_SERVICE_METRICS.requests_total.with_label_values(labels).inc(); + let (res, is_error) = match $expr { + ok @ Ok(_) => { + (ok, "false") + }, + err @ Err(_) => { + JAEGER_SERVICE_METRICS.request_errors_total.with_label_values(labels).inc(); + (err, "true") + }, + }; + let elapsed = start.elapsed().as_secs_f64(); + let labels = [stringify!($operation), $($label,)* is_error]; + JAEGER_SERVICE_METRICS.request_duration_seconds.with_label_values(labels).observe(elapsed); + + return res.map(Response::new); + }; +} + +#[async_trait] +impl SpanReaderPlugin for JaegerService { + type GetTraceStream = SpanStream; + + type FindTracesStream = SpanStream; + + async fn get_services( + &self, + request: Request, + ) -> Result, Status> { + let index_id_patterns = + extract_otel_traces_index_id_patterns_from_metadata(request.metadata())?; + metrics!( + self.get_services_for_indexes(request.into_inner(), index_id_patterns) + .await, + [get_services, OTEL_TRACES_INDEX_ID] + ); + } + + async fn get_operations( + &self, + request: Request, + ) -> Result, Status> { + let index_id_patterns = + extract_otel_traces_index_id_patterns_from_metadata(request.metadata())?; + metrics!( + self.get_operations_for_indexes(request.into_inner(), index_id_patterns) + .await, + [get_operations, OTEL_TRACES_INDEX_ID] + ); + } + + async fn find_trace_i_ds( + &self, + request: Request, + ) -> Result, Status> { + let index_id_patterns = + extract_otel_traces_index_id_patterns_from_metadata(request.metadata())?; + metrics!( + self.find_trace_ids_for_indexes(request.into_inner(), index_id_patterns) + .await, + [find_trace_ids, OTEL_TRACES_INDEX_ID] + ); + } + + async fn find_traces( + &self, + request: Request, + ) -> Result, Status> { + let index_id_patterns = + extract_otel_traces_index_id_patterns_from_metadata(request.metadata())?; + self.find_traces_for_indexes( + request.into_inner(), + "find_traces", + Instant::now(), + index_id_patterns, + false, /* if we use true, Jaeger will display "1 Span", and display an empty trace + * when clicking on the ui (but display the full trace after reloading the + * page) */ + ) + .await + .map(Response::new) + } + + async fn get_trace( + &self, + request: Request, + ) -> Result, Status> { + let index_id_patterns = + extract_otel_traces_index_id_patterns_from_metadata(request.metadata())?; + self.get_trace_for_indexes( + request.into_inner(), + "get_trace", + Instant::now(), + index_id_patterns, + ) + .await + .map(Response::new) + } +} diff --git a/quickwit/quickwit-jaeger/src/v2.rs b/quickwit/quickwit-jaeger/src/v2.rs new file mode 100644 index 00000000000..e355c18a8c3 --- /dev/null +++ b/quickwit/quickwit-jaeger/src/v2.rs @@ -0,0 +1,639 @@ +// Copyright 2021-Present Datadog, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Jaeger v2 API implementation (TraceReader) +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Instant; + +use async_trait::async_trait; +use prost_types::Timestamp as WellKnownTimestamp; +use quickwit_opentelemetry::otlp::{ + OTEL_TRACES_INDEX_ID, Span as QwSpan, TraceId, + extract_otel_traces_index_id_patterns_from_metadata, +}; +use quickwit_proto::jaeger::storage::v2::trace_reader_server::TraceReader; +use quickwit_proto::jaeger::storage::v2::{ + FindTracesRequest, FoundTraceId, GetOperationsRequest, GetOperationsResponse, + GetServicesRequest, GetServicesResponse, GetTracesRequest, Operation, +}; +use quickwit_proto::opentelemetry::proto::common::v1::any_value::Value as OtelValue; +use quickwit_proto::opentelemetry::proto::common::v1::{ + AnyValue as OtelAnyValue, InstrumentationScope, KeyValue as OtelKeyValue, +}; +use quickwit_proto::opentelemetry::proto::resource::v1::Resource as OtelResource; +use quickwit_proto::opentelemetry::proto::trace::v1 as otel_trace; +use quickwit_proto::opentelemetry::proto::trace::v1::status::StatusCode as OtelStatusCode; +use quickwit_proto::opentelemetry::proto::trace::v1::{ + ResourceSpans, ScopeSpans, Span as OtelSpan, Status as OtelStatus, +}; +use quickwit_proto::search::{CountHits, SearchRequest}; +use quickwit_query::BooleanOperand; +use quickwit_query::query_ast::{BoolQuery, QueryAst, TermQuery, UserInputQuery}; +use quickwit_search::SearchService; +use serde_json::Value as JsonValue; +use time::OffsetDateTime; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; +use tonic::{Request, Response, Status}; +use tracing::field::Empty; +use tracing::{Span as RuntimeSpan, debug, error, instrument}; + +use crate::metrics::JAEGER_SERVICE_METRICS; +use crate::{ + JaegerService, TimeIntervalSecs, TracesDataStream, get_operations_impl, get_services_impl, + json_deserialize, record_error, record_send, to_duration_millis, +}; + +macro_rules! metrics { + ($expr:expr, [$operation:ident, $($label:expr),*]) => { + let start = std::time::Instant::now(); + let labels = [stringify!($operation), $($label,)*]; + JAEGER_SERVICE_METRICS.requests_total.with_label_values(labels).inc(); + let (res, is_error) = match $expr { + ok @ Ok(_) => { + (ok, "false") + }, + err @ Err(_) => { + JAEGER_SERVICE_METRICS.request_errors_total.with_label_values(labels).inc(); + (err, "true") + }, + }; + let elapsed = start.elapsed().as_secs_f64(); + let labels = [stringify!($operation), $($label,)* is_error]; + JAEGER_SERVICE_METRICS.request_duration_seconds.with_label_values(labels).observe(elapsed); + + return res.map(Response::new); + }; +} + +#[async_trait] +impl TraceReader for JaegerService { + async fn get_services( + &self, + request: Request, + ) -> Result, Status> { + let index_id_patterns = + extract_otel_traces_index_id_patterns_from_metadata(request.metadata())?; + + let services = get_services_impl( + self.search_service.clone(), + self.lookback_period_secs, + index_id_patterns, + ) + .await?; + + let response = GetServicesResponse { services }; + metrics!(Ok(response), [get_services_v2, OTEL_TRACES_INDEX_ID]); + } + + async fn get_operations( + &self, + request: Request, + ) -> Result, Status> { + let index_id_patterns = + extract_otel_traces_index_id_patterns_from_metadata(request.metadata())?; + + let req = request.into_inner(); + + let operations = get_operations_impl( + self.search_service.clone(), + self.lookback_period_secs, + req.service, + req.span_kind, + index_id_patterns, + ) + .await? + .into_iter() + .map(|op| Operation { + name: op.name, + span_kind: op.span_kind, + }) + .collect(); + + let response = GetOperationsResponse { operations }; + metrics!(Ok(response), [get_operations_v2, OTEL_TRACES_INDEX_ID]); + } + + type GetTracesStream = TracesDataStream; + + async fn get_traces( + &self, + request: Request, + ) -> Result, Status> { + let request_start = Instant::now(); + let index_id_patterns = + extract_otel_traces_index_id_patterns_from_metadata(request.metadata())?; + + let (tx, rx) = mpsc::channel(2); + let search_service = self.search_service.clone(); + let max_fetch_spans = self.max_fetch_spans; + let lookback_period_secs = self.lookback_period_secs; + let query_list = request.into_inner().query; + + tokio::task::spawn(async move { + for query_params in query_list { + let trace_id = match TraceId::try_from(query_params.trace_id) { + Ok(id) => id, + Err(error) => { + let _ = tx + .send(Err(Status::invalid_argument(error.to_string()))) + .await; + return; + } + }; + + let end = OffsetDateTime::now_utc().unix_timestamp(); + let search_window = (end - lookback_period_secs)..=end; + + let otel_spans = match stream_otel_spans_impl( + search_service.clone(), + max_fetch_spans, + &[trace_id], + search_window, + "get_traces_v2", + request_start, + index_id_patterns.clone(), + false, + ) + .await + { + Ok(spans) => spans, + Err(e) => { + let _ = tx.send(Err(e)).await; + return; + } + }; + + if tx + .send(Ok(qw_spans_to_otel_traces_data(otel_spans))) + .await + .is_err() + { + return; + } + } + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } + + type FindTracesStream = TracesDataStream; + + async fn find_traces( + &self, + request: Request, + ) -> Result, Status> { + let request_start = Instant::now(); + + let index_id_patterns = + extract_otel_traces_index_id_patterns_from_metadata(request.metadata())?; + + let query = request + .into_inner() + .query + .ok_or_else(|| Status::invalid_argument("Query is empty."))?; + + let (trace_ids, span_timestamps_range) = find_trace_ids_impl( + self.search_service.clone(), + self.max_trace_duration_secs, + query, + index_id_patterns.clone(), + ) + .await?; + + let search_window = (span_timestamps_range.start() - self.max_trace_duration_secs) + ..=(span_timestamps_range.end() + self.max_trace_duration_secs); + + let (tx, rx) = mpsc::channel(2); + let search_service = self.search_service.clone(); + let max_fetch_spans = self.max_fetch_spans; + + tokio::task::spawn(async move { + let all_spans = match stream_otel_spans_impl( + search_service, + max_fetch_spans, + &trace_ids, + search_window, + "find_traces_v2", + request_start, + index_id_patterns, + false, + ) + .await + { + Ok(spans) => spans, + Err(e) => { + let _ = tx.send(Err(e)).await; + return; + } + }; + + // Group by trace_id and send each trace + let mut spans_by_trace: HashMap, Vec> = HashMap::new(); + for span in all_spans { + spans_by_trace + .entry(span.trace_id.to_vec()) + .or_default() + .push(span); + } + + for spans in spans_by_trace.into_values() { + if tx + .send(Ok(qw_spans_to_otel_traces_data(spans))) + .await + .is_err() + { + return; + } + } + }); + + Ok(Response::new(ReceiverStream::new(rx))) + } + + async fn find_trace_i_ds( + &self, + request: Request, + ) -> Result, Status> { + let index_id_patterns = + extract_otel_traces_index_id_patterns_from_metadata(request.metadata())?; + + let query = request + .into_inner() + .query + .ok_or_else(|| Status::invalid_argument("Query is empty."))?; + + let (trace_ids, time_range) = find_trace_ids_impl( + self.search_service.clone(), + self.max_trace_duration_secs, + query, + index_id_patterns, + ) + .await?; + + let trace_ids = trace_ids + .into_iter() + .map(|trace_id| FoundTraceId { + trace_id: trace_id.to_vec(), + start: Some(WellKnownTimestamp { + seconds: *time_range.start(), + nanos: 0, + }), + end: Some(WellKnownTimestamp { + seconds: *time_range.end(), + nanos: 0, + }), + }) + .collect(); + + let response = quickwit_proto::jaeger::storage::v2::FindTraceIDsResponse { trace_ids }; + metrics!(Ok(response), [find_trace_ids_v2, OTEL_TRACES_INDEX_ID]); + } +} + +// === Helper functions === +#[instrument("find_trace_ids", skip_all)] +async fn find_trace_ids_impl( + search_service: Arc, + _max_trace_duration_secs: i64, + query: quickwit_proto::jaeger::storage::v2::TraceQueryParameters, + index_id_patterns: Vec, +) -> Result<(Vec, TimeIntervalSecs), Status> { + debug!(service_name=%query.service_name, operation_name=%query.operation_name, "`find_trace_ids` request"); + + let min_start_secs = query.start_time_min.as_ref().map(|ts| ts.seconds); + let max_start_secs = query.start_time_max.as_ref().map(|ts| ts.seconds); + let min_duration_millis = query.duration_min.as_ref().and_then(to_duration_millis); + let max_duration_millis = query.duration_max.as_ref().and_then(to_duration_millis); + let tags = convert_v2_attributes_to_v1_tags(query.attributes); + + crate::find_trace_ids_common( + search_service, + &query.service_name, + &query.operation_name, + tags, + min_start_secs, + max_start_secs, + min_duration_millis, + max_duration_millis, + query.search_depth as usize, + index_id_patterns, + ) + .await +} + +#[instrument("stream_otel_spans", skip_all, fields(num_traces=%trace_ids.len(), num_spans=Empty, num_bytes=Empty))] +#[allow(clippy::too_many_arguments)] +async fn stream_otel_spans_impl( + search_service: Arc, + max_fetch_spans: u64, + trace_ids: &[TraceId], + search_window: TimeIntervalSecs, + operation_name: &'static str, + request_start: Instant, + index_id_patterns: Vec, + root_only: bool, +) -> Result, Status> { + if trace_ids.is_empty() { + return Ok(Vec::new()); + } + + let mut query = BoolQuery::default(); + + for trace_id in trace_ids { + let value = trace_id.hex_display(); + let term_query = TermQuery { + field: "trace_id".to_string(), + value, + }; + query.should.push(term_query.into()); + } + + if root_only { + let is_root = UserInputQuery { + user_text: "NOT is_root:false".to_string(), + default_fields: None, + default_operator: BooleanOperand::And, + lenient: true, + }; + let mut new_query = BoolQuery::default(); + new_query.must.push(query.into()); + new_query.must.push(is_root.into()); + query = new_query; + } + + let query_ast: QueryAst = query.into(); + let query_ast = + serde_json::to_string(&query_ast).map_err(|err| Status::internal(err.to_string()))?; + + let search_request = SearchRequest { + index_id_patterns, + query_ast, + start_timestamp: Some(*search_window.start()), + end_timestamp: Some(*search_window.end()), + max_hits: max_fetch_spans, + count_hits: CountHits::Underestimate.into(), + ..Default::default() + }; + + let search_response = match search_service.root_search(search_request).await { + Ok(search_response) => search_response, + Err(search_error) => { + error!(search_error=?search_error, "failed to fetch spans"); + record_error(operation_name, request_start); + return Err(Status::internal("Failed to fetch spans.")); + } + }; + + let mut qw_spans: Vec = Vec::with_capacity(search_response.hits.len()); + + for hit in search_response.hits { + match qw_span_from_json(&hit.json) { + Ok(span) => { + qw_spans.push(span); + } + Err(status) => { + record_error(operation_name, request_start); + return Err(status); + } + }; + } + + if trace_ids.len() > 1 { + qw_spans.sort_unstable_by(|left, right| left.trace_id.cmp(&right.trace_id)); + } + + let num_spans = qw_spans.len(); + let num_bytes = qw_spans + .iter() + .map(|span| serde_json::to_string(span).unwrap_or_default().len()) + .sum::(); + + RuntimeSpan::current().record("num_spans", num_spans); + RuntimeSpan::current().record("num_bytes", num_bytes); + + record_send(operation_name, num_spans, num_bytes); + + JAEGER_SERVICE_METRICS + .fetched_traces_total + .with_label_values([operation_name, OTEL_TRACES_INDEX_ID]) + .inc_by(trace_ids.len() as u64); + + let elapsed = request_start.elapsed().as_secs_f64(); + JAEGER_SERVICE_METRICS + .request_duration_seconds + .with_label_values([operation_name, OTEL_TRACES_INDEX_ID, "false"]) + .observe(elapsed); + + Ok(qw_spans) +} + +// === Conversion functions === +// Note: record_error and record_send are now shared in lib.rs + +/// Direct conversion from Quickwit's native OpenTelemetry span to Jaeger v2's OpenTelemetry format +fn qw_spans_to_otel_traces_data( + qw_spans: Vec, +) -> quickwit_proto::opentelemetry::proto::trace::v1::TracesData { + // Group spans by service + let mut spans_by_service: HashMap> = HashMap::new(); + for span in qw_spans { + spans_by_service + .entry(span.service_name.clone()) + .or_default() + .push(span); + } + + let resource_spans = spans_by_service + .into_iter() + .map(|(service_name, spans)| { + // Get resource attributes from first span before grouping + let first_span_attrs = spans + .first() + .map(|span| span.resource_attributes.clone()) + .unwrap_or_default(); + + // Group by scope + let mut spans_by_scope: HashMap<(Option, Option), Vec> = + HashMap::new(); + for span in spans { + let key = (span.scope_name.clone(), span.scope_version.clone()); + spans_by_scope.entry(key).or_default().push(span); + } + + let scope_spans = spans_by_scope + .into_iter() + .map(|((scope_name, scope_version), spans)| { + let otel_spans = spans.into_iter().map(qw_span_to_otel_span).collect(); + + ScopeSpans { + scope: Some(InstrumentationScope { + name: scope_name.unwrap_or_default(), + version: scope_version.unwrap_or_default(), + attributes: vec![], + dropped_attributes_count: 0, + }), + spans: otel_spans, + schema_url: String::new(), + } + }) + .collect(); + + let mut resource_attrs = vec![OtelKeyValue { + key: "service.name".to_string(), + value: Some(OtelAnyValue { + value: Some(OtelValue::StringValue(service_name)), + }), + }]; + + // Add other resource attributes + for (key, value) in first_span_attrs { + resource_attrs.push(json_value_to_otel_kv(key, value)); + } + + ResourceSpans { + resource: Some(OtelResource { + attributes: resource_attrs, + dropped_attributes_count: 0, + }), + scope_spans, + schema_url: String::new(), + } + }) + .collect(); + + quickwit_proto::opentelemetry::proto::trace::v1::TracesData { resource_spans } +} + +/// Convert a Quickwit span (native OTEL format) to Jaeger v2 OTEL span +fn qw_span_to_otel_span(qw_span: QwSpan) -> OtelSpan { + OtelSpan { + trace_id: qw_span.trace_id.to_vec(), + span_id: qw_span.span_id.to_vec(), + trace_state: qw_span.trace_state.unwrap_or_default(), + parent_span_id: qw_span + .parent_span_id + .map(|id| id.to_vec()) + .unwrap_or_default(), + name: qw_span.span_name, + kind: qw_span.span_kind as i32, + start_time_unix_nano: qw_span.span_start_timestamp_nanos, + end_time_unix_nano: qw_span.span_end_timestamp_nanos, + attributes: qw_span + .span_attributes + .into_iter() + .map(|(k, v)| json_value_to_otel_kv(k, v)) + .collect(), + dropped_attributes_count: qw_span.span_dropped_attributes_count, + events: qw_span + .events + .into_iter() + .map(|event| otel_trace::span::Event { + time_unix_nano: event.event_timestamp_nanos, + name: event.event_name, + attributes: event + .event_attributes + .into_iter() + .map(|(k, v)| json_value_to_otel_kv(k, v)) + .collect(), + dropped_attributes_count: event.event_dropped_attributes_count, + }) + .collect(), + dropped_events_count: qw_span.span_dropped_events_count, + links: qw_span + .links + .into_iter() + .map(|link| otel_trace::span::Link { + trace_id: link.link_trace_id.to_vec(), + span_id: link.link_span_id.to_vec(), + trace_state: link.link_trace_state.unwrap_or_default(), + attributes: link + .link_attributes + .into_iter() + .map(|(k, v)| json_value_to_otel_kv(k, v)) + .collect(), + dropped_attributes_count: link.link_dropped_attributes_count, + }) + .collect(), + dropped_links_count: qw_span.span_dropped_links_count, + status: Some(OtelStatus { + message: qw_span.span_status.message.unwrap_or_default(), + code: match qw_span.span_status.code { + quickwit_proto::opentelemetry::proto::trace::v1::status::StatusCode::Unset => { + OtelStatusCode::Unset as i32 + } + quickwit_proto::opentelemetry::proto::trace::v1::status::StatusCode::Ok => { + OtelStatusCode::Ok as i32 + } + quickwit_proto::opentelemetry::proto::trace::v1::status::StatusCode::Error => { + OtelStatusCode::Error as i32 + } + }, + }), + } +} + +fn json_value_to_otel_kv(key: String, value: JsonValue) -> OtelKeyValue { + let otel_value = match value { + JsonValue::String(s) => OtelValue::StringValue(s), + JsonValue::Number(n) => { + if let Some(i) = n.as_i64() { + OtelValue::IntValue(i) + } else if let Some(f) = n.as_f64() { + OtelValue::DoubleValue(f) + } else { + OtelValue::StringValue(n.to_string()) + } + } + JsonValue::Bool(b) => OtelValue::BoolValue(b), + JsonValue::Array(_) | JsonValue::Object(_) => OtelValue::StringValue(value.to_string()), + JsonValue::Null => OtelValue::StringValue(String::new()), + }; + + OtelKeyValue { + key, + value: Some(OtelAnyValue { + value: Some(otel_value), + }), + } +} + +#[allow(clippy::result_large_err)] +fn qw_span_from_json(qw_span_json: &str) -> Result { + json_deserialize(qw_span_json, "span") +} + +pub(crate) fn convert_v2_attributes_to_v1_tags( + attributes: Vec, +) -> HashMap { + attributes + .into_iter() + .filter_map(|kv| { + let value = kv.value?.value?; + let string_value = match value { + quickwit_proto::jaeger::storage::v2::any_value::Value::StringValue(s) => s, + quickwit_proto::jaeger::storage::v2::any_value::Value::IntValue(i) => i.to_string(), + quickwit_proto::jaeger::storage::v2::any_value::Value::DoubleValue(d) => { + d.to_string() + } + quickwit_proto::jaeger::storage::v2::any_value::Value::BoolValue(b) => { + b.to_string() + } + _ => return None, + }; + Some((kv.key, string_value)) + }) + .collect() +} diff --git a/quickwit/quickwit-proto/protos/third-party/jaeger/storage/v2/trace_storage.proto b/quickwit/quickwit-proto/protos/third-party/jaeger/storage/v2/trace_storage.proto new file mode 100644 index 00000000000..47b366aa7ac --- /dev/null +++ b/quickwit/quickwit-proto/protos/third-party/jaeger/storage/v2/trace_storage.proto @@ -0,0 +1,177 @@ +// Copyright (c) 2024 The Jaeger Authors. +// SPDX-License-Identifier: Apache-2.0 + +syntax = "proto3"; + +package jaeger.storage.v2; + +import "google/protobuf/duration.proto"; +import "google/protobuf/timestamp.proto"; +import "opentelemetry/proto/trace/v1/trace.proto"; + +option go_package = "storage"; + +// GetTraceParams represents the query for a single trace from the storage backend. +message GetTraceParams { + // trace_id is a 16 byte array containing the unique identifier for the trace to query. + bytes trace_id = 1; + + // start_time is the start of the time interval to search for the trace_id. + // + // This field is optional. + google.protobuf.Timestamp start_time = 2; + + // end_time is the end of the time interval to search for the trace_id. + // + // This field is optional. + google.protobuf.Timestamp end_time = 3; +} + +// GetTracesRequest represents a request to retrieve multiple traces. +message GetTracesRequest { + repeated GetTraceParams query = 1; +} + +// GetServicesRequest represents a request to get service names. +message GetServicesRequest {} + +// GetServicesResponse represents the response for GetServicesRequest. +message GetServicesResponse { + repeated string services = 1; +} + +// GetOperationsRequest represents a request to get operation names. +message GetOperationsRequest { + // service is the name of the service for which to get operation names. + // + // This field is required. + string service = 1; + + // span_kind is the type of span which is used to distinguish between + // spans generated in a particular context. + // + // This field is optional. + string span_kind = 2; +} + +// Operation contains information about an operation for a given service. +message Operation { + string name = 1; + string span_kind = 2; +} + +// GetOperationsResponse represents the response for GetOperationsRequest. +message GetOperationsResponse { + repeated Operation operations = 1; +} + +// KeyValue and all its associated types are copied from opentelemetry-proto/common/v1/common.proto +// (https://github.com/open-telemetry/opentelemetry-proto/blob/main/opentelemetry/proto/common/v1/common.proto). +// This type is used to store attributes in traces. +message KeyValue { + string key = 1; + AnyValue value = 2; +} + +message AnyValue { + oneof value { + string string_value = 1; + bool bool_value = 2; + int64 int_value = 3; + double double_value = 4; + ArrayValue array_value = 5; + KeyValueList kvlist_value = 6; + bytes bytes_value = 7; + } +} + +message KeyValueList { + repeated KeyValue values = 1; +} + +message ArrayValue { + repeated AnyValue values = 1; +} + +// TraceQueryParameters contains query parameters to find traces. For a detailed +// definition of each field in this message, refer to `TraceQueryParameters` in `jaeger.api_v3` +// (https://github.com/jaegertracing/jaeger-idl/blob/main/proto/api_v3/query_service.proto). +message TraceQueryParameters { + string service_name = 1; + string operation_name = 2; + repeated KeyValue attributes = 3; + google.protobuf.Timestamp start_time_min = 4; + google.protobuf.Timestamp start_time_max = 5; + google.protobuf.Duration duration_min = 6; + google.protobuf.Duration duration_max = 7; + int32 search_depth = 8; +} + +// FindTracesRequest represents a request to find traces. +// It can be used to retrieve the traces (FindTraces) or simply +// the trace IDs (FindTraceIDs). +message FindTracesRequest { + TraceQueryParameters query = 1; +} + +// FoundTraceID is a wrapper around trace ID returned from FindTraceIDs +// with an optional time range that may be used in GetTraces calls. +// +// The time range is provided as an optimization hint for some storage backends +// that can perform more efficient queries when they know the approximate time range. +// The value should not be used for precise time-based filtering or assumptions. +// It is meant as a rough boundary and may not be populated in all cases. +message FoundTraceID { + bytes trace_id = 1; + google.protobuf.Timestamp start = 2; + google.protobuf.Timestamp end = 3; +} + +// FindTraceIDsResponse represents the response for FindTracesRequest. +message FindTraceIDsResponse { + repeated FoundTraceID trace_ids = 1; +} + +// TraceReader is a service that allows reading traces from storage. +// Note that if you implement this service, you should also implement +// OTEL's TraceService in package opentelemetry.proto.collector.trace.v1 +// to allow pushing traces to the storage backend +// () +service TraceReader { + // GetTraces returns a stream that retrieves all traces with given IDs. + // + // Chunking requirements: + // - A single TracesData chunk MUST NOT contain spans from multiple traces. + // - Large traces MAY be split across multiple, *consecutive* TracesData chunks. + // - Each returned TracesData object MUST NOT be empty. + // + // Edge cases: + // - If no spans are found for any given trace ID, the ID is ignored. + // - If none of the trace IDs are found in the storage, an empty response is returned. + rpc GetTraces(GetTracesRequest) returns (stream opentelemetry.proto.trace.v1.TracesData) {} + + // GetServices returns all service names known to the backend from traces + // within its retention period. + rpc GetServices(GetServicesRequest) returns (GetServicesResponse) {} + + // GetOperations returns all operation names for a given service + // known to the backend from traces within its retention period. + rpc GetOperations(GetOperationsRequest) returns (GetOperationsResponse) {} + + // FindTraces returns a stream that retrieves traces matching query parameters. + // + // The chunking rules are the same as for GetTraces. + // + // If no matching traces are found, an empty stream is returned. + rpc FindTraces(FindTracesRequest) returns (stream opentelemetry.proto.trace.v1.TracesData) {} + + // FindTraceIDs returns a stream that retrieves IDs of traces matching query parameters. + // + // If no matching traces are found, an empty stream is returned. + // + // This call behaves identically to FindTraces, except that it returns only the list + // of matching trace IDs. This is useful in some contexts, such as batch jobs, where a + // large list of trace IDs may be queried first and then the full traces are loaded + // in batches. + rpc FindTraceIDs(FindTracesRequest) returns (FindTraceIDsResponse) {} +} diff --git a/quickwit/quickwit-proto/src/codegen/jaeger/jaeger.storage.v2.rs b/quickwit/quickwit-proto/src/codegen/jaeger/jaeger.storage.v2.rs new file mode 100644 index 00000000000..9adef233a95 --- /dev/null +++ b/quickwit/quickwit-proto/src/codegen/jaeger/jaeger.storage.v2.rs @@ -0,0 +1,857 @@ +// This file is @generated by prost-build. +/// GetTraceParams represents the query for a single trace from the storage backend. +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct GetTraceParams { + /// trace_id is a 16 byte array containing the unique identifier for the trace to query. + #[prost(bytes = "vec", tag = "1")] + pub trace_id: ::prost::alloc::vec::Vec, + /// start_time is the start of the time interval to search for the trace_id. + /// + /// This field is optional. + #[prost(message, optional, tag = "2")] + pub start_time: ::core::option::Option<::prost_types::Timestamp>, + /// end_time is the end of the time interval to search for the trace_id. + /// + /// This field is optional. + #[prost(message, optional, tag = "3")] + pub end_time: ::core::option::Option<::prost_types::Timestamp>, +} +/// GetTracesRequest represents a request to retrieve multiple traces. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetTracesRequest { + #[prost(message, repeated, tag = "1")] + pub query: ::prost::alloc::vec::Vec, +} +/// GetServicesRequest represents a request to get service names. +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct GetServicesRequest {} +/// GetServicesResponse represents the response for GetServicesRequest. +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct GetServicesResponse { + #[prost(string, repeated, tag = "1")] + pub services: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, +} +/// GetOperationsRequest represents a request to get operation names. +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct GetOperationsRequest { + /// service is the name of the service for which to get operation names. + /// + /// This field is required. + #[prost(string, tag = "1")] + pub service: ::prost::alloc::string::String, + /// span_kind is the type of span which is used to distinguish between + /// spans generated in a particular context. + /// + /// This field is optional. + #[prost(string, tag = "2")] + pub span_kind: ::prost::alloc::string::String, +} +/// Operation contains information about an operation for a given service. +#[derive(Ord, PartialOrd)] +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct Operation { + #[prost(string, tag = "1")] + pub name: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub span_kind: ::prost::alloc::string::String, +} +/// GetOperationsResponse represents the response for GetOperationsRequest. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetOperationsResponse { + #[prost(message, repeated, tag = "1")] + pub operations: ::prost::alloc::vec::Vec, +} +/// KeyValue and all its associated types are copied from opentelemetry-proto/common/v1/common.proto +/// (). +/// This type is used to store attributes in traces. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct KeyValue { + #[prost(string, tag = "1")] + pub key: ::prost::alloc::string::String, + #[prost(message, optional, tag = "2")] + pub value: ::core::option::Option, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AnyValue { + #[prost(oneof = "any_value::Value", tags = "1, 2, 3, 4, 5, 6, 7")] + pub value: ::core::option::Option, +} +/// Nested message and enum types in `AnyValue`. +pub mod any_value { + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Value { + #[prost(string, tag = "1")] + StringValue(::prost::alloc::string::String), + #[prost(bool, tag = "2")] + BoolValue(bool), + #[prost(int64, tag = "3")] + IntValue(i64), + #[prost(double, tag = "4")] + DoubleValue(f64), + #[prost(message, tag = "5")] + ArrayValue(super::ArrayValue), + #[prost(message, tag = "6")] + KvlistValue(super::KeyValueList), + #[prost(bytes, tag = "7")] + BytesValue(::prost::alloc::vec::Vec), + } +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct KeyValueList { + #[prost(message, repeated, tag = "1")] + pub values: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ArrayValue { + #[prost(message, repeated, tag = "1")] + pub values: ::prost::alloc::vec::Vec, +} +/// TraceQueryParameters contains query parameters to find traces. For a detailed +/// definition of each field in this message, refer to `TraceQueryParameters` in `jaeger.api_v3` +/// (). +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TraceQueryParameters { + #[prost(string, tag = "1")] + pub service_name: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub operation_name: ::prost::alloc::string::String, + #[prost(message, repeated, tag = "3")] + pub attributes: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "4")] + pub start_time_min: ::core::option::Option<::prost_types::Timestamp>, + #[prost(message, optional, tag = "5")] + pub start_time_max: ::core::option::Option<::prost_types::Timestamp>, + #[prost(message, optional, tag = "6")] + pub duration_min: ::core::option::Option<::prost_types::Duration>, + #[prost(message, optional, tag = "7")] + pub duration_max: ::core::option::Option<::prost_types::Duration>, + #[prost(int32, tag = "8")] + pub search_depth: i32, +} +/// FindTracesRequest represents a request to find traces. +/// It can be used to retrieve the traces (FindTraces) or simply +/// the trace IDs (FindTraceIDs). +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct FindTracesRequest { + #[prost(message, optional, tag = "1")] + pub query: ::core::option::Option, +} +/// FoundTraceID is a wrapper around trace ID returned from FindTraceIDs +/// with an optional time range that may be used in GetTraces calls. +/// +/// The time range is provided as an optimization hint for some storage backends +/// that can perform more efficient queries when they know the approximate time range. +/// The value should not be used for precise time-based filtering or assumptions. +/// It is meant as a rough boundary and may not be populated in all cases. +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct FoundTraceId { + #[prost(bytes = "vec", tag = "1")] + pub trace_id: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "2")] + pub start: ::core::option::Option<::prost_types::Timestamp>, + #[prost(message, optional, tag = "3")] + pub end: ::core::option::Option<::prost_types::Timestamp>, +} +/// FindTraceIDsResponse represents the response for FindTracesRequest. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct FindTraceIDsResponse { + #[prost(message, repeated, tag = "1")] + pub trace_ids: ::prost::alloc::vec::Vec, +} +/// Generated client implementations. +pub mod trace_reader_client { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + use tonic::codegen::http::Uri; + /// TraceReader is a service that allows reading traces from storage. + /// Note that if you implement this service, you should also implement + /// OTEL's TraceService in package opentelemetry.proto.collector.trace.v1 + /// to allow pushing traces to the storage backend + /// () + #[derive(Debug, Clone)] + pub struct TraceReaderClient { + inner: tonic::client::Grpc, + } + impl TraceReaderClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into, + { + let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; + Ok(Self::new(conn)) + } + } + impl TraceReaderClient + where + T: tonic::client::GrpcService, + T::Error: Into, + T::ResponseBody: Body + std::marker::Send + 'static, + ::Error: Into + std::marker::Send, + { + pub fn new(inner: T) -> Self { + let inner = tonic::client::Grpc::new(inner); + Self { inner } + } + pub fn with_origin(inner: T, origin: Uri) -> Self { + let inner = tonic::client::Grpc::with_origin(inner, origin); + Self { inner } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> TraceReaderClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + , + >>::Error: Into + std::marker::Send + std::marker::Sync, + { + TraceReaderClient::new(InterceptedService::new(inner, interceptor)) + } + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an + /// error. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.send_compressed(encoding); + self + } + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.inner = self.inner.accept_compressed(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_decoding_message_size(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.inner = self.inner.max_encoding_message_size(limit); + self + } + /// GetTraces returns a stream that retrieves all traces with given IDs. + /// + /// Chunking requirements: + /// + /// * A single TracesData chunk MUST NOT contain spans from multiple traces. + /// * Large traces MAY be split across multiple, *consecutive* TracesData chunks. + /// * Each returned TracesData object MUST NOT be empty. + /// + /// Edge cases: + /// + /// * If no spans are found for any given trace ID, the ID is ignored. + /// * If none of the trace IDs are found in the storage, an empty response is returned. + pub async fn get_traces( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response< + tonic::codec::Streaming< + super::super::super::super::opentelemetry::proto::trace::v1::TracesData, + >, + >, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/jaeger.storage.v2.TraceReader/GetTraces", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("jaeger.storage.v2.TraceReader", "GetTraces")); + self.inner.server_streaming(req, path, codec).await + } + /// GetServices returns all service names known to the backend from traces + /// within its retention period. + pub async fn get_services( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/jaeger.storage.v2.TraceReader/GetServices", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("jaeger.storage.v2.TraceReader", "GetServices")); + self.inner.unary(req, path, codec).await + } + /// GetOperations returns all operation names for a given service + /// known to the backend from traces within its retention period. + pub async fn get_operations( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/jaeger.storage.v2.TraceReader/GetOperations", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("jaeger.storage.v2.TraceReader", "GetOperations"), + ); + self.inner.unary(req, path, codec).await + } + /// FindTraces returns a stream that retrieves traces matching query parameters. + /// + /// The chunking rules are the same as for GetTraces. + /// + /// If no matching traces are found, an empty stream is returned. + pub async fn find_traces( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response< + tonic::codec::Streaming< + super::super::super::super::opentelemetry::proto::trace::v1::TracesData, + >, + >, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/jaeger.storage.v2.TraceReader/FindTraces", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert(GrpcMethod::new("jaeger.storage.v2.TraceReader", "FindTraces")); + self.inner.server_streaming(req, path, codec).await + } + /// FindTraceIDs returns a stream that retrieves IDs of traces matching query parameters. + /// + /// If no matching traces are found, an empty stream is returned. + /// + /// This call behaves identically to FindTraces, except that it returns only the list + /// of matching trace IDs. This is useful in some contexts, such as batch jobs, where a + /// large list of trace IDs may be queried first and then the full traces are loaded + /// in batches. + pub async fn find_trace_i_ds( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + > { + self.inner + .ready() + .await + .map_err(|e| { + tonic::Status::unknown( + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic_prost::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static( + "/jaeger.storage.v2.TraceReader/FindTraceIDs", + ); + let mut req = request.into_request(); + req.extensions_mut() + .insert( + GrpcMethod::new("jaeger.storage.v2.TraceReader", "FindTraceIDs"), + ); + self.inner.unary(req, path, codec).await + } + } +} +/// Generated server implementations. +pub mod trace_reader_server { + #![allow( + unused_variables, + dead_code, + missing_docs, + clippy::wildcard_imports, + clippy::let_unit_value, + )] + use tonic::codegen::*; + /// Generated trait containing gRPC methods that should be implemented for use with TraceReaderServer. + #[async_trait] + pub trait TraceReader: std::marker::Send + std::marker::Sync + 'static { + /// Server streaming response type for the GetTraces method. + type GetTracesStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result< + super::super::super::super::opentelemetry::proto::trace::v1::TracesData, + tonic::Status, + >, + > + + std::marker::Send + + 'static; + /// GetTraces returns a stream that retrieves all traces with given IDs. + /// + /// Chunking requirements: + /// + /// * A single TracesData chunk MUST NOT contain spans from multiple traces. + /// * Large traces MAY be split across multiple, *consecutive* TracesData chunks. + /// * Each returned TracesData object MUST NOT be empty. + /// + /// Edge cases: + /// + /// * If no spans are found for any given trace ID, the ID is ignored. + /// * If none of the trace IDs are found in the storage, an empty response is returned. + async fn get_traces( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + /// GetServices returns all service names known to the backend from traces + /// within its retention period. + async fn get_services( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + /// GetOperations returns all operation names for a given service + /// known to the backend from traces within its retention period. + async fn get_operations( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + /// Server streaming response type for the FindTraces method. + type FindTracesStream: tonic::codegen::tokio_stream::Stream< + Item = std::result::Result< + super::super::super::super::opentelemetry::proto::trace::v1::TracesData, + tonic::Status, + >, + > + + std::marker::Send + + 'static; + /// FindTraces returns a stream that retrieves traces matching query parameters. + /// + /// The chunking rules are the same as for GetTraces. + /// + /// If no matching traces are found, an empty stream is returned. + async fn find_traces( + &self, + request: tonic::Request, + ) -> std::result::Result, tonic::Status>; + /// FindTraceIDs returns a stream that retrieves IDs of traces matching query parameters. + /// + /// If no matching traces are found, an empty stream is returned. + /// + /// This call behaves identically to FindTraces, except that it returns only the list + /// of matching trace IDs. This is useful in some contexts, such as batch jobs, where a + /// large list of trace IDs may be queried first and then the full traces are loaded + /// in batches. + async fn find_trace_i_ds( + &self, + request: tonic::Request, + ) -> std::result::Result< + tonic::Response, + tonic::Status, + >; + } + /// TraceReader is a service that allows reading traces from storage. + /// Note that if you implement this service, you should also implement + /// OTEL's TraceService in package opentelemetry.proto.collector.trace.v1 + /// to allow pushing traces to the storage backend + /// () + #[derive(Debug)] + pub struct TraceReaderServer { + inner: Arc, + accept_compression_encodings: EnabledCompressionEncodings, + send_compression_encodings: EnabledCompressionEncodings, + max_decoding_message_size: Option, + max_encoding_message_size: Option, + } + impl TraceReaderServer { + pub fn new(inner: T) -> Self { + Self::from_arc(Arc::new(inner)) + } + pub fn from_arc(inner: Arc) -> Self { + Self { + inner, + accept_compression_encodings: Default::default(), + send_compression_encodings: Default::default(), + max_decoding_message_size: None, + max_encoding_message_size: None, + } + } + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> InterceptedService + where + F: tonic::service::Interceptor, + { + InterceptedService::new(Self::new(inner), interceptor) + } + /// Enable decompressing requests with the given encoding. + #[must_use] + pub fn accept_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.accept_compression_encodings.enable(encoding); + self + } + /// Compress responses with the given encoding, if the client supports it. + #[must_use] + pub fn send_compressed(mut self, encoding: CompressionEncoding) -> Self { + self.send_compression_encodings.enable(encoding); + self + } + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(mut self, limit: usize) -> Self { + self.max_decoding_message_size = Some(limit); + self + } + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(mut self, limit: usize) -> Self { + self.max_encoding_message_size = Some(limit); + self + } + } + impl tonic::codegen::Service> for TraceReaderServer + where + T: TraceReader, + B: Body + std::marker::Send + 'static, + B::Error: Into + std::marker::Send + 'static, + { + type Response = http::Response; + type Error = std::convert::Infallible; + type Future = BoxFuture; + fn poll_ready( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + Poll::Ready(Ok(())) + } + fn call(&mut self, req: http::Request) -> Self::Future { + match req.uri().path() { + "/jaeger.storage.v2.TraceReader/GetTraces" => { + #[allow(non_camel_case_types)] + struct GetTracesSvc(pub Arc); + impl< + T: TraceReader, + > tonic::server::ServerStreamingService + for GetTracesSvc { + type Response = super::super::super::super::opentelemetry::proto::trace::v1::TracesData; + type ResponseStream = T::GetTracesStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_traces(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetTracesSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.server_streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/jaeger.storage.v2.TraceReader/GetServices" => { + #[allow(non_camel_case_types)] + struct GetServicesSvc(pub Arc); + impl< + T: TraceReader, + > tonic::server::UnaryService + for GetServicesSvc { + type Response = super::GetServicesResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_services(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetServicesSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/jaeger.storage.v2.TraceReader/GetOperations" => { + #[allow(non_camel_case_types)] + struct GetOperationsSvc(pub Arc); + impl< + T: TraceReader, + > tonic::server::UnaryService + for GetOperationsSvc { + type Response = super::GetOperationsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::get_operations(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = GetOperationsSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/jaeger.storage.v2.TraceReader/FindTraces" => { + #[allow(non_camel_case_types)] + struct FindTracesSvc(pub Arc); + impl< + T: TraceReader, + > tonic::server::ServerStreamingService + for FindTracesSvc { + type Response = super::super::super::super::opentelemetry::proto::trace::v1::TracesData; + type ResponseStream = T::FindTracesStream; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::find_traces(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = FindTracesSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.server_streaming(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + "/jaeger.storage.v2.TraceReader/FindTraceIDs" => { + #[allow(non_camel_case_types)] + struct FindTraceIDsSvc(pub Arc); + impl< + T: TraceReader, + > tonic::server::UnaryService + for FindTraceIDsSvc { + type Response = super::FindTraceIDsResponse; + type Future = BoxFuture< + tonic::Response, + tonic::Status, + >; + fn call( + &mut self, + request: tonic::Request, + ) -> Self::Future { + let inner = Arc::clone(&self.0); + let fut = async move { + ::find_trace_i_ds(&inner, request).await + }; + Box::pin(fut) + } + } + let accept_compression_encodings = self.accept_compression_encodings; + let send_compression_encodings = self.send_compression_encodings; + let max_decoding_message_size = self.max_decoding_message_size; + let max_encoding_message_size = self.max_encoding_message_size; + let inner = self.inner.clone(); + let fut = async move { + let method = FindTraceIDsSvc(inner); + let codec = tonic_prost::ProstCodec::default(); + let mut grpc = tonic::server::Grpc::new(codec) + .apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ) + .apply_max_message_size_config( + max_decoding_message_size, + max_encoding_message_size, + ); + let res = grpc.unary(method, req).await; + Ok(res) + }; + Box::pin(fut) + } + _ => { + Box::pin(async move { + let mut response = http::Response::new( + tonic::body::Body::default(), + ); + let headers = response.headers_mut(); + headers + .insert( + tonic::Status::GRPC_STATUS, + (tonic::Code::Unimplemented as i32).into(), + ); + headers + .insert( + http::header::CONTENT_TYPE, + tonic::metadata::GRPC_CONTENT_TYPE, + ); + Ok(response) + }) + } + } + } + } + impl Clone for TraceReaderServer { + fn clone(&self) -> Self { + let inner = self.inner.clone(); + Self { + inner, + accept_compression_encodings: self.accept_compression_encodings, + send_compression_encodings: self.send_compression_encodings, + max_decoding_message_size: self.max_decoding_message_size, + max_encoding_message_size: self.max_encoding_message_size, + } + } + } + /// Generated gRPC service name + pub const SERVICE_NAME: &str = "jaeger.storage.v2.TraceReader"; + impl tonic::server::NamedService for TraceReaderServer { + const NAME: &'static str = SERVICE_NAME; + } +} diff --git a/quickwit/quickwit-proto/src/codegen/jaeger/opentelemetry.proto.common.v1.rs b/quickwit/quickwit-proto/src/codegen/jaeger/opentelemetry.proto.common.v1.rs new file mode 100644 index 00000000000..e0c4b431d7b --- /dev/null +++ b/quickwit/quickwit-proto/src/codegen/jaeger/opentelemetry.proto.common.v1.rs @@ -0,0 +1,78 @@ +// This file is @generated by prost-build. +/// AnyValue is used to represent any type of attribute value. AnyValue may contain a +/// primitive value such as a string or integer or it may contain an arbitrary nested +/// object containing arrays, key-value lists and primitives. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct AnyValue { + /// The value is one of the listed fields. It is valid for all values to be unspecified + /// in which case this AnyValue is considered to be "empty". + #[prost(oneof = "any_value::Value", tags = "1, 2, 3, 4, 5, 6, 7")] + pub value: ::core::option::Option, +} +/// Nested message and enum types in `AnyValue`. +pub mod any_value { + /// The value is one of the listed fields. It is valid for all values to be unspecified + /// in which case this AnyValue is considered to be "empty". + #[derive(Clone, PartialEq, ::prost::Oneof)] + pub enum Value { + #[prost(string, tag = "1")] + StringValue(::prost::alloc::string::String), + #[prost(bool, tag = "2")] + BoolValue(bool), + #[prost(int64, tag = "3")] + IntValue(i64), + #[prost(double, tag = "4")] + DoubleValue(f64), + #[prost(message, tag = "5")] + ArrayValue(super::ArrayValue), + #[prost(message, tag = "6")] + KvlistValue(super::KeyValueList), + #[prost(bytes, tag = "7")] + BytesValue(::prost::alloc::vec::Vec), + } +} +/// ArrayValue is a list of AnyValue messages. We need ArrayValue as a message +/// since oneof in AnyValue does not allow repeated fields. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ArrayValue { + /// Array of values. The array may be empty (contain 0 elements). + #[prost(message, repeated, tag = "1")] + pub values: ::prost::alloc::vec::Vec, +} +/// KeyValueList is a list of KeyValue messages. We need KeyValueList as a message +/// since `oneof` in AnyValue does not allow repeated fields. Everywhere else where we need +/// a list of KeyValue messages (e.g. in Span) we use `repeated KeyValue` directly to +/// avoid unnecessary extra wrapping (which slows down the protocol). The 2 approaches +/// are semantically equivalent. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct KeyValueList { + /// A collection of key/value pairs of key-value pairs. The list may be empty (may + /// contain 0 elements). + /// The keys MUST be unique (it is not allowed to have more than one + /// value with the same key). + #[prost(message, repeated, tag = "1")] + pub values: ::prost::alloc::vec::Vec, +} +/// KeyValue is a key-value pair that is used to store Span attributes, Link +/// attributes, etc. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct KeyValue { + #[prost(string, tag = "1")] + pub key: ::prost::alloc::string::String, + #[prost(message, optional, tag = "2")] + pub value: ::core::option::Option, +} +/// InstrumentationScope is a message representing the instrumentation scope information +/// such as the fully qualified name and version. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct InstrumentationScope { + /// An empty instrumentation scope name means the name is unknown. + #[prost(string, tag = "1")] + pub name: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub version: ::prost::alloc::string::String, + #[prost(message, repeated, tag = "3")] + pub attributes: ::prost::alloc::vec::Vec, + #[prost(uint32, tag = "4")] + pub dropped_attributes_count: u32, +} diff --git a/quickwit/quickwit-proto/src/codegen/jaeger/opentelemetry.proto.resource.v1.rs b/quickwit/quickwit-proto/src/codegen/jaeger/opentelemetry.proto.resource.v1.rs new file mode 100644 index 00000000000..0daa6740b11 --- /dev/null +++ b/quickwit/quickwit-proto/src/codegen/jaeger/opentelemetry.proto.resource.v1.rs @@ -0,0 +1,14 @@ +// This file is @generated by prost-build. +/// Resource information. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Resource { + /// Set of attributes that describe the resource. + /// Attribute keys MUST be unique (it is not allowed to have more than one + /// attribute with the same key). + #[prost(message, repeated, tag = "1")] + pub attributes: ::prost::alloc::vec::Vec, + /// dropped_attributes_count is the number of dropped attributes. If the value is 0, then + /// no attributes were dropped. + #[prost(uint32, tag = "2")] + pub dropped_attributes_count: u32, +} diff --git a/quickwit/quickwit-proto/src/codegen/jaeger/opentelemetry.proto.trace.v1.rs b/quickwit/quickwit-proto/src/codegen/jaeger/opentelemetry.proto.trace.v1.rs new file mode 100644 index 00000000000..6736d97c7e2 --- /dev/null +++ b/quickwit/quickwit-proto/src/codegen/jaeger/opentelemetry.proto.trace.v1.rs @@ -0,0 +1,335 @@ +// This file is @generated by prost-build. +/// TracesData represents the traces data that can be stored in a persistent storage, +/// OR can be embedded by other protocols that transfer OTLP traces data but do +/// not implement the OTLP protocol. +/// +/// The main difference between this message and collector protocol is that +/// in this message there will not be any "control" or "metadata" specific to +/// OTLP protocol. +/// +/// When new fields are added into this message, the OTLP request MUST be updated +/// as well. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TracesData { + /// An array of ResourceSpans. + /// For data coming from a single resource this array will typically contain + /// one element. Intermediary nodes that receive data from multiple origins + /// typically batch the data before forwarding further and in that case this + /// array will contain multiple elements. + #[prost(message, repeated, tag = "1")] + pub resource_spans: ::prost::alloc::vec::Vec, +} +/// A collection of ScopeSpans from a Resource. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ResourceSpans { + /// The resource for the spans in this message. + /// If this field is not set then no resource info is known. + #[prost(message, optional, tag = "1")] + pub resource: ::core::option::Option, + /// A list of ScopeSpans that originate from a resource. + #[prost(message, repeated, tag = "2")] + pub scope_spans: ::prost::alloc::vec::Vec, + /// This schema_url applies to the data in the "resource" field. It does not apply + /// to the data in the "scope_spans" field which have their own schema_url field. + #[prost(string, tag = "3")] + pub schema_url: ::prost::alloc::string::String, +} +/// A collection of Spans produced by an InstrumentationScope. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ScopeSpans { + /// The instrumentation scope information for the spans in this message. + /// Semantically when InstrumentationScope isn't set, it is equivalent with + /// an empty instrumentation scope name (unknown). + #[prost(message, optional, tag = "1")] + pub scope: ::core::option::Option, + /// A list of Spans that originate from an instrumentation scope. + #[prost(message, repeated, tag = "2")] + pub spans: ::prost::alloc::vec::Vec, + /// This schema_url applies to all spans and span events in the "spans" field. + #[prost(string, tag = "3")] + pub schema_url: ::prost::alloc::string::String, +} +/// A Span represents a single operation performed by a single component of the system. +/// +/// The next available field id is 17. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Span { + /// A unique identifier for a trace. All spans from the same trace share + /// the same `trace_id`. The ID is a 16-byte array. An ID with all zeroes + /// is considered invalid. + /// + /// This field is semantically required. Receiver should generate new + /// random trace_id if empty or invalid trace_id was received. + /// + /// This field is required. + #[prost(bytes = "vec", tag = "1")] + pub trace_id: ::prost::alloc::vec::Vec, + /// A unique identifier for a span within a trace, assigned when the span + /// is created. The ID is an 8-byte array. An ID with all zeroes is considered + /// invalid. + /// + /// This field is semantically required. Receiver should generate new + /// random span_id if empty or invalid span_id was received. + /// + /// This field is required. + #[prost(bytes = "vec", tag = "2")] + pub span_id: ::prost::alloc::vec::Vec, + /// trace_state conveys information about request position in multiple distributed tracing graphs. + /// It is a trace_state in w3c-trace-context format: + /// See also for more details about this field. + #[prost(string, tag = "3")] + pub trace_state: ::prost::alloc::string::String, + /// The `span_id` of this span's parent span. If this is a root span, then this + /// field must be empty. The ID is an 8-byte array. + #[prost(bytes = "vec", tag = "4")] + pub parent_span_id: ::prost::alloc::vec::Vec, + /// A description of the span's operation. + /// + /// For example, the name can be a qualified method name or a file name + /// and a line number where the operation is called. A best practice is to use + /// the same display name at the same call point in an application. + /// This makes it easier to correlate spans in different traces. + /// + /// This field is semantically required to be set to non-empty string. + /// Empty value is equivalent to an unknown span name. + /// + /// This field is required. + #[prost(string, tag = "5")] + pub name: ::prost::alloc::string::String, + /// Distinguishes between spans generated in a particular context. For example, + /// two spans with the same name may be distinguished using `CLIENT` (caller) + /// and `SERVER` (callee) to identify queueing latency associated with the span. + #[prost(enumeration = "span::SpanKind", tag = "6")] + pub kind: i32, + /// start_time_unix_nano is the start time of the span. On the client side, this is the time + /// kept by the local machine where the span execution starts. On the server side, this + /// is the time when the server's application handler starts running. + /// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. + /// + /// This field is semantically required and it is expected that end_time >= start_time. + #[prost(fixed64, tag = "7")] + pub start_time_unix_nano: u64, + /// end_time_unix_nano is the end time of the span. On the client side, this is the time + /// kept by the local machine where the span execution ends. On the server side, this + /// is the time when the server application handler stops running. + /// Value is UNIX Epoch time in nanoseconds since 00:00:00 UTC on 1 January 1970. + /// + /// This field is semantically required and it is expected that end_time >= start_time. + #[prost(fixed64, tag = "8")] + pub end_time_unix_nano: u64, + /// attributes is a collection of key/value pairs. Note, global attributes + /// like server name can be set using the resource API. Examples of attributes: + /// + /// "/http/user_agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36" + /// "/http/server_latency": 300 + /// "abc.com/myattribute": true + /// "abc.com/score": 10.239 + /// + /// The OpenTelemetry API specification further restricts the allowed value types: + /// + /// Attribute keys MUST be unique (it is not allowed to have more than one + /// attribute with the same key). + #[prost(message, repeated, tag = "9")] + pub attributes: ::prost::alloc::vec::Vec, + /// dropped_attributes_count is the number of attributes that were discarded. Attributes + /// can be discarded because their keys are too long or because there are too many + /// attributes. If this value is 0, then no attributes were dropped. + #[prost(uint32, tag = "10")] + pub dropped_attributes_count: u32, + /// events is a collection of Event items. + #[prost(message, repeated, tag = "11")] + pub events: ::prost::alloc::vec::Vec, + /// dropped_events_count is the number of dropped events. If the value is 0, then no + /// events were dropped. + #[prost(uint32, tag = "12")] + pub dropped_events_count: u32, + /// links is a collection of Links, which are references from this span to a span + /// in the same or different trace. + #[prost(message, repeated, tag = "13")] + pub links: ::prost::alloc::vec::Vec, + /// dropped_links_count is the number of dropped links after the maximum size was + /// enforced. If this value is 0, then no links were dropped. + #[prost(uint32, tag = "14")] + pub dropped_links_count: u32, + /// An optional final status for this span. Semantically when Status isn't set, it means + /// span's status code is unset, i.e. assume STATUS_CODE_UNSET (code = 0). + #[prost(message, optional, tag = "15")] + pub status: ::core::option::Option, +} +/// Nested message and enum types in `Span`. +pub mod span { + /// Event is a time-stamped annotation of the span, consisting of user-supplied + /// text description and key-value pairs. + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Event { + /// time_unix_nano is the time the event occurred. + #[prost(fixed64, tag = "1")] + pub time_unix_nano: u64, + /// name of the event. + /// This field is semantically required to be set to non-empty string. + #[prost(string, tag = "2")] + pub name: ::prost::alloc::string::String, + /// attributes is a collection of attribute key/value pairs on the event. + /// Attribute keys MUST be unique (it is not allowed to have more than one + /// attribute with the same key). + #[prost(message, repeated, tag = "3")] + pub attributes: ::prost::alloc::vec::Vec< + super::super::super::common::v1::KeyValue, + >, + /// dropped_attributes_count is the number of dropped attributes. If the value is 0, + /// then no attributes were dropped. + #[prost(uint32, tag = "4")] + pub dropped_attributes_count: u32, + } + /// A pointer from the current span to another span in the same trace or in a + /// different trace. For example, this can be used in batching operations, + /// where a single batch handler processes multiple requests from different + /// traces or when the handler receives a request from a different project. + #[derive(Clone, PartialEq, ::prost::Message)] + pub struct Link { + /// A unique identifier of a trace that this linked span is part of. The ID is a + /// 16-byte array. + #[prost(bytes = "vec", tag = "1")] + pub trace_id: ::prost::alloc::vec::Vec, + /// A unique identifier for the linked span. The ID is an 8-byte array. + #[prost(bytes = "vec", tag = "2")] + pub span_id: ::prost::alloc::vec::Vec, + /// The trace_state associated with the link. + #[prost(string, tag = "3")] + pub trace_state: ::prost::alloc::string::String, + /// attributes is a collection of attribute key/value pairs on the link. + /// Attribute keys MUST be unique (it is not allowed to have more than one + /// attribute with the same key). + #[prost(message, repeated, tag = "4")] + pub attributes: ::prost::alloc::vec::Vec< + super::super::super::common::v1::KeyValue, + >, + /// dropped_attributes_count is the number of dropped attributes. If the value is 0, + /// then no attributes were dropped. + #[prost(uint32, tag = "5")] + pub dropped_attributes_count: u32, + } + /// SpanKind is the type of span. Can be used to specify additional relationships between spans + /// in addition to a parent/child relationship. + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum SpanKind { + /// Unspecified. Do NOT use as default. + /// Implementations MAY assume SpanKind to be INTERNAL when receiving UNSPECIFIED. + Unspecified = 0, + /// Indicates that the span represents an internal operation within an application, + /// as opposed to an operation happening at the boundaries. Default value. + Internal = 1, + /// Indicates that the span covers server-side handling of an RPC or other + /// remote network request. + Server = 2, + /// Indicates that the span describes a request to some remote service. + Client = 3, + /// Indicates that the span describes a producer sending a message to a broker. + /// Unlike CLIENT and SERVER, there is often no direct critical path latency relationship + /// between producer and consumer spans. A PRODUCER span ends when the message was accepted + /// by the broker while the logical processing of the message might span a much longer time. + Producer = 4, + /// Indicates that the span describes consumer receiving a message from a broker. + /// Like the PRODUCER kind, there is often no direct critical path latency relationship + /// between producer and consumer spans. + Consumer = 5, + } + impl SpanKind { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Unspecified => "SPAN_KIND_UNSPECIFIED", + Self::Internal => "SPAN_KIND_INTERNAL", + Self::Server => "SPAN_KIND_SERVER", + Self::Client => "SPAN_KIND_CLIENT", + Self::Producer => "SPAN_KIND_PRODUCER", + Self::Consumer => "SPAN_KIND_CONSUMER", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "SPAN_KIND_UNSPECIFIED" => Some(Self::Unspecified), + "SPAN_KIND_INTERNAL" => Some(Self::Internal), + "SPAN_KIND_SERVER" => Some(Self::Server), + "SPAN_KIND_CLIENT" => Some(Self::Client), + "SPAN_KIND_PRODUCER" => Some(Self::Producer), + "SPAN_KIND_CONSUMER" => Some(Self::Consumer), + _ => None, + } + } + } +} +/// The Status type defines a logical error model that is suitable for different +/// programming environments, including REST APIs and RPC APIs. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct Status { + /// A developer-facing human readable error message. + #[prost(string, tag = "2")] + pub message: ::prost::alloc::string::String, + /// The status code. + #[prost(enumeration = "status::StatusCode", tag = "3")] + pub code: i32, +} +/// Nested message and enum types in `Status`. +pub mod status { + /// For the semantics of status codes see + /// + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum StatusCode { + /// The default status. + Unset = 0, + /// The Span has been validated by an Application developer or Operator to + /// have completed successfully. + Ok = 1, + /// The Span contains an error. + Error = 2, + } + impl StatusCode { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Unset => "STATUS_CODE_UNSET", + Self::Ok => "STATUS_CODE_OK", + Self::Error => "STATUS_CODE_ERROR", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "STATUS_CODE_UNSET" => Some(Self::Unset), + "STATUS_CODE_OK" => Some(Self::Ok), + "STATUS_CODE_ERROR" => Some(Self::Error), + _ => None, + } + } + } +} diff --git a/quickwit/quickwit-proto/src/lib.rs b/quickwit/quickwit-proto/src/lib.rs index 30d65985346..f4ddb734d2a 100644 --- a/quickwit/quickwit-proto/src/lib.rs +++ b/quickwit/quickwit-proto/src/lib.rs @@ -49,6 +49,9 @@ pub mod jaeger { pub mod v1 { include!("codegen/jaeger/jaeger.storage.v1.rs"); } + pub mod v2 { + include!("codegen/jaeger/jaeger.storage.v2.rs"); + } } } diff --git a/quickwit/quickwit-serve/src/grpc.rs b/quickwit/quickwit-serve/src/grpc.rs index c136d8436a8..27d370c38aa 100644 --- a/quickwit/quickwit-serve/src/grpc.rs +++ b/quickwit/quickwit-serve/src/grpc.rs @@ -23,6 +23,7 @@ use quickwit_config::service::QuickwitService; use quickwit_proto::developer::DeveloperServiceClient; use quickwit_proto::indexing::IndexingServiceClient; use quickwit_proto::jaeger::storage::v1::span_reader_plugin_server::SpanReaderPluginServer; +use quickwit_proto::jaeger::storage::v2::trace_reader_server::TraceReaderServer; use quickwit_proto::opentelemetry::proto::collector::logs::v1::logs_service_server::LogsServiceServer; use quickwit_proto::opentelemetry::proto::collector::trace::v1::trace_service_server::TraceServiceServer; use quickwit_proto::search::search_service_server::SearchServiceServer; @@ -203,6 +204,15 @@ pub(crate) async fn start_grpc_server( } else { None }; + + // Mount gRPC jaeger v2 service (TraceReader) if present. + let jaeger_v2_grpc_service = if let Some(jaeger_service) = services.jaeger_service_opt.clone() { + enabled_grpc_services.insert("jaeger-v2"); + Some(TraceReaderServer::new(jaeger_service)) + } else { + None + }; + let developer_grpc_service = { enabled_grpc_services.insert("developer"); file_descriptor_sets.push(quickwit_proto::developer::DEVELOPER_FILE_DESCRIPTOR_SET); @@ -230,6 +240,7 @@ pub(crate) async fn start_grpc_server( .add_optional_service(ingest_router_grpc_service) .add_optional_service(ingester_grpc_service) .add_optional_service(jaeger_grpc_service) + .add_optional_service(jaeger_v2_grpc_service) .add_optional_service(metastore_grpc_service) .add_optional_service(otlp_log_grpc_service) .add_optional_service(otlp_trace_grpc_service)