Skip to content

Commit fc6205d

Browse files
committed
Collect and export with zero allocations and clones
1 parent e2280f7 commit fc6205d

File tree

9 files changed

+156
-131
lines changed

9 files changed

+156
-131
lines changed

opentelemetry-proto/src/transform/metrics.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ pub mod tonic {
113113
impl From<ResourceMetrics<'_>> for ExportMetricsServiceRequest {
114114
fn from(mut rm: ResourceMetrics<'_>) -> Self {
115115
let mut scope_metrics = Vec::new();
116-
while let Some(scope_metric) = rm.scope_metrics.next() {
116+
while let Some(scope_metric) = rm.scope_metrics.next_scope_metric() {
117117
scope_metrics.push(scope_metric.into());
118118
}
119119
ExportMetricsServiceRequest {
@@ -138,7 +138,7 @@ pub mod tonic {
138138
impl From<ScopeMetrics<'_>> for TonicScopeMetrics {
139139
fn from(mut sm: ScopeMetrics<'_>) -> Self {
140140
let mut metrics = Vec::new();
141-
while let Some(metric) = sm.metrics.next() {
141+
while let Some(metric) = sm.metrics.next_metric() {
142142
metrics.push(metric.into());
143143
}
144144
TonicScopeMetrics {

opentelemetry-sdk/benches/metric.rs

+4-2
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ use opentelemetry::{
66
use opentelemetry_sdk::{
77
error::OTelSdkResult,
88
metrics::{
9-
data::ResourceMetricsData, new_view, reader::MetricReader, Aggregation, Instrument,
10-
InstrumentKind, ManualReader, Pipeline, SdkMeterProvider, Stream, Temporality, View,
9+
new_view,
10+
reader::{MetricReader, ResourceMetricsData},
11+
Aggregation, Instrument, InstrumentKind, ManualReader, Pipeline, SdkMeterProvider, Stream,
12+
Temporality, View,
1113
},
1214
Resource,
1315
};

opentelemetry-sdk/src/metrics/exporter.rs

+49-30
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,15 @@
33
use opentelemetry::InstrumentationScope;
44

55
use crate::{error::OTelSdkResult, Resource};
6-
use std::{fmt::Debug, slice::Iter, time::Duration};
6+
use std::{
7+
fmt::Debug,
8+
slice::Iter,
9+
time::{Duration, SystemTime},
10+
};
711

812
use super::{
9-
data::AggregatedMetrics,
10-
reader::{MetricsData, ResourceMetricsData, ScopeMetricsData},
13+
data::{AggregatedMetrics, Sum},
14+
pipeline::InstrumentSync,
1115
InstrumentInfo, Temporality,
1216
};
1317

@@ -23,7 +27,7 @@ pub struct ResourceMetrics<'a> {
2327
/// Iterator over libraries instrumentation scopes ([`InstrumentationScope`]) together with metrics.
2428
/// Doesn't implement standard [`Iterator`], because it returns borrowed items. AKA "LendingIterator".
2529
pub struct ScopeMetricsLendingIter<'a> {
26-
iter: Iter<'a, ScopeMetricsData>,
30+
iter: std::collections::hash_map::Iter<'a, InstrumentationScope, Vec<InstrumentSync>>,
2731
}
2832

2933
/// A collection of metrics produced by a [`InstrumentationScope`] meter.
@@ -38,7 +42,9 @@ pub struct ScopeMetrics<'a> {
3842
/// Iterator over aggregations created by the meter.
3943
/// Doesn't implement standard [`Iterator`], because it returns borrowed items. AKA "LendingIterator".
4044
pub struct MetricsLendingIter<'a> {
41-
iter: Iter<'a, MetricsData>,
45+
// for optimization purposes
46+
aggr: AggregatedMetrics,
47+
iter: Iter<'a, InstrumentSync>,
4248
}
4349

4450
/// A collection of one or more aggregated time series from an [Instrument].
@@ -53,23 +59,13 @@ pub struct Metric<'a> {
5359
}
5460

5561
impl<'a> ResourceMetrics<'a> {
56-
pub(crate) fn new(rm: &'a ResourceMetricsData) -> Self {
57-
Self {
58-
resource: &rm.resource,
59-
scope_metrics: ScopeMetricsLendingIter {
60-
iter: rm.scope_metrics.iter(),
61-
},
62-
}
63-
}
64-
}
65-
66-
impl<'a> ScopeMetrics<'a> {
67-
fn new(sm: &'a ScopeMetricsData) -> Self {
62+
pub(crate) fn new(
63+
resource: &'a Resource,
64+
iter: std::collections::hash_map::Iter<'a, InstrumentationScope, Vec<InstrumentSync>>,
65+
) -> Self {
6866
Self {
69-
scope: &sm.scope,
70-
metrics: MetricsLendingIter {
71-
iter: sm.metrics.iter(),
72-
},
67+
resource,
68+
scope_metrics: ScopeMetricsLendingIter { iter },
7369
}
7470
}
7571
}
@@ -81,19 +77,42 @@ impl Debug for ScopeMetricsLendingIter<'_> {
8177
}
8278

8379
impl ScopeMetricsLendingIter<'_> {
84-
/// Advances the iterator and returns the next value.
85-
pub fn next(&mut self) -> Option<ScopeMetrics<'_>> {
86-
self.iter.next().map(ScopeMetrics::new)
80+
/// Advances the iterator and returns the next value.
81+
pub fn next_scope_metric(&mut self) -> Option<ScopeMetrics<'_>> {
82+
self.iter.next().map(|(scope, instruments)| ScopeMetrics {
83+
scope,
84+
metrics: MetricsLendingIter {
85+
// doesn't matter what we initialize this with,
86+
// it's purpose is to be reused between collections
87+
aggr: AggregatedMetrics::F64(super::data::MetricData::Sum(Sum {
88+
is_monotonic: true,
89+
data_points: Vec::new(),
90+
start_time: SystemTime::now(),
91+
time: SystemTime::now(),
92+
temporality: Temporality::Cumulative,
93+
})),
94+
iter: instruments.iter(),
95+
},
96+
})
8797
}
8898
}
8999

90100
impl MetricsLendingIter<'_> {
91-
/// Advances the iterator and returns the next value.
92-
pub fn next(&mut self) -> Option<Metric<'_>> {
93-
self.iter.next().map(|metric| Metric {
94-
instrument: &metric.instrument,
95-
data: &metric.data,
96-
})
101+
/// Advances the iterator and returns the next value.
102+
pub fn next_metric(&mut self) -> Option<Metric<'_>> {
103+
loop {
104+
let inst = self.iter.next()?;
105+
let (len, data) = inst.comp_agg.call(Some(&mut self.aggr));
106+
if len > 0 {
107+
if let Some(new_aggr) = data {
108+
self.aggr = new_aggr;
109+
}
110+
return Some(Metric {
111+
instrument: &inst.info,
112+
data: &self.aggr,
113+
});
114+
}
115+
}
97116
}
98117
}
99118

opentelemetry-sdk/src/metrics/in_memory_exporter.rs

+18-12
Original file line numberDiff line numberDiff line change
@@ -194,24 +194,30 @@ impl InMemoryMetricExporter {
194194
.map(|mut metrics_guard| metrics_guard.clear());
195195
}
196196

197-
fn clone_metrics(mut metric: ResourceMetrics<'_>) -> ResourceMetricsData {
197+
fn clone_metrics(mut metric: ResourceMetrics<'_>) -> Option<ResourceMetricsData> {
198198
let mut scope_metrics = Vec::new();
199-
while let Some(mut scope_metric) = metric.scope_metrics.next() {
199+
while let Some(mut scope_metric) = metric.scope_metrics.next_scope_metric() {
200200
let mut metrics = Vec::new();
201-
while let Some(metric) = scope_metric.metrics.next() {
201+
while let Some(metric) = scope_metric.metrics.next_metric() {
202202
metrics.push(MetricsData {
203203
instrument: metric.instrument.clone(),
204-
data: Self::clone_data(&metric.data),
204+
data: Self::clone_data(metric.data),
205+
});
206+
}
207+
if !metrics.is_empty() {
208+
scope_metrics.push(ScopeMetricsData {
209+
scope: scope_metric.scope.clone(),
210+
metrics,
205211
});
206212
}
207-
scope_metrics.push(ScopeMetricsData {
208-
scope: scope_metric.scope.clone(),
209-
metrics,
210-
});
211213
}
212-
ResourceMetricsData {
213-
resource: metric.resource.clone(),
214-
scope_metrics,
214+
if !scope_metrics.is_empty() {
215+
Some(ResourceMetricsData {
216+
resource: metric.resource.clone(),
217+
scope_metrics,
218+
})
219+
} else {
220+
None
215221
}
216222
}
217223

@@ -261,7 +267,7 @@ impl PushMetricExporter for InMemoryMetricExporter {
261267
self.metrics
262268
.lock()
263269
.map(|mut metrics_guard| {
264-
metrics_guard.push_back(InMemoryMetricExporter::clone_metrics(metrics))
270+
metrics_guard.extend(InMemoryMetricExporter::clone_metrics(metrics))
265271
})
266272
.map_err(|_| OTelSdkError::InternalFailure("Failed to lock metrics".to_string()))
267273
}

opentelemetry-sdk/src/metrics/mod.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,7 @@ pub use periodic_reader::*;
8282
#[cfg(feature = "experimental_metrics_custom_reader")]
8383
pub use pipeline::Pipeline;
8484

85-
#[cfg(feature = "experimental_metrics_custom_reader")]
86-
pub use instrument::InstrumentKind;
85+
pub use instrument::{InstrumentInfo, InstrumentKind};
8786

8887
#[cfg(feature = "spec_unstable_metrics_views")]
8988
pub use instrument::*;

opentelemetry-sdk/src/metrics/periodic_reader.rs

+37-29
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ use crate::{
1616
exporter::{PushMetricExporter, ResourceMetrics},
1717
reader::SdkProducer,
1818
},
19-
Resource,
2019
};
2120

2221
use super::{
@@ -350,11 +349,11 @@ impl<E: PushMetricExporter> fmt::Debug for PeriodicReader<E> {
350349
struct PeriodicReaderInner<E: PushMetricExporter> {
351350
exporter: Arc<E>,
352351
message_sender: mpsc::Sender<Message>,
353-
producer: Mutex<Option<Weak<dyn SdkProducer>>>,
352+
producer: Mutex<Option<Weak<Pipeline>>>,
354353
}
355354

356355
impl<E: PushMetricExporter> PeriodicReaderInner<E> {
357-
fn register_pipeline(&self, producer: Weak<dyn SdkProducer>) {
356+
fn register_pipeline(&self, producer: Weak<Pipeline>) {
358357
let mut inner = self.producer.lock().expect("lock poisoned");
359358
*inner = Some(producer);
360359
}
@@ -384,39 +383,48 @@ impl<E: PushMetricExporter> PeriodicReaderInner<E> {
384383
}
385384

386385
fn collect_and_export(&self) -> OTelSdkResult {
387-
// TODO: Reuse the internal vectors. Or refactor to avoid needing any
388-
// owned data structures to be passed to exporters.
389-
let mut rm = ResourceMetricsData {
390-
resource: Resource::empty(),
391-
scope_metrics: Vec::new(),
386+
let producer = self.producer.lock().expect("lock poisoned");
387+
let pipeline = if let Some(p) = producer.as_ref() {
388+
p.upgrade().ok_or(OTelSdkError::AlreadyShutdown)?
389+
} else {
390+
otel_warn!(
391+
name: "PeriodReader.MeterProviderNotRegistered",
392+
message = "PeriodicReader is not registered with MeterProvider. Metrics will not be collected. \
393+
This occurs when a periodic reader is created but not associated with a MeterProvider \
394+
by calling `.with_reader(reader)` on MeterProviderBuilder."
395+
);
396+
return Err(OTelSdkError::InternalFailure(
397+
"MeterProvider is not registered".into(),
398+
));
392399
};
393-
400+
drop(producer);
401+
let Ok(inner) = pipeline.inner.lock() else {
402+
otel_warn!(
403+
name: "PeriodReader.PipelineLockPoisoned",
404+
message = "Failed to acquire lock for collect and export"
405+
);
406+
return Err(OTelSdkError::InternalFailure(
407+
"Paniced while holding a pipeline lock".into(),
408+
));
409+
};
410+
for cb in &inner.callbacks {
411+
cb();
412+
}
394413
let current_time = Instant::now();
395-
let collect_result = self.collect(&mut rm);
396-
let time_taken_for_collect = current_time.elapsed();
414+
// Relying on futures executor to execute async call.
415+
let res = futures_executor::block_on(self.exporter.export(ResourceMetrics::new(
416+
&pipeline.resource,
417+
inner.aggregations.iter(),
418+
)));
419+
otel_debug!(name: "PeriodicReaderMetricsCollected", time_taken_in_millis = current_time.elapsed().as_millis());
397420

398-
#[allow(clippy::question_mark)]
399-
if let Err(e) = collect_result {
421+
if let Err(err) = &res {
400422
otel_warn!(
401423
name: "PeriodReaderCollectError",
402-
error = format!("{:?}", e)
424+
error = format!("{:?}", err)
403425
);
404-
return Err(OTelSdkError::InternalFailure(e.to_string()));
405426
}
406-
407-
if rm.scope_metrics.is_empty() {
408-
otel_debug!(name: "NoMetricsCollected");
409-
return Ok(());
410-
}
411-
412-
let metrics_count = rm.scope_metrics.iter().fold(0, |count, scope_metrics| {
413-
count + scope_metrics.metrics.len()
414-
});
415-
otel_debug!(name: "PeriodicReaderMetricsCollected", count = metrics_count, time_taken_in_millis = time_taken_for_collect.as_millis());
416-
417-
// Relying on futures executor to execute async call.
418-
// TODO: Pass timeout to exporter
419-
futures_executor::block_on(self.exporter.export(ResourceMetrics::new(&rm)))
427+
res
420428
}
421429

422430
fn force_flush(&self) -> OTelSdkResult {

0 commit comments

Comments
 (0)