Skip to content

Commit fd0e0c0

Browse files
authored
chore: decouple hyper from trace_processor (#390)
* decouple `hyper` from `trace_processor` * add `handle_traces` * fix tests * removed unused import
1 parent 34753b1 commit fd0e0c0

File tree

2 files changed

+143
-175
lines changed

2 files changed

+143
-175
lines changed

bottlecap/src/traces/trace_agent.rs

Lines changed: 97 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@ use tracing::{debug, error};
1414
use crate::config;
1515
use crate::tags::provider;
1616
use crate::traces::{stats_flusher, stats_processor, trace_flusher, trace_processor};
17-
use datadog_trace_mini_agent::http_utils::log_and_create_http_response;
17+
use datadog_trace_mini_agent::http_utils::{self, log_and_create_http_response};
1818
use datadog_trace_protobuf::pb;
19-
use datadog_trace_utils::trace_utils::SendData;
19+
use datadog_trace_utils::trace_utils::{self, SendData};
2020

2121
const TRACE_AGENT_PORT: usize = 8126;
2222
const V4_TRACE_ENDPOINT_PATH: &str = "/v0.4/traces";
@@ -133,30 +133,38 @@ impl TraceAgent {
133133
tags_provider: Arc<provider::Provider>,
134134
) -> http::Result<Response<Body>> {
135135
match (req.method(), req.uri().path()) {
136-
(&Method::PUT | &Method::POST, V4_TRACE_ENDPOINT_PATH) => {
137-
match trace_processor
138-
.process_traces(config, req, trace_tx, tags_provider, ApiVersion::V04)
139-
.await
140-
{
141-
Ok(result) => Ok(result),
142-
Err(err) => log_and_create_http_response(
143-
&format!("Error processing traces: {err}"),
144-
StatusCode::INTERNAL_SERVER_ERROR,
145-
),
146-
}
147-
}
148-
(&Method::PUT | &Method::POST, V5_TRACE_ENDPOINT_PATH) => {
149-
match trace_processor
150-
.process_traces(config, req, trace_tx, tags_provider, ApiVersion::V05)
151-
.await
152-
{
153-
Ok(result) => Ok(result),
154-
Err(err) => log_and_create_http_response(
155-
&format!("Error processing traces: {err}"),
156-
StatusCode::INTERNAL_SERVER_ERROR,
157-
),
158-
}
159-
}
136+
(&Method::PUT | &Method::POST, V4_TRACE_ENDPOINT_PATH) => match Self::handle_traces(
137+
config,
138+
req,
139+
trace_processor.clone(),
140+
trace_tx,
141+
tags_provider,
142+
ApiVersion::V04,
143+
)
144+
.await
145+
{
146+
Ok(result) => Ok(result),
147+
Err(err) => log_and_create_http_response(
148+
&format!("Error processing traces: {err}"),
149+
StatusCode::INTERNAL_SERVER_ERROR,
150+
),
151+
},
152+
(&Method::PUT | &Method::POST, V5_TRACE_ENDPOINT_PATH) => match Self::handle_traces(
153+
config,
154+
req,
155+
trace_processor.clone(),
156+
trace_tx,
157+
tags_provider,
158+
ApiVersion::V05,
159+
)
160+
.await
161+
{
162+
Ok(result) => Ok(result),
163+
Err(err) => log_and_create_http_response(
164+
&format!("Error processing traces: {err}"),
165+
StatusCode::INTERNAL_SERVER_ERROR,
166+
),
167+
},
160168
(&Method::PUT | &Method::POST, STATS_ENDPOINT_PATH) => {
161169
match stats_processor.process_stats(req, stats_tx).await {
162170
Ok(result) => Ok(result),
@@ -181,6 +189,69 @@ impl TraceAgent {
181189
}
182190
}
183191

192+
async fn handle_traces(
193+
config: Arc<config::Config>,
194+
req: Request<Body>,
195+
trace_processor: Arc<dyn trace_processor::TraceProcessor + Send + Sync>,
196+
trace_tx: Sender<SendData>,
197+
tags_provider: Arc<provider::Provider>,
198+
version: ApiVersion,
199+
) -> http::Result<Response<Body>> {
200+
debug!("Received traces to process");
201+
let (parts, body) = req.into_parts();
202+
203+
if let Some(response) = http_utils::verify_request_content_length(
204+
&parts.headers,
205+
MAX_CONTENT_LENGTH,
206+
"Error processing traces",
207+
) {
208+
return response;
209+
}
210+
211+
let tracer_header_tags = (&parts.headers).into();
212+
213+
let (body_size, traces) = match version {
214+
ApiVersion::V04 => match trace_utils::get_traces_from_request_body(body).await {
215+
Ok(result) => result,
216+
Err(err) => {
217+
return log_and_create_http_response(
218+
&format!("Error deserializing trace from request body: {err}"),
219+
StatusCode::INTERNAL_SERVER_ERROR,
220+
);
221+
}
222+
},
223+
ApiVersion::V05 => match trace_utils::get_v05_traces_from_request_body(body).await {
224+
Ok(result) => result,
225+
Err(err) => {
226+
return log_and_create_http_response(
227+
&format!("Error deserializing trace from request body: {err}"),
228+
StatusCode::INTERNAL_SERVER_ERROR,
229+
);
230+
}
231+
},
232+
};
233+
234+
let send_data = trace_processor.process_traces(
235+
config,
236+
tags_provider,
237+
tracer_header_tags,
238+
traces,
239+
body_size,
240+
);
241+
242+
// send trace payload to our trace flusher
243+
match trace_tx.send(send_data).await {
244+
Ok(()) => log_and_create_http_response(
245+
"Successfully buffered traces to be flushed.",
246+
StatusCode::ACCEPTED,
247+
),
248+
Err(err) => log_and_create_http_response(
249+
&format!("Error sending traces to the trace flusher: {err}"),
250+
StatusCode::INTERNAL_SERVER_ERROR,
251+
),
252+
}
253+
}
254+
184255
fn info_handler() -> http::Result<Response<Body>> {
185256
let response_json = json!(
186257
{

0 commit comments

Comments
 (0)