Skip to content

Commit 7fd8db7

Browse files
refactor enhanced metrics monitoring (#901)
Use a long-running task that is resumed at the start of each invocation, instead of spawning a new task on every invocation. A channel-based service pauses and resumes the monitoring. Tested with a function that opens a large number of file descriptors to confirm that data is still captured accurately. Also tested with self-monitoring (new implementation, previous implementation): <img width="1337" height="316" alt="Screenshot 2025-10-20 at 4 16 47 PM" src="https://github.com/user-attachments/assets/3e3a0470-8566-42f9-926b-2ae3df34a4f7" />
1 parent 4a6eed2 commit 7fd8db7

File tree

7 files changed

+437
-363
lines changed

7 files changed

+437
-363
lines changed

bottlecap/src/lifecycle/invocation/context.rs

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,6 @@ mod tests {
432432
use crate::proc::{CPUData, NetworkData};
433433
use serde_json::json;
434434
use std::collections::HashMap;
435-
use tokio::sync::watch;
436435

437436
use super::*;
438437

@@ -567,15 +566,10 @@ mod tests {
567566
});
568567

569568
let uptime_offset = Some(50f64);
570-
let (tmp_chan_tx, _) = watch::channel(());
571-
let (process_chan_tx, _) = watch::channel(());
572-
573569
let enhanced_metric_data = Some(EnhancedMetricData {
574570
network_offset,
575571
cpu_offset,
576572
uptime_offset,
577-
tmp_chan_tx,
578-
process_chan_tx,
579573
});
580574

581575
buffer.add_enhanced_metric_data(&request_id, enhanced_metric_data.clone());

bottlecap/src/lifecycle/invocation/processor.rs

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ use chrono::{DateTime, Utc};
88
use datadog_trace_protobuf::pb::Span;
99
use datadog_trace_utils::tracer_header_tags;
1010
use serde_json::Value;
11-
use tokio::sync::watch;
1211
use tokio::time::Instant;
1312
use tracing::{debug, warn};
1413

@@ -103,11 +102,14 @@ impl Processor {
103102
config.trace_aws_service_representation_enabled,
104103
);
105104

105+
let enhanced_metrics = EnhancedMetrics::new(metrics_aggregator, Arc::clone(&config));
106+
enhanced_metrics.start_usage_metrics_task(); // starts the long-running task that monitors usage metrics (fd_use, threads_use, tmp_used)
107+
106108
Processor {
107109
context_buffer: ContextBuffer::default(),
108110
inferrer: SpanInferrer::new(Arc::clone(&config)),
109111
propagator,
110-
enhanced_metrics: EnhancedMetrics::new(metrics_aggregator, Arc::clone(&config)),
112+
enhanced_metrics,
111113
aws_config,
112114
tracer_detected: false,
113115
runtime: None,
@@ -136,26 +138,18 @@ impl Processor {
136138
.unwrap_or_default();
137139

138140
if self.config.lambda_proc_enhanced_metrics {
141+
// Resume tmp, fd, and threads enhanced metrics monitoring
142+
self.enhanced_metrics.resume_usage_metrics_monitoring();
143+
139144
// Collect offsets for network and cpu metrics
140145
let network_offset: Option<NetworkData> = proc::get_network_data().ok();
141146
let cpu_offset: Option<CPUData> = proc::get_cpu_data().ok();
142147
let uptime_offset: Option<f64> = proc::get_uptime().ok();
143148

144-
// Start a channel for monitoring tmp enhanced data
145-
let (tmp_chan_tx, tmp_chan_rx) = watch::channel(());
146-
self.enhanced_metrics.set_tmp_enhanced_metrics(tmp_chan_rx);
147-
148-
// Start a channel for monitoring file descriptor and thread count
149-
let (process_chan_tx, process_chan_rx) = watch::channel(());
150-
self.enhanced_metrics
151-
.set_process_enhanced_metrics(process_chan_rx);
152-
153149
let enhanced_metric_offsets = Some(EnhancedMetricData {
154150
network_offset,
155151
cpu_offset,
156152
uptime_offset,
157-
tmp_chan_tx,
158-
process_chan_tx,
159153
});
160154
self.context_buffer
161155
.add_enhanced_metric_data(&request_id, enhanced_metric_offsets);
@@ -313,6 +307,10 @@ impl Processor {
313307
}
314308
}
315309

310+
// Set tmp, fd, and threads enhanced metrics
311+
self.enhanced_metrics.set_max_enhanced_metrics();
312+
self.enhanced_metrics.set_usage_enhanced_metrics(); // sets use metric values and pauses monitoring task
313+
316314
self.context_buffer
317315
.add_runtime_duration(request_id, metrics.duration_ms);
318316

@@ -388,10 +386,6 @@ impl Processor {
388386
offsets.cpu_offset.clone(),
389387
offsets.uptime_offset,
390388
);
391-
// Send the signal to stop monitoring tmp
392-
_ = offsets.tmp_chan_tx.send(());
393-
// Send the signal to stop monitoring file descriptors and threads
394-
_ = offsets.process_chan_tx.send(());
395389
}
396390

397391
// todo(duncanista): Add missing metric tags for ASM

bottlecap/src/lifecycle/invocation/processor_service.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ pub enum ProcessorCommand {
100100
SendCtxSpans {
101101
tags_provider: Arc<provider::Provider>,
102102
trace_sender: Arc<SendingTraceProcessor>,
103-
context: Context,
103+
context: Box<Context>,
104104
},
105105
Shutdown,
106106
}
@@ -338,7 +338,7 @@ impl InvocationProcessorHandle {
338338
.send(ProcessorCommand::SendCtxSpans {
339339
tags_provider: Arc::clone(tags_provider),
340340
trace_sender: Arc::clone(trace_sender),
341-
context,
341+
context: Box::new(context),
342342
})
343343
.await
344344
}
@@ -491,7 +491,7 @@ impl InvocationProcessorService {
491491
context,
492492
} => {
493493
self.processor
494-
.send_ctx_spans(&tags_provider, &trace_sender, context)
494+
.send_ctx_spans(&tags_provider, &trace_sender, *context)
495495
.await;
496496
}
497497
ProcessorCommand::Shutdown => {

0 commit comments

Comments
 (0)