diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock
index 9bdf4a496..340e0afb7 100644
--- a/bottlecap/Cargo.lock
+++ b/bottlecap/Cargo.lock
@@ -38,6 +38,12 @@ dependencies = [
  "memchr",
 ]
 
+[[package]]
+name = "allocator-api2"
+version = "0.2.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
+
 [[package]]
 name = "anyhow"
 version = "1.0.98"
@@ -404,7 +410,7 @@ dependencies = [
  "bitflags 2.9.1",
  "cexpr",
  "clang-sys",
- "itertools 0.12.1",
+ "itertools 0.11.0",
  "lazy_static",
  "lazycell",
  "log",
@@ -427,7 +433,7 @@ dependencies = [
  "bitflags 2.9.1",
  "cexpr",
  "clang-sys",
- "itertools 0.12.1",
+ "itertools 0.11.0",
  "log",
  "prettyplease",
  "proc-macro2",
@@ -518,6 +524,7 @@ dependencies = [
  "datadog-trace-normalization",
  "datadog-trace-obfuscation",
  "datadog-trace-protobuf",
+ "datadog-trace-stats",
  "datadog-trace-utils",
  "ddcommon",
  "ddsketch-agent",
@@ -759,6 +766,14 @@ dependencies = [
  "typenum",
 ]
 
+[[package]]
+name = "datadog-ddsketch"
+version = "22.1.0"
+source = "git+https://github.com/DataDog/libdatadog?rev=ba8955394cf35cf24a1a508fbe6264ad84702567#ba8955394cf35cf24a1a508fbe6264ad84702567"
+dependencies = [
+ "prost",
+]
+
 [[package]]
 name = "datadog-fips"
 version = "0.1.0"
@@ -785,8 +800,8 @@ dependencies = [
 
 [[package]]
 name = "datadog-trace-normalization"
-version = "20.0.0"
-source = "git+https://github.com/DataDog/libdatadog?rev=9405db9cb4ef733f3954c3ee77ce71a502e98e50#9405db9cb4ef733f3954c3ee77ce71a502e98e50"
+version = "22.1.0"
+source = "git+https://github.com/DataDog/libdatadog?rev=ba8955394cf35cf24a1a508fbe6264ad84702567#ba8955394cf35cf24a1a508fbe6264ad84702567"
 dependencies = [
  "anyhow",
  "datadog-trace-protobuf",
@@ -794,8 +809,8 @@ dependencies = [
 
 [[package]]
 name = "datadog-trace-obfuscation"
-version = "20.0.0"
-source = "git+https://github.com/DataDog/libdatadog?rev=9405db9cb4ef733f3954c3ee77ce71a502e98e50#9405db9cb4ef733f3954c3ee77ce71a502e98e50"
+version = "22.1.0"
+source = "git+https://github.com/DataDog/libdatadog?rev=ba8955394cf35cf24a1a508fbe6264ad84702567#ba8955394cf35cf24a1a508fbe6264ad84702567"
 dependencies = [
  "anyhow",
  "datadog-trace-protobuf",
@@ -811,18 +826,29 @@ dependencies = [
 
 [[package]]
 name = "datadog-trace-protobuf"
-version = "20.0.0"
-source = "git+https://github.com/DataDog/libdatadog?rev=9405db9cb4ef733f3954c3ee77ce71a502e98e50#9405db9cb4ef733f3954c3ee77ce71a502e98e50"
+version = "22.1.0"
+source = "git+https://github.com/DataDog/libdatadog?rev=ba8955394cf35cf24a1a508fbe6264ad84702567#ba8955394cf35cf24a1a508fbe6264ad84702567"
 dependencies = [
  "prost",
  "serde",
  "serde_bytes",
 ]
 
+[[package]]
+name = "datadog-trace-stats"
+version = "22.1.0"
+source = "git+https://github.com/DataDog/libdatadog?rev=ba8955394cf35cf24a1a508fbe6264ad84702567#ba8955394cf35cf24a1a508fbe6264ad84702567"
+dependencies = [
+ "datadog-ddsketch",
+ "datadog-trace-protobuf",
+ "datadog-trace-utils",
+ "hashbrown 0.15.4",
+]
+
 [[package]]
 name = "datadog-trace-utils"
-version = "20.0.0"
-source = "git+https://github.com/DataDog/libdatadog?rev=9405db9cb4ef733f3954c3ee77ce71a502e98e50#9405db9cb4ef733f3954c3ee77ce71a502e98e50"
+version = "22.1.0"
+source = "git+https://github.com/DataDog/libdatadog?rev=ba8955394cf35cf24a1a508fbe6264ad84702567#ba8955394cf35cf24a1a508fbe6264ad84702567"
 dependencies = [
  "anyhow",
  "bytes",
@@ -849,8 +875,8 @@ dependencies = [
 
 [[package]]
 name = "ddcommon"
-version = "20.0.0"
-source = "git+https://github.com/DataDog/libdatadog?rev=9405db9cb4ef733f3954c3ee77ce71a502e98e50#9405db9cb4ef733f3954c3ee77ce71a502e98e50"
"git+https://github.com/DataDog/libdatadog?rev=9405db9cb4ef733f3954c3ee77ce71a502e98e50#9405db9cb4ef733f3954c3ee77ce71a502e98e50" +version = "22.1.0" +source = "git+https://github.com/DataDog/libdatadog?rev=ba8955394cf35cf24a1a508fbe6264ad84702567#ba8955394cf35cf24a1a508fbe6264ad84702567" dependencies = [ "anyhow", "cc", @@ -1123,6 +1149,12 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foldhash" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" + [[package]] name = "form_urlencoded" version = "1.2.1" @@ -1340,6 +1372,11 @@ name = "hashbrown" version = "0.15.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5971ac85611da7067dbfcabef3c70ebb5606018acd9e2a3903a0da507521e0d5" +dependencies = [ + "allocator-api2", + "equivalent", + "foldhash", +] [[package]] name = "headers" @@ -1610,7 +1647,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.6.0", + "socket2 0.5.10", "tokio", "tower-service", "tracing", @@ -1785,15 +1822,6 @@ dependencies = [ "either", ] -[[package]] -name = "itertools" -version = "0.12.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" -dependencies = [ - "either", -] - [[package]] name = "itertools" version = "0.14.0" @@ -2490,7 +2518,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "be769465445e8c1474e9c5dac2018218498557af32d9ed057325ec9a41ae81bf" dependencies = [ "heck", - "itertools 0.14.0", + "itertools 0.11.0", "log", "multimap", "once_cell", @@ -2510,7 +2538,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.14.0", + "itertools 0.11.0", "proc-macro2", "quote", "syn 2.0.104", @@ -3510,8 +3538,8 @@ dependencies = [ [[package]] name = "tinybytes" -version = "20.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=9405db9cb4ef733f3954c3ee77ce71a502e98e50#9405db9cb4ef733f3954c3ee77ce71a502e98e50" +version = "22.1.0" +source = "git+https://github.com/DataDog/libdatadog?rev=ba8955394cf35cf24a1a508fbe6264ad84702567#ba8955394cf35cf24a1a508fbe6264ad84702567" dependencies = [ "serde", ] diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index 68d65feee..9a162d5f7 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -56,11 +56,12 @@ ustr = { version = "1.0.0", default-features = false } # be found in the clippy.toml file adjacent to this Cargo.toml. 
datadog-protos = { version = "0.1.0", default-features = false, git = "https://github.com/DataDog/saluki/", rev = "c89b58e5784b985819baf11f13f7d35876741222"} ddsketch-agent = { version = "0.1.0", default-features = false, git = "https://github.com/DataDog/saluki/", rev = "c89b58e5784b985819baf11f13f7d35876741222"} -ddcommon = { git = "https://github.com/DataDog/libdatadog", rev = "9405db9cb4ef733f3954c3ee77ce71a502e98e50" } -datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = "9405db9cb4ef733f3954c3ee77ce71a502e98e50" } -datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "9405db9cb4ef733f3954c3ee77ce71a502e98e50" , features = ["mini_agent"] } -datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "9405db9cb4ef733f3954c3ee77ce71a502e98e50" } -datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "9405db9cb4ef733f3954c3ee77ce71a502e98e50" } +ddcommon = { git = "https://github.com/DataDog/libdatadog", rev = "ba8955394cf35cf24a1a508fbe6264ad84702567" } +datadog-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = "ba8955394cf35cf24a1a508fbe6264ad84702567" } +datadog-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "ba8955394cf35cf24a1a508fbe6264ad84702567" , features = ["mini_agent"] } +datadog-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "ba8955394cf35cf24a1a508fbe6264ad84702567" } +datadog-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "ba8955394cf35cf24a1a508fbe6264ad84702567" } +datadog-trace-stats = { git = "https://github.com/DataDog/libdatadog", rev = "ba8955394cf35cf24a1a508fbe6264ad84702567" } dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "936b3440a1ffc3dd68d040354b721a3042aad47d", default-features = false } datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "936b3440a1ffc3dd68d040354b721a3042aad47d", default-features = false } libddwaf = { version = "1.28.1", git = "https://github.com/DataDog/libddwaf-rust", rev = "d1534a158d976bd4f747bf9fcc58e0712d2d17fc", default-features = false, features = ["serde"] } diff --git a/bottlecap/LICENSE-3rdparty.csv b/bottlecap/LICENSE-3rdparty.csv index 4c88681e1..a2ef6aad7 100644 --- a/bottlecap/LICENSE-3rdparty.csv +++ b/bottlecap/LICENSE-3rdparty.csv @@ -3,6 +3,7 @@ addr2line,https://github.com/gimli-rs/addr2line,Apache-2.0 OR MIT,The addr2line adler2,https://github.com/oyvindln/adler2,0BSD OR MIT OR Apache-2.0,"Jonas Schievink , oyvindln " ahash,https://github.com/tkaitchuck/ahash,MIT OR Apache-2.0,Tom Kaitchuck aho-corasick,https://github.com/BurntSushi/aho-corasick,Unlicense OR MIT,Andrew Gallant +allocator-api2,https://github.com/zakarumych/allocator-api2,MIT OR Apache-2.0,Zakarum anyhow,https://github.com/dtolnay/anyhow,MIT OR Apache-2.0,David Tolnay async-trait,https://github.com/dtolnay/async-trait,MIT OR Apache-2.0,David Tolnay atomic,https://github.com/Amanieu/atomic-rs,Apache-2.0 OR MIT,Amanieu d'Antras @@ -31,11 +32,13 @@ core-foundation-sys,https://github.com/servo/core-foundation-rs,MIT OR Apache-2. 
 cpufeatures,https://github.com/RustCrypto/utils,MIT OR Apache-2.0,RustCrypto Developers
 crc32fast,https://github.com/srijs/rust-crc32fast,MIT OR Apache-2.0,"Sam Rijs , Alex Crichton "
 crypto-common,https://github.com/RustCrypto/traits,MIT OR Apache-2.0,RustCrypto Developers
+datadog-ddsketch,https://github.com/DataDog/libdatadog,Apache-2.0,The datadog-ddsketch Authors
 datadog-fips,https://github.com/DataDog/serverless-components,Apache-2.0,The datadog-fips Authors
 datadog-protos,https://github.com/DataDog/saluki,Apache-2.0,The datadog-protos Authors
 datadog-trace-normalization,https://github.com/DataDog/libdatadog,Apache-2.0,David Lee
 datadog-trace-obfuscation,https://github.com/DataDog/libdatadog,Apache-2.0,David Lee
 datadog-trace-protobuf,https://github.com/DataDog/libdatadog,Apache-2.0,David Lee
+datadog-trace-stats,https://github.com/DataDog/libdatadog,Apache-2.0,The datadog-trace-stats Authors
 datadog-trace-utils,https://github.com/DataDog/libdatadog,Apache-2.0,The datadog-trace-utils Authors
 ddcommon,https://github.com/DataDog/libdatadog,Apache-2.0,The ddcommon Authors
 ddsketch-agent,https://github.com/DataDog/saluki,Apache-2.0,The ddsketch-agent Authors
@@ -53,6 +56,7 @@ figment,https://github.com/SergioBenitez/Figment,MIT OR Apache-2.0,Sergio Benite
 flate2,https://github.com/rust-lang/flate2-rs,MIT OR Apache-2.0,"Alex Crichton , Josh Triplett "
 float-cmp,https://github.com/mikedilger/float-cmp,MIT,Mike Dilger
 fnv,https://github.com/servo/rust-fnv,Apache-2.0 OR MIT,Alex Crichton
+foldhash,https://github.com/orlp/foldhash,Zlib,Orson Peters
 form_urlencoded,https://github.com/servo/rust-url,MIT OR Apache-2.0,The rust-url developers
 futures,https://github.com/rust-lang/futures-rs,MIT OR Apache-2.0,The futures Authors
 futures-channel,https://github.com/rust-lang/futures-rs,MIT OR Apache-2.0,The futures-channel Authors
diff --git a/bottlecap/src/otlp/transform.rs b/bottlecap/src/otlp/transform.rs
index f74484c2e..4f098a194 100644
--- a/bottlecap/src/otlp/transform.rs
+++ b/bottlecap/src/otlp/transform.rs
@@ -861,6 +861,7 @@ pub fn otel_span_to_dd_span(
         metrics: HashMap::new(),
         meta_struct: HashMap::new(),
         span_links: Vec::new(),
+        span_events: Vec::new(),
     };
 
     // Set error status
diff --git a/bottlecap/src/traces/mod.rs b/bottlecap/src/traces/mod.rs
index 31fc32d07..5ee4562b4 100644
--- a/bottlecap/src/traces/mod.rs
+++ b/bottlecap/src/traces/mod.rs
@@ -7,7 +7,6 @@ pub mod proxy_aggregator;
 pub mod proxy_flusher;
 pub mod span_pointers;
 pub mod stats_aggregator;
-pub mod stats_concentrator;
 pub mod stats_concentrator_service;
 pub mod stats_flusher;
 pub mod stats_generator;
diff --git a/bottlecap/src/traces/stats_aggregator.rs b/bottlecap/src/traces/stats_aggregator.rs
index 64437ece2..16e98c1f6 100644
--- a/bottlecap/src/traces/stats_aggregator.rs
+++ b/bottlecap/src/traces/stats_aggregator.rs
@@ -55,7 +55,9 @@ impl StatsAggregator {
         // Pull stats data from concentrator
         match self.concentrator.flush(force_flush).await {
             Ok(stats) => {
-                self.queue.extend(stats);
+                if let Some(stats) = stats {
+                    self.queue.push_back(stats);
+                }
             }
             Err(e) => {
                 error!("Error getting stats from the stats concentrator: {e:?}");
@@ -121,6 +123,8 @@ mod tests {
             tags: vec![],
             git_commit_sha: "git_commit_sha".to_string(),
             image_tag: "image_tag".to_string(),
+            process_tags: "process_tags".to_string(),
+            process_tags_hash: 0,
         };
 
         aggregator.add(payload.clone());
@@ -153,6 +157,8 @@ mod tests {
             tags: vec![],
             git_commit_sha: "git_commit_sha".to_string(),
             image_tag: "image_tag".to_string(),
"process_tags".to_string(), + process_tags_hash: 0, }; aggregator.add(payload.clone()); assert_eq!(aggregator.queue.len(), 1); @@ -169,8 +175,8 @@ mod tests { &HashMap::new(), )); let (_, concentrator) = StatsConcentratorService::new(config, tags_provider); - let mut aggregator = StatsAggregator::new(640, concentrator); - // Payload below is 115 bytes + let mut aggregator = StatsAggregator::new(704, concentrator); + // Payload below is 352 bytes let payload = ClientStatsPayload { hostname: "hostname".to_string(), env: "dev".to_string(), @@ -186,6 +192,8 @@ mod tests { tags: vec![], git_commit_sha: "git_commit_sha".to_string(), image_tag: "image_tag".to_string(), + process_tags: "process_tags".to_string(), + process_tags_hash: 0, }; // Add 3 payloads diff --git a/bottlecap/src/traces/stats_concentrator.rs b/bottlecap/src/traces/stats_concentrator.rs deleted file mode 100644 index 8287d770f..000000000 --- a/bottlecap/src/traces/stats_concentrator.rs +++ /dev/null @@ -1,240 +0,0 @@ -use crate::config::Config; -use crate::tags::provider::Provider as TagProvider; -use datadog_trace_protobuf::pb; -use std::time::{SystemTime, UNIX_EPOCH}; -use std::{collections::HashMap, sync::Arc}; -use tracing::error; - -// Event sent to the stats concentrator -#[derive(Clone)] -pub struct StatsEvent { - pub time: u64, - pub aggregation_key: AggregationKey, - pub stats: Stats, -} - -#[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub struct AggregationKey { - pub env: String, - pub service: String, - // e.g. "aws.lambda.load", "aws.lambda.import" - pub name: String, - // e.g. "my-lambda-function-name", "datadog_lambda.handler", "urllib.request" - pub resource: String, - // e.g. "serverless" - pub r#type: String, - pub is_trace_root: bool, -} - -// Aggregated stats for a time interval across all the aggregation keys. -#[derive(Default)] -struct Bucket { - data: HashMap, -} - -#[derive(Clone, Debug, Default, Copy)] -pub struct Stats { - pub hits: i32, - // in nanoseconds - pub duration: i64, - // error count - pub errors: i32, - pub top_level_hits: f64, -} - -#[derive(Clone, Debug, Default)] -pub struct TracerMetadata { - // e.g. "python" - pub language: String, - // e.g. "3.11.0" - pub tracer_version: String, - // e.g. "f45568ad09d5480b99087d86ebda26e6" - pub runtime_id: String, - pub container_id: String, -} - -pub struct StatsConcentrator { - config: Arc, - tracer_metadata: TracerMetadata, - buckets: HashMap, - hostname: String, -} - -// The number of latest buckets to not flush when force_flush is false. -// For example, if we have buckets with timestamps 10, 20, 40, the current timestamp is 45, -// and NO_FLUSH_BUCKET_COUNT is 3, then we will flush bucket 10 but not bucket 20 or 40. -// Note that the bucket 30 is included in the 3 latest buckets even if it has no data. -// This is to reduce the chance of flushing stats that are still being collected to save some cost. -const NO_FLUSH_BUCKET_COUNT: u64 = 2; - -const S_TO_NS: u64 = 1_000_000_000; - -// The duration of a bucket in nanoseconds. -const BUCKET_DURATION_NS: u64 = 10 * S_TO_NS; // 10 seconds - -// Aggregates stats into buckets, which are then pulled by the stats aggregator. 
-impl StatsConcentrator {
-    #[must_use]
-    pub fn new(config: Arc<Config>, tags_provider: Arc<TagProvider>) -> Self {
-        let hostname = tags_provider.get_canonical_id().unwrap_or_default();
-        Self {
-            config,
-            buckets: HashMap::new(),
-            tracer_metadata: TracerMetadata::default(), // to be set when a trace is processed
-            hostname,
-        }
-    }
-
-    pub fn set_tracer_metadata(&mut self, tracer_metadata: &TracerMetadata) {
-        self.tracer_metadata = tracer_metadata.clone();
-    }
-
-    pub fn add(&mut self, stats_event: StatsEvent) {
-        let bucket_timestamp = Self::get_bucket_timestamp(stats_event.time);
-        let bucket = self.buckets.entry(bucket_timestamp).or_default();
-
-        let stats = bucket.data.entry(stats_event.aggregation_key).or_default();
-
-        stats.hits += stats_event.stats.hits;
-        stats.errors += stats_event.stats.errors;
-        stats.duration += stats_event.stats.duration;
-        stats.top_level_hits += stats_event.stats.top_level_hits;
-    }
-
-    fn get_bucket_timestamp(timestamp: u64) -> u64 {
-        timestamp - timestamp % BUCKET_DURATION_NS
-    }
-
-    // force: If true, flush all stats. If false, flush stats except for the few latest
-    // buckets, which may still be getting data.
-    #[must_use]
-    pub fn flush(&mut self, force: bool) -> Vec<pb::ClientStatsPayload> {
-        let current_timestamp: u64 = match SystemTime::now().duration_since(UNIX_EPOCH) {
-            Ok(duration) => u64::try_from(duration.as_nanos()).unwrap_or_default(),
-            Err(e) => {
-                error!("Failed to get current timestamp: {e}, skipping stats flush");
-                return Vec::new();
-            }
-        };
-
-        let mut ret = Vec::new();
-        self.buckets.retain(|&timestamp, bucket| {
-            if force || Self::should_flush_bucket(current_timestamp, timestamp) {
-                // Flush and remove this bucket
-                for (aggregation_key, stats) in &bucket.data {
-                    ret.push(Self::construct_stats_payload(
-                        &self.config,
-                        timestamp,
-                        aggregation_key,
-                        *stats,
-                        &self.tracer_metadata,
-                        &self.hostname,
-                    ));
-                }
-                false
-            } else {
-                // Keep this bucket
-                true
-            }
-        });
-
-        ret
-    }
-
-    fn should_flush_bucket(current_timestamp: u64, bucket_timestamp: u64) -> bool {
-        current_timestamp - bucket_timestamp >= BUCKET_DURATION_NS * NO_FLUSH_BUCKET_COUNT
-    }
-
-    #[allow(clippy::cast_possible_truncation)]
-    #[allow(clippy::cast_sign_loss)]
-    fn construct_stats_payload(
-        config: &Config,
-        timestamp: u64,
-        aggregation_key: &AggregationKey,
-        stats: Stats,
-        tracer_metadata: &TracerMetadata,
-        hostname: &str,
-    ) -> pb::ClientStatsPayload {
-        pb::ClientStatsPayload {
-            hostname: hostname.to_string(),
-            env: aggregation_key.env.clone(),
-            // Version is not in the trace payload. Need to read it from config.
-            version: config.version.clone().unwrap_or_default(),
-            lang: tracer_metadata.language.clone(),
-            tracer_version: tracer_metadata.tracer_version.clone(),
-            runtime_id: tracer_metadata.runtime_id.clone(),
-            // Not supported yet
-            sequence: 0,
-            // Not supported yet
-            agent_aggregation: String::new(),
-            service: aggregation_key.service.clone(),
-            container_id: tracer_metadata.container_id.clone(),
-            // Not supported yet
-            tags: vec![],
-            // Not supported yet
-            git_commit_sha: String::new(),
-            // Not supported yet
-            image_tag: String::new(),
-            stats: vec![pb::ClientStatsBucket {
-                start: timestamp,
-                duration: BUCKET_DURATION_NS,
-                stats: vec![pb::ClientGroupedStats {
-                    service: aggregation_key.service.clone(),
-                    name: aggregation_key.name.clone(),
-                    resource: aggregation_key.resource.clone(),
-                    // Not supported yet
-                    http_status_code: 0,
-                    r#type: aggregation_key.r#type.clone(),
-                    // Reserved field, not currently used by tracers
-                    db_type: String::new(),
-                    hits: stats.hits.try_into().unwrap_or_default(),
-                    errors: stats.errors.try_into().unwrap_or_default(),
-                    duration: stats.duration.try_into().unwrap_or_default(),
-                    // Not supported yet
-                    ok_summary: vec![],
-                    // Not supported yet
-                    error_summary: vec![],
-                    // Not supported yet
-                    synthetics: false,
-                    top_level_hits: stats.top_level_hits.round() as u64,
-                    // Not supported yet
-                    span_kind: String::new(),
-                    // Not supported yet
-                    peer_tags: vec![],
-                    is_trace_root: if aggregation_key.is_trace_root {
-                        pb::Trilean::True.into()
-                    } else {
-                        pb::Trilean::False.into()
-                    },
-                }],
-                // Not supported yet
-                agent_time_shift: 0,
-            }],
-        }
-    }
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_should_flush_bucket_false_when_not_enough_time_passed() {
-        let bucket_timestamp = 1_000_000_000;
-        let current_timestamp = bucket_timestamp + BUCKET_DURATION_NS * NO_FLUSH_BUCKET_COUNT - 1;
-        assert!(
-            !StatsConcentrator::should_flush_bucket(current_timestamp, bucket_timestamp),
-            "Should not flush when current_timestamp is less than threshold ahead"
-        );
-    }
-
-    #[test]
-    fn test_should_flush_bucket_true_when_later() {
-        let bucket_timestamp = 1_000_000_000;
-        let current_timestamp = bucket_timestamp + BUCKET_DURATION_NS * NO_FLUSH_BUCKET_COUNT + 1;
-        assert!(
-            StatsConcentrator::should_flush_bucket(current_timestamp, bucket_timestamp),
-            "Should flush when current_timestamp is greater than threshold"
-        );
-    }
-}
diff --git a/bottlecap/src/traces/stats_concentrator_service.rs b/bottlecap/src/traces/stats_concentrator_service.rs
index 9727cd9bc..22a39f088 100644
--- a/bottlecap/src/traces/stats_concentrator_service.rs
+++ b/bottlecap/src/traces/stats_concentrator_service.rs
@@ -2,17 +2,17 @@ use tokio::sync::{mpsc, oneshot};
 
 use crate::config::Config;
 use crate::tags::provider::Provider as TagProvider;
-use crate::traces::stats_concentrator::StatsConcentrator;
-use crate::traces::stats_concentrator::StatsEvent;
-use crate::traces::stats_concentrator::TracerMetadata;
 use datadog_trace_protobuf::pb;
-use datadog_trace_protobuf::pb::TracerPayload;
-use std::sync::{
-    Arc,
-    atomic::{AtomicBool, Ordering},
-};
+use datadog_trace_protobuf::pb::{ClientStatsPayload, TracerPayload};
+use datadog_trace_stats::span_concentrator::SpanConcentrator;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::{Duration, SystemTime};
 use tracing::error;
 
+const S_TO_NS: u64 = 1_000_000_000;
+const BUCKET_DURATION_NS: u64 = 10 * S_TO_NS; // 10 seconds
+
 #[derive(Debug, thiserror::Error)]
 pub enum StatsError {
     #[error("Failed to send command to concentrator: {0}")]
command to concentrator: {0}")] @@ -21,10 +21,22 @@ pub enum StatsError { RecvError(oneshot::error::RecvError), } +#[derive(Clone, Debug, Default)] +pub struct TracerMetadata { + // e.g. "python" + pub language: String, + // e.g. "3.11.0" + pub tracer_version: String, + // e.g. "f45568ad09d5480b99087d86ebda26e6" + pub runtime_id: String, + pub container_id: String, +} + pub enum ConcentratorCommand { SetTracerMetadata(TracerMetadata), - Add(StatsEvent), - Flush(bool, oneshot::Sender>), + // Use a box to reduce the size of the command enum + Add(Box), + Flush(bool, oneshot::Sender>), } pub struct StatsConcentratorHandle { @@ -72,29 +84,28 @@ impl StatsConcentratorHandle { Ok(()) } - pub fn add(&self, stats_event: StatsEvent) -> Result<(), StatsError> { + pub fn add(&self, span: &pb::Span) -> Result<(), StatsError> { self.tx - .send(ConcentratorCommand::Add(stats_event)) + .send(ConcentratorCommand::Add(Box::new(span.clone()))) .map_err(StatsError::SendError)?; Ok(()) } - pub async fn flush( - &self, - force_flush: bool, - ) -> Result, StatsError> { + pub async fn flush(&self, force_flush: bool) -> Result, StatsError> { let (response_tx, response_rx) = oneshot::channel(); self.tx .send(ConcentratorCommand::Flush(force_flush, response_tx)) .map_err(StatsError::SendError)?; - let stats = response_rx.await.map_err(StatsError::RecvError)?; - Ok(stats) + response_rx.await.map_err(StatsError::RecvError) } } pub struct StatsConcentratorService { - concentrator: StatsConcentrator, + concentrator: SpanConcentrator, rx: mpsc::UnboundedReceiver, + tracer_metadata: TracerMetadata, + hostname: String, + config: Arc, } // A service that handles add() and flush() requests in the same queue, @@ -107,8 +118,22 @@ impl StatsConcentratorService { ) -> (Self, StatsConcentratorHandle) { let (tx, rx) = mpsc::unbounded_channel(); let handle = StatsConcentratorHandle::new(tx); - let concentrator = StatsConcentrator::new(config, tags_provider); - let service: StatsConcentratorService = Self { concentrator, rx }; + // TODO: set span_kinds_stats_computed and peer_tag_keys + let concentrator = SpanConcentrator::new( + Duration::from_nanos(BUCKET_DURATION_NS), + SystemTime::now(), + vec![], + vec![], + ); + let hostname = tags_provider.get_canonical_id().unwrap_or_default(); + let service: StatsConcentratorService = Self { + concentrator, + rx, + // To be set when the first trace is received + tracer_metadata: TracerMetadata::default(), + hostname, + config, + }; (service, handle) } @@ -116,16 +141,55 @@ impl StatsConcentratorService { while let Some(command) = self.rx.recv().await { match command { ConcentratorCommand::SetTracerMetadata(tracer_metadata) => { - self.concentrator.set_tracer_metadata(&tracer_metadata); + self.tracer_metadata = tracer_metadata; } - ConcentratorCommand::Add(stats_event) => self.concentrator.add(stats_event), + ConcentratorCommand::Add(span) => self.concentrator.add_span(&*span), ConcentratorCommand::Flush(force_flush, response_tx) => { - let stats = self.concentrator.flush(force_flush); - if let Err(e) = response_tx.send(stats) { - error!("Failed to return trace stats: {e:?}"); - } + self.handle_flush(force_flush, response_tx); } } } } + + fn handle_flush( + &mut self, + force_flush: bool, + response_tx: oneshot::Sender>, + ) { + let stats_buckets = self.concentrator.flush(SystemTime::now(), force_flush); + let stats = if stats_buckets.is_empty() { + None + } else { + Some(ClientStatsPayload { + hostname: self.hostname.clone(), + env: 
+                env: self.config.env.clone().unwrap_or("unknown-env".to_string()),
+                // Version is not in the trace payload. Need to read it from config.
+                version: self.config.version.clone().unwrap_or_default(),
+                lang: self.tracer_metadata.language.clone(),
+                tracer_version: self.tracer_metadata.tracer_version.clone(),
+                runtime_id: self.tracer_metadata.runtime_id.clone(),
+                // Not supported yet
+                sequence: 0,
+                // Not supported yet
+                agent_aggregation: String::new(),
+                service: self.config.service.clone().unwrap_or_default(),
+                container_id: self.tracer_metadata.container_id.clone(),
+                // Not supported yet
+                tags: vec![],
+                // Not supported yet
+                git_commit_sha: String::new(),
+                // Not supported yet
+                image_tag: String::new(),
+                stats: stats_buckets,
+                // Not supported yet
+                process_tags: String::new(),
+                // Not supported yet
+                process_tags_hash: 0,
+            })
+        };
+        let response = response_tx.send(stats);
+        if let Err(e) = response {
+            error!("Failed to return trace stats: {e:?}");
+        }
+    }
 }
