Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
348 changes: 345 additions & 3 deletions dragonfly-client/src/bin/dfget/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use path_absolutize::*;
use percent_encoding::percent_decode_str;
use std::collections::HashSet;
use std::path::{Component, Path, PathBuf};
use std::process::Command as StdCommand;
use std::sync::Arc;
use std::time::Duration;
use std::{cmp::min, fmt::Write};
Expand Down Expand Up @@ -346,6 +347,25 @@ struct Args {
)]
no_progress: bool,

#[arg(
long = "daemon",
default_value_t = false,
help = "Automatically start a dfdaemon process if one is not already running. \
If a dfdaemon configuration file exists, it will be used to start the daemon. \
If no configuration file exists, a default configuration will be generated. \
Use --daemon-manager-addr to specify the manager address for the generated configuration"
)]
daemon: bool,

#[arg(
long = "daemon-manager-addr",
help = "Specify the manager address for the dfdaemon when using --daemon. \
If a dfdaemon configuration file already exists, the address in the \
configuration file will be used and a warning will be printed if this \
value differs from the configured one"
)]
daemon_manager_addr: Option<String>,

#[arg(
short = 'l',
long,
Expand Down Expand Up @@ -414,8 +434,101 @@ async fn main() -> anyhow::Result<()> {
std::process::exit(1);
}

// Get dfdaemon download client.
let dfdaemon_download_client =
// Get dfdaemon download client. If --daemon is enabled, try to connect first
// and automatically start a dfdaemon process if it is not running.
let dfdaemon_download_client = if args.daemon {
match get_dfdaemon_download_client(args.endpoint.to_path_buf()).await {
Ok(client) => {
info!("connected to existing dfdaemon");
client
}
Err(err) => {
info!(
"dfdaemon is not available: {}, attempting to start dfdaemon automatically",
err
);

if let Err(err) =
start_dfdaemon(args.daemon_manager_addr.as_deref(), &args.endpoint).await
{
println!(
"{}{}{}Start Dfdaemon Failed!{}",
color::Fg(color::Red),
style::Italic,
style::Bold,
style::Reset
);

println!(
"{}{}{}****************************************{}",
color::Fg(color::Black),
style::Italic,
style::Bold,
style::Reset
);

println!(
"{}{}{}Message:{} {}",
color::Fg(color::Cyan),
style::Italic,
style::Bold,
style::Reset,
err,
);

println!(
"{}{}{}****************************************{}",
color::Fg(color::Black),
style::Italic,
style::Bold,
style::Reset
);

std::process::exit(1);
}

match get_dfdaemon_download_client(args.endpoint.to_path_buf()).await {
Ok(client) => client,
Err(err) => {
println!(
"{}{}{}Connect Dfdaemon Failed!{}",
color::Fg(color::Red),
style::Italic,
style::Bold,
style::Reset
);

println!(
"{}{}{}****************************************{}",
color::Fg(color::Black),
style::Italic,
style::Bold,
style::Reset
);

println!(
"{}{}{}Message:{} failed to connect to dfdaemon after starting it: {}",
color::Fg(color::Cyan),
style::Italic,
style::Bold,
style::Reset,
err,
);

println!(
"{}{}{}****************************************{}",
color::Fg(color::Black),
style::Italic,
style::Bold,
style::Reset
);

std::process::exit(1);
}
}
}
}
} else {
match get_dfdaemon_download_client(args.endpoint.to_path_buf()).await {
Ok(client) => client,
Err(err) => {
Expand Down Expand Up @@ -455,7 +568,8 @@ async fn main() -> anyhow::Result<()> {

std::process::exit(1);
}
};
}
};

// Run dfget command.
if let Err(err) = run(args, dfdaemon_download_client).await {
Expand Down Expand Up @@ -1283,6 +1397,234 @@ fn make_output_by_entry(url: Url, output: &Path, entry: DirEntry) -> Result<Path
.into())
}

/// The default timeout for waiting for dfdaemon to become ready.
const DFDAEMON_READY_TIMEOUT: Duration = Duration::from_secs(30);

/// The interval between health check polls when waiting for dfdaemon to become ready.
const DFDAEMON_READY_POLL_INTERVAL: Duration = Duration::from_millis(500);

/// Finds the dfdaemon binary path.
///
/// This function searches for the dfdaemon binary in the following order:
/// 1. The same directory as the current dfget binary.
/// 2. The system PATH.
///
/// Returns an error if the dfdaemon binary cannot be found in any location.
fn find_dfdaemon_binary() -> Result<PathBuf> {
// Try the same directory as the current dfget binary first.
if let Ok(current_exe) = std::env::current_exe() {
if let Some(dir) = current_exe.parent() {
let dfdaemon_path = dir.join("dfdaemon");
if dfdaemon_path.exists() {
info!(
"found dfdaemon binary in the same directory as dfget: {}",
dfdaemon_path.display()
);
return Ok(dfdaemon_path);
}
}
}

// Fall back to searching in the system PATH.
if let Ok(output) = StdCommand::new("which").arg("dfdaemon").output() {
if output.status.success() {
let path = String::from_utf8_lossy(&output.stdout).trim().to_string();
if !path.is_empty() {
info!("found dfdaemon binary in PATH: {}", path);
return Ok(PathBuf::from(path));
}
}
}

Err(Error::Unknown(
"dfdaemon binary not found, please ensure dfdaemon is installed and available in PATH or the same directory as dfget".to_string(),
))
}

