Skip to content

Commit e618fdf

Browse files
authored
chore(datadog_agent source): reuse code from util/http/encoding.rs (#24071)
* chore(datadog_agent source): reuse code from src/sources/util/http/encoding.rs * update comment
1 parent a1ca14f commit e618fdf

File tree

8 files changed

+25
-36
lines changed

8 files changed

+25
-36
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -643,7 +643,7 @@ sources-aws_ecs_metrics = ["sources-utils-http-client"]
643643
sources-aws_kinesis_firehose = ["dep:base64"]
644644
sources-aws_s3 = ["aws-core", "dep:aws-sdk-sqs", "dep:aws-sdk-s3", "dep:async-compression", "sources-aws_sqs", "tokio-util/io"]
645645
sources-aws_sqs = ["aws-core", "dep:aws-sdk-sqs"]
646-
sources-datadog_agent = ["sources-utils-http-error", "protobuf-build", "dep:prost"]
646+
sources-datadog_agent = ["sources-utils-http-encoding", "protobuf-build", "dep:prost"]
647647
sources-demo_logs = ["dep:fakedata"]
648648
sources-dnstap = ["sources-utils-net-tcp", "dep:base64", "dep:hickory-proto", "dep:dnsmsg-parser", "dep:dnstap-parser", "protobuf-build", "dep:prost"]
649649
sources-docker_logs = ["docker"]

src/sources/datadog_agent/mod.rs

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,10 @@ use crate::{
5757
},
5858
event::Event,
5959
http::{KeepaliveConfig, MaxConnectionAgeLayer, build_http_trace_layer},
60-
internal_events::{HttpBytesReceived, HttpDecompressError, StreamClosedError},
60+
internal_events::{HttpBytesReceived, StreamClosedError},
6161
schema,
6262
serde::{bool_or_struct, default_decoding, default_framing_message_based},
63-
sources::{self},
63+
sources::{self, util::http::emit_decompress_error},
6464
tls::{MaybeTlsSettings, TlsEnableableConfig},
6565
};
6666

@@ -475,20 +475,20 @@ impl DatadogAgentSource {
475475
let mut decoded = Vec::new();
476476
MultiGzDecoder::new(body.reader())
477477
.read_to_end(&mut decoded)
478-
.map_err(|error| handle_decode_error(encoding, error))?;
478+
.map_err(|error| emit_decompress_error(encoding, error))?;
479479
decoded.into()
480480
}
481481
"zstd" => {
482482
let mut decoded = Vec::new();
483483
zstd::stream::copy_decode(body.reader(), &mut decoded)
484-
.map_err(|error| handle_decode_error(encoding, error))?;
484+
.map_err(|error| emit_decompress_error(encoding, error))?;
485485
decoded.into()
486486
}
487487
"deflate" | "x-deflate" => {
488488
let mut decoded = Vec::new();
489489
ZlibDecoder::new(body.reader())
490490
.read_to_end(&mut decoded)
491-
.map_err(|error| handle_decode_error(encoding, error))?;
491+
.map_err(|error| emit_decompress_error(encoding, error))?;
492492
decoded.into()
493493
}
494494
encoding => {
@@ -548,17 +548,6 @@ pub(crate) async fn handle_request(
548548
}
549549
}
550550

551-
fn handle_decode_error(encoding: &str, error: impl std::error::Error) -> ErrorMessage {
552-
emit!(HttpDecompressError {
553-
encoding,
554-
error: &error
555-
});
556-
ErrorMessage::new(
557-
StatusCode::UNPROCESSABLE_ENTITY,
558-
format!("Failed decompressing payload with {encoding} decoder."),
559-
)
560-
}
561-
562551
// https://github.com/DataDog/datadog-agent/blob/a33248c2bc125920a9577af1e16f12298875a4ad/pkg/logs/processor/json.go#L23-L49
563552
#[derive(Clone, Debug, Deserialize, Serialize)]
564553
#[serde(deny_unknown_fields)]

