Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ignore: gracefully quit worker threads upon panic in ParallelVisitor #3010

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 101 additions & 16 deletions crates/ignore/src/walk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,15 @@ enum Sorter {
}

#[derive(Clone)]
struct Filter(Arc<dyn Fn(&DirEntry) -> bool + Send + Sync + 'static>);
struct Filter(
Arc<
dyn Fn(&DirEntry) -> bool
+ std::panic::UnwindSafe
+ Send
+ Sync
+ 'static,
>,
);

impl std::fmt::Debug for WalkBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
Expand Down Expand Up @@ -896,7 +904,15 @@ impl WalkBuilder {
/// predicate will still be yielded.
pub fn filter_entry<P>(&mut self, filter: P) -> &mut WalkBuilder
where
P: Fn(&DirEntry) -> bool + Send + Sync + 'static,
P: Fn(&DirEntry) -> bool
// NB: this is a breaking API change that will disallow some valid filter methods, if
// used within a context where panic="abort", for example. This will allow *most*
// filter methods, however, and users can simply use AssertUnwindSafe to circumvent
// this check.
+ std::panic::UnwindSafe
+ Send
+ Sync
+ 'static,
{
self.filter = Some(Filter(Arc::new(filter)));
self
Expand Down Expand Up @@ -1281,26 +1297,73 @@ impl WalkParallel {
let quit_now = Arc::new(AtomicBool::new(false));
let active_workers = Arc::new(AtomicUsize::new(threads));
let stacks = Stack::new_for_each_thread(threads, stack);

let workers: Vec<_> = stacks
.into_iter()
.map(|stack| Worker {
visitor: builder.build(),
stack,
quit_now: quit_now.clone(),
active_workers: active_workers.clone(),
max_depth: self.max_depth,
max_filesize: self.max_filesize,
follow_links: self.follow_links,
skip: self.skip.clone(),
filter: self.filter.clone(),
})
.collect();

// Retain panic objects and re-throw them outside the thread scope.
let mut err: Option<Box<dyn std::any::Any + Send>> = None;
std::thread::scope(|s| {
let handles: Vec<_> = stacks
// Spawn a thread per worker, and catch any inner panics.
let handles: Vec<_> = workers
.into_iter()
.map(|stack| Worker {
visitor: builder.build(),
stack,
quit_now: quit_now.clone(),
active_workers: active_workers.clone(),
max_depth: self.max_depth,
max_filesize: self.max_filesize,
follow_links: self.follow_links,
skip: self.skip.clone(),
filter: self.filter.clone(),
.map(|worker| {
let quit_now = quit_now.clone();
s.spawn(move || {
// This is safe because we consume the worker in this closure and never
// touch it again. Any thread-local data is never read again, and any shared
// data is only accessed through safe atomic operations.
// NB: this requires the filter method to be UnwindSafe for two reasons:
// (1) If WalkBuilder#filter_entry() is called, and *that* callback panics,
// then the given `Fn(&DirEntry) -> bool` may be left in an
// inconsistent state.
// (2) This is only an issue because WalkParallel#visit() does not consume
// the provided ParallelVisitorBuilder! This means the builder may still
// be accessible when this function returns, e.g. if the user wraps
// .visit() with panic::catch_unwind().
// I'm pretty sure this can only be resolved with a breaking change to the
// API. We currently remain safe by adding an UnwindSafe requirement to the
// method provided to .filter_entry(), but we should probably be consuming
// the builder as well.
let mut worker = std::panic::AssertUnwindSafe(worker);
let result =
std::panic::catch_unwind(move || worker.run());
if let Err(e) = result {
// Send the quit flag to all remaining workers, which overrides any
// other work.
quit_now.store(true, AtomicOrdering::SeqCst);
std::panic::resume_unwind(e)
}
})
})
.map(|worker| s.spawn(|| worker.run()))
.collect();
// Iterate through the threads in the order they were spawned.
for handle in handles {
handle.join().unwrap();
if let Err(e) = handle.join() {
// If any panic occurs, pick whichever one we get first and discard the rest.
// NB: this *doesn't* correspond to the first thread to actually panic, but just
// the first thread in the spawn order. If one thread panics as the *result*
// of another thread panic, we won't have provided complete information.
let _ = err.get_or_insert(e);
}
}
});
// Re-throw any panic.
if let Some(e) = err {
std::panic::resume_unwind(e);
}
}

fn threads(&self) -> usize {
Expand Down Expand Up @@ -1495,7 +1558,11 @@ impl<'s> Worker<'s> {
///
/// The worker will call the caller's callback for all entries that aren't
/// skipped by the ignore matcher.
fn run(mut self) {
///
/// This method is `&mut self` instead of consuming with `mut self` in order to be used within
/// [`AssertUnwindSafe`](std::panic::AssertUnwindSafe), which seems to require dereferencing
/// a mutable reference to avoid triggering the unwind safety check.
fn run(&mut self) {
while let Some(work) = self.get_work() {
if let WalkState::Quit = self.run_one(work) {
self.quit_now();
Expand Down Expand Up @@ -1693,6 +1760,12 @@ impl<'s> Worker<'s> {
}
// Wait for next `Work` or `Quit` message.
loop {
// While in this busy loop, also ensure we check for the global quit flag.
// This ensures we don't loop forever if the worker stack that was supposed
// to alert us exited with a panic.
if self.is_quit_now() {
return None;
}
if let Some(v) = self.recv() {
self.activate_worker();
value = Some(v);
Expand Down Expand Up @@ -2221,6 +2294,18 @@ mod tests {
assert!(!dents[0].path_is_symlink());
}

#[test]
#[should_panic(expected = "oops!")]
fn panic_in_parallel() {
let td = tmpdir();
wfile(td.path().join("foo.txt"), "");

WalkBuilder::new(td.path())
.threads(40)
.build_parallel()
.run(|| Box::new(|_| panic!("oops!")));
}

#[cfg(unix)] // because symlinks on windows are weird
#[test]
fn symlink_loop() {
Expand Down