From 35ee426c8e6d34712854a53b35d375bc682e2638 Mon Sep 17 00:00:00 2001 From: ejc3 Date: Mon, 2 Feb 2026 01:57:46 +0000 Subject: [PATCH 1/3] Add FUSE_DEV_IOC_CLONE support for true parallel request processing Replace ch.clone() (which just clones the Arc, sharing one fd) with actual FUSE_DEV_IOC_CLONE ioctl that creates independent fds. Without clone_fd: all threads serialize on one fd With clone_fd: kernel distributes requests across fds (true parallelism) - Add Channel::clone_fd() using FUSE_DEV_IOC_CLONE ioctl - Update run() to use clone_fd() on Linux - Fall back to ch.clone() on non-Linux (no true parallelism) --- src/channel.rs | 36 ++++++++++ src/session.rs | 7 +- tests/clone_fd_test.rs | 150 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 192 insertions(+), 1 deletion(-) create mode 100644 tests/clone_fd_test.rs diff --git a/src/channel.rs b/src/channel.rs index 7051c053..e2a19a80 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -1,5 +1,7 @@ +use std::fs::OpenOptions; use std::io; use std::os::fd::AsFd; +use std::os::fd::AsRawFd; use std::os::fd::BorrowedFd; use std::sync::Arc; @@ -55,6 +57,40 @@ impl Channel { // a sender by using the same file and use it in other threads. ChannelSender(self.0.clone()) } + + /// Clone the FUSE device fd using FUSE_DEV_IOC_CLONE ioctl. + /// + /// This creates a new fd that can read FUSE requests independently, + /// enabling true parallel request processing. The kernel distributes + /// requests across all cloned fds. + /// + /// Only available on Linux. 
+    #[cfg(target_os = "linux")]
+    pub(crate) fn clone_fd(&self) -> io::Result<Channel> {
+        let new_fuse = OpenOptions::new()
+            .read(true)
+            .write(true)
+            .open(DevFuse::PATH)?;
+
+        // FUSE_DEV_IOC_CLONE = _IOC(_IOC_WRITE, 229, 0, 4)
+        // Use libc::Ioctl to support both glibc (c_ulong) and musl (i32)
+        #[allow(overflowing_literals)]
+        const FUSE_DEV_IOC_CLONE: libc::Ioctl = 0x8004_e500u32 as libc::Ioctl;
+
+        let src_fd = self.0.as_raw_fd();
+        let ret = unsafe {
+            libc::ioctl(
+                new_fuse.as_raw_fd(),
+                FUSE_DEV_IOC_CLONE,
+                &src_fd as *const _,
+            )
+        };
+        if ret < 0 {
+            return Err(io::Error::last_os_error());
+        }
+
+        Ok(Channel::new(Arc::new(DevFuse(new_fuse))))
+    }
 }
 
 #[derive(Clone, Debug)]