src/sources/opentelemetry/http.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use crate::{
3939
sources::{
4040
http_server::HttpConfigParamKind,
4141
opentelemetry::config::{LOGS, METRICS, OpentelemetryConfig, TRACES},
42-
util::{add_headers, decode},
42+
util::{add_headers, decompress_body},
4343
},
4444
tls::MaybeTlsSettings,
4545
};
@@ -214,9 +214,9 @@ fn build_warp_log_filter(
214214
deserializer: Option<OtlpDeserializer>,
215215
) -> BoxedFilter<(Response,)> {
216216
let make_events = move |encoding_header: Option<String>, headers: HeaderMap, body: Bytes| {
217-
decode(encoding_header.as_deref(), body)
217+
decompress_body(encoding_header.as_deref(), body)
218218
.inspect_err(|err| {
219-
// Other status codes are already handled by `sources::util::decode` (tech debt).
219+
// Other status codes are already handled by `sources::util::decompress_body` (tech debt).
220220
if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
221221
emit!(HttpBadRequest::new(
222222
err.status_code().as_u16(),
@@ -254,9 +254,9 @@ fn build_warp_metrics_filter(
254254
deserializer: Option<OtlpDeserializer>,
255255
) -> BoxedFilter<(Response,)> {
256256
let make_events = move |encoding_header: Option<String>, _headers: HeaderMap, body: Bytes| {
257-
decode(encoding_header.as_deref(), body)
257+
decompress_body(encoding_header.as_deref(), body)
258258
.inspect_err(|err| {
259-
// Other status codes are already handled by `sources::util::decode` (tech debt).
259+
// Other status codes are already handled by `sources::util::decompress_body` (tech debt).
260260
if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
261261
emit!(HttpBadRequest::new(
262262
err.status_code().as_u16(),
@@ -290,9 +290,9 @@ fn build_warp_trace_filter(
290290
deserializer: Option<OtlpDeserializer>,
291291
) -> BoxedFilter<(Response,)> {
292292
let make_events = move |encoding_header: Option<String>, _headers: HeaderMap, body: Bytes| {
293-
decode(encoding_header.as_deref(), body)
293+
decompress_body(encoding_header.as_deref(), body)
294294
.inspect_err(|err| {
295-
// Other status codes are already handled by `sources::util::decode` (tech debt).
295+
// Other status codes are already handled by `sources::util::decompress_body` (tech debt).
296296
if err.status_code() == StatusCode::UNSUPPORTED_MEDIA_TYPE {
297297
emit!(HttpBadRequest::new(
298298
err.status_code().as_u16(),

src/sources/prometheus/remote_write.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::{
2020
serde::bool_or_struct,
2121
sources::{
2222
self,
23-
util::{HttpSource, decode, http::HttpMethod},
23+
util::{HttpSource, decompress_body, http::HttpMethod},
2424
},
2525
tls::TlsEnableableConfig,
2626
};
@@ -186,7 +186,7 @@ impl RemoteWriteSource {
186186
impl HttpSource for RemoteWriteSource {
187187
fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
188188
// Default to snappy decoding the request body.
189-
decode(encoding_header.or(Some("snappy")), body)
189+
decompress_body(encoding_header.or(Some("snappy")), body)
190190
}
191191

192192
fn build_events(

src/sources/util/http/encoding.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::{common::http::ErrorMessage, internal_events::HttpDecompressError};
1010
/// Decompresses the body based on the Content-Encoding header.
1111
///
1212
/// Supports gzip, deflate, snappy, zstd, and identity (no compression).
13-
pub fn decode(header: Option<&str>, mut body: Bytes) -> Result<Bytes, ErrorMessage> {
13+
pub fn decompress_body(header: Option<&str>, mut body: Bytes) -> Result<Bytes, ErrorMessage> {
1414
if let Some(encodings) = header {
1515
for encoding in encodings.rsplit(',').map(str::trim) {
1616
body = match encoding {
@@ -19,22 +19,22 @@ pub fn decode(header: Option<&str>, mut body: Bytes) -> Result<Bytes, ErrorMessa
1919
let mut decoded = Vec::new();
2020
MultiGzDecoder::new(body.reader())
2121
.read_to_end(&mut decoded)
22-
.map_err(|error| handle_decode_error(encoding, error))?;
22+
.map_err(|error| emit_decompress_error(encoding, error))?;
2323
decoded.into()
2424
}
2525
"deflate" => {
2626
let mut decoded = Vec::new();
2727
ZlibDecoder::new(body.reader())
2828
.read_to_end(&mut decoded)
29-
.map_err(|error| handle_decode_error(encoding, error))?;
29+
.map_err(|error| emit_decompress_error(encoding, error))?;
3030
decoded.into()
3131
}
3232
"snappy" => SnappyDecoder::new()
3333
.decompress_vec(&body)
34-
.map_err(|error| handle_decode_error(encoding, error))?
34+
.map_err(|error| emit_decompress_error(encoding, error))?
3535
.into(),
3636
"zstd" => zstd::decode_all(body.reader())
37-
.map_err(|error| handle_decode_error(encoding, error))?
37+
.map_err(|error| emit_decompress_error(encoding, error))?
3838
.into(),
3939
encoding => {
4040
return Err(ErrorMessage::new(
@@ -49,7 +49,7 @@ pub fn decode(header: Option<&str>, mut body: Bytes) -> Result<Bytes, ErrorMessa
4949
Ok(body)
5050
}
5151

52-
fn handle_decode_error(encoding: &str, error: impl std::error::Error) -> ErrorMessage {
52+
pub fn emit_decompress_error(encoding: &str, error: impl std::error::Error) -> ErrorMessage {
5353
emit!(HttpDecompressError {
5454
encoding,
5555
error: &error

src/sources/util/http/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ mod prelude;
1717
mod query;
1818

1919
#[cfg(feature = "sources-utils-http-encoding")]
20-
pub use encoding::decode;
20+
pub use encoding::{decompress_body, emit_decompress_error};
2121
#[cfg(feature = "sources-utils-http-headers")]
2222
pub use headers::add_headers;
2323
pub use method::HttpMethod;

src/sources/util/http/prelude.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ use warp::{
2121
reject::Rejection,
2222
};
2323

24-
use super::encoding::decode;
24+
use super::encoding::decompress_body;
2525
use crate::{
2626
SourceSender,
2727
common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
@@ -57,7 +57,7 @@ pub trait HttpSource: Clone + Send + Sync + 'static {
5757
) -> Result<Vec<Event>, ErrorMessage>;
5858

5959
fn decode(&self, encoding_header: Option<&str>, body: Bytes) -> Result<Bytes, ErrorMessage> {
60-
decode(encoding_header, body)
60+
decompress_body(encoding_header, body)
6161
}
6262

6363
#[allow(clippy::too_many_arguments)]

src/sources/util/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ pub use self::http::add_query_parameters;
6868
feature = "sources-prometheus-remote-write",
6969
feature = "sources-utils-http-encoding"
7070
))]
71-
pub use self::http::decode;
71+
pub use self::http::decompress_body;
7272
#[cfg(any(
7373
feature = "sources-aws_sqs",
7474
feature = "sources-gcp_pubsub",

0 commit comments

Comments
 (0)