Skip to content

Commit 2971467

Browse files
cijothomaslalitb
andauthored
feat: Add Suppression flag to context (#2821)
Co-authored-by: Lalit Kumar Bhasin <[email protected]>
1 parent f3e93a0 commit 2971467

File tree

5 files changed

+325
-6
lines changed

5 files changed

+325
-6
lines changed

opentelemetry/CHANGELOG.md

+20
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,26 @@
22

33
## vNext
44

5+
Added the ability to prevent recursive telemetry generation through new
6+
context-based suppression mechanisms. This feature helps prevent feedback loops
7+
and excessive telemetry when OpenTelemetry components perform their own
8+
operations.
9+
10+
New methods added to `Context`:
11+
12+
- `is_telemetry_suppressed()` - Checks if telemetry is suppressed in this
13+
context
14+
- `with_telemetry_suppressed()` - Creates a new context with telemetry
15+
suppression enabled
16+
- `is_current_telemetry_suppressed()` - Efficiently checks if the current thread's context
17+
has telemetry suppressed
18+
- `enter_telemetry_suppressed_scope()` - Convenience method to enter a scope where telemetry is
19+
suppressed
20+
21+
These methods allow SDK components, exporters, and processors to temporarily
22+
disable telemetry generation during their internal operations, ensuring more
23+
predictable and efficient observability pipelines.
24+
525
## 0.29.0
626

727
Released 2025-Mar-21

opentelemetry/Cargo.toml

+4
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@ name = "context_attach"
6464
harness = false
6565
required-features = ["tracing"]
6666

67+
[[bench]]
68+
name = "context_suppression"
69+
harness = false
70+
6771
[[bench]]
6872
name = "baggage"
6973
harness = false
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
use criterion::{black_box, criterion_group, criterion_main, Criterion};
2+
use opentelemetry::Context;
3+
4+
// Run this benchmark with:
5+
// cargo bench --bench context_suppression
6+
7+
// The benchmark results:
8+
// criterion = "0.5.1"
9+
// Hardware: Apple M4 Pro
10+
// Total Number of Cores:   14 (10 performance and 4 efficiency)
11+
// | Benchmark | Time |
12+
// |---------------------------------------|--------|
13+
// | enter_telemetry_suppressed_scope | 8.3 ns |
14+
// | normal_attach | 9.1 ns |
15+
// | is_current_telemetry_suppressed_false | 750 ps |
16+
// | is_current_telemetry_suppressed_true | 750 ps |
17+
18+
fn criterion_benchmark(c: &mut Criterion) {
19+
let mut group = c.benchmark_group("telemetry_suppression");
20+
21+
// Benchmark the cost of entering a suppressed scope
22+
group.bench_function("enter_telemetry_suppressed_scope", |b| {
23+
b.iter(|| {
24+
let _guard = black_box(Context::enter_telemetry_suppressed_scope());
25+
});
26+
});
27+
28+
// For comparison - normal context attach
29+
group.bench_function("normal_attach", |b| {
30+
b.iter(|| {
31+
let _guard = black_box(Context::current().attach());
32+
});
33+
});
34+
35+
// Benchmark checking if current is suppressed (when not suppressed)
36+
group.bench_function("is_current_telemetry_suppressed_false", |b| {
37+
// Make sure we're in a non-suppressed context
38+
let _restore_ctx = Context::current().attach();
39+
b.iter(|| {
40+
let is_suppressed = black_box(Context::is_current_telemetry_suppressed());
41+
black_box(is_suppressed);
42+
});
43+
});
44+
45+
// Benchmark checking if current is suppressed (when suppressed)
46+
group.bench_function("is_current_telemetry_suppressed_true", |b| {
47+
// Enter suppressed context for the duration of the benchmark
48+
let _suppressed_guard = Context::enter_telemetry_suppressed_scope();
49+
b.iter(|| {
50+
let is_suppressed = black_box(Context::is_current_telemetry_suppressed());
51+
black_box(is_suppressed);
52+
});
53+
});
54+
55+
group.finish();
56+
}
57+
58+
criterion_group!(benches, criterion_benchmark);
59+
criterion_main!(benches);

opentelemetry/src/context.rs

+240-4
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ pub struct Context {
9595
#[cfg(feature = "trace")]
9696
pub(crate) span: Option<Arc<SynchronizedSpan>>,
9797
entries: Option<Arc<EntryMap>>,
98+
suppress_telemetry: bool,
9899
}
99100

100101
type EntryMap = HashMap<TypeId, Arc<dyn Any + Sync + Send>, BuildHasherDefault<IdHasher>>;
@@ -242,6 +243,7 @@ impl Context {
242243
entries,
243244
#[cfg(feature = "trace")]
244245
span: self.span.clone(),
246+
suppress_telemetry: self.suppress_telemetry,
245247
}
246248
}
247249

