Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Telemetry #63

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
782 changes: 638 additions & 144 deletions Cargo.lock

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions ldk-server-protos/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,3 +452,14 @@ pub struct GetBalancesResponse {
pub pending_balances_from_channel_closures:
::prost::alloc::vec::Vec<super::types::PendingSweepBalance>,
}

/// Request message for retrieving the ldk-server's metrics; it carries no fields.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetMetricsRequest {}

/// Response message for `GetMetricsRequest`.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetMetricsResponse {
/// Metrics payload rendered in the Prometheus exposition format.
#[prost(string, tag = "1")]
pub metrics: String,
}
10 changes: 10 additions & 0 deletions ldk-server-protos/src/proto/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -424,3 +424,13 @@ message GetBalancesResponse {
// might not already be accounted for in `total_onchain_balance_sats`.
repeated types.PendingSweepBalance pending_balances_from_channel_closures = 6;
}

// Retrieve the ldk-server's metrics.
// The request carries no parameters.
message GetMetricsRequest {}

// The response `content` for the `GetMetricsRequest` API, when HttpStatusCode is OK (200).
// When HttpStatusCode is not OK (non-200), the response `content` contains a serialized `ErrorResponse`.
message GetMetricsResponse {
// Metrics payload rendered in the Prometheus exposition format.
string metrics = 1;
}
2 changes: 2 additions & 0 deletions ldk-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ hex = { package = "hex-conservative", version = "0.2.1", default-features = fals
rusqlite = { version = "0.31.0", features = ["bundled"] }
rand = { version = "0.8.5", default-features = false }
async-trait = { version = "0.1.85", default-features = false }
metrics = "0.24.1"
metrics-exporter-prometheus = "0.16.2"

# Required for RabbitMQ based EventPublisher. Only enabled for `events-rabbitmq` feature.
lapin = { version = "2.4.0", features = ["rustls"], default-features = false, optional = true }
Expand Down
12 changes: 12 additions & 0 deletions ldk-server/src/api/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
use crate::api::error::LdkServerError;
use crate::service::Context;
use ldk_server_protos::api::{GetMetricsRequest, GetMetricsResponse};

/// Request path (without the leading '/') that the service routes to the metrics handler.
pub(crate) const GET_METRICS: &str = "metrics";

/// Handles a `GetMetricsRequest` by rendering the metrics held by the Prometheus handle.
///
/// The request has no fields and is ignored. The rendered text is returned in
/// `GetMetricsResponse::metrics`; this handler has no error path of its own.
pub(crate) fn handle_metrics_request(
    context: Context, _request: GetMetricsRequest,
) -> Result<GetMetricsResponse, LdkServerError> {
    Ok(GetMetricsResponse { metrics: context.prometheus_handle.render() })
}
1 change: 1 addition & 0 deletions ldk-server/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ pub(crate) mod get_payment_details;
pub(crate) mod list_channels;
pub(crate) mod list_forwarded_payments;
pub(crate) mod list_payments;
pub(crate) mod metrics;
pub(crate) mod onchain_receive;
pub(crate) mod onchain_send;
pub(crate) mod open_channel;
Expand Down
16 changes: 15 additions & 1 deletion ldk-server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
mod api;
mod io;
mod service;
mod telemetry;
mod util;

use crate::service::NodeService;

use ldk_node::{Builder, Event, Node};

use metrics::counter;
use telemetry::{collect_node_metrics, setup_prometheus};
use tokio::net::TcpListener;
use tokio::signal::unix::SignalKind;

Expand Down Expand Up @@ -108,6 +111,8 @@ fn main() {

let event_publisher: Arc<dyn EventPublisher> = Arc::new(NoopEventPublisher);

let prometheus_handle = setup_prometheus();

#[cfg(feature = "events-rabbitmq")]
let event_publisher: Arc<dyn EventPublisher> = {
let rabbitmq_config = RabbitMqConfig {
Expand Down Expand Up @@ -149,20 +154,23 @@ fn main() {
event = event_node.next_event_async() => {
match event {
Event::ChannelPending { channel_id, counterparty_node_id, .. } => {
counter!("channel_pending").increment(1);
println!(
"CHANNEL_PENDING: {} from counterparty {}",
channel_id, counterparty_node_id
);
event_node.event_handled();
},
Event::ChannelReady { channel_id, counterparty_node_id, .. } => {
counter!("channel_ready").increment(1);
println!(
"CHANNEL_READY: {} from counterparty {:?}",
channel_id, counterparty_node_id
);
event_node.event_handled();
},
Event::PaymentReceived { payment_id, payment_hash, amount_msat, .. } => {
counter!("payment_received").increment(1);
println!(
"PAYMENT_RECEIVED: with id {:?}, hash {}, amount_msat {}",
payment_id, payment_hash, amount_msat
Expand All @@ -178,6 +186,7 @@ fn main() {
Arc::clone(&paginated_store)).await;
},
Event::PaymentSuccessful {payment_id, ..} => {
counter!("payment_successful").increment(1);
let payment_id = payment_id.expect("PaymentId expected for ldk-server >=0.1");

publish_event_and_upsert_payment(&payment_id,
Expand All @@ -189,6 +198,7 @@ fn main() {
Arc::clone(&paginated_store)).await;
},
Event::PaymentFailed {payment_id, ..} => {
counter!("payment_failed").increment(1);
let payment_id = payment_id.expect("PaymentId expected for ldk-server >=0.1");

publish_event_and_upsert_payment(&payment_id,
Expand All @@ -200,6 +210,7 @@ fn main() {
Arc::clone(&paginated_store)).await;
},
Event::PaymentClaimable {payment_id, ..} => {
counter!("payment_claimable").increment(1);
if let Some(payment_details) = event_node.payment(&payment_id) {
let payment = payment_to_proto(payment_details);
upsert_payment_details(&event_node, Arc::clone(&paginated_store), &payment);
Expand All @@ -220,6 +231,8 @@ fn main() {
outbound_amount_forwarded_msat
} => {

counter!("payment_forwarded").increment(1);

println!("PAYMENT_FORWARDED: with outbound_amount_forwarded_msat {}, total_fee_earned_msat: {}, inbound channel: {}, outbound channel: {}",
outbound_amount_forwarded_msat.unwrap_or(0), total_fee_earned_msat.unwrap_or(0), prev_channel_id, next_channel_id
);
Expand Down Expand Up @@ -279,7 +292,7 @@ fn main() {
match res {
Ok((stream, _)) => {
let io_stream = TokioIo::new(stream);
let node_service = NodeService::new(Arc::clone(&node), Arc::clone(&paginated_store));
let node_service = NodeService::new(Arc::clone(&node), Arc::clone(&paginated_store), Arc::new(prometheus_handle.clone()));
runtime.spawn(async move {
if let Err(err) = http1::Builder::new().serve_connection(io_stream, node_service).await {
eprintln!("Failed to serve connection: {}", err);
Expand All @@ -289,6 +302,7 @@ fn main() {
Err(e) => eprintln!("Failed to accept connection: {}", e),
}
}
_ = collect_node_metrics(Arc::clone(&node)) => {}
_ = tokio::signal::ctrl_c() => {
println!("Received CTRL-C, shutting down..");
break;
Expand Down
13 changes: 11 additions & 2 deletions ldk-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use hyper::body::{Bytes, Incoming};
use hyper::service::Service;
use hyper::{Request, Response, StatusCode};

use metrics_exporter_prometheus::PrometheusHandle;
use prost::Message;

use crate::api::bolt11_receive::{handle_bolt11_receive_request, BOLT11_RECEIVE_PATH};
Expand All @@ -24,6 +25,7 @@ use crate::api::list_forwarded_payments::{
handle_list_forwarded_payments_request, LIST_FORWARDED_PAYMENTS_PATH,
};
use crate::api::list_payments::{handle_list_payments_request, LIST_PAYMENTS_PATH};
use crate::api::metrics::{handle_metrics_request, GET_METRICS};
use crate::api::onchain_receive::{handle_onchain_receive_request, ONCHAIN_RECEIVE_PATH};
use crate::api::onchain_send::{handle_onchain_send_request, ONCHAIN_SEND_PATH};
use crate::api::open_channel::{handle_open_channel, OPEN_CHANNEL_PATH};
Expand All @@ -40,17 +42,22 @@ use std::sync::Arc;
/// Service that hands every incoming request a `Context` built from these shared handles.
pub struct NodeService {
// Shared handle to the node.
node: Arc<Node>,
// Shared paginated key-value store.
paginated_kv_store: Arc<dyn PaginatedKVStore>,
// Shared handle to the Prometheus recorder, used to render the metrics payload.
prometheus_handle: Arc<PrometheusHandle>,
}

impl NodeService {
pub(crate) fn new(node: Arc<Node>, paginated_kv_store: Arc<dyn PaginatedKVStore>) -> Self {
Self { node, paginated_kv_store }
/// Creates a service from the shared node, paginated key-value store and Prometheus handle.
pub(crate) fn new(
node: Arc<Node>, paginated_kv_store: Arc<dyn PaginatedKVStore>,
prometheus_handle: Arc<PrometheusHandle>,
) -> Self {
Self { node, paginated_kv_store, prometheus_handle }
}
}

/// Shared handles passed to each API request handler.
pub(crate) struct Context {
/// Shared handle to the node.
pub(crate) node: Arc<Node>,
/// Shared paginated key-value store.
pub(crate) paginated_kv_store: Arc<dyn PaginatedKVStore>,
/// Shared handle to the Prometheus recorder, used to render the metrics payload.
pub(crate) prometheus_handle: Arc<PrometheusHandle>,
}

impl Service<Request<Incoming>> for NodeService {
Expand All @@ -62,6 +69,7 @@ impl Service<Request<Incoming>> for NodeService {
let context = Context {
node: Arc::clone(&self.node),
paginated_kv_store: Arc::clone(&self.paginated_kv_store),
prometheus_handle: Arc::clone(&self.prometheus_handle),
};
// Exclude '/' from path pattern matching.
match &req.uri().path()[1..] {
Expand Down Expand Up @@ -100,6 +108,7 @@ impl Service<Request<Incoming>> for NodeService {
LIST_FORWARDED_PAYMENTS_PATH => {
Box::pin(handle_request(context, req, handle_list_forwarded_payments_request))
},
GET_METRICS => Box::pin(handle_request(context, req, handle_metrics_request)),
path => {
let error = format!("Unknown request: {}", path).into_bytes();
Box::pin(async {
Expand Down
92 changes: 92 additions & 0 deletions ldk-server/src/telemetry.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
use std::io;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;

use ldk_node::{BalanceDetails, Node};
use metrics::{describe_counter, describe_gauge, gauge};

use metrics_exporter_prometheus::{PrometheusBuilder, PrometheusHandle};

pub fn setup_prometheus() -> PrometheusHandle {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean all their descriptions, right? Just leaving the returning handle?

// Install the Prometheus recorder; startup panics if installation fails.
let handle = PrometheusBuilder::new()
    .install_recorder()
    .expect("failed to install Prometheus recorder");

// Event counters, incremented from the node event loop.
describe_counter!(
    "channel_pending",
    "A channel has been created and is pending confirmation on-chain."
);
describe_counter!("channel_ready", "A channel is ready to be used.");
describe_counter!("payment_received", "A payment has been received.");
describe_counter!("payment_successful", "A sent payment was successful.");
describe_counter!("payment_failed", "A sent payment has failed.");
describe_counter!(
    "payment_claimable",
    "A payment for a previously-registered payment hash has been received."
);
// Previously carried the `payment_failed` text by copy-paste.
describe_counter!("payment_forwarded", "A payment has been forwarded.");

// Balance gauges.
describe_gauge!("node_total_onchain_balance_sats", "The total balance of our on-chain wallet.");
describe_gauge!(
    "node_spendable_onchain_balance_sats",
    "The currently spendable balance of our on-chain wallet."
);
describe_gauge!(
    "node_total_anchor_channels_reserve_sats",
    "The total anchor channel reserve balance."
);
describe_gauge!(
    "node_total_lightning_balance_sats",
    "The total balance that we would be able to claim across all our Lightning channels."
);
describe_gauge!(
    "node_lightning_balances",
    "A detailed list of all known Lightning balances that would be claimable on channel closure."
);
describe_gauge!(
    "node_pending_balances_from_channel_closures",
    "A detailed list of balances currently being swept from the Lightning to the on-chain wallet."
);

// TODO (arturgontijo): Add all labels here. Fix descriptions.

// Return the handle so callers can render the collected metrics.
handle
}

/// Publishes the node's current balances as gauges, then sleeps for ten seconds.
///
/// Reads `node.list_balances()` once and updates the four `node_*_sats` gauges with the
/// totals it reports. The trailing sleep paces callers that invoke this repeatedly.
///
/// # Errors
///
/// The current implementation has no failure path and always returns `Ok(())`.
pub async fn collect_node_metrics(node: Arc<Node>) -> io::Result<()> {
    // TODO (arturgontijo): Get sleep delay from config file.
    const COLLECTION_INTERVAL: Duration = Duration::from_secs(10);

    // NOTE(review): debug trace printed on every collection; consider removing it.
    println!("[DEBUG] Collecting Node Metrics...");

    let BalanceDetails {
        total_onchain_balance_sats,
        spendable_onchain_balance_sats,
        total_anchor_channels_reserve_sats,
        total_lightning_balance_sats,
        // TODO (arturgontijo): also export `lightning_balances` and
        // `pending_balances_from_channel_closures`.
        ..
    } = node.list_balances();

    // Labels are passed as static strings so no label is allocated per collection.
    set_gauge("node_total_onchain_balance_sats", total_onchain_balance_sats as f64);
    set_gauge("node_spendable_onchain_balance_sats", spendable_onchain_balance_sats as f64);
    set_gauge(
        "node_total_anchor_channels_reserve_sats",
        total_anchor_channels_reserve_sats as f64,
    );
    set_gauge("node_total_lightning_balance_sats", total_lightning_balance_sats as f64);

    sleep(COLLECTION_INTERVAL).await;

    Ok(())
}

/// Sets the gauge named `label` to `value`.
fn set_gauge(label: &'static str, value: f64) {
    gauge!(label).set(value);
}
Loading