diff --git a/Cargo.lock b/Cargo.lock
index 31f355e9..3a8a077f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5886,9 +5886,9 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
 
 [[package]]
 name = "vortex-protocol"
-version = "0.1.3"
+version = "0.1.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "168a69e99721884cbefd1cbed9e5019244c7b4be889a85b6dd0550dc22e43b0f"
+checksum = "b4c967af430b0b6db1d1c5beae962c1965485aa7fdf229bcc301720b3360ab3c"
 dependencies = [
  "bytes",
  "chrono",
diff --git a/Cargo.toml b/Cargo.toml
index 89227545..912ac224 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -111,7 +111,7 @@ bytes = "1.10"
 local-ip-address = "0.6.5"
 sysinfo = { version = "0.32.1", default-features = false, features = ["component", "disk", "network", "system", "user"] }
 leaky-bucket = "1.1.2"
-vortex-protocol = "0.1.3"
+vortex-protocol = "0.1.4"
 dashmap = "6.1.0"
 hostname = "^0.4"
 tonic-health = "0.12.3"
diff --git a/dragonfly-client-storage/src/client/quic.rs b/dragonfly-client-storage/src/client/quic.rs
index 6c1584f3..43fe23ed 100644
--- a/dragonfly-client-storage/src/client/quic.rs
+++ b/dragonfly-client-storage/src/client/quic.rs
@@ -30,6 +30,7 @@ use tokio::time;
 use tracing::{error, instrument, Span};
 use vortex_protocol::{
     tlv::{
+        cache_piece_content,
         download_cache_piece::DownloadCachePiece,
         download_persistent_cache_piece::DownloadPersistentCachePiece, download_piece::DownloadPiece,
         error::Error as VortexError, persistent_cache_piece_content, piece_content, Tag,
@@ -167,6 +168,63 @@ impl QUICClient {
         }
     }
 
+    /// Downloads a cache piece from the server using the vortex protocol.
+    ///
+    /// This is the main entry point for downloading a cache piece. It applies
+    /// the piece timeout from the configuration and logs an error if it elapses.
+    #[instrument(skip_all)]
+    pub async fn download_cache_piece(
+        &self,
+        number: u32,
+        task_id: &str,
+    ) -> ClientResult<(impl AsyncRead, u64, String)> {
+        time::timeout(
+            self.config.download.piece_timeout,
+            self.handle_download_cache_piece(number, task_id),
+        )
+        .await
+        .inspect_err(|err| {
+            error!("connect timeout to {}: {}", self.addr, err);
+        })?
+    }
+    /// Internal handler for downloading a cache piece.
+    ///
+    /// This method performs the actual protocol communication:
+    /// 1. Creates a download cache piece request.
+    /// 2. Establishes a QUIC connection and sends the request.
+    /// 3. Reads and validates the response header.
+    /// 4. Processes the piece content based on the response type.
+    #[instrument(skip_all)]
+    async fn handle_download_cache_piece(
+        &self,
+        number: u32,
+        task_id: &str,
+    ) -> ClientResult<(impl AsyncRead, u64, String)> {
+        let request: Bytes = Vortex::DownloadCachePiece(
+            Header::new_download_cache_piece(),
+            DownloadCachePiece::new(task_id.to_string(), number),
+        )
+        .into();
+
+        let (mut reader, _writer) = self.connect_and_write_request(request).await?;
+        let header = self.read_header(&mut reader).await?;
+        match header.tag() {
+            Tag::CachePieceContent => {
+                let cache_piece_content: cache_piece_content::CachePieceContent = self
+                    .read_piece_content(&mut reader, cache_piece_content::METADATA_LENGTH_SIZE)
+                    .await?;
+
+                let metadata = cache_piece_content.metadata();
+                Ok((reader, metadata.offset, metadata.digest))
+            }
+            Tag::Error => Err(self.read_error(&mut reader, header.length() as usize).await),
+            _ => Err(ClientError::Unknown(format!(
+                "unexpected tag: {:?}",
+                header.tag()
+            ))),
+        }
+    }
+
     /// Establishes QUIC connection and writes a vortex protocol request.
     ///
     /// This is a low-level utility function that handles the QUIC connection
diff --git a/dragonfly-client-storage/src/client/tcp.rs b/dragonfly-client-storage/src/client/tcp.rs
index d4c5c243..4c7177eb 100644
--- a/dragonfly-client-storage/src/client/tcp.rs
+++ b/dragonfly-client-storage/src/client/tcp.rs
@@ -25,6 +25,7 @@ use tokio::time;
 use tracing::{error, instrument, Span};
 use vortex_protocol::{
     tlv::{
+        cache_piece_content,
         download_cache_piece::DownloadCachePiece,
         download_persistent_cache_piece::DownloadPersistentCachePiece, download_piece::DownloadPiece,
         error::Error as VortexError, persistent_cache_piece_content, piece_content, Tag,
@@ -162,6 +163,63 @@ impl TCPClient {
         }
     }
 
+    /// Downloads a cache piece from the server using the vortex protocol.
+    ///
+    /// This is the main entry point for downloading a cache piece. It applies
+    /// the piece timeout from the configuration and logs an error if it elapses.
+    #[instrument(skip_all)]
+    pub async fn download_cache_piece(
+        &self,
+        number: u32,
+        task_id: &str,
+    ) -> ClientResult<(impl AsyncRead, u64, String)> {
+        time::timeout(
+            self.config.download.piece_timeout,
+            self.handle_download_cache_piece(number, task_id),
+        )
+        .await
+        .inspect_err(|err| {
+            error!("connect timeout to {}: {}", self.addr, err);
+        })?
+    }
+    /// Internal handler for downloading a cache piece.
+    ///
+    /// This method performs the actual protocol communication:
+    /// 1. Creates a download cache piece request.
+    /// 2. Establishes a TCP connection and sends the request.
+    /// 3. Reads and validates the response header.
+    /// 4. Processes the piece content based on the response type.
+    #[instrument(skip_all)]
+    async fn handle_download_cache_piece(
+        &self,
+        number: u32,
+        task_id: &str,
+    ) -> ClientResult<(impl AsyncRead, u64, String)> {
+        let request: Bytes = Vortex::DownloadCachePiece(
+            Header::new_download_cache_piece(),
+            DownloadCachePiece::new(task_id.to_string(), number),
+        )
+        .into();
+
+        let (mut reader, _writer) = self.connect_and_write_request(request).await?;
+        let header = self.read_header(&mut reader).await?;
+        match header.tag() {
+            Tag::CachePieceContent => {
+                let cache_piece_content: cache_piece_content::CachePieceContent = self
+                    .read_piece_content(&mut reader, cache_piece_content::METADATA_LENGTH_SIZE)
+                    .await?;
+
+                let metadata = cache_piece_content.metadata();
+                Ok((reader, metadata.offset, metadata.digest))
+            }
+            Tag::Error => Err(self.read_error(&mut reader, header.length() as usize).await),
+            _ => Err(ClientError::Unknown(format!(
+                "unexpected tag: {:?}",
+                header.tag()
+            ))),
+        }
+    }
+
     /// Establishes TCP connection and writes a vortex protocol request.
     ///
     /// This is a low-level utility function that handles the TCP connection
diff --git a/dragonfly-client/src/grpc/dfdaemon_upload.rs b/dragonfly-client/src/grpc/dfdaemon_upload.rs
index c9619b01..bbcb66e8 100644
--- a/dragonfly-client/src/grpc/dfdaemon_upload.rs
+++ b/dragonfly-client/src/grpc/dfdaemon_upload.rs
@@ -2071,6 +2071,16 @@ impl DfdaemonUploadClient {
         Ok(response)
     }
 
+    /// download_cache_piece provides the cache piece content for the parent.
+    #[instrument(skip_all)]
+    pub async fn download_cache_piece(
+        &self,
+        _request: DownloadCachePieceRequest,
+        _timeout: Duration,
+    ) -> ClientResult<DownloadCachePieceResponse> {
+        todo!();
+    }
+
     /// download_persistent_cache_task downloads the persistent cache task.
     #[instrument(skip_all)]
     pub async fn download_persistent_cache_task(
diff --git a/dragonfly-client/src/resource/piece.rs b/dragonfly-client/src/resource/piece.rs
index aa471886..172c3908 100644
--- a/dragonfly-client/src/resource/piece.rs
+++ b/dragonfly-client/src/resource/piece.rs
@@ -377,6 +377,69 @@ impl Piece {
         self.storage.upload_piece(piece_id, task_id, range).await
     }
 
+    /// upload_cache_from_local_into_async_read uploads a single cache piece from local cache.
+    #[instrument(skip_all, fields(piece_id, piece_length))]
+    pub async fn upload_cache_from_local_into_async_read(
+        &self,
+        piece_id: &str,
+        task_id: &str,
+        length: u64,
+        range: Option<Range>,
+        disable_rate_limit: bool,
+    ) -> Result<impl AsyncRead> {
+        // Span record the piece_id and piece_length.
+        Span::current().record("piece_id", piece_id);
+        Span::current().record("piece_length", length);
+
+        // Acquire the upload rate limiter.
+        if !disable_rate_limit {
+            self.upload_rate_limiter.acquire(length as usize).await;
+        }
+
+        // Upload the cache piece content.
+        self.storage
+            .upload_cache_piece(piece_id, task_id, range)
+            .await
+            .inspect(|_| {
+                collect_upload_piece_traffic_metrics(
+                    self.id_generator.task_type(task_id) as i32,
+                    length,
+                );
+            })
+    }
+
+    /// download_cache_from_local_into_async_read downloads a single cache piece from local cache.
+    #[instrument(skip_all, fields(piece_id, piece_length))]
+    pub async fn download_cache_from_local_into_async_read(
+        &self,
+        piece_id: &str,
+        task_id: &str,
+        length: u64,
+        range: Option<Range>,
+        disable_rate_limit: bool,
+        is_prefetch: bool,
+    ) -> Result<impl AsyncRead> {
+        // Span record the piece_id and piece_length.
+        Span::current().record("piece_id", piece_id);
+        Span::current().record("piece_length", length);
+
+        // Acquire the download rate limiter.
+        if !disable_rate_limit {
+            if is_prefetch {
+                // Acquire the prefetch rate limiter.
+                self.prefetch_rate_limiter.acquire(length as usize).await;
+            } else {
+                // Acquire the download rate limiter.
+                self.download_rate_limiter.acquire(length as usize).await;
+            }
+        }
+
+        // Read the cache piece content from the local storage.
+        self.storage
+            .upload_cache_piece(piece_id, task_id, range)
+            .await
+    }
+
     /// download_from_local downloads a single piece from local cache. Fake the download piece
     /// from the local cache, just collect the metrics.
     #[instrument(skip_all)]
@@ -673,6 +736,320 @@ impl Piece {
         }
     }
 
+    /// cache_id generates a new cache piece id.
+    #[inline]
+    pub fn cache_id(&self, task_id: &str, number: u32) -> String {
+        self.storage.cache_piece_id(task_id, number)
+    }
+
+    /// get_cache gets a cache piece from the local storage.
+    pub fn get_cache(&self, piece_id: &str) -> Result<Option<metadata::Piece>> {
+        self.storage.get_cache_piece(piece_id)
+    }
+
+    /// download_cache_from_parent downloads a single cache piece from a parent.
+    #[allow(clippy::too_many_arguments)]
+    #[instrument(skip_all, fields(piece_id, piece_length))]
+    pub async fn download_cache_from_parent(
+        &self,
+        piece_id: &str,
+        host_id: &str,
+        task_id: &str,
+        number: u32,
+        length: u64,
+        parent: piece_collector::CollectedParent,
+        is_prefetch: bool,
+    ) -> Result<metadata::Piece> {
+        // Span record the piece_id and piece_length.
+        Span::current().record("piece_id", piece_id);
+        Span::current().record("piece_length", length);
+
+        // Clean up residual piece metadata if an error occurred.
+        let guard = scopeguard::guard((), |_| {
+            if let Some(err) = self.storage.download_cache_piece_failed(piece_id).err() {
+                error!("set cache piece metadata failed: {}", err)
+            };
+        });
+
+        // Record the start of downloading piece.
+        let piece = self
+            .storage
+            .download_cache_piece_started(piece_id, number)
+            .await?;
+
+        // If the piece is downloaded by the other thread,
+        // return the piece directly.
+        if piece.is_finished() {
+            info!("finished piece {} from local", piece_id);
+            scopeguard::ScopeGuard::into_inner(guard);
+            return Ok(piece);
+        }
+
+        if is_prefetch {
+            // Acquire the prefetch rate limiter.
+            self.prefetch_rate_limiter.acquire(length as usize).await;
+        } else {
+            // Acquire the download rate limiter.
+            self.download_rate_limiter.acquire(length as usize).await;
+        }
+
+        let (mut reader, offset, digest) = match (
+            self.config.download.protocol.as_str(),
+            parent.download_ip,
+            parent.download_tcp_port,
+            parent.download_quic_port,
+        ) {
+            ("tcp", Some(ip), Some(port), _) => {
+                self.tcp_downloader
+                    .download_cache_piece(
+                        format!("{}:{}", ip, port).as_str(),
+                        number,
+                        host_id,
+                        task_id,
+                    )
+                    .await?
+            }
+            ("quic", Some(ip), _, Some(port)) => {
+                self.quic_downloader
+                    .download_cache_piece(
+                        format!("{}:{}", ip, port).as_str(),
+                        number,
+                        host_id,
+                        task_id,
+                    )
+                    .await?
+            }
+            _ => {
+                warn!("fall back to grpc downloader");
+                let host = parent.host.clone().ok_or_else(|| {
+                    error!("parent host is empty");
+                    Error::InvalidPeer(parent.id.clone())
+                })?;
+
+                self.grpc_downloader
+                    .download_cache_piece(
+                        format!("{}:{}", host.ip, host.port).as_str(),
+                        number,
+                        host_id,
+                        task_id,
+                    )
+                    .await
+                    .inspect_err(|err| {
+                        error!("download cache piece failed: {}", err);
+                    })?
+            }
+        };
+
+        // Record the finish of downloading piece.
+        match self
+            .storage
+            .download_cache_piece_from_parent_finished(
+                piece_id,
+                task_id,
+                offset,
+                length,
+                digest.as_str(),
+                parent.id.as_str(),
+                &mut reader,
+                self.config.storage.write_piece_timeout,
+            )
+            .await
+        {
+            Ok(piece) => {
+                collect_download_piece_traffic_metrics(
+                    &TrafficType::RemotePeer,
+                    self.id_generator.task_type(task_id) as i32,
+                    length,
+                );
+
+                scopeguard::ScopeGuard::into_inner(guard);
+                Ok(piece)
+            }
+            Err(err) => {
+                error!("download cache piece from parent failed: {}", err);
+                Err(err)
+            }
+        }
+    }
+
+    /// download_cache_from_source downloads a single cache piece from the source.
+    #[allow(clippy::too_many_arguments)]
+    #[instrument(skip_all, fields(piece_id, piece_length))]
+    pub async fn download_cache_from_source(
+        &self,
+        piece_id: &str,
+        task_id: &str,
+        number: u32,
+        url: &str,
+        offset: u64,
+        length: u64,
+        request_header: HeaderMap,
+        is_prefetch: bool,
+        object_storage: Option<ObjectStorage>,
+        hdfs: Option<Hdfs>,
+    ) -> Result<metadata::Piece> {
+        // Span record the piece_id and piece_length.
+        Span::current().record("piece_id", piece_id);
+        Span::current().record("piece_length", length);
+
+        // Clean up residual piece metadata if an error occurred.
+        let guard = scopeguard::guard((), |_| {
+            if let Some(err) = self.storage.download_cache_piece_failed(piece_id).err() {
+                error!("set cache piece metadata failed: {}", err)
+            };
+        });
+
+        // Record the start of downloading piece.
+        let piece = self
+            .storage
+            .download_cache_piece_started(piece_id, number)
+            .await?;
+
+        // If the piece is downloaded by the other thread,
+        // return the piece directly.
+        if piece.is_finished() {
+            info!("finished piece {} from local", piece_id);
+            scopeguard::ScopeGuard::into_inner(guard);
+            return Ok(piece);
+        }
+
+        if is_prefetch {
+            // Acquire the prefetch rate limiter.
+            self.prefetch_rate_limiter.acquire(length as usize).await;
+        } else {
+            // Acquire the download rate limiter.
+            self.download_rate_limiter.acquire(length as usize).await;
+        }
+
+        // Add range header to the request by offset and length.
+        let mut request_header = request_header.clone();
+        request_header.insert(
+            header::RANGE,
+            format!("bytes={}-{}", offset, offset + length - 1)
+                .parse()
+                .unwrap(),
+        );
+
+        // Download the piece from the source.
+        let backend = self.backend_factory.build(url).inspect_err(|err| {
+            error!("build backend failed: {}", err);
+            if let Some(err) = self.storage.download_cache_piece_failed(piece_id).err() {
+                error!("set cache piece metadata failed: {}", err)
+            };
+        })?;
+
+        // Record the start time.
+        let start_time = Instant::now();
+
+        // Collect the backend request started metrics.
+        collect_backend_request_started_metrics(
+            backend.scheme().as_str(),
+            http::Method::GET.as_str(),
+        );
+        let mut response = backend
+            .get(GetRequest {
+                task_id: task_id.to_string(),
+                piece_id: piece_id.to_string(),
+                url: url.to_string(),
+                range: Some(Range {
+                    start: offset,
+                    length,
+                }),
+                http_header: Some(request_header),
+                timeout: self.config.download.piece_timeout,
+                client_cert: None,
+                object_storage,
+                hdfs,
+            })
+            .await
+            .inspect_err(|err| {
+                // Collect the backend request failure metrics.
+                collect_backend_request_failure_metrics(
+                    backend.scheme().as_str(),
+                    http::Method::GET.as_str(),
+                );
+
+                // If the request failed.
+                error!("backend get failed: {}", err);
+                if let Some(err) = self.storage.download_cache_piece_failed(piece_id).err() {
+                    error!("set cache piece metadata failed: {}", err)
+                };
+            })?;
+
+        if !response.success {
+            // Collect the backend request failure metrics.
+            collect_backend_request_failure_metrics(
+                backend.scheme().as_str(),
+                http::Method::GET.as_str(),
+            );
+
+            // If the status code is not OK.
+            let mut buffer = String::new();
+            response
+                .reader
+                .read_to_string(&mut buffer)
+                .await
+                .unwrap_or_default();
+
+            let error_message = response.error_message.unwrap_or_default();
+            error!("backend get failed: {} {}", error_message, buffer.as_str());
+
+            self.storage.download_cache_piece_failed(piece_id)?;
+            return Err(Error::BackendError(Box::new(BackendError {
+                message: error_message,
+                status_code: Some(response.http_status_code.unwrap_or_default()),
+                header: Some(response.http_header.unwrap_or_default()),
+            })));
+        }
+
+        // Collect the backend request finished metrics.
+        collect_backend_request_finished_metrics(
+            backend.scheme().as_str(),
+            http::Method::GET.as_str(),
+            start_time.elapsed(),
+        );
+
+        // Record the finish of downloading piece.
+        match self
+            .storage
+            .download_cache_piece_from_source_finished(
+                piece_id,
+                task_id,
+                offset,
+                length,
+                &mut response.reader,
+                self.config.storage.write_piece_timeout,
+            )
+            .await
+        {
+            Ok(piece) => {
+                collect_download_piece_traffic_metrics(
+                    &TrafficType::BackToSource,
+                    self.id_generator.task_type(task_id) as i32,
+                    length,
+                );
+
+                scopeguard::ScopeGuard::into_inner(guard);
+                Ok(piece)
+            }
+            Err(err) => {
+                error!("download cache piece from source failed: {}", err);
+                Err(err)
+            }
+        }
+    }
+
+    /// download_cache_from_local fakes downloading a single piece from the local cache and
+    /// only collects the traffic metrics.
+    #[instrument(skip_all)]
+    pub fn download_cache_from_local(&self, task_id: &str, length: u64) {
+        collect_download_piece_traffic_metrics(
+            &TrafficType::LocalPeer,
+            self.id_generator.task_type(task_id) as i32,
+            length,
+        );
+    }
+
     /// persistent_cache_id generates a new persistent cache piece id.
     #[inline]
     pub fn persistent_cache_id(&self, task_id: &str, number: u32) -> String {
diff --git a/dragonfly-client/src/resource/piece_downloader.rs b/dragonfly-client/src/resource/piece_downloader.rs
index 3c5a8920..6e95fc20 100644
--- a/dragonfly-client/src/resource/piece_downloader.rs
+++ b/dragonfly-client/src/resource/piece_downloader.rs
@@ -15,7 +15,9 @@
  */
 
 use crate::grpc::dfdaemon_upload::DfdaemonUploadClient;
-use dragonfly_api::dfdaemon::v2::{DownloadPersistentCachePieceRequest, DownloadPieceRequest};
+use dragonfly_api::dfdaemon::v2::{
+    DownloadCachePieceRequest, DownloadPersistentCachePieceRequest, DownloadPieceRequest,
+};
 use dragonfly_client_config::dfdaemon::Config;
 use dragonfly_client_core::{Error, Result};
 use dragonfly_client_storage::{client::quic::QUICClient, client::tcp::TCPClient, metadata};
@@ -45,6 +47,15 @@ pub trait Downloader: Send + Sync {
         task_id: &str,
     ) -> Result<(Box<dyn AsyncRead + Send + Unpin>, u64, String)>;
 
+    /// download_cache_piece downloads a cache piece from the other peer by different protocols.
+    async fn download_cache_piece(
+        &self,
+        addr: &str,
+        number: u32,
+        host_id: &str,
+        task_id: &str,
+    ) -> Result<(Box<dyn AsyncRead + Send + Unpin>, u64, String)>;
+
     /// download_persistent_cache_piece downloads a persistent cache piece from the other peer by different
     /// protocols.
     async fn download_persistent_cache_piece(
@@ -238,6 +249,74 @@ impl Downloader for GRPCDownloader {
         Ok((Box::new(Cursor::new(content)), piece.offset, piece.digest))
     }
 
+    /// download_cache_piece downloads a cache piece from the other peer by the gRPC protocol.
+    #[instrument(skip_all)]
+    async fn download_cache_piece(
+        &self,
+        addr: &str,
+        number: u32,
+        host_id: &str,
+        task_id: &str,
+    ) -> Result<(Box<dyn AsyncRead + Send + Unpin>, u64, String)> {
+        let key = self.get_entry_key(addr);
+        let entry = self.get_client_entry(key.clone(), addr.to_string()).await?;
+        let request_guard = entry.request_guard();
+
+        let response = match entry
+            .client
+            .download_cache_piece(
+                DownloadCachePieceRequest {
+                    host_id: host_id.to_string(),
+                    task_id: task_id.to_string(),
+                    piece_number: number,
+                },
+                self.config.download.piece_timeout,
+            )
+            .await
+        {
+            Ok(response) => response,
+            Err(err) => {
+                // If the request fails, drop the request guard and remove the client
+                // entry to avoid reusing the invalid client.
+                drop(request_guard);
+                self.remove_client_entry(key).await;
+                return Err(err);
+            }
+        };
+
+        let Some(piece) = response.piece else {
+            error!("resource piece is missing");
+            return Err(Error::InvalidParameter);
+        };
+
+        let Some(content) = piece.content else {
+            error!("resource piece content is missing");
+            return Err(Error::InvalidParameter);
+        };
+
+        // Calculate the digest of the piece metadata and compare it with the expected
+        // digest; this verifies the integrity of the piece metadata.
+        let piece_metadata = metadata::Piece {
+            number,
+            length: piece.length,
+            offset: piece.offset,
+            digest: piece.digest.clone(),
+            ..Default::default()
+        };
+
+        if let Some(expected_digest) = response.digest {
+            let digest = piece_metadata.calculate_digest();
+            if expected_digest != digest {
+                return Err(Error::DigestMismatch(
+                    expected_digest.to_string(),
+                    digest.to_string(),
+                ));
+            }
+        }
+
+        Ok((Box::new(Cursor::new(content)), piece.offset, piece.digest))
+    }
+
     /// download_persistent_cache_piece downloads a persistent cache piece from the other peer by
     /// the gRPC protocol.
     #[instrument(skip_all)]
@@ -398,6 +477,31 @@ impl Downloader for QUICDownloader {
         }
     }
 
+    /// download_cache_piece downloads a cache piece from the other peer by the QUIC protocol.
+    #[instrument(skip_all)]
+    async fn download_cache_piece(
+        &self,
+        addr: &str,
+        number: u32,
+        _host_id: &str,
+        task_id: &str,
+    ) -> Result<(Box<dyn AsyncRead + Send + Unpin>, u64, String)> {
+        let key = self.get_entry_key(addr);
+        let entry = self.get_client_entry(key.clone(), addr.to_string()).await?;
+        let request_guard = entry.request_guard();
+
+        match entry.client.download_cache_piece(number, task_id).await {
+            Ok((reader, offset, digest)) => Ok((Box::new(reader), offset, digest)),
+            Err(err) => {
+                // If the request fails, drop the request guard and remove the client
+                // entry to avoid reusing the invalid client.
+                drop(request_guard);
+                self.remove_client_entry(key).await;
+                Err(err)
+            }
+        }
+    }
+
     /// download_persistent_cache_piece downloads a persistent cache piece from the other peer by
     /// the QUIC protocol.
     #[instrument(skip_all)]
@@ -520,6 +624,31 @@ impl Downloader for TCPDownloader {
         }
     }
 
+    /// download_cache_piece downloads a cache piece from the other peer by the TCP protocol.
+    #[instrument(skip_all)]
+    async fn download_cache_piece(
+        &self,
+        addr: &str,
+        number: u32,
+        _host_id: &str,
+        task_id: &str,
+    ) -> Result<(Box<dyn AsyncRead + Send + Unpin>, u64, String)> {
+        let key = self.get_entry_key(addr);
+        let entry = self.get_client_entry(key.clone(), addr.to_string()).await?;
+        let request_guard = entry.request_guard();
+
+        match entry.client.download_cache_piece(number, task_id).await {
+            Ok((reader, offset, digest)) => Ok((Box::new(reader), offset, digest)),
+            Err(err) => {
+                // If the request fails, drop the request guard and remove the client
+                // entry to avoid reusing the invalid client.
+                drop(request_guard);
+                self.remove_client_entry(key).await;
+                Err(err)
+            }
+        }
+    }
+
     /// download_persistent_cache_piece downloads a persistent cache piece from the other peer by
     /// the TCP protocol.
     #[instrument(skip_all)]
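Both the TCP and QUIC clients frame the cache piece request identically before any connection work happens, so the framing is worth reading in isolation. A minimal sketch, using only the calls visible in the hunks above; the exact `use` paths for `Vortex` and `Header` are an assumption, since the import block is truncated in this diff:

```rust
use bytes::Bytes;
// Assumed import paths: the diff only shows the `tlv::` items; `Vortex` and
// `Header` are presumed to be re-exported at the vortex-protocol crate root.
use vortex_protocol::{tlv::download_cache_piece::DownloadCachePiece, Header, Vortex};

/// Builds the request bytes for one cache piece, mirroring the first step of
/// handle_download_cache_piece in quic.rs/tcp.rs above.
fn build_download_cache_piece_request(task_id: &str, number: u32) -> Bytes {
    Vortex::DownloadCachePiece(
        Header::new_download_cache_piece(),
        DownloadCachePiece::new(task_id.to_string(), number),
    )
    .into()
}
```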
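The public `download_cache_piece` wrappers flatten a nested result: `time::timeout` yields `Result<ClientResult<T>, Elapsed>`, the trailing `?` converts an elapsed deadline into the client error type, and the inner `ClientResult` is returned untouched. The same pattern in isolation, with a stand-in error type since `ClientError` is not shown in this diff:

```rust
use std::{future::Future, time::Duration};
use tokio::time;
use tracing::error;

// Stand-in for ClientResult; the real ClientError is assumed to implement
// From<tokio::time::error::Elapsed>, which is what makes the trailing `?` work.
type SketchResult<T> = Result<T, Box<dyn std::error::Error + Send + Sync>>;

/// Applies a deadline to a piece download future and flattens the result,
/// mirroring the public download_cache_piece wrappers above.
async fn with_piece_timeout<T>(
    piece_timeout: Duration,
    fut: impl Future<Output = SketchResult<T>>,
) -> SketchResult<T> {
    time::timeout(piece_timeout, fut)
        .await
        // Log only the timeout itself; inner errors are propagated as-is.
        .inspect_err(|err| error!("piece download timed out: {}", err))?
}
```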
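From the caller's side, the new trait method behaves like the existing `download_piece`: it hands back a reader plus the piece offset and digest, regardless of transport. A hedged usage sketch against the `Downloader` trait from piece_downloader.rs; it assumes the crate's `Error` converts from `std::io::Error`, and it buffers the piece in memory for brevity, whereas real callers such as `Piece::download_cache_from_parent` stream the reader into storage:

```rust
use dragonfly_client_core::Result;
use tokio::io::AsyncReadExt;

/// Drains one cache piece from any peer downloader (gRPC, QUIC, or TCP).
/// `addr`, `host_id`, and `task_id` are caller-supplied placeholders.
async fn fetch_cache_piece(
    downloader: &dyn Downloader,
    addr: &str,
    number: u32,
    host_id: &str,
    task_id: &str,
) -> Result<(u64, String, Vec<u8>)> {
    let (mut reader, offset, digest) = downloader
        .download_cache_piece(addr, number, host_id, task_id)
        .await?;

    // Box<dyn AsyncRead + Send + Unpin> is Unpin, so read_to_end applies directly.
    let mut content = Vec::new();
    reader.read_to_end(&mut content).await?;
    Ok((offset, digest, content))
}
```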
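One detail worth calling out in the gRPC path: the check hashes the piece metadata (number, offset, length, and the advertised content digest) rather than re-hashing the content, so a parent cannot alter the advertised offset or length without tripping `Error::DigestMismatch`. A sketch with placeholder values, using only the `metadata::Piece` fields and the `calculate_digest` call that appear in the hunk:

```rust
use dragonfly_client_storage::metadata;

/// Recomputes the metadata digest the way the gRPC download_cache_piece does.
/// All field values here are placeholders for illustration.
fn expected_metadata_digest() -> String {
    let piece_metadata = metadata::Piece {
        number: 7,
        length: 4_194_304,
        offset: 7 * 4_194_304,
        // Parent-advertised content digest (placeholder value).
        digest: "crc32:12345678".to_string(),
        ..Default::default()
    };

    // If this differs from response.digest, the downloader rejects the piece
    // with Error::DigestMismatch instead of trusting the metadata.
    piece_metadata.calculate_digest()
}
```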