Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ check-cfg = [

[workspace.dependencies]
anyhow = "1.0.75"
backtrace = "0.3"
foundations = { version = "5.5.3", path = "./foundations" }
foundations-macros = { version = "=5.5.3", path = "./foundations-macros", default-features = false }
bindgen = { version = "0.72", default-features = false }
Expand Down
3 changes: 3 additions & 0 deletions foundations/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ metrics = [

# Enables memory profiling features (require `jemalloc` feature to be enabled)
memory-profiling = [
"dep:backtrace",
"dep:once_cell",
"dep:tikv-jemalloc-ctl",
"dep:tempfile",
Expand Down Expand Up @@ -194,6 +195,7 @@ workspace = true

[dependencies]
anyhow = { workspace = true, features = ["backtrace", "std"] }
backtrace = { workspace = true, optional = true }
foundations-macros = { workspace = true, optional = true, default-features = false }
cf-rustracing = { workspace = true, optional = true }
cf-rustracing-jaeger = { workspace = true, optional = true }
Expand Down Expand Up @@ -264,6 +266,7 @@ neli = { workspace = true, optional = true }
neli-proc-macros = { workspace = true, optional = true }

[dev-dependencies]
backtrace = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true, features = ["rc"] }
tempfile = { workspace = true }
Expand Down
3 changes: 3 additions & 0 deletions foundations/src/telemetry/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ use tokio::sync::watch;

mod router;

#[cfg(feature = "memory-profiling")]
mod pprof_symbol;

use router::Router;

enum TelemetryStream {
Expand Down
48 changes: 48 additions & 0 deletions foundations/src/telemetry/server/pprof_symbol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use crate::Result;
use crate::telemetry::reexports::http_body_util::BodyExt;
use hyper::body::Incoming;
use hyper::{Method, Request};
use std::fmt::Write;

/// Resolves program counter addresses to symbol names.
///
/// This implements the pprof symbol resolution protocol used by `jeprof` and
/// other pprof-compatible tools. The input is read from the POST body (or GET
/// query string) as `+`-separated hex addresses (with optional `0x` prefix).
/// The output is a text response with a `num_symbols` header followed by one
/// line per resolved symbol in the format `0x<addr>\t<name>`.
pub(super) async fn pprof_symbol(req: Request<Incoming>) -> Result<String> {
let mut buf = String::new();

// Always emit num_symbols header. The value doesn't matter to pprof tools
// as long as it is > 0, which signals that symbol information is available.
// This is also what Go does: https://cs.opensource.google/go/go/+/refs/tags/go1.26.1:src/net/http/pprof/pprof.go;l=197
writeln!(buf, "num_symbols: 1")?;

let input = if req.method() == Method::POST {
let body = req.into_body().collect().await?.to_bytes();
String::from_utf8(body.to_vec())?
} else {
req.uri().query().unwrap_or_default().to_string()
};

for token in input.split('+') {
let hex_str = token
.trim()
.strip_prefix("0x")
.or_else(|| token.trim().strip_prefix("0X"))
.unwrap_or(token.trim());

let Ok(addr) = u64::from_str_radix(hex_str, 16) else {
continue;
};

backtrace::resolve(addr as usize as *mut std::ffi::c_void, |symbol| {
if let Some(name) = symbol.name() {
let _ = writeln!(buf, "{:#x}\t{}", addr, name);
}
});
}

Ok(buf)
}
17 changes: 17 additions & 0 deletions foundations/src/telemetry/server/router.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#[cfg(all(target_os = "linux", feature = "memory-profiling"))]
use super::memory_profiling;
#[cfg(feature = "memory-profiling")]
use super::pprof_symbol;
#[cfg(feature = "metrics")]
use crate::telemetry::metrics;
use crate::telemetry::reexports::http_body_util::{BodyExt, Empty, Full, combinators::BoxBody};
Expand Down Expand Up @@ -109,6 +111,21 @@ impl RouteMap {
}),
});

#[cfg(feature = "memory-profiling")]
self.set(TelemetryServerRoute {
path: "/pprof/symbol".into(),
methods: vec![Method::GET, Method::POST],
handler: Box::new(|req, _| {
async move {
into_response(
"text/plain; charset=utf-8",
pprof_symbol::pprof_symbol(req).await,
)
}
.boxed()
}),
});

#[cfg(feature = "tracing")]
self.set(TelemetryServerRoute {
path: "/debug/traces".into(),
Expand Down
82 changes: 82 additions & 0 deletions foundations/tests/telemetry_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,33 @@ use foundations::telemetry::settings::MemoryProfilerSettings;
#[cfg(target_os = "linux")]
use foundations::telemetry::MemoryProfiler;

/// Captures its own instruction pointer via [`backtrace::trace`] and returns it
/// along with the symbol name that the `backtrace` crate resolves it to. This
/// gives us a (pc, expected_name) pair that the `/pprof/symbol` endpoint must
/// also be able to resolve, since it uses the same crate.
#[inline(never)]
fn capture_self_pc() -> (usize, String) {
let mut result = None;
backtrace::trace(|frame| {
if result.is_some() {
return false;
}
let ip = frame.ip() as usize;
backtrace::resolve(ip as *mut std::ffi::c_void, |symbol| {
if result.is_none()
&& let Some(name) = symbol.name()
{
let name = name.to_string();
if name.contains("capture_self_pc") {
result = Some((ip, name));
}
}
});
result.is_none()
});
result.expect("should find capture_self_pc frame")
}

#[tokio::test]
async fn telemetry_server() {
let server_addr = SocketAddr::from((Ipv4Addr::LOCALHOST, 1337));
Expand Down Expand Up @@ -121,6 +148,61 @@ async fn telemetry_server() {
.contains("Allocated")
);

// Capture a real PC and the symbol the backtrace crate resolves it to, so
// we can verify both GET and POST resolve to the same name.
let (known_pc, expected_symbol) = capture_self_pc();
let addr_hex = format!("{:#x}", known_pc);

// Test symbol resolution via GET (addresses in query string), GET without
// addresses (just checks availability), and POST (addresses in body).
let symbol_requests: Vec<(&str, String)> = vec![
(
"GET",
format!("http://{server_addr}/pprof/symbol?{addr_hex}"),
),
(
"GET (no addr)",
format!("http://{server_addr}/pprof/symbol"),
),
("POST", format!("http://{server_addr}/pprof/symbol")),
];

for (method, url) in &symbol_requests {
let res = if *method == "POST" {
reqwest::Client::new()
.post(url)
.body(addr_hex.clone())
.send()
.await
} else {
reqwest::get(url).await
}
.unwrap()
.text()
.await
.unwrap();

eprintln!("pprof/symbol {method}: url={url}, expecting='{expected_symbol}'");
eprintln!("pprof/symbol {method}: response:\n{res}");

assert!(
res.contains("num_symbols: 1"),
"pprof/symbol {method} should contain 'num_symbols: 1', got: {res}"
);

if !method.contains("no addr") {
assert!(
res.contains(&expected_symbol),
"pprof/symbol {method} should resolve to '{expected_symbol}', got: {res}"
);
let known_pc_str = format!("{:#x}", known_pc);
assert!(
res.contains(&known_pc_str),
"pprof/symbol {method} should resolve to '{known_pc_str}', got: {res}"
);
}
}

let telemetry_ctx = TelemetryContext::current();
let _scope = telemetry_ctx.scope();

Expand Down
Loading