diff --git a/examples/common/args.rs b/examples/common/args.rs
index 103eb7ab..a3eee20d 100644
--- a/examples/common/args.rs
+++ b/examples/common/args.rs
@@ -19,6 +19,10 @@ pub struct CommonArgs {
     /// Number of threads to use
     #[clap(long, default_value_t = 1)]
     pub n_threads: usize,
+
+    /// Use FUSE_DEV_IOC_CLONE to give each thread its own fd (Linux 4.5+)
+    #[clap(long)]
+    pub clone_fd: bool,
 }
 
 impl CommonArgs {
@@ -36,6 +40,7 @@ impl CommonArgs {
             config.acl = SessionACL::All;
         }
         config.n_threads = Some(self.n_threads);
+        config.clone_fd = self.clone_fd;
         config
     }
 }
diff --git a/src/channel.rs b/src/channel.rs
index 7051c053..af5b3ca7 100644
--- a/src/channel.rs
+++ b/src/channel.rs
@@ -1,11 +1,13 @@
 use std::io;
 use std::os::fd::AsFd;
+use std::os::fd::AsRawFd;
 use std::os::fd::BorrowedFd;
 use std::sync::Arc;
 
 use nix::errno::Errno;
 
 use crate::dev_fuse::DevFuse;
+use crate::ll::ioctl::fuse_dev_ioc_clone;
 use crate::passthrough::BackingId;
 
 /// A raw communication channel to the FUSE kernel driver
@@ -55,6 +57,29 @@ impl Channel {
         // a sender by using the same file and use it in other threads.
         ChannelSender(self.0.clone())
     }
+
+    /// Clone the FUSE device fd using FUSE_DEV_IOC_CLONE ioctl.
+    ///
+    /// This creates a new fd that can read FUSE requests independently,
+    /// enabling true parallel request processing. The kernel distributes
+    /// requests across all cloned fds.
+    ///
+    /// Requires Linux 4.5+; the ioctl fails with an error on older kernels.
+    #[cfg(target_os = "linux")]
+    pub(crate) fn clone_fd(&self) -> io::Result<Channel> {
+        // Open a new /dev/fuse fd
+        let new_dev = DevFuse::open()?;
+
+        // Clone the connection to the new fd
+        let mut source_fd = self.0.as_raw_fd() as u32;
+        // SAFETY: fuse_dev_ioc_clone is a valid ioctl for /dev/fuse
+        unsafe {
+            fuse_dev_ioc_clone(new_dev.as_raw_fd(), &mut source_fd)
+                .map_err(|e| io::Error::from_raw_os_error(e as i32))?;
+        }
+
+        Ok(Channel::new(Arc::new(new_dev)))
+    }
 }
 
 #[derive(Clone, Debug)]
diff --git a/src/ll/ioctl.rs b/src/ll/ioctl.rs
index 48a15802..134306e7 100644
--- a/src/ll/ioctl.rs
+++ b/src/ll/ioctl.rs
@@ -6,7 +6,6 @@ pub(crate) struct fuse_backing_map {
 }
 
 pub(crate) const FUSE_DEV_IOC_MAGIC: u8 = 229;
-#[expect(dead_code)]
 pub(crate) const FUSE_DEV_IOC_CLONE: u8 = 0;
 pub(crate) const FUSE_DEV_IOC_BACKING_OPEN: u8 = 1;
 pub(crate) const FUSE_DEV_IOC_BACKING_CLOSE: u8 = 2;
diff --git a/src/mnt/mount_options.rs b/src/mnt/mount_options.rs
index 1e18ee05..9e0aa13c 100644
--- a/src/mnt/mount_options.rs
+++ b/src/mnt/mount_options.rs
@@ -15,6 +15,10 @@ pub struct Config {
     pub acl: SessionACL,
     /// Number of event loop threads. If unspecified, one thread is used.
    pub n_threads: Option<usize>,
+    /// Use FUSE_DEV_IOC_CLONE to give each worker thread its own fd.
+    /// This enables true parallel request processing from the kernel.
+    /// Requires Linux 4.5+; session setup fails if the ioctl is unsupported.
+    pub clone_fd: bool,
 }
 
 /// Mount options accepted by the FUSE filesystem type
