Skip to content

Commit 84def62

Browse files
committed
feat(prometheus_remote_write sink): add custom HTTP headers support
Add support for custom HTTP headers in the prometheus_remote_write sink via the `request.headers` configuration option. This enables users to add custom headers to outgoing requests for authentication, routing, or other integration requirements with Prometheus-compatible backends.
1 parent 2ed1eb4 commit 84def62

File tree

6 files changed

+137
-18
lines changed

6 files changed

+137
-18
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
The `prometheus_remote_write` sink now supports custom HTTP headers via the `request.headers` configuration option. This allows users to add custom headers to outgoing requests, which is useful for authentication, routing, or other integration requirements with Prometheus-compatible backends.
2+
3+
authors: elohmeier

src/sinks/prometheus/remote_write/config.rs

Lines changed: 61 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use http::Uri;
1+
use std::{collections::BTreeMap, sync::Arc};
2+
3+
use http::{HeaderValue, Uri};
24
use snafu::prelude::*;
35

46
#[cfg(feature = "aws-core")]
@@ -13,7 +15,11 @@ use crate::{
1315
UriParseSnafu,
1416
prelude::*,
1517
prometheus::PrometheusRemoteWriteAuth,
16-
util::{auth::Auth, http::http_response_retry_logic},
18+
util::{
19+
auth::Auth,
20+
http::{OrderedHeaderName, http_response_retry_logic},
21+
service::TowerRequestConfig,
22+
},
1723
},
1824
};
1925

