Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
The `prometheus_remote_write` sink now supports custom HTTP headers via the `request.headers` configuration option. This allows users to add custom headers to outgoing requests, which is useful for authentication, routing, or other integration requirements with Prometheus-compatible backends.

authors: elohmeier
67 changes: 61 additions & 6 deletions src/sinks/prometheus/remote_write/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use http::Uri;
use std::{collections::BTreeMap, sync::Arc};

use http::{HeaderValue, Uri};
use snafu::prelude::*;

#[cfg(feature = "aws-core")]
Expand All @@ -13,7 +15,11 @@ use crate::{
UriParseSnafu,
prelude::*,
prometheus::PrometheusRemoteWriteAuth,
util::{auth::Auth, http::http_response_retry_logic},
util::{
auth::Auth,
http::{OrderedHeaderName, http_response_retry_logic},
service::TowerRequestConfig,
},
},
};

Expand Down Expand Up @@ -79,7 +85,7 @@ pub struct RemoteWriteConfig {

#[configurable(derived)]
#[serde(default)]
pub request: TowerRequestConfig,
pub request: RemoteWriteRequestConfig,

/// The tenant ID to send.
///
Expand Down Expand Up @@ -131,6 +137,41 @@ const fn default_compression() -> Compression {

impl_generate_config_from_default!(RemoteWriteConfig);

/// Outbound HTTP request settings for the Prometheus remote write sink.
#[configurable_component]
#[derive(Clone, Debug)]
#[serde(default)]
pub struct RemoteWriteRequestConfig {
#[serde(flatten)]
pub tower: TowerRequestConfig,

/// Additional HTTP headers to add to every HTTP request.
///
/// Values are applied verbatim; template expansion is not supported.
#[serde(default)]
#[configurable(metadata(
docs::additional_props_description = "An HTTP request header and its static value."
))]
#[configurable(metadata(docs::examples = "remote_write_headers_examples()"))]
pub headers: BTreeMap<String, String>,
}

impl Default for RemoteWriteRequestConfig {
fn default() -> Self {
Self {
tower: TowerRequestConfig::default(),
headers: BTreeMap::new(),
}
}
}

fn remote_write_headers_examples() -> BTreeMap<String, String> {
BTreeMap::from([
("Accept".to_string(), "text/plain".to_string()),
("X-My-Custom-Header".to_string(), "A-Value".to_string()),
])
}

