Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion bottlecap/src/bin/bottlecap/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#![deny(missing_copy_implementations)]
#![deny(missing_debug_implementations)]

use bottlecap::tags::lambda::tags::resolve_runtime;
use bottlecap::{
base_url,
config::{self, AwsConfig, Config},
Expand Down Expand Up @@ -53,6 +54,7 @@ use dogstatsd::{
};
use reqwest::Client;
use serde::Deserialize;
use std::sync::OnceLock;
use std::{
collections::hash_map,
collections::HashMap,
Expand Down Expand Up @@ -255,6 +257,11 @@ async fn extension_loop_active(
) -> Result<()> {
let mut event_bus = EventBus::run();

let runtime_resolution = Arc::new(OnceLock::new());

let runtime_resolver = Arc::clone(&runtime_resolution);
tokio::spawn(async move { runtime_resolver.set(resolve_runtime().await) });

let tags_provider = setup_tag_provider(
aws_config,
config,
Expand All @@ -264,6 +271,7 @@ async fn extension_loop_active(
config,
resolved_api_key.clone(),
&tags_provider,
Arc::clone(&runtime_resolution),
event_bus.get_sender_copy(),
);

Expand Down Expand Up @@ -509,6 +517,7 @@ fn start_logs_agent(
config: &Arc<Config>,
resolved_api_key: String,
tags_provider: &Arc<TagProvider>,
runtime_resolution: Arc<OnceLock<String>>,
event_bus: Sender<Event>,
) -> (Sender<TelemetryEvent>, LogsFlusher) {
let mut logs_agent = LogsAgent::new(Arc::clone(tags_provider), Arc::clone(config), event_bus);
Expand All @@ -519,7 +528,7 @@ fn start_logs_agent(
build_fqdn_logs(config.site.clone()),
);
tokio::spawn(async move {
logs_agent.spin().await;
logs_agent.spin(runtime_resolution).await;
});
(logs_agent_channel, logs_flusher)
}
Expand Down
14 changes: 9 additions & 5 deletions bottlecap/src/logs/agent.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, OnceLock};
use tokio::sync::mpsc::{self, Sender};

use crate::events::Event;
Expand Down Expand Up @@ -40,15 +40,19 @@ impl LogsAgent {
}
}

pub async fn spin(&mut self) {
pub async fn spin(&mut self, runtime_resolution: Arc<OnceLock<String>>) {
while let Some(event) = self.rx.recv().await {
self.processor.process(event, &self.aggregator).await;
self.processor
.process(event, runtime_resolution.clone(), &self.aggregator)
.await;
}
}

pub async fn sync_consume(&mut self) {
pub async fn sync_consume(&mut self, runtime_resolution: Arc<OnceLock<String>>) {
if let Some(events) = self.rx.recv().await {
self.processor.process(events, &self.aggregator).await;
self.processor
.process(events, runtime_resolution.clone(), &self.aggregator)
.await;
}
}

Expand Down
70 changes: 51 additions & 19 deletions bottlecap/src/logs/lambda/processor.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::error::Error;
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, OnceLock};
use tokio::sync::mpsc::Sender;

use tracing::error;
Expand Down Expand Up @@ -189,7 +189,11 @@ impl LambdaProcessor {
}
}

fn get_intake_log(&mut self, mut lambda_message: Message) -> Result<IntakeLog, Box<dyn Error>> {
fn get_intake_log(
&mut self,
mut lambda_message: Message,
runtime_resolution: Arc<OnceLock<String>>,
) -> Result<IntakeLog, Box<dyn Error>> {
let request_id = match lambda_message.lambda.request_id {
// Log already has a `request_id`
Some(request_id) => Some(request_id.clone()),
Expand All @@ -200,11 +204,15 @@ impl LambdaProcessor {

lambda_message.lambda.request_id = request_id;

let mut tags = self.tags.clone();
if let Some(runtime) = runtime_resolution.get() {
tags = format!("{tags},runtime:{runtime}");
}
let log = IntakeLog {
hostname: self.function_arn.clone(),
source: LAMBDA_RUNTIME_SLUG.to_string(),
service: self.service.clone(),
tags: self.tags.clone(),
tags,
message: lambda_message,
};

Expand All @@ -217,16 +225,25 @@ impl LambdaProcessor {
}
}

async fn make_log(&mut self, event: TelemetryEvent) -> Result<IntakeLog, Box<dyn Error>> {
async fn make_log(
&mut self,
event: TelemetryEvent,
runtime_resolution: Arc<OnceLock<String>>,
) -> Result<IntakeLog, Box<dyn Error>> {
match self.get_message(event).await {
Ok(lambda_message) => self.get_intake_log(lambda_message),
Ok(lambda_message) => self.get_intake_log(lambda_message, runtime_resolution),
// TODO: Check what to do when we can't process the event
Err(e) => Err(e),
}
}

pub async fn process(&mut self, event: TelemetryEvent, aggregator: &Arc<Mutex<Aggregator>>) {
if let Ok(mut log) = self.make_log(event).await {
pub async fn process(
&mut self,
event: TelemetryEvent,
runtime_resolution: Arc<OnceLock<String>>,
aggregator: &Arc<Mutex<Aggregator>>,
) {
if let Ok(mut log) = self.make_log(event, runtime_resolution).await {
let should_send_log =
LambdaProcessor::apply_rules(&self.rules, &mut log.message.message);
if should_send_log {
Expand Down Expand Up @@ -264,14 +281,13 @@ impl LambdaProcessor {
mod tests {
use super::*;

use chrono::{TimeZone, Utc};
use serde_json::{Number, Value};
use std::collections::hash_map::HashMap;

use crate::logs::lambda::Lambda;
use crate::telemetry::events::{
InitPhase, InitType, ReportMetrics, RuntimeDoneMetrics, Status,
};
use chrono::{TimeZone, Utc};
use serde_json::{Number, Value};
use std::collections::hash_map::HashMap;

macro_rules! get_message_tests {
($($name:ident: $value:expr,)*) => {
Expand Down Expand Up @@ -499,7 +515,9 @@ mod tests {
};

let lambda_message = processor.get_message(event.clone()).await.unwrap();
let intake_log = processor.get_intake_log(lambda_message).unwrap();
let intake_log = processor
.get_intake_log(lambda_message, Arc::new(OnceLock::new()))
.unwrap();

assert_eq!(intake_log.source, LAMBDA_RUNTIME_SLUG.to_string());
assert_eq!(intake_log.hostname, "test-arn".to_string());
Expand Down Expand Up @@ -544,7 +562,9 @@ mod tests {
let lambda_message = processor.get_message(event.clone()).await.unwrap();
assert_eq!(lambda_message.lambda.request_id, None);

let intake_log = processor.get_intake_log(lambda_message).unwrap_err();
let intake_log = processor
.get_intake_log(lambda_message, Arc::new(OnceLock::new()))
.unwrap_err();
assert_eq!(
intake_log.to_string(),
"No request_id available, queueing for later"
Expand Down Expand Up @@ -578,7 +598,9 @@ mod tests {
};

let start_lambda_message = processor.get_message(start_event.clone()).await.unwrap();
processor.get_intake_log(start_lambda_message).unwrap();
processor
.get_intake_log(start_lambda_message, Arc::new(OnceLock::new()))
.unwrap();

// This could be any event that doesn't have a `request_id`
let event = TelemetryEvent {
Expand All @@ -587,7 +609,9 @@ mod tests {
};

let lambda_message = processor.get_message(event.clone()).await.unwrap();
let intake_log = processor.get_intake_log(lambda_message).unwrap();
let intake_log = processor
.get_intake_log(lambda_message, Arc::new(OnceLock::new()))
.unwrap();
assert_eq!(
intake_log.message.lambda.request_id,
Some("test-request-id".to_string())
Expand Down Expand Up @@ -623,7 +647,9 @@ mod tests {
},
};

processor.process(event.clone(), &aggregator).await;
processor
.process(event.clone(), Arc::new(OnceLock::new()), &aggregator)
.await;

let mut aggregator_lock = aggregator.lock().unwrap();
let batch = aggregator_lock.get_batch();
Expand Down Expand Up @@ -670,7 +696,9 @@ mod tests {
record: TelemetryRecord::Function(Value::String("test-function".to_string())),
};

processor.process(event.clone(), &aggregator).await;
processor
.process(event.clone(), Arc::new(OnceLock::new()), &aggregator)
.await;
assert_eq!(processor.orphan_logs.len(), 1);

let mut aggregator_lock = aggregator.lock().unwrap();
Expand Down Expand Up @@ -706,7 +734,9 @@ mod tests {
},
};

processor.process(start_event.clone(), &aggregator).await;
processor
.process(start_event.clone(), Arc::new(OnceLock::new()), &aggregator)
.await;
assert_eq!(
processor.invocation_context.request_id,
"test-request-id".to_string()
Expand All @@ -718,7 +748,9 @@ mod tests {
record: TelemetryRecord::Function(Value::String("test-function".to_string())),
};

processor.process(event.clone(), &aggregator).await;
processor
.process(event.clone(), Arc::new(OnceLock::new()), &aggregator)
.await;

// Verify aggregator logs
let mut aggregator_lock = aggregator.lock().unwrap();
Expand Down
6 changes: 3 additions & 3 deletions bottlecap/src/logs/processor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::{Arc, Mutex};
use std::sync::{Arc, Mutex, OnceLock};
use tokio::sync::mpsc::Sender;

use tracing::debug;
Expand Down Expand Up @@ -29,10 +29,10 @@ impl LogsProcessor {
}
}

pub async fn process(&mut self, event: TelemetryEvent, aggregator: &Arc<Mutex<Aggregator>>) {
pub async fn process(&mut self, event: TelemetryEvent, runtime_resolution: Arc<OnceLock<String>>, aggregator: &Arc<Mutex<Aggregator>>) {
match self {
LogsProcessor::Lambda(lambda_processor) => {
lambda_processor.process(event, aggregator).await;
lambda_processor.process(event, runtime_resolution, aggregator).await;
}
}
}
Expand Down
21 changes: 12 additions & 9 deletions bottlecap/src/tags/lambda/tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,6 @@ fn tags_from_env(
tags_map.insert(MEMORY_SIZE_KEY.to_string(), memory_size);
}

let runtime = resolve_runtime("/proc", "/etc/os-release");
// TODO runtime resolution is too fast, need to change approach. Resolving it anyway to get debug info and performance of resolution
debug!("Resolved runtime: {runtime}. Not adding to tags yet");
// tags_map.insert(RUNTIME_KEY.to_string(), runtime);

tags_map.insert(ARCHITECTURE_KEY.to_string(), arch_to_platform().to_string());
tags_map.insert(
EXTENSION_VERSION_KEY.to_string(),
Expand All @@ -144,7 +139,15 @@ fn tags_from_env(
tags_map
}

fn resolve_runtime(proc_path: &str, fallback_provided_al_path: &str) -> String {
pub async fn resolve_runtime() -> String {
let start = Instant::now();
let runtime = read_runtime("/proc", "/etc/os-release");
let duration = start.elapsed().as_micros();
debug!("Resolved runtime in {duration}us");
runtime
}

fn read_runtime(proc_path: &str, fallback_provided_al_path: &str) -> String {
let start = Instant::now();
match fs::read_dir(proc_path) {
Ok(proc_dir) => {
Expand Down Expand Up @@ -336,7 +339,7 @@ mod tests {
let mut file = File::create(&path).unwrap();
file.write_all(content.as_bytes()).unwrap();

let runtime = resolve_runtime(proc_id_folder.parent().unwrap().to_str().unwrap(), "");
let runtime = read_runtime(proc_id_folder.parent().unwrap().to_str().unwrap(), "");
fs::remove_file(path).unwrap();
assert_eq!(runtime, "java123");
}
Expand All @@ -348,7 +351,7 @@ mod tests {
let mut file = File::create(path).unwrap();
file.write_all(content.as_bytes()).unwrap();

let runtime = resolve_runtime("", path);
let runtime = read_runtime("", path);
fs::remove_file(path).unwrap();
assert_eq!(runtime, "provided.al2");
}
Expand All @@ -361,7 +364,7 @@ mod tests {
let mut file = File::create(path).unwrap();
file.write_all(content.as_bytes()).unwrap();

let runtime = resolve_runtime("", path);
let runtime = read_runtime("", path);
fs::remove_file(path).unwrap();
assert_eq!(runtime, "provided.al2023");
}
Expand Down
4 changes: 2 additions & 2 deletions bottlecap/tests/logs_integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use bottlecap::telemetry::events::TelemetryEvent;
use bottlecap::LAMBDA_RUNTIME_SLUG;
use httpmock::prelude::*;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::{Arc, OnceLock};

mod common;

Expand Down Expand Up @@ -66,7 +66,7 @@ async fn test_logs() {
.expect("Failed sending telemetry events");
}

logs_agent.sync_consume().await;
logs_agent.sync_consume(Arc::new(OnceLock::new())).await;

let _ = logs_flusher.flush().await;

Expand Down