Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 32 additions & 25 deletions tracing-core/src/callsite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,31 +306,38 @@ impl DefaultCallsite {
// This only happens once (or if the cached interest value was corrupted).
#[cold]
pub fn register(&'static self) -> Interest {
// Attempt to advance the registration state to `REGISTERING`...
match self.registration.compare_exchange(
Self::UNREGISTERED,
Self::REGISTERING,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// Okay, we advanced the state, try to register the callsite.
CALLSITES.push_default(self);
rebuild_callsite_interest(self, &DISPATCHERS.rebuilder());
self.registration.store(Self::REGISTERED, Ordering::Release);
}
// Great, the callsite is already registered! Just load its
// previous cached interest.
Err(Self::REGISTERED) => {}
// Someone else is registering...
Err(_state) => {
debug_assert_eq!(
_state,
Self::REGISTERING,
"weird callsite registration state"
);
// Just hit `enabled` this time.
return Interest::sometimes();
loop {
// Attempt to advance the registration state to `REGISTERING`...
let prev_state = self.registration.compare_exchange(
Self::UNREGISTERED,
Self::REGISTERING,
Ordering::AcqRel,
Ordering::Acquire,
);

match prev_state {
Ok(_) => {
// Okay, we advanced the state, try to register the callsite.
CALLSITES.push_default(self);
rebuild_callsite_interest(self, &DISPATCHERS.rebuilder());
self.registration.store(Self::REGISTERED, Ordering::Release);
break;
}
// Great, the callsite is already registered! Just load its
// previous cached interest.
Err(Self::REGISTERED) => break,
// Someone else is registering...
Err(_state) => {
debug_assert_eq!(
_state,
Self::REGISTERING,
"weird callsite registration state: {_state}"
);
// The callsite is being registered. We have to wait until
// registration is finished, otherwise the register_callsite
// call could be missed completely.
continue;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we must spin, then at least use std::hint::spin_loop().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the tip!

This currently causes a deadlock in the case of reentrant traces from inside a subscriber, so I'm trying to come up with a solution that avoids that issue.

}
}
}

Expand Down
130 changes: 130 additions & 0 deletions tracing-core/tests/missed_register_callsite_global_subscriber.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
use std::{
ptr,
sync::atomic::{AtomicPtr, Ordering},
thread,
time::Duration,
};

use tracing_core::{
callsite::{Callsite as _, DefaultCallsite},
dispatcher,
field::{FieldSet, Value},
span, Dispatch, Event, Kind, Level, Metadata, Subscriber,
};

/// Subscriber used by this test to detect an `event` call that arrives before
/// `register_callsite` has recorded the callsite.
struct TestSubscriber {
/// Delay applied inside `register_callsite` before the callsite is stored;
/// a zero duration skips the sleep entirely.
sleep: Duration,
/// Metadata pointer most recently stored by `register_callsite`. Starts out
/// null and is compared against the event's metadata pointer in `event`.
callsite: AtomicPtr<Metadata<'static>>,
}

impl TestSubscriber {
    /// Builds a subscriber whose `register_callsite` sleeps for `sleep_micros`
    /// microseconds and which has not yet recorded any callsite.
    fn new(sleep_micros: u64) -> Self {
        let sleep = Duration::from_micros(sleep_micros);
        // Null marks "no callsite registered yet".
        let callsite = AtomicPtr::new(ptr::null_mut());
        Self { sleep, callsite }
    }
}

impl Subscriber for TestSubscriber {
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> tracing_core::Interest {
if !self.sleep.is_zero() {
thread::sleep(self.sleep);
}

self.callsite
.store(metadata as *const _ as *mut _, Ordering::SeqCst);

tracing_core::Interest::always()
}

fn event(&self, event: &tracing_core::Event<'_>) {
let stored_callsite = self.callsite.load(Ordering::SeqCst);
let event_callsite: *mut Metadata<'static> = event.metadata() as *const _ as *mut _;

// This assert is the actual test.
assert_eq!(
stored_callsite, event_callsite,
"stored callsite: {stored_callsite:#?} does not match event \
callsite: {event_callsite:#?}. Was `event` called before \
`register_callsite`?"
);
}

fn enabled(&self, _metadata: &Metadata<'_>) -> bool {
true
}
fn new_span(&self, _span: &span::Attributes<'_>) -> span::Id {
span::Id::from_u64(0)
}
fn record(&self, _span: &span::Id, _values: &span::Record<'_>) {}
fn record_follows_from(&self, _span: &span::Id, _follows: &span::Id) {}
fn enter(&self, _span: &tracing_core::span::Id) {}
fn exit(&self, _span: &tracing_core::span::Id) {}
}

/// Dispatches one event from a single callsite shared by all callers.
///
/// `idx` is only used to build the event's `message` value.
fn dispatch_event(idx: usize) {
    static CALLSITE: DefaultCallsite = {
        // The concrete metadata values do not matter for this test.
        static META: Metadata<'static> = Metadata::new(
            "event ",
            "module::path",
            Level::INFO,
            None,
            None,
            None,
            FieldSet::new(&["message"], tracing_core::callsite::Identifier(&CALLSITE)),
            Kind::EVENT,
        );
        DefaultCallsite::new(&META)
    };

    // NOTE(review): presumably this is what triggers registration of the
    // callsite on first use; the returned interest itself is not needed.
    let _interest = CALLSITE.interest();

    let meta = CALLSITE.metadata();
    let fields = meta.fields();
    let message_field = fields.field("message").unwrap();
    let text = format!("event-from-{idx}");
    let values = [(&message_field, Some(&text as &dyn Value))];
    let value_set = fields.value_set(&values);

    Event::dispatch(meta, &value_set);
}

/// Regression test for a missing `register_callsite` call (#2743).
///
/// This test provokes the race condition in which multiple threads attempt to
/// register the same callsite in parallel. Previously, if one thread found that
/// another thread was already in the process of registering a callsite, it
/// would continue on to call `enabled` and then possibly `event` or `new_span`,
/// which could therefore run before `register_callsite` had completed on the
/// thread actually registering the callsite.
///
/// Because the test installs a global default dispatcher and relies on threads
/// racing to register the same callsite, it needs to be in a test file by
/// itself.
#[test]
fn event_before_register() {
    // Delay inside `register_callsite`, giving the other threads time to reach
    // the callsite while it is still being registered.
    let register_sleep_micros = 100;
    let subscriber = TestSubscriber::new(register_sleep_micros);
    dispatcher::set_global_default(Dispatch::new(subscriber))
        .expect("failed to set the global default dispatcher");

    // The scope joins every spawned thread before returning and panics if any
    // of them panicked, so a failed assertion in `TestSubscriber::event` fails
    // this test.
    thread::scope(|s| {
        for idx in 0..16 {
            thread::Builder::new()
                .name(format!("event-{idx}"))
                .spawn_scoped(s, move || dispatch_event(idx))
                .expect("failed to spawn thread");
        }
    });
}


93 changes: 93 additions & 0 deletions tracing/tests/missed_register_callsite_global_subscriber.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
use std::{
ptr,
sync::atomic::{AtomicPtr, Ordering},
thread,
time::Duration,
};

use tracing::Subscriber;
use tracing_core::{span, Metadata};

/// Subscriber used by this test to detect an `event` call that arrives before
/// `register_callsite` has recorded the callsite.
struct TestSubscriber {
/// Name of the thread that constructed this subscriber (or a placeholder when
/// that thread is unnamed); only used to label the diagnostic output.
creator_thread: String,
/// Delay applied inside `register_callsite` before the callsite is stored;
/// a zero duration skips the sleep entirely.
sleep: Duration,
/// Metadata pointer most recently stored by `register_callsite`. Starts out
/// null and is compared against the event's metadata pointer in `event`.
callsite: AtomicPtr<Metadata<'static>>,
}

impl TestSubscriber {
    /// Builds a subscriber whose `register_callsite` sleeps for `sleep_micros`
    /// microseconds, remembering the name of the constructing thread for use
    /// in diagnostic output.
    fn new(sleep_micros: u64) -> Self {
        let current = thread::current();
        // Unnamed threads get a fixed placeholder.
        let creator_thread = String::from(current.name().unwrap_or("<unknown thread>"));

        Self {
            creator_thread,
            sleep: Duration::from_micros(sleep_micros),
            // Null marks "no callsite registered yet".
            callsite: AtomicPtr::new(ptr::null_mut()),
        }
    }
}

impl Subscriber for TestSubscriber {
fn register_callsite(&self, metadata: &'static Metadata<'static>) -> tracing_core::Interest {
if !self.sleep.is_zero() {
thread::sleep(self.sleep);
}

self.callsite
.store(metadata as *const _ as *mut _, Ordering::SeqCst);
println!(
"{creator} from {thread:?}: register_callsite: {callsite:#?}",
creator = self.creator_thread,
callsite = metadata as *const _,
thread = thread::current().name(),
);
tracing_core::Interest::always()
}

fn event(&self, event: &tracing_core::Event<'_>) {
let stored_callsite = self.callsite.load(Ordering::SeqCst);
let event_callsite: *mut Metadata<'static> = event.metadata() as *const _ as *mut _;

println!(
"{creator} from {thread:?}: event (with callsite): {event_callsite:#?} (stored callsite: {stored_callsite:#?})",
creator = self.creator_thread,
thread = thread::current().name(),
);

// This assert is the actual test.
assert_eq!(
stored_callsite, event_callsite,
"stored callsite: {stored_callsite:#?} does not match event \
callsite: {event_callsite:#?}. Was `event` called before \
`register_callsite`?"
);
}

fn enabled(&self, _metadata: &Metadata<'_>) -> bool {
true
}
fn new_span(&self, _span: &span::Attributes<'_>) -> span::Id {
span::Id::from_u64(0)
}
fn record(&self, _span: &span::Id, _values: &span::Record<'_>) {}
fn record_follows_from(&self, _span: &span::Id, _follows: &span::Id) {}
fn enter(&self, _span: &tracing_core::span::Id) {}
fn exit(&self, _span: &tracing_core::span::Id) {}
}

/// Emits the same `tracing::info!` event from several threads at once while
/// the global subscriber delays callsite registration; the subscriber's
/// `event` assertion fails if an event arrives before registration finished.
#[test]
fn event_before_register() {
    // `register_callsite` sleeps for 100 microseconds.
    tracing::subscriber::set_global_default(TestSubscriber::new(100)).unwrap();

    std::thread::scope(|scope| {
        for idx in 0..16 {
            let builder = thread::Builder::new().name(format!("event-{idx}"));
            builder
                .spawn_scoped(scope, move || tracing::info!("Thread {} started", idx))
                .expect("failed to spawn thread");
        }
    });
}
Loading