diff --git a/bottlecap/src/traces/stats_generator.rs b/bottlecap/src/traces/stats_generator.rs
index 94c8a1bd2..63724291d 100644
--- a/bottlecap/src/traces/stats_generator.rs
+++ b/bottlecap/src/traces/stats_generator.rs
@@ -1,4 +1,3 @@
-use crate::traces::stats_concentrator::{AggregationKey, Stats, StatsEvent};
 use crate::traces::stats_concentrator_service::StatsConcentratorHandle;
 use datadog_trace_utils::tracer_payload::TracerPayloadCollection;
 use tracing::error;
@@ -36,31 +35,7 @@ impl StatsGenerator {
         // Generate stats for each span in the trace
         for chunk in &trace.chunks {
             for span in &chunk.spans {
-                let stats = StatsEvent {
-                    time: span.start.try_into().unwrap_or_default(),
-                    aggregation_key: AggregationKey {
-                        env: span
-                            .meta
-                            .get("env")
-                            .cloned()
-                            .unwrap_or("unknown-env".to_string()),
-                        service: span.service.clone(),
-                        name: span.name.clone(),
-                        resource: span.resource.clone(),
-                        r#type: span.r#type.clone(),
-                        is_trace_root: span.parent_id == 0,
-                    },
-                    stats: Stats {
-                        hits: 1,
-                        errors: span.error,
-                        duration: span.duration,
-                        top_level_hits: span
-                            .metrics
-                            .get("_dd.top_level")
-                            .map_or(0.0, |v| *v),
-                    },
-                };
-                if let Err(err) = self.stats_concentrator.add(stats) {
+                if let Err(err) = self.stats_concentrator.add(span) {
                     error!("Failed to send trace stats: {err}");
                     return Err(StatsGeneratorError::ConcentratorCommandError(err));
                 }
diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs
index ad5f83535..8618f6b70 100644
--- a/bottlecap/src/traces/trace_processor.rs
+++ b/bottlecap/src/traces/trace_processor.rs
@@ -545,6 +545,7 @@ mod tests {
             r#type: String::new(),
             meta_struct: HashMap::new(),
             span_links: vec![],
+            span_events: vec![],
         };
         if is_top_level {
             span.metrics.insert("_top_level".to_string(), 1.0);
@@ -806,6 +807,7 @@ mod tests {
             r#type: String::new(),
             span_links: vec![],
             meta_struct: std::collections::HashMap::new(),
+            span_events: vec![],
         };
 
         let child_span = pb::Span {
@@ -823,6 +825,7 @@ mod tests {
             r#type: String::new(),
             span_links: vec![],
             meta_struct: std::collections::HashMap::new(),
+            span_events: vec![],
         };
 
         let mut chunk = pb::TraceChunk {
@@ -888,6 +891,7 @@ mod tests {
             r#type: String::new(),
             span_links: vec![],
             meta_struct: std::collections::HashMap::new(),
+            span_events: vec![],
         };
 
         let child_span = pb::Span {
@@ -905,6 +909,7 @@ mod tests {
             r#type: String::new(),
             span_links: vec![],
             meta_struct: std::collections::HashMap::new(),
+            span_events: vec![],
         };
 
         let mut chunk = pb::TraceChunk {
@@ -970,6 +975,7 @@ mod tests {
             r#type: String::new(),
             span_links: vec![],
             meta_struct: std::collections::HashMap::new(),
+            span_events: vec![],
         };
 
         let child_span = pb::Span {
@@ -987,6 +993,7 @@ mod tests {
             r#type: String::new(),
             span_links: vec![],
             meta_struct: std::collections::HashMap::new(),
+            span_events: vec![],
         };
 
         let mut chunk = pb::TraceChunk {