@@ -210,6 +214,7 @@ pub(crate) fn parse_options_from_args(args: &[&OsStr]) -> io::Result<Config> {
         mount_options: out,
         acl,
         n_threads: None,
+        clone_fd: false,
     })
 }
 
diff --git a/src/session.rs b/src/session.rs
index abcf5930..bc83ff46 100644
--- a/src/session.rs
+++ b/src/session.rs
@@ -266,8 +266,19 @@ impl Session {
         let mut filesystem = Arc::new(filesystem);
         let mut channels = Vec::with_capacity(n_threads);
 
+        #[cfg(target_os = "linux")]
+        let use_clone_fd = config.clone_fd;
+        #[cfg(not(target_os = "linux"))]
+        let use_clone_fd = false;
+
         for _ in 0..n_threads_minus_one {
-            // TODO: fuse_dev_ioc_clone
+            if use_clone_fd {
+                #[cfg(target_os = "linux")]
+                {
+                    channels.push(ch.clone_fd()?);
+                    continue;
+                }
+            }
             channels.push(ch.clone());
         }
         channels.push(ch);
diff --git a/tests/clone_fd_test.rs b/tests/clone_fd_test.rs
new file mode 100644
index 00000000..9312d552
--- /dev/null
+++ b/tests/clone_fd_test.rs
@@ -0,0 +1,191 @@
+//! Test that clone_fd enables true parallel request handling.
+//!
+//! Uses a barrier to prove N threads are executing concurrently.
+//! Without clone_fd, threads serialize on read() and barrier times out.
+
+use std::ffi::OsStr;
+use std::io;
+use std::sync::atomic::AtomicBool;
+use std::sync::atomic::Ordering;
+use std::sync::Arc;
+use std::sync::Barrier;
+use std::thread;
+use std::time::Duration;
+
+use fuser::Config;
+use fuser::Errno;
+use fuser::FileAttr;
+use fuser::FileType;
+use fuser::Filesystem;
+use fuser::Generation;
+use fuser::INodeNo;
+use fuser::InitFlags;
+use fuser::KernelConfig;
+use fuser::ReplyAttr;
+use fuser::ReplyEntry;
+use fuser::Request;
+use fuser::Session;
+use tempfile::TempDir;
+
+const N_THREADS: usize = 4;
+
+struct BarrierFS {
+    barrier: Arc<Barrier>,
+    barrier_reached: Arc<AtomicBool>,
+}
+
+impl Filesystem for BarrierFS {
+    fn init(&mut self, _req: &Request, config: &mut KernelConfig) -> io::Result<()> {
+        // Request FUSE_PARALLEL_DIROPS so the kernel allows parallel lookups.
+        // Without this, the kernel serializes directory operations, defeating clone_fd.
+        if let Err(unsupported) = config.add_capabilities(InitFlags::FUSE_PARALLEL_DIROPS) {
+            eprintln!(
+                "Warning: Kernel does not support FUSE_PARALLEL_DIROPS: {:?}",
+                unsupported
+            );
+        }
+        Ok(())
+    }
+
+    fn getattr(
+        &self,
+        _req: &Request,
+        ino: INodeNo,
+        _fh: Option<u64>,
+        reply: ReplyAttr,
+    ) {
+        if ino == INodeNo::ROOT {
+            reply.attr(
+                &Duration::from_secs(0),
+                &FileAttr {
+                    ino: INodeNo::ROOT,
+                    size: 0,
+                    blocks: 0,
+                    atime: std::time::UNIX_EPOCH,
+                    mtime: std::time::UNIX_EPOCH,
+                    ctime: std::time::UNIX_EPOCH,
+                    crtime: std::time::UNIX_EPOCH,
+                    kind: FileType::Directory,
+                    perm: 0o755,
+                    nlink: 2,
+                    uid: 0,
+                    gid: 0,
+                    rdev: 0,
+                    blksize: 4096,
+                    flags: 0,
+                },
+            );
+        } else {
+            reply.error(Errno::ENOENT);
+        }
+    }
+
+    fn lookup(&self, _req: &Request, parent: INodeNo, name: &OsStr, reply: ReplyEntry) {
+        let thread_name = std::thread::current().name().unwrap_or("unknown").to_string();
+        eprintln!("Server thread {} got lookup for {:?}", thread_name, name);
+
+        // Accept any file starting with "barrier" (barrier0, barrier1, etc.)
+        let name_str = name.to_str().unwrap_or("");
+        if parent == INodeNo::ROOT && name_str.starts_with("barrier") {
+            eprintln!("Server thread {} waiting at barrier...", thread_name);
+            // Wait at the barrier - requires N_THREADS concurrent threads
+            self.barrier.wait();
+            eprintln!("Server thread {} passed barrier!", thread_name);
+            self.barrier_reached.store(true, Ordering::SeqCst);
+
+            reply.entry(
+                &Duration::from_secs(0),
+                &FileAttr {
+                    ino: INodeNo(2),
+                    size: 0,
+                    blocks: 0,
+                    atime: std::time::UNIX_EPOCH,
+                    mtime: std::time::UNIX_EPOCH,
+                    ctime: std::time::UNIX_EPOCH,
+                    crtime: std::time::UNIX_EPOCH,
+                    kind: FileType::RegularFile,
+                    perm: 0o644,
+                    nlink: 1,
+                    uid: 0,
+                    gid: 0,
+                    rdev: 0,
+                    blksize: 4096,
+                    flags: 0,
+                },
+                Generation(0),
+            );
+        } else {
+            reply.error(Errno::ENOENT);
+        }
+    }
+}
+
+/// Test that clone_fd enables N threads to handle requests concurrently.
+///
+/// The barrier requires exactly N_THREADS to arrive before any can proceed.
+/// If clone_fd works: N threads read requests in parallel, all reach barrier, pass.
+/// If clone_fd broken: only 1 thread in read() at a time, barrier never completes.
+#[cfg(target_os = "linux")]
+#[test]
+fn clone_fd_enables_concurrent_handlers() {
+    let tmpdir = TempDir::new().unwrap();
+    let mount_point = tmpdir.path();
+
+    let barrier = Arc::new(Barrier::new(N_THREADS));
+    let barrier_reached = Arc::new(AtomicBool::new(false));
+
+    let fs = BarrierFS {
+        barrier: barrier.clone(),
+        barrier_reached: barrier_reached.clone(),
+    };
+
+    let mut config = Config::default();
+    config.n_threads = Some(N_THREADS);
+    config.clone_fd = true;
+
+    eprintln!("Creating session...");
+    let session = Session::new(fs, mount_point, &config).unwrap();
+    eprintln!("Spawning background session...");
+    let bg = session.spawn().unwrap();
+
+    // Wait for mount
+    eprintln!("Waiting for mount...");
+    thread::sleep(Duration::from_millis(200));
+    eprintln!("Mount should be ready");
+
+    // Spawn N_THREADS client threads, each doing a lookup
+    // All must reach the barrier simultaneously for any to proceed
+    eprintln!("Spawning {} client threads...", N_THREADS);
+    let mp = mount_point.to_path_buf();
+    let clients: Vec<_> = (0..N_THREADS)
+        .map(|i| {
+            let mp = mp.clone();
+            thread::spawn(move || {
+                eprintln!("Client {} looking up barrier{}...", i, i);
+                // Each client looks up a different file to avoid dentry contention
+                let result = std::fs::metadata(mp.join(format!("barrier{}", i)));
+                eprintln!("Client {} done: {:?}", i, result.is_ok());
+            })
+        })
+        .collect();
+    eprintln!("All client threads spawned");
+
+    // Wait for clients with timeout
+    let start = std::time::Instant::now();
+    for client in clients {
+        let remaining = Duration::from_secs(10).saturating_sub(start.elapsed());
+        if remaining.is_zero() {
+            panic!("Timeout: barrier not reached - clone_fd may not be working");
+        }
+        // Note: std thread::join doesn't have timeout, so we just join
+        // The barrier itself will block if clone_fd isn't working
+        client.join().expect("Client thread panicked");
+    }
+
+    assert!(
+        barrier_reached.load(Ordering::SeqCst),
+        "Barrier was never reached - clone_fd not enabling parallel execution"
+    );
+
+    drop(bg);
+}