
Commit dd6c742

chore(bottlecap): Re-architect Telemetry API events forwarding (#262)
* add `from_stream` to `HttpRequestParser` to read properly from the stream so we can read every byte; retry 3 times in case there's no data
* add another unit test for invalid data
* update algorithm
* send events as batch
* receive events as batch
* process as batch and send events of interest to the main thread
* aggregate logs as batch
* swap senders so data is sent faster
* drop lock as soon as possible
* add `no-default-features` to `reqwest` so it compiles on Linux
1 parent 09c473c commit dd6c742
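
The core of the re-architecture is visible throughout the diffs below: every channel in the pipeline now carries a whole batch (`Vec<TelemetryEvent>`) instead of one event per message, so each Telemetry API POST costs one channel send and one receiver wakeup. A minimal sketch of that pattern, using a simplified stand-in for the crate's `TelemetryEvent`:

use std::sync::mpsc;
use std::thread;

// Simplified stand-in for bottlecap's TelemetryEvent.
#[derive(Debug, Clone)]
struct TelemetryEvent {
    record: String,
}

fn main() {
    // One channel message per Telemetry API POST, carrying the whole batch.
    let (tx, rx) = mpsc::channel::<Vec<TelemetryEvent>>();

    let consumer = thread::spawn(move || {
        // recv() now wakes once per batch instead of once per event.
        while let Ok(events) = rx.recv() {
            println!("processing batch of {} events", events.len());
        }
    });

    tx.send(vec![
        TelemetryEvent { record: "platform.start".to_string() },
        TelemetryEvent { record: "platform.runtimeDone".to_string() },
    ])
    .expect("receiver alive");

    drop(tx); // closing the channel ends the consumer loop
    consumer.join().expect("consumer thread panicked");
}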

File tree

8 files changed: +190 −365 lines changed

bottlecap/Cargo.lock

Lines changed: 3 additions & 241 deletions
Some generated files are not rendered by default.

bottlecap/Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@ tracing-subscriber = { version = "0.3.18", default-features = false, features =
 ureq = { version = "2.9.7", features = ["tls", "json"], default-features = false }
 ustr = { version = "1.0.0", default-features = false }
 hmac = "0.12.1"
-reqwest = { version = "0.12.4", features = ["json", "blocking", "rustls-tls"] }
+reqwest = { version = "0.12.4", features = ["json", "blocking", "rustls-tls"], default-features = false }
 sha2 = "0.10.8"
 hex = "0.4.3"
 [target.'cfg(not(target_env = "msvc"))'.dependencies]

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 52 additions & 50 deletions
@@ -188,8 +188,13 @@ fn main() -> Result<()> {
         LAMBDA_RUNTIME_SLUG.to_string(),
         &metadata_hash,
     ));
-    let logs_agent = LogsAgent::run(Arc::clone(&tags_provider), Arc::clone(&config));
+
     let event_bus = EventBus::run();
+    let logs_agent = LogsAgent::run(
+        Arc::clone(&tags_provider),
+        Arc::clone(&config),
+        event_bus.get_sender_copy(),
+    );
     let metrics_aggr = Arc::new(Mutex::new(
         metrics_aggregator::Aggregator::<{ constants::CONTEXTS }>::new(tags_provider.clone())
             .expect("failed to create aggregator"),
@@ -209,7 +214,7 @@
         port: TELEMETRY_PORT,
     };
     let telemetry_listener =
-        TelemetryListener::run(&telemetry_listener_config, event_bus.get_sender_copy())
+        TelemetryListener::run(&telemetry_listener_config, logs_agent.get_sender_copy())
             .map_err(|e| Error::new(std::io::ErrorKind::InvalidData, e.to_string()))?;
     let telemetry_client = TelemetryApiClient::new(r.extension_id.to_string(), TELEMETRY_PORT);
     telemetry_client
@@ -258,61 +263,58 @@
                 Event::Metric(event) => {
                     debug!("Metric event: {:?}", event);
                 }
-                Event::Telemetry(event) => {
-                    logs_agent.send_event(event.clone());
-                    match event.record {
-                        TelemetryRecord::PlatformInitReport {
-                            initialization_type,
-                            phase,
-                            metrics,
-                        } => {
-                            debug!("Platform init report for initialization_type: {:?} with phase: {:?} and metrics: {:?}", initialization_type, phase, metrics);
-                        }
-                        TelemetryRecord::PlatformRuntimeDone {
-                            request_id, status, ..
-                        } => {
-                            if status != Status::Success {
+                Event::Telemetry(event) => match event.record {
+                    TelemetryRecord::PlatformInitReport {
+                        initialization_type,
+                        phase,
+                        metrics,
+                    } => {
+                        debug!("Platform init report for initialization_type: {:?} with phase: {:?} and metrics: {:?}", initialization_type, phase, metrics);
+                    }
+                    TelemetryRecord::PlatformRuntimeDone {
+                        request_id, status, ..
+                    } => {
+                        if status != Status::Success {
+                            if let Err(e) =
+                                lambda_enhanced_metrics.increment_errors_metric()
+                            {
+                                error!("Failed to increment error metric: {e:?}");
+                            }
+                            if status == Status::Timeout {
                                 if let Err(e) =
-                                    lambda_enhanced_metrics.increment_errors_metric()
+                                    lambda_enhanced_metrics.increment_timeout_metric()
                                 {
-                                    error!("Failed to increment error metric: {e:?}");
-                                }
-                                if status == Status::Timeout {
-                                    if let Err(e) =
-                                        lambda_enhanced_metrics.increment_timeout_metric()
-                                    {
-                                        error!("Failed to increment timeout metric: {e:?}");
-                                    }
+                                    error!("Failed to increment timeout metric: {e:?}");
                                 }
                             }
-                            debug!(
-                                "Runtime done for request_id: {:?} with status: {:?}",
-                                request_id, status
-                            );
-                            logs_agent.flush();
-                            dogstats_client.flush();
-                            break;
                         }
-                        TelemetryRecord::PlatformReport {
-                            request_id,
-                            status,
-                            metrics,
-                            ..
-                        } => {
-                            debug!(
-                                "Platform report for request_id: {:?} with status: {:?}",
-                                request_id, status
-                            );
-                            lambda_enhanced_metrics.set_report_log_metrics(&metrics);
-                            if shutdown {
-                                break;
-                            }
-                        }
-                        _ => {
-                            debug!("Unforwarded Telemetry event: {:?}", event);
+                        debug!(
+                            "Runtime done for request_id: {:?} with status: {:?}",
+                            request_id, status
+                        );
+                        logs_agent.flush();
+                        dogstats_client.flush();
+                        break;
+                    }
+                    TelemetryRecord::PlatformReport {
+                        request_id,
+                        status,
+                        metrics,
+                        ..
+                    } => {
+                        debug!(
+                            "Platform report for request_id: {:?} with status: {:?}",
+                            request_id, status
+                        );
+                        lambda_enhanced_metrics.set_report_log_metrics(&metrics);
+                        if shutdown {
+                            break;
                         }
                     }
-                }
+                    _ => {
+                        debug!("Unforwarded Telemetry event: {:?}", event);
+                    }
+                },
                 }
             } else {
                 error!("could not get the event");

bottlecap/src/logs/agent.rs

Lines changed: 15 additions & 11 deletions
@@ -1,9 +1,10 @@
-use std::sync::mpsc::{self, Sender};
+use std::sync::mpsc::{self, Sender, SyncSender};
 use std::sync::{Arc, Mutex};
 use std::thread;

-use tracing::{debug, error};
+use tracing::debug;

+use crate::events::Event;
 use crate::logs::{aggregator::Aggregator, datadog, processor::LogsProcessor};
 use crate::tags;
 use crate::telemetry::events::TelemetryEvent;
@@ -13,7 +14,7 @@ use crate::{config, LAMBDA_RUNTIME_SLUG};
 pub struct LogsAgent {
     dd_api: datadog::Api,
     aggregator: Arc<Mutex<Aggregator>>,
-    tx: Sender<TelemetryEvent>,
+    tx: Sender<Vec<TelemetryEvent>>,
     join_handle: std::thread::JoinHandle<()>,
 }

@@ -22,27 +23,29 @@ impl LogsAgent {
     pub fn run(
         tags_provider: Arc<tags::provider::Provider>,
         datadog_config: Arc<config::Config>,
+        event_bus: SyncSender<Event>,
     ) -> LogsAgent {
         let aggregator: Arc<Mutex<Aggregator>> = Arc::new(Mutex::new(Aggregator::default()));
         let mut processor = LogsProcessor::new(
             Arc::clone(&datadog_config),
             tags_provider,
+            event_bus,
             LAMBDA_RUNTIME_SLUG.to_string(),
         );

         let cloned_aggregator = aggregator.clone();

-        let (tx, rx) = mpsc::channel::<TelemetryEvent>();
+        let (tx, rx) = mpsc::channel::<Vec<TelemetryEvent>>();
         let join_handle = thread::spawn(move || loop {
             let received = rx.recv();
             // TODO(duncanista): we might need to create a Event::Shutdown
             // to signal shutdown and make it easier to handle any floating events
-            let Ok(event) = received else {
+            let Ok(events) = received else {
                 debug!("Failed to received event in Logs Agent");
                 break;
             };

-            processor.process(event, &cloned_aggregator);
+            processor.process(events, &cloned_aggregator);
         });

         let dd_api = datadog::Api::new(datadog_config.api_key.clone(), datadog_config.site.clone());
@@ -54,18 +57,19 @@ impl LogsAgent {
         }
     }

-    pub fn send_event(&self, event: TelemetryEvent) {
-        if let Err(e) = self.tx.send(event) {
-            error!("Error sending Telemetry event to the Logs Agent: {}", e);
-        }
+    #[must_use]
+    pub fn get_sender_copy(&self) -> Sender<Vec<TelemetryEvent>> {
+        self.tx.clone()
     }

     pub fn flush(&self) {
         LogsAgent::flush_internal(&self.aggregator, &self.dd_api);
     }

     fn flush_internal(aggregator: &Arc<Mutex<Aggregator>>, dd_api: &datadog::Api) {
-        let logs = aggregator.lock().expect("lock poisoned").get_batch();
+        let mut guard = aggregator.lock().expect("lock poisoned");
+        let logs = guard.get_batch();
+        drop(guard);
         dd_api.send(&logs).expect("Failed to send logs to Datadog");
     }
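
The rewritten `flush_internal` is the "drop lock as soon as possible" item from the commit message: binding the guard to a name and `drop`ping it right after `get_batch()` makes the critical section explicit, so the aggregator mutex is provably released before the blocking network send. A minimal sketch of the pattern, with a hypothetical `Api` stand-in for `datadog::Api`:

use std::sync::{Arc, Mutex};

// Hypothetical stand-in for bottlecap's datadog::Api.
struct Api;

impl Api {
    fn send(&self, logs: &[String]) {
        // In the real code this is a blocking HTTP call to the logs intake.
        println!("sending {} logs", logs.len());
    }
}

fn flush(aggregator: &Arc<Mutex<Vec<String>>>, api: &Api) {
    let mut guard = aggregator.lock().expect("lock poisoned");
    let logs = std::mem::take(&mut *guard); // drain the queue under the lock
    drop(guard); // release the mutex before the slow send
    api.send(&logs);
}

fn main() {
    let aggregator = Arc::new(Mutex::new(vec!["log-1".to_string(), "log-2".to_string()]));
    flush(&aggregator, &Api);
    assert!(aggregator.lock().unwrap().is_empty());
}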

bottlecap/src/logs/aggregator.rs

Lines changed: 20 additions & 15 deletions
@@ -1,6 +1,5 @@
-use serde::Serialize;
 use std::collections::VecDeque;
-use tracing::{debug, warn};
+use tracing::warn;

 use crate::logs::constants;

@@ -38,10 +37,9 @@ impl Aggregator {
         }
     }

-    pub fn add<T: Serialize>(&mut self, log: T) {
-        match serde_json::to_string(&log) {
-            Ok(log) => self.messages.push_back(log),
-            Err(e) => debug!("Failed to serialize log: {}", e),
+    pub fn add_batch(&mut self, logs: Vec<String>) {
+        for log in logs {
+            self.messages.push_back(log);
         }
     }

@@ -91,7 +89,7 @@ mod tests {
     use crate::logs::lambda::{IntakeLog, Lambda, Message};

     #[test]
-    fn test_add() {
+    fn test_add_batch() {
         let mut aggregator = Aggregator::default();
         let log = IntakeLog {
             message: Message {
@@ -108,9 +106,10 @@ mod tests {
             tags: "tags".to_string(),
             source: "source".to_string(),
         };
-        aggregator.add(log.clone());
+        let serialized_log = serde_json::to_string(&log).unwrap();
+        aggregator.add_batch(vec![serialized_log.clone()]);
         assert_eq!(aggregator.messages.len(), 1);
-        assert_eq!(aggregator.messages[0], serde_json::to_string(&log).unwrap());
+        assert_eq!(aggregator.messages[0], serialized_log);
     }

     #[test]
@@ -131,7 +130,8 @@ mod tests {
             tags: "tags".to_string(),
             source: "source".to_string(),
         };
-        aggregator.add(log.clone());
+        let serialized_log = serde_json::to_string(&log).unwrap();
+        aggregator.add_batch(vec![serialized_log.clone()]);
         assert_eq!(aggregator.messages.len(), 1);
         let batch = aggregator.get_batch();
         let serialized_batch = format!("[{}]", serde_json::to_string(&log).unwrap());
@@ -157,9 +157,12 @@ mod tests {
             source: "source".to_string(),
         };
         // Add 3 logs
-        aggregator.add(log.clone());
-        aggregator.add(log.clone());
-        aggregator.add(log.clone());
+        let serialized_log = serde_json::to_string(&log).unwrap();
+        aggregator.add_batch(vec![
+            serialized_log.clone(),
+            serialized_log.clone(),
+            serialized_log.clone(),
+        ]);

         // The batch should only contain the first 2 logs
         let first_batch = aggregator.get_batch();
@@ -194,12 +197,14 @@ mod tests {
             source: "source".to_string(),
         };
         // Add 2 logs
-        aggregator.add(log.clone());
+        let serialized_log = serde_json::to_string(&log).unwrap();
+        aggregator.add_batch(vec![serialized_log.clone()]);

         // This log will exceed the max content size
         let mut big_log = log.clone();
         big_log.message.message = "a".repeat(256);
-        aggregator.add(big_log.clone());
+        let serialized_big_log = serde_json::to_string(&log).unwrap();
+        aggregator.add_batch(vec![serialized_big_log.clone()]);

         let first_batch = aggregator.get_batch();
         let serialized_log = serde_json::to_string(&log).unwrap();
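
With `add_batch`, the aggregator no longer serializes anything: callers serialize each log once at the boundary and hand over ready-to-send JSON strings, which is why every test above gains a `serde_json::to_string(&log)` line. A sketch of the caller side, assuming a simplified `IntakeLog` and a skip-on-failure policy (both illustrative):

use serde::Serialize;

// Simplified stand-in for bottlecap's IntakeLog.
#[derive(Serialize)]
struct IntakeLog {
    message: String,
}

// Serialize once at the boundary; in this sketch, logs that fail to
// serialize are skipped rather than aborting the whole batch.
fn serialize_batch(logs: &[IntakeLog]) -> Vec<String> {
    logs.iter()
        .filter_map(|log| serde_json::to_string(log).ok())
        .collect()
}

fn main() {
    let logs = vec![IntakeLog { message: "hello".to_string() }];
    let batch = serialize_batch(&logs);
    assert_eq!(batch, vec![r#"{"message":"hello"}"#.to_string()]);
}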
