Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions bottlecap/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions bottlecap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ sha2 = { version = "0.10", default-features = false }
hex = { version = "0.4", default-features = false, features = ["std"] }
base64 = { version = "0.22", default-features = false }
rustls = { version = "0.23.18", default-features = false, features = ["aws-lc-rs"] }
rustls-pemfile = { version = "2.0", default-features = false, features = ["std"] }
rustls-pki-types = { version = "1.0", default-features = false }
hyper-rustls = { version = "0.27.7", default-features = false }
rand = { version = "0.8", default-features = false }
prost = { version = "0.13", default-features = false }
zstd = { version = "0.13.3", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions bottlecap/LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ rustc-hash,https://github.com/rust-lang/rustc-hash,Apache-2.0 OR MIT,The Rust Pr
rustix,https://github.com/bytecodealliance/rustix,Apache-2.0 WITH LLVM-exception OR Apache-2.0 OR MIT,"Dan Gohman <[email protected]>, Jakub Konka <[email protected]>"
rustls,https://github.com/rustls/rustls,Apache-2.0 OR ISC OR MIT,The rustls Authors
rustls-native-certs,https://github.com/rustls/rustls-native-certs,Apache-2.0 OR ISC OR MIT,The rustls-native-certs Authors
rustls-pemfile,https://github.com/rustls/pemfile,Apache-2.0 OR ISC OR MIT,The rustls-pemfile Authors
rustls-pki-types,https://github.com/rustls/pki-types,MIT OR Apache-2.0,The rustls-pki-types Authors
rustls-webpki,https://github.com/rustls/webpki,ISC,The rustls-webpki Authors
ryu,https://github.com/dtolnay/ryu,Apache-2.0 OR BSL-1.0,David Tolnay <[email protected]>
Expand Down
8 changes: 8 additions & 0 deletions bottlecap/src/config/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ pub struct EnvConfig {
/// The transport type to use for sending logs. Possible values are "auto" or "http1".
#[serde(deserialize_with = "deserialize_optional_string")]
pub http_protocol: Option<String>,
/// @env `DD_TLS_CERT_FILE`
/// The path to a file of concatenated CA certificates in PEM format.
/// Example: `/opt/ca-cert.pem`
#[serde(deserialize_with = "deserialize_optional_string")]
pub tls_cert_file: Option<String>,

// Metrics
/// @env `DD_DD_URL`
Expand Down Expand Up @@ -466,6 +471,7 @@ fn merge_config(config: &mut Config, env_config: &EnvConfig) {
merge_option!(config, env_config, proxy_https);
merge_vec!(config, env_config, proxy_no_proxy);
merge_option!(config, env_config, http_protocol);
merge_option!(config, env_config, tls_cert_file);

// Endpoints
merge_string!(config, env_config, dd_url);
Expand Down Expand Up @@ -695,6 +701,7 @@ mod tests {
jail.set_env("DD_PROXY_HTTPS", "https://proxy.example.com");
jail.set_env("DD_PROXY_NO_PROXY", "localhost,127.0.0.1");
jail.set_env("DD_HTTP_PROTOCOL", "http1");
jail.set_env("DD_TLS_CERT_FILE", "/opt/ca-cert.pem");

// Metrics
jail.set_env("DD_DD_URL", "https://metrics.datadoghq.com");
Expand Down Expand Up @@ -850,6 +857,7 @@ mod tests {
proxy_https: Some("https://proxy.example.com".to_string()),
proxy_no_proxy: vec!["localhost".to_string(), "127.0.0.1".to_string()],
http_protocol: Some("http1".to_string()),
tls_cert_file: Some("/opt/ca-cert.pem".to_string()),
dd_url: "https://metrics.datadoghq.com".to_string(),
url: "https://app.datadoghq.com".to_string(),
additional_endpoints: HashMap::from([
Expand Down
2 changes: 2 additions & 0 deletions bottlecap/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ pub struct Config {
pub proxy_https: Option<String>,
pub proxy_no_proxy: Vec<String>,
pub http_protocol: Option<String>,
pub tls_cert_file: Option<String>,

// Endpoints
pub dd_url: String,
Expand Down Expand Up @@ -366,6 +367,7 @@ impl Default for Config {
proxy_https: None,
proxy_no_proxy: vec![],
http_protocol: None,
tls_cert_file: None,

// Endpoints
dd_url: String::default(),
Expand Down
5 changes: 5 additions & 0 deletions bottlecap/src/config/yaml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ pub struct YamlConfig {
pub dd_url: Option<String>,
#[serde(deserialize_with = "deserialize_optional_string")]
pub http_protocol: Option<String>,
#[serde(deserialize_with = "deserialize_optional_string")]
pub tls_cert_file: Option<String>,

// Endpoints
#[serde(deserialize_with = "deserialize_additional_endpoints")]
Expand Down Expand Up @@ -417,6 +419,7 @@ fn merge_config(config: &mut Config, yaml_config: &YamlConfig) {
merge_option!(config, proxy_https, yaml_config.proxy, https);
merge_option_to_value!(config, proxy_no_proxy, yaml_config.proxy, no_proxy);
merge_option!(config, yaml_config, http_protocol);
merge_option!(config, yaml_config, tls_cert_file);

// Endpoints
merge_hashmap!(config, yaml_config, additional_endpoints);
Expand Down Expand Up @@ -747,6 +750,7 @@ proxy:
no_proxy: ["localhost", "127.0.0.1"]
dd_url: "https://metrics.datadoghq.com"
http_protocol: "http1"
tls_cert_file: "/opt/ca-cert.pem"

# Endpoints
additional_endpoints:
Expand Down Expand Up @@ -882,6 +886,7 @@ api_security_sample_delay: 60 # Seconds
proxy_https: Some("https://proxy.example.com".to_string()),
proxy_no_proxy: vec!["localhost".to_string(), "127.0.0.1".to_string()],
http_protocol: Some("http1".to_string()),
tls_cert_file: Some("/opt/ca-cert.pem".to_string()),
dd_url: "https://metrics.datadoghq.com".to_string(),
url: String::new(), // doesnt exist in yaml
additional_endpoints: HashMap::from([
Expand Down
7 changes: 4 additions & 3 deletions bottlecap/src/traces/stats_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,10 @@ impl StatsFlusher for ServerlessStatsFlusher {

let start = std::time::Instant::now();

let Ok(http_client) =
ServerlessTraceFlusher::get_http_client(self.config.proxy_https.as_ref())
else {
let Ok(http_client) = ServerlessTraceFlusher::get_http_client(
self.config.proxy_https.as_ref(),
self.config.tls_cert_file.as_ref(),
) else {
error!("STATS_FLUSHER | Failed to create HTTP client");
return;
};
Expand Down
98 changes: 86 additions & 12 deletions bottlecap/src/traces/trace_flusher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,21 @@
use async_trait::async_trait;
use dogstatsd::api_key::ApiKeyFactory;
use hyper_http_proxy;
use hyper_rustls::HttpsConnectorBuilder;
use libdd_common::{Endpoint, GenericHttpClient, hyper_migration};
use libdd_trace_utils::{
config_utils::trace_intake_url_prefixed,
send_data::SendDataBuilder,
trace_utils::{self, SendData},
};
use rustls::RootCertStore;
use rustls_pki_types::CertificateDer;
use std::error::Error;
use std::fs::File;
use std::io::BufReader;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
use tokio::task::JoinSet;
use tracing::{debug, error};

Expand All @@ -35,6 +41,7 @@ pub trait TraceFlusher {
traces: Vec<SendData>,
endpoint: Option<&Endpoint>,
proxy_https: &Option<String>,
tls_cert_file: &Option<String>,
) -> Option<Vec<SendData>>;

/// Flushes traces by getting every available batch on the aggregator.
Expand Down Expand Up @@ -104,7 +111,13 @@ impl TraceFlusher for ServerlessTraceFlusher {
"TRACES | Retrying to send {} previously failed batches",
traces.len()
);
let retry_result = Self::send(traces, None, &self.config.proxy_https).await;
let retry_result = Self::send(
traces,
None,
&self.config.proxy_https,
&self.config.tls_cert_file,
)
.await;
if retry_result.is_some() {
// Still failed, return to retry later
return retry_result;
Expand All @@ -131,13 +144,17 @@ impl TraceFlusher for ServerlessTraceFlusher {

let traces_clone = traces.clone();
let proxy_https = self.config.proxy_https.clone();
batch_tasks.spawn(async move { Self::send(traces_clone, None, &proxy_https).await });
let tls_cert_file = self.config.tls_cert_file.clone();
batch_tasks.spawn(async move {
Self::send(traces_clone, None, &proxy_https, &tls_cert_file).await
});

for endpoint in self.additional_endpoints.clone() {
let traces_clone = traces.clone();
let proxy_https = self.config.proxy_https.clone();
let tls_cert_file = self.config.tls_cert_file.clone();
batch_tasks.spawn(async move {
Self::send(traces_clone, Some(&endpoint), &proxy_https).await
Self::send(traces_clone, Some(&endpoint), &proxy_https, &tls_cert_file).await
});
}
}
Expand All @@ -158,6 +175,7 @@ impl TraceFlusher for ServerlessTraceFlusher {
traces: Vec<SendData>,
endpoint: Option<&Endpoint>,
proxy_https: &Option<String>,
tls_cert_file: &Option<String>,
) -> Option<Vec<SendData>> {
if traces.is_empty() {
return None;
Expand All @@ -167,7 +185,9 @@ impl TraceFlusher for ServerlessTraceFlusher {
tokio::task::yield_now().await;
debug!("TRACES | Flushing {} traces", coalesced_traces.len());

let Ok(http_client) = ServerlessTraceFlusher::get_http_client(proxy_https.as_ref()) else {
let Ok(http_client) =
ServerlessTraceFlusher::get_http_client(proxy_https.as_ref(), tls_cert_file.as_ref())
else {
error!("TRACES | Failed to create HTTP client");
return None;
};
Expand All @@ -192,25 +212,79 @@ impl TraceFlusher for ServerlessTraceFlusher {
}
}

// Initialize the crypto provider needed for setting custom root certificates
fn ensure_crypto_provider_initialized() {
static INIT_CRYPTO_PROVIDER: LazyLock<()> = LazyLock::new(|| {
#[cfg(unix)]
rustls::crypto::aws_lc_rs::default_provider()
.install_default()
.expect("Failed to install default CryptoProvider");
});

let () = &*INIT_CRYPTO_PROVIDER;
}

impl ServerlessTraceFlusher {
pub fn get_http_client(
proxy_https: Option<&String>,
tls_cert_file: Option<&String>,
) -> Result<
GenericHttpClient<hyper_http_proxy::ProxyConnector<libdd_common::connector::Connector>>,
Box<dyn Error>,
> {
// Create the base connector with optional custom TLS config
let connector = if let Some(ca_cert_path) = tls_cert_file {
// Ensure crypto provider is initialized before creating TLS config
ensure_crypto_provider_initialized();

// Load the custom certificate
let cert_file = File::open(ca_cert_path)?;
let mut reader = BufReader::new(cert_file);
let certs: Vec<CertificateDer> =
rustls_pemfile::certs(&mut reader).collect::<Result<Vec<_>, _>>()?;

// Create a root certificate store and add custom certs
let mut root_store = RootCertStore::empty();
for cert in certs {
root_store.add(cert)?;
}

// Build the TLS config with custom root certificates
let tls_config = rustls::ClientConfig::builder()
.with_root_certificates(root_store)
.with_no_client_auth();

// Build the HTTPS connector with custom config
let https_connector = HttpsConnectorBuilder::new()
.with_tls_config(tls_config)
.https_or_http()
.enable_http1()
.build();

debug!(
"TRACES | GET_HTTP_CLIENT | Added root certificate from {}",
ca_cert_path
);

// Construct the Connector::Https variant directly
libdd_common::connector::Connector::Https(https_connector)
} else {
// Use default connector
libdd_common::connector::Connector::default()
};

if let Some(proxy) = proxy_https {
let proxy =
hyper_http_proxy::Proxy::new(hyper_http_proxy::Intercept::Https, proxy.parse()?);
let proxy_connector = hyper_http_proxy::ProxyConnector::from_proxy(
libdd_common::connector::Connector::default(),
proxy,
)?;
Ok(hyper_migration::client_builder().build(proxy_connector))
let proxy_connector = hyper_http_proxy::ProxyConnector::from_proxy(connector, proxy)?;
let client = hyper_migration::client_builder().build(proxy_connector);
debug!(
"TRACES | GET_HTTP_CLIENT | Proxy connector created with proxy: {:?}",
proxy_https
);
Ok(client)
} else {
let proxy_connector = hyper_http_proxy::ProxyConnector::new(
libdd_common::connector::Connector::default(),
)?;
let proxy_connector = hyper_http_proxy::ProxyConnector::new(connector)?;
Ok(hyper_migration::client_builder().build(proxy_connector))
}
}
Expand Down
Loading