5 changes: 5 additions & 0 deletions examples/common/args.rs
@@ -19,6 +19,10 @@ pub struct CommonArgs {
/// Number of threads to use
#[clap(long, default_value_t = 1)]
pub n_threads: usize,

/// Use FUSE_DEV_IOC_CLONE to give each thread its own fd (Linux 4.5+)
#[clap(long)]
pub clone_fd: bool,
}

impl CommonArgs {
@@ -36,6 +40,7 @@ impl CommonArgs {
config.acl = SessionACL::All;
}
config.n_threads = Some(self.n_threads);
config.clone_fd = self.clone_fd;
config
}
}
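For context, a minimal sketch of how an example binary might consume the new flag. The conversion method's name is elided by the hunk above, so `into_config` here is hypothetical, as is parsing `CommonArgs` directly (the real examples may `#[clap(flatten)]` it into a larger struct):

```rust
use clap::Parser; // assumed: the examples already use clap's derive API

fn main() {
    let args = CommonArgs::parse(); // now accepts --clone-fd
    let clone_fd = args.clone_fd;
    let config = args.into_config(); // hypothetical name for the method above
    assert_eq!(config.clone_fd, clone_fd); // the flag is copied through
}
```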
25 changes: 25 additions & 0 deletions src/channel.rs
@@ -1,11 +1,13 @@
use std::io;
use std::os::fd::AsFd;
use std::os::fd::AsRawFd;
use std::os::fd::BorrowedFd;
use std::sync::Arc;

use nix::errno::Errno;

use crate::dev_fuse::DevFuse;
use crate::ll::ioctl::fuse_dev_ioc_clone;
use crate::passthrough::BackingId;

/// A raw communication channel to the FUSE kernel driver
@@ -55,6 +57,29 @@ impl Channel {
// a sender by using the same file and use it in other threads.
ChannelSender(self.0.clone())
}

/// Clone the FUSE device fd using FUSE_DEV_IOC_CLONE ioctl.
///
/// This creates a new fd that can read FUSE requests independently,
/// enabling true parallel request processing. The kernel distributes
/// requests across all cloned fds.
///
/// Requires Linux 4.5+; returns an error on older kernels. The method is
/// only compiled on Linux.
#[cfg(target_os = "linux")]
pub(crate) fn clone_fd(&self) -> io::Result<Channel> {
// Open a new /dev/fuse fd
let new_dev = DevFuse::open()?;

// Clone the connection to the new fd
let mut source_fd = self.0.as_raw_fd() as u32;
// SAFETY: fuse_dev_ioc_clone is a valid ioctl for /dev/fuse
unsafe {
fuse_dev_ioc_clone(new_dev.as_raw_fd(), &mut source_fd)
.map_err(|e| io::Error::from_raw_os_error(e as i32))?;
}

Ok(Channel::new(Arc::new(new_dev)))
}
}

#[derive(Clone, Debug)]
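To make the mechanism concrete, here is a rough libc-level sketch of the two steps `clone_fd` performs, assuming `DevFuse::open` opens `/dev/fuse` and that the ioctl number follows the kernel's `_IOR(229, 0, uint32_t)` encoding (0x8004E500 on x86-64):

```rust
use std::os::fd::RawFd;

/// Sketch: bind a fresh /dev/fuse fd to an existing FUSE connection.
fn clone_fuse_fd(session_fd: RawFd) -> std::io::Result<RawFd> {
    // 1. A freshly opened /dev/fuse fd is not yet attached to any connection.
    let new_fd = unsafe { libc::open(c"/dev/fuse".as_ptr(), libc::O_RDWR | libc::O_CLOEXEC) };
    if new_fd < 0 {
        return Err(std::io::Error::last_os_error());
    }
    // 2. FUSE_DEV_IOC_CLONE attaches it to the connection served by session_fd.
    //    0x8004E500 is _IOR(229, 0, u32) on x86-64; other architectures encode
    //    _IOC differently.
    let mut src: u32 = session_fd as u32;
    if unsafe { libc::ioctl(new_fd, 0x8004E500 as _, &mut src as *mut u32) } < 0 {
        let err = std::io::Error::last_os_error();
        unsafe { libc::close(new_fd) };
        return Err(err);
    }
    // Both fds now drain the same request queue: each pending request goes to
    // whichever fd performs the next read().
    Ok(new_fd)
}
```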
1 change: 0 additions & 1 deletion src/ll/ioctl.rs
@@ -6,7 +6,6 @@ pub(crate) struct fuse_backing_map {
}

pub(crate) const FUSE_DEV_IOC_MAGIC: u8 = 229;
#[expect(dead_code)]
pub(crate) const FUSE_DEV_IOC_CLONE: u8 = 0;
pub(crate) const FUSE_DEV_IOC_BACKING_OPEN: u8 = 1;
pub(crate) const FUSE_DEV_IOC_BACKING_CLOSE: u8 = 2;
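Dropping `#[expect(dead_code)]` is consistent with `FUSE_DEV_IOC_CLONE` now being used from `src/channel.rs`. The diff does not show how the `fuse_dev_ioc_clone` wrapper itself is defined; a plausible sketch with nix's `ioctl_read!` macro, matching the call shape `fuse_dev_ioc_clone(fd, &mut u32)` seen above:

```rust
use nix::ioctl_read;

// Sketch: expands to roughly
//   pub unsafe fn fuse_dev_ioc_clone(fd: c_int, data: *mut u32) -> nix::Result<c_int>
// The kernel declares FUSE_DEV_IOC_CLONE as _IOR(FUSE_DEV_IOC_MAGIC, 0, uint32_t),
// even though the source fd flows into the kernel rather than out of it.
ioctl_read!(fuse_dev_ioc_clone, FUSE_DEV_IOC_MAGIC, FUSE_DEV_IOC_CLONE, u32);
```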
5 changes: 5 additions & 0 deletions src/mnt/mount_options.rs
@@ -15,6 +15,10 @@ pub struct Config {
pub acl: SessionACL,
/// Number of event loop threads. If unspecified, one thread is used.
pub n_threads: Option<usize>,
/// Use FUSE_DEV_IOC_CLONE to give each worker thread its own fd.
/// This enables true parallel request processing from the kernel.
/// Requires Linux 4.5+; session setup returns an error on older kernels.
/// On non-Linux targets the flag is ignored and threads share one fd.
pub clone_fd: bool,
}

/// Mount options accepted by the FUSE filesystem type
@@ -210,6 +214,7 @@ pub(crate) fn parse_options_from_args(args: &[&OsStr]) -> io::Result<Config> {
mount_options: out,
acl,
n_threads: None,
clone_fd: false,
})
}

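From the library consumer's side, enabling the feature is a two-field change; a minimal sketch (`MyFs` and the mount path are placeholders; `Config::default()` and `Session::new` appear in the test below):

```rust
// Sketch: a four-thread session where each worker reads from its own cloned fd.
let mut config = fuser::Config::default();
config.n_threads = Some(4);
config.clone_fd = true; // errors at session setup on pre-4.5 kernels; ignored off Linux
// let session = fuser::Session::new(MyFs, Path::new("/mnt/example"), &config)?;
```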
13 changes: 12 additions & 1 deletion src/session.rs
@@ -266,8 +266,19 @@ impl<FS: Filesystem> Session<FS> {
let mut filesystem = Arc::new(filesystem);

let mut channels = Vec::with_capacity(n_threads);
#[cfg(target_os = "linux")]
let use_clone_fd = config.clone_fd;
#[cfg(not(target_os = "linux"))]
let use_clone_fd = false;

for _ in 0..n_threads_minus_one {
// TODO: fuse_dev_ioc_clone
if use_clone_fd {
#[cfg(target_os = "linux")]
{
channels.push(ch.clone_fd()?);
continue;
}
}
channels.push(ch.clone());
}
channels.push(ch);
191 changes: 191 additions & 0 deletions tests/clone_fd_test.rs
@@ -0,0 +1,191 @@
//! Test that clone_fd enables true parallel request handling.
//!
//! Uses a barrier to prove that N threads execute concurrently.
//! Without clone_fd, threads serialize on read(), the barrier never
//! releases, and the test times out.

use std::ffi::OsStr;
use std::io;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Barrier;
use std::thread;
use std::time::Duration;

use fuser::Config;
use fuser::Errno;
use fuser::FileAttr;
use fuser::FileType;
use fuser::Filesystem;
use fuser::Generation;
use fuser::INodeNo;
use fuser::InitFlags;
use fuser::KernelConfig;
use fuser::ReplyAttr;
use fuser::ReplyEntry;
use fuser::Request;
use fuser::Session;
use tempfile::TempDir;

const N_THREADS: usize = 4;

struct BarrierFS {
barrier: Arc<Barrier>,
barrier_reached: Arc<AtomicBool>,
}

impl Filesystem for BarrierFS {
fn init(&mut self, _req: &Request, config: &mut KernelConfig) -> io::Result<()> {
// Request FUSE_PARALLEL_DIROPS so the kernel allows parallel lookups.
// Without it, the kernel serializes directory operations, defeating the
// purpose of clone_fd.
if let Err(unsupported) = config.add_capabilities(InitFlags::FUSE_PARALLEL_DIROPS) {
eprintln!(
"Warning: Kernel does not support FUSE_PARALLEL_DIROPS: {:?}",
unsupported
);
}
Ok(())
}

fn getattr(
&self,
_req: &Request,
ino: INodeNo,
_fh: Option<fuser::FileHandle>,
reply: ReplyAttr,
) {
if ino == INodeNo::ROOT {
reply.attr(
&Duration::from_secs(0),
&FileAttr {
ino: INodeNo::ROOT,
size: 0,
blocks: 0,
atime: std::time::UNIX_EPOCH,
mtime: std::time::UNIX_EPOCH,
ctime: std::time::UNIX_EPOCH,
crtime: std::time::UNIX_EPOCH,
kind: FileType::Directory,
perm: 0o755,
nlink: 2,
uid: 0,
gid: 0,
rdev: 0,
blksize: 4096,
flags: 0,
},
);
} else {
reply.error(Errno::ENOENT);
}
}

fn lookup(&self, _req: &Request, parent: INodeNo, name: &OsStr, reply: ReplyEntry) {
let thread_name = std::thread::current().name().unwrap_or("unknown").to_string();
eprintln!("Server thread {} got lookup for {:?}", thread_name, name);

// Accept any file starting with "barrier" (barrier0, barrier1, etc.)
let name_str = name.to_str().unwrap_or("");
if parent == INodeNo::ROOT && name_str.starts_with("barrier") {
eprintln!("Server thread {} waiting at barrier...", thread_name);
// Wait at the barrier - requires N_THREADS concurrent threads
self.barrier.wait();
eprintln!("Server thread {} passed barrier!", thread_name);
self.barrier_reached.store(true, Ordering::SeqCst);

reply.entry(
&Duration::from_secs(0),
&FileAttr {
ino: INodeNo(2),
size: 0,
blocks: 0,
atime: std::time::UNIX_EPOCH,
mtime: std::time::UNIX_EPOCH,
ctime: std::time::UNIX_EPOCH,
crtime: std::time::UNIX_EPOCH,
kind: FileType::RegularFile,
perm: 0o644,
nlink: 1,
uid: 0,
gid: 0,
rdev: 0,
blksize: 4096,
flags: 0,
},
Generation(0),
);
} else {
reply.error(Errno::ENOENT);
}
}
}

/// Test that clone_fd enables N threads to handle requests concurrently.
///
/// The barrier requires exactly N_THREADS to arrive before any can proceed.
/// If clone_fd works: N threads read requests in parallel, all reach barrier, pass.
/// If clone_fd broken: only 1 thread in read() at a time, barrier never completes.
#[cfg(target_os = "linux")]
#[test]
fn clone_fd_enables_concurrent_handlers() {
let tmpdir = TempDir::new().unwrap();
let mount_point = tmpdir.path();

let barrier = Arc::new(Barrier::new(N_THREADS));
let barrier_reached = Arc::new(AtomicBool::new(false));

let fs = BarrierFS {
barrier: barrier.clone(),
barrier_reached: barrier_reached.clone(),
};

let mut config = Config::default();
config.n_threads = Some(N_THREADS);
config.clone_fd = true;

eprintln!("Creating session...");
let session = Session::new(fs, mount_point, &config).unwrap();
eprintln!("Spawning background session...");
let bg = session.spawn().unwrap();

// Wait for mount
eprintln!("Waiting for mount...");
thread::sleep(Duration::from_millis(200));
eprintln!("Mount should be ready");

// Spawn N_THREADS client threads, each doing a lookup
// All must reach the barrier simultaneously for any to proceed
eprintln!("Spawning {} client threads...", N_THREADS);
let mp = mount_point.to_path_buf();
let clients: Vec<_> = (0..N_THREADS)
.map(|i| {
let mp = mp.clone();
thread::spawn(move || {
eprintln!("Client {} looking up barrier{}...", i, i);
// Each client looks up a different file to avoid dentry contention
let result = std::fs::metadata(mp.join(format!("barrier{}", i)));
eprintln!("Client {} done: {:?}", i, result.is_ok());
})
})
.collect();
eprintln!("All client threads spawned");

// Wait for clients with timeout
let start = std::time::Instant::now();
for client in clients {
let remaining = Duration::from_secs(10).saturating_sub(start.elapsed());
if remaining.is_zero() {
panic!("Timeout: barrier not reached - clone_fd may not be working");
}
// Note: std's JoinHandle has no timed join, so the check above only fires
// between joins; if clone_fd is not working, the barrier blocks every
// handler thread and this join hangs until the harness itself times out.
client.join().expect("Client thread panicked");
}

assert!(
barrier_reached.load(Ordering::SeqCst),
"Barrier was never reached - clone_fd not enabling parallel execution"
);

drop(bg);
}