diff --git a/proto/sync.proto b/proto/sync.proto index 86bf7ef..d26c824 100755 --- a/proto/sync.proto +++ b/proto/sync.proto @@ -397,6 +397,7 @@ message UploadFileRequest { int64 expected_revision = 16; // Expected server revision (0 for new files, or current revision for updates) ConflictInfo.ResolutionStrategy conflict_resolution = 17; // How to resolve conflicts (default: MANUAL) int64 client_timestamp = 18; // Client file modification time (Unix epoch seconds) + optional string operation_type = 19; // "CREATE", "UPDATE", "DELETE", "RENAME", "MOVE" } message UploadFileResponse { @@ -430,8 +431,9 @@ message UploadFileChunk { uint64 seq = 14; // sequential index starting at 0 bool last = 15; // true for the final chunk optional uint32 unix_permissions = 16; - int64 expected_revision = 17; // Expected server revision for streaming uploads + optional string operation_type = 17; // "CREATE", "UPDATE", "DELETE", "RENAME", "MOVE" (first chunk only) optional int64 client_timestamp = 18; // Client modification time (first chunk only) + optional int64 expected_revision = 19; // Expected server revision for streaming uploads } message DownloadFileRequest { @@ -549,8 +551,9 @@ message RenameFileRequest { int32 watcher_id = 8; // Watcher ID int64 timestamp = 9; // Rename timestamp (Unix seconds) int64 expected_revision = 10; // Expected server revision (0 to skip check, or current revision for validation) - ConflictInfo.ResolutionStrategy conflict_resolution = 11; // How to resolve conflicts (default: MANUAL) - google.protobuf.Timestamp updated_time = 12; // Client's last known update time for timestamp-based conflict resolution + optional string operation_type = 11; // e.g., "RENAME" or "MOVE" + ConflictInfo.ResolutionStrategy conflict_resolution = 12; // How to resolve conflicts (default: MANUAL) + google.protobuf.Timestamp updated_time = 13; // Client's last known update time for timestamp-based conflict resolution } // NEW: Rename file response @@ -1141,4 +1144,3 @@ message BatchOperationsResponse { int32 successful_operations = 5; int32 failed_operations = 6; } - diff --git a/src/domain/entities.rs b/src/domain/entities.rs index 8797453..f2e1b26 100644 --- a/src/domain/entities.rs +++ b/src/domain/entities.rs @@ -434,7 +434,7 @@ impl From for Device { Self { device_hash: model.device_hash, account_hash: model.account_hash, - device_name: model.user_id.clone(), // Use user_id as device name + device_name: model.device_name.clone(), device_type, last_seen: model.last_sync.timestamp(), is_active: model.is_active, @@ -458,9 +458,9 @@ impl From for crate::models::device::Device { }; Self { - user_id: entity.device_name, account_hash: entity.account_hash, device_hash: entity.device_hash, + device_name: entity.device_name, updated_at: chrono::Utc::now(), // Default to now registered_at: chrono::Utc .timestamp_opt(entity.created_at, 0) diff --git a/src/handlers/auth_handler.rs b/src/handlers/auth_handler.rs index 38995c3..61ed46f 100644 --- a/src/handlers/auth_handler.rs +++ b/src/handlers/auth_handler.rs @@ -503,7 +503,8 @@ impl AuthHandler { let new_device = Device::new( account_hash.to_string(), device_hash.to_string(), - true, // activate + device_hash.to_string(), // Use device_hash as default device_name + true, // activate os_version.to_string(), app_version.to_string(), ); diff --git a/src/handlers/device_handler.rs b/src/handlers/device_handler.rs index 37a483f..fb1d2c1 100644 --- a/src/handlers/device_handler.rs +++ b/src/handlers/device_handler.rs @@ -161,6 +161,7 @@ impl DeviceHandler { let 
device = Device::new( server_account_hash.clone(), req.device_hash.clone(), + req.device_name.clone(), req.is_active, req.os_version.clone(), req.app_version.clone(), @@ -302,6 +303,7 @@ impl DeviceHandler { let new_device = Device::new( req.account_hash.clone(), req.device_hash.clone(), + req.device_name.clone(), req.is_active, req.os_version.clone(), req.app_version.clone(), @@ -374,7 +376,7 @@ impl DeviceHandler { seconds: device.last_sync.timestamp(), nanos: 0, }), - device_name: device.user_id.clone(), + device_name: device.device_name.clone(), }) .collect(); diff --git a/src/handlers/file/rename.rs b/src/handlers/file/rename.rs index fac9879..5f20762 100644 --- a/src/handlers/file/rename.rs +++ b/src/handlers/file/rename.rs @@ -50,6 +50,46 @@ pub async fn handle_rename_file( current_revision: 0, })); } + // Use normalized account hash for all downstream operations/broadcasts + let account_hash = server_account_hash; + + // Map client group/watcher IDs to server IDs for all downstream operations + let (server_group_id, server_watcher_id) = match handler + .app_state + .file + .convert_client_ids_to_server(&account_hash, req.group_id, req.watcher_id) + .await + { + Ok(Some(ids)) => ids, + Ok(None) => { + error!( + "Failed to map client watcher IDs to server IDs: account={}, client_group={}, client_watcher={}", + account_hash, req.group_id, req.watcher_id + ); + return Ok(Response::new(RenameFileResponse { + success: false, + return_message: "Watcher mapping not found on server".to_string(), + new_revision: 0, + conflict: None, + error_code: ErrorCode::DbError as i32, + current_revision: 0, + })); + } + Err(e) => { + error!( + "Error converting client IDs to server IDs: account={}, client_group={}, client_watcher={}, error={}", + account_hash, req.group_id, req.watcher_id, e + ); + return Ok(Response::new(RenameFileResponse { + success: false, + return_message: format!("Failed to resolve watcher mapping: {}", e), + new_revision: 0, + conflict: None, + error_code: ErrorCode::DbError as i32, + current_revision: 0, + })); + } + }; // 2. Check if file exists and verify old_file_path matches let file_info = match handler.app_state.file.get_file_info(req.file_id).await { @@ -108,13 +148,37 @@ pub async fn handle_rename_file( .app_state .file .find_file_by_path( - &req.account_hash, - req.group_id, - req.watcher_id, + &account_hash, + server_group_id, + server_watcher_id, &req.new_file_path, ) .await { + // Fix P2 #5: Idempotency check + // If file at target path is the SAME file_id, consider it a successful retry + if existing.file_id == req.file_id { + info!( + "Idempotency check: File {} is already at target path {}", + req.file_id, req.new_file_path + ); + + // If client provided expected_revision, check if we already met it or exceeded it + if req.expected_revision > 0 && existing.revision != req.expected_revision { + warn!("Idempotency warning: File is at target path but revision mismatch (current={}, expected={})", existing.revision, req.expected_revision); + // We can still treat it as success if we assume the client missed the response + } + + return Ok(Response::new(RenameFileResponse { + success: true, + return_message: "File renamed successfully (idempotent)".to_string(), + new_revision: existing.revision, + conflict: None, + error_code: ErrorCode::Success as i32, + current_revision: existing.revision, + })); + } + if existing.file_id != req.file_id { warn!( "Target path '{}' already exists (file_id: {})", @@ -138,12 +202,12 @@ pub async fn handle_rename_file( // 4. 
Determine if this is a cross-watcher MOVE let is_cross_watcher_move = - req.group_id != file_info.group_id || req.watcher_id != file_info.watcher_id; + server_group_id != file_info.group_id || server_watcher_id != file_info.watcher_id; if is_cross_watcher_move { info!( "Cross-watcher MOVE request: file_id={}, from group={}/watcher={} to group={}/watcher={}", - req.file_id, file_info.group_id, file_info.watcher_id, req.group_id, req.watcher_id + req.file_id, file_info.group_id, file_info.watcher_id, server_group_id, server_watcher_id ); } @@ -157,12 +221,12 @@ pub async fn handle_rename_file( // Pass new group_id/watcher_id for cross-watcher MOVE, None for same-watcher rename let new_group_id = if is_cross_watcher_move { - Some(req.group_id) + Some(server_group_id) } else { None }; let new_watcher_id = if is_cross_watcher_move { - Some(req.watcher_id) + Some(server_watcher_id) } else { None }; @@ -204,7 +268,7 @@ pub async fn handle_rename_file( req.file_id, &req.old_file_path, &req.new_file_path, - &req.account_hash, + &account_hash, &req.device_hash, req.timestamp, new_revision, @@ -216,15 +280,30 @@ pub async fn handle_rename_file( } // 6. Broadcast rename notification to other devices + // Fix P0 #3: Include updated file info with Server IDs + let mut broadcast_file_info = file_info.clone(); + broadcast_file_info.file_path = req.new_file_path.clone(); + broadcast_file_info.revision = new_revision; + broadcast_file_info.updated_time.seconds = req.timestamp; + broadcast_file_info.updated_time.nanos = 0; + + if let Some(gid) = new_group_id { + broadcast_file_info.group_id = gid; + } + if let Some(wid) = new_watcher_id { + broadcast_file_info.watcher_id = wid; + } + if let Err(e) = handler .broadcast_rename_notification( - &req.account_hash, + &account_hash, &req.device_hash, req.file_id, &req.old_file_path, &req.new_file_path, new_revision, req.timestamp, + Some(broadcast_file_info), ) .await { @@ -313,7 +392,7 @@ pub async fn handle_rename_file( req.file_id, &req.old_file_path, &req.new_file_path, - &req.account_hash, + &account_hash, &req.device_hash, req.timestamp, new_revision, @@ -323,16 +402,31 @@ pub async fn handle_rename_file( warn!("Failed to record rename history: {}", e); } + // Fix P0 #3: Include updated file info with Server IDs + let mut broadcast_file_info = file_info.clone(); + broadcast_file_info.file_path = req.new_file_path.clone(); + broadcast_file_info.revision = new_revision; + broadcast_file_info.updated_time.seconds = req.timestamp; + broadcast_file_info.updated_time.nanos = 0; + + if let Some(gid) = new_group_id { + broadcast_file_info.group_id = gid; + } + if let Some(wid) = new_watcher_id { + broadcast_file_info.watcher_id = wid; + } + // Broadcast rename notification to other devices if let Err(e) = handler .broadcast_rename_notification( - &req.account_hash, + &account_hash, &req.device_hash, req.file_id, &req.old_file_path, &req.new_file_path, new_revision, req.timestamp, + Some(broadcast_file_info), ) .await { @@ -414,19 +508,23 @@ pub async fn handle_rename_file( || error_msg.contains("File not found") || error_msg.contains("already deleted"); + let is_path_conflict = error_msg.contains("Path conflict"); + error!( "Rename operation failed - file_id: {}, old_path: {}, new_path: {}, error: {}, error_type: {:?}, is_permanent: {}", req.file_id, req.old_file_path, req.new_file_path, error_msg, - if is_permanent_error { "permanent" } else if is_not_found { "not_found" } else { "temporary" }, + if is_permanent_error { "permanent" } else if is_not_found { 
"not_found" } else if is_path_conflict { "path_conflict" } else { "temporary" }, is_permanent_error ); // Format error message for client let client_error_msg = if is_not_found { format!("File not found: already deleted") + } else if is_path_conflict { + format!("Target path already exists (concurrent operation)") } else if is_permanent_error { format!("DB schema error: column type mismatch - {}", error_msg) } else { @@ -435,6 +533,8 @@ pub async fn handle_rename_file( let error_code = if is_not_found { ErrorCode::FileNotFound + } else if is_path_conflict { + ErrorCode::PathConflict } else if is_permanent_error { ErrorCode::DbSchemaError } else { diff --git a/src/handlers/file_handler.rs b/src/handlers/file_handler.rs index e8fc804..362b2fc 100644 --- a/src/handlers/file_handler.rs +++ b/src/handlers/file_handler.rs @@ -45,6 +45,10 @@ impl FileHandler { info!(" watcher_id: {}", req.watcher_id); info!(" file_path: {}", req.file_path); info!(" file_data length: {} bytes", req.file_data.len()); + info!( + " operation_type: {}", + req.operation_type.as_deref().unwrap_or("") + ); let key_id_log = if req.key_id.is_empty() { "" } else { @@ -455,6 +459,7 @@ impl FileHandler { new_path: &str, revision: i64, timestamp: i64, + file_info: Option, ) -> Result<(), String> { use crate::sync::{FileUpdateNotification, RenameInfo}; @@ -470,7 +475,7 @@ impl FileHandler { let notification = FileUpdateNotification { account_hash: account_hash.to_string(), device_hash: device_hash.to_string(), - file_info: None, // Rename doesn't need full file info + file_info: file_info.map(|fi| fi.to_sync_file()), // Use provided file_info (with Server IDs) update_type: crate::sync::file_update_notification::UpdateType::Renamed as i32, timestamp, rename_info: Some(rename_info), diff --git a/src/models/device.rs b/src/models/device.rs index 0e5a142..482727d 100644 --- a/src/models/device.rs +++ b/src/models/device.rs @@ -5,11 +5,12 @@ use serde::{Deserialize, Serialize}; /// User device information #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct Device { - /// Device name (should be unique within the system) - pub user_id: String, + /// Account hash pub account_hash: String, /// Device unique identifier pub device_hash: String, + /// Device name (user-friendly name for the device) + pub device_name: String, pub updated_at: DateTime, // last updated time pub registered_at: DateTime, // Registered time /// Last time device synced data @@ -29,9 +30,9 @@ pub struct DeviceInfo { pub device_hash: String, pub account_hash: String, /// Device name + pub device_name: String, pub is_active: bool, - - /// Device hash + /// OS version pub os_version: String, pub app_version: String, pub registered_at: DateTime, @@ -44,6 +45,7 @@ impl From<&Device> for DeviceInfo { Self { device_hash: device.device_hash.clone(), account_hash: device.account_hash.clone(), + device_name: device.device_name.clone(), is_active: device.is_active, os_version: device.os_version.clone(), app_version: device.app_version.clone(), @@ -58,11 +60,11 @@ impl From<&DeviceInfo> for Device { fn from(info: &DeviceInfo) -> Self { let now = Utc::now(); Self { - user_id: String::new(), + account_hash: info.account_hash.clone(), + device_hash: info.device_hash.clone(), + device_name: info.device_name.clone(), updated_at: info.last_sync_time.unwrap_or(now), registered_at: info.registered_at, - device_hash: info.device_hash.clone(), - account_hash: info.account_hash.clone(), last_sync: info.last_sync_time.unwrap_or(now), is_active: info.is_active, 
os_version: info.os_version.clone(), @@ -94,11 +96,11 @@ impl From for Device { .unwrap_or(now); Self { - user_id: String::new(), + account_hash: info.account_hash, + device_hash: info.device_hash, + device_name: info.device_name, updated_at: last_sync, registered_at, - device_hash: info.device_hash, - account_hash: info.account_hash, last_sync, is_active: info.is_active, os_version: info.os_version, @@ -120,15 +122,16 @@ impl Device { pub fn new( account_hash: String, device_hash: String, + device_name: String, is_active: bool, os_version: String, app_version: String, ) -> Self { let now = Utc::now(); Self { - user_id: account_hash.clone(), account_hash, device_hash, + device_name, is_active, os_version, app_version, @@ -209,7 +212,7 @@ impl From<&Device> for sync::DeviceInfo { app_version: device.app_version.clone(), registered_at, last_sync_time, - device_name: device.user_id.clone(), + device_name: device.device_name.clone(), } } } diff --git a/src/models/file.rs b/src/models/file.rs index f4a0ce5..0517724 100644 --- a/src/models/file.rs +++ b/src/models/file.rs @@ -147,6 +147,7 @@ pub struct FileInfo { pub size: u64, pub key_id: Option<String>, pub unix_permissions: Option<u32>, + pub operation_type: Option<String>, } impl FileInfo { @@ -201,6 +202,7 @@ impl From for FileInfo { Some(proto.key_id) }, unix_permissions: proto.unix_permissions, + operation_type: None, } } } diff --git a/src/server/notification_manager.rs b/src/server/notification_manager.rs index 885a9b3..72b32cc 100644 --- a/src/server/notification_manager.rs +++ b/src/server/notification_manager.rs @@ -180,6 +180,28 @@ impl NotificationManager { notification: FileUpdateNotification, exclude_source: bool, ) -> Result { + // Safety guard: only broadcast the latest revision + if let Some(fi) = &notification.file_info { + if let Ok((exists, _is_deleted, latest_rev)) = + self.storage.latest_revision_state(fi.file_id).await + { + if !exists { + debug!( + "Skipping broadcast: file_id={} no longer exists in storage", + fi.file_id + ); + return Ok(0); + } + if fi.revision < latest_rev { + debug!( + "Skipping broadcast: stale revision (got {}, latest {}) for file_id={}", + fi.revision, latest_rev, fi.file_id + ); + return Ok(0); + } + } + } + let source_device = notification.device_hash.clone(); let account_hash = notification.account_hash.clone(); diff --git a/src/server/service.rs b/src/server/service.rs index 49bab97..97a3a81 100644 --- a/src/server/service.rs +++ b/src/server/service.rs @@ -496,9 +496,10 @@ impl SyncService for SyncServiceImpl { file_size: meta.file_size, key_id: meta.key_id, unix_permissions: meta.unix_permissions, - expected_revision: meta.expected_revision, + expected_revision: meta.expected_revision.unwrap_or(0), conflict_resolution: 0, // Default to MANUAL client_timestamp: meta.client_timestamp.unwrap_or(0), + operation_type: meta.operation_type, }; self.file_handler diff --git a/src/services/device_service.rs b/src/services/device_service.rs index f83ff76..6c7fc57 100644 --- a/src/services/device_service.rs +++ b/src/services/device_service.rs @@ -164,7 +164,7 @@ impl DeviceService { seconds: device.last_sync.timestamp(), nanos: 0, }), - device_name: device.user_id.clone(), + device_name: device.device_name.clone(), } } @@ -188,11 +188,11 @@ impl DeviceService { }) .unwrap_or(now); Device { - user_id: device_info.device_name, + account_hash: device_info.account_hash, + device_hash: device_info.device_hash, + device_name: device_info.device_name, updated_at: last_sync, registered_at, - device_hash: device_info.device_hash, - account_hash:
device_info.account_hash, last_sync, is_active: device_info.is_active, os_version: device_info.os_version, diff --git a/src/services/file_service.rs b/src/services/file_service.rs index f8856b6..09c52fc 100644 --- a/src/services/file_service.rs +++ b/src/services/file_service.rs @@ -241,6 +241,13 @@ impl FileService { Some(req.key_id.clone()) }, unix_permissions: req.unix_permissions, + operation_type: req.operation_type.as_ref().and_then(|op| { + if op.is_empty() { + None + } else { + Some(op.clone()) + } + }), } } diff --git a/src/services/version_service.rs b/src/services/version_service.rs index e51de41..1f311fd 100644 --- a/src/services/version_service.rs +++ b/src/services/version_service.rs @@ -130,6 +130,7 @@ impl VersionServiceImpl { is_encrypted: file.is_encrypted, key_id: None, unix_permissions: None, // SyncFile doesn't have unix_permissions field + operation_type: None, } } diff --git a/src/storage/memory.rs b/src/storage/memory.rs index 07303a4..5a5aa67 100644 --- a/src/storage/memory.rs +++ b/src/storage/memory.rs @@ -1049,6 +1049,18 @@ impl Storage for MemoryStorage { } } + async fn latest_revision_state( + &self, + file_id: u64, + ) -> crate::storage::Result<(bool, bool, i64)> { + let data = self.data.lock().await; + if let Some(file) = data.files.get(&file_id) { + Ok((true, false, file.revision)) + } else { + Ok((false, false, 0)) + } + } + /// Rename file (stub implementation for MemoryStorage) async fn rename_file( &self, diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 5c2189e..6a26a3f 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -382,6 +382,9 @@ pub trait Storage: Sync + Send { ) -> Result>; async fn check_file_exists(&self, file_id: u64) -> Result<(bool, bool)>; + /// Get latest revision state (exists, is_deleted flag, latest revision number) for a file_id + async fn latest_revision_state(&self, file_id: u64) -> Result<(bool, bool, i64)>; + /// Rename file (update path, device_hash, and revision) /// If new_group_id/new_watcher_id are provided, also updates those (cross-watcher MOVE) async fn rename_file( diff --git a/src/storage/mysql.rs b/src/storage/mysql.rs index ace5b2a..8f143f5 100644 --- a/src/storage/mysql.rs +++ b/src/storage/mysql.rs @@ -581,6 +581,73 @@ impl MySqlStorage { .ok(); } + // Check if file_rename_history table exists (for improvement #7: Rename history index) + let has_rename_history: bool = sqlx::query_scalar( + r#"SELECT COUNT(*) > 0 FROM information_schema.tables + WHERE table_schema = DATABASE() AND table_name = 'file_rename_history'"#, + ) + .fetch_one(self.get_sqlx_pool()) + .await + .unwrap_or(false); + + if !has_rename_history { + info!("Creating file_rename_history table"); + let create_history_table = r#" + CREATE TABLE file_rename_history ( + id BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY, + file_id BIGINT UNSIGNED NOT NULL, + old_path VARBINARY(2048) NOT NULL, + new_path VARBINARY(2048) NOT NULL, + account_hash VARCHAR(255) NOT NULL, + device_hash VARCHAR(255) NOT NULL, + rename_timestamp BIGINT NOT NULL, + revision BIGINT NOT NULL, + INDEX (file_id), + INDEX (account_hash), + INDEX (rename_timestamp), + INDEX (file_id, revision) + )"#; + sqlx::query(create_history_table) + .execute(self.get_sqlx_pool()) + .await + .map_err(|e| { + StorageError::Database(format!( + "Failed to create file_rename_history table: {}", + e + )) + })?; + info!("✅ file_rename_history table created with indexes"); + } else { + // Ensure indexes exist for file_rename_history (Improvement #7) + let has_timestamp_idx: bool = 
sqlx::query_scalar( + r#"SELECT COUNT(*) > 0 FROM information_schema.statistics + WHERE table_schema = DATABASE() AND table_name = 'file_rename_history' AND index_name = 'rename_timestamp'"# + ).fetch_one(self.get_sqlx_pool()).await.unwrap_or(false); + + if !has_timestamp_idx { + info!("Adding index on rename_timestamp to file_rename_history"); + sqlx::query( + r#"CREATE INDEX rename_timestamp ON file_rename_history(rename_timestamp)"#, + ) + .execute(self.get_sqlx_pool()) + .await + .ok(); + } + + let has_revision_idx: bool = sqlx::query_scalar( + r#"SELECT COUNT(*) > 0 FROM information_schema.statistics + WHERE table_schema = DATABASE() AND table_name = 'file_rename_history' AND column_name = 'revision'"# + ).fetch_one(self.get_sqlx_pool()).await.unwrap_or(false); + + if !has_revision_idx { + info!("Adding composite index (file_id, revision) to file_rename_history"); + sqlx::query(r#"CREATE INDEX idx_history_file_revision ON file_rename_history(file_id, revision)"#) + .execute(self.get_sqlx_pool()) + .await + .ok(); + } + } + let has_server_group_id: bool = sqlx::query_scalar( r#"SELECT COUNT(*) > 0 FROM information_schema.columns WHERE table_schema = DATABASE() AND table_name = 'files' AND column_name = 'server_group_id'"# @@ -609,6 +676,35 @@ impl MySqlStorage { })?; } + // Optional backward link for rename history + let has_prev_file_id: bool = sqlx::query_scalar( + r#"SELECT COUNT(*) > 0 + FROM information_schema.columns + WHERE table_schema = DATABASE() + AND table_name = 'files' + AND column_name = 'prev_file_id'"#, + ) + .fetch_one(self.get_sqlx_pool()) + .await + .map_err(|e| { + error!("Failed to check prev_file_id column existence: {}", e); + StorageError::Database(format!( + "Failed to check prev_file_id column existence: {}", + e + )) + })?; + + if !has_prev_file_id { + info!("Adding prev_file_id column to files table"); + sqlx::query(r#"ALTER TABLE files ADD COLUMN prev_file_id BIGINT NULL"#) + .execute(self.get_sqlx_pool()) + .await + .map_err(|e| { + error!("Failed to add prev_file_id column: {}", e); + StorageError::Database(format!("Failed to add prev_file_id column: {}", e)) + })?; + } + // Modify file_path/filename column format (VARBINARY) let path_is_varbinary: bool = sqlx::query_scalar( r#"SELECT DATA_TYPE = 'varbinary' FROM information_schema.columns @@ -1665,6 +1761,11 @@ impl Storage for MySqlStorage { MySqlFileExt::check_file_exists(self, file_id).await } + /// Latest revision state helper + async fn latest_revision_state(&self, file_id: u64) -> Result<(bool, bool, i64)> { + MySqlFileExt::latest_revision_state(self, file_id).await + } + /// Rename file (update path, device_hash, and revision) async fn rename_file( &self, @@ -1678,12 +1779,18 @@ impl Storage for MySqlStorage { ) -> Result { use sqlx::Row; - // First, get file information to obtain account_hash, group_id, watcher_id for encryption and current revision + // 1. Fetch file info (Read-only, no transaction yet) - Fix P1 #4 (Transaction scope) let row_opt = sqlx::query( - r#"SELECT account_hash, group_id, watcher_id, revision FROM files WHERE file_id = ? LIMIT 1"#, + r#"SELECT account_hash, group_id, watcher_id, revision, file_path, filename, + device_hash, file_hash, size, key_id, unix_permissions, + server_group_id, server_watcher_id, eq_index, token_path, operation_type + FROM files + WHERE file_id = ? 
+ ORDER BY revision DESC, id DESC + LIMIT 1"#, ) .bind(file_id) - .fetch_optional(&self.sqlx_pool) + .fetch_optional(self.get_sqlx_pool()) .await .map_err(|e| { StorageError::Database(format!( @@ -1692,13 +1799,60 @@ impl Storage for MySqlStorage { )) })?; - let (account_hash, current_group_id, current_watcher_id, current_revision) = match row_opt { + let ( + account_hash, + current_group_id, + current_watcher_id, + current_revision, + old_file_path_bytes, + old_filename_bytes, + old_device_hash, + old_file_hash, + old_size, + old_key_id, + old_unix_permissions, + old_server_group_id, + old_server_watcher_id, + old_eq_index, + old_token_path, + old_operation_type, + ) = match row_opt { Some(row) => { let account_hash: String = row.try_get("account_hash").unwrap_or_default(); let group_id: i32 = row.try_get("group_id").unwrap_or(0); let watcher_id: i32 = row.try_get("watcher_id").unwrap_or(0); let current_revision: i64 = row.try_get("revision").unwrap_or(0); - (account_hash, group_id, watcher_id, current_revision) + let file_path: Vec = row.try_get("file_path").unwrap_or_default(); + let filename: Vec = row.try_get("filename").unwrap_or_default(); + let device_hash_old: String = row.try_get("device_hash").unwrap_or_default(); + let file_hash: String = row.try_get("file_hash").unwrap_or_default(); + let size: i64 = row.try_get("size").unwrap_or(0); + let key_id: Option = row.try_get("key_id").ok(); + let unix_permissions: Option = row.try_get("unix_permissions").ok(); + let server_group_id: i32 = row.try_get("server_group_id").unwrap_or(0); + let server_watcher_id: i32 = row.try_get("server_watcher_id").unwrap_or(0); + let eq_index: Vec = row.try_get("eq_index").unwrap_or_default(); + let token_path: Vec = row.try_get("token_path").unwrap_or_default(); + let op_type: String = row.try_get("operation_type").unwrap_or_default(); + + ( + account_hash, + group_id, + watcher_id, + current_revision, + file_path, + filename, + device_hash_old, + file_hash, + size, + key_id, + unix_permissions, + server_group_id, + server_watcher_id, + eq_index, + token_path, + op_type, + ) } None => { return Ok(RenameFileResult::FileNotFound); @@ -1719,7 +1873,7 @@ impl Storage for MySqlStorage { ); } - // Validate revision if expected_revision is provided + // Validate revision if expected_revision is provided (Optimistic check before heavy work) if let Some(expected_rev) = expected_revision { if expected_rev > 0 && current_revision != expected_rev { warn!( @@ -1733,49 +1887,213 @@ impl Storage for MySqlStorage { } } - // Encrypt new_file_path for VARBINARY storage (use final group_id/watcher_id for AAD) + // 2. 
Compute encryption/indices (CPU-intensive, outside TX) - Fix P1 #4 let cfg = crate::server::app_state::AppState::get_config(); - let new_file_path_bytes = if let Some(kv) = cfg.server_encode_key.as_ref() { + let new_filename = new_file_path + .rsplit('/') + .next() + .unwrap_or(new_file_path) + .to_string(); + let (new_file_path_bytes, new_filename_bytes, eq_index, token_path) = if let Some(kv) = + cfg.server_encode_key.as_ref() + { if kv.len() == 32 { let key: &[u8; 32] = kv.as_slice().try_into().expect("len checked"); let aad = format!("{}:{}:{}", account_hash, final_group_id, final_watcher_id); - crate::utils::crypto::aead_encrypt(key, new_file_path.as_bytes(), aad.as_bytes()) + let ct_path = crate::utils::crypto::aead_encrypt( + key, + new_file_path.as_bytes(), + aad.as_bytes(), + ); + let ct_name = crate::utils::crypto::aead_encrypt( + key, + new_filename.as_bytes(), + aad.as_bytes(), + ); + let salt = crate::utils::crypto::derive_salt(key, "meta-index", &account_hash); + let eq_index = crate::utils::crypto::make_eq_index(&salt, new_file_path); + let token_path = crate::utils::crypto::make_token_path(&salt, new_file_path); + (ct_path, ct_name, eq_index, token_path) } else { - new_file_path.as_bytes().to_vec() + let pathb = new_file_path.as_bytes().to_vec(); + let nameb = new_filename.as_bytes().to_vec(); + let eq = + crate::utils::crypto::make_eq_index(account_hash.as_bytes(), new_file_path); + let tp = + crate::utils::crypto::make_token_path(account_hash.as_bytes(), new_file_path); + (pathb, nameb, eq, tp) } } else { - new_file_path.as_bytes().to_vec() + let pathb = new_file_path.as_bytes().to_vec(); + let nameb = new_filename.as_bytes().to_vec(); + let eq = crate::utils::crypto::make_eq_index(account_hash.as_bytes(), new_file_path); + let tp = crate::utils::crypto::make_token_path(account_hash.as_bytes(), new_file_path); + (pathb, nameb, eq, tp) }; - // Update file_path, group_id, watcher_id with encrypted bytes (VARBINARY) and set operation_type - let operation_type = if is_cross_watcher_move { + // 3. Start transaction + let mut tx = self + .sqlx_pool + .begin() + .await + .map_err(|e| StorageError::Database(format!("Transaction start failed: {}", e)))?; + + // 4. Verify revision (double-check inside the transaction) + let current_rev_check: Option<i64> = sqlx::query_scalar( + "SELECT revision FROM files WHERE file_id = ? ORDER BY revision DESC LIMIT 1 FOR UPDATE" + ) + .bind(file_id) + .fetch_optional(&mut *tx) + .await + .map_err(|e| StorageError::Database(format!("Failed to verify revision: {}", e)))?; + + if let Some(rev) = current_rev_check { + if rev != current_revision { + let _ = tx.rollback().await; + return Ok(RenameFileResult::RevisionConflict { + server_revision: rev, + client_revision: current_revision, // Revision read before the transaction began + }); + } + } else { + let _ = tx.rollback().await; + return Ok(RenameFileResult::FileNotFound); + } + + // 5. Check path conflict (P1 #6): look for any active file at the new path, + // using token_path to lock the index entry against concurrent operations + let conflict_check: Option<i64> = sqlx::query_scalar( + "SELECT 1 FROM files WHERE token_path = ? AND is_deleted = FALSE LIMIT 1 FOR UPDATE", + ) + .bind(&token_path) + .fetch_optional(&mut *tx) + .await + .map_err(|e| StorageError::Database(format!("Failed to check path conflict: {}", e)))?; + + if conflict_check.is_some() { + let _ = tx.rollback().await; + return Err(StorageError::Database( + "Path conflict: active file already exists at target path".to_string(), + )); + } + + // Append-only: remove the current active rows, then insert a tombstone row plus a new active row + let tombstone_operation_type = if !old_operation_type.is_empty() { + old_operation_type.as_str() + } else if is_cross_watcher_move { "MOVE" } else { "RENAME" }; - let query = "UPDATE files SET file_path = ?, device_hash = ?, revision = ?, group_id = ?, watcher_id = ?, operation_type = ?, updated_time = NOW() WHERE file_id = ?"; - sqlx::query(query) - .bind(&new_file_path_bytes) - .bind(device_hash) - .bind(new_revision) - .bind(final_group_id) - .bind(final_watcher_id) - .bind(operation_type) + let now_ts = chrono::Utc::now().timestamp(); + + // Remove existing active rows for this file_id (tombstone will carry the old metadata) + sqlx::query(r#"DELETE FROM files WHERE file_id = ? AND is_deleted = FALSE"#) .bind(file_id) - .execute(&self.sqlx_pool) + .execute(&mut *tx) .await .map_err(|e| { StorageError::Database(format!( - "Failed to rename/move file (schema/VARBINARY type mismatch): {}", + "Failed to remove previous active revision before rename/move: {}", e )) })?; + // Tombstone row uses next revision, new active uses next+1 to avoid UNIQUE(file_id, revision) conflicts + let tombstone_revision = current_revision + 1; + let new_active_revision = tombstone_revision + 1; + + sqlx::query( + r#"INSERT INTO files ( + file_id, account_hash, device_hash, file_path, filename, file_hash, size, + encrypted_data, key_id, unix_permissions, server_group_id, server_watcher_id, + group_id, watcher_id, revision, is_deleted, operation_type, deleted_file_id, + created_time, updated_time, eq_index, token_path, prev_file_id + ) VALUES (?, ?, ?, ?, ?, ?, ?, NULL, ?, ?, ?, ?, ?, ?, ?, TRUE, ?, ?, FROM_UNIXTIME(?), FROM_UNIXTIME(?), ?, ?, NULL)"#, + ) + .bind(file_id as i64) + .bind(&account_hash) + .bind(&old_device_hash) + .bind(&old_file_path_bytes) + .bind(&old_filename_bytes) + .bind(&old_file_hash) + .bind(old_size) + .bind(old_key_id.as_deref()) + .bind(old_unix_permissions) + .bind(old_server_group_id) + .bind(old_server_watcher_id) + .bind(current_group_id) + .bind(current_watcher_id) + .bind(tombstone_revision) + .bind(tombstone_operation_type) + .bind(file_id) + .bind(now_ts) + .bind(now_ts) + .bind(&old_eq_index) + .bind(&old_token_path) + .execute(&mut *tx) + .await + .map_err(|e| { + StorageError::Database(format!( + "Failed to insert rename/move tombstone (append-only): {}", + e + )) + })?; + + // New active row with revision+1 (relative to tombstone) and new path + let operation_type = if is_cross_watcher_move { + "MOVE" + } else { + "RENAME" + }; + sqlx::query( + r#"INSERT INTO files ( + file_id, account_hash, device_hash, file_path, filename, file_hash, size, + encrypted_data, key_id, unix_permissions, server_group_id, server_watcher_id, + group_id, watcher_id, revision, is_deleted, operation_type, deleted_file_id, + created_time, updated_time, eq_index, token_path, prev_file_id + ) VALUES (?, ?, ?, ?, ?, ?, ?, NULL, ?, ?, ?, ?, ?, ?, ?, FALSE, ?, NULL, FROM_UNIXTIME(?), FROM_UNIXTIME(?), ?, ?, NULL)"#, + ) + .bind(file_id as i64) + .bind(&account_hash) + .bind(device_hash) + .bind(&new_file_path_bytes) +
.bind(&new_filename_bytes) + .bind(&old_file_hash) + .bind(old_size) + .bind(old_key_id.as_deref()) + .bind(old_unix_permissions) + .bind(final_group_id) + .bind(final_watcher_id) + .bind(final_group_id) + .bind(final_watcher_id) + .bind(new_active_revision) + .bind(operation_type) + .bind(now_ts) + .bind(now_ts) + .bind(&eq_index) + .bind(&token_path) + .execute(&mut *tx) + .await + .map_err(|e| { + StorageError::Database(format!( + "Failed to insert new active row for rename/move (append-only): {}", + e + )) + })?; + + tx.commit() + .await + .map_err(|e| StorageError::Database(format!("Transaction commit failed: {}", e)))?; + info!( - "File renamed in database: file_id={}, new_path={}, new_revision={}", - file_id, new_file_path, new_revision + "File renamed in database (append-only): file_id={}, new_path={}, new_revision={}", + file_id, new_file_path, new_active_revision ); - Ok(RenameFileResult::Success { new_revision }) + Ok(RenameFileResult::Success { + new_revision: new_active_revision, + }) } /// Record rename history for debugging and conflict detection diff --git a/src/storage/mysql_device.rs b/src/storage/mysql_device.rs index 806b080..1ae1145 100644 --- a/src/storage/mysql_device.rs +++ b/src/storage/mysql_device.rs @@ -67,18 +67,18 @@ impl MySqlDeviceExt for MySqlStorage { info!("Device already exists, updating: {}", device.device_hash); sqlx::query( - r#"UPDATE devices SET - device_name = ?, - device_type = ?, - os_type = ?, - os_version = ?, - app_version = ?, - last_sync = ?, + r#"UPDATE devices SET + device_name = ?, + device_type = ?, + os_type = ?, + os_version = ?, + app_version = ?, + last_sync = ?, updated_at = ?, is_active = ? WHERE account_hash = ? AND device_hash = ?"#, ) - .bind(&device.user_id) + .bind(&device.device_name) .bind("desktop") .bind("Linux") .bind(&device.os_version) @@ -109,15 +109,15 @@ impl MySqlDeviceExt for MySqlStorage { sqlx::query( r#"INSERT INTO devices ( - id, account_hash, device_hash, device_name, device_type, - os_type, os_version, app_version, last_sync, + id, account_hash, device_hash, device_name, device_type, + os_type, os_version, app_version, last_sync, created_at, updated_at, is_active ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#, ) .bind(&device_id) .bind(&device.account_hash) .bind(&device.device_hash) - .bind(&device.user_id) + .bind(&device.device_name) .bind("desktop") .bind("Linux") .bind(&device.os_version) @@ -208,9 +208,9 @@ impl MySqlDeviceExt for MySqlStorage { // Create Device object let device = Device { - user_id: device_name, account_hash: acc_hash, device_hash: dev_hash, + device_name, updated_at, registered_at: created_at, last_sync, @@ -284,9 +284,9 @@ impl MySqlDeviceExt for MySqlStorage { // Create Device object let device = Device { - user_id: device_name, account_hash: acc_hash, device_hash: dev_hash, + device_name, updated_at, registered_at: created_at, last_sync, @@ -313,16 +313,16 @@ impl MySqlDeviceExt for MySqlStorage { ); sqlx::query( - r#"UPDATE devices SET - device_name = ?, - os_version = ?, - app_version = ?, - last_sync = ?, + r#"UPDATE devices SET + device_name = ?, + os_version = ?, + app_version = ?, + last_sync = ?, updated_at = ?, is_active = ? WHERE account_hash = ? 
AND device_hash = ?"#, ) - .bind(&device.user_id) + .bind(&device.device_name) .bind(&device.os_version) .bind(&device.app_version) .bind(last_sync) diff --git a/src/storage/mysql_file.rs b/src/storage/mysql_file.rs index c56d5d8..17107c4 100644 --- a/src/storage/mysql_file.rs +++ b/src/storage/mysql_file.rs @@ -101,6 +101,9 @@ pub trait MySqlFileExt { /// Check file existence and deletion status by file ID async fn check_file_exists(&self, file_id: u64) -> Result<(bool, bool)>; + /// Latest revision state for a file_id (exists, is_deleted, latest_revision) + async fn latest_revision_state(&self, file_id: u64) -> Result<(bool, bool, i64)>; + /// TTL-based physical deletion: remove records where is_deleted=1 and updated_time is before NOW()-ttl seconds async fn purge_deleted_files_older_than(&self, ttl_secs: i64) -> Result; @@ -122,107 +125,50 @@ impl MySqlFileExt for MySqlStorage { let mut tx = self.get_sqlx_pool().begin().await.map_err(|e| { StorageError::Database(format!("Transaction start failed(sqlx): {}", e)) })?; - // First check if a file exists with the file_id - let existing_by_file_id: Option<(u64, i64)> = sqlx::query_as( - r#"SELECT file_id, revision FROM files WHERE file_id = ? AND is_deleted = FALSE LIMIT 1"# + + // Latest revision (including deleted) for this file_id + let latest_rev_row: Option<(i64, bool)> = sqlx::query_as( + r#"SELECT revision, is_deleted + FROM files + WHERE file_id = ? + ORDER BY revision DESC, id DESC + LIMIT 1"#, ) .bind(file_info.file_id) .fetch_optional(&mut *tx) .await - .map_err(|e| { error!("❌ File check by file_id failed(sqlx): {}", e); StorageError::Database(format!("File check by file_id failed: {}", e)) })?; - - if let Some((_existing_file_id, current_revision)) = existing_by_file_id { - // Validate revision if client provided one - if let Some(expected_rev) = expected_revision { - if expected_rev > 0 && current_revision != expected_rev { - warn!( - "Revision conflict on upload: file_id={}, server_revision={}, client_expected={}", - file_info.file_id, current_revision, expected_rev - ); - let _ = tx.rollback().await; - return Ok(StoreFileResult::RevisionConflict { - server_revision: current_revision, - client_revision: expected_rev, - }); - } - } + .map_err(|e| StorageError::Database(format!("File check by file_id failed: {}", e)))?; - // Case where the same file_id already exists - update only the file information + let (current_revision, _current_deleted) = latest_rev_row.unwrap_or((0, false)); - // Update existing file information - if let Some(ref kid) = file_info.key_id { - debug!( - "🔐 Updating key_id for existing file_id: {} -> {}", - file_info.file_id, kid - ); - } else { - debug!( - "🔐 Skipping key_id update (None) for existing file_id: {}", - file_info.file_id + // expected_revision validation + if let Some(expected_rev) = expected_revision { + if expected_rev > 0 && current_revision != expected_rev { + warn!( + "Revision conflict on upload: file_id={}, server_revision={}, client_expected={}", + file_info.file_id, current_revision, expected_rev ); + let _ = tx.rollback().await; + return Ok(StoreFileResult::RevisionConflict { + server_revision: current_revision, + client_revision: expected_rev, + }); } - sqlx::query( - r#"UPDATE files SET - file_hash = ?, device_hash = ?, updated_time = FROM_UNIXTIME(?), size = ?, - revision = revision + 1, - key_id = COALESCE(?, key_id), - unix_permissions = ? 
- WHERE file_id = ?"#, - ) - .bind(&file_info.file_hash) - .bind(&file_info.device_hash) - .bind(updated_time) - .bind(file_info.size as i64) - .bind(file_info.key_id.as_deref()) - .bind(file_info.unix_permissions) - .bind(file_info.file_id) - .execute(&mut *tx) - .await - .map_err(|e| { - error!("❌ File information update failed(sqlx): {}", e); - StorageError::Database(format!("File information update failed: {}", e)) - })?; - - // Commit transaction - tx.commit().await.map_err(|e| { - error!("❌ Transaction commit failed(sqlx): {}", e); - StorageError::Database(format!("Transaction commit failed: {}", e)) - })?; - - // Return success with incremented revision - let new_revision = current_revision + 1; - return Ok(StoreFileResult::Success { - file_id: file_info.file_id, - new_revision, - }); } - debug!("🔍 Checking active files..."); - // Check if there is a non-deleted file with the same file path and name - let existing_active_file: Option<(u64, i64)> = sqlx::query_as( - r#"SELECT file_id, revision FROM files WHERE account_hash = ? AND file_path = ? AND filename = ? AND server_group_id = ? AND is_deleted = FALSE ORDER BY updated_time DESC, revision DESC LIMIT 1"# + let new_revision = current_revision + 1; + + // mark existing active rows for this file_id as deleted to keep a single active row + sqlx::query( + r#"UPDATE files SET is_deleted = TRUE WHERE file_id = ? AND is_deleted = FALSE"#, ) - .bind(&file_info.account_hash) - .bind(&file_info.file_path) - .bind(&file_info.filename) - .bind(file_info.group_id) - .fetch_optional(&mut *tx) + .bind(file_info.file_id) + .execute(&mut *tx) .await - .map_err(|e| { error!("❌ Active file existence check failed(sqlx): {}", e); StorageError::Database(format!("Active file existence check failed: {}", e)) })?; - - let new_revision = if let Some((existing_file_id, existing_revision)) = existing_active_file - { - // Active file exists → this is an update, increment revision - debug!( - "📝 Active file found (file_id={}, revision={}), incrementing revision", - existing_file_id, existing_revision - ); - existing_revision + 1 - } else { - // No active file → this is a new file (or re-upload after deletion), reset to revision 1 - debug!("✨ No active file found, starting with revision 1"); - 1 - }; + .map_err(|e| { + error!("❌ Existing file deletion marking failed(sqlx): {}", e); + StorageError::Database(format!("Existing file deletion marking failed: {}", e)) + })?; debug!( "📄 Preparing to store new file information: file_id={}, revision={} (key_id: {:?})", @@ -230,24 +176,6 @@ impl MySqlFileExt for MySqlStorage { ); // Path encryption and index computation omitted as it's handled elsewhere - - if let Some((existing_file_id, _)) = existing_active_file { - // If there is an existing active file, mark it as deleted - debug!( - "🗑️ Marking existing active file as deleted: existing_file_id={}", - existing_file_id - ); - - sqlx::query(r#"UPDATE files SET is_deleted = TRUE WHERE file_id = ?"#) - .bind(existing_file_id) - .execute(&mut *tx) - .await - .map_err(|e| { - error!("❌ Existing file deletion marking failed(sqlx): {}", e); - StorageError::Database(format!("Existing file deletion marking failed: {}", e)) - })?; - } - // Insert new file (using calculated revision) info!( "💾 Inserting new file record: file_id={}, revision={}, filename= {}", @@ -307,12 +235,18 @@ impl MySqlFileExt for MySqlStorage { (fpb, fnb, eq, tp) }; - // Determine operation_type based on revision - let operation_type = if new_revision == 1 { - "CREATE" - } else { - "UPDATE" - }; + // 
Determine operation_type: prefer client-provided value, fallback to CREATE/UPDATE based on revision + let operation_type = file_info + .operation_type + .as_deref() + .unwrap_or_else(|| { + if new_revision == 1 { + "CREATE" + } else { + "UPDATE" + } + }) + .to_string(); sqlx::query( r#"INSERT INTO files ( @@ -339,7 +273,7 @@ impl MySqlFileExt for MySqlStorage { .bind(&token_path) .bind(file_info.key_id.as_deref()) .bind(file_info.unix_permissions) - .bind(operation_type) + .bind(&operation_type) .execute(&mut *tx) .await .map_err(|e| { error!("❌ New file information insertion failed(sqlx): {}", e); StorageError::Database(format!("New file information insertion failed: {}", e)) })?; @@ -370,11 +304,13 @@ impl MySqlFileExt for MySqlStorage { r#"SELECT file_id, account_hash, device_hash, file_path, filename, file_hash, UNIX_TIMESTAMP(updated_time) AS updated_ts, - group_id, watcher_id, is_deleted, revision, size, key_id, unix_permissions + group_id, watcher_id, is_deleted, revision, size, key_id, unix_permissions, operation_type FROM files WHERE file_id = ? AND is_deleted = FALSE - AND operation_type NOT IN ('DELETE')"#, + AND operation_type NOT IN ('DELETE') + ORDER BY revision DESC, id DESC + LIMIT 1"#, ) .bind(file_id) .fetch_optional(self.get_sqlx_pool()) @@ -421,6 +357,7 @@ impl MySqlFileExt for MySqlStorage { size, key_id: key_id_opt, unix_permissions: row.try_get("unix_permissions").ok(), + operation_type: row.try_get("operation_type").ok(), }; debug!( "✅ [GetFileInfo] Query successful: file_id={}, revision={}", @@ -452,9 +389,11 @@ impl MySqlFileExt for MySqlStorage { file_id, account_hash, device_hash, file_path, filename, file_hash, UNIX_TIMESTAMP(created_time) AS created_ts, UNIX_TIMESTAMP(updated_time) AS updated_ts, - group_id, watcher_id, is_deleted, revision, size, key_id, unix_permissions + group_id, watcher_id, is_deleted, revision, size, key_id, unix_permissions, operation_type FROM files - WHERE file_id = ?"#, + WHERE file_id = ? + ORDER BY revision DESC, id DESC + LIMIT 1"#, ) .bind(file_id) .fetch_optional(self.get_sqlx_pool()) @@ -498,6 +437,7 @@ impl MySqlFileExt for MySqlStorage { size, key_id: key_id_opt, unix_permissions: row.try_get("unix_permissions").ok(), + operation_type: row.try_get("operation_type").ok(), }; debug!( "File information query successful (including deleted): file_id={}, is_deleted={}", @@ -545,7 +485,7 @@ impl MySqlFileExt for MySqlStorage { file_id, account_hash, device_hash, file_path, filename, file_hash, UNIX_TIMESTAMP(created_time) AS created_ts, UNIX_TIMESTAMP(updated_time) AS updated_ts, - group_id, watcher_id, revision, size, key_id, unix_permissions + group_id, watcher_id, revision, size, key_id, unix_permissions, operation_type FROM files WHERE account_hash = ? AND eq_index = ? AND server_group_id = ? AND is_deleted = FALSE ORDER BY revision DESC LIMIT 1"# @@ -591,6 +531,7 @@ impl MySqlFileExt for MySqlStorage { size, key_id: key_id_opt, unix_permissions: row.try_get("unix_permissions").ok(), + operation_type: row.try_get("operation_type").ok(), }; debug!( "File information query by path successful: file_id={}", @@ -620,7 +561,7 @@ impl MySqlFileExt for MySqlStorage { file_id, account_hash, device_hash, file_path, filename, file_hash, UNIX_TIMESTAMP(created_time) AS created_ts, UNIX_TIMESTAMP(updated_time) AS updated_ts, - group_id, watcher_id, revision, size, key_id, unix_permissions + group_id, watcher_id, revision, size, key_id, unix_permissions, operation_type FROM files WHERE account_hash = ? AND file_hash = ? 
AND is_deleted = FALSE ORDER BY revision DESC LIMIT 1"#, @@ -665,6 +606,7 @@ impl MySqlFileExt for MySqlStorage { size, key_id: key_id_opt, unix_permissions: row.try_get("unix_permissions").ok(), + operation_type: row.try_get("operation_type").ok(), }; debug!("File search by hash successful: file_id={}", file_id); Ok(Some(file_info)) @@ -707,12 +649,12 @@ impl MySqlFileExt for MySqlStorage { UNIX_TIMESTAMP(created_time) as created_ts, UNIX_TIMESTAMP(updated_time) as updated_ts, group_id, watcher_id, revision, size, key_id, unix_permissions, - is_deleted + is_deleted, operation_type FROM files WHERE account_hash = ? AND eq_index = ? AND is_deleted = FALSE AND operation_type NOT IN ('DELETE') - ORDER BY updated_time DESC, revision DESC, id DESC + ORDER BY revision DESC, id DESC LIMIT 1"#, ) .bind(account_hash) @@ -782,6 +724,7 @@ impl MySqlFileExt for MySqlStorage { size, key_id: key_id_opt, unix_permissions: row.try_get("unix_permissions").ok(), + operation_type: row.try_get("operation_type").ok(), }; debug!( "File search by path/filename successful: file_id={}", @@ -1100,6 +1043,7 @@ impl MySqlFileExt for MySqlStorage { size, key_id, unix_permissions, + operation_type: Some("RESTORE".to_string()), }) } @@ -1118,13 +1062,18 @@ impl MySqlFileExt for MySqlStorage { let rows = sqlx::query( r#"SELECT - file_id, account_hash, device_hash, file_path, filename, file_hash, - UNIX_TIMESTAMP(created_time) AS created_ts, - UNIX_TIMESTAMP(updated_time) AS updated_ts, - group_id, watcher_id, revision, size, key_id, unix_permissions - FROM files - WHERE account_hash = ? AND server_group_id = ? AND is_deleted = FALSE - ORDER BY updated_time DESC"#, + f.file_id, f.account_hash, f.device_hash, f.file_path, f.filename, f.file_hash, + UNIX_TIMESTAMP(f.created_time) AS created_ts, + UNIX_TIMESTAMP(f.updated_time) AS updated_ts, + f.group_id, f.watcher_id, f.revision, f.size, f.key_id, f.unix_permissions, f.operation_type + FROM files f + WHERE f.account_hash = ? + AND f.server_group_id = ? + AND f.is_deleted = FALSE + AND f.revision = ( + SELECT MAX(f2.revision) FROM files f2 WHERE f2.file_id = f.file_id + ) + ORDER BY f.revision DESC, f.id DESC"#, ) .bind(account_hash) .bind(group_id) @@ -1166,6 +1115,7 @@ impl MySqlFileExt for MySqlStorage { size, key_id: key_id_opt, unix_permissions: row.try_get("unix_permissions").ok(), + operation_type: row.try_get("operation_type").ok(), }); } info!( @@ -1193,13 +1143,19 @@ impl MySqlFileExt for MySqlStorage { let rows = sqlx::query( r#"SELECT - file_id, account_hash, device_hash, file_path, filename, file_hash, - UNIX_TIMESTAMP(created_time) AS created_ts, - UNIX_TIMESTAMP(updated_time) AS updated_ts, - group_id, watcher_id, revision, size, key_id, unix_permissions - FROM files - WHERE account_hash = ? AND server_group_id = ? AND device_hash <> ? AND is_deleted = FALSE - ORDER BY updated_time DESC"# + f.file_id, f.account_hash, f.device_hash, f.file_path, f.filename, f.file_hash, + UNIX_TIMESTAMP(f.created_time) AS created_ts, + UNIX_TIMESTAMP(f.updated_time) AS updated_ts, + f.group_id, f.watcher_id, f.revision, f.size, f.key_id, f.unix_permissions, f.operation_type + FROM files f + WHERE f.account_hash = ? + AND f.server_group_id = ? + AND f.device_hash <> ? 
+ AND f.is_deleted = FALSE + AND f.revision = ( + SELECT MAX(f2.revision) FROM files f2 WHERE f2.file_id = f.file_id + ) + ORDER BY f.revision DESC, f.id DESC"# ) .bind(account_hash) .bind(group_id) @@ -1242,6 +1198,7 @@ impl MySqlFileExt for MySqlStorage { size, key_id: key_id_opt, unix_permissions: row.try_get("unix_permissions").ok(), + operation_type: row.try_get("operation_type").ok(), }); } Ok(files) @@ -1371,7 +1328,7 @@ impl MySqlFileExt for MySqlStorage { use sqlx::Row; info!("🔍 [v2025.11.19-FIXED] find_file_by_criteria called"); info!("🔍 [QUERY] Using: WHERE is_deleted = FALSE"); - info!("🔍 [QUERY] Sorting: ORDER BY updated_time DESC, revision DESC, id DESC"); + info!("🔍 [QUERY] Sorting: ORDER BY revision DESC, id DESC"); debug!("🔍 find_file_by_criteria called:"); debug!(" account_hash: {}", account_hash); debug!(" group_id: {}", group_id); @@ -1416,32 +1373,40 @@ impl MySqlFileExt for MySqlStorage { search_path, search_filename ); - // Perform search for both patterns with sqlx + // Build deterministic eq_index for encrypted storage lookup + let cfg = crate::server::app_state::AppState::get_config(); + let eq_index = if let Some(kv) = cfg.server_encode_key.as_ref() { + if kv.len() == 32 { + let key: &[u8; 32] = kv.as_slice().try_into().expect("len checked"); + let salt = crate::utils::crypto::derive_salt(key, "meta-index", account_hash); + crate::utils::crypto::make_eq_index(&salt, file_path) + } else { + crate::utils::crypto::make_eq_index(account_hash.as_bytes(), file_path) + } + } else { + crate::utils::crypto::make_eq_index(account_hash.as_bytes(), file_path) + }; + + // Perform search by eq_index (works for encrypted VARBINARY paths) let row = sqlx::query( r#"SELECT file_id, account_hash, device_hash, file_path, filename, file_hash, UNIX_TIMESTAMP(created_time) as created_ts, UNIX_TIMESTAMP(updated_time) as updated_ts, group_id, watcher_id, revision, size, key_id, unix_permissions, - is_deleted + is_deleted, operation_type FROM files WHERE account_hash = ? AND server_group_id = ? AND server_watcher_id = ? AND is_deleted = FALSE AND operation_type NOT IN ('DELETE') - AND ( - (file_path = ? AND filename = ?) OR - (file_path = ? AND filename = ?) - ) - ORDER BY updated_time DESC, revision DESC, id DESC + AND eq_index = ? + ORDER BY revision DESC, id DESC LIMIT 1"#, ) .bind(account_hash) .bind(group_id) .bind(watcher_id) - .bind(&search_path) - .bind(&search_filename) - .bind(file_path) - .bind(filename) + .bind(&eq_index) .fetch_optional(self.get_sqlx_pool()) .await .map_err(|e| { @@ -1526,6 +1491,7 @@ impl MySqlFileExt for MySqlStorage { size, key_id: key_id_opt, unix_permissions: row.try_get("unix_permissions").ok(), + operation_type: row.try_get("operation_type").ok(), }; info!( @@ -1547,15 +1513,20 @@ impl MySqlFileExt for MySqlStorage { /// Check file existence and deletion status by file ID async fn check_file_exists(&self, file_id: u64) -> Result<(bool, bool)> { info!("🔍 check_file_exists called: file_id={}", file_id); - let is_deleted_opt: Option = - sqlx::query_scalar(r#"SELECT is_deleted FROM files WHERE file_id = ?"#) - .bind(file_id as i64) - .fetch_optional(self.get_sqlx_pool()) - .await - .map_err(|e| { - error!("❌ File existence query SQL error(sqlx): {}", e); - StorageError::Database(format!("File existence query SQL error: {}", e)) - })?; + let is_deleted_opt: Option = sqlx::query_scalar( + r#"SELECT is_deleted + FROM files + WHERE file_id = ? 
+ ORDER BY revision DESC, id DESC + LIMIT 1"#, + ) + .bind(file_id as i64) + .fetch_optional(self.get_sqlx_pool()) + .await + .map_err(|e| { + error!("❌ File existence query SQL error(sqlx): {}", e); + StorageError::Database(format!("File existence query SQL error: {}", e)) + })?; match is_deleted_opt { Some(raw) => Ok((true, raw != 0)), @@ -1566,6 +1537,32 @@ impl MySqlFileExt for MySqlStorage { } } + /// Latest revision state (exists, is_deleted, latest_revision) by file_id + async fn latest_revision_state(&self, file_id: u64) -> Result<(bool, bool, i64)> { + let row: Option<(i64, i8)> = sqlx::query_as( + r#"SELECT revision, is_deleted + FROM files + WHERE file_id = ? + ORDER BY revision DESC, id DESC + LIMIT 1"#, + ) + .bind(file_id as i64) + .fetch_optional(self.get_sqlx_pool()) + .await + .map_err(|e| { + StorageError::Database(format!( + "Failed to query latest revision state for file_id {}: {}", + file_id, e + )) + })?; + + if let Some((rev, del)) = row { + Ok((true, del != 0, rev)) + } else { + Ok((false, false, 0)) + } + } + /// TTL-based physical deletion: remove records where is_deleted=1 and updated_time is before NOW()-ttl seconds async fn purge_deleted_files_older_than(&self, ttl_secs: i64) -> Result { let affected = sqlx::query(r#"DELETE FROM files WHERE is_deleted = 1 AND updated_time < FROM_UNIXTIME(UNIX_TIMESTAMP() - ?)"#) diff --git a/src/storage/mysql_models.rs b/src/storage/mysql_models.rs index f77b1ce..18ad7a8 100644 --- a/src/storage/mysql_models.rs +++ b/src/storage/mysql_models.rs @@ -134,9 +134,9 @@ impl TryFrom for crate::models::device::Device { // Stub implementation - return basic Device object let now = chrono::Utc::now(); Ok(crate::models::device::Device { - user_id: "".to_string(), account_hash: "".to_string(), device_hash: data.device_hash, + device_name: "".to_string(), updated_at: now, registered_at: now, last_sync: now,
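Note on the revision numbering in the append-only rename path (src/storage/mysql.rs): the tombstone row takes current_revision + 1 and the new active row takes current_revision + 2, so both inserts satisfy UNIQUE(file_id, revision) and a reader selecting ORDER BY revision DESC LIMIT 1 always resolves to the active row at the new path. A minimal standalone sketch of that invariant (illustrative names, not the crate's API):

#[derive(Debug, PartialEq)]
struct RevisionPlan {
    tombstone_revision: i64,  // old path, inserted with is_deleted = TRUE
    new_active_revision: i64, // new path, inserted with is_deleted = FALSE
}

fn plan_rename(current_revision: i64) -> RevisionPlan {
    // Tombstone takes the next revision; the new active row takes next + 1,
    // so no two rows for the same file_id share a revision number.
    let tombstone_revision = current_revision + 1;
    RevisionPlan {
        tombstone_revision,
        new_active_revision: tombstone_revision + 1,
    }
}

fn main() {
    let plan = plan_rename(7);
    // Revision 8 records the old path as deleted; revision 9 is the active
    // row at the new path, so ORDER BY revision DESC LIMIT 1 finds it first.
    assert_eq!(
        plan,
        RevisionPlan { tombstone_revision: 8, new_active_revision: 9 }
    );
}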
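The broadcast guard added in src/server/notification_manager.rs reduces to a small decision rule over the (exists, is_deleted, latest_revision) tuple returned by latest_revision_state: drop the notification if the file is gone or carries a revision older than the latest stored one. A hedged sketch of that rule as a pure function (the real check is inline in NotificationManager):

fn should_broadcast(notification_revision: i64, exists: bool, latest_revision: i64) -> bool {
    if !exists {
        return false; // file no longer exists in storage
    }
    // Stale revisions are skipped; the latest (or a newer) revision is forwarded.
    notification_revision >= latest_revision
}

fn main() {
    assert!(!should_broadcast(3, true, 5));  // stale rename/update: skipped
    assert!(should_broadcast(5, true, 5));   // current revision: broadcast
    assert!(!should_broadcast(5, false, 0)); // deleted/purged file: skipped
}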
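src/services/file_service.rs normalizes the optional operation_type by mapping empty strings to None via as_ref().and_then(..). The same normalization can be written more tersely with Option::filter; a standalone sketch under that assumption (hypothetical helper name):

fn normalize_operation_type(op: Option<&str>) -> Option<String> {
    // Treat "" as unset, mirroring the and_then block in FileService.
    op.filter(|s| !s.is_empty()).map(str::to_string)
}

fn main() {
    assert_eq!(normalize_operation_type(Some("RENAME")), Some("RENAME".to_string()));
    assert_eq!(normalize_operation_type(Some("")), None);
    assert_eq!(normalize_operation_type(None), None);
}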