diff --git a/tracing-core/src/callsite.rs b/tracing-core/src/callsite.rs index 66bd0d349b..372e44ff05 100644 --- a/tracing-core/src/callsite.rs +++ b/tracing-core/src/callsite.rs @@ -306,31 +306,38 @@ impl DefaultCallsite { // This only happens once (or if the cached interest value was corrupted). #[cold] pub fn register(&'static self) -> Interest { - // Attempt to advance the registration state to `REGISTERING`... - match self.registration.compare_exchange( - Self::UNREGISTERED, - Self::REGISTERING, - Ordering::AcqRel, - Ordering::Acquire, - ) { - Ok(_) => { - // Okay, we advanced the state, try to register the callsite. - CALLSITES.push_default(self); - rebuild_callsite_interest(self, &DISPATCHERS.rebuilder()); - self.registration.store(Self::REGISTERED, Ordering::Release); - } - // Great, the callsite is already registered! Just load its - // previous cached interest. - Err(Self::REGISTERED) => {} - // Someone else is registering... - Err(_state) => { - debug_assert_eq!( - _state, - Self::REGISTERING, - "weird callsite registration state" - ); - // Just hit `enabled` this time. - return Interest::sometimes(); + loop { + // Attempt to advance the registration state to `REGISTERING`... + let prev_state = self.registration.compare_exchange( + Self::UNREGISTERED, + Self::REGISTERING, + Ordering::AcqRel, + Ordering::Acquire, + ); + + match prev_state { + Ok(_) => { + // Okay, we advanced the state, try to register the callsite. + CALLSITES.push_default(self); + rebuild_callsite_interest(self, &DISPATCHERS.rebuilder()); + self.registration.store(Self::REGISTERED, Ordering::Release); + break; + } + // Great, the callsite is already registered! Just load its + // previous cached interest. + Err(Self::REGISTERED) => break, + // Someone else is registering... + Err(_state) => { + debug_assert_eq!( + _state, + Self::REGISTERING, + "weird callsite registration state: {_state}" + ); + // The callsite is being registered. We have to wait until + // registration is finished, otherwise the register_callsite + // call could be missed completely. + continue; + } } } diff --git a/tracing-core/tests/missed_register_callsite_global_subscriber.rs b/tracing-core/tests/missed_register_callsite_global_subscriber.rs new file mode 100644 index 0000000000..ff6e5137fe --- /dev/null +++ b/tracing-core/tests/missed_register_callsite_global_subscriber.rs @@ -0,0 +1,130 @@ +use std::{ + ptr, + sync::atomic::{AtomicPtr, Ordering}, + thread, + time::Duration, +}; + +use tracing_core::{ + callsite::{Callsite as _, DefaultCallsite}, + dispatcher, + field::{FieldSet, Value}, + span, Dispatch, Event, Kind, Level, Metadata, Subscriber, +}; + +struct TestSubscriber { + sleep: Duration, + callsite: AtomicPtr>, +} + +impl TestSubscriber { + fn new(sleep_micros: u64) -> Self { + Self { + sleep: Duration::from_micros(sleep_micros), + callsite: AtomicPtr::new(ptr::null_mut()), + } + } +} + +impl Subscriber for TestSubscriber { + fn register_callsite(&self, metadata: &'static Metadata<'static>) -> tracing_core::Interest { + if !self.sleep.is_zero() { + thread::sleep(self.sleep); + } + + self.callsite + .store(metadata as *const _ as *mut _, Ordering::SeqCst); + + tracing_core::Interest::always() + } + + fn event(&self, event: &tracing_core::Event<'_>) { + let stored_callsite = self.callsite.load(Ordering::SeqCst); + let event_callsite: *mut Metadata<'static> = event.metadata() as *const _ as *mut _; + + // This assert is the actual test. + assert_eq!( + stored_callsite, event_callsite, + "stored callsite: {stored_callsite:#?} does not match event \ + callsite: {event_callsite:#?}. Was `event` called before \ + `register_callsite`?" + ); + } + + fn enabled(&self, _metadata: &Metadata<'_>) -> bool { + true + } + fn new_span(&self, _span: &span::Attributes<'_>) -> span::Id { + span::Id::from_u64(0) + } + fn record(&self, _span: &span::Id, _values: &span::Record<'_>) {} + fn record_follows_from(&self, _span: &span::Id, _follows: &span::Id) {} + fn enter(&self, _span: &tracing_core::span::Id) {} + fn exit(&self, _span: &tracing_core::span::Id) {} +} + +fn dispatch_event(idx: usize) { + static CALLSITE: DefaultCallsite = { + // The values of the metadata are unimportant + static META: Metadata<'static> = Metadata::new( + "event ", + "module::path", + Level::INFO, + None, + None, + None, + FieldSet::new(&["message"], tracing_core::callsite::Identifier(&CALLSITE)), + Kind::EVENT, + ); + DefaultCallsite::new(&META) + }; + let _interest = CALLSITE.interest(); + + let meta = CALLSITE.metadata(); + let field = meta.fields().field("message").unwrap(); + let message = format!("event-from-{idx}", idx = idx); + let values = [(&field, Some(&message as &dyn Value))]; + let value_set = CALLSITE.metadata().fields().value_set(&values); + + Event::dispatch(meta, &value_set); +} + +/// Regression test for missing register_callsite call (#2743) +/// +/// This test provides the race condition which causes multiple threads to attempt to register the +/// same callsite in parallel. Previously, if one thread finds that another thread is already in +/// The process of registering a callsite, it would continue on to call `enabled` and then possible +/// `event` or `new_span` which could then be called before ``register_callsite` had completed on +/// the thread actually registering the callsite. +/// +/// Because the test depends on the interaction of multiple dispatchers in different threads, +/// it needs to be in a test file by itself. +#[test] +fn event_before_register() { + let register_sleep_micros = 100; + let subscriber = TestSubscriber::new(register_sleep_micros); + dispatcher::set_global_default(Dispatch::new(subscriber)).unwrap(); + + std::thread::scope(|s| { + for idx in 0..16 { + thread::Builder::new() + .name(format!("event-{idx}")) + .spawn_scoped(s, move || dispatch_event(idx)) + .expect("failed to spawn thread"); + } + }); + +// let subscriber_1_register_sleep_micros = 100; +// let subscriber_2_register_sleep_micros = 0; +// +// let jh1 = subscriber_thread(1, subscriber_1_register_sleep_micros); +// +// // This delay ensures that the event callsite has interest() called first. +// thread::sleep(Duration::from_micros(50)); +// let jh2 = subscriber_thread(2, subscriber_2_register_sleep_micros); +// +// jh1.join().expect("failed to join thread"); +// jh2.join().expect("failed to join thread"); +} + + diff --git a/tracing-core/tests/missed_register_callsite.rs b/tracing-core/tests/missed_register_callsite_local_subscribers.rs similarity index 100% rename from tracing-core/tests/missed_register_callsite.rs rename to tracing-core/tests/missed_register_callsite_local_subscribers.rs diff --git a/tracing/tests/missed_register_callsite_global_subscriber.rs b/tracing/tests/missed_register_callsite_global_subscriber.rs new file mode 100644 index 0000000000..c8bfe81f34 --- /dev/null +++ b/tracing/tests/missed_register_callsite_global_subscriber.rs @@ -0,0 +1,93 @@ +use std::{ + ptr, + sync::atomic::{AtomicPtr, Ordering}, + thread, + time::Duration, +}; + +use tracing::Subscriber; +use tracing_core::{span, Metadata}; + +struct TestSubscriber { + creator_thread: String, + sleep: Duration, + callsite: AtomicPtr>, +} + +impl TestSubscriber { + fn new(sleep_micros: u64) -> Self { + let creator_thread = thread::current() + .name() + .unwrap_or("") + .to_owned(); + Self { + creator_thread, + sleep: Duration::from_micros(sleep_micros), + callsite: AtomicPtr::new(ptr::null_mut()), + } + } +} + +impl Subscriber for TestSubscriber { + fn register_callsite(&self, metadata: &'static Metadata<'static>) -> tracing_core::Interest { + if !self.sleep.is_zero() { + thread::sleep(self.sleep); + } + + self.callsite + .store(metadata as *const _ as *mut _, Ordering::SeqCst); + println!( + "{creator} from {thread:?}: register_callsite: {callsite:#?}", + creator = self.creator_thread, + callsite = metadata as *const _, + thread = thread::current().name(), + ); + tracing_core::Interest::always() + } + + fn event(&self, event: &tracing_core::Event<'_>) { + let stored_callsite = self.callsite.load(Ordering::SeqCst); + let event_callsite: *mut Metadata<'static> = event.metadata() as *const _ as *mut _; + + println!( + "{creator} from {thread:?}: event (with callsite): {event_callsite:#?} (stored callsite: {stored_callsite:#?})", + creator = self.creator_thread, + thread = thread::current().name(), + ); + + // This assert is the actual test. + assert_eq!( + stored_callsite, event_callsite, + "stored callsite: {stored_callsite:#?} does not match event \ + callsite: {event_callsite:#?}. Was `event` called before \ + `register_callsite`?" + ); + } + + fn enabled(&self, _metadata: &Metadata<'_>) -> bool { + true + } + fn new_span(&self, _span: &span::Attributes<'_>) -> span::Id { + span::Id::from_u64(0) + } + fn record(&self, _span: &span::Id, _values: &span::Record<'_>) {} + fn record_follows_from(&self, _span: &span::Id, _follows: &span::Id) {} + fn enter(&self, _span: &tracing_core::span::Id) {} + fn exit(&self, _span: &tracing_core::span::Id) {} +} + +#[test] +fn event_before_register() { + let register_sleep_micros = 100; + let subscriber = TestSubscriber::new(register_sleep_micros); + tracing::subscriber::set_global_default(subscriber).unwrap(); + + std::thread::scope(|s| { + for idx in 0..16 { + thread::Builder::new() + .name(format!("event-{idx}")) + .spawn_scoped(s, move || tracing::info!("Thread {} started", idx)) + .expect("failed to spawn thread"); + } + }); +} diff --git a/tracing/tests/missed_register_callsite.rs b/tracing/tests/missed_register_callsite_local_subscribers.rs similarity index 100% rename from tracing/tests/missed_register_callsite.rs rename to tracing/tests/missed_register_callsite_local_subscribers.rs