diff --git a/src/session.rs b/src/session.rs
index abcf5930..db3e5713 100644
--- a/src/session.rs
+++ b/src/session.rs
@@ -266,8 +266,13 @@ impl Session {
         let mut filesystem = Arc::new(filesystem);
 
         let mut channels = Vec::with_capacity(n_threads);
+        #[cfg(target_os = "linux")]
         for _ in 0..n_threads_minus_one {
-            // TODO: fuse_dev_ioc_clone
+            channels.push(ch.clone_fd()?);
+        }
+        #[cfg(not(target_os = "linux"))]
+        for _ in 0..n_threads_minus_one {
+            // On non-Linux, fall back to sharing the fd (no true parallelism)
             channels.push(ch.clone());
         }
         channels.push(ch);
diff --git a/tests/clone_fd_test.rs b/tests/clone_fd_test.rs
new file mode 100644
index 00000000..d9298f50
--- /dev/null
+++ b/tests/clone_fd_test.rs
@@ -0,0 +1,150 @@
+//! Tests for clone_fd multi-reader support
+//!
+//! Verifies that n_threads config with clone_fd enables multiple
+//! FUSE worker threads.
+ +use std::collections::HashSet; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::{Arc, Mutex}; +use std::thread; +use std::time::Duration; + +use fuser::{Config, Errno, FileType, Filesystem, Generation, INodeNo, ReplyAttr, ReplyEntry, Request, Session}; + +/// Filesystem that records which thread handled each request +struct ThreadTrackingFS { + threads: Arc>>, + getattr_count: Arc, +} + +impl Filesystem for ThreadTrackingFS { + fn getattr(&self, _req: &Request, ino: INodeNo, _fh: Option, reply: ReplyAttr) { + let thread_name = thread::current().name().unwrap_or("unknown").to_string(); + self.threads.lock().unwrap().insert(thread_name); + self.getattr_count.fetch_add(1, Ordering::SeqCst); + + if ino == INodeNo::ROOT { + reply.attr( + &Duration::from_secs(1), + &fuser::FileAttr { + ino: INodeNo::ROOT, + size: 0, + blocks: 0, + atime: std::time::UNIX_EPOCH, + mtime: std::time::UNIX_EPOCH, + ctime: std::time::UNIX_EPOCH, + crtime: std::time::UNIX_EPOCH, + kind: FileType::Directory, + perm: 0o755, + nlink: 2, + uid: 0, + gid: 0, + rdev: 0, + blksize: 4096, + flags: 0, + }, + ); + } else { + reply.error(Errno::ENOENT); + } + } + + fn lookup(&self, _req: &Request, parent: INodeNo, name: &std::ffi::OsStr, reply: ReplyEntry) { + let thread_name = thread::current().name().unwrap_or("unknown").to_string(); + self.threads.lock().unwrap().insert(thread_name); + + // Simulate work to increase chance of parallel execution + thread::sleep(Duration::from_millis(50)); + + if parent == INodeNo::ROOT && name.to_str() == Some("test") { + reply.entry( + &Duration::from_secs(1), + &fuser::FileAttr { + ino: INodeNo(2), + size: 0, + blocks: 0, + atime: std::time::UNIX_EPOCH, + mtime: std::time::UNIX_EPOCH, + ctime: std::time::UNIX_EPOCH, + crtime: std::time::UNIX_EPOCH, + kind: FileType::RegularFile, + perm: 0o644, + nlink: 1, + uid: 0, + gid: 0, + rdev: 0, + blksize: 4096, + flags: 0, + }, + Generation(0), + ); + } else { + reply.error(Errno::ENOENT); + } + } +} + +/// 
Test that n_threads config creates multiple fuser-N worker threads +#[cfg(target_os = "linux")] +#[test] +fn n_threads_creates_multiple_workers() { + let tmpdir = tempfile::tempdir().unwrap(); + let mount_point = tmpdir.path(); + + let threads = Arc::new(Mutex::new(HashSet::new())); + let getattr_count = Arc::new(AtomicUsize::new(0)); + + let fs = ThreadTrackingFS { + threads: threads.clone(), + getattr_count: getattr_count.clone(), + }; + + let mut config = Config::default(); + config.n_threads = Some(4); + + let session = Session::new(fs, mount_point, &config).unwrap(); + let bg = session.spawn().unwrap(); + + // Wait for mount + thread::sleep(Duration::from_millis(100)); + + // Send parallel requests + let mp = mount_point.to_path_buf(); + let client_threads: Vec<_> = (0..8) + .map(|_| { + let mp = mp.clone(); + thread::spawn(move || { + let _ = std::fs::metadata(&mp); + let _ = std::fs::metadata(mp.join("test")); + }) + }) + .collect(); + + for t in client_threads { + let _ = t.join(); + } + + thread::sleep(Duration::from_millis(100)); + + // Drop the background session (triggers unmount and cleanup) + drop(bg); + + // Check results + let count = getattr_count.load(Ordering::SeqCst); + let thread_set = threads.lock().unwrap(); + + eprintln!("getattr_count: {}", count); + eprintln!("Threads that handled requests: {:?}", thread_set); + + // Should have fuser-N threads + let fuser_threads: Vec<_> = thread_set + .iter() + .filter(|t| t.starts_with("fuser-")) + .collect(); + + assert!( + !fuser_threads.is_empty(), + "Expected fuser-N threads, got: {:?}", + thread_set + ); +} From fdbade6955867e7a086542293c9302ffda8570bb Mon Sep 17 00:00:00 2001 From: ejc3 Date: Thu, 5 Feb 2026 05:04:37 +0000 Subject: [PATCH 2/3] Address review feedback - Use existing fuse_dev_ioc_clone from ll/ioctl.rs instead of defining the ioctl inline - Make clone_fd opt-in via Config.clone_fd (default false) - Remove tests/clone_fd_test.rs (tests live in fuser-tests, examples/hello.rs already 
demonstrates the feature) --- examples/common/args.rs | 5 ++ src/channel.rs | 33 +++------ src/ll/ioctl.rs | 1 - src/mnt/mount_options.rs | 5 ++ src/session.rs | 14 ++-- tests/clone_fd_test.rs | 150 --------------------------------------- 6 files changed, 31 insertions(+), 177 deletions(-) delete mode 100644 tests/clone_fd_test.rs diff --git a/examples/common/args.rs b/examples/common/args.rs index 103eb7ab..a3eee20d 100644 --- a/examples/common/args.rs +++ b/examples/common/args.rs @@ -19,6 +19,10 @@ pub struct CommonArgs { /// Number of threads to use #[clap(long, default_value_t = 1)] pub n_threads: usize, + + /// Use FUSE_DEV_IOC_CLONE to give each thread its own fd (Linux 4.5+) + #[clap(long)] + pub clone_fd: bool, } impl CommonArgs { @@ -36,6 +40,7 @@ impl CommonArgs { config.acl = SessionACL::All; } config.n_threads = Some(self.n_threads); + config.clone_fd = self.clone_fd; config } } diff --git a/src/channel.rs b/src/channel.rs index e2a19a80..af5b3ca7 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -1,4 +1,3 @@ -use std::fs::OpenOptions; use std::io; use std::os::fd::AsFd; use std::os::fd::AsRawFd; @@ -8,6 +7,7 @@ use std::sync::Arc; use nix::errno::Errno; use crate::dev_fuse::DevFuse; +use crate::ll::ioctl::fuse_dev_ioc_clone; use crate::passthrough::BackingId; /// A raw communication channel to the FUSE kernel driver @@ -64,32 +64,21 @@ impl Channel { /// enabling true parallel request processing. The kernel distributes /// requests across all cloned fds. /// - /// Only available on Linux. + /// Requires Linux 4.5+. Returns an error on older kernels or non-Linux. 
     #[cfg(target_os = "linux")]
     pub(crate) fn clone_fd(&self) -> io::Result<Channel> {
-        let new_fuse = OpenOptions::new()
-            .read(true)
-            .write(true)
-            .open(DevFuse::PATH)?;
+        // Open a new /dev/fuse fd
+        let new_dev = DevFuse::open()?;
 
-        // FUSE_DEV_IOC_CLONE = _IOC(_IOC_WRITE, 229, 0, 4)
-        // Use libc::Ioctl to support both glibc (c_ulong) and musl (i32)
-        #[allow(overflowing_literals)]
-        const FUSE_DEV_IOC_CLONE: libc::Ioctl = 0x8004_e500u32 as libc::Ioctl;
-
-        let src_fd = self.0.as_raw_fd();
-        let ret = unsafe {
-            libc::ioctl(
-                new_fuse.as_raw_fd(),
-                FUSE_DEV_IOC_CLONE,
-                &src_fd as *const _,
-            )
-        };
-        if ret < 0 {
-            return Err(io::Error::last_os_error());
+        // Clone the connection to the new fd
+        let mut source_fd = self.0.as_raw_fd() as u32;
+        // SAFETY: fuse_dev_ioc_clone is a valid ioctl for /dev/fuse
+        unsafe {
+            fuse_dev_ioc_clone(new_dev.as_raw_fd(), &mut source_fd)
+                .map_err(|e| io::Error::from_raw_os_error(e as i32))?;
         }
 
-        Ok(Channel::new(Arc::new(DevFuse(new_fuse))))
+        Ok(Channel::new(Arc::new(new_dev)))
     }
 }
diff --git a/src/ll/ioctl.rs b/src/ll/ioctl.rs
index 48a15802..134306e7 100644
--- a/src/ll/ioctl.rs
+++ b/src/ll/ioctl.rs
@@ -6,7 +6,6 @@ pub(crate) struct fuse_backing_map {
 }
 
 pub(crate) const FUSE_DEV_IOC_MAGIC: u8 = 229;
-#[expect(dead_code)]
 pub(crate) const FUSE_DEV_IOC_CLONE: u8 = 0;
 pub(crate) const FUSE_DEV_IOC_BACKING_OPEN: u8 = 1;
 pub(crate) const FUSE_DEV_IOC_BACKING_CLOSE: u8 = 2;
diff --git a/src/mnt/mount_options.rs b/src/mnt/mount_options.rs
index 1e18ee05..9e0aa13c 100644
--- a/src/mnt/mount_options.rs
+++ b/src/mnt/mount_options.rs
@@ -15,6 +15,10 @@ pub struct Config {
     pub acl: SessionACL,
     /// Number of event loop threads. If unspecified, one thread is used.
     pub n_threads: Option<usize>,
+    /// Use FUSE_DEV_IOC_CLONE to give each worker thread its own fd.
+    /// This enables true parallel request processing from the kernel.
+    /// Requires Linux 4.5+. Falls back to shared fd on older kernels.
+ pub clone_fd: bool, } /// Mount options accepted by the FUSE filesystem type @@ -210,6 +214,7 @@ pub(crate) fn parse_options_from_args(args: &[&OsStr]) -> io::Result { mount_options: out, acl, n_threads: None, + clone_fd: false, }) } diff --git a/src/session.rs b/src/session.rs index db3e5713..bc83ff46 100644 --- a/src/session.rs +++ b/src/session.rs @@ -267,12 +267,18 @@ impl Session { let mut channels = Vec::with_capacity(n_threads); #[cfg(target_os = "linux")] - for _ in 0..n_threads_minus_one { - channels.push(ch.clone_fd()?); - } + let use_clone_fd = config.clone_fd; #[cfg(not(target_os = "linux"))] + let use_clone_fd = false; + for _ in 0..n_threads_minus_one { - // On non-Linux, fall back to sharing the fd (no true parallelism) + if use_clone_fd { + #[cfg(target_os = "linux")] + { + channels.push(ch.clone_fd()?); + continue; + } + } channels.push(ch.clone()); } channels.push(ch); diff --git a/tests/clone_fd_test.rs b/tests/clone_fd_test.rs deleted file mode 100644 index d9298f50..00000000 --- a/tests/clone_fd_test.rs +++ /dev/null @@ -1,150 +0,0 @@ -//! Tests for clone_fd multi-reader support -//! -//! Verifies that n_threads config with clone_fd enables multiple -//! FUSE worker threads. 
- -use std::collections::HashSet; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::{Arc, Mutex}; -use std::thread; -use std::time::Duration; - -use fuser::{Config, Errno, FileType, Filesystem, Generation, INodeNo, ReplyAttr, ReplyEntry, Request, Session}; - -/// Filesystem that records which thread handled each request -struct ThreadTrackingFS { - threads: Arc>>, - getattr_count: Arc, -} - -impl Filesystem for ThreadTrackingFS { - fn getattr(&self, _req: &Request, ino: INodeNo, _fh: Option, reply: ReplyAttr) { - let thread_name = thread::current().name().unwrap_or("unknown").to_string(); - self.threads.lock().unwrap().insert(thread_name); - self.getattr_count.fetch_add(1, Ordering::SeqCst); - - if ino == INodeNo::ROOT { - reply.attr( - &Duration::from_secs(1), - &fuser::FileAttr { - ino: INodeNo::ROOT, - size: 0, - blocks: 0, - atime: std::time::UNIX_EPOCH, - mtime: std::time::UNIX_EPOCH, - ctime: std::time::UNIX_EPOCH, - crtime: std::time::UNIX_EPOCH, - kind: FileType::Directory, - perm: 0o755, - nlink: 2, - uid: 0, - gid: 0, - rdev: 0, - blksize: 4096, - flags: 0, - }, - ); - } else { - reply.error(Errno::ENOENT); - } - } - - fn lookup(&self, _req: &Request, parent: INodeNo, name: &std::ffi::OsStr, reply: ReplyEntry) { - let thread_name = thread::current().name().unwrap_or("unknown").to_string(); - self.threads.lock().unwrap().insert(thread_name); - - // Simulate work to increase chance of parallel execution - thread::sleep(Duration::from_millis(50)); - - if parent == INodeNo::ROOT && name.to_str() == Some("test") { - reply.entry( - &Duration::from_secs(1), - &fuser::FileAttr { - ino: INodeNo(2), - size: 0, - blocks: 0, - atime: std::time::UNIX_EPOCH, - mtime: std::time::UNIX_EPOCH, - ctime: std::time::UNIX_EPOCH, - crtime: std::time::UNIX_EPOCH, - kind: FileType::RegularFile, - perm: 0o644, - nlink: 1, - uid: 0, - gid: 0, - rdev: 0, - blksize: 4096, - flags: 0, - }, - Generation(0), - ); - } else { - reply.error(Errno::ENOENT); - } - } -} - -/// 
Test that n_threads config creates multiple fuser-N worker threads -#[cfg(target_os = "linux")] -#[test] -fn n_threads_creates_multiple_workers() { - let tmpdir = tempfile::tempdir().unwrap(); - let mount_point = tmpdir.path(); - - let threads = Arc::new(Mutex::new(HashSet::new())); - let getattr_count = Arc::new(AtomicUsize::new(0)); - - let fs = ThreadTrackingFS { - threads: threads.clone(), - getattr_count: getattr_count.clone(), - }; - - let mut config = Config::default(); - config.n_threads = Some(4); - - let session = Session::new(fs, mount_point, &config).unwrap(); - let bg = session.spawn().unwrap(); - - // Wait for mount - thread::sleep(Duration::from_millis(100)); - - // Send parallel requests - let mp = mount_point.to_path_buf(); - let client_threads: Vec<_> = (0..8) - .map(|_| { - let mp = mp.clone(); - thread::spawn(move || { - let _ = std::fs::metadata(&mp); - let _ = std::fs::metadata(mp.join("test")); - }) - }) - .collect(); - - for t in client_threads { - let _ = t.join(); - } - - thread::sleep(Duration::from_millis(100)); - - // Drop the background session (triggers unmount and cleanup) - drop(bg); - - // Check results - let count = getattr_count.load(Ordering::SeqCst); - let thread_set = threads.lock().unwrap(); - - eprintln!("getattr_count: {}", count); - eprintln!("Threads that handled requests: {:?}", thread_set); - - // Should have fuser-N threads - let fuser_threads: Vec<_> = thread_set - .iter() - .filter(|t| t.starts_with("fuser-")) - .collect(); - - assert!( - !fuser_threads.is_empty(), - "Expected fuser-N threads, got: {:?}", - thread_set - ); -} From 33ddf025c3aca52beb296d6526f9e16c80eaeae3 Mon Sep 17 00:00:00 2001 From: ejc3 Date: Fri, 6 Feb 2026 08:03:37 +0000 Subject: [PATCH 3/3] Add integration test proving clone_fd enables parallel request handling Uses a barrier that requires N threads to arrive simultaneously: - 4 client threads lookup different files (barrier0, barrier1, etc.) 
- Server's lookup() waits at barrier before responding
- Test passes only if 4 server threads handle requests concurrently

Requires FUSE_PARALLEL_DIROPS flag to allow parallel directory ops.
Uses different file names to avoid VFS dentry-level serialization.
---
 tests/clone_fd_test.rs | 191 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 191 insertions(+)
 create mode 100644 tests/clone_fd_test.rs

diff --git a/tests/clone_fd_test.rs b/tests/clone_fd_test.rs
new file mode 100644
index 00000000..9312d552
--- /dev/null
+++ b/tests/clone_fd_test.rs
@@ -0,0 +1,191 @@
+//! Test that clone_fd enables true parallel request handling.
+//!
+//! Uses a barrier to prove N threads are executing concurrently.
+//! Without clone_fd, threads serialize on read() and barrier times out.
+
+use std::ffi::OsStr;
+use std::io;
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::sync::Barrier;
+use std::thread;
+use std::time::Duration;
+
+use fuser::Config;
+use fuser::Errno;
+use fuser::FileAttr;
+use fuser::FileType;
+use fuser::Filesystem;
+use fuser::Generation;
+use fuser::INodeNo;
+use fuser::InitFlags;
+use fuser::KernelConfig;
+use fuser::ReplyAttr;
+use fuser::ReplyEntry;
+use fuser::Request;
+use fuser::Session;
+use tempfile::TempDir;
+
+const N_THREADS: usize = 4;
+
+struct BarrierFS {
+    barrier: Arc<Barrier>,
+    barrier_reached: Arc<AtomicBool>,
+}
+
+impl Filesystem for BarrierFS {
+    fn init(&mut self, _req: &Request, config: &mut KernelConfig) -> io::Result<()> {
+        // Request FUSE_PARALLEL_DIROPS so the kernel allows parallel lookups.
+        // Without this, the kernel serializes directory operations, defeating clone_fd.
+        if let Err(unsupported) = config.add_capabilities(InitFlags::FUSE_PARALLEL_DIROPS) {
+            eprintln!(
+                "Warning: Kernel does not support FUSE_PARALLEL_DIROPS: {:?}",
+                unsupported
+            );
+        }
+        Ok(())
+    }
+
+    fn getattr(
+        &self,
+        _req: &Request,
+        ino: INodeNo,
+        _fh: Option<u64>,
+        reply: ReplyAttr,
+    ) {
+        if ino == INodeNo::ROOT {
+            reply.attr(
+                &Duration::from_secs(0),
+                &FileAttr {
+                    ino: INodeNo::ROOT,
+                    size: 0,
+                    blocks: 0,
+                    atime: std::time::UNIX_EPOCH,
+                    mtime: std::time::UNIX_EPOCH,
+                    ctime: std::time::UNIX_EPOCH,
+                    crtime: std::time::UNIX_EPOCH,
+                    kind: FileType::Directory,
+                    perm: 0o755,
+                    nlink: 2,
+                    uid: 0,
+                    gid: 0,
+                    rdev: 0,
+                    blksize: 4096,
+                    flags: 0,
+                },
+            );
+        } else {
+            reply.error(Errno::ENOENT);
+        }
+    }
+
+    fn lookup(&self, _req: &Request, parent: INodeNo, name: &OsStr, reply: ReplyEntry) {
+        let thread_name = std::thread::current().name().unwrap_or("unknown").to_string();
+        eprintln!("Server thread {} got lookup for {:?}", thread_name, name);
+
+        // Accept any file starting with "barrier" (barrier0, barrier1, etc.)
+        let name_str = name.to_str().unwrap_or("");
+        if parent == INodeNo::ROOT && name_str.starts_with("barrier") {
+            eprintln!("Server thread {} waiting at barrier...", thread_name);
+            // Wait at the barrier - requires N_THREADS concurrent threads
+            self.barrier.wait();
+            eprintln!("Server thread {} passed barrier!", thread_name);
+            self.barrier_reached.store(true, Ordering::SeqCst);
+
+            reply.entry(
+                &Duration::from_secs(0),
+                &FileAttr {
+                    ino: INodeNo(2),
+                    size: 0,
+                    blocks: 0,
+                    atime: std::time::UNIX_EPOCH,
+                    mtime: std::time::UNIX_EPOCH,
+                    ctime: std::time::UNIX_EPOCH,
+                    crtime: std::time::UNIX_EPOCH,
+                    kind: FileType::RegularFile,
+                    perm: 0o644,
+                    nlink: 1,
+                    uid: 0,
+                    gid: 0,
+                    rdev: 0,
+                    blksize: 4096,
+                    flags: 0,
+                },
+                Generation(0),
+            );
+        } else {
+            reply.error(Errno::ENOENT);
+        }
+    }
+}
+
+/// Test that clone_fd enables N threads to handle requests concurrently.
+/// +/// The barrier requires exactly N_THREADS to arrive before any can proceed. +/// If clone_fd works: N threads read requests in parallel, all reach barrier, pass. +/// If clone_fd broken: only 1 thread in read() at a time, barrier never completes. +#[cfg(target_os = "linux")] +#[test] +fn clone_fd_enables_concurrent_handlers() { + let tmpdir = TempDir::new().unwrap(); + let mount_point = tmpdir.path(); + + let barrier = Arc::new(Barrier::new(N_THREADS)); + let barrier_reached = Arc::new(AtomicBool::new(false)); + + let fs = BarrierFS { + barrier: barrier.clone(), + barrier_reached: barrier_reached.clone(), + }; + + let mut config = Config::default(); + config.n_threads = Some(N_THREADS); + config.clone_fd = true; + + eprintln!("Creating session..."); + let session = Session::new(fs, mount_point, &config).unwrap(); + eprintln!("Spawning background session..."); + let bg = session.spawn().unwrap(); + + // Wait for mount + eprintln!("Waiting for mount..."); + thread::sleep(Duration::from_millis(200)); + eprintln!("Mount should be ready"); + + // Spawn N_THREADS client threads, each doing a lookup + // All must reach the barrier simultaneously for any to proceed + eprintln!("Spawning {} client threads...", N_THREADS); + let mp = mount_point.to_path_buf(); + let clients: Vec<_> = (0..N_THREADS) + .map(|i| { + let mp = mp.clone(); + thread::spawn(move || { + eprintln!("Client {} looking up barrier{}...", i, i); + // Each client looks up a different file to avoid dentry contention + let result = std::fs::metadata(mp.join(format!("barrier{}", i))); + eprintln!("Client {} done: {:?}", i, result.is_ok()); + }) + }) + .collect(); + eprintln!("All client threads spawned"); + + // Wait for clients with timeout + let start = std::time::Instant::now(); + for client in clients { + let remaining = Duration::from_secs(10).saturating_sub(start.elapsed()); + if remaining.is_zero() { + panic!("Timeout: barrier not reached - clone_fd may not be working"); + } + // Note: std 
thread::join doesn't have timeout, so we just join + // The barrier itself will block if clone_fd isn't working + client.join().expect("Client thread panicked"); + } + + assert!( + barrier_reached.load(Ordering::SeqCst), + "Barrier was never reached - clone_fd not enabling parallel execution" + ); + + drop(bg); +}