Skip to content

Commit 66b21b8

Browse files
support for grafana cloud which is snappy default and no suffix in url
fixed generics more clarity on errors error description return in push
1 parent 36acf47 commit 66b21b8

File tree

3 files changed

+59
-2
lines changed

3 files changed

+59
-2
lines changed

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ default = ["protobuf"]
2222
gen = ["protobuf-codegen-pure"]
2323
nightly = ["libc"]
2424
process = ["libc", "procfs"]
25-
push = ["reqwest", "libc", "protobuf"]
25+
push = ["reqwest", "libc", "protobuf", "snap"]
2626

2727
[dependencies]
2828
cfg-if = "^1.0"
@@ -31,6 +31,7 @@ lazy_static = "^1.4"
3131
libc = { version = "^0.2", optional = true }
3232
parking_lot = "^0.12"
3333
protobuf = { version = "^2.0", optional = true }
34+
snap = { version = "^1.1", optional = true }
3435
memchr = "^2.3"
3536
reqwest = { version = "^0.11", features = ["blocking"], optional = true }
3637
thiserror = "^1.0"

src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ pub use self::pulling_gauge::PullingGauge;
224224
#[cfg(feature = "push")]
225225
pub use self::push::{
226226
hostname_grouping_key, push_add_collector, push_add_metrics, push_collector, push_metrics,
227-
BasicAuthentication,
227+
push_raw, BasicAuthentication,
228228
};
229229
pub use self::registry::Registry;
230230
pub use self::registry::{default_registry, gather, register, unregister};

src/push.rs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,62 @@ fn push<S: BuildHasher>(
173173
}
174174
}
175175

176+
/// Pushes snappy compressed data to url as is.
177+
pub fn push_raw(
178+
url: &str,
179+
mfs: Vec<proto::MetricFamily>,
180+
method: &str,
181+
basic_auth: Option<BasicAuthentication>,
182+
) -> Result<()> {
183+
let mut push_url = if url.contains("://") {
184+
url.to_owned()
185+
} else {
186+
format!("http://{}", url)
187+
};
188+
189+
if push_url.ends_with('/') {
190+
push_url.pop();
191+
}
192+
193+
let encoder = ProtobufEncoder::new();
194+
let mut proto_buf = Vec::new();
195+
196+
for mf in mfs {
197+
// Ignore error, `no metrics` and `no name`.
198+
let _ = encoder.encode(&[mf], &mut proto_buf);
199+
}
200+
201+
let buf = snap::raw::Encoder::new()
202+
.compress_vec(&proto_buf)
203+
.expect("msg");
204+
205+
let mut builder = HTTP_CLIENT
206+
.request(
207+
Method::from_str(method).unwrap(),
208+
Url::from_str(&push_url).unwrap(),
209+
)
210+
.header(CONTENT_TYPE, encoder.format_type())
211+
.header("Content-Encoding", "snappy")
212+
.body(buf);
213+
214+
if let Some(BasicAuthentication { username, password }) = basic_auth {
215+
builder = builder.basic_auth(username, Some(password));
216+
}
217+
218+
let response = builder.send().map_err(|e| Error::Msg(format!("{}", e)))?;
219+
220+
match response.status() {
221+
StatusCode::ACCEPTED => Ok(()),
222+
StatusCode::OK => Ok(()),
223+
_ => Err(Error::Msg(format!(
224+
"unexpected status code {} while pushing to {} {}",
225+
response.status(),
226+
push_url,
227+
response.text().map(|text| format!("with body `{}`", text)).unwrap_or_default()
228+
))),
229+
}
230+
}
231+
176232
fn push_from_collector<S: BuildHasher>(
177233
job: &str,
178234
grouping: HashMap<String, String, S>,

0 commit comments

Comments
 (0)