@@ -79,7 +85,7 @@ pub struct RemoteWriteConfig {
7985

8086
#[configurable(derived)]
8187
#[serde(default)]
82-
pub request: TowerRequestConfig,
88+
pub request: RemoteWriteRequestConfig,
8389

8490
/// The tenant ID to send.
8591
///
@@ -131,6 +137,41 @@ const fn default_compression() -> Compression {
131137

132138
impl_generate_config_from_default!(RemoteWriteConfig);
133139

140+
/// Outbound HTTP request settings for the Prometheus remote write sink.
141+
#[configurable_component]
142+
#[derive(Clone, Debug)]
143+
#[serde(default)]
144+
pub struct RemoteWriteRequestConfig {
145+
#[serde(flatten)]
146+
pub tower: TowerRequestConfig,
147+
148+
/// Additional HTTP headers to add to every HTTP request.
149+
///
150+
/// Values are applied verbatim; template expansion is not supported.
151+
#[serde(default)]
152+
#[configurable(metadata(
153+
docs::additional_props_description = "An HTTP request header and its static value."
154+
))]
155+
#[configurable(metadata(docs::examples = "remote_write_headers_examples()"))]
156+
pub headers: BTreeMap<String, String>,
157+
}
158+
159+
impl Default for RemoteWriteRequestConfig {
160+
fn default() -> Self {
161+
Self {
162+
tower: TowerRequestConfig::default(),
163+
headers: BTreeMap::new(),
164+
}
165+
}
166+
}
167+
168+
fn remote_write_headers_examples() -> BTreeMap<String, String> {
169+
BTreeMap::from([
170+
("Accept".to_string(), "text/plain".to_string()),
171+
("X-My-Custom-Header".to_string(), "A-Value".to_string()),
172+
])
173+
}
174+
134175
#[async_trait::async_trait]
135176
#[typetag::serde(name = "prometheus_remote_write")]
136177
impl SinkConfig for RemoteWriteConfig {
@@ -141,7 +182,10 @@ impl SinkConfig for RemoteWriteConfig {
141182
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
142183
let endpoint = self.endpoint.parse::<Uri>().context(UriParseSnafu)?;
143184
let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
144-
let request_settings = self.request.into_settings();
185+
let request_settings = self.request.tower.into_settings();
186+
let validated_headers = Arc::new(crate::sinks::util::http::validate_headers(
187+
&self.request.headers,
188+
)?);
145189
let buckets = self.buckets.clone();
146190
let quantiles = self.quantiles.clone();
147191
let default_namespace = self.default_namespace.clone();
@@ -183,6 +227,7 @@ impl SinkConfig for RemoteWriteConfig {
183227
endpoint.clone(),
184228
self.compression,
185229
auth.clone(),
230+
Arc::clone(&validated_headers),
186231
)
187232
.boxed();
188233

@@ -191,6 +236,7 @@ impl SinkConfig for RemoteWriteConfig {
191236
client,
192237
auth,
193238
compression: self.compression,
239+
headers: validated_headers,
194240
};
195241
let service = ServiceBuilder::new()
196242
.settings(request_settings, http_response_retry_logic())
@@ -225,10 +271,19 @@ async fn healthcheck(
225271
endpoint: Uri,
226272
compression: Compression,
227273
auth: Option<Auth>,
274+
headers: Arc<BTreeMap<OrderedHeaderName, HeaderValue>>,
228275
) -> crate::Result<()> {
229276
let body = bytes::Bytes::new();
230-
let request =
231-
build_request(http::Method::GET, &endpoint, compression, body, None, auth).await?;
277+
let request = build_request(
278+
http::Method::GET,
279+
&endpoint,
280+
compression,
281+
body,
282+
None,
283+
auth,
284+
headers,
285+
)
286+
.await?;
232287
let response = client.send(request).await?;
233288

234289
match response.status() {

src/sinks/prometheus/remote_write/service.rs

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,26 @@
1-
use std::task::{Context, Poll};
1+
use std::{
2+
collections::BTreeMap,
3+
sync::Arc,
4+
task::{Context, Poll},
5+
};
26

37
#[cfg(feature = "aws-core")]
48
use aws_credential_types::provider::SharedCredentialsProvider;
59
#[cfg(feature = "aws-core")]
610
use aws_types::region::Region;
711
use bytes::Bytes;
8-
use http::Uri;
12+
use http::{HeaderValue, Uri};
913

1014
use super::request_builder::RemoteWriteRequest;
1115
use crate::{
1216
http::HttpClient,
1317
internal_events::EndpointBytesSent,
1418
sinks::{
1519
prelude::*,
16-
util::{auth::Auth, http::HttpResponse},
20+
util::{
21+
auth::Auth,
22+
http::{HttpResponse, OrderedHeaderName},
23+
},
1724
},
1825
};
1926

@@ -34,6 +41,7 @@ pub(super) struct RemoteWriteService {
3441
pub(super) auth: Option<Auth>,
3542
pub(super) client: HttpClient,
3643
pub(super) compression: super::Compression,
44+
pub(super) headers: Arc<BTreeMap<OrderedHeaderName, HeaderValue>>,
3745
}
3846

3947
impl Service<RemoteWriteRequest> for RemoteWriteService {
@@ -51,6 +59,7 @@ impl Service<RemoteWriteRequest> for RemoteWriteService {
5159
let endpoint = self.endpoint.clone();
5260
let auth = self.auth.clone();
5361
let compression = self.compression;
62+
let headers = Arc::clone(&self.headers);
5463

5564
Box::pin(async move {
5665
let metadata = std::mem::take(request.metadata_mut());
@@ -64,6 +73,7 @@ impl Service<RemoteWriteRequest> for RemoteWriteService {
6473
request.request,
6574
request.tenant_id.as_ref(),
6675
auth,
76+
headers,
6777
)
6878
.await?;
6979

@@ -106,6 +116,7 @@ pub(super) async fn build_request(
106116
body: Bytes,
107117
tenant_id: Option<&String>,
108118
auth: Option<Auth>,
119+
custom_headers: Arc<BTreeMap<OrderedHeaderName, HeaderValue>>,
109120
) -> crate::Result<http::Request<hyper::Body>> {
110121
let mut builder = http::Request::builder()
111122
.method(method)
@@ -121,6 +132,11 @@ pub(super) async fn build_request(
121132
builder = builder.header(headers::X_SCOPE_ORGID, tenant_id);
122133
}
123134

135+
// Apply custom headers
136+
for (name, value) in custom_headers.iter() {
137+
builder = builder.header(name.inner().clone(), value.clone());
138+
}
139+
124140
let mut request = builder.body(body)?;
125141

126142
if let Some(auth) = auth {

src/sinks/prometheus/remote_write/tests.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,38 @@ async fn sends_templated_x_scope_orgid_header() {
135135
assert_eq!(orgid.len(), 11);
136136
}
137137

138+
#[tokio::test]
139+
async fn sends_custom_headers() {
140+
let outputs = send_request(
141+
indoc! {r#"
142+
[request.headers]
143+
X-Custom-Header = "custom-value"
144+
X-Another-Header = "another-value"
145+
"#},
146+
vec![create_event("gauge-4".into(), 42.0)],
147+
)
148+
.await;
149+
150+
assert_eq!(outputs.len(), 1);
151+
let (headers, req) = &outputs[0];
152+
153+
// Verify custom headers are present
154+
assert_eq!(headers["x-custom-header"], "custom-value");
155+
assert_eq!(headers["x-another-header"], "another-value");
156+
157+
// Verify standard headers are still present
158+
assert_eq!(headers["x-prometheus-remote-write-version"], "0.1.0");
159+
assert_eq!(headers["content-type"], "application/x-protobuf");
160+
161+
// Verify the metric data is correct
162+
assert_eq!(req.timeseries.len(), 1);
163+
assert_eq!(
164+
req.timeseries[0].labels,
165+
labels!("__name__" => "gauge-4", "production" => "true", "region" => "us-west-1")
166+
);
167+
assert_eq!(req.timeseries[0].samples[0].value, 42.0);
168+
}
169+
138170
#[tokio::test]
139171
async fn retains_state_between_requests() {
140172
// This sink converts all incremental events to absolute, and

website/cue/reference/components/sinks/generated/prometheus_remote_write.cue

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -341,14 +341,8 @@ generated: components: sinks: prometheus_remote_write: configuration: {
341341
}
342342
}
343343
request: {
344-
description: """
345-
Middleware settings for outbound requests.
346-
347-
Various settings can be configured, such as concurrency and rate limits, timeouts, and retry behavior.
348-
349-
Note that the retry backoff policy follows the Fibonacci sequence.
350-
"""
351-
required: false
344+
description: "Outbound HTTP request settings for the Prometheus remote write sink."
345+
required: false
352346
type: object: options: {
353347
adaptive_concurrency: {
354348
description: """
@@ -447,6 +441,25 @@ generated: components: sinks: prometheus_remote_write: configuration: {
447441
uint: {}
448442
}
449443
}
444+
headers: {
445+
description: """
446+
Additional HTTP headers to add to every HTTP request.
447+
448+
Values are applied verbatim; template expansion is not supported.
449+
"""
450+
required: false
451+
type: object: {
452+
examples: [{
453+
Accept: "text/plain"
454+
"X-My-Custom-Header": "A-Value"
455+
}]
456+
options: "*": {
457+
description: "An HTTP request header and its static value."
458+
required: true
459+
type: string: {}
460+
}
461+
}
462+
}
450463
rate_limit_duration_secs: {
451464
description: "The time window used for the `rate_limit_num` option."
452465
required: false

website/cue/reference/components/sinks/prometheus_remote_write.cue

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ components: sinks: prometheus_remote_write: {
3434
retry_initial_backoff_secs: 1
3535
retry_max_duration_secs: 10
3636
timeout_secs: 60
37-
headers: false
37+
headers: true
3838
}
3939
tls: {
4040
enabled: true

0 commit comments

Comments
 (0)