#[async_trait::async_trait]
#[typetag::serde(name = "prometheus_remote_write")]
impl SinkConfig for RemoteWriteConfig {
Expand All @@ -141,7 +182,10 @@ impl SinkConfig for RemoteWriteConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let endpoint = self.endpoint.parse::<Uri>().context(UriParseSnafu)?;
let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
let request_settings = self.request.into_settings();
let request_settings = self.request.tower.into_settings();
let validated_headers = Arc::new(crate::sinks::util::http::validate_headers(
&self.request.headers,
)?);
let buckets = self.buckets.clone();
let quantiles = self.quantiles.clone();
let default_namespace = self.default_namespace.clone();
Expand Down Expand Up @@ -183,6 +227,7 @@ impl SinkConfig for RemoteWriteConfig {
endpoint.clone(),
self.compression,
auth.clone(),
Arc::clone(&validated_headers),
)
.boxed();

Expand All @@ -191,6 +236,7 @@ impl SinkConfig for RemoteWriteConfig {
client,
auth,
compression: self.compression,
headers: validated_headers,
};
let service = ServiceBuilder::new()
.settings(request_settings, http_response_retry_logic())
Expand Down Expand Up @@ -225,10 +271,19 @@ async fn healthcheck(
endpoint: Uri,
compression: Compression,
auth: Option<Auth>,
headers: Arc<BTreeMap<OrderedHeaderName, HeaderValue>>,
) -> crate::Result<()> {
let body = bytes::Bytes::new();
let request =
build_request(http::Method::GET, &endpoint, compression, body, None, auth).await?;
let request = build_request(
http::Method::GET,
&endpoint,
compression,
body,
None,
auth,
headers,
)
.await?;
let response = client.send(request).await?;

match response.status() {
Expand Down
22 changes: 19 additions & 3 deletions src/sinks/prometheus/remote_write/service.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,26 @@
use std::task::{Context, Poll};
use std::{
collections::BTreeMap,
sync::Arc,
task::{Context, Poll},
};

#[cfg(feature = "aws-core")]
use aws_credential_types::provider::SharedCredentialsProvider;
#[cfg(feature = "aws-core")]
use aws_types::region::Region;
use bytes::Bytes;
use http::Uri;
use http::{HeaderValue, Uri};

use super::request_builder::RemoteWriteRequest;
use crate::{
http::HttpClient,
internal_events::EndpointBytesSent,
sinks::{
prelude::*,
util::{auth::Auth, http::HttpResponse},
util::{
auth::Auth,
http::{HttpResponse, OrderedHeaderName},
},
},
};

Expand All @@ -34,6 +41,7 @@ pub(super) struct RemoteWriteService {
pub(super) auth: Option<Auth>,
pub(super) client: HttpClient,
pub(super) compression: super::Compression,
pub(super) headers: Arc<BTreeMap<OrderedHeaderName, HeaderValue>>,
}

impl Service<RemoteWriteRequest> for RemoteWriteService {
Expand All @@ -51,6 +59,7 @@ impl Service<RemoteWriteRequest> for RemoteWriteService {
let endpoint = self.endpoint.clone();
let auth = self.auth.clone();
let compression = self.compression;
let headers = Arc::clone(&self.headers);

Box::pin(async move {
let metadata = std::mem::take(request.metadata_mut());
Expand All @@ -64,6 +73,7 @@ impl Service<RemoteWriteRequest> for RemoteWriteService {
request.request,
request.tenant_id.as_ref(),
auth,
headers,
)
.await?;

Expand Down Expand Up @@ -106,6 +116,7 @@ pub(super) async fn build_request(
body: Bytes,
tenant_id: Option<&String>,
auth: Option<Auth>,
custom_headers: Arc<BTreeMap<OrderedHeaderName, HeaderValue>>,
) -> crate::Result<http::Request<hyper::Body>> {
let mut builder = http::Request::builder()
.method(method)
Expand All @@ -121,6 +132,11 @@ pub(super) async fn build_request(
builder = builder.header(headers::X_SCOPE_ORGID, tenant_id);
}

// Apply custom headers
for (name, value) in custom_headers.iter() {
builder = builder.header(name.inner().clone(), value.clone());
}

let mut request = builder.body(body)?;

if let Some(auth) = auth {
Expand Down
32 changes: 32 additions & 0 deletions src/sinks/prometheus/remote_write/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,38 @@ async fn sends_templated_x_scope_orgid_header() {
assert_eq!(orgid.len(), 11);
}

#[tokio::test]
async fn sends_custom_headers() {
let outputs = send_request(
indoc! {r#"
[request.headers]
X-Custom-Header = "custom-value"
X-Another-Header = "another-value"
"#},
vec![create_event("gauge-4".into(), 42.0)],
)
.await;

assert_eq!(outputs.len(), 1);
let (headers, req) = &outputs[0];

// Verify custom headers are present
assert_eq!(headers["x-custom-header"], "custom-value");
assert_eq!(headers["x-another-header"], "another-value");

// Verify standard headers are still present
assert_eq!(headers["x-prometheus-remote-write-version"], "0.1.0");
assert_eq!(headers["content-type"], "application/x-protobuf");

// Verify the metric data is correct
assert_eq!(req.timeseries.len(), 1);
assert_eq!(
req.timeseries[0].labels,
labels!("__name__" => "gauge-4", "production" => "true", "region" => "us-west-1")
);
assert_eq!(req.timeseries[0].samples[0].value, 42.0);
}

#[tokio::test]
async fn retains_state_between_requests() {
// This sink converts all incremental events to absolute, and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,14 +341,8 @@ generated: components: sinks: prometheus_remote_write: configuration: {
}
}
request: {
description: """
Middleware settings for outbound requests.

Various settings can be configured, such as concurrency and rate limits, timeouts, and retry behavior.

Note that the retry backoff policy follows the Fibonacci sequence.
"""
required: false
description: "Outbound HTTP request settings for the Prometheus remote write sink."
required: false
type: object: options: {
adaptive_concurrency: {
description: """
Expand Down Expand Up @@ -447,6 +441,25 @@ generated: components: sinks: prometheus_remote_write: configuration: {
uint: {}
}
}
headers: {
description: """
Additional HTTP headers to add to every HTTP request.

Values are applied verbatim; template expansion is not supported.
"""
required: false
type: object: {
examples: [{
Accept: "text/plain"
"X-My-Custom-Header": "A-Value"
}]
options: "*": {
description: "An HTTP request header and its static value."
required: true
type: string: {}
}
}
}
rate_limit_duration_secs: {
description: "The time window used for the `rate_limit_num` option."
required: false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ components: sinks: prometheus_remote_write: {
retry_initial_backoff_secs: 1
retry_max_duration_secs: 10
timeout_secs: 60
headers: false
headers: true
}
tls: {
enabled: true
Expand Down
Loading