Skip to content

refactor: PushMetricExporter hide implementation for ResourceMetrics #2957

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/exporter/http/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use crate::metric::MetricsClient;
use http::{header::CONTENT_TYPE, Method};
use opentelemetry::otel_debug;
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::exporter::ResourceMetrics;

use super::OtlpHttpClient;

impl MetricsClient for OtlpHttpClient {
async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
async fn export(&self, metrics: ResourceMetrics<'_>) -> OTelSdkResult {
let client = self
.client
.lock()
Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/exporter/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::time::Duration;
mod metrics;

#[cfg(feature = "metrics")]
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::exporter::ResourceMetrics;

#[cfg(feature = "logs")]
pub(crate) mod logs;
Expand Down Expand Up @@ -326,7 +326,7 @@ impl OtlpHttpClient {
#[cfg(feature = "metrics")]
fn build_metrics_export_body(
&self,
metrics: &ResourceMetrics,
metrics: ResourceMetrics<'_>,
) -> Option<(Vec<u8>, &'static str)> {
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;

Expand Down
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/exporter/tonic/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use opentelemetry_proto::tonic::collector::metrics::v1::{
metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest,
};
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::exporter::ResourceMetrics;
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};

use super::BoxInterceptor;
Expand Down Expand Up @@ -52,7 +52,7 @@ impl TonicMetricsClient {
}

impl MetricsClient for TonicMetricsClient {
async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
async fn export(&self, metrics: ResourceMetrics<'_>) -> OTelSdkResult {
let (mut client, metadata, extensions) = self
.inner
.lock()
Expand Down
9 changes: 4 additions & 5 deletions opentelemetry-otlp/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ use crate::{ExporterBuildError, NoExporterBuilderSet};
use core::fmt;
use opentelemetry_sdk::error::OTelSdkResult;

use opentelemetry_sdk::metrics::{
data::ResourceMetrics, exporter::PushMetricExporter, Temporality,
};
use opentelemetry_sdk::metrics::exporter::ResourceMetrics;
use opentelemetry_sdk::metrics::{exporter::PushMetricExporter, Temporality};
use std::fmt::{Debug, Formatter};
use std::time::Duration;

Expand Down Expand Up @@ -123,7 +122,7 @@ impl HasHttpConfig for MetricExporterBuilder<HttpExporterBuilderSet> {
pub(crate) trait MetricsClient: fmt::Debug + Send + Sync + 'static {
fn export(
&self,
metrics: &ResourceMetrics,
metrics: ResourceMetrics<'_>,
) -> impl std::future::Future<Output = OTelSdkResult> + Send;
fn shutdown(&self) -> OTelSdkResult;
}
Expand All @@ -149,7 +148,7 @@ impl Debug for MetricExporter {
}

impl PushMetricExporter for MetricExporter {
async fn export(&self, metrics: &ResourceMetrics) -> OTelSdkResult {
async fn export(&self, metrics: ResourceMetrics<'_>) -> OTelSdkResult {
match &self.client {
#[cfg(feature = "grpc-tonic")]
SupportedTransportClient::Tonic(client) => client.export(metrics).await,
Expand Down
40 changes: 25 additions & 15 deletions opentelemetry-proto/src/transform/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ pub mod tonic {
use opentelemetry_sdk::metrics::data::{
AggregatedMetrics, Exemplar as SdkExemplar,
ExponentialHistogram as SdkExponentialHistogram, Gauge as SdkGauge,
Histogram as SdkHistogram, Metric as SdkMetric, MetricData, ResourceMetrics,
ScopeMetrics as SdkScopeMetrics, Sum as SdkSum,
Histogram as SdkHistogram, MetricData, Sum as SdkSum,
};
use opentelemetry_sdk::metrics::exporter::{Metric, ResourceMetrics, ScopeMetrics};
use opentelemetry_sdk::metrics::Temporality;
use opentelemetry_sdk::Resource as SdkResource;

Expand Down Expand Up @@ -110,12 +110,18 @@ pub mod tonic {
}
}

impl From<&ResourceMetrics> for ExportMetricsServiceRequest {
fn from(rm: &ResourceMetrics) -> Self {
impl From<ResourceMetrics<'_>> for ExportMetricsServiceRequest {
fn from(rm: ResourceMetrics<'_>) -> Self {
let mut scope_metrics = Vec::new();
rm.scope_metrics.collect(|mut iter| {
while let Some(scope_metric) = iter.next_scope_metrics() {
scope_metrics.push(scope_metric.into());
}
});
ExportMetricsServiceRequest {
resource_metrics: vec![TonicResourceMetrics {
resource: Some((&rm.resource).into()),
scope_metrics: rm.scope_metrics.iter().map(Into::into).collect(),
resource: Some(rm.resource.into()),
scope_metrics,
schema_url: rm.resource.schema_url().map(Into::into).unwrap_or_default(),
}],
}
Expand All @@ -131,11 +137,15 @@ pub mod tonic {
}
}

impl From<&SdkScopeMetrics> for TonicScopeMetrics {
fn from(sm: &SdkScopeMetrics) -> Self {
impl From<ScopeMetrics<'_>> for TonicScopeMetrics {
fn from(mut sm: ScopeMetrics<'_>) -> Self {
let mut metrics = Vec::new();
while let Some(metric) = sm.metrics.next_metric() {
metrics.push(metric.into());
}
TonicScopeMetrics {
scope: Some((&sm.scope, None).into()),
metrics: sm.metrics.iter().map(Into::into).collect(),
scope: Some((sm.scope, None).into()),
metrics,
schema_url: sm
.scope
.schema_url()
Expand All @@ -145,12 +155,12 @@ pub mod tonic {
}
}

impl From<&SdkMetric> for TonicMetric {
fn from(metric: &SdkMetric) -> Self {
impl From<Metric<'_>> for TonicMetric {
fn from(metric: Metric<'_>) -> Self {
TonicMetric {
name: metric.name.to_string(),
description: metric.description.to_string(),
unit: metric.unit.to_string(),
name: metric.instrument.name.to_string(),
description: metric.instrument.description.to_string(),
unit: metric.instrument.unit.to_string(),
metadata: vec![], // internal and currently unused
data: Some(match &metric.data {
AggregatedMetrics::F64(data) => data.into(),
Expand Down
10 changes: 10 additions & 0 deletions opentelemetry-sdk/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@

## vNext

- *Breaking* change for `PushMetricExporter::export` from accepting
`metrics: &ResourceMetrics`, to accepting `metrics: ResourceMetrics<'_>`.
In addition, `ResourceMetrics` was also changed to allow improving underlying
metric collection without any allocations in the future.
[#2957](https://github.com/open-telemetry/opentelemetry-rust/pull/2957)
- *Breaking* change for `Metric::data` field: From dynamic `Box<dyn Aggregation>`
to new enum `AggregatedMetrics`.
[#2857](https://github.com/open-telemetry/opentelemetry-rust/pull/2857)

- **Feature**: Added context based telemetry suppression. [#2868](https://github.com/open-telemetry/opentelemetry-rust/pull/2868)
- `SdkLogger`, `SdkTracer` modified to respect telemetry suppression based on
`Context`. In other words, if the current context has telemetry suppression
Expand Down Expand Up @@ -48,6 +57,7 @@ also modified to suppress telemetry before invoking exporters.
- The `export` method on `PushMetricExporter` now accepts `&ResourceMetrics`
instead of `&mut ResourceMetrics`.


## 0.29.0

Released 2025-Mar-21
Expand Down
12 changes: 7 additions & 5 deletions opentelemetry-sdk/benches/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ use opentelemetry::{
use opentelemetry_sdk::{
error::OTelSdkResult,
metrics::{
data::ResourceMetrics, new_view, reader::MetricReader, Aggregation, Instrument,
InstrumentKind, ManualReader, Pipeline, SdkMeterProvider, Stream, Temporality, View,
new_view,
reader::{MetricReader, ResourceMetricsData},
Aggregation, Instrument, InstrumentKind, ManualReader, Pipeline, SdkMeterProvider, Stream,
Temporality, View,
},
Resource,
};
Expand All @@ -23,7 +25,7 @@ impl MetricReader for SharedReader {
self.0.register_pipeline(pipeline)
}

fn collect(&self, rm: &mut ResourceMetrics) -> OTelSdkResult {
fn collect(&self, rm: &mut ResourceMetricsData) -> OTelSdkResult {
self.0.collect(rm)
}

Expand Down Expand Up @@ -240,7 +242,7 @@ fn counters(c: &mut Criterion) {
});

let (rdr, cntr) = bench_counter(None, "cumulative");
let mut rm = ResourceMetrics {
let mut rm = ResourceMetricsData {
resource: Resource::builder_empty().build(),
scope_metrics: Vec::new(),
};
Expand Down Expand Up @@ -337,7 +339,7 @@ fn benchmark_collect_histogram(b: &mut Bencher, n: usize) {
h.record(1, &[]);
}

let mut rm = ResourceMetrics {
let mut rm = ResourceMetricsData {
resource: Resource::builder_empty().build(),
scope_metrics: Vec::new(),
};
Expand Down
39 changes: 2 additions & 37 deletions opentelemetry-sdk/src/metrics/data/mod.rs
Original file line number Diff line number Diff line change
@@ -1,46 +1,11 @@
//! Types for delivery of pre-aggregated metric time series data.

use std::{borrow::Cow, time::SystemTime};
use std::time::SystemTime;

use opentelemetry::{InstrumentationScope, KeyValue};

use crate::Resource;
use opentelemetry::KeyValue;

use super::Temporality;

/// A collection of [ScopeMetrics] and the associated [Resource] that created them.
#[derive(Debug)]
pub struct ResourceMetrics {
/// The entity that collected the metrics.
pub resource: Resource,
/// The collection of metrics with unique [InstrumentationScope]s.
pub scope_metrics: Vec<ScopeMetrics>,
}

/// A collection of metrics produced by a meter.
#[derive(Default, Debug)]
pub struct ScopeMetrics {
/// The [InstrumentationScope] that the meter was created with.
pub scope: InstrumentationScope,
/// The list of aggregations created by the meter.
pub metrics: Vec<Metric>,
}

/// A collection of one or more aggregated time series from an [Instrument].
///
/// [Instrument]: crate::metrics::Instrument
#[derive(Debug)]
pub struct Metric {
/// The name of the instrument that created this data.
pub name: Cow<'static, str>,
/// The description of the instrument, which can be used in documentation.
pub description: Cow<'static, str>,
/// The unit in which the instrument reports.
pub unit: Cow<'static, str>,
/// The aggregated data from an instrument.
pub data: AggregatedMetrics,
}

/// Aggregated metrics data from an instrument
#[derive(Debug)]
pub enum AggregatedMetrics {
Expand Down
115 changes: 110 additions & 5 deletions opentelemetry-sdk/src/metrics/exporter.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,116 @@
//! Interfaces for exporting metrics

use crate::error::OTelSdkResult;
use std::time::Duration;
use opentelemetry::InstrumentationScope;

use crate::metrics::data::ResourceMetrics;
use crate::{error::OTelSdkResult, Resource};
use std::{fmt::Debug, slice::Iter, time::Duration};

use super::Temporality;
use super::{
data::AggregatedMetrics,
reader::{ResourceMetricsData, ScopeMetricsData},
InstrumentInfo, Temporality,
};

/// Stores borrowed metrics and provide a way to collect them
#[derive(Debug)]
pub struct ScopeMetricsCollector<'a> {
iter: ScopeMetricsLendingIter<'a>,
}

impl ScopeMetricsCollector<'_> {
/// Start collecting all metrics
pub fn collect(self, process: impl FnOnce(ScopeMetricsLendingIter<'_>)) {
process(self.iter)
}
}

/// A collection of [`ScopeMetricsCollector`] and the associated [Resource] that created them.
#[derive(Debug)]
pub struct ResourceMetrics<'a> {
/// The entity that collected the metrics.
pub resource: &'a Resource,
/// The collection of metrics with unique [InstrumentationScope]s.
pub scope_metrics: ScopeMetricsCollector<'a>,
}

/// Iterator over libraries instrumentation scopes ([`InstrumentationScope`]) together with metrics.
/// Doesn't implement standard [`Iterator`], because it returns borrowed items. AKA "LendingIterator".
pub struct ScopeMetricsLendingIter<'a> {
iter: Iter<'a, ScopeMetricsData>,
}

/// A collection of metrics produced by a [`InstrumentationScope`] meter.
#[derive(Debug)]
pub struct ScopeMetrics<'a> {
/// The [InstrumentationScope] that the meter was created with.
pub scope: &'a InstrumentationScope,
/// The list of aggregations created by the meter.
pub metrics: MetricsLendingIter<'a>,
}

/// Iterator over aggregations created by the meter.
/// Doesn't implement standard [`Iterator`], because it returns borrowed items. AKA "LendingIterator".
pub struct MetricsLendingIter<'a> {
iter: Iter<'a, super::reader::MetricsData>,
}

/// A collection of one or more aggregated time series from an [Instrument].
///
/// [Instrument]: crate::metrics::Instrument
#[derive(Debug)]
pub struct Metric<'a> {
/// The name of the instrument that created this data.
pub instrument: &'a InstrumentInfo,
/// The aggregated data from an instrument.
pub data: &'a AggregatedMetrics,
}

impl<'a> ResourceMetrics<'a> {
pub(crate) fn new(data: &'a ResourceMetricsData) -> Self {
Self {
resource: &data.resource,
scope_metrics: ScopeMetricsCollector {
iter: ScopeMetricsLendingIter {
iter: data.scope_metrics.iter(),
},
},
}
}
}

impl Debug for ScopeMetricsLendingIter<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BatchScopeMetrics").finish()
}
}

impl ScopeMetricsLendingIter<'_> {
/// Advances the iterator and returns the next value.
pub fn next_scope_metrics(&mut self) -> Option<ScopeMetrics<'_>> {
self.iter.next().map(|item| ScopeMetrics {
scope: &item.scope,
metrics: MetricsLendingIter {
iter: item.metrics.iter(),
},
})
}
}

impl MetricsLendingIter<'_> {
/// Advances the iterator and returns the next value.
pub fn next_metric(&mut self) -> Option<Metric<'_>> {
self.iter.next().map(|item| Metric {
instrument: &item.instrument,
data: &item.data,
})
}
}

impl Debug for MetricsLendingIter<'_> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BatchMetrics").finish()
}
}

/// Exporter handles the delivery of metric data to external receivers.
///
Expand All @@ -18,7 +123,7 @@ pub trait PushMetricExporter: Send + Sync + 'static {
/// considered unrecoverable and will be logged.
fn export(
&self,
metrics: &ResourceMetrics,
metrics: ResourceMetrics<'_>,
) -> impl std::future::Future<Output = OTelSdkResult> + Send;

/// Flushes any metric data held by an exporter.
Expand Down
Loading
Loading