From 11d6035f5a7ad3b3ba0528c9337a876bb1263b47 Mon Sep 17 00:00:00 2001 From: Hayden Stainsby Date: Tue, 10 Oct 2023 16:14:05 +0200 Subject: [PATCH] core: fix race condition in `DefaultCallsite::register` There were two separate race conditions related to registration of callsites. In both cases, it was possible that `event` or `new_span` could be called before `register_callsite` had finished executing for all subscribers. The first case could be invoked when multiple (thread local) subscribers were registering the same callsite and could cause some subscribers to not receive a call to `register_callsite` at all. This case was fixed in #2938. The second case could be invoked when multiple threads reach the same event or span for the first time and can occur in the presence of only a single global default subscriber. The subscriber may receive calls to `event` or `new_span` before the call to `register_callsite` has finished executing. This may occur even with a relatively fast `register_callsite` implentation - although it is less likely. A slow implementation is more likely to trigger the error. This change fixes the race condition by forcing any calls to `DefaultCallsite::register` which run while another thread is registering the same callsite to wait until registration has completed. This is achieved with a loop around the check on the atomic representing the registration state for that callsite. It will hotloop until the registration is complete. Tests have been added to both `tracing-core` and `tracing` which invoke this error case and always fail when testing the previous code. Fixes: #2743 --- tracing-core/src/callsite.rs | 57 ++++---- ...sed_register_callsite_global_subscriber.rs | 130 ++++++++++++++++++ ...ed_register_callsite_local_subscribers.rs} | 0 ...sed_register_callsite_global_subscriber.rs | 93 +++++++++++++ ...ed_register_callsite_local_subscribers.rs} | 0 5 files changed, 255 insertions(+), 25 deletions(-) create mode 100644 tracing-core/tests/missed_register_callsite_global_subscriber.rs rename tracing-core/tests/{missed_register_callsite.rs => missed_register_callsite_local_subscribers.rs} (100%) create mode 100644 tracing/tests/missed_register_callsite_global_subscriber.rs rename tracing/tests/{missed_register_callsite.rs => missed_register_callsite_local_subscribers.rs} (100%) diff --git a/tracing-core/src/callsite.rs b/tracing-core/src/callsite.rs index 66bd0d349b..372e44ff05 100644 --- a/tracing-core/src/callsite.rs +++ b/tracing-core/src/callsite.rs @@ -306,31 +306,38 @@ impl DefaultCallsite { // This only happens once (or if the cached interest value was corrupted). #[cold] pub fn register(&'static self) -> Interest { - // Attempt to advance the registration state to `REGISTERING`... - match self.registration.compare_exchange( - Self::UNREGISTERED, - Self::REGISTERING, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // Okay, we advanced the state, try to register the callsite. - CALLSITES.push_default(self); - rebuild_callsite_interest(self, &DISPATCHERS.rebuilder()); - self.registration.store(Self::REGISTERED, Ordering::Release); - } - // Great, the callsite is already registered! Just load its - // previous cached interest. - Err(Self::REGISTERED) => {} - // Someone else is registering... - Err(_state) => { - debug_assert_eq!( - _state, - Self::REGISTERING, - "weird callsite registration state" - ); - // Just hit `enabled` this time. - return Interest::sometimes(); + loop { + // Attempt to advance the registration state to `REGISTERING`... + let prev_state = self.registration.compare_exchange( + Self::UNREGISTERED, + Self::REGISTERING, + Ordering::AcqRel, + Ordering::Acquire, + ); + + match prev_state { + Ok(_) => { + // Okay, we advanced the state, try to register the callsite. + CALLSITES.push_default(self); + rebuild_callsite_interest(self, &DISPATCHERS.rebuilder()); + self.registration.store(Self::REGISTERED, Ordering::Release); + break; + } + // Great, the callsite is already registered! Just load its + // previous cached interest. + Err(Self::REGISTERED) => break, + // Someone else is registering... + Err(_state) => { + debug_assert_eq!( + _state, + Self::REGISTERING, + "weird callsite registration state: {_state}" + ); + // The callsite is being registered. We have to wait until + // registration is finished, otherwise the register_callsite + // call could be missed completely. + continue; + } } } diff --git a/tracing-core/tests/missed_register_callsite_global_subscriber.rs b/tracing-core/tests/missed_register_callsite_global_subscriber.rs new file mode 100644 index 0000000000..ff6e5137fe --- /dev/null +++ b/tracing-core/tests/missed_register_callsite_global_subscriber.rs @@ -0,0 +1,130 @@ +use std::{ + ptr, + sync::atomic::{AtomicPtr, Ordering}, + thread, + time::Duration, +}; + +use tracing_core::{ + callsite::{Callsite as _, DefaultCallsite}, + dispatcher, + field::{FieldSet, Value}, + span, Dispatch, Event, Kind, Level, Metadata, Subscriber, +}; + +struct TestSubscriber { + sleep: Duration, + callsite: AtomicPtr>, +} + +impl TestSubscriber { + fn new(sleep_micros: u64) -> Self { + Self { + sleep: Duration::from_micros(sleep_micros), + callsite: AtomicPtr::new(ptr::null_mut()), + } + } +} + +impl Subscriber for TestSubscriber { + fn register_callsite(&self, metadata: &'static Metadata<'static>) -> tracing_core::Interest { + if !self.sleep.is_zero() { + thread::sleep(self.sleep); + } + + self.callsite + .store(metadata as *const _ as *mut _, Ordering::SeqCst); + + tracing_core::Interest::always() + } + + fn event(&self, event: &tracing_core::Event<'_>) { + let stored_callsite = self.callsite.load(Ordering::SeqCst); + let event_callsite: *mut Metadata<'static> = event.metadata() as *const _ as *mut _; + + // This assert is the actual test. + assert_eq!( + stored_callsite, event_callsite, + "stored callsite: {stored_callsite:#?} does not match event \ + callsite: {event_callsite:#?}. Was `event` called before \ + `register_callsite`?" + ); + } + + fn enabled(&self, _metadata: &Metadata<'_>) -> bool { + true + } + fn new_span(&self, _span: &span::Attributes<'_>) -> span::Id { + span::Id::from_u64(0) + } + fn record(&self, _span: &span::Id, _values: &span::Record<'_>) {} + fn record_follows_from(&self, _span: &span::Id, _follows: &span::Id) {} + fn enter(&self, _span: &tracing_core::span::Id) {} + fn exit(&self, _span: &tracing_core::span::Id) {} +} + +fn dispatch_event(idx: usize) { + static CALLSITE: DefaultCallsite = { + // The values of the metadata are unimportant + static META: Metadata<'static> = Metadata::new( + "event ", + "module::path", + Level::INFO, + None, + None, + None, + FieldSet::new(&["message"], tracing_core::callsite::Identifier(&CALLSITE)), + Kind::EVENT, + ); + DefaultCallsite::new(&META) + }; + let _interest = CALLSITE.interest(); + + let meta = CALLSITE.metadata(); + let field = meta.fields().field("message").unwrap(); + let message = format!("event-from-{idx}", idx = idx); + let values = [(&field, Some(&message as &dyn Value))]; + let value_set = CALLSITE.metadata().fields().value_set(&values); + + Event::dispatch(meta, &value_set); +} + +/// Regression test for missing register_callsite call (#2743) +/// +/// This test provides the race condition which causes multiple threads to attempt to register the +/// same callsite in parallel. Previously, if one thread finds that another thread is already in +/// The process of registering a callsite, it would continue on to call `enabled` and then possible +/// `event` or `new_span` which could then be called before ``register_callsite` had completed on +/// the thread actually registering the callsite. +/// +/// Because the test depends on the interaction of multiple dispatchers in different threads, +/// it needs to be in a test file by itself. +#[test] +fn event_before_register() { + let register_sleep_micros = 100; + let subscriber = TestSubscriber::new(register_sleep_micros); + dispatcher::set_global_default(Dispatch::new(subscriber)).unwrap(); + + std::thread::scope(|s| { + for idx in 0..16 { + thread::Builder::new() + .name(format!("event-{idx}")) + .spawn_scoped(s, move || dispatch_event(idx)) + .expect("failed to spawn thread"); + } + }); + +// let subscriber_1_register_sleep_micros = 100; +// let subscriber_2_register_sleep_micros = 0; +// +// let jh1 = subscriber_thread(1, subscriber_1_register_sleep_micros); +// +// // This delay ensures that the event callsite has interest() called first. +// thread::sleep(Duration::from_micros(50)); +// let jh2 = subscriber_thread(2, subscriber_2_register_sleep_micros); +// +// jh1.join().expect("failed to join thread"); +// jh2.join().expect("failed to join thread"); +} + + diff --git a/tracing-core/tests/missed_register_callsite.rs b/tracing-core/tests/missed_register_callsite_local_subscribers.rs similarity index 100% rename from tracing-core/tests/missed_register_callsite.rs rename to tracing-core/tests/missed_register_callsite_local_subscribers.rs diff --git a/tracing/tests/missed_register_callsite_global_subscriber.rs b/tracing/tests/missed_register_callsite_global_subscriber.rs new file mode 100644 index 0000000000..c8bfe81f34 --- /dev/null +++ b/tracing/tests/missed_register_callsite_global_subscriber.rs @@ -0,0 +1,93 @@ +use std::{ + ptr, + sync::atomic::{AtomicPtr, Ordering}, + thread, + time::Duration, +}; + +use tracing::Subscriber; +use tracing_core::{span, Metadata}; + +struct TestSubscriber { + creator_thread: String, + sleep: Duration, + callsite: AtomicPtr>, +} + +impl TestSubscriber { + fn new(sleep_micros: u64) -> Self { + let creator_thread = thread::current() + .name() + .unwrap_or("") + .to_owned(); + Self { + creator_thread, + sleep: Duration::from_micros(sleep_micros), + callsite: AtomicPtr::new(ptr::null_mut()), + } + } +} + +impl Subscriber for TestSubscriber { + fn register_callsite(&self, metadata: &'static Metadata<'static>) -> tracing_core::Interest { + if !self.sleep.is_zero() { + thread::sleep(self.sleep); + } + + self.callsite + .store(metadata as *const _ as *mut _, Ordering::SeqCst); + println!( + "{creator} from {thread:?}: register_callsite: {callsite:#?}", + creator = self.creator_thread, + callsite = metadata as *const _, + thread = thread::current().name(), + ); + tracing_core::Interest::always() + } + + fn event(&self, event: &tracing_core::Event<'_>) { + let stored_callsite = self.callsite.load(Ordering::SeqCst); + let event_callsite: *mut Metadata<'static> = event.metadata() as *const _ as *mut _; + + println!( + "{creator} from {thread:?}: event (with callsite): {event_callsite:#?} (stored callsite: {stored_callsite:#?})", + creator = self.creator_thread, + thread = thread::current().name(), + ); + + // This assert is the actual test. + assert_eq!( + stored_callsite, event_callsite, + "stored callsite: {stored_callsite:#?} does not match event \ + callsite: {event_callsite:#?}. Was `event` called before \ + `register_callsite`?" + ); + } + + fn enabled(&self, _metadata: &Metadata<'_>) -> bool { + true + } + fn new_span(&self, _span: &span::Attributes<'_>) -> span::Id { + span::Id::from_u64(0) + } + fn record(&self, _span: &span::Id, _values: &span::Record<'_>) {} + fn record_follows_from(&self, _span: &span::Id, _follows: &span::Id) {} + fn enter(&self, _span: &tracing_core::span::Id) {} + fn exit(&self, _span: &tracing_core::span::Id) {} +} + +#[test] +fn event_before_register() { + let register_sleep_micros = 100; + let subscriber = TestSubscriber::new(register_sleep_micros); + tracing::subscriber::set_global_default(subscriber).unwrap(); + + std::thread::scope(|s| { + for idx in 0..16 { + thread::Builder::new() + .name(format!("event-{idx}")) + .spawn_scoped(s, move || tracing::info!("Thread {} started", idx)) + .expect("failed to spawn thread"); + } + }); +} diff --git a/tracing/tests/missed_register_callsite.rs b/tracing/tests/missed_register_callsite_local_subscribers.rs similarity index 100% rename from tracing/tests/missed_register_callsite.rs rename to tracing/tests/missed_register_callsite_local_subscribers.rs