Skip to content

Commit 5b4399e

Browse files
astuyveduncanistaalexgallotta
authored
chore(bottlecap): switch flushing strategy to race (#318)
* feat: race flush * refactor: periodic only when configured * fmt * when flushing strategy is default, set periodic flush tick to `1s` * on `End`, never flush until the end of the invocation * remove `tokio_unstable` feature for building * remove debug comment * remove `invocation_times` mod * update `flush_control.rs` * use `flush_control` in main * allow `end,<ms>` strategy allows to flush periodically over a given amount of seconds and at the end * update `debug` comment for flushing * simplify logic for flush strategy parsing * remove log that could spam debug * refactor code and add unit test --------- Co-authored-by: jordan gonzález <[email protected]> Co-authored-by: alexgallotta <[email protected]>
1 parent e1a622f commit 5b4399e

File tree

5 files changed

+153
-197
lines changed

5 files changed

+153
-197
lines changed

bottlecap/src/bin/bottlecap/main.rs

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,23 @@ use bottlecap::{
1919
invocation_context::{InvocationContext, InvocationContextBuffer},
2020
},
2121
logger,
22-
logs::{agent::LogsAgent, flusher::Flusher as LogsFlusher},
22+
logs::{
23+
agent::LogsAgent,
24+
flusher::{build_fqdn_logs, Flusher as LogsFlusher},
25+
},
2326
metrics::{
2427
aggregator::Aggregator as MetricsAggregator,
2528
constants::CONTEXTS,
2629
dogstatsd::{DogStatsD, DogStatsDConfig},
2730
enhanced::lambda::Lambda as enhanced_metrics,
28-
flusher::Flusher as MetricsFlusher,
31+
flusher::{build_fqdn_metrics, Flusher as MetricsFlusher},
2932
},
3033
secrets::decrypt,
3134
tags::{lambda, provider::Provider as TagProvider},
3235
telemetry::{
3336
self,
3437
client::TelemetryApiClient,
35-
events::TelemetryEvent,
36-
events::{Status, TelemetryRecord},
38+
events::{Status, TelemetryEvent, TelemetryRecord},
3739
listener::TelemetryListener,
3840
},
3941
traces::{
@@ -49,6 +51,8 @@ use bottlecap::{
4951
};
5052
use datadog_trace_obfuscation::obfuscation_config;
5153
use decrypt::resolve_secrets;
54+
use reqwest::Client;
55+
use serde::Deserialize;
5256
use std::{
5357
collections::hash_map,
5458
collections::HashMap,
@@ -61,17 +65,12 @@ use std::{
6165
sync::{Arc, Mutex},
6266
};
6367
use telemetry::listener::TelemetryListenerConfig;
68+
use tokio::sync::mpsc::Sender;
6469
use tokio::sync::Mutex as TokioMutex;
70+
use tokio_util::sync::CancellationToken;
6571
use tracing::{debug, error};
6672
use tracing_subscriber::EnvFilter;
6773

68-
use bottlecap::logs::flusher::build_fqdn_logs;
69-
use bottlecap::metrics::flusher::build_fqdn_metrics;
70-
use reqwest::Client;
71-
use serde::Deserialize;
72-
use tokio::sync::mpsc::Sender;
73-
use tokio_util::sync::CancellationToken;
74-
7574
#[derive(Clone, Deserialize)]
7675
#[serde(rename_all = "camelCase")]
7776
struct RegisterResponse {
@@ -327,10 +326,13 @@ async fn extension_loop_active(
327326
let telemetry_listener_cancel_token =
328327
setup_telemetry_client(&r.extension_id, logs_agent_channel).await?;
329328

330-
let mut flush_control = FlushControl::new(config.serverless_flush_strategy);
329+
let flush_control = FlushControl::new(config.serverless_flush_strategy);
331330
let mut invocation_context_buffer = InvocationContextBuffer::default();
332331
let mut shutdown = false;
333332

333+
let mut flush_interval = flush_control.get_flush_interval();
334+
flush_interval.tick().await; // discard first tick, which is instantaneous
335+
334336
loop {
335337
let evt = next_event(client, &r.extension_id).await;
336338
match evt {
@@ -360,10 +362,17 @@ async fn extension_loop_active(
360362
}
361363
// Block until we get something from the telemetry API
362364
// Check if flush logic says we should block and flush or not
363-
if flush_control.should_flush() || shutdown {
364-
loop {
365-
let received = event_bus.rx.recv().await;
366-
if let Some(event) = received {
365+
loop {
366+
tokio::select! {
367+
_ = flush_interval.tick() => {
368+
tokio::join!(
369+
logs_flusher.flush(),
370+
metrics_flusher.flush(),
371+
trace_flusher.manual_flush(),
372+
stats_flusher.manual_flush()
373+
);
374+
}
375+
Some(event) = event_bus.rx.recv() => {
367376
match event {
368377
Event::Metric(event) => {
369378
debug!("Metric event: {:?}", event);
@@ -411,12 +420,14 @@ async fn extension_loop_active(
411420
// pass the invocation deadline to
412421
// flush tasks here, so they can
413422
// retry if we have more time
414-
tokio::join!(
415-
logs_flusher.flush(),
416-
metrics_flusher.flush(),
417-
trace_flusher.manual_flush(),
418-
stats_flusher.manual_flush()
419-
);
423+
if flush_control.should_flush_end() {
424+
tokio::join!(
425+
logs_flusher.flush(),
426+
metrics_flusher.flush(),
427+
trace_flusher.manual_flush(),
428+
stats_flusher.manual_flush()
429+
);
430+
}
420431
break;
421432
}
422433
TelemetryRecord::PlatformReport {
@@ -453,8 +464,6 @@ async fn extension_loop_active(
453464
}
454465
},
455466
}
456-
} else {
457-
error!("could not get the event");
458467
}
459468
}
460469
}

bottlecap/src/config/flush_strategy.rs

Lines changed: 66 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@ pub struct PeriodicStrategy {
1010
pub enum FlushStrategy {
1111
Default,
1212
End,
13+
EndPeriodically(PeriodicStrategy),
1314
Periodically(PeriodicStrategy),
1415
}
1516

1617
// Deserialize for FlushStrategy
17-
// Flush Strategy can be either "end" or "periodically,<ms>"
18+
// Flush Strategy can be either "end", "end,<ms>", or "periodically,<ms>"
1819
impl<'de> Deserialize<'de> for FlushStrategy {
1920
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
2021
where
@@ -26,22 +27,22 @@ impl<'de> Deserialize<'de> for FlushStrategy {
2627
} else {
2728
let mut split_value = value.as_str().split(',');
2829
// "periodically,60000"
29-
match split_value.next() {
30-
Some(first_value) if first_value.starts_with("periodically") => {
31-
let interval = split_value.next();
32-
// "60000"
33-
if let Some(interval) = interval {
34-
if let Ok(parsed_interval) = interval.parse() {
35-
return Ok(FlushStrategy::Periodically(PeriodicStrategy {
36-
interval: parsed_interval,
37-
}));
38-
}
39-
debug!("Invalid flush interval: {}, using default", interval);
40-
Ok(FlushStrategy::Default)
41-
} else {
42-
debug!("Invalid flush strategy: {}, using default", value);
43-
Ok(FlushStrategy::Default)
44-
}
30+
// "end,1000"
31+
let strategy = split_value.next();
32+
let interval: Option<u64> = split_value.next().and_then(|v| v.parse().ok());
33+
34+
match (strategy, interval) {
35+
(Some("periodically"), Some(interval)) => {
36+
Ok(FlushStrategy::Periodically(PeriodicStrategy { interval }))
37+
}
38+
(Some("end"), Some(interval)) => {
39+
Ok(FlushStrategy::EndPeriodically(PeriodicStrategy {
40+
interval,
41+
}))
42+
}
43+
(Some(strategy), _) => {
44+
debug!("Invalid flush interval: {}, using default", strategy);
45+
Ok(FlushStrategy::Default)
4546
}
4647
_ => {
4748
debug!("Invalid flush strategy: {}, using default", value);
@@ -51,3 +52,51 @@ impl<'de> Deserialize<'de> for FlushStrategy {
5152
}
5253
}
5354
}
55+
56+
#[cfg(test)]
57+
mod tests {
58+
use super::*;
59+
60+
#[test]
61+
fn deserialize_end() {
62+
let flush_strategy: FlushStrategy = serde_json::from_str("\"end\"").unwrap();
63+
assert_eq!(flush_strategy, FlushStrategy::End);
64+
}
65+
66+
#[test]
67+
fn deserialize_periodically() {
68+
let flush_strategy: FlushStrategy = serde_json::from_str("\"periodically,60000\"").unwrap();
69+
assert_eq!(
70+
flush_strategy,
71+
FlushStrategy::Periodically(PeriodicStrategy { interval: 60000 })
72+
);
73+
}
74+
75+
#[test]
76+
fn deserialize_end_periodically() {
77+
let flush_strategy: FlushStrategy = serde_json::from_str("\"end,1000\"").unwrap();
78+
assert_eq!(
79+
flush_strategy,
80+
FlushStrategy::EndPeriodically(PeriodicStrategy { interval: 1000 })
81+
);
82+
}
83+
84+
#[test]
85+
fn deserialize_invalid() {
86+
let flush_strategy: FlushStrategy = serde_json::from_str("\"invalid\"").unwrap();
87+
assert_eq!(flush_strategy, FlushStrategy::Default);
88+
}
89+
90+
#[test]
91+
fn deserialize_invalid_interval() {
92+
let flush_strategy: FlushStrategy =
93+
serde_json::from_str("\"periodically,invalid\"").unwrap();
94+
assert_eq!(flush_strategy, FlushStrategy::Default);
95+
}
96+
97+
#[test]
98+
fn deserialize_invalid_end_interval() {
99+
let flush_strategy: FlushStrategy = serde_json::from_str("\"end,invalid\"").unwrap();
100+
assert_eq!(flush_strategy, FlushStrategy::Default);
101+
}
102+
}
Lines changed: 54 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,11 @@
11
use crate::config::flush_strategy::FlushStrategy;
2-
use crate::lifecycle::invocation_times::InvocationTimes;
3-
use ::std::time;
4-
use tracing::debug;
2+
use tokio::time::Interval;
53

6-
const TWENTY_SECONDS: u64 = 20 * 1000;
4+
const DEFAULT_FLUSH_INTERVAL: u64 = 1000; // 1s
75

6+
#[derive(Clone, Copy, Debug, PartialEq)]
87
pub struct FlushControl {
9-
pub last_flush: u64,
108
flush_strategy: FlushStrategy,
11-
invocation_times: InvocationTimes,
129
}
1310

1411
// FlushControl is called at the end of every invocation and decides whether or not we should flush
@@ -26,49 +23,24 @@ pub struct FlushControl {
2623
impl FlushControl {
2724
#[must_use]
2825
pub fn new(flush_strategy: FlushStrategy) -> FlushControl {
29-
FlushControl {
30-
flush_strategy,
31-
last_flush: 0,
32-
invocation_times: InvocationTimes::new(),
33-
}
26+
FlushControl { flush_strategy }
3427
}
3528

36-
pub fn should_flush(&mut self) -> bool {
37-
let now = match time::SystemTime::now().duration_since(time::UNIX_EPOCH) {
38-
Ok(now) => now.as_secs(),
39-
Err(e) => {
40-
debug!("Failed to get current time: {:?}", e);
41-
return false;
42-
}
43-
};
44-
self.invocation_times.add(now);
29+
#[must_use]
30+
pub fn should_flush_end(&self) -> bool {
31+
!matches!(&self.flush_strategy, FlushStrategy::Periodically(_))
32+
}
33+
34+
#[must_use]
35+
pub fn get_flush_interval(&self) -> Interval {
4536
match &self.flush_strategy {
4637
FlushStrategy::Default => {
47-
if self.invocation_times.should_adapt_to_periodic(now) {
48-
let should_periodic_flush = self.should_periodic_flush(now, TWENTY_SECONDS);
49-
debug!(
50-
"Adapting over to periodic flush strategy. should_periodic_flush: {}",
51-
should_periodic_flush
52-
);
53-
return should_periodic_flush;
54-
}
55-
debug!("Not enough invocations to adapt to periodic flush, flushing at the end of the invocation");
56-
self.last_flush = now;
57-
true
38+
tokio::time::interval(tokio::time::Duration::from_millis(DEFAULT_FLUSH_INTERVAL))
5839
}
59-
FlushStrategy::Periodically(periodic) => {
60-
self.should_periodic_flush(now, periodic.interval)
40+
FlushStrategy::Periodically(p) | FlushStrategy::EndPeriodically(p) => {
41+
tokio::time::interval(tokio::time::Duration::from_millis(p.interval))
6142
}
62-
FlushStrategy::End => true,
63-
}
64-
}
65-
66-
fn should_periodic_flush(&mut self, now: u64, interval: u64) -> bool {
67-
if now - self.last_flush > (interval / 1000) {
68-
self.last_flush = now;
69-
true
70-
} else {
71-
false
43+
FlushStrategy::End => tokio::time::interval(tokio::time::Duration::MAX),
7244
}
7345
}
7446
}
@@ -78,32 +50,47 @@ mod tests {
7850
use super::*;
7951
use crate::config::flush_strategy::PeriodicStrategy;
8052

81-
#[test]
82-
fn should_flush_default_end() {
83-
let mut flush_control = super::FlushControl::new(FlushStrategy::Default);
84-
assert!(flush_control.should_flush());
85-
}
86-
#[test]
87-
fn should_flush_default_periodic() {
88-
const LOOKBACK_COUNT: usize = 20;
89-
let mut flush_control = super::FlushControl::new(FlushStrategy::Default);
90-
for _ in 0..LOOKBACK_COUNT - 1 {
91-
assert!(flush_control.should_flush());
92-
}
93-
assert!(!flush_control.should_flush());
94-
}
9553
#[test]
9654
fn should_flush_end() {
97-
let mut flush_control = super::FlushControl::new(FlushStrategy::End);
98-
assert!(flush_control.should_flush());
55+
let flush_control = FlushControl::new(FlushStrategy::Default);
56+
assert!(flush_control.should_flush_end());
57+
58+
let flush_control = FlushControl::new(FlushStrategy::EndPeriodically(PeriodicStrategy {
59+
interval: 1,
60+
}));
61+
assert!(flush_control.should_flush_end());
62+
63+
let flush_control = FlushControl::new(FlushStrategy::End);
64+
assert!(flush_control.should_flush_end());
65+
66+
let flush_control = FlushControl::new(FlushStrategy::Periodically(PeriodicStrategy {
67+
interval: 1,
68+
}));
69+
assert!(!flush_control.should_flush_end());
9970
}
100-
#[test]
101-
fn should_flush_periodically() {
102-
let mut flush_control =
103-
super::FlushControl::new(FlushStrategy::Periodically(PeriodicStrategy {
104-
interval: 1,
105-
}));
106-
assert!(flush_control.should_flush());
107-
assert!(!flush_control.should_flush());
71+
72+
#[tokio::test]
73+
async fn get_flush_interval() {
74+
let flush_control = FlushControl::new(FlushStrategy::Default);
75+
assert_eq!(
76+
flush_control.get_flush_interval().period().as_millis(),
77+
DEFAULT_FLUSH_INTERVAL as u128
78+
);
79+
80+
let flush_control = FlushControl::new(FlushStrategy::Periodically(PeriodicStrategy {
81+
interval: 1,
82+
}));
83+
assert_eq!(flush_control.get_flush_interval().period().as_millis(), 1);
84+
85+
let flush_control = FlushControl::new(FlushStrategy::EndPeriodically(PeriodicStrategy {
86+
interval: 1,
87+
}));
88+
assert_eq!(flush_control.get_flush_interval().period().as_millis(), 1);
89+
90+
let flush_control = FlushControl::new(FlushStrategy::End);
91+
assert_eq!(
92+
flush_control.get_flush_interval().period().as_millis(),
93+
tokio::time::Duration::MAX.as_millis()
94+
);
10895
}
10996
}

0 commit comments

Comments
 (0)