diff --git a/opentelemetry-sdk/benches/attribute_set.rs b/opentelemetry-sdk/benches/attribute_set.rs index d63b14ab10..d6d2887bba 100644 --- a/opentelemetry-sdk/benches/attribute_set.rs +++ b/opentelemetry-sdk/benches/attribute_set.rs @@ -1,6 +1,6 @@ use criterion::{criterion_group, criterion_main, Criterion}; use opentelemetry::KeyValue; -use opentelemetry_sdk::AttributeSet; +use opentelemetry_sdk::metrics::AttributeSet; // Run this benchmark with: // cargo bench --bench attribute_set --features=metrics diff --git a/opentelemetry-sdk/src/attributes/mod.rs b/opentelemetry-sdk/src/attributes/mod.rs deleted file mode 100644 index 1182e996fb..0000000000 --- a/opentelemetry-sdk/src/attributes/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod set; - -pub use set::AttributeSet; diff --git a/opentelemetry-sdk/src/attributes/set.rs b/opentelemetry-sdk/src/attributes/set.rs deleted file mode 100644 index 7775d20692..0000000000 --- a/opentelemetry-sdk/src/attributes/set.rs +++ /dev/null @@ -1,184 +0,0 @@ -use std::collections::hash_map::DefaultHasher; -use std::collections::HashSet; -use std::{ - cmp::Ordering, - hash::{Hash, Hasher}, -}; - -use opentelemetry::{Array, Key, KeyValue, Value}; -use ordered_float::OrderedFloat; - -#[derive(Clone, Debug)] -struct HashKeyValue(KeyValue); - -impl Hash for HashKeyValue { - fn hash(&self, state: &mut H) { - self.0.key.hash(state); - match &self.0.value { - Value::F64(f) => OrderedFloat(*f).hash(state), - Value::Array(a) => match a { - Array::Bool(b) => b.hash(state), - Array::I64(i) => i.hash(state), - Array::F64(f) => f.iter().for_each(|f| OrderedFloat(*f).hash(state)), - Array::String(s) => s.hash(state), - }, - Value::Bool(b) => b.hash(state), - Value::I64(i) => i.hash(state), - Value::String(s) => s.hash(state), - }; - } -} - -impl PartialOrd for HashKeyValue { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl Ord for HashKeyValue { - fn cmp(&self, other: &Self) -> Ordering { - self.0.key.cmp(&other.0.key) - } -} - -impl PartialEq for HashKeyValue { - fn eq(&self, other: &Self) -> bool { - self.0.key == other.0.key - && match (&self.0.value, &other.0.value) { - (Value::F64(f), Value::F64(of)) => OrderedFloat(*f).eq(&OrderedFloat(*of)), - (Value::Array(Array::F64(f)), Value::Array(Array::F64(of))) => { - f.len() == of.len() - && f.iter() - .zip(of.iter()) - .all(|(f, of)| OrderedFloat(*f).eq(&OrderedFloat(*of))) - } - (non_float, other_non_float) => non_float.eq(other_non_float), - } - } -} - -impl Eq for HashKeyValue {} - -/// A unique set of attributes that can be used as instrument identifiers. -/// -/// This must implement [Hash], [PartialEq], and [Eq] so it may be used as -/// HashMap keys and other de-duplication methods. -#[derive(Clone, Default, Debug, PartialEq, Eq)] -pub struct AttributeSet(Vec, u64); - -impl From<&[KeyValue]> for AttributeSet { - fn from(values: &[KeyValue]) -> Self { - let mut seen_keys = HashSet::with_capacity(values.len()); - let vec = values - .iter() - .rev() - .filter_map(|kv| { - if seen_keys.insert(kv.key.clone()) { - Some(HashKeyValue(kv.clone())) - } else { - None - } - }) - .collect::>(); - - AttributeSet::new(vec) - } -} - -fn calculate_hash(values: &[HashKeyValue]) -> u64 { - let mut hasher = DefaultHasher::new(); - values.iter().fold(&mut hasher, |mut hasher, item| { - item.hash(&mut hasher); - hasher - }); - hasher.finish() -} - -impl AttributeSet { - fn new(mut values: Vec) -> Self { - values.sort_unstable(); - let hash = calculate_hash(&values); - AttributeSet(values, hash) - } - - /// Returns the number of elements in the set. - pub fn len(&self) -> usize { - self.0.len() - } - - /// Returns `true` if the set contains no elements. - pub fn is_empty(&self) -> bool { - self.0.is_empty() - } - - /// Retains only the attributes specified by the predicate. - pub fn retain(&mut self, f: F) - where - F: Fn(&KeyValue) -> bool, - { - self.0.retain(|kv| f(&kv.0)); - - // Recalculate the hash as elements are changed. - self.1 = calculate_hash(&self.0); - } - - /// Iterate over key value pairs in the set - pub fn iter(&self) -> impl Iterator { - self.0.iter().map(|kv| (&kv.0.key, &kv.0.value)) - } -} - -impl Hash for AttributeSet { - fn hash(&self, state: &mut H) { - state.write_u64(self.1) - } -} - -#[cfg(test)] -mod tests { - use std::hash::DefaultHasher; - use std::hash::{Hash, Hasher}; - - use crate::attributes::set::HashKeyValue; - use opentelemetry::KeyValue; - - #[test] - fn equality_kv_float() { - let kv1 = HashKeyValue(KeyValue::new("key", 1.0)); - let kv2 = HashKeyValue(KeyValue::new("key", 1.0)); - assert_eq!(kv1, kv2); - - let kv1 = HashKeyValue(KeyValue::new("key", 1.0)); - let kv2 = HashKeyValue(KeyValue::new("key", 1.01)); - assert_ne!(kv1, kv2); - - let kv1 = HashKeyValue(KeyValue::new("key", std::f64::NAN)); - let kv2 = HashKeyValue(KeyValue::new("key", std::f64::NAN)); - assert_eq!(kv1, kv2); - - let kv1 = HashKeyValue(KeyValue::new("key", std::f64::INFINITY)); - let kv2 = HashKeyValue(KeyValue::new("key", std::f64::INFINITY)); - assert_eq!(kv1, kv2); - } - - #[test] - fn hash_kv_float() { - let kv1 = HashKeyValue(KeyValue::new("key", 1.0)); - let kv2 = HashKeyValue(KeyValue::new("key", 1.0)); - assert_eq!(hash_helper(&kv1), hash_helper(&kv2)); - - let kv1 = HashKeyValue(KeyValue::new("key", std::f64::NAN)); - let kv2 = HashKeyValue(KeyValue::new("key", std::f64::NAN)); - assert_eq!(hash_helper(&kv1), hash_helper(&kv2)); - - let kv1 = HashKeyValue(KeyValue::new("key", std::f64::INFINITY)); - let kv2 = HashKeyValue(KeyValue::new("key", std::f64::INFINITY)); - assert_eq!(hash_helper(&kv1), hash_helper(&kv2)); - } - - fn hash_helper(item: &T) -> u64 { - let mut hasher = DefaultHasher::new(); - item.hash(&mut hasher); - hasher.finish() - } -} diff --git a/opentelemetry-sdk/src/lib.rs b/opentelemetry-sdk/src/lib.rs index 99bdd5f942..852f0b8327 100644 --- a/opentelemetry-sdk/src/lib.rs +++ b/opentelemetry-sdk/src/lib.rs @@ -120,7 +120,6 @@ )] #![cfg_attr(test, deny(warnings))] -pub(crate) mod attributes; pub mod export; mod instrumentation; #[cfg(feature = "logs")] @@ -146,7 +145,6 @@ pub mod trace; #[doc(hidden)] pub mod util; -pub use attributes::*; pub use instrumentation::{InstrumentationLibrary, Scope}; #[doc(inline)] pub use resource::Resource; diff --git a/opentelemetry-sdk/src/metrics/data/mod.rs b/opentelemetry-sdk/src/metrics/data/mod.rs index bd3800f254..fd4c85c35f 100644 --- a/opentelemetry-sdk/src/metrics/data/mod.rs +++ b/opentelemetry-sdk/src/metrics/data/mod.rs @@ -4,7 +4,7 @@ use std::{any, borrow::Cow, fmt, time::SystemTime}; use opentelemetry::{metrics::Unit, KeyValue}; -use crate::{attributes::AttributeSet, instrumentation::Scope, Resource}; +use crate::{instrumentation::Scope, Resource}; pub use self::temporality::Temporality; @@ -210,7 +210,7 @@ impl Aggregation for ExponentialHistogram #[derive(Debug)] pub struct ExponentialHistogramDataPoint { /// The set of key value pairs that uniquely identify the time series. - pub attributes: AttributeSet, + pub attributes: Vec, /// When the time series was started. pub start_time: SystemTime, /// The time when the time series was recorded. diff --git a/opentelemetry-sdk/src/metrics/instrument.rs b/opentelemetry-sdk/src/metrics/instrument.rs index 3ecae355b5..5b8ef5c6e5 100644 --- a/opentelemetry-sdk/src/metrics/instrument.rs +++ b/opentelemetry-sdk/src/metrics/instrument.rs @@ -9,8 +9,8 @@ use opentelemetry::{ }; use crate::{ - attributes::AttributeSet, instrumentation::Scope, + metrics::AttributeSet, metrics::{aggregation::Aggregation, internal::Measure}, }; diff --git a/opentelemetry-sdk/src/metrics/internal/aggregate.rs b/opentelemetry-sdk/src/metrics/internal/aggregate.rs index 4b4e7a6b07..13e63b33c3 100644 --- a/opentelemetry-sdk/src/metrics/internal/aggregate.rs +++ b/opentelemetry-sdk/src/metrics/internal/aggregate.rs @@ -5,7 +5,7 @@ use opentelemetry::KeyValue; use crate::{ metrics::data::{Aggregation, Gauge, Temporality}, - AttributeSet, + metrics::AttributeSet, }; use super::{ @@ -217,7 +217,7 @@ mod tests { DataPoint, ExponentialBucket, ExponentialHistogram, ExponentialHistogramDataPoint, Histogram, HistogramDataPoint, Sum, }; - use std::time::SystemTime; + use std::{time::SystemTime, vec}; use super::*; @@ -382,7 +382,7 @@ mod tests { .exponential_bucket_histogram(4, 20, true, true); let mut a = ExponentialHistogram { data_points: vec![ExponentialHistogramDataPoint { - attributes: AttributeSet::from(&[KeyValue::new("a2", 2)][..]), + attributes: vec![KeyValue::new("a1", 1)], start_time: SystemTime::now(), time: SystemTime::now(), count: 2, @@ -417,10 +417,7 @@ mod tests { assert!(new_agg.is_none()); assert_eq!(a.temporality, temporality); assert_eq!(a.data_points.len(), 1); - assert_eq!( - a.data_points[0].attributes, - AttributeSet::from(&new_attributes[..]) - ); + assert_eq!(a.data_points[0].attributes, new_attributes.to_vec()); assert_eq!(a.data_points[0].count, 1); assert_eq!(a.data_points[0].min, Some(3)); assert_eq!(a.data_points[0].max, Some(3)); diff --git a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs index 189b61c553..df58e556c4 100644 --- a/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/exponential_histogram.rs @@ -1,11 +1,11 @@ use std::{collections::HashMap, f64::consts::LOG2_E, sync::Mutex, time::SystemTime}; use once_cell::sync::Lazy; -use opentelemetry::metrics::MetricsError; +use opentelemetry::{metrics::MetricsError, KeyValue}; use crate::{ metrics::data::{self, Aggregation, Temporality}, - AttributeSet, + metrics::AttributeSet, }; use super::Number; @@ -396,7 +396,10 @@ impl> ExpoHistogram { for (a, b) in values.drain() { h.data_points.push(data::ExponentialHistogramDataPoint { - attributes: a, + attributes: a + .iter() + .map(|(k, v)| KeyValue::new(k.clone(), v.clone())) + .collect(), start_time: start, time: t, count: b.count, @@ -474,7 +477,10 @@ impl> ExpoHistogram { // overload the system. for (a, b) in values.iter() { h.data_points.push(data::ExponentialHistogramDataPoint { - attributes: a.clone(), + attributes: a + .iter() + .map(|(k, v)| KeyValue::new(k.clone(), v.clone())) + .collect(), start_time: start, time: t, count: b.count, @@ -1260,7 +1266,7 @@ mod tests { want: data::ExponentialHistogram { temporality: Temporality::Delta, data_points: vec![data::ExponentialHistogramDataPoint { - attributes: AttributeSet::default(), + attributes: vec![], count: 6, min: Some(1.into()), max: Some(16.into()), @@ -1303,7 +1309,7 @@ mod tests { want: data::ExponentialHistogram { temporality: Temporality::Cumulative, data_points: vec![data::ExponentialHistogramDataPoint { - attributes: AttributeSet::default(), + attributes: vec![], count: 6, min: Some(1.into()), max: Some(16.into()), @@ -1349,7 +1355,7 @@ mod tests { want: data::ExponentialHistogram { temporality: Temporality::Delta, data_points: vec![data::ExponentialHistogramDataPoint { - attributes: AttributeSet::default(), + attributes: vec![], count: 6, min: Some(1.into()), max: Some(16.into()), @@ -1404,7 +1410,7 @@ mod tests { offset: -1, counts: vec![1, 6, 2], }, - attributes: AttributeSet::default(), + attributes: vec![], start_time: SystemTime::now(), time: SystemTime::now(), negative_bucket: data::ExponentialBucket { diff --git a/opentelemetry-sdk/src/metrics/internal/histogram.rs b/opentelemetry-sdk/src/metrics/internal/histogram.rs index b94d476a40..88cd34955f 100644 --- a/opentelemetry-sdk/src/metrics/internal/histogram.rs +++ b/opentelemetry-sdk/src/metrics/internal/histogram.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, sync::Mutex, time::SystemTime}; use crate::metrics::data::{self, Aggregation, Temporality}; -use crate::{attributes::AttributeSet, metrics::data::HistogramDataPoint}; +use crate::{metrics::data::HistogramDataPoint, metrics::AttributeSet}; use opentelemetry::KeyValue; use opentelemetry::{global, metrics::MetricsError}; diff --git a/opentelemetry-sdk/src/metrics/internal/last_value.rs b/opentelemetry-sdk/src/metrics/internal/last_value.rs index c734dd445f..d471654cd3 100644 --- a/opentelemetry-sdk/src/metrics/internal/last_value.rs +++ b/opentelemetry-sdk/src/metrics/internal/last_value.rs @@ -4,7 +4,7 @@ use std::{ time::SystemTime, }; -use crate::{attributes::AttributeSet, metrics::data::DataPoint}; +use crate::{metrics::data::DataPoint, metrics::AttributeSet}; use opentelemetry::{global, metrics::MetricsError, KeyValue}; use super::{ diff --git a/opentelemetry-sdk/src/metrics/internal/sum.rs b/opentelemetry-sdk/src/metrics/internal/sum.rs index fdf604389d..b46f4c952c 100644 --- a/opentelemetry-sdk/src/metrics/internal/sum.rs +++ b/opentelemetry-sdk/src/metrics/internal/sum.rs @@ -6,8 +6,8 @@ use std::{ time::SystemTime, }; -use crate::attributes::AttributeSet; use crate::metrics::data::{self, Aggregation, DataPoint, Temporality}; +use crate::metrics::AttributeSet; use opentelemetry::KeyValue; use opentelemetry::{global, metrics::MetricsError}; diff --git a/opentelemetry-sdk/src/metrics/mod.rs b/opentelemetry-sdk/src/metrics/mod.rs index 400473693b..534226ea53 100644 --- a/opentelemetry-sdk/src/metrics/mod.rs +++ b/opentelemetry-sdk/src/metrics/mod.rs @@ -64,6 +64,137 @@ pub use periodic_reader::*; pub use pipeline::Pipeline; pub use view::*; +use std::collections::hash_map::DefaultHasher; +use std::collections::HashSet; +use std::{ + cmp::Ordering, + hash::{Hash, Hasher}, +}; + +use opentelemetry::{Array, Key, KeyValue, Value}; +use ordered_float::OrderedFloat; + +#[derive(Clone, Debug)] +struct HashKeyValue(KeyValue); + +impl Hash for HashKeyValue { + fn hash(&self, state: &mut H) { + self.0.key.hash(state); + match &self.0.value { + Value::F64(f) => OrderedFloat(*f).hash(state), + Value::Array(a) => match a { + Array::Bool(b) => b.hash(state), + Array::I64(i) => i.hash(state), + Array::F64(f) => f.iter().for_each(|f| OrderedFloat(*f).hash(state)), + Array::String(s) => s.hash(state), + }, + Value::Bool(b) => b.hash(state), + Value::I64(i) => i.hash(state), + Value::String(s) => s.hash(state), + }; + } +} + +impl PartialOrd for HashKeyValue { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for HashKeyValue { + fn cmp(&self, other: &Self) -> Ordering { + self.0.key.cmp(&other.0.key) + } +} + +impl PartialEq for HashKeyValue { + fn eq(&self, other: &Self) -> bool { + self.0.key == other.0.key + && match (&self.0.value, &other.0.value) { + (Value::F64(f), Value::F64(of)) => OrderedFloat(*f).eq(&OrderedFloat(*of)), + (Value::Array(Array::F64(f)), Value::Array(Array::F64(of))) => { + f.len() == of.len() + && f.iter() + .zip(of.iter()) + .all(|(f, of)| OrderedFloat(*f).eq(&OrderedFloat(*of))) + } + (non_float, other_non_float) => non_float.eq(other_non_float), + } + } +} + +impl Eq for HashKeyValue {} + +/// A unique set of attributes that can be used as instrument identifiers. +/// +/// This must implement [Hash], [PartialEq], and [Eq] so it may be used as +/// HashMap keys and other de-duplication methods. +#[derive(Clone, Default, Debug, PartialEq, Eq)] +pub struct AttributeSet(Vec, u64); + +impl From<&[KeyValue]> for AttributeSet { + fn from(values: &[KeyValue]) -> Self { + let mut seen_keys = HashSet::with_capacity(values.len()); + let vec = values + .iter() + .rev() + .filter_map(|kv| { + if seen_keys.insert(kv.key.clone()) { + Some(HashKeyValue(kv.clone())) + } else { + None + } + }) + .collect::>(); + + AttributeSet::new(vec) + } +} + +fn calculate_hash(values: &[HashKeyValue]) -> u64 { + let mut hasher = DefaultHasher::new(); + values.iter().fold(&mut hasher, |mut hasher, item| { + item.hash(&mut hasher); + hasher + }); + hasher.finish() +} + +impl AttributeSet { + fn new(mut values: Vec) -> Self { + values.sort_unstable(); + let hash = calculate_hash(&values); + AttributeSet(values, hash) + } + + /// Returns `true` if the set contains no elements. + pub fn is_empty(&self) -> bool { + self.0.is_empty() + } + + /// Retains only the attributes specified by the predicate. + pub fn retain(&mut self, f: F) + where + F: Fn(&KeyValue) -> bool, + { + self.0.retain(|kv| f(&kv.0)); + + // Recalculate the hash as elements are changed. + self.1 = calculate_hash(&self.0); + } + + /// Iterate over key value pairs in the set + pub fn iter(&self) -> impl Iterator { + self.0.iter().map(|kv| (&kv.0.key, &kv.0.value)) + } +} + +impl Hash for AttributeSet { + fn hash(&self, state: &mut H) { + state.write_u64(self.1) + } +} + #[cfg(all(test, feature = "testing"))] mod tests { use self::data::{DataPoint, ScopeMetrics}; @@ -78,11 +209,53 @@ mod tests { KeyValue, }; use std::borrow::Cow; + use std::hash::DefaultHasher; + use std::hash::{Hash, Hasher}; // Run all tests in this mod // cargo test metrics::tests --features=metrics,testing - // Note for all tests in this mod: + #[test] + fn equality_kv_float() { + let kv1 = HashKeyValue(KeyValue::new("key", 1.0)); + let kv2 = HashKeyValue(KeyValue::new("key", 1.0)); + assert_eq!(kv1, kv2); + + let kv1 = HashKeyValue(KeyValue::new("key", 1.0)); + let kv2 = HashKeyValue(KeyValue::new("key", 1.01)); + assert_ne!(kv1, kv2); + + let kv1 = HashKeyValue(KeyValue::new("key", std::f64::NAN)); + let kv2 = HashKeyValue(KeyValue::new("key", std::f64::NAN)); + assert_eq!(kv1, kv2); + + let kv1 = HashKeyValue(KeyValue::new("key", std::f64::INFINITY)); + let kv2 = HashKeyValue(KeyValue::new("key", std::f64::INFINITY)); + assert_eq!(kv1, kv2); + } + + #[test] + fn hash_kv_float() { + let kv1 = HashKeyValue(KeyValue::new("key", 1.0)); + let kv2 = HashKeyValue(KeyValue::new("key", 1.0)); + assert_eq!(hash_helper(&kv1), hash_helper(&kv2)); + + let kv1 = HashKeyValue(KeyValue::new("key", std::f64::NAN)); + let kv2 = HashKeyValue(KeyValue::new("key", std::f64::NAN)); + assert_eq!(hash_helper(&kv1), hash_helper(&kv2)); + + let kv1 = HashKeyValue(KeyValue::new("key", std::f64::INFINITY)); + let kv2 = HashKeyValue(KeyValue::new("key", std::f64::INFINITY)); + assert_eq!(hash_helper(&kv1), hash_helper(&kv2)); + } + + fn hash_helper(item: &T) -> u64 { + let mut hasher = DefaultHasher::new(); + item.hash(&mut hasher); + hasher.finish() + } + + // Note for all tests from this point onwards in this mod: // "multi_thread" tokio flavor must be used else flush won't // be able to make progress! diff --git a/opentelemetry-stdout/src/common.rs b/opentelemetry-stdout/src/common.rs index 9b313f9e38..bd6f178969 100644 --- a/opentelemetry-stdout/src/common.rs +++ b/opentelemetry-stdout/src/common.rs @@ -12,17 +12,6 @@ use serde::{Serialize, Serializer}; #[derive(Debug, Serialize, Clone, Hash, Eq, PartialEq)] pub(crate) struct AttributeSet(pub BTreeMap); -impl From<&opentelemetry_sdk::AttributeSet> for AttributeSet { - fn from(value: &opentelemetry_sdk::AttributeSet) -> Self { - AttributeSet( - value - .iter() - .map(|(key, value)| (Key::from(key.clone()), Value::from(value.clone()))) - .collect(), - ) - } -} - impl From<&opentelemetry_sdk::Resource> for AttributeSet { fn from(value: &opentelemetry_sdk::Resource) -> Self { AttributeSet( diff --git a/opentelemetry-stdout/src/metrics/transform.rs b/opentelemetry-stdout/src/metrics/transform.rs index 5f1b6c444c..49aa662696 100644 --- a/opentelemetry-stdout/src/metrics/transform.rs +++ b/opentelemetry-stdout/src/metrics/transform.rs @@ -1,4 +1,4 @@ -use crate::common::{AttributeSet, KeyValue, Resource, Scope}; +use crate::common::{KeyValue, Resource, Scope}; use opentelemetry::{global, metrics::MetricsError}; use opentelemetry_sdk::metrics::data; use serde::{Serialize, Serializer}; @@ -341,7 +341,7 @@ impl + Copy> From<&data::ExponentialHistogram> for Exponen #[derive(Serialize, Debug, Clone)] #[serde(rename_all = "camelCase")] struct ExponentialHistogramDataPoint { - attributes: AttributeSet, + attributes: Vec, #[serde(serialize_with = "as_unix_nano")] start_time_unix_nano: SystemTime, #[serde(serialize_with = "as_unix_nano")] @@ -368,7 +368,7 @@ impl + Copy> From<&data::ExponentialHistogramDataPoint> { fn from(value: &data::ExponentialHistogramDataPoint) -> Self { ExponentialHistogramDataPoint { - attributes: AttributeSet::from(&value.attributes), + attributes: value.attributes.iter().map(Into::into).collect(), start_time_unix_nano: value.start_time, time_unix_nano: value.time, start_time: value.start_time,