/// Ensures a dfdaemon configuration file exists and returns its path.
///
/// This function handles the following scenarios:
/// 1. If the configuration file already exists and --daemon-manager-addr is specified
/// with a different address, a warning is printed.
/// 2. If the configuration file does not exist and --daemon-manager-addr is specified,
/// a default configuration with the specified manager address is generated.
/// 3. If the configuration file does not exist and --daemon-manager-addr is not specified,
/// a default configuration with an empty manager address is generated.
async fn ensure_dfdaemon_config(daemon_manager_addr: Option<&str>) -> Result<PathBuf> {
let config_path = dfdaemon::default_dfdaemon_config_path();

if config_path.exists() {
info!("using existing dfdaemon config: {}", config_path.display());

// If --daemon-manager-addr is specified, check if it matches the configured address.
if let Some(addr) = daemon_manager_addr {
match dfdaemon::Config::load(&config_path).await {
Ok(config) => {
if !config.manager.addr.is_empty() && config.manager.addr != addr {
warn!(
"--daemon-manager-addr={} differs from the address in config file: {}. \
Using the address from the config file. To change the manager address, \
please modify the config file: {}",
addr,
config.manager.addr,
config_path.display(),
);
}
}
Err(err) => {
warn!(
"failed to parse dfdaemon config {}: {}, proceeding with existing config",
config_path.display(),
err,
);
}
}
}

return Ok(config_path);
}

// Generate a default configuration file.
let manager_addr = daemon_manager_addr.unwrap_or_default();
info!(
"dfdaemon config not found at {}, generating default config with manager addr: {}",
config_path.display(),
if manager_addr.is_empty() {
"<empty>"
} else {
manager_addr
},
);

if let Some(parent) = config_path.parent() {
tokio::fs::create_dir_all(parent).await.map_err(|err| {
error!(
"failed to create config directory {}: {}",
parent.display(),
err
);
Error::Unknown(format!("failed to create config directory: {}", err))
})?;
}

let config_content = format!("manager:\n addr: {}\n", manager_addr,);

tokio::fs::write(&config_path, &config_content)
.await
.map_err(|err| {
error!(
"failed to write default dfdaemon config to {}: {}",
config_path.display(),
err
);
Error::Unknown(format!("failed to write default dfdaemon config: {}", err))
})?;

info!(
"generated default dfdaemon config at: {}",
config_path.display()
);

Ok(config_path)
}

/// Starts a dfdaemon process in the background and waits for it to become ready.
///
/// This function performs the following steps:
/// 1. Ensures a valid dfdaemon configuration file exists.
/// 2. Locates the dfdaemon binary.
/// 3. Removes any stale Unix socket file at the endpoint path.
/// 4. Spawns the dfdaemon process as a background daemon with the configuration.
/// 5. Waits for the dfdaemon to become ready by polling its health check endpoint.
///
/// The spawned dfdaemon process will continue running after dfget exits, avoiding
/// cold start overhead for subsequent downloads.
async fn start_dfdaemon(daemon_manager_addr: Option<&str>, endpoint: &Path) -> Result<()> {
let config_path = ensure_dfdaemon_config(daemon_manager_addr).await?;

let dfdaemon_bin = find_dfdaemon_binary()?;
info!(
"starting dfdaemon with binary: {}, config: {}",
dfdaemon_bin.display(),
config_path.display()
);

// Remove stale socket file if it exists, so dfdaemon can bind to the socket path cleanly.
if endpoint.exists() {
info!("removing stale socket file: {}", endpoint.display());
tokio::fs::remove_file(endpoint).await.ok();
}

let child = StdCommand::new(&dfdaemon_bin)
.arg("--config")
.arg(&config_path)
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.spawn()
.map_err(|err| {
error!(
"failed to start dfdaemon with binary {}: {}",
dfdaemon_bin.display(),
err
);
Error::Unknown(format!("failed to start dfdaemon: {}", err))
})?;

info!(
"dfdaemon process started with pid: {}, waiting for it to become ready",
child.id()
);

wait_for_dfdaemon_ready(endpoint).await
}

/// Waits for the dfdaemon to become ready by polling its health check endpoint.
///
/// This function repeatedly attempts to connect to the dfdaemon's Unix domain socket
/// and performs a health check until the service is ready or the timeout is exceeded.
/// It uses a fixed polling interval to avoid overwhelming the system during startup.
async fn wait_for_dfdaemon_ready(endpoint: &Path) -> Result<()> {
let start = std::time::Instant::now();

loop {
if start.elapsed() > DFDAEMON_READY_TIMEOUT {
error!(
"timeout waiting for dfdaemon to become ready after {:?}",
DFDAEMON_READY_TIMEOUT
);
return Err(Error::Unknown(format!(
"timeout waiting for dfdaemon to become ready after {:?}, please check dfdaemon logs for errors",
DFDAEMON_READY_TIMEOUT,
)));
}

// Try to connect and check health.
match HealthClient::new_unix(endpoint.to_path_buf()).await {
Ok(health_client) => {
if health_client.check_dfdaemon_download().await.is_ok() {
info!("dfdaemon is ready after {:?}", start.elapsed());
return Ok(());
}

debug!(
"dfdaemon health check failed, retrying in {:?}",
DFDAEMON_READY_POLL_INTERVAL
);
}
Err(_) => {
debug!(
"dfdaemon is not yet available at {}, retrying in {:?}",
endpoint.display(),
DFDAEMON_READY_POLL_INTERVAL
);
}
}

tokio::time::sleep(DFDAEMON_READY_POLL_INTERVAL).await;
}
}

/// Creates and validates a dfdaemon download client with health checking.
///
/// This function establishes a connection to the dfdaemon service via Unix domain socket
Expand Down
Loading