@@ -328,19 +330,97 @@ impl Context {
328330
}
329331
}
330332

333+
/// Returns whether telemetry is suppressed in this context.
334+
#[inline]
335+
pub fn is_telemetry_suppressed(&self) -> bool {
336+
self.suppress_telemetry
337+
}
338+
339+
/// Returns a new context with telemetry suppression enabled.
340+
pub fn with_telemetry_suppressed(&self) -> Self {
341+
Context {
342+
entries: self.entries.clone(),
343+
#[cfg(feature = "trace")]
344+
span: self.span.clone(),
345+
suppress_telemetry: true,
346+
}
347+
}
348+
349+
/// Enters a scope where telemetry is suppressed.
350+
///
351+
/// This method is specifically designed for OpenTelemetry components (like Exporters,
352+
/// Processors etc.) to prevent generating recursive or self-referential
353+
/// telemetry data when performing their own operations.
354+
///
355+
/// Without suppression, we have a telemetry-induced-telemetry situation
356+
/// where, operations like exporting telemetry could generate new telemetry
357+
/// about the export process itself, potentially causing:
358+
/// - Infinite telemetry feedback loops
359+
/// - Excessive resource consumption
360+
///
361+
/// This method:
362+
/// 1. Takes the current context
363+
/// 2. Creates a new context from current, with `suppress_telemetry` set to `true`
364+
/// 3. Attaches it to the current thread
365+
/// 4. Returns a guard that restores the previous context when dropped
366+
///
367+
/// OTel SDK components would check `is_current_telemetry_suppressed()` before
368+
/// generating new telemetry, but not end users.
369+
///
370+
/// # Examples
371+
///
372+
/// ```
373+
/// use opentelemetry::Context;
374+
///
375+
/// // Example: Inside an exporter's implementation
376+
/// fn example_export_function() {
377+
/// // Prevent telemetry-generating operations from creating more telemetry
378+
/// let _guard = Context::enter_telemetry_suppressed_scope();
379+
///
380+
/// // Verify suppression is active
381+
/// assert_eq!(Context::is_current_telemetry_suppressed(), true);
382+
///
383+
/// // Here you would normally perform operations that might generate telemetry
384+
/// // but now they won't because the context has suppression enabled
385+
/// }
386+
///
387+
/// // Demonstrate the function
388+
/// example_export_function();
389+
/// ```
390+
pub fn enter_telemetry_suppressed_scope() -> ContextGuard {
391+
Self::map_current(|cx| cx.with_telemetry_suppressed()).attach()
392+
}
393+
394+
/// Returns whether telemetry is suppressed in the current context.
395+
///
396+
/// This method is used by OpenTelemetry components to determine whether they should
397+
/// generate new telemetry in the current execution context. It provides a performant
398+
/// way to check the suppression state.
399+
///
400+
/// End-users generally should not use this method directly, as it is primarily intended for
401+
/// OpenTelemetry SDK components.
402+
///
403+
///
404+
#[inline]
405+
pub fn is_current_telemetry_suppressed() -> bool {
406+
Self::map_current(|cx| cx.is_telemetry_suppressed())
407+
}
408+
331409
#[cfg(feature = "trace")]
332410
pub(crate) fn current_with_synchronized_span(value: SynchronizedSpan) -> Self {
333-
Context {
411+
Self::map_current(|cx| Context {
334412
span: Some(Arc::new(value)),
335-
entries: Context::map_current(|cx| cx.entries.clone()),
336-
}
413+
entries: cx.entries.clone(),
414+
suppress_telemetry: cx.suppress_telemetry,
415+
})
337416
}
338417

339418
#[cfg(feature = "trace")]
340419
pub(crate) fn with_synchronized_span(&self, value: SynchronizedSpan) -> Self {
341420
Context {
342421
span: Some(Arc::new(value)),
343422
entries: self.entries.clone(),
423+
suppress_telemetry: self.suppress_telemetry,
344424
}
345425
}
346426
}
@@ -359,7 +439,9 @@ impl fmt::Debug for Context {
359439
}
360440
}
361441

362-
dbg.field("entries count", &entries).finish()
442+
dbg.field("entries count", &entries)
443+
.field("suppress_telemetry", &self.suppress_telemetry)
444+
.finish()
363445
}
364446
}
365447

@@ -897,4 +979,158 @@ mod tests {
897979
assert_eq!(Context::current().get::<ValueA>(), None);
898980
assert_eq!(Context::current().get::<ValueB>(), None);
899981
}
982+
983+
#[test]
984+
fn test_is_telemetry_suppressed() {
985+
// Default context has suppression disabled
986+
let cx = Context::new();
987+
assert!(!cx.is_telemetry_suppressed());
988+
989+
// With suppression enabled
990+
let suppressed = cx.with_telemetry_suppressed();
991+
assert!(suppressed.is_telemetry_suppressed());
992+
}
993+
994+
#[test]
995+
fn test_with_telemetry_suppressed() {
996+
// Start with a normal context
997+
let cx = Context::new();
998+
assert!(!cx.is_telemetry_suppressed());
999+
1000+
// Create a suppressed context
1001+
let suppressed = cx.with_telemetry_suppressed();
1002+
1003+
// Original should remain unchanged
1004+
assert!(!cx.is_telemetry_suppressed());
1005+
1006+
// New context should be suppressed
1007+
assert!(suppressed.is_telemetry_suppressed());
1008+
1009+
// Test with values to ensure they're preserved
1010+
let cx_with_value = cx.with_value(ValueA(42));
1011+
let suppressed_with_value = cx_with_value.with_telemetry_suppressed();
1012+
1013+
assert!(!cx_with_value.is_telemetry_suppressed());
1014+
assert!(suppressed_with_value.is_telemetry_suppressed());
1015+
assert_eq!(suppressed_with_value.get::<ValueA>(), Some(&ValueA(42)));
1016+
}
1017+
1018+
#[test]
1019+
fn test_enter_telemetry_suppressed_scope() {
1020+
// Ensure we start with a clean context
1021+
let _reset_guard = Context::new().attach();
1022+
1023+
// Default context should not be suppressed
1024+
assert!(!Context::is_current_telemetry_suppressed());
1025+
1026+
// Add an entry to the current context
1027+
let cx_with_value = Context::current().with_value(ValueA(42));
1028+
let _guard_with_value = cx_with_value.attach();
1029+
1030+
// Verify the entry is present and context is not suppressed
1031+
assert_eq!(Context::current().get::<ValueA>(), Some(&ValueA(42)));
1032+
assert!(!Context::is_current_telemetry_suppressed());
1033+
1034+
// Enter a suppressed scope
1035+
{
1036+
let _guard = Context::enter_telemetry_suppressed_scope();
1037+
1038+
// Verify suppression is active and the entry is still present
1039+
assert!(Context::is_current_telemetry_suppressed());
1040+
assert!(Context::current().is_telemetry_suppressed());
1041+
assert_eq!(Context::current().get::<ValueA>(), Some(&ValueA(42)));
1042+
}
1043+
1044+
// After guard is dropped, should be back to unsuppressed and entry should still be present
1045+
assert!(!Context::is_current_telemetry_suppressed());
1046+
assert!(!Context::current().is_telemetry_suppressed());
1047+
assert_eq!(Context::current().get::<ValueA>(), Some(&ValueA(42)));
1048+
}
1049+
1050+
#[test]
1051+
fn test_nested_suppression_scopes() {
1052+
// Ensure we start with a clean context
1053+
let _reset_guard = Context::new().attach();
1054+
1055+
// Default context should not be suppressed
1056+
assert!(!Context::is_current_telemetry_suppressed());
1057+
1058+
// First level suppression
1059+
{
1060+
let _outer = Context::enter_telemetry_suppressed_scope();
1061+
assert!(Context::is_current_telemetry_suppressed());
1062+
1063+
// Second level. This component is unaware of Suppression,
1064+
// and just attaches a new context. Since it is from current,
1065+
// it'll already have suppression enabled.
1066+
{
1067+
let _inner = Context::current().with_value(ValueA(1)).attach();
1068+
assert!(Context::is_current_telemetry_suppressed());
1069+
assert_eq!(Context::current().get::<ValueA>(), Some(&ValueA(1)));
1070+
}
1071+
1072+
// Another scenario. This component is unaware of Suppression,
1073+
// and just attaches a new context, not from Current. Since it is
1074+
// not from current it will not have suppression enabled.
1075+
{
1076+
let _inner = Context::new().with_value(ValueA(1)).attach();
1077+
assert!(!Context::is_current_telemetry_suppressed());
1078+
assert_eq!(Context::current().get::<ValueA>(), Some(&ValueA(1)));
1079+
}
1080+
1081+
// Still suppressed after inner scope
1082+
assert!(Context::is_current_telemetry_suppressed());
1083+
}
1084+
1085+
// Back to unsuppressed
1086+
assert!(!Context::is_current_telemetry_suppressed());
1087+
}
1088+
1089+
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1090+
async fn test_async_suppression() {
1091+
async fn nested_operation() {
1092+
assert!(Context::is_current_telemetry_suppressed());
1093+
1094+
let cx_with_additional_value = Context::current().with_value(ValueB(24));
1095+
1096+
async {
1097+
assert_eq!(
1098+
Context::current().get::<ValueB>(),
1099+
Some(&ValueB(24)),
1100+
"Parent value should still be available after adding new value"
1101+
);
1102+
assert!(Context::is_current_telemetry_suppressed());
1103+
1104+
// Do some async work to simulate real-world scenario
1105+
sleep(Duration::from_millis(10)).await;
1106+
1107+
// Values should still be available after async work
1108+
assert_eq!(
1109+
Context::current().get::<ValueB>(),
1110+
Some(&ValueB(24)),
1111+
"Parent value should still be available after adding new value"
1112+
);
1113+
assert!(Context::is_current_telemetry_suppressed());
1114+
}
1115+
.with_context(cx_with_additional_value)
1116+
.await;
1117+
}
1118+
1119+
// Set up suppressed context, but don't attach it to current
1120+
let suppressed_parent = Context::new().with_telemetry_suppressed();
1121+
// Current should not be suppressed as we haven't attached it
1122+
assert!(!Context::is_current_telemetry_suppressed());
1123+
1124+
// Create and run async operation with the suppressed context explicitly propagated
1125+
nested_operation()
1126+
.with_context(suppressed_parent.clone())
1127+
.await;
1128+
1129+
// After async operation completes:
1130+
// Suppression should be active
1131+
assert!(suppressed_parent.is_telemetry_suppressed());
1132+
1133+
// Current should still be not suppressed
1134+
assert!(!Context::is_current_telemetry_suppressed());
1135+
}
9001136
}

0 commit comments

Comments
 (0)