diff --git a/migrations/sql/20251119_add_deleted_file_id.sql b/migrations/sql/20251119_add_deleted_file_id.sql index e4a5fe6..86370df 100644 --- a/migrations/sql/20251119_add_deleted_file_id.sql +++ b/migrations/sql/20251119_add_deleted_file_id.sql @@ -6,10 +6,3 @@ ADD COLUMN deleted_file_id BIGINT UNSIGNED NULL COMMENT 'Reference to original f -- Add index for efficient lookup CREATE INDEX idx_deleted_file_id ON files(deleted_file_id); - - - - - - - diff --git a/migrations/sql/20251119_add_deleted_file_id_down.sql b/migrations/sql/20251119_add_deleted_file_id_down.sql index bac5af7..f1ba1c7 100644 --- a/migrations/sql/20251119_add_deleted_file_id_down.sql +++ b/migrations/sql/20251119_add_deleted_file_id_down.sql @@ -10,3 +10,4 @@ ALTER TABLE files DROP COLUMN IF EXISTS deleted_file_id; + diff --git a/migrations/sql/20251119_add_encrypted_data_to_files.sql b/migrations/sql/20251119_add_encrypted_data_to_files.sql index 62754f5..1a37e02 100644 --- a/migrations/sql/20251119_add_encrypted_data_to_files.sql +++ b/migrations/sql/20251119_add_encrypted_data_to_files.sql @@ -4,10 +4,3 @@ ALTER TABLE files ADD COLUMN encrypted_data LONGBLOB NULL COMMENT 'Encrypted file data for recovery' AFTER key_id; - - - - - - - diff --git a/migrations/sql/20251119_add_encrypted_data_to_files_down.sql b/migrations/sql/20251119_add_encrypted_data_to_files_down.sql index 1c88c5a..a3855b0 100644 --- a/migrations/sql/20251119_add_encrypted_data_to_files_down.sql +++ b/migrations/sql/20251119_add_encrypted_data_to_files_down.sql @@ -3,10 +3,3 @@ ALTER TABLE files DROP COLUMN encrypted_data; - - - - - - - diff --git a/migrations/sql/20251119_migrate_operation_types.sql b/migrations/sql/20251119_migrate_operation_types.sql index efebd05..0f444fd 100644 --- a/migrations/sql/20251119_migrate_operation_types.sql +++ b/migrations/sql/20251119_migrate_operation_types.sql @@ -40,3 +40,4 @@ WHERE is_deleted = 0 + diff --git a/migrations/sql/20251119_migrate_operation_types_down.sql b/migrations/sql/20251119_migrate_operation_types_down.sql index 86c642b..e188667 100644 --- a/migrations/sql/20251119_migrate_operation_types_down.sql +++ b/migrations/sql/20251119_migrate_operation_types_down.sql @@ -14,3 +14,4 @@ WHERE operation_type IN ('CREATE', 'UPDATE', 'DELETE', 'RESTORE'); + diff --git a/migrations/sql/20251119_redefine_operation_type.sql b/migrations/sql/20251119_redefine_operation_type.sql index b311344..a0ec919 100644 --- a/migrations/sql/20251119_redefine_operation_type.sql +++ b/migrations/sql/20251119_redefine_operation_type.sql @@ -7,3 +7,4 @@ ALTER TABLE files CHANGE COLUMN operation_type operation_type ENUM('CREATE', 'UPDATE', 'DELETE', 'RENAME', 'RESTORE', 'UPLOAD') DEFAULT 'CREATE' COMMENT 'File operation type: CREATE=new file, UPDATE=new version, DELETE=deleted, RENAME=renamed, RESTORE=restored from deletion, UPLOAD=legacy'; + diff --git a/migrations/sql/20251119_redefine_operation_type_down.sql b/migrations/sql/20251119_redefine_operation_type_down.sql index 0d9bf8c..5abf7cf 100644 --- a/migrations/sql/20251119_redefine_operation_type_down.sql +++ b/migrations/sql/20251119_redefine_operation_type_down.sql @@ -11,3 +11,4 @@ DEFAULT 'UPLOAD'; + diff --git a/migrations/sql/20251125_add_move_operation_type.sql b/migrations/sql/20251125_add_move_operation_type.sql new file mode 100644 index 0000000..ac98834 --- /dev/null +++ b/migrations/sql/20251125_add_move_operation_type.sql @@ -0,0 +1,8 @@ +-- Add MOVE operation type to support cross-watcher file movements +-- Date: 2025-11-25 + +-- Add MOVE to the operation_type ENUM +ALTER TABLE files +CHANGE COLUMN operation_type operation_type ENUM('CREATE', 'UPDATE', 'DELETE', 'RENAME', 'RESTORE', 'UPLOAD', 'MOVE') +DEFAULT 'CREATE' +COMMENT 'File operation type: CREATE=new file, UPDATE=new version, DELETE=deleted, RENAME=renamed within same watcher, RESTORE=restored from deletion, UPLOAD=legacy, MOVE=moved between watchers'; diff --git a/migrations/sql/20251125_add_move_operation_type_down.sql b/migrations/sql/20251125_add_move_operation_type_down.sql new file mode 100644 index 0000000..ebc1002 --- /dev/null +++ b/migrations/sql/20251125_add_move_operation_type_down.sql @@ -0,0 +1,13 @@ +-- Rollback: Remove MOVE operation type +-- Date: 2025-11-25 + +-- Before removing MOVE, migrate any MOVE operations back to RENAME +UPDATE files +SET operation_type = 'RENAME' +WHERE operation_type = 'MOVE'; + +-- Remove MOVE from the operation_type ENUM +ALTER TABLE files +CHANGE COLUMN operation_type operation_type ENUM('CREATE', 'UPDATE', 'DELETE', 'RENAME', 'RESTORE', 'UPLOAD') +DEFAULT 'CREATE' +COMMENT 'File operation type: CREATE=new file, UPDATE=new version, DELETE=deleted, RENAME=renamed, RESTORE=restored from deletion, UPLOAD=legacy'; diff --git a/proto/sync.proto b/proto/sync.proto index 242e136..86bf7ef 100755 --- a/proto/sync.proto +++ b/proto/sync.proto @@ -36,6 +36,8 @@ service SyncService { rpc RestoreFile(RestoreFileRequest) returns (RestoreFileResponse); // Restore deleted file rpc DownloadFile(DownloadFileRequest) returns (DownloadFileResponse); rpc ListFiles(ListFilesRequest) returns (ListFilesResponse); + // Batch operations for improved efficiency + rpc BatchOperations(BatchOperationsRequest) returns (BatchOperationsResponse); // Streaming variants for large payloads (hybrid approach) rpc UploadFileStream(stream UploadFileChunk) returns (UploadFileResponse); rpc DownloadFileStream(DownloadFileRequest) returns (stream DownloadFileChunk); @@ -392,6 +394,9 @@ message UploadFileRequest { uint64 file_size = 13; string key_id = 14; optional uint32 unix_permissions = 15; + int64 expected_revision = 16; // Expected server revision (0 for new files, or current revision for updates) + ConflictInfo.ResolutionStrategy conflict_resolution = 17; // How to resolve conflicts (default: MANUAL) + int64 client_timestamp = 18; // Client file modification time (Unix epoch seconds) } message UploadFileResponse { @@ -399,6 +404,9 @@ message UploadFileResponse { uint64 file_id = 2; int64 new_revision = 3; string return_message = 4; + optional ConflictInfo conflict = 5; // Revision conflict information + ErrorCode error_code = 6; // Standardized error code + int64 current_revision = 7; // Server's actual revision (populated on conflict) } // Client-streaming upload chunk (first chunk carries metadata) @@ -422,6 +430,8 @@ message UploadFileChunk { uint64 seq = 14; // sequential index starting at 0 bool last = 15; // true for the final chunk optional uint32 unix_permissions = 16; + int64 expected_revision = 17; // Expected server revision for streaming uploads + optional int64 client_timestamp = 18; // Client modification time (first chunk only) } message DownloadFileRequest { @@ -497,6 +507,9 @@ message DeleteFileRequest { google.protobuf.Timestamp updated_time = 8; int64 revision = 9; string auth_token = 10; + ConflictInfo.ResolutionStrategy conflict_resolution = 11; // How to resolve conflicts (default: MANUAL) + int64 expected_revision = 12; // Expected current revision for validation + int64 client_timestamp = 13; // Client deletion decision time (Unix epoch seconds) } message DeleteFileResponse { @@ -504,6 +517,9 @@ message DeleteFileResponse { string return_message = 2; uint64 delete_record_id = 3; // file_id of DELETE operation_type record (for recovery) int64 new_revision = 4; // New revision after delete operation + optional ConflictInfo conflict = 5; // Conflict information (e.g., revision mismatch) + ErrorCode error_code = 6; // Standardized error code + int64 current_revision = 7; // Server's actual revision (populated on conflict) } // Restore deleted file request @@ -532,6 +548,9 @@ message RenameFileRequest { int32 group_id = 7; // Watcher group ID int32 watcher_id = 8; // Watcher ID int64 timestamp = 9; // Rename timestamp (Unix seconds) + int64 expected_revision = 10; // Expected server revision (0 to skip check, or current revision for validation) + ConflictInfo.ResolutionStrategy conflict_resolution = 11; // How to resolve conflicts (default: MANUAL) + google.protobuf.Timestamp updated_time = 12; // Client's last known update time for timestamp-based conflict resolution } // NEW: Rename file response @@ -540,6 +559,8 @@ message RenameFileResponse { string return_message = 2; int64 new_revision = 3; // New revision after rename optional ConflictInfo conflict = 4; // If rename conflicts with another device + ErrorCode error_code = 5; // Standardized error code + int64 current_revision = 6; // Server's actual revision (populated on conflict) } // NEW: Conflict information @@ -550,11 +571,62 @@ message ConflictInfo { CONCURRENT_RENAME = 2; // Another device renamed same file CONCURRENT_MODIFY = 3; // File was modified during rename PATH_MISMATCH = 4; // old_file_path doesn't match current path + REVISION_CONFLICT = 5; // Client revision outdated + STALE_OPERATION = 6; // Operation timestamp too old } + + enum ResolutionStrategy { + MANUAL = 0; // Require manual conflict resolution + LAST_WRITE_WINS = 1; // Use timestamp to auto-resolve (newest wins) + SERVER_WINS = 2; // Server version always wins + CLIENT_WINS = 3; // Client version always wins + } + ConflictType type = 1; string conflicting_path = 2; // Path causing conflict string conflicting_device = 3; // Device that caused conflict int64 conflicting_revision = 4; // Revision causing conflict + google.protobuf.Timestamp server_timestamp = 5; // Server's last modification time + google.protobuf.Timestamp client_timestamp = 6; // Client's modification time + ResolutionStrategy suggested_resolution = 7; // Suggested resolution strategy +} + +// Standardized error codes for all operations +enum ErrorCode { + SUCCESS = 0; // Operation successful + UNKNOWN_ERROR = 1; // Unknown or unclassified error + + // Authentication & Authorization (10-19) + AUTH_FAILED = 10; // Authentication failed + AUTH_TOKEN_INVALID = 11; // Invalid or expired auth token + AUTH_ACCOUNT_MISMATCH = 12; // Account hash mismatch + + // Validation Errors (20-29) + VALIDATION_FAILED = 20; // General validation error + INVALID_REQUEST = 21; // Invalid request parameters + FILE_SIZE_MISMATCH = 22; // Declared size doesn't match actual + PATH_INVALID = 23; // Invalid file path + + // File Operation Errors (30-49) + FILE_NOT_FOUND = 30; // File doesn't exist + FILE_ALREADY_DELETED = 31; // File was already deleted + FILE_ALREADY_EXISTS = 32; // File already exists at target path + + // Conflict Errors (50-69) + REVISION_CONFLICT = 50; // Client revision outdated + PATH_CONFLICT = 51; // Target path already exists + PATH_MISMATCH = 52; // Path doesn't match current file path + CONCURRENT_MODIFICATION = 53; // File modified by another device + STALE_OPERATION = 54; // Operation timestamp too old (Last Write Wins) + + // Storage & Quota Errors (70-89) + STORAGE_FAILED = 70; // Storage operation failed + QUOTA_EXCEEDED = 71; // Storage quota exceeded + FILE_SIZE_LIMIT = 72; // File size exceeds tier limit + + // Database Errors (90-99) + DB_ERROR = 90; // Database operation failed + DB_SCHEMA_ERROR = 91; // Database schema mismatch } // Individual Watcher management messages @@ -821,6 +893,8 @@ message GetFileHistoryRequest { optional int32 change_count_threshold = 9; // Change count threshold int32 group_id = 10; // Group ID optional int32 watcher_id = 11; // Watcher ID (optional) + optional string operation_type = 12; // Filter by operation type (UPLOAD, DELETE, RENAME, MOVE) + optional int32 offset = 13; // Number of records to skip for pagination (default: 0) } message GetFileHistoryResponse { @@ -999,3 +1073,72 @@ message QuotaCheckResultProto { bool in_grace_period = 7; } +// ============================================================================ +// Batch Operations +// ============================================================================ + +// Batch operation types +enum BatchOperationType { + BATCH_UPLOAD = 0; + BATCH_DELETE = 1; + BATCH_RENAME = 2; +} + +// Single operation in a batch +message BatchOperation { + BatchOperationType type = 1; + + // Only one of these should be set based on type + UploadFileRequest upload = 2; + DeleteFileRequest delete = 3; + RenameFileRequest rename = 4; + + // Client-provided operation ID for matching request/response + string operation_id = 5; +} + +// Batch operations request +message BatchOperationsRequest { + string auth_token = 1; + string account_hash = 2; + string device_hash = 3; + + // List of operations to perform + repeated BatchOperation operations = 4; + + // Transaction mode: if true, all operations succeed or all fail + bool atomic = 5; + + // Stop on first error: if true, stop processing remaining operations after first failure + bool stop_on_error = 6; +} + +// Single operation result in a batch +message BatchOperationResult { + string operation_id = 1; // Matches operation_id from request + bool success = 2; + string return_message = 3; + int32 error_code = 4; + + // Operation-specific response data + oneof result { + UploadFileResponse upload_result = 5; + DeleteFileResponse delete_result = 6; + RenameFileResponse rename_result = 7; + } +} + +// Batch operations response +message BatchOperationsResponse { + bool success = 1; // True if all operations succeeded + string return_message = 2; + + // Individual operation results + repeated BatchOperationResult results = 3; + + // Statistics + int32 total_operations = 4; + int32 successful_operations = 5; + int32 failed_operations = 6; +} + diff --git a/src/handlers/file/batch.rs b/src/handlers/file/batch.rs new file mode 100644 index 0000000..120d0b5 --- /dev/null +++ b/src/handlers/file/batch.rs @@ -0,0 +1,268 @@ +use tonic::{Response, Status}; +use tracing::{error, info, warn}; + +use super::super::file_handler::FileHandler; +use crate::sync::{ + BatchOperationResult, BatchOperationType, BatchOperationsRequest, BatchOperationsResponse, + DeleteFileRequest, ErrorCode, RenameFileRequest, UploadFileRequest, +}; + +pub async fn handle_batch_operations( + handler: &FileHandler, + req: BatchOperationsRequest, +) -> Result, Status> { + info!("Batch operations request received:"); + info!(" account_hash: {}", req.account_hash); + info!(" device_hash: {}", req.device_hash); + info!(" operations count: {}", req.operations.len()); + info!(" atomic: {}", req.atomic); + + // Verify authentication once for all operations + let verified = match handler.app_state.oauth.verify_token(&req.auth_token).await { + Ok(v) if v.valid => v, + _ => { + return Ok(Response::new(BatchOperationsResponse { + success: false, + return_message: "Authentication failed".to_string(), + results: vec![], + total_operations: req.operations.len() as i32, + successful_operations: 0, + failed_operations: req.operations.len() as i32, + })); + } + }; + let server_account_hash = verified.account_hash; + + // Validate account hash + if server_account_hash != req.account_hash { + return Ok(Response::new(BatchOperationsResponse { + success: false, + return_message: "Account hash mismatch".to_string(), + results: vec![], + total_operations: req.operations.len() as i32, + successful_operations: 0, + failed_operations: req.operations.len() as i32, + })); + } + + // Process operations + let mut results = Vec::new(); + let mut successful_count = 0; + let mut failed_count = 0; + let mut first_error: Option = None; + let total_operations = req.operations.len() as i32; + + for operation in req.operations { + let operation_id = operation.operation_id.clone(); + + // Extract operation type + let op_type = BatchOperationType::try_from(operation.r#type) + .unwrap_or(BatchOperationType::BatchUpload); + + let result = match op_type { + BatchOperationType::BatchUpload => { + if let Some(upload_req) = operation.upload { + process_upload_operation(handler, upload_req, &operation_id).await + } else { + create_error_result( + &operation_id, + "Upload operation missing request data", + ErrorCode::ValidationFailed, + ) + } + } + BatchOperationType::BatchDelete => { + if let Some(delete_req) = operation.delete { + process_delete_operation(handler, delete_req, &operation_id).await + } else { + create_error_result( + &operation_id, + "Delete operation missing request data", + ErrorCode::ValidationFailed, + ) + } + } + BatchOperationType::BatchRename => { + if let Some(rename_req) = operation.rename { + process_rename_operation(handler, rename_req, &operation_id).await + } else { + create_error_result( + &operation_id, + "Rename operation missing request data", + ErrorCode::ValidationFailed, + ) + } + } + }; + + if result.success { + successful_count += 1; + } else { + failed_count += 1; + if first_error.is_none() { + first_error = Some(result.return_message.clone()); + } + + // If atomic mode and operation failed, rollback not implemented yet + // For now, we just fail the batch + if req.atomic { + warn!("Atomic batch failed at operation {}", operation_id); + // Add remaining operations as failed + results.push(result); + break; + } + + // If stop_on_error mode and operation failed, stop processing + if req.stop_on_error { + warn!( + "Stopping batch processing due to error at operation {}", + operation_id + ); + results.push(result); + break; + } + } + + results.push(result); + } + + let all_success = failed_count == 0; + + let response_message = if all_success { + format!("All {} operations completed successfully", successful_count) + } else if req.atomic { + format!( + "Atomic batch failed: {}", + first_error.unwrap_or_else(|| "Unknown error".to_string()) + ) + } else { + format!( + "Batch completed: {} succeeded, {} failed", + successful_count, failed_count + ) + }; + + info!( + "Batch operations completed: total={}, success={}, failed={}", + total_operations, successful_count, failed_count + ); + + Ok(Response::new(BatchOperationsResponse { + success: all_success, + return_message: response_message, + results, + total_operations, + successful_operations: successful_count, + failed_operations: failed_count, + })) +} + +async fn process_upload_operation( + handler: &FileHandler, + req: UploadFileRequest, + operation_id: &str, +) -> BatchOperationResult { + match handler + .handle_upload_file_internal(tonic::Request::new(req)) + .await + { + Ok(response) => { + let upload_resp = response.into_inner(); + BatchOperationResult { + operation_id: operation_id.to_string(), + success: upload_resp.success, + return_message: upload_resp.return_message.clone(), + error_code: upload_resp.error_code, + result: Some(crate::sync::batch_operation_result::Result::UploadResult( + upload_resp, + )), + } + } + Err(e) => { + error!("Upload operation {} failed: {}", operation_id, e); + create_error_result( + operation_id, + &format!("Upload failed: {}", e), + ErrorCode::StorageFailed, + ) + } + } +} + +async fn process_delete_operation( + handler: &FileHandler, + req: DeleteFileRequest, + operation_id: &str, +) -> BatchOperationResult { + match handler + .handle_delete_file_internal(tonic::Request::new(req)) + .await + { + Ok(response) => { + let delete_resp = response.into_inner(); + BatchOperationResult { + operation_id: operation_id.to_string(), + success: delete_resp.success, + return_message: delete_resp.return_message.clone(), + error_code: delete_resp.error_code, + result: Some(crate::sync::batch_operation_result::Result::DeleteResult( + delete_resp, + )), + } + } + Err(e) => { + error!("Delete operation {} failed: {}", operation_id, e); + create_error_result( + operation_id, + &format!("Delete failed: {}", e), + ErrorCode::StorageFailed, + ) + } + } +} + +async fn process_rename_operation( + handler: &FileHandler, + req: RenameFileRequest, + operation_id: &str, +) -> BatchOperationResult { + match handler + .handle_rename_file_internal(tonic::Request::new(req)) + .await + { + Ok(response) => { + let rename_resp = response.into_inner(); + BatchOperationResult { + operation_id: operation_id.to_string(), + success: rename_resp.success, + return_message: rename_resp.return_message.clone(), + error_code: rename_resp.error_code, + result: Some(crate::sync::batch_operation_result::Result::RenameResult( + rename_resp, + )), + } + } + Err(e) => { + error!("Rename operation {} failed: {}", operation_id, e); + create_error_result( + operation_id, + &format!("Rename failed: {}", e), + ErrorCode::StorageFailed, + ) + } + } +} + +fn create_error_result( + operation_id: &str, + message: &str, + error_code: ErrorCode, +) -> BatchOperationResult { + BatchOperationResult { + operation_id: operation_id.to_string(), + success: false, + return_message: message.to_string(), + error_code: error_code as i32, + result: None, + } +} diff --git a/src/handlers/file/delete.rs b/src/handlers/file/delete.rs index 6f1ac8d..560d09b 100644 --- a/src/handlers/file/delete.rs +++ b/src/handlers/file/delete.rs @@ -3,7 +3,15 @@ use tracing::{debug, error, info, warn}; use super::super::file_handler::FileHandler; use crate::services::usage_service::{OperationResult, UsageOperation}; -use crate::sync::{DeleteFileRequest, DeleteFileResponse}; +use crate::storage::DeleteFileResult; +use crate::sync::{ + conflict_info::{ConflictType, ResolutionStrategy}, + DeleteFileRequest, DeleteFileResponse, ErrorCode, +}; +use crate::utils::conflict_resolution::{ + create_conflict_info_with_timestamps, resolve_revision_conflict, suggest_resolution_strategy, + ResolutionDecision, +}; use crate::utils::events::{ publish_file_deleted_event, publish_version_deleted_event, FileDeleteEventData, }; @@ -23,8 +31,9 @@ pub async fn handle_delete_file( let verified = match handler.app_state.oauth.verify_token(&req.auth_token).await { Ok(v) if v.valid => v, _ => { - return Ok(Response::new(response::file_delete_error( + return Ok(Response::new(response::file_delete_error_with_code( "Authentication failed", + ErrorCode::AuthFailed, ))); } }; @@ -51,11 +60,24 @@ pub async fn handle_delete_file( } }; + // Prefer expected_revision over revision field for better client compatibility + let client_revision = if req.expected_revision > 0 { + Some(req.expected_revision) + } else if req.revision > 0 { + Some(req.revision) + } else { + None + }; + debug!( - "Executing file deletion: file_id={}, size={}", - file_id, file_size + "Executing file deletion: file_id={}, size={}, client_revision={:?}", + file_id, file_size, client_revision ); - let delete_result = handler.app_state.file.delete_file(file_id).await; + let delete_result = handler + .app_state + .file + .delete_file(file_id, client_revision) + .await; // Record storage decrease after deletion if delete_result.is_ok() && file_size > 0 { @@ -77,7 +99,10 @@ pub async fn handle_delete_file( } match delete_result { - Ok((deletion_record_id, new_revision)) => { + Ok(DeleteFileResult::Success { + deletion_record_id, + new_revision, + }) => { info!( "File deleted successfully: filename={}, file_id={}, deletion_record_id={}, new_revision={}", req.filename, file_id, deletion_record_id, new_revision @@ -102,6 +127,158 @@ pub async fn handle_delete_file( new_revision, ))) } + Ok(DeleteFileResult::RevisionConflict { + server_revision, + client_revision, + }) => { + warn!( + "Revision conflict during delete: file_id={}, server_revision={}, client_revision={}", + file_id, server_revision, client_revision + ); + + // Get server file info for timestamp comparison + let server_file_info = handler + .app_state + .file + .get_file_info(file_id) + .await + .ok() + .flatten(); + let server_timestamp = server_file_info + .as_ref() + .map(|info| info.updated_time.clone()); + + // Parse conflict_resolution strategy from request + let strategy = if req.conflict_resolution != 0 { + ResolutionStrategy::try_from(req.conflict_resolution) + .unwrap_or(ResolutionStrategy::Manual) + } else { + ResolutionStrategy::Manual + }; + + // Check for STALE_OPERATION if LAST_WRITE_WINS strategy and client_timestamp provided + if strategy == ResolutionStrategy::LastWriteWins && req.client_timestamp > 0 { + let server_ts = server_timestamp.as_ref().map(|ts| ts.seconds).unwrap_or(0); + + if req.client_timestamp < server_ts { + // Client operation is stale + warn!( + "STALE_OPERATION detected (delete): file_id={}, client_timestamp={} < server_timestamp={}", + file_id, req.client_timestamp, server_ts + ); + + let suggested_resolution = suggest_resolution_strategy( + req.updated_time.is_some(), + server_timestamp.is_some(), + ); + + return Ok(Response::new(DeleteFileResponse { + success: false, + return_message: format!( + "Delete operation is stale: client timestamp {} < server timestamp {}. Please sync before deleting.", + req.client_timestamp, server_ts + ), + delete_record_id: 0, + new_revision: 0, + conflict: Some(create_conflict_info_with_timestamps( + ConflictType::StaleOperation, + req.file_path.clone(), + String::new(), + server_revision, + server_timestamp.clone(), + req.updated_time.clone(), + suggested_resolution, + )), + error_code: ErrorCode::StaleOperation as i32, + current_revision: server_revision, + })); + } + } + + // Determine resolution decision + let decision = resolve_revision_conflict( + req.updated_time.as_ref(), + server_timestamp.as_ref(), + strategy, + ); + + // Auto-retry if decision is AcceptClient + if decision == ResolutionDecision::AcceptClient { + info!( + "Auto-resolving delete conflict: accepting client version (file_id={})", + file_id + ); + + // Retry deletion without revision check (force delete) + match handler.app_state.file.delete_file(file_id, None).await { + Ok(DeleteFileResult::Success { + deletion_record_id, + new_revision, + }) => { + info!( + "File deleted after auto-resolution: file_id={}, deletion_record_id={}, new_revision={}", + file_id, deletion_record_id, new_revision + ); + + // Publish cross-instance file deleted and version deleted events + let event_data = FileDeleteEventData { + account_hash: req.account_hash.clone(), + device_hash: req.device_hash.clone(), + file_path: req.file_path.clone(), + filename: req.filename.clone(), + file_id, + revision: new_revision, + }; + + publish_file_deleted_event(&handler.app_state.event_bus, &event_data).await; + publish_version_deleted_event(&handler.app_state.event_bus, &event_data) + .await; + + return Ok(Response::new(response::file_delete_success( + "File deleted successfully", + deletion_record_id, + new_revision, + ))); + } + Err(e) => { + error!("Failed to force delete after conflict resolution: {}", e); + } + _ => {} + } + } + + // Return conflict info with timestamps and suggested resolution + let suggested_resolution = + suggest_resolution_strategy(req.updated_time.is_some(), server_timestamp.is_some()); + + Ok(Response::new(DeleteFileResponse { + success: false, + return_message: format!( + "File has been modified (server revision: {}, your revision: {}). Please sync before deleting.", + server_revision, client_revision + ), + delete_record_id: 0, + new_revision: server_revision, + conflict: Some(create_conflict_info_with_timestamps( + ConflictType::RevisionConflict, + req.file_path.clone(), + String::new(), + server_revision, + server_timestamp, + req.updated_time.clone(), + suggested_resolution, + )), + error_code: ErrorCode::RevisionConflict as i32, + current_revision: server_revision, + })) + } + Ok(DeleteFileResult::AlreadyDeleted) => { + info!("File already deleted: file_id={}", file_id); + Ok(Response::new(response::file_delete_error_with_code( + "File already deleted", + ErrorCode::FileNotFound, + ))) + } Err(e) => { // Determine error category for logging and client classification let error_msg = format!("{}", e); @@ -137,7 +314,18 @@ pub async fn handle_delete_file( format!("File deletion failed: {}", error_msg) }; - Ok(Response::new(response::file_delete_error(client_error_msg))) + let error_code = if is_not_found { + ErrorCode::FileNotFound + } else if is_permanent_error { + ErrorCode::DbSchemaError + } else { + ErrorCode::StorageFailed + }; + + Ok(Response::new(response::file_delete_error_with_code( + client_error_msg, + error_code, + ))) } } } diff --git a/src/handlers/file/mod.rs b/src/handlers/file/mod.rs index 74d0e73..bd1aa76 100644 --- a/src/handlers/file/mod.rs +++ b/src/handlers/file/mod.rs @@ -1,3 +1,4 @@ +pub mod batch; pub mod delete; pub mod download; pub mod exists; diff --git a/src/handlers/file/rename.rs b/src/handlers/file/rename.rs index 9b0f7e9..fac9879 100644 --- a/src/handlers/file/rename.rs +++ b/src/handlers/file/rename.rs @@ -3,7 +3,12 @@ use tracing::{error, info, warn}; use super::super::file_handler::FileHandler; use crate::sync::{ - conflict_info::ConflictType, ConflictInfo, RenameFileRequest, RenameFileResponse, + conflict_info::{ConflictType, ResolutionStrategy}, + ErrorCode, RenameFileRequest, RenameFileResponse, +}; +use crate::utils::conflict_resolution::{ + create_conflict_info_with_timestamps, resolve_revision_conflict, suggest_resolution_strategy, + ResolutionDecision, }; pub async fn handle_rename_file( @@ -27,6 +32,8 @@ pub async fn handle_rename_file( return_message: "Authentication failed".to_string(), new_revision: 0, conflict: None, + error_code: ErrorCode::AuthFailed as i32, + current_revision: 0, })); } }; @@ -39,6 +46,8 @@ pub async fn handle_rename_file( return_message: "Account hash mismatch".to_string(), new_revision: 0, conflict: None, + error_code: ErrorCode::AuthAccountMismatch as i32, + current_revision: 0, })); } @@ -50,12 +59,14 @@ pub async fn handle_rename_file( success: false, return_message: "File not found: already deleted".to_string(), new_revision: 0, - conflict: Some(ConflictInfo { - r#type: ConflictType::FileNotFound as i32, - conflicting_path: req.old_file_path.clone(), - conflicting_device: String::new(), - conflicting_revision: 0, - }), + conflict: Some(crate::utils::conflict_resolution::create_conflict_info( + ConflictType::FileNotFound, + req.old_file_path.clone(), + String::new(), + 0, + )), + error_code: ErrorCode::FileNotFound as i32, + current_revision: 0, })); } Err(e) => { @@ -65,6 +76,8 @@ pub async fn handle_rename_file( return_message: format!("Failed to get file info: {}", e), new_revision: 0, conflict: None, + error_code: ErrorCode::DbError as i32, + current_revision: 0, })); } }; @@ -79,12 +92,14 @@ pub async fn handle_rename_file( success: false, return_message: "File path mismatch - file may have been renamed already".to_string(), new_revision: file_info.revision, - conflict: Some(ConflictInfo { - r#type: ConflictType::PathMismatch as i32, - conflicting_path: file_info.file_path.clone(), - conflicting_device: file_info.device_hash.clone(), - conflicting_revision: file_info.revision, - }), + conflict: Some(crate::utils::conflict_resolution::create_conflict_info( + ConflictType::PathMismatch, + file_info.file_path.clone(), + file_info.device_hash.clone(), + file_info.revision, + )), + error_code: ErrorCode::PathMismatch as i32, + current_revision: file_info.revision, })); } @@ -109,18 +124,48 @@ pub async fn handle_rename_file( success: false, return_message: "Target path already exists".to_string(), new_revision: file_info.revision, - conflict: Some(ConflictInfo { - r#type: ConflictType::PathExists as i32, - conflicting_path: req.new_file_path.clone(), - conflicting_device: existing.device_hash.clone(), - conflicting_revision: existing.revision, - }), + conflict: Some(crate::utils::conflict_resolution::create_conflict_info( + ConflictType::PathExists, + req.new_file_path.clone(), + existing.device_hash.clone(), + existing.revision, + )), + error_code: ErrorCode::PathConflict as i32, + current_revision: file_info.revision, })); } } - // 4. Perform rename operation + // 4. Determine if this is a cross-watcher MOVE + let is_cross_watcher_move = + req.group_id != file_info.group_id || req.watcher_id != file_info.watcher_id; + + if is_cross_watcher_move { + info!( + "Cross-watcher MOVE request: file_id={}, from group={}/watcher={} to group={}/watcher={}", + req.file_id, file_info.group_id, file_info.watcher_id, req.group_id, req.watcher_id + ); + } + + // 5. Perform rename/move operation with revision validation let new_revision = file_info.revision + 1; + let expected_revision = if req.expected_revision > 0 { + Some(req.expected_revision) + } else { + None + }; + + // Pass new group_id/watcher_id for cross-watcher MOVE, None for same-watcher rename + let new_group_id = if is_cross_watcher_move { + Some(req.group_id) + } else { + None + }; + let new_watcher_id = if is_cross_watcher_move { + Some(req.watcher_id) + } else { + None + }; match handler .app_state @@ -130,14 +175,28 @@ pub async fn handle_rename_file( &req.new_file_path, &req.device_hash, new_revision, + expected_revision, + new_group_id, + new_watcher_id, ) .await { - Ok(()) => { - info!( - "File renamed successfully: {} -> {} (new revision: {})", - req.old_file_path, req.new_file_path, new_revision - ); + Ok(crate::storage::RenameFileResult::Success { new_revision }) => { + if is_cross_watcher_move { + info!( + "File moved successfully: {} -> {} (group={}/watcher={}, new revision: {})", + req.old_file_path, + req.new_file_path, + req.group_id, + req.watcher_id, + new_revision + ); + } else { + info!( + "File renamed successfully: {} -> {} (new revision: {})", + req.old_file_path, req.new_file_path, new_revision + ); + } // 5. Record rename history if let Err(e) = handler @@ -178,6 +237,165 @@ pub async fn handle_rename_file( return_message: "File renamed successfully".to_string(), new_revision, conflict: None, + error_code: ErrorCode::Success as i32, + current_revision: 0, + })) + } + Ok(crate::storage::RenameFileResult::RevisionConflict { + server_revision, + client_revision, + }) => { + warn!( + "Revision conflict during rename: file_id={}, server_revision={}, client_revision={}", + req.file_id, server_revision, client_revision + ); + + // Get server file info for timestamp comparison + let server_timestamp = file_info.updated_time.clone(); + + // Parse conflict_resolution strategy from request + let strategy = if req.conflict_resolution != 0 { + ResolutionStrategy::try_from(req.conflict_resolution) + .unwrap_or(ResolutionStrategy::Manual) + } else { + ResolutionStrategy::Manual + }; + + // Determine resolution decision + let decision = resolve_revision_conflict( + req.updated_time.as_ref(), + Some(&server_timestamp), + strategy, + ); + + // Auto-retry if decision is AcceptClient + if decision == ResolutionDecision::AcceptClient { + info!( + "Auto-resolving rename conflict: accepting client version (file_id={})", + req.file_id + ); + + // Retry rename without revision check (force rename) + match handler + .app_state + .file + .rename_file( + req.file_id, + &req.new_file_path, + &req.device_hash, + new_revision, + None, // Skip revision check + new_group_id, + new_watcher_id, + ) + .await + { + Ok(crate::storage::RenameFileResult::Success { new_revision }) => { + if is_cross_watcher_move { + info!( + "File moved after auto-resolution: {} -> {} (group={}/watcher={}, new_revision={})", + req.old_file_path, + req.new_file_path, + req.group_id, + req.watcher_id, + new_revision + ); + } else { + info!( + "File renamed after auto-resolution: {} -> {}, new_revision={}", + req.old_file_path, req.new_file_path, new_revision + ); + } + + // Record rename history + if let Err(e) = handler + .record_rename_history( + req.file_id, + &req.old_file_path, + &req.new_file_path, + &req.account_hash, + &req.device_hash, + req.timestamp, + new_revision, + ) + .await + { + warn!("Failed to record rename history: {}", e); + } + + // Broadcast rename notification to other devices + if let Err(e) = handler + .broadcast_rename_notification( + &req.account_hash, + &req.device_hash, + req.file_id, + &req.old_file_path, + &req.new_file_path, + new_revision, + req.timestamp, + ) + .await + { + warn!("Failed to broadcast rename notification: {}", e); + } + + return Ok(Response::new(RenameFileResponse { + success: true, + return_message: "File renamed successfully".to_string(), + new_revision, + conflict: None, + error_code: ErrorCode::Success as i32, + current_revision: 0, + })); + } + Err(e) => { + error!("Failed to force rename after conflict resolution: {}", e); + } + _ => {} + } + } + + // Return conflict info with timestamps and suggested resolution + let suggested_resolution = suggest_resolution_strategy( + req.updated_time.is_some(), + Some(&server_timestamp).is_some(), + ); + + Ok(Response::new(RenameFileResponse { + success: false, + return_message: format!( + "File has been modified (server revision: {}, your revision: {}). Please sync before renaming.", + server_revision, client_revision + ), + new_revision: server_revision, + conflict: Some(create_conflict_info_with_timestamps( + ConflictType::RevisionConflict, + req.old_file_path.clone(), + String::new(), + server_revision, + Some(server_timestamp), + req.updated_time.clone(), + suggested_resolution, + )), + error_code: ErrorCode::RevisionConflict as i32, + current_revision: server_revision, + })) + } + Ok(crate::storage::RenameFileResult::FileNotFound) => { + warn!("File not found during rename: file_id={}", req.file_id); + + Ok(Response::new(RenameFileResponse { + success: false, + return_message: "File not found: already deleted".to_string(), + new_revision: 0, + conflict: Some(crate::utils::conflict_resolution::create_conflict_info( + ConflictType::FileNotFound, + req.old_file_path.clone(), + String::new(), + 0, + )), + error_code: ErrorCode::FileNotFound as i32, + current_revision: 0, })) } Err(e) => { @@ -215,11 +433,21 @@ pub async fn handle_rename_file( format!("Failed to rename file: {}", error_msg) }; + let error_code = if is_not_found { + ErrorCode::FileNotFound + } else if is_permanent_error { + ErrorCode::DbSchemaError + } else { + ErrorCode::StorageFailed + }; + Ok(Response::new(RenameFileResponse { success: false, return_message: client_error_msg, new_revision: file_info.revision, conflict: None, + error_code: error_code as i32, + current_revision: file_info.revision, })) } } diff --git a/src/handlers/file/upload.rs b/src/handlers/file/upload.rs index 9095cd1..1fc9442 100644 --- a/src/handlers/file/upload.rs +++ b/src/handlers/file/upload.rs @@ -1,9 +1,9 @@ use tonic::{Response, Status}; -use tracing::{debug, error, warn}; +use tracing::{debug, error, info, warn}; use super::super::file_handler::FileHandler; use crate::services::usage_service::{OperationResult, UsageOperation}; -use crate::sync::{UploadFileRequest, UploadFileResponse}; +use crate::sync::{ErrorCode, UploadFileRequest, UploadFileResponse}; use crate::utils::events::{ publish_file_uploaded_event, publish_version_created_event, FileEventData, }; @@ -23,8 +23,9 @@ pub async fn handle_upload_file( req.file_size, req.file_data.len() ); - return Ok(Response::new(response::file_upload_error( + return Ok(Response::new(response::file_upload_error_with_code( "File data size mismatch", + ErrorCode::ValidationFailed, ))); } @@ -32,8 +33,9 @@ pub async fn handle_upload_file( let verified = match handler.app_state.oauth.verify_token(&req.auth_token).await { Ok(v) if v.valid => v, _ => { - return Ok(Response::new(response::file_upload_error( + return Ok(Response::new(response::file_upload_error_with_code( "Authentication failed", + ErrorCode::AuthFailed, ))); } }; @@ -41,13 +43,21 @@ pub async fn handle_upload_file( // 2. Validate input if let Err(msg) = handler.validate_upload_input(&req) { - return Ok(Response::new(response::file_upload_error(msg))); + return Ok(Response::new(response::file_upload_error_with_code( + msg, + ErrorCode::ValidationFailed, + ))); } // 3. Normalize file path let normalized_file_path = match handler.normalize_file_path(&req.file_path) { Ok(path) => path, - Err(msg) => return Ok(Response::new(response::file_upload_error(msg))), + Err(msg) => { + return Ok(Response::new(response::file_upload_error_with_code( + msg, + ErrorCode::ValidationFailed, + ))) + } }; // 4. Validate file path with watcher (single attempt) @@ -74,12 +84,15 @@ pub async fn handle_upload_file( .as_ref() .unwrap_or(&"File size exceeds tier limit".to_string()) ); - return Ok(Response::new(response::file_upload_error(format!( - "File size exceeds limit: {}", - check_result - .reason - .unwrap_or_else(|| "File too large for your plan".to_string()) - )))); + return Ok(Response::new(response::file_upload_error_with_code( + format!( + "File size exceeds limit: {}", + check_result + .reason + .unwrap_or_else(|| "File too large for your plan".to_string()) + ), + ErrorCode::FileSizeLimit, + ))); } } Err(e) => { @@ -91,7 +104,12 @@ pub async fn handle_upload_file( // 6. Generate file ID let file_id = match handler.generate_file_id(&req) { Ok(id) => id, - Err(msg) => return Ok(Response::new(response::file_upload_error(msg))), + Err(msg) => { + return Ok(Response::new(response::file_upload_error_with_code( + msg, + ErrorCode::ValidationFailed, + ))) + } }; // 6.1. Check usage quota before upload @@ -120,12 +138,15 @@ pub async fn handle_upload_file( .as_ref() .unwrap_or(&"Unknown reason".to_string()) ); - return Ok(Response::new(response::file_upload_error(format!( - "Storage quota exceeded: {}", - check_result - .reason - .unwrap_or_else(|| "Storage limit reached".to_string()) - )))); + return Ok(Response::new(response::file_upload_error_with_code( + format!( + "Storage quota exceeded: {}", + check_result + .reason + .unwrap_or_else(|| "Storage limit reached".to_string()) + ), + ErrorCode::QuotaExceeded, + ))); } // Log warnings if any @@ -166,10 +187,10 @@ pub async fn handle_upload_file( Ok(ids) => ids, Err(e) => { error!("Failed to ensure server IDs: {}", e); - return Ok(Response::new(response::file_upload_error(format!( - "Failed to ensure server IDs: {}", - e - )))); + return Ok(Response::new(response::file_upload_error_with_code( + format!("Failed to ensure server IDs: {}", e), + ErrorCode::DbError, + ))); } }; @@ -184,16 +205,23 @@ pub async fn handle_upload_file( server_watcher_id, ); - // 8. Store file via FileService + // 8. Store file via FileService with revision validation + let expected_revision = if req.expected_revision > 0 { + Some(req.expected_revision) + } else { + None + }; + let store_result = handler .app_state .file - .store_file(&file_info, &req.file_data) + .store_file(&file_info, &req.file_data, expected_revision) .await; // 8.1. Record usage after operation let operation_result = match &store_result { - Ok(_) => OperationResult::Success, + Ok(crate::storage::StoreFileResult::Success { .. }) => OperationResult::Success, + Ok(crate::storage::StoreFileResult::RevisionConflict { .. }) => OperationResult::Failed, Err(_) => OperationResult::Failed, }; @@ -217,7 +245,10 @@ pub async fn handle_upload_file( } match store_result { - Ok(_) => { + Ok(crate::storage::StoreFileResult::Success { + file_id: _, + new_revision, + }) => { // Publish cross-instance file upload and version created events let event_data = FileEventData { account_hash: server_account_hash.clone(), @@ -228,7 +259,7 @@ pub async fn handle_upload_file( filename: req.filename.clone(), file_id, file_size: req.file_size, - revision: req.revision + 1, + revision: new_revision, }; publish_file_uploaded_event(&handler.app_state.event_bus, &event_data).await; @@ -236,15 +267,170 @@ pub async fn handle_upload_file( Ok(Response::new(response::file_upload_success( file_id, - req.revision + 1, + new_revision, ))) } + Ok(crate::storage::StoreFileResult::RevisionConflict { + server_revision, + client_revision, + }) => { + warn!( + "Revision conflict during upload: file_id={}, server_revision={}, client_revision={}", + file_id, server_revision, client_revision + ); + + // Get server's file info for timestamp comparison + let server_file_info = handler + .app_state + .file + .get_file_info(file_id) + .await + .ok() + .flatten(); + let server_timestamp = server_file_info + .as_ref() + .map(|info| info.updated_time.clone()); + + // Determine conflict resolution strategy + use crate::sync::conflict_info::ResolutionStrategy; + use crate::utils::conflict_resolution::{ + resolve_revision_conflict, suggest_resolution_strategy, ResolutionDecision, + }; + + let strategy = if req.conflict_resolution != 0 { + ResolutionStrategy::try_from(req.conflict_resolution) + .unwrap_or(ResolutionStrategy::Manual) + } else { + ResolutionStrategy::Manual + }; + + // Check for STALE_OPERATION if LAST_WRITE_WINS strategy and client_timestamp provided + if strategy == ResolutionStrategy::LastWriteWins && req.client_timestamp > 0 { + let server_ts = server_timestamp.as_ref().map(|ts| ts.seconds).unwrap_or(0); + + if req.client_timestamp < server_ts { + // Client operation is stale + warn!( + "STALE_OPERATION detected: file_id={}, client_timestamp={} < server_timestamp={}", + file_id, req.client_timestamp, server_ts + ); + + let suggested_resolution = suggest_resolution_strategy( + req.updated_time.is_some(), + server_timestamp.is_some(), + ); + + use crate::sync::conflict_info::ConflictType; + use crate::utils::conflict_resolution::create_conflict_info_with_timestamps; + + return Ok(Response::new(crate::sync::UploadFileResponse { + success: false, + file_id: 0, + new_revision: 0, + return_message: format!( + "Operation is stale: client timestamp {} < server timestamp {}. Please sync before uploading.", + req.client_timestamp, server_ts + ), + conflict: Some(create_conflict_info_with_timestamps( + ConflictType::StaleOperation, + req.file_path.clone(), + String::new(), + server_revision, + server_timestamp.clone(), + req.updated_time.clone(), + suggested_resolution, + )), + error_code: ErrorCode::StaleOperation as i32, + current_revision: server_revision, + })); + } + } + + let decision = resolve_revision_conflict( + req.updated_time.as_ref(), + server_timestamp.as_ref(), + strategy, + ); + + // If auto-resolution decided to accept client, retry the upload with force flag + if decision == ResolutionDecision::AcceptClient { + info!( + "Auto-resolving conflict: accepting client version (file_id={})", + file_id + ); + + // Retry upload without revision check (force overwrite) + match handler + .app_state + .file + .store_file(&file_info, &req.file_data, None) // None = skip revision check + .await + { + Ok(crate::storage::StoreFileResult::Success { + file_id: _, + new_revision, + }) => { + // Publish events + let event_data = FileEventData { + account_hash: server_account_hash.clone(), + device_hash: req.device_hash.clone(), + group_id: server_group_id, + watcher_id: server_watcher_id, + file_path: normalized_file_path.clone(), + filename: req.filename.clone(), + file_id, + file_size: req.file_size, + revision: new_revision, + }; + + publish_file_uploaded_event(&handler.app_state.event_bus, &event_data) + .await; + publish_version_created_event(&handler.app_state.event_bus, &event_data) + .await; + + return Ok(Response::new(response::file_upload_success( + file_id, + new_revision, + ))); + } + Err(e) => { + error!("Failed to force upload after conflict resolution: {}", e); + } + _ => {} + } + } + + // Prepare conflict info with timestamps and suggested resolution + let suggested_resolution = + suggest_resolution_strategy(req.updated_time.is_some(), server_timestamp.is_some()); + + Ok(Response::new(crate::sync::UploadFileResponse { + success: false, + file_id, + new_revision: server_revision, + return_message: format!( + "File has been modified (server revision: {}, your revision: {}). Please sync before uploading.", + server_revision, client_revision + ), + conflict: Some(crate::sync::ConflictInfo { + r#type: crate::sync::conflict_info::ConflictType::RevisionConflict as i32, + conflicting_path: req.file_path.clone(), + conflicting_device: String::new(), + conflicting_revision: server_revision, + server_timestamp, + client_timestamp: req.updated_time.clone(), + suggested_resolution: suggested_resolution as i32, + }), + error_code: ErrorCode::RevisionConflict as i32, + current_revision: server_revision, + })) + } Err(e) => { error!("File storage failed: {}", e); - Ok(Response::new(response::file_upload_error(format!( - "File storage failed: {}", - e - )))) + Ok(Response::new(response::file_upload_error_with_code( + format!("File storage failed: {}", e), + ErrorCode::StorageFailed, + ))) } } } diff --git a/src/handlers/file_handler.rs b/src/handlers/file_handler.rs index 0ea11af..e8fc804 100644 --- a/src/handlers/file_handler.rs +++ b/src/handlers/file_handler.rs @@ -383,6 +383,42 @@ impl FileHandler { super::file::rename::handle_rename_file(self, req).await } + /// Handle batch operations request + pub async fn handle_batch_operations( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + super::file::batch::handle_batch_operations(self, req).await + } + + /// Handle upload file request (internal, without extracting request) + pub async fn handle_upload_file_internal( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + super::file::upload::handle_upload_file(self, req).await + } + + /// Handle delete file request (internal, without extracting request) + pub async fn handle_delete_file_internal( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + super::file::delete::handle_delete_file(self, req).await + } + + /// Handle rename file request (internal, without extracting request) + pub async fn handle_rename_file_internal( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + super::file::rename::handle_rename_file(self, req).await + } + /// Record rename history for debugging and conflict detection pub(crate) async fn record_rename_history( &self, diff --git a/src/server/service.rs b/src/server/service.rs index 378518b..49bab97 100644 --- a/src/server/service.rs +++ b/src/server/service.rs @@ -6,10 +6,11 @@ use crate::sync::sync_client_service_server::SyncClientService; pub use crate::sync::sync_service_server::SyncService; use crate::sync::{ AuthNotificationResponse, AuthSuccessNotification, AuthUpdateNotification, - BroadcastFileRestoreRequest, BroadcastFileRestoreResponse, CheckAuthStatusRequest, - CheckAuthStatusResponse, CheckFileExistsRequest, CheckFileExistsResponse, - CheckQuotaStatusRequest, CheckQuotaStatusResponse, DeleteDeviceRequest, DeleteDeviceResponse, - DeleteFileRequest, DeleteFileResponse, DeleteWatcherGroupRequest, DeleteWatcherGroupResponse, + BatchOperationsRequest, BatchOperationsResponse, BroadcastFileRestoreRequest, + BroadcastFileRestoreResponse, CheckAuthStatusRequest, CheckAuthStatusResponse, + CheckFileExistsRequest, CheckFileExistsResponse, CheckQuotaStatusRequest, + CheckQuotaStatusResponse, DeleteDeviceRequest, DeleteDeviceResponse, DeleteFileRequest, + DeleteFileResponse, DeleteWatcherGroupRequest, DeleteWatcherGroupResponse, DeviceUpdateNotification, DownloadFileChunk, DownloadFileRequest, DownloadFileResponse, EncryptionKeyUpdateNotification, FileUpdateNotification, FindFileRequest, FindFileResponse, GetAccountInfoRequest, GetAccountInfoResponse, GetFileHistoryRequest, GetFileHistoryResponse, @@ -495,6 +496,9 @@ impl SyncService for SyncServiceImpl { file_size: meta.file_size, key_id: meta.key_id, unix_permissions: meta.unix_permissions, + expected_revision: meta.expected_revision, + conflict_resolution: 0, // Default to MANUAL + client_timestamp: meta.client_timestamp.unwrap_or(0), }; self.file_handler @@ -812,6 +816,14 @@ impl SyncService for SyncServiceImpl { self.file_handler.handle_rename_file(request).await } + async fn batch_operations( + &self, + request: Request, + ) -> Result, Status> { + debug!("Batch operations request received"); + self.file_handler.handle_batch_operations(request).await + } + async fn restore_file( &self, request: Request, diff --git a/src/services/file_service.rs b/src/services/file_service.rs index e5a2736..f8856b6 100644 --- a/src/services/file_service.rs +++ b/src/services/file_service.rs @@ -1,7 +1,9 @@ use crate::models::device::Device; use crate::models::file::FileInfo as ModelFileInfo; use crate::models::file::FileInfo as FileInfoData; -use crate::storage::{FileStorage, Result as StorageResult, Storage, StorageError}; +use crate::storage::{ + DeleteFileResult, FileStorage, Result as StorageResult, Storage, StorageError, +}; use crate::sync; use crate::sync::FileUpdateNotification; use std::sync::Arc; @@ -118,11 +120,13 @@ impl FileService { &self, file_info: &ModelFileInfo, data: &[u8], - ) -> Result<(), StorageError> { + expected_revision: Option, + ) -> Result { self.store_file_with_update_type( file_info, data, sync::file_update_notification::UpdateType::Uploaded, + expected_revision, ) .await } @@ -246,22 +250,32 @@ impl FileService { file_info: &ModelFileInfo, data: &[u8], update_type: sync::file_update_notification::UpdateType, - ) -> Result<(), StorageError> { + expected_revision: Option, + ) -> Result { debug!( - "🔄 FileService::store_file_with_update_type started: file_id={}, filename={}, size={} bytes, update_type={:?}", + "🔄 FileService::store_file_with_update_type started: file_id={}, filename={}, size={} bytes, update_type={:?}, expected_revision={:?}", file_info.file_id, file_info.filename, data.len(), - update_type + update_type, + expected_revision ); // Store file metadata debug!("📄 Storing file metadata..."); - match self.storage.store_file_info(file_info.clone()).await { - Ok(_) => debug!("✅ File metadata stored successfully"), - Err(e) => { - error!("❌ Failed to store file metadata: {}", e); - return Err(e); + let store_result = self + .storage + .store_file_info(file_info.clone(), expected_revision) + .await?; + + // Check for revision conflict + match &store_result { + crate::storage::StoreFileResult::RevisionConflict { .. } => { + // Return conflict immediately without storing data + return Ok(store_result); + } + crate::storage::StoreFileResult::Success { .. } => { + debug!("✅ File metadata stored successfully"); } } @@ -286,7 +300,7 @@ impl FileService { "🎉 FileService::store_file_with_update_type completed successfully for file_id={}", file_info.file_id ); - Ok(()) + Ok(store_result) } /// Internal method to store file data @@ -581,9 +595,16 @@ impl FileService { } /// Handle file deletion - /// Returns (deletion_record_id, new_revision) - pub async fn delete_file(&self, file_id: u64) -> Result<(u64, i64), StorageError> { - info!("Start file deletion processing: file_id={}", file_id); + /// Returns DeleteFileResult with revision validation support + pub async fn delete_file( + &self, + file_id: u64, + client_revision: Option, + ) -> Result { + info!( + "Start file deletion processing: file_id={}, client_revision={:?}", + file_id, client_revision + ); // Query file information (include deletion status) let file_info_result = self.storage.get_file_info_include_deleted(file_id).await?; @@ -625,10 +646,17 @@ impl FileService { // Delete the actual active file instead match self .storage - .delete_file(&active_file.account_hash, active_file.file_id) + .delete_file( + &active_file.account_hash, + active_file.file_id, + client_revision, + ) .await { - Ok((deletion_record_id, new_revision)) => { + Ok(DeleteFileResult::Success { + deletion_record_id, + new_revision, + }) => { self.files.lock().await.remove(&active_file.file_id); info!( "✅ Successfully deleted actual active file: file_id={}, path={}, deletion_record_id={}", @@ -648,7 +676,18 @@ impl FileService { let _ = nm.broadcast_file_update(notification).await; } - return Ok((deletion_record_id, new_revision)); + return Ok(DeleteFileResult::Success { + deletion_record_id, + new_revision, + }); + } + Ok(result @ DeleteFileResult::RevisionConflict { .. }) => { + // Pass through revision conflict + return Ok(result); + } + Ok(DeleteFileResult::AlreadyDeleted) => { + info!("File already deleted: file_id={}", active_file.file_id); + return Ok(DeleteFileResult::AlreadyDeleted); } Err(e) => { error!( @@ -661,14 +700,14 @@ impl FileService { } Ok(None) => { info!("No active file found with same path, file is truly deleted: file_id={}", file_id); - return Ok((0, 0)); // Already deleted, no deletion_record_id or revision + return Ok(DeleteFileResult::AlreadyDeleted); } Err(e) => { warn!( "Error searching for active file: {}, treating as already deleted", e ); - return Ok((0, 0)); // Already deleted, no deletion_record_id or revision + return Ok(DeleteFileResult::AlreadyDeleted); } } } @@ -681,10 +720,13 @@ impl FileService { match self .storage - .delete_file(&file_info.account_hash, file_id) + .delete_file(&file_info.account_hash, file_id, client_revision) .await { - Ok((deletion_record_id, new_revision)) => { + Ok(DeleteFileResult::Success { + deletion_record_id, + new_revision, + }) => { // Delete from memory cache self.files.lock().await.remove(&file_id); @@ -718,7 +760,21 @@ impl FileService { file_info.filename, deletion_record_id ); - Ok((deletion_record_id, new_revision)) + Ok(DeleteFileResult::Success { + deletion_record_id, + new_revision, + }) + } + Ok(result @ DeleteFileResult::RevisionConflict { .. }) => { + warn!( + "Revision conflict during file deletion: file_id={}, result={:?}", + file_id, result + ); + Ok(result) + } + Ok(DeleteFileResult::AlreadyDeleted) => { + info!("File already deleted: file_id={}", file_id); + Ok(DeleteFileResult::AlreadyDeleted) } Err(e) => { error!("File deletion failed: file_id={}, error={}", file_id, e); @@ -764,7 +820,9 @@ impl FileService { "Storing file info: file_id={}, filename={}", file_info.file_id, file_info.filename ); - self.storage.store_file_info(file_info.clone()).await?; + self.storage + .store_file_info(file_info.clone(), None) + .await?; Ok(()) } @@ -1013,14 +1071,32 @@ impl FileService { new_file_path: &str, device_hash: &str, new_revision: i64, - ) -> Result<(), StorageError> { - info!( - "Renaming file: file_id={}, new_path={}, new_revision={}", - file_id, new_file_path, new_revision - ); + expected_revision: Option, + new_group_id: Option, + new_watcher_id: Option, + ) -> Result { + if new_group_id.is_some() || new_watcher_id.is_some() { + info!( + "Renaming/moving file: file_id={}, new_path={}, new_group_id={:?}, new_watcher_id={:?}, new_revision={}, expected_revision={:?}", + file_id, new_file_path, new_group_id, new_watcher_id, new_revision, expected_revision + ); + } else { + info!( + "Renaming file: file_id={}, new_path={}, new_revision={}, expected_revision={:?}", + file_id, new_file_path, new_revision, expected_revision + ); + } self.storage - .rename_file(file_id, new_file_path, device_hash, new_revision) + .rename_file( + file_id, + new_file_path, + device_hash, + new_revision, + expected_revision, + new_group_id, + new_watcher_id, + ) .await } diff --git a/src/services/version_service.rs b/src/services/version_service.rs index 62133a7..e51de41 100644 --- a/src/services/version_service.rs +++ b/src/services/version_service.rs @@ -166,7 +166,10 @@ impl VersionService for VersionServiceImpl { &self, request: GetFileHistoryRequest, ) -> Result { - debug!("Getting file history for path: {}", request.file_path); + debug!( + "Getting file history for path: {} (operation_type: {:?}, start_time: {:?}, end_time: {:?}, max_versions: {:?}, offset: {:?})", + request.file_path, request.operation_type, request.start_time, request.end_time, request.max_versions, request.offset + ); // Validate required fields if request.account_hash.is_empty() || request.file_path.is_empty() { @@ -179,19 +182,43 @@ impl VersionService for VersionServiceImpl { }); } - // Get file history from storage - let mut files = self + // Convert timestamp filters to Unix timestamps + let start_time = request.start_time.as_ref().map(|ts| ts.seconds); + let end_time = request.end_time.as_ref().map(|ts| ts.seconds); + + // Get operation_type filter + let operation_type = if let Some(ref op) = request.operation_type { + if !op.is_empty() { + Some(op.as_str()) + } else { + None + } + } else { + None + }; + + // Get pagination parameters + let limit = request.max_versions.or(Some(50)); // Default to 50 + let offset = request.offset.unwrap_or(0); + + // Get file history from storage with filters + let files = self .storage - .get_file_history(&request.account_hash, &request.file_path, request.group_id) + .get_file_history_with_options( + &request.account_hash, + &request.file_path, + request.group_id, + start_time, + end_time, + operation_type, + limit, + offset, + ) .await .map_err(|e| AppError::storage(format!("Failed to get file history: {}", e)))?; - let total_before_filter = files.len() as i32; - - // Apply filters - self.apply_history_filters(&mut files, &request); - - let has_more = files.len() < total_before_filter as usize; + let total_versions = files.len() as i32; + let has_more = limit.is_some() && files.len() == limit.unwrap() as usize; // Convert to FileVersionInfo let versions: Vec = files @@ -200,16 +227,18 @@ impl VersionService for VersionServiceImpl { .collect(); info!( - "Retrieved {} file versions for path: {}", + "Retrieved {} file versions for path: {} (total in result: {}, has_more: {})", versions.len(), - request.file_path + request.file_path, + total_versions, + has_more ); Ok(GetFileHistoryResponse { success: true, return_message: format!("Found {} versions", versions.len()), versions, - total_versions: total_before_filter, + total_versions, has_more, }) } diff --git a/src/storage/memory.rs b/src/storage/memory.rs index acba188..07303a4 100644 --- a/src/storage/memory.rs +++ b/src/storage/memory.rs @@ -9,7 +9,10 @@ use crate::models::device::Device; use crate::models::file::FileInfo; use crate::models::watcher::WatcherGroup; -use crate::storage::{Result, Storage, StorageError, StorageMetrics}; +use crate::storage::{ + DeleteFileResult, RenameFileResult, Result, Storage, StorageError, StorageMetrics, + StoreFileResult, +}; use crate::sync::WatcherGroupData; // In-memory storage data structure (using Mutex for thread safety) @@ -249,8 +252,12 @@ impl Storage for MemoryStorage { Ok(valid) } - /// Store file info - async fn store_file_info(&self, file: FileInfo) -> crate::storage::Result { + /// Store file info with revision validation + async fn store_file_info( + &self, + file: FileInfo, + expected_revision: Option, + ) -> crate::storage::Result { let mut data = self.data.lock().await; let file_id = if file.file_id == 0 { @@ -262,14 +269,40 @@ impl Storage for MemoryStorage { file.file_id }; + // Check for revision conflict if updating existing file + if file.file_id != 0 { + if let Some(existing_file) = data.files.get(&file.file_id) { + // Validate revision if client provided one + if let Some(expected_rev) = expected_revision { + if expected_rev > 0 && existing_file.revision != expected_rev { + return Ok(StoreFileResult::RevisionConflict { + server_revision: existing_file.revision, + client_revision: expected_rev, + }); + } + } + } + } + // Assign ID to file info let mut file_info = file; file_info.file_id = file_id; + // Increment revision if updating + let new_revision = if data.files.contains_key(&file_id) { + file_info.revision + 1 + } else { + 1 + }; + file_info.revision = new_revision; + // Store file info data.files.insert(file_id, file_info); - Ok(file_id) + Ok(StoreFileResult::Success { + file_id, + new_revision, + }) } /// Get file info by id @@ -375,12 +408,13 @@ impl Storage for MemoryStorage { } /// Delete a file including its metadata and content - /// Returns (dummy_deletion_record_id, dummy_revision) for MemoryStorage (testing only) + /// Returns DeleteFileResult with revision validation support async fn delete_file( &self, account_hash: &str, file_id: u64, - ) -> crate::storage::Result<(u64, i64)> { + client_revision: Option, + ) -> crate::storage::Result { let mut data = self.data.lock().await; if let Some(file) = data.files.get(&file_id) { @@ -388,6 +422,16 @@ impl Storage for MemoryStorage { if file.account_hash == account_hash { let current_revision = file.revision; + // Validate revision if client provided one + if let Some(client_rev) = client_revision { + if client_rev > 0 && current_revision != client_rev { + return Ok(DeleteFileResult::RevisionConflict { + server_revision: current_revision, + client_revision: client_rev, + }); + } + } + // Delete file data data.file_data.remove(&file_id); @@ -397,7 +441,10 @@ impl Storage for MemoryStorage { // Return dummy deletion_record_id and new_revision for testing let dummy_deletion_record_id = rand::random::(); let new_revision = current_revision + 1; - Ok((dummy_deletion_record_id, new_revision)) + Ok(DeleteFileResult::Success { + deletion_record_id: dummy_deletion_record_id, + new_revision, + }) } else { Err(StorageError::PermissionDenied(format!( "File not owned by user: {}", @@ -1009,19 +1056,40 @@ impl Storage for MemoryStorage { new_file_path: &str, device_hash: &str, new_revision: i64, - ) -> crate::storage::Result<()> { + expected_revision: Option, + new_group_id: Option, + new_watcher_id: Option, + ) -> crate::storage::Result { let mut data = self.data.lock().await; if let Some(file) = data.files.get_mut(&file_id) { + let current_revision = file.revision; + + // Validate revision if expected_revision is provided + if let Some(expected_rev) = expected_revision { + if expected_rev > 0 && current_revision != expected_rev { + return Ok(RenameFileResult::RevisionConflict { + server_revision: current_revision, + client_revision: expected_rev, + }); + } + } + file.file_path = new_file_path.to_string(); file.device_hash = device_hash.to_string(); file.revision = new_revision; - Ok(()) + + // Update group_id and watcher_id if provided (cross-watcher MOVE) + if let Some(group_id) = new_group_id { + file.group_id = group_id; + } + if let Some(watcher_id) = new_watcher_id { + file.watcher_id = watcher_id; + } + + Ok(RenameFileResult::Success { new_revision }) } else { - Err(crate::storage::StorageError::NotFound(format!( - "File not found: {}", - file_id - ))) + Ok(RenameFileResult::FileNotFound) } } @@ -1279,12 +1347,37 @@ impl Storage for MemoryStorage { file_path: &str, group_id: i32, ) -> crate::storage::Result> { - // For MemoryStorage, we return an empty list as this is primarily for testing - // In a real implementation, this would query a files history table + self.get_file_history_with_options( + account_hash, + file_path, + group_id, + None, // start_time + None, // end_time + None, // operation_type + Some(50), // max_versions (default limit) + 0, // offset + ) + .await + } + + async fn get_file_history_with_options( + &self, + _account_hash: &str, + file_path: &str, + group_id: i32, + start_time: Option, + end_time: Option, + operation_type: Option<&str>, + limit: Option, + offset: i32, + ) -> crate::storage::Result> { debug!( - "MemoryStorage: Getting file history for path: {} in group: {} (returning empty list)", - file_path, group_id + "MemoryStorage: Getting file history for path: {} in group: {} (limit: {:?}, offset: {}, start_time: {:?}, end_time: {:?}, operation_type: {:?}) - returning empty (no version history in memory storage)", + file_path, group_id, limit, offset, start_time, end_time, operation_type ); + + // MemoryStorage doesn't track version history (it's primarily for testing) + // Return empty list - in a real implementation, this would query file_versions table Ok(Vec::new()) } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index b3a29b7..5c2189e 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -175,6 +175,49 @@ impl From for SyncError { } } +/// Result type for file deletion operations with revision conflict support +#[derive(Debug, Clone)] +pub enum DeleteFileResult { + /// Deletion successful + Success { + deletion_record_id: u64, + new_revision: i64, + }, + /// Revision conflict - client has outdated revision + RevisionConflict { + server_revision: i64, + client_revision: i64, + }, + /// File already deleted + AlreadyDeleted, +} + +/// Result type for file upload/store operations with revision conflict support +#[derive(Debug, Clone)] +pub enum StoreFileResult { + /// Upload successful + Success { file_id: u64, new_revision: i64 }, + /// Revision conflict - client has outdated revision + RevisionConflict { + server_revision: i64, + client_revision: i64, + }, +} + +/// Result type for file rename operations with revision conflict support +#[derive(Debug, Clone)] +pub enum RenameFileResult { + /// Rename successful + Success { new_revision: i64 }, + /// Revision conflict - client has outdated revision + RevisionConflict { + server_revision: i64, + client_revision: i64, + }, + /// File not found + FileNotFound, +} + /// Performance metrics for storage operations #[derive(Debug, Clone, Default)] pub struct StorageMetrics { @@ -280,7 +323,11 @@ pub trait Storage: Sync + Send { async fn validate_device(&self, account_hash: &str, device_hash: &str) -> Result; // File related methods with streaming support - async fn store_file_info(&self, file: FileInfo) -> Result; + async fn store_file_info( + &self, + file: FileInfo, + expected_revision: Option, + ) -> Result; async fn get_file_info(&self, file_id: u64) -> Result>; async fn get_file_info_include_deleted(&self, file_id: u64) -> Result>; @@ -308,7 +355,12 @@ pub trait Storage: Sync + Send { exclude_device_hash: &str, upload_time_from: Option, ) -> Result>; - async fn delete_file(&self, account_hash: &str, file_id: u64) -> Result<(u64, i64)>; + async fn delete_file( + &self, + account_hash: &str, + file_id: u64, + client_revision: Option, + ) -> Result; /// Restore deleted file from deletion record async fn restore_file(&self, account_hash: &str, delete_record_id: u64) -> Result; @@ -331,13 +383,17 @@ pub trait Storage: Sync + Send { async fn check_file_exists(&self, file_id: u64) -> Result<(bool, bool)>; /// Rename file (update path, device_hash, and revision) + /// If new_group_id/new_watcher_id are provided, also updates those (cross-watcher MOVE) async fn rename_file( &self, file_id: u64, new_file_path: &str, device_hash: &str, new_revision: i64, - ) -> Result<()>; + expected_revision: Option, + new_group_id: Option, + new_watcher_id: Option, + ) -> Result; /// Record rename history for debugging and conflict detection async fn record_rename_history( @@ -517,6 +573,29 @@ pub trait Storage: Sync + Send { group_id: i32, ) -> Result>; + /// Get file history with advanced filtering and pagination + /// + /// # Parameters + /// - `account_hash`: User account hash + /// - `file_path`: File path to get history for + /// - `group_id`: Group ID + /// - `start_time`: Optional Unix timestamp for filtering history from this time + /// - `end_time`: Optional Unix timestamp for filtering history until this time + /// - `operation_type`: Optional filter by operation type (UPLOAD, DELETE, RENAME, MOVE) + /// - `limit`: Optional maximum number of versions to return + /// - `offset`: Number of records to skip (for pagination) + async fn get_file_history_with_options( + &self, + account_hash: &str, + file_path: &str, + group_id: i32, + start_time: Option, + end_time: Option, + operation_type: Option<&str>, + limit: Option, + offset: i32, + ) -> Result>; + /// Get file versions by file ID async fn get_file_versions_by_id( &self, diff --git a/src/storage/mysql.rs b/src/storage/mysql.rs index 3ca9d03..ace5b2a 100644 --- a/src/storage/mysql.rs +++ b/src/storage/mysql.rs @@ -10,7 +10,10 @@ use crate::models::device::Device; use crate::models::file::FileInfo; use crate::models::file::FileNotice; use crate::models::watcher::WatcherCondition; -use crate::storage::{Result, Storage, StorageError, StorageMetrics}; +use crate::storage::{ + DeleteFileResult, RenameFileResult, Result, Storage, StorageError, StorageMetrics, + StoreFileResult, +}; // Using MySQL modules use crate::storage::mysql_account::*; @@ -1379,9 +1382,13 @@ impl Storage for MySqlStorage { MySqlDeviceExt::validate_device(self, account_hash, device_hash).await } - /// Store file information - async fn store_file_info(&self, file_info: crate::models::file::FileInfo) -> Result { - MySqlFileExt::store_file_info(self, file_info).await + /// Store file information with revision validation + async fn store_file_info( + &self, + file_info: crate::models::file::FileInfo, + expected_revision: Option, + ) -> Result { + MySqlFileExt::store_file_info(self, file_info, expected_revision).await } /// Get file information @@ -1446,9 +1453,14 @@ impl Storage for MySqlStorage { .await } - /// Delete file - async fn delete_file(&self, account_hash: &str, file_id: u64) -> Result<(u64, i64)> { - MySqlFileExt::delete_file(self, account_hash, file_id).await + /// Delete file with revision validation + async fn delete_file( + &self, + account_hash: &str, + file_id: u64, + client_revision: Option, + ) -> Result { + MySqlFileExt::delete_file(self, account_hash, file_id, client_revision).await } async fn restore_file(&self, account_hash: &str, delete_record_id: u64) -> Result { @@ -1660,12 +1672,15 @@ impl Storage for MySqlStorage { new_file_path: &str, device_hash: &str, new_revision: i64, - ) -> Result<()> { + expected_revision: Option, + new_group_id: Option, + new_watcher_id: Option, + ) -> Result { use sqlx::Row; - // First, get file information to obtain account_hash, group_id, watcher_id for encryption + // First, get file information to obtain account_hash, group_id, watcher_id for encryption and current revision let row_opt = sqlx::query( - r#"SELECT account_hash, group_id, watcher_id FROM files WHERE file_id = ? LIMIT 1"#, + r#"SELECT account_hash, group_id, watcher_id, revision FROM files WHERE file_id = ? LIMIT 1"#, ) .bind(file_id) .fetch_optional(&self.sqlx_pool) @@ -1677,27 +1692,53 @@ impl Storage for MySqlStorage { )) })?; - let (account_hash, group_id, watcher_id) = match row_opt { + let (account_hash, current_group_id, current_watcher_id, current_revision) = match row_opt { Some(row) => { let account_hash: String = row.try_get("account_hash").unwrap_or_default(); let group_id: i32 = row.try_get("group_id").unwrap_or(0); let watcher_id: i32 = row.try_get("watcher_id").unwrap_or(0); - (account_hash, group_id, watcher_id) + let current_revision: i64 = row.try_get("revision").unwrap_or(0); + (account_hash, group_id, watcher_id, current_revision) } None => { - return Err(StorageError::NotFound(format!( - "File not found: {}", - file_id - ))); + return Ok(RenameFileResult::FileNotFound); } }; - // Encrypt new_file_path for VARBINARY storage + // Determine final group_id and watcher_id (use new values if provided, otherwise keep current) + let final_group_id = new_group_id.unwrap_or(current_group_id); + let final_watcher_id = new_watcher_id.unwrap_or(current_watcher_id); + + // Check if this is a cross-watcher MOVE + let is_cross_watcher_move = + final_group_id != current_group_id || final_watcher_id != current_watcher_id; + if is_cross_watcher_move { + info!( + "Cross-watcher MOVE detected: file_id={}, from group={}/watcher={} to group={}/watcher={}", + file_id, current_group_id, current_watcher_id, final_group_id, final_watcher_id + ); + } + + // Validate revision if expected_revision is provided + if let Some(expected_rev) = expected_revision { + if expected_rev > 0 && current_revision != expected_rev { + warn!( + "Revision conflict on rename: file_id={}, current_revision={}, expected_revision={}", + file_id, current_revision, expected_rev + ); + return Ok(RenameFileResult::RevisionConflict { + server_revision: current_revision, + client_revision: expected_rev, + }); + } + } + + // Encrypt new_file_path for VARBINARY storage (use final group_id/watcher_id for AAD) let cfg = crate::server::app_state::AppState::get_config(); let new_file_path_bytes = if let Some(kv) = cfg.server_encode_key.as_ref() { if kv.len() == 32 { let key: &[u8; 32] = kv.as_slice().try_into().expect("len checked"); - let aad = format!("{}:{}:{}", account_hash, group_id, watcher_id); + let aad = format!("{}:{}:{}", account_hash, final_group_id, final_watcher_id); crate::utils::crypto::aead_encrypt(key, new_file_path.as_bytes(), aad.as_bytes()) } else { new_file_path.as_bytes().to_vec() @@ -1706,18 +1747,26 @@ impl Storage for MySqlStorage { new_file_path.as_bytes().to_vec() }; - // Update file_path with encrypted bytes (VARBINARY) and set operation_type to RENAME - let query = "UPDATE files SET file_path = ?, device_hash = ?, revision = ?, operation_type = 'RENAME', updated_time = NOW() WHERE file_id = ?"; + // Update file_path, group_id, watcher_id with encrypted bytes (VARBINARY) and set operation_type + let operation_type = if is_cross_watcher_move { + "MOVE" + } else { + "RENAME" + }; + let query = "UPDATE files SET file_path = ?, device_hash = ?, revision = ?, group_id = ?, watcher_id = ?, operation_type = ?, updated_time = NOW() WHERE file_id = ?"; sqlx::query(query) .bind(&new_file_path_bytes) .bind(device_hash) .bind(new_revision) + .bind(final_group_id) + .bind(final_watcher_id) + .bind(operation_type) .bind(file_id) .execute(&self.sqlx_pool) .await .map_err(|e| { StorageError::Database(format!( - "Failed to rename file (schema/VARBINARY type mismatch): {}", + "Failed to rename/move file (schema/VARBINARY type mismatch): {}", e )) })?; @@ -1726,7 +1775,7 @@ impl Storage for MySqlStorage { "File renamed in database: file_id={}, new_path={}, new_revision={}", file_id, new_file_path, new_revision ); - Ok(()) + Ok(RenameFileResult::Success { new_revision }) } /// Record rename history for debugging and conflict detection @@ -2029,14 +2078,41 @@ impl Storage for MySqlStorage { account_hash: &str, file_path: &str, group_id: i32, + ) -> Result> { + self.get_file_history_with_options( + account_hash, + file_path, + group_id, + None, // start_time + None, // end_time + None, // operation_type + Some(50), // max_versions (default limit) + 0, // offset + ) + .await + } + + /// Get file history with advanced filtering and pagination + async fn get_file_history_with_options( + &self, + account_hash: &str, + file_path: &str, + group_id: i32, + start_time: Option, + end_time: Option, + operation_type: Option<&str>, + limit: Option, + offset: i32, ) -> Result> { use sqlx::Row; + debug!( - "Getting file history for path: {} in group: {}", - file_path, group_id + "Getting file history for path: {} in group: {} (limit: {:?}, offset: {}, start_time: {:?}, end_time: {:?}, operation_type: {:?})", + file_path, group_id, limit, offset, start_time, end_time, operation_type ); - let rows = sqlx::query( + // Build dynamic query with filters + let mut query = String::from( r#"SELECT account_hash, device_hash, @@ -2050,17 +2126,55 @@ impl Storage for MySqlStorage { UNIX_TIMESTAMP(created_time) AS created_ts, UNIX_TIMESTAMP(updated_time) AS updated_ts, is_deleted, - revision + revision, + operation_type FROM files - WHERE account_hash = ? AND file_path = ? AND group_id = ? - ORDER BY revision DESC"#, - ) - .bind(account_hash) - .bind(file_path) - .bind(group_id) - .fetch_all(self.get_sqlx_pool()) - .await - .map_err(|e| StorageError::Database(format!("Failed to get file history: {}", e)))?; + WHERE account_hash = ? AND file_path = ? AND group_id = ?"#, + ); + + // Add time range filters + if start_time.is_some() { + query.push_str(" AND updated_time >= FROM_UNIXTIME(?)"); + } + if end_time.is_some() { + query.push_str(" AND updated_time <= FROM_UNIXTIME(?)"); + } + + // Add operation type filter + if operation_type.is_some() { + query.push_str(" AND operation_type = ?"); + } + + query.push_str(" ORDER BY revision DESC"); + + // Add pagination + if let Some(lim) = limit { + query.push_str(&format!(" LIMIT {}", lim)); + } + if offset > 0 { + query.push_str(&format!(" OFFSET {}", offset)); + } + + // Build query with dynamic bindings + let mut sql_query = sqlx::query(&query) + .bind(account_hash) + .bind(file_path) + .bind(group_id); + + if let Some(start) = start_time { + sql_query = sql_query.bind(start); + } + if let Some(end) = end_time { + sql_query = sql_query.bind(end); + } + if let Some(op_type) = operation_type { + sql_query = sql_query.bind(op_type); + } + + let rows = sql_query + .fetch_all(self.get_sqlx_pool()) + .await + .map_err(|e| StorageError::Database(format!("Failed to get file history: {}", e)))?; let mut files = Vec::with_capacity(rows.len()); for row in rows { diff --git a/src/storage/mysql_file.rs b/src/storage/mysql_file.rs index 7eed27a..c56d5d8 100644 --- a/src/storage/mysql_file.rs +++ b/src/storage/mysql_file.rs @@ -6,12 +6,16 @@ use tracing::{debug, error, info, warn}; use crate::models::file::FileInfo; use crate::storage::mysql::MySqlStorage; -use crate::storage::{Result, StorageError}; +use crate::storage::{DeleteFileResult, Result, StorageError, StoreFileResult}; /// MySQL file-related functionality extension trait pub trait MySqlFileExt { - /// Store file information - async fn store_file_info(&self, file_info: FileInfo) -> Result; + /// Store file information with revision validation + async fn store_file_info( + &self, + file_info: FileInfo, + expected_revision: Option, + ) -> Result; /// Query file information async fn get_file_info(&self, file_id: u64) -> Result>; @@ -53,9 +57,14 @@ pub trait MySqlFileExt { filename: &str, ) -> Result>; - /// Delete file - /// Returns (deletion_record_id, new_revision) - async fn delete_file(&self, account_hash: &str, file_id: u64) -> Result<(u64, i64)>; + /// Delete file with revision validation + /// Returns DeleteFileResult + async fn delete_file( + &self, + account_hash: &str, + file_id: u64, + client_revision: Option, + ) -> Result; /// Restore deleted file from deletion record async fn restore_file(&self, account_hash: &str, delete_record_id: u64) -> Result; @@ -100,8 +109,12 @@ pub trait MySqlFileExt { } impl MySqlFileExt for MySqlStorage { - /// Store file information - async fn store_file_info(&self, file_info: FileInfo) -> Result { + /// Store file information with revision validation + async fn store_file_info( + &self, + file_info: FileInfo, + expected_revision: Option, + ) -> Result { let now = Utc::now().timestamp(); let updated_time = file_info.updated_time.seconds; @@ -118,7 +131,22 @@ impl MySqlFileExt for MySqlStorage { .await .map_err(|e| { error!("❌ File check by file_id failed(sqlx): {}", e); StorageError::Database(format!("File check by file_id failed: {}", e)) })?; - if let Some((_existing_file_id, _current_revision)) = existing_by_file_id { + if let Some((_existing_file_id, current_revision)) = existing_by_file_id { + // Validate revision if client provided one + if let Some(expected_rev) = expected_revision { + if expected_rev > 0 && current_revision != expected_rev { + warn!( + "Revision conflict on upload: file_id={}, server_revision={}, client_expected={}", + file_info.file_id, current_revision, expected_rev + ); + let _ = tx.rollback().await; + return Ok(StoreFileResult::RevisionConflict { + server_revision: current_revision, + client_revision: expected_rev, + }); + } + } + // Case where the same file_id already exists - update only the file information // Update existing file information @@ -161,7 +189,12 @@ impl MySqlFileExt for MySqlStorage { StorageError::Database(format!("Transaction commit failed: {}", e)) })?; - return Ok(file_info.file_id); + // Return success with incremented revision + let new_revision = current_revision + 1; + return Ok(StoreFileResult::Success { + file_id: file_info.file_id, + new_revision, + }); } debug!("🔍 Checking active files..."); @@ -318,7 +351,10 @@ impl MySqlFileExt for MySqlStorage { StorageError::Database(format!("Transaction commit failed: {}", e)) })?; - Ok(file_info.file_id) + Ok(StoreFileResult::Success { + file_id: file_info.file_id, + new_revision, + }) } /// Query file information @@ -760,10 +796,15 @@ impl MySqlFileExt for MySqlStorage { /// Delete file (metadata and content) /// Returns (deletion_record_id, new_revision) - async fn delete_file(&self, account_hash: &str, file_id: u64) -> Result<(u64, i64)> { + async fn delete_file( + &self, + account_hash: &str, + file_id: u64, + client_revision: Option, + ) -> Result { info!( - "Deleting file: account_hash={}, file_id={}", - account_hash, file_id + "Deleting file: account_hash={}, file_id={}, client_revision={:?}", + account_hash, file_id, client_revision ); let mut tx = self.get_sqlx_pool().begin().await.map_err(|e| { StorageError::Database(format!( @@ -804,6 +845,22 @@ impl MySqlFileExt for MySqlStorage { use sqlx::Row; let row = row_opt.unwrap(); let current_revision: i64 = row.try_get("revision").unwrap_or(0); + + // Validate revision if client provided one + if let Some(client_rev) = client_revision { + if client_rev > 0 && current_revision != client_rev { + warn!( + "Revision conflict detected: file_id={}, server_revision={}, client_revision={}", + file_id, current_revision, client_rev + ); + // Rollback transaction before returning + let _ = tx.rollback().await; + return Ok(DeleteFileResult::RevisionConflict { + server_revision: current_revision, + client_revision: client_rev, + }); + } + } let file_path_bytes: Vec = row.try_get("file_path").unwrap_or_default(); let filename_bytes: Vec = row.try_get("filename").unwrap_or_default(); let file_hash_bytes: Vec = row.try_get("file_hash").unwrap_or_default(); @@ -893,7 +950,10 @@ impl MySqlFileExt for MySqlStorage { "File deletion complete: file_id={}, new_revision={}, deletion_history_file_id={}", file_id, new_revision, deletion_record_id ); - Ok((deletion_record_id, new_revision)) + Ok(DeleteFileResult::Success { + deletion_record_id, + new_revision, + }) } /// Restore deleted file from deletion record diff --git a/src/utils/conflict_resolution.rs b/src/utils/conflict_resolution.rs new file mode 100644 index 0000000..aa74940 --- /dev/null +++ b/src/utils/conflict_resolution.rs @@ -0,0 +1,219 @@ +use prost_types::Timestamp; +use tracing::{debug, info}; + +use crate::sync::conflict_info::{ConflictType, ResolutionStrategy}; +use crate::sync::ConflictInfo; + +/// Conflict resolution decision +#[derive(Debug, Clone, PartialEq)] +pub enum ResolutionDecision { + /// Accept client's version (overwrite server) + AcceptClient, + /// Reject client's version (keep server) + RejectClient, + /// Cannot auto-resolve, require manual intervention + RequiresManual, +} + +/// Resolve conflict based on timestamps and strategy +pub fn resolve_revision_conflict( + client_timestamp: Option<&Timestamp>, + server_timestamp: Option<&Timestamp>, + strategy: ResolutionStrategy, +) -> ResolutionDecision { + match strategy { + ResolutionStrategy::Manual => { + debug!("Conflict resolution strategy: MANUAL - requires user intervention"); + ResolutionDecision::RequiresManual + } + ResolutionStrategy::ServerWins => { + debug!("Conflict resolution strategy: SERVER_WINS - rejecting client"); + ResolutionDecision::RejectClient + } + ResolutionStrategy::ClientWins => { + debug!("Conflict resolution strategy: CLIENT_WINS - accepting client"); + ResolutionDecision::AcceptClient + } + ResolutionStrategy::LastWriteWins => { + resolve_last_write_wins(client_timestamp, server_timestamp) + } + } +} + +/// Resolve using last-write-wins strategy (newest timestamp wins) +fn resolve_last_write_wins( + client_timestamp: Option<&Timestamp>, + server_timestamp: Option<&Timestamp>, +) -> ResolutionDecision { + match (client_timestamp, server_timestamp) { + (Some(client_ts), Some(server_ts)) => { + let client_secs = client_ts.seconds; + let client_nanos = client_ts.nanos; + let server_secs = server_ts.seconds; + let server_nanos = server_ts.nanos; + + // Compare timestamps + if client_secs > server_secs + || (client_secs == server_secs && client_nanos > server_nanos) + { + info!( + "LAST_WRITE_WINS: Client timestamp ({}.{:09}) is newer than server ({}.{:09}) - accepting client", + client_secs, client_nanos, server_secs, server_nanos + ); + ResolutionDecision::AcceptClient + } else if client_secs < server_secs + || (client_secs == server_secs && client_nanos < server_nanos) + { + info!( + "LAST_WRITE_WINS: Server timestamp ({}.{:09}) is newer than client ({}.{:09}) - rejecting client", + server_secs, server_nanos, client_secs, client_nanos + ); + ResolutionDecision::RejectClient + } else { + // Exact same timestamp - fallback to manual resolution + debug!( + "LAST_WRITE_WINS: Timestamps are identical ({}.{:09}) - requires manual resolution", + client_secs, client_nanos + ); + ResolutionDecision::RequiresManual + } + } + (None, _) | (_, None) => { + debug!("LAST_WRITE_WINS: Missing timestamp information - requires manual resolution"); + ResolutionDecision::RequiresManual + } + } +} + +/// Suggest resolution strategy based on conflict context +pub fn suggest_resolution_strategy( + has_client_timestamp: bool, + has_server_timestamp: bool, +) -> ResolutionStrategy { + if has_client_timestamp && has_server_timestamp { + // Both sides have timestamps - can use last-write-wins + ResolutionStrategy::LastWriteWins + } else { + // Missing timestamp information - require manual resolution + ResolutionStrategy::Manual + } +} + +/// Create ConflictInfo with default values for backward compatibility +pub fn create_conflict_info( + conflict_type: ConflictType, + conflicting_path: String, + conflicting_device: String, + conflicting_revision: i64, +) -> ConflictInfo { + ConflictInfo { + r#type: conflict_type as i32, + conflicting_path, + conflicting_device, + conflicting_revision, + server_timestamp: None, + client_timestamp: None, + suggested_resolution: ResolutionStrategy::Manual as i32, + } +} + +/// Create ConflictInfo with timestamps +pub fn create_conflict_info_with_timestamps( + conflict_type: ConflictType, + conflicting_path: String, + conflicting_device: String, + conflicting_revision: i64, + server_timestamp: Option, + client_timestamp: Option, + suggested_resolution: ResolutionStrategy, +) -> ConflictInfo { + ConflictInfo { + r#type: conflict_type as i32, + conflicting_path, + conflicting_device, + conflicting_revision, + server_timestamp, + client_timestamp, + suggested_resolution: suggested_resolution as i32, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn create_timestamp(seconds: i64, nanos: i32) -> Timestamp { + Timestamp { seconds, nanos } + } + + #[test] + fn test_last_write_wins_client_newer() { + let client_ts = create_timestamp(1000, 500); + let server_ts = create_timestamp(1000, 400); + + let decision = resolve_last_write_wins(Some(&client_ts), Some(&server_ts)); + assert_eq!(decision, ResolutionDecision::AcceptClient); + } + + #[test] + fn test_last_write_wins_server_newer() { + let client_ts = create_timestamp(1000, 400); + let server_ts = create_timestamp(1000, 500); + + let decision = resolve_last_write_wins(Some(&client_ts), Some(&server_ts)); + assert_eq!(decision, ResolutionDecision::RejectClient); + } + + #[test] + fn test_last_write_wins_same_timestamp() { + let client_ts = create_timestamp(1000, 500); + let server_ts = create_timestamp(1000, 500); + + let decision = resolve_last_write_wins(Some(&client_ts), Some(&server_ts)); + assert_eq!(decision, ResolutionDecision::RequiresManual); + } + + #[test] + fn test_last_write_wins_missing_timestamp() { + let client_ts = create_timestamp(1000, 500); + + let decision = resolve_last_write_wins(Some(&client_ts), None); + assert_eq!(decision, ResolutionDecision::RequiresManual); + + let decision = resolve_last_write_wins(None, Some(&client_ts)); + assert_eq!(decision, ResolutionDecision::RequiresManual); + } + + #[test] + fn test_strategy_server_wins() { + let decision = resolve_revision_conflict(None, None, ResolutionStrategy::ServerWins); + assert_eq!(decision, ResolutionDecision::RejectClient); + } + + #[test] + fn test_strategy_client_wins() { + let decision = resolve_revision_conflict(None, None, ResolutionStrategy::ClientWins); + assert_eq!(decision, ResolutionDecision::AcceptClient); + } + + #[test] + fn test_strategy_manual() { + let decision = resolve_revision_conflict(None, None, ResolutionStrategy::Manual); + assert_eq!(decision, ResolutionDecision::RequiresManual); + } + + #[test] + fn test_suggest_strategy_with_timestamps() { + let strategy = suggest_resolution_strategy(true, true); + assert_eq!(strategy, ResolutionStrategy::LastWriteWins); + } + + #[test] + fn test_suggest_strategy_without_timestamps() { + let strategy = suggest_resolution_strategy(false, false); + assert_eq!(strategy, ResolutionStrategy::Manual); + + let strategy = suggest_resolution_strategy(true, false); + assert_eq!(strategy, ResolutionStrategy::Manual); + } +} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 1766d56..61779a1 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -1,4 +1,5 @@ pub mod auth; // Authentication helpers +pub mod conflict_resolution; // Conflict resolution strategies pub mod crypto; pub mod db; pub mod events; // Event publishing utilities diff --git a/src/utils/response.rs b/src/utils/response.rs index 763de86..ab9c505 100644 --- a/src/utils/response.rs +++ b/src/utils/response.rs @@ -1,17 +1,28 @@ use crate::sync::{ - DeleteFileResponse, DownloadFileResponse, RegisterDeviceResponse, UploadFileResponse, + DeleteFileResponse, DownloadFileResponse, ErrorCode, RegisterDeviceResponse, UploadFileResponse, }; -/// Create error response for file upload -pub fn file_upload_error(message: impl Into) -> UploadFileResponse { +/// Create error response for file upload with custom error code +pub fn file_upload_error_with_code( + message: impl Into, + error_code: ErrorCode, +) -> UploadFileResponse { UploadFileResponse { success: false, file_id: 0, new_revision: 0, return_message: message.into(), + conflict: None, + error_code: error_code as i32, + current_revision: 0, } } +/// Create error response for file upload (defaults to UnknownError) +pub fn file_upload_error(message: impl Into) -> UploadFileResponse { + file_upload_error_with_code(message, ErrorCode::UnknownError) +} + /// Create success response for file upload pub fn file_upload_success(file_id: u64, new_revision: i64) -> UploadFileResponse { UploadFileResponse { @@ -19,6 +30,9 @@ pub fn file_upload_success(file_id: u64, new_revision: i64) -> UploadFileRespons file_id, new_revision, return_message: "OK".to_string(), + conflict: None, + error_code: ErrorCode::Success as i32, + current_revision: 0, } } @@ -39,16 +53,27 @@ pub fn file_download_error(message: impl Into) -> DownloadFileResponse { } } -/// Create error response for file deletion -pub fn file_delete_error(message: impl Into) -> DeleteFileResponse { +/// Create error response for file deletion with custom error code +pub fn file_delete_error_with_code( + message: impl Into, + error_code: ErrorCode, +) -> DeleteFileResponse { DeleteFileResponse { success: false, return_message: message.into(), delete_record_id: 0, // No deletion record for errors new_revision: 0, // No revision for errors + conflict: None, // No conflict info for general errors + error_code: error_code as i32, + current_revision: 0, } } +/// Create error response for file deletion (defaults to UnknownError) +pub fn file_delete_error(message: impl Into) -> DeleteFileResponse { + file_delete_error_with_code(message, ErrorCode::UnknownError) +} + /// Create success response for file deletion pub fn file_delete_success( message: impl Into, @@ -60,6 +85,9 @@ pub fn file_delete_success( return_message: message.into(), delete_record_id, new_revision, + conflict: None, // No conflict on success + error_code: ErrorCode::Success as i32, + current_revision: 0, } }