diff --git a/.env.cloud_test b/.env.cloud_test new file mode 100644 index 0000000..20d322c --- /dev/null +++ b/.env.cloud_test @@ -0,0 +1,47 @@ +# Temporary cloud DB configuration for migration testing +DB_USER="genesis76" +DB_PASS="VC1OGzY4ukM5DaCrVZJb6832XWYc08U072k4yp0bZ" +DB_NAME="cosmic_sync" +DB_HOST="127.0.0.1" +DB_PORT=63306 +DB_POOL=5 + +SERVER_HOST=0.0.0.0 +SERVER_PORT=50051 +WORKER_THREADS=4 +HEARTBEAT_INTERVAL_SECS=30 +AUTH_TOKEN_EXPIRY_HOURS=24 + +OAUTH_CLIENT_ID=cosmic-sync +OAUTH_CLIENT_SECRET=cosmicsecretsocmicsecret +OAUTH_REDIRECT_URI=http://10.241.62.167:8080/oauth/callback +OAUTH_AUTH_URL=http://10.241.62.167:4000/oauth/authorize +OAUTH_TOKEN_URL=http://10.241.62.167:4000/oauth/token +OAUTH_USER_INFO_URL=http://10.241.62.167:4000/api/settings +OAUTH_SCOPE=profile:read + +MAX_CONCURRENT_REQUESTS=100 +MAX_FILE_SIZE=52428800 + +RUST_LOG=cosmic_sync_server=debug,info +LOG_LEVEL=debug +LOG_TO_FILE=false + +STORAGE_TYPE="s3" +AWS_REGION="us-east-2" +S3_BUCKET="cosmic-sync-files" +S3_KEY_PREFIX="files/" +AWS_ACCESS_KEY_ID="minioadmin" +AWS_SECRET_ACCESS_KEY="minioadmin" +S3_ENDPOINT_URL="http://127.0.0.1:9000" +S3_FORCE_PATH_STYLE="true" +S3_TIMEOUT_SECONDS="30" +S3_MAX_RETRIES="3" + +COSMIC_SYNC_DEV_MODE="1" +COSMIC_SYNC_TEST_MODE="1" +COSMIC_SYNC_DEBUG_MODE="1" + +SERVER_ENCODE_KEY=c3e15e2f727cf777380f23a9f9fa8156c5f4f7f3e697f6dc95a47372e76ac6bf + +RABBITMQ_ENABLED=false diff --git a/Dockerfile b/Dockerfile index db98311..54d421a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -35,7 +35,6 @@ RUN rm -f src/main.rs src/lib.rs COPY src ./src # Clean and rebuild with actual source -RUN touch src/main.rs src/lib.rs RUN cargo build --release --bin cosmic-sync-server --features redis-cache --target ${RUST_TARGET} # Runtime stage - use Ubuntu 24.04 for newer glibc compatibility diff --git a/env_home.sh b/env_home.sh new file mode 100755 index 0000000..6bdbd2c --- /dev/null +++ b/env_home.sh @@ -0,0 +1,9 @@ +#!/bin/bash +# Switch to home network (192.168.50.100) + +sed -i 's/10.17.89.63/192.168.50.100/g' .env + +mysql -h 127.0.0.1 -P 3306 -u root -precognizer --ssl-mode=DISABLED recognizer_dev -e \ + "UPDATE oauth_applications SET redirect_uri = 'http://192.168.50.100:8080/oauth/callback' WHERE redirect_uri LIKE '%/oauth/callback';" + +echo "Switched to home network (192.168.50.100)" diff --git a/env_office.sh b/env_office.sh new file mode 100755 index 0000000..c1b49ca --- /dev/null +++ b/env_office.sh @@ -0,0 +1,9 @@ +#!/bin/bash +# Switch to office network (10.17.89.63) + +sed -i 's/192.168.50.100/10.17.89.63/g' .env + +mysql -h 127.0.0.1 -P 3306 -u root -precognizer --ssl-mode=DISABLED recognizer_dev -e \ + "UPDATE oauth_applications SET redirect_uri = 'http://10.17.89.63:8080/oauth/callback' WHERE redirect_uri LIKE '%/oauth/callback';" + +echo "Switched to office network (10.17.89.63)" diff --git a/migrations/add_key_status_table.sql b/migrations/add_key_status_table.sql new file mode 100644 index 0000000..25b8ba0 --- /dev/null +++ b/migrations/add_key_status_table.sql @@ -0,0 +1,15 @@ +-- Migration: Add key_status table for encryption key invalidation feature +-- Date: 2025-12-12 + +CREATE TABLE IF NOT EXISTS key_status ( + id BIGINT AUTO_INCREMENT PRIMARY KEY, + account_hash VARCHAR(64) NOT NULL, + key_id VARCHAR(32) NOT NULL, + status VARCHAR(20) NOT NULL DEFAULT 'active', + reason VARCHAR(50), + invalidated_at TIMESTAMP NULL, + invalidated_by_device VARCHAR(64), + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + UNIQUE INDEX idx_key_status_account_key (account_hash, key_id), + INDEX idx_key_status_account (account_hash) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; diff --git a/migrations/add_soft_delete.sql b/migrations/add_soft_delete.sql new file mode 100644 index 0000000..d103d3d --- /dev/null +++ b/migrations/add_soft_delete.sql @@ -0,0 +1,64 @@ +-- Soft Delete Migration for Watcher Sync +-- This migration adds is_deleted and deleted_at columns to support soft delete functionality +-- for watchers, watcher_groups, and files tables. + +-- ============================================================================ +-- 1. watcher_groups 테이블 수정 +-- ============================================================================ +ALTER TABLE watcher_groups + ADD COLUMN is_deleted BOOLEAN NOT NULL DEFAULT FALSE, + ADD COLUMN deleted_at TIMESTAMP NULL; + +-- ============================================================================ +-- 2. watchers 테이블 수정 +-- ============================================================================ +ALTER TABLE watchers + ADD COLUMN is_deleted BOOLEAN NOT NULL DEFAULT FALSE, + ADD COLUMN deleted_at TIMESTAMP NULL; + +-- ============================================================================ +-- 3. files 테이블 수정 +-- ============================================================================ +ALTER TABLE files + ADD COLUMN is_deleted BOOLEAN NOT NULL DEFAULT FALSE, + ADD COLUMN deleted_at TIMESTAMP NULL; + +-- ============================================================================ +-- 4. 성능 인덱스 추가 (조회 최적화) +-- ============================================================================ + +-- watcher_groups: account별 활성 그룹 조회용 +CREATE INDEX idx_watcher_groups_account_active + ON watcher_groups(account_hash, is_deleted); + +-- watchers: account별 활성 watcher 조회용 +CREATE INDEX idx_watchers_account_active + ON watchers(account_hash, is_deleted); + +-- watchers: group별 활성 watcher 조회용 +CREATE INDEX idx_watchers_group_active + ON watchers(group_id, is_deleted); + +-- files: watcher별 활성 파일 조회용 +CREATE INDEX idx_files_watcher_active + ON files(server_watcher_id, is_deleted); + +-- files: account별 활성 파일 조회용 +CREATE INDEX idx_files_account_active + ON files(account_hash, is_deleted); + +-- ============================================================================ +-- 5. Cleanup job용 인덱스 (삭제 대상 조회 최적화) +-- ============================================================================ + +-- watcher_groups: 삭제 대상 조회용 +CREATE INDEX idx_watcher_groups_deleted_at + ON watcher_groups(deleted_at); + +-- watchers: 삭제 대상 조회용 +CREATE INDEX idx_watchers_deleted_at + ON watchers(deleted_at); + +-- files: 삭제 대상 조회용 +CREATE INDEX idx_files_deleted_at + ON files(deleted_at); diff --git a/proto/sync.proto b/proto/sync.proto index d26c824..f66233b 100755 --- a/proto/sync.proto +++ b/proto/sync.proto @@ -89,6 +89,9 @@ service SyncService { rpc GetUsageStats(GetUsageStatsRequest) returns (GetUsageStatsResponse); rpc GetQuotaInfo(GetQuotaInfoRequest) returns (GetQuotaInfoResponse); rpc CheckQuotaStatus(CheckQuotaStatusRequest) returns (CheckQuotaStatusResponse); + + // Key invalidation + rpc InvalidateKeyId(InvalidateKeyIdRequest) returns (InvalidateKeyIdResponse); } // Service provided by the client daemon @@ -458,6 +461,7 @@ message DownloadFileResponse { uint64 file_size = 9; string key_id = 10; optional uint32 unix_permissions = 11; + int32 error_code = 12; } // Server-streaming download chunk @@ -554,6 +558,7 @@ message RenameFileRequest { optional string operation_type = 11; // e.g., "RENAME" or "MOVE" ConflictInfo.ResolutionStrategy conflict_resolution = 12; // How to resolve conflicts (default: MANUAL) google.protobuf.Timestamp updated_time = 13; // Client's last known update time for timestamp-based conflict resolution + optional bool overwrite = 14; // If true, tombstone conflicting target before rename/move } // NEW: Rename file response @@ -630,6 +635,9 @@ enum ErrorCode { // Database Errors (90-99) DB_ERROR = 90; // Database operation failed DB_SCHEMA_ERROR = 91; // Database schema mismatch + + // Key Errors (100+) + KEY_INVALIDATED = 100; // Encryption key has been invalidated } // Individual Watcher management messages @@ -856,6 +864,7 @@ message SyncConfigurationRequest { bool incremental = 6; // Incremental sync vs full sync bool force_update = 7; // Force update flag int64 client_timestamp = 8; // Client timestamp + int64 last_sync_timestamp = 9; // Last successful sync timestamp (for Item-level LWW) } message SyncConfigurationResponse { @@ -869,16 +878,37 @@ message SyncConfigurationResponse { repeated string conflict_details = 8; // Conflict details ActionTaken action_taken = 9; // Action taken during synchronization bool is_new_account = 10; // Whether this is a new account setup + int32 retention_days = 11; // Soft-deleted items retention period (days) + repeated DeletedItemInfo deleted_items = 12; // Items soft-deleted in this sync + repeated WatcherGroupData recently_added = 13; // Items added by other devices (client should merge) + repeated DeletedItemInfo recently_deleted = 14; // Items deleted by other devices (client should remove) +} + +// Information about a soft-deleted item +message DeletedItemInfo { + enum ItemType { + GROUP = 0; + WATCHER = 1; + } + ItemType type = 1; // Type of deleted item + int32 group_id = 2; // Deleted group_id (or parent group_id for watcher) + int32 watcher_id = 3; // Deleted watcher_id (0 if type is GROUP) + string title = 4; // Name of the deleted item + int32 affected_files = 5; // Number of files affected by deletion } message SyncStats { int32 groups_updated = 1; // Number of updated groups int32 groups_created = 2; // Number of created groups - int32 groups_deleted = 3; // Number of deleted groups + int32 groups_deleted = 3; // Number of deleted groups (soft delete) int32 presets_updated = 4; // Number of updated presets int64 sync_timestamp = 5; // Synchronization timestamp int32 total_operations = 6; // Total number of operations double sync_duration_ms = 7; // Synchronization duration (milliseconds) + int32 watchers_created = 8; // Number of created watchers + int32 watchers_updated = 9; // Number of updated watchers + int32 watchers_deleted = 10; // Number of deleted watchers (soft delete) + int32 files_soft_deleted = 11; // Number of files soft-deleted due to watcher deletion } // File history lookup request @@ -1144,3 +1174,20 @@ message BatchOperationsResponse { int32 successful_operations = 5; int32 failed_operations = 6; } + +// Key invalidation request +message InvalidateKeyIdRequest { + string auth_token = 1; + string account_hash = 2; + string key_id = 3; + string reason = 4; // e.g., "user_rotation", "security_breach" + string device_hash = 5; // Device that initiated invalidation +} + +// Key invalidation response +message InvalidateKeyIdResponse { + bool success = 1; + string return_message = 2; + int32 affected_files_count = 3; // Number of files using this key + int32 error_code = 4; +} diff --git a/src/auth/oauth.rs b/src/auth/oauth.rs index 5be24bc..6adfdff 100644 --- a/src/auth/oauth.rs +++ b/src/auth/oauth.rs @@ -329,11 +329,6 @@ impl OAuthService { pub async fn verify_token(&self, token: &str) -> Result { match self.validate_token(token).await { Ok(account_hash) => { - debug!( - "✅ Token validation successful: account_hash={}", - account_hash - ); - // Check if account exists in local DB match self.storage.get_account_by_hash(&account_hash).await { Ok(Some(_)) => { diff --git a/src/handlers/admin_handler.rs b/src/handlers/admin_handler.rs new file mode 100644 index 0000000..e792d9a --- /dev/null +++ b/src/handlers/admin_handler.rs @@ -0,0 +1,101 @@ +//! Admin API handlers for maintenance operations + +use actix_web::{web, HttpResponse, Result as ActixResult}; +use serde::Serialize; +use std::sync::Arc; +use tracing::{error, info}; + +use crate::server::app_state::AppState; +use crate::storage::CleanupStats; + +/// Default retention period for soft-deleted items (in days) +const DEFAULT_RETENTION_DAYS: i32 = 30; + +/// Response for cleanup operation +#[derive(Debug, Serialize)] +pub struct CleanupResponse { + pub success: bool, + pub message: String, + pub retention_days: i32, + pub stats: CleanupStats, +} + +/// Run cleanup job to permanently delete soft-deleted records older than retention period +/// +/// This endpoint permanently deletes: +/// - Files (including S3 objects) +/// - Watchers +/// - Watcher groups +/// +/// that have been soft-deleted for longer than the retention period (default 30 days). +/// +/// # Request +/// POST /admin/cleanup +/// +/// # Response +/// ```json +/// { +/// "success": true, +/// "message": "Cleanup completed successfully", +/// "retention_days": 30, +/// "stats": { +/// "files_deleted": 10, +/// "watchers_deleted": 5, +/// "groups_deleted": 2, +/// "s3_objects_deleted": 10, +/// "errors": [] +/// } +/// } +/// ``` +pub async fn run_cleanup(app_state: web::Data>) -> ActixResult { + info!("Admin cleanup job started"); + + match app_state + .storage + .cleanup_soft_deleted(DEFAULT_RETENTION_DAYS) + .await + { + Ok(stats) => { + info!( + "Cleanup completed: {} files, {} watchers, {} groups deleted", + stats.files_deleted, stats.watchers_deleted, stats.groups_deleted + ); + + Ok(HttpResponse::Ok().json(CleanupResponse { + success: true, + message: "Cleanup completed successfully".to_string(), + retention_days: DEFAULT_RETENTION_DAYS, + stats, + })) + } + Err(e) => { + error!("Cleanup failed: {}", e); + + Ok(HttpResponse::InternalServerError().json(CleanupResponse { + success: false, + message: format!("Cleanup failed: {}", e), + retention_days: DEFAULT_RETENTION_DAYS, + stats: CleanupStats::default(), + })) + } + } +} + +/// Get cleanup status and statistics (read-only, no actual cleanup) +/// +/// # Request +/// GET /admin/cleanup/status +/// +/// # Response +/// Returns information about pending cleanup items +pub async fn cleanup_status(app_state: web::Data>) -> ActixResult { + // For now, just return the retention configuration + // In the future, could query counts of items pending cleanup + let _ = app_state; // Will be used when we add pending count queries + + Ok(HttpResponse::Ok().json(serde_json::json!({ + "retention_days": DEFAULT_RETENTION_DAYS, + "description": "Items soft-deleted more than 30 days ago will be permanently deleted on cleanup", + "trigger_endpoint": "POST /admin/cleanup" + }))) +} diff --git a/src/handlers/file/batch.rs b/src/handlers/file/batch.rs index 120d0b5..3518a20 100644 --- a/src/handlers/file/batch.rs +++ b/src/handlers/file/batch.rs @@ -11,11 +11,11 @@ pub async fn handle_batch_operations( handler: &FileHandler, req: BatchOperationsRequest, ) -> Result, Status> { - info!("Batch operations request received:"); - info!(" account_hash: {}", req.account_hash); - info!(" device_hash: {}", req.device_hash); - info!(" operations count: {}", req.operations.len()); - info!(" atomic: {}", req.atomic); + info!( + "BatchOps: count={}, atomic={}", + req.operations.len(), + req.atomic + ); // Verify authentication once for all operations let verified = match handler.app_state.oauth.verify_token(&req.auth_token).await { @@ -179,11 +179,13 @@ async fn process_upload_operation( } } Err(e) => { - error!("Upload operation {} failed: {}", operation_id, e); + let error_msg = e.message(); + error!("Upload operation {} failed: {}", operation_id, error_msg); + let error_code = extract_error_code_from_message(error_msg); create_error_result( operation_id, - &format!("Upload failed: {}", e), - ErrorCode::StorageFailed, + &format!("Upload failed: {}", error_msg), + error_code, ) } } @@ -211,11 +213,13 @@ async fn process_delete_operation( } } Err(e) => { - error!("Delete operation {} failed: {}", operation_id, e); + let error_msg = e.message(); + error!("Delete operation {} failed: {}", operation_id, error_msg); + let error_code = extract_error_code_from_message(error_msg); create_error_result( operation_id, - &format!("Delete failed: {}", e), - ErrorCode::StorageFailed, + &format!("Delete failed: {}", error_msg), + error_code, ) } } @@ -243,11 +247,13 @@ async fn process_rename_operation( } } Err(e) => { - error!("Rename operation {} failed: {}", operation_id, e); + let error_msg = e.message(); + error!("Rename operation {} failed: {}", operation_id, error_msg); + let error_code = extract_error_code_from_message(error_msg); create_error_result( operation_id, - &format!("Rename failed: {}", e), - ErrorCode::StorageFailed, + &format!("Rename failed: {}", error_msg), + error_code, ) } } @@ -266,3 +272,40 @@ fn create_error_result( result: None, } } + +/// Extract appropriate ErrorCode from error message for client-side retry logic classification +fn extract_error_code_from_message(msg: &str) -> ErrorCode { + let msg_lower = msg.to_lowercase(); + + // Permanent errors - client should not retry + if msg_lower.contains("permission denied") || msg_lower.contains("access denied") { + ErrorCode::AuthFailed + } else if msg_lower.contains("already deleted") { + ErrorCode::FileAlreadyDeleted + } else if msg_lower.contains("path conflict") || msg_lower.contains("already exists") { + ErrorCode::PathConflict + } else if msg_lower.contains("quota") || msg_lower.contains("storage limit") { + ErrorCode::QuotaExceeded + } else if msg_lower.contains("invalid") || msg_lower.contains("validation") { + ErrorCode::InvalidRequest + } + // Conflict errors - client should sync and retry + else if msg_lower.contains("revision") || msg_lower.contains("conflict") { + ErrorCode::RevisionConflict + } + // File errors + else if msg_lower.contains("not found") { + ErrorCode::FileNotFound + } + // Schema/DB errors - permanent + else if msg_lower.contains("schema") + || msg_lower.contains("varbinary") + || msg_lower.contains("varchar") + { + ErrorCode::DbSchemaError + } + // Default: transient error - client can retry with backoff + else { + ErrorCode::StorageFailed + } +} diff --git a/src/handlers/file/delete.rs b/src/handlers/file/delete.rs index 560d09b..f7934d0 100644 --- a/src/handlers/file/delete.rs +++ b/src/handlers/file/delete.rs @@ -21,12 +21,10 @@ pub async fn handle_delete_file( handler: &FileHandler, req: DeleteFileRequest, ) -> Result, Status> { - info!("File deletion request received:"); - info!(" account_hash: {}", req.account_hash); - info!(" file_id: {}", req.file_id); - info!(" file_path: {}", req.file_path); - info!(" filename: {}", req.filename); - info!(" revision: {}", req.revision); + info!( + "DeleteFile: file_id={}, path={}, revision={}", + req.file_id, req.file_path, req.revision + ); let verified = match handler.app_state.oauth.verify_token(&req.auth_token).await { Ok(v) if v.valid => v, @@ -103,10 +101,7 @@ pub async fn handle_delete_file( deletion_record_id, new_revision, }) => { - info!( - "File deleted successfully: filename={}, file_id={}, deletion_record_id={}, new_revision={}", - req.filename, file_id, deletion_record_id, new_revision - ); + info!("DeleteFile OK: file_id={}, rev={}", file_id, new_revision); // Publish cross-instance file deleted and version deleted events let event_data = FileDeleteEventData { @@ -204,10 +199,7 @@ pub async fn handle_delete_file( // Auto-retry if decision is AcceptClient if decision == ResolutionDecision::AcceptClient { - info!( - "Auto-resolving delete conflict: accepting client version (file_id={})", - file_id - ); + debug!("Auto-resolve conflict: file_id={}", file_id); // Retry deletion without revision check (force delete) match handler.app_state.file.delete_file(file_id, None).await { @@ -216,8 +208,8 @@ pub async fn handle_delete_file( new_revision, }) => { info!( - "File deleted after auto-resolution: file_id={}, deletion_record_id={}, new_revision={}", - file_id, deletion_record_id, new_revision + "DeleteFile OK (auto-resolved): file_id={}, rev={}", + file_id, new_revision ); // Publish cross-instance file deleted and version deleted events @@ -273,11 +265,17 @@ pub async fn handle_delete_file( })) } Ok(DeleteFileResult::AlreadyDeleted) => { - info!("File already deleted: file_id={}", file_id); - Ok(Response::new(response::file_delete_error_with_code( - "File already deleted", - ErrorCode::FileNotFound, - ))) + // Idempotent success: if the file is already deleted, the client's goal is achieved + debug!("DeleteFile: already deleted, file_id={}", file_id); + Ok(Response::new(DeleteFileResponse { + success: true, + return_message: "File already deleted".to_string(), + delete_record_id: 0, + new_revision: 0, + conflict: None, + error_code: ErrorCode::Success as i32, + current_revision: 0, + })) } Err(e) => { // Determine error category for logging and client classification diff --git a/src/handlers/file/download.rs b/src/handlers/file/download.rs index 1e9ec0e..1d7bdb7 100644 --- a/src/handlers/file/download.rs +++ b/src/handlers/file/download.rs @@ -1,9 +1,10 @@ use tonic::{Response, Status}; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; use super::super::file_handler::FileHandler; +use super::key::is_key_invalidated; use crate::services::usage_service::{OperationResult, UsageOperation}; -use crate::sync::{DownloadFileRequest, DownloadFileResponse}; +use crate::sync::{DownloadFileRequest, DownloadFileResponse, ErrorCode}; use crate::utils::crypto::parse_account_key; use crate::utils::response; use base64::Engine as _; @@ -42,6 +43,35 @@ pub async fn handle_download_file( } }; + // Check if the encryption key is invalidated + if let Some(ref key_id) = file_info.key_id { + if !key_id.is_empty() { + match is_key_invalidated(handler, &file_info.account_hash, key_id).await { + Ok(true) => { + debug!( + "Download blocked: key_id={} is invalidated for file_id={}", + key_id, file_id + ); + return Ok(Response::new(response::file_download_error_with_code( + "Encryption key has been invalidated. Please re-encrypt the file with a new key.", + ErrorCode::KeyInvalidated, + ))); + } + Ok(false) => { + // Key is valid, continue with download + debug!("Key {} is valid for file_id={}", key_id, file_id); + } + Err(e) => { + // If we can't check key status, log but allow download (fail-open for availability) + warn!( + "Failed to check key status for key_id={}, allowing download: {}", + key_id, e + ); + } + } + } + } + // Get account transport key let account_key: Option<[u8; 32]> = if handler.app_state.config.features.transport_encrypt_metadata { @@ -189,6 +219,7 @@ pub async fn handle_download_file( file_size: file_info.size, key_id: file_info.key_id.clone().unwrap_or_default(), unix_permissions: file_info.unix_permissions, + error_code: ErrorCode::Success as i32, })) } Ok(None) => Ok(Response::new(response::file_download_error( diff --git a/src/handlers/file/find.rs b/src/handlers/file/find.rs index 9233852..5d9224e 100644 --- a/src/handlers/file/find.rs +++ b/src/handlers/file/find.rs @@ -1,5 +1,5 @@ use tonic::{Response, Status}; -use tracing::{debug, error, info}; +use tracing::{debug, error}; use super::super::file_handler::FileHandler; use crate::sync::{FindFileRequest, FindFileResponse}; @@ -8,10 +8,9 @@ pub async fn handle_find_file_by_criteria( handler: &FileHandler, req: FindFileRequest, ) -> Result, Status> { - info!("🔍 [v2025.11.19-FIXED] FindFileByCriteria handler called"); debug!( - "FindFileByCriteria request: account={}, file_path={}, file_name={}", - req.account_hash, req.file_path, req.file_name + "FindFileByCriteria: path={}, name={}", + req.file_path, req.file_name ); match handler.app_state.oauth.verify_token(&req.auth_token).await { diff --git a/src/handlers/file/key.rs b/src/handlers/file/key.rs new file mode 100644 index 0000000..4c4ba51 --- /dev/null +++ b/src/handlers/file/key.rs @@ -0,0 +1,151 @@ +//! Key invalidation handler +//! +//! Handles encryption key invalidation requests for managing key lifecycle. + +use tonic::{Response, Status}; +use tracing::{debug, error, info}; + +use super::super::file_handler::FileHandler; +use crate::storage::mysql::MySqlStorage; +use crate::storage::mysql_key::MySqlKeyExt; +use crate::sync::{ErrorCode, InvalidateKeyIdRequest, InvalidateKeyIdResponse}; + +/// Handle key invalidation request +pub async fn handle_invalidate_key_id( + handler: &FileHandler, + req: InvalidateKeyIdRequest, +) -> Result, Status> { + info!( + "InvalidateKeyId: account_hash={}, key_id={}, reason={}", + req.account_hash, req.key_id, req.reason + ); + + // Verify authentication + let verified = match handler.app_state.oauth.verify_token(&req.auth_token).await { + Ok(v) if v.valid => v, + _ => { + return Ok(Response::new(InvalidateKeyIdResponse { + success: false, + return_message: "Authentication failed".to_string(), + affected_files_count: 0, + error_code: ErrorCode::AuthFailed as i32, + })); + } + }; + + // Validate account hash matches token + if verified.account_hash != req.account_hash { + return Ok(Response::new(InvalidateKeyIdResponse { + success: false, + return_message: "Account hash mismatch".to_string(), + affected_files_count: 0, + error_code: ErrorCode::AuthAccountMismatch as i32, + })); + } + + // Validate key_id is not empty + if req.key_id.trim().is_empty() { + return Ok(Response::new(InvalidateKeyIdResponse { + success: false, + return_message: "key_id is required".to_string(), + affected_files_count: 0, + error_code: ErrorCode::InvalidRequest as i32, + })); + } + + // Get MySQL storage for key operations + let mysql_storage = match handler + .app_state + .storage + .as_any() + .downcast_ref::() + { + Some(s) => s, + None => { + error!("Storage is not MySqlStorage"); + return Ok(Response::new(InvalidateKeyIdResponse { + success: false, + return_message: "Internal storage error".to_string(), + affected_files_count: 0, + error_code: ErrorCode::StorageFailed as i32, + })); + } + }; + + // Get affected files count before invalidation + let affected_files_count = match mysql_storage + .get_affected_files_count(&req.account_hash, &req.key_id) + .await + { + Ok(count) => count, + Err(e) => { + error!("Failed to get affected files count: {}", e); + return Ok(Response::new(InvalidateKeyIdResponse { + success: false, + return_message: format!("Failed to count affected files: {}", e), + affected_files_count: 0, + error_code: ErrorCode::DbError as i32, + })); + } + }; + + // Invalidate the key + match mysql_storage + .invalidate_key( + &req.account_hash, + &req.key_id, + &req.reason, + &req.device_hash, + ) + .await + { + Ok(()) => { + info!( + "Key invalidated successfully: account_hash={}, key_id={}, affected_files={}", + req.account_hash, req.key_id, affected_files_count + ); + Ok(Response::new(InvalidateKeyIdResponse { + success: true, + return_message: format!( + "Key invalidated successfully. {} files affected.", + affected_files_count + ), + affected_files_count, + error_code: ErrorCode::Success as i32, + })) + } + Err(e) => { + error!("Failed to invalidate key: {}", e); + Ok(Response::new(InvalidateKeyIdResponse { + success: false, + return_message: format!("Failed to invalidate key: {}", e), + affected_files_count: 0, + error_code: ErrorCode::DbError as i32, + })) + } + } +} + +/// Check if a key is invalidated (helper function for download handler) +pub async fn is_key_invalidated( + handler: &FileHandler, + account_hash: &str, + key_id: &str, +) -> Result { + // Empty key_id means file is not encrypted with a trackable key + if key_id.trim().is_empty() { + return Ok(false); + } + + let mysql_storage = handler + .app_state + .storage + .as_any() + .downcast_ref::() + .ok_or_else(|| "Storage is not MySqlStorage".to_string())?; + + mysql_storage + .is_key_invalidated(account_hash, key_id) + .await + .map_err(|e| format!("Failed to check key status: {}", e)) +} diff --git a/src/handlers/file/mod.rs b/src/handlers/file/mod.rs index bd1aa76..f736276 100644 --- a/src/handlers/file/mod.rs +++ b/src/handlers/file/mod.rs @@ -4,6 +4,7 @@ pub mod download; pub mod exists; pub mod find; pub mod get_info; +pub mod key; pub mod list; pub mod rename; pub mod restore; diff --git a/src/handlers/file/rename.rs b/src/handlers/file/rename.rs index 5f20762..1c3f150 100644 --- a/src/handlers/file/rename.rs +++ b/src/handlers/file/rename.rs @@ -1,5 +1,5 @@ use tonic::{Response, Status}; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; use super::super::file_handler::FileHandler; use crate::sync::{ @@ -15,13 +15,10 @@ pub async fn handle_rename_file( handler: &FileHandler, req: RenameFileRequest, ) -> Result, Status> { - info!("File rename request received:"); - info!(" account_hash: {}", req.account_hash); - info!(" file_id: {}", req.file_id); - info!(" old_file_path: {}", req.old_file_path); - info!(" new_file_path: {}", req.new_file_path); - info!(" group_id: {}", req.group_id); - info!(" watcher_id: {}", req.watcher_id); + info!( + "RenameFile: file_id={}, old={}, new={}", + req.file_id, req.old_file_path, req.new_file_path + ); // 1. Verify authentication let verified = match handler.app_state.oauth.verify_token(&req.auth_token).await { @@ -158,17 +155,11 @@ pub async fn handle_rename_file( // Fix P2 #5: Idempotency check // If file at target path is the SAME file_id, consider it a successful retry if existing.file_id == req.file_id { - info!( - "Idempotency check: File {} is already at target path {}", - req.file_id, req.new_file_path + debug!( + "Idempotency: file_id={} already at target path", + req.file_id ); - // If client provided expected_revision, check if we already met it or exceeded it - if req.expected_revision > 0 && existing.revision != req.expected_revision { - warn!("Idempotency warning: File is at target path but revision mismatch (current={}, expected={})", existing.revision, req.expected_revision); - // We can still treat it as success if we assume the client missed the response - } - return Ok(Response::new(RenameFileResponse { success: true, return_message: "File renamed successfully (idempotent)".to_string(), @@ -205,9 +196,13 @@ pub async fn handle_rename_file( server_group_id != file_info.group_id || server_watcher_id != file_info.watcher_id; if is_cross_watcher_move { - info!( - "Cross-watcher MOVE request: file_id={}, from group={}/watcher={} to group={}/watcher={}", - req.file_id, file_info.group_id, file_info.watcher_id, server_group_id, server_watcher_id + debug!( + "Cross-watcher MOVE: file_id={}, {}:{} -> {}:{}", + req.file_id, + file_info.group_id, + file_info.watcher_id, + server_group_id, + server_watcher_id ); } @@ -246,21 +241,10 @@ pub async fn handle_rename_file( .await { Ok(crate::storage::RenameFileResult::Success { new_revision }) => { - if is_cross_watcher_move { - info!( - "File moved successfully: {} -> {} (group={}/watcher={}, new revision: {})", - req.old_file_path, - req.new_file_path, - req.group_id, - req.watcher_id, - new_revision - ); - } else { - info!( - "File renamed successfully: {} -> {} (new revision: {})", - req.old_file_path, req.new_file_path, new_revision - ); - } + info!( + "RenameFile OK: file_id={}, rev={}, move={}", + req.file_id, new_revision, is_cross_watcher_move + ); // 5. Record rename history if let Err(e) = handler @@ -280,18 +264,18 @@ pub async fn handle_rename_file( } // 6. Broadcast rename notification to other devices - // Fix P0 #3: Include updated file info with Server IDs + // Fix P0 #3: Use SERVER IDs for cross-watcher MOVE let mut broadcast_file_info = file_info.clone(); broadcast_file_info.file_path = req.new_file_path.clone(); broadcast_file_info.revision = new_revision; broadcast_file_info.updated_time.seconds = req.timestamp; broadcast_file_info.updated_time.nanos = 0; - if let Some(gid) = new_group_id { - broadcast_file_info.group_id = gid; - } - if let Some(wid) = new_watcher_id { - broadcast_file_info.watcher_id = wid; + // CRITICAL: For cross-watcher moves, use SERVER IDs (not client IDs) + // so other devices can properly locate the file + if is_cross_watcher_move { + broadcast_file_info.group_id = server_group_id; + broadcast_file_info.watcher_id = server_watcher_id; } if let Err(e) = handler @@ -349,10 +333,7 @@ pub async fn handle_rename_file( // Auto-retry if decision is AcceptClient if decision == ResolutionDecision::AcceptClient { - info!( - "Auto-resolving rename conflict: accepting client version (file_id={})", - req.file_id - ); + debug!("Auto-resolve conflict: file_id={}", req.file_id); // Retry rename without revision check (force rename) match handler @@ -370,21 +351,10 @@ pub async fn handle_rename_file( .await { Ok(crate::storage::RenameFileResult::Success { new_revision }) => { - if is_cross_watcher_move { - info!( - "File moved after auto-resolution: {} -> {} (group={}/watcher={}, new_revision={})", - req.old_file_path, - req.new_file_path, - req.group_id, - req.watcher_id, - new_revision - ); - } else { - info!( - "File renamed after auto-resolution: {} -> {}, new_revision={}", - req.old_file_path, req.new_file_path, new_revision - ); - } + info!( + "RenameFile OK (auto-resolved): file_id={}, rev={}", + req.file_id, new_revision + ); // Record rename history if let Err(e) = handler diff --git a/src/handlers/file_handler.rs b/src/handlers/file_handler.rs index 362b2fc..9024df8 100644 --- a/src/handlers/file_handler.rs +++ b/src/handlers/file_handler.rs @@ -4,9 +4,10 @@ use crate::sync::{ AuthUpdateNotification, CheckFileExistsRequest, CheckFileExistsResponse, DeleteFileRequest, DeleteFileResponse, DeviceUpdateNotification, DownloadFileRequest, DownloadFileResponse, EncryptionKeyUpdateNotification, FileUpdateNotification, FindFileRequest, FindFileResponse, - GetFileInfoRequest, GetFileInfoResponse, ListFilesRequest, ListFilesResponse, - RenameFileRequest, RenameFileResponse, UploadFileRequest, UploadFileResponse, - VersionUpdateNotification, WatcherGroupUpdateNotification, WatcherPresetUpdateNotification, + GetFileInfoRequest, GetFileInfoResponse, InvalidateKeyIdRequest, InvalidateKeyIdResponse, + ListFilesRequest, ListFilesResponse, RenameFileRequest, RenameFileResponse, UploadFileRequest, + UploadFileResponse, VersionUpdateNotification, WatcherGroupUpdateNotification, + WatcherPresetUpdateNotification, }; use crate::utils::response; use async_trait::async_trait; @@ -423,6 +424,15 @@ impl FileHandler { super::file::rename::handle_rename_file(self, req).await } + /// Handle key invalidation request + pub async fn invalidate_key_id( + &self, + request: Request, + ) -> Result, Status> { + let req = request.into_inner(); + super::file::key::handle_invalidate_key_id(self, req).await + } + /// Record rename history for debugging and conflict detection pub(crate) async fn record_rename_history( &self, diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs index fbd1b0d..c377884 100755 --- a/src/handlers/mod.rs +++ b/src/handlers/mod.rs @@ -29,6 +29,9 @@ pub mod metrics; // Usage tracking handlers pub mod usage_handler; +// Admin handlers (cleanup, maintenance) +pub mod admin_handler; + use crate::sync::HealthCheckRequest; use crate::sync::HealthCheckResponse; use tonic::{Request, Response, Status}; diff --git a/src/handlers/watcher_handler.rs b/src/handlers/watcher_handler.rs index f3d6f3f..b3c8794 100644 --- a/src/handlers/watcher_handler.rs +++ b/src/handlers/watcher_handler.rs @@ -20,6 +20,15 @@ use std::sync::Arc; use tonic::{Request, Response, Status}; use tracing::{debug, error, info, warn}; +/// Result of apply_client_configuration with Item-level LWW info +#[derive(Default)] +struct ApplyConfigResult { + /// Groups/watchers added by other devices that client should merge + recently_added: Vec, + /// Items deleted by other devices that client should remove + recently_deleted: Vec, +} + /// Handler that processes Watcher-related requests pub struct WatcherHandler { pub app_state: Arc, @@ -788,8 +797,10 @@ impl WatcherHandler { let device_hash = req.device_hash.clone(); let client_watcher_groups = req.watcher_groups; let client_presets = req.presets; + let incremental = req.incremental; let force_update = req.force_update; let client_timestamp = req.client_timestamp; + let last_sync_timestamp = req.last_sync_timestamp; let sync_start = std::time::Instant::now(); @@ -834,11 +845,16 @@ impl WatcherHandler { sync_timestamp: chrono::Utc::now().timestamp(), total_operations: 0, sync_duration_ms: 0.0, + watchers_created: 0, + watchers_updated: 0, + watchers_deleted: 0, + files_soft_deleted: 0, }; let mut conflicts_detected = false; let mut conflict_details = Vec::new(); let action_taken: i32; + let mut lww_result = ApplyConfigResult::default(); // Determine action based on timestamp comparison if force_update { @@ -847,32 +863,38 @@ impl WatcherHandler { action_taken = 3; // CONFLICT_RESOLVED // Apply client configuration to server - self.apply_client_configuration( - &account_hash, - &device_hash, - &client_watcher_groups, - &client_presets, - &mut stats, - &mut conflicts_detected, - &mut conflict_details, - ) - .await?; + lww_result = self + .apply_client_configuration( + &account_hash, + &device_hash, + &client_watcher_groups, + &client_presets, + incremental, + last_sync_timestamp, + &mut stats, + &mut conflicts_detected, + &mut conflict_details, + ) + .await?; } else if is_new_account { // New account: store initial configuration info!("🆕 [CONFIG_SYNC] New account - storing initial configuration"); action_taken = 4; // INITIAL_SETUP - // Store client configuration - self.apply_client_configuration( - &account_hash, - &device_hash, - &client_watcher_groups, - &client_presets, - &mut stats, - &mut conflicts_detected, - &mut conflict_details, - ) - .await?; + // Store client configuration (new account uses merge mode to avoid accidental deletes) + lww_result = self + .apply_client_configuration( + &account_hash, + &device_hash, + &client_watcher_groups, + &client_presets, + true, // New account always uses incremental (merge) mode + 0, // No LWW for new accounts + &mut stats, + &mut conflicts_detected, + &mut conflict_details, + ) + .await?; } else { // Existing account: compare timestamps let server_latest_ts = Self::get_server_latest_timestamp(&server_groups); @@ -887,17 +909,20 @@ impl WatcherHandler { info!("⬆️ [CONFIG_SYNC] Client is newer - uploading to server"); action_taken = 1; // CLIENT_TO_SERVER - // Apply client configuration - self.apply_client_configuration( - &account_hash, - &device_hash, - &client_watcher_groups, - &client_presets, - &mut stats, - &mut conflicts_detected, - &mut conflict_details, - ) - .await?; + // Apply client configuration with LWW + lww_result = self + .apply_client_configuration( + &account_hash, + &device_hash, + &client_watcher_groups, + &client_presets, + incremental, + last_sync_timestamp, + &mut stats, + &mut conflicts_detected, + &mut conflict_details, + ) + .await?; } else if server_latest_ts > client_timestamp { // Server is newer → client should download info!("⬇️ [CONFIG_SYNC] Server is newer - client should download"); @@ -960,7 +985,7 @@ impl WatcherHandler { 4 => "Initial configuration stored".to_string(), _ => "Sync completed".to_string(), }, - stats: Some(stats), + stats: Some(stats.clone()), server_watcher_groups, server_presets, server_timestamp: chrono::Utc::now().timestamp(), @@ -968,6 +993,10 @@ impl WatcherHandler { conflict_details, action_taken, is_new_account, + retention_days: 30, // Soft-deleted items are kept for 30 days + deleted_items: vec![], // Items soft-deleted in this sync (populated from stats) + recently_added: lww_result.recently_added, // Items added by other devices (LWW) + recently_deleted: lww_result.recently_deleted, // Items deleted by other devices (LWW) }; info!( @@ -987,16 +1016,102 @@ impl WatcherHandler { } /// Apply client configuration to server (helper method for sync_configuration) + /// When incremental=false (Replace mode), calculates set difference and soft deletes items not in client config + /// Uses Item-level Last-Write-Wins when last_sync_timestamp > 0 async fn apply_client_configuration( &self, account_hash: &str, device_hash: &str, client_watcher_groups: &[WatcherGroupData], client_presets: &[String], + incremental: bool, + last_sync_timestamp: i64, stats: &mut SyncStats, conflicts_detected: &mut bool, conflict_details: &mut Vec, - ) -> Result<(), Status> { + ) -> Result { + use std::collections::HashSet; + + let mut result = ApplyConfigResult::default(); + + // 0. Replace mode: calculate set difference and soft delete groups not in client config + if !incremental { + // Get server's existing groups with updated_at timestamps (for LWW) + let server_groups_with_ts: Vec<(i32, i64)> = match self + .app_state + .storage + .get_group_ids_with_updated_at(account_hash) + .await + { + Ok(groups) => groups, + Err(e) => { + warn!("Failed to get server group IDs with timestamps: {}", e); + Vec::new() + } + }; + + // Get client's group IDs + let client_group_ids: HashSet = + client_watcher_groups.iter().map(|g| g.group_id).collect(); + + // Process groups that exist on server but not in client config + for (group_id, updated_at) in server_groups_with_ts { + if !client_group_ids.contains(&group_id) { + // Item-level LWW: check if this was added by another device after last sync + if last_sync_timestamp > 0 && updated_at > last_sync_timestamp { + // Added by another device after client's last sync → keep and notify client + info!( + "🔄 [LWW] Group {} was added by another device (updated_at={} > last_sync={}), keeping", + group_id, updated_at, last_sync_timestamp + ); + // Fetch full group data to return to client + if let Ok(Some(group_data)) = self + .app_state + .storage + .get_watcher_group_data(account_hash, group_id) + .await + { + result.recently_added.push(group_data); + } + } else { + // Client deleted this item → soft delete + if let Ok(Some(server_group_id)) = self + .app_state + .storage + .get_server_group_id(account_hash, group_id) + .await + { + match self + .app_state + .storage + .soft_delete_watcher_group(account_hash, server_group_id) + .await + { + Ok((watchers_deleted, files_deleted)) => { + info!( + "Soft deleted group {} (server_id={}) with {} watchers and {} files", + group_id, server_group_id, watchers_deleted, files_deleted + ); + stats.groups_deleted += 1; + stats.watchers_deleted += watchers_deleted; + stats.files_soft_deleted += files_deleted; + stats.total_operations += 1; + } + Err(e) => { + warn!("Failed to soft delete group {}: {}", group_id, e); + conflict_details.push(format!( + "Failed to delete group {}: {}", + group_id, e + )); + *conflicts_detected = true; + } + } + } + } + } + } + } + // 1. Synchronize Watcher Groups for group_data in client_watcher_groups { let group_id = group_data.group_id; @@ -1041,7 +1156,43 @@ impl WatcherHandler { stats.total_operations += 1; // Apply watchers delivered within group (including conditions) + // With UPSERT restoration prevention for Item-level LWW for watcher in &group_data.watchers { + // UPSERT restoration prevention: check if watcher was soft-deleted by another device + if last_sync_timestamp > 0 { + if let Ok(Some((watcher_id, deleted_at))) = self + .app_state + .storage + .find_soft_deleted_watcher_by_folder( + account_hash, + existing.id, // server_group_id + &watcher.folder, + ) + .await + { + if deleted_at > last_sync_timestamp { + // Watcher was deleted by another device after client's last sync + // Skip restoration and notify client + info!( + "🔄 [LWW] Skipping restoration of watcher {} (folder={}) - deleted by another device at {} > last_sync={}", + watcher_id, &watcher.folder, deleted_at, last_sync_timestamp + ); + result + .recently_deleted + .push(crate::sync::DeletedItemInfo { + r#type: + crate::sync::deleted_item_info::ItemType::Watcher + as i32, + group_id, + watcher_id: watcher.watcher_id, + title: watcher.folder.clone(), + affected_files: 0, + }); + continue; // Skip this watcher + } + } + } + match self .app_state .create_or_get_watcher(account_hash, group_id, watcher) @@ -1061,6 +1212,74 @@ impl WatcherHandler { } } + // Replace mode: soft delete watchers not in client config (with LWW) + if !incremental { + let server_group_id = existing.id; + + // Get server watchers with updated_at for LWW + let server_watchers_with_ts: Vec<(i32, i64)> = match self + .app_state + .storage + .get_watcher_ids_with_updated_at(account_hash, server_group_id) + .await + { + Ok(watchers) => watchers, + Err(e) => { + warn!( + "Failed to get server watcher IDs with timestamps for group {}: {}", + group_id, e + ); + Vec::new() + } + }; + + let client_watcher_ids: HashSet = + group_data.watchers.iter().map(|w| w.watcher_id).collect(); + + for (watcher_id, watcher_updated_at) in server_watchers_with_ts { + if !client_watcher_ids.contains(&watcher_id) { + // Item-level LWW for watchers + if last_sync_timestamp > 0 + && watcher_updated_at > last_sync_timestamp + { + // Added by another device → keep (already in recently_added group) + info!( + "🔄 [LWW] Watcher {} in group {} was added by another device, keeping", + watcher_id, group_id + ); + } else { + // Client deleted → soft delete + match self + .app_state + .storage + .soft_delete_watcher(account_hash, watcher_id) + .await + { + Ok(files_deleted) => { + info!( + "Soft deleted watcher {} in group {} with {} files", + watcher_id, group_id, files_deleted + ); + stats.watchers_deleted += 1; + stats.files_soft_deleted += files_deleted; + } + Err(e) => { + warn!( + "Failed to soft delete watcher {}: {}", + watcher_id, e + ); + conflict_details.push(format!( + "Failed to delete watcher {}: {}", + watcher_id, e + )); + *conflicts_detected = true; + } + } + } + } + } + } + // Send real-time notification if let Err(e) = self .app_state @@ -1131,6 +1350,64 @@ impl WatcherHandler { } } + // Replace mode: soft delete watchers not in client config + // (handles edge case of restored soft-deleted groups) + if !incremental { + if let Ok(Some(server_group_id)) = self + .app_state + .storage + .get_server_group_id(account_hash, group_id) + .await + { + let server_watcher_ids: HashSet = match self + .app_state + .storage + .get_watcher_ids_by_group(account_hash, server_group_id) + .await + { + Ok(ids) => ids.into_iter().collect(), + Err(e) => { + warn!( + "Failed to get server watcher IDs for new group {}: {}", + group_id, e + ); + HashSet::new() + } + }; + + let client_watcher_ids: HashSet = + group_data.watchers.iter().map(|w| w.watcher_id).collect(); + + for watcher_id in server_watcher_ids.difference(&client_watcher_ids) + { + match self + .app_state + .storage + .soft_delete_watcher(account_hash, *watcher_id) + .await + { + Ok(files_deleted) => { + info!( + "Soft deleted watcher {} in new group {} with {} files", + watcher_id, group_id, files_deleted + ); + } + Err(e) => { + warn!( + "Failed to soft delete watcher {}: {}", + watcher_id, e + ); + conflict_details.push(format!( + "Failed to delete watcher {}: {}", + watcher_id, e + )); + *conflicts_detected = true; + } + } + } + } + } + // Send real-time notification if let Err(e) = self .app_state @@ -1192,7 +1469,7 @@ impl WatcherHandler { } } - Ok(()) + Ok(result) } } diff --git a/src/server/service.rs b/src/server/service.rs index 97a3a81..cde4f31 100644 --- a/src/server/service.rs +++ b/src/server/service.rs @@ -17,13 +17,14 @@ use crate::sync::{ GetFileInfoRequest, GetFileInfoResponse, GetQuotaInfoRequest, GetQuotaInfoResponse, GetUsageStatsRequest, GetUsageStatsResponse, GetWatcherGroupRequest, GetWatcherGroupResponse, GetWatcherGroupsRequest, GetWatcherGroupsResponse, GetWatcherPresetRequest, - GetWatcherPresetResponse, HealthCheckRequest, HealthCheckResponse, ListDevicesRequest, - ListDevicesResponse, ListFilesRequest, ListFilesResponse, LoginRequest, LoginResponse, - OAuthExchangeRequest, OAuthExchangeResponse, RegisterDeviceRequest, RegisterDeviceResponse, - RegisterWatcherGroupRequest, RegisterWatcherGroupResponse, RegisterWatcherPresetRequest, - RegisterWatcherPresetResponse, RenameFileRequest, RenameFileResponse, - RequestEncryptionKeyRequest, RequestEncryptionKeyResponse, RestoreFileRequest, - RestoreFileResponse, RestoreFileVersionRequest, RestoreFileVersionResponse, SubscribeRequest, + GetWatcherPresetResponse, HealthCheckRequest, HealthCheckResponse, InvalidateKeyIdRequest, + InvalidateKeyIdResponse, ListDevicesRequest, ListDevicesResponse, ListFilesRequest, + ListFilesResponse, LoginRequest, LoginResponse, OAuthExchangeRequest, OAuthExchangeResponse, + RegisterDeviceRequest, RegisterDeviceResponse, RegisterWatcherGroupRequest, + RegisterWatcherGroupResponse, RegisterWatcherPresetRequest, RegisterWatcherPresetResponse, + RenameFileRequest, RenameFileResponse, RequestEncryptionKeyRequest, + RequestEncryptionKeyResponse, RestoreFileRequest, RestoreFileResponse, + RestoreFileVersionRequest, RestoreFileVersionResponse, SubscribeRequest, SyncConfigurationRequest, SyncConfigurationResponse, UpdateDeviceInfoRequest, UpdateDeviceInfoResponse, UpdateWatcherGroupRequest, UpdateWatcherGroupResponse, UpdateWatcherPresetRequest, UpdateWatcherPresetResponse, UploadFileChunk, UploadFileRequest, @@ -1675,7 +1676,11 @@ impl SyncService for SyncServiceImpl { quota_reset_date: summary.quota_reset_date.map(|d| { use prost_types::Timestamp; Timestamp { - seconds: d.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp(), + seconds: d + .and_hms_opt(0, 0, 0) + .expect("Valid time: midnight") + .and_utc() + .timestamp(), nanos: 0, } }), @@ -1752,7 +1757,11 @@ impl SyncService for SyncServiceImpl { period_start: None, period_end: None, quota_reset_date: summary.quota_reset_date.map(|d| Timestamp { - seconds: d.and_hms_opt(0, 0, 0).unwrap().and_utc().timestamp(), + seconds: d + .and_hms_opt(0, 0, 0) + .expect("Valid time: midnight") + .and_utc() + .timestamp(), nanos: 0, }), is_transfer_exceeded: summary.transfer_percentage >= 100.0, @@ -1881,6 +1890,14 @@ impl SyncService for SyncServiceImpl { check_result: None, // TODO: implement QuotaCheckResultProto if needed })) } + + /// Invalidate an encryption key + async fn invalidate_key_id( + &self, + request: Request, + ) -> Result, Status> { + self.file_handler.invalidate_key_id(request).await + } } /// Synchronization client service implementation diff --git a/src/server/startup.rs b/src/server/startup.rs index e0ff506..0a91ee6 100644 --- a/src/server/startup.rs +++ b/src/server/startup.rs @@ -311,6 +311,15 @@ async fn start_http_server(config: &ServerConfig, app_state: Arc) -> R "/api/usage/health", web::get().to(handlers::usage_handler::usage_health_check), ) + // Admin endpoints (maintenance operations) + .route( + "/admin/cleanup", + web::post().to(handlers::admin_handler::run_cleanup), + ) + .route( + "/admin/cleanup/status", + web::get().to(handlers::admin_handler::cleanup_status), + ) }) .workers(config.worker_threads) .keep_alive(Duration::from_secs(75)) diff --git a/src/services/usage_service.rs b/src/services/usage_service.rs index e65010d..d929406 100644 --- a/src/services/usage_service.rs +++ b/src/services/usage_service.rs @@ -275,13 +275,13 @@ impl UsageService { )); // Update warning timestamp - let _ = self + if let Some(mysql_storage) = self .storage .as_any() .downcast_ref::() - .unwrap() - .update_last_warning(account_hash) - .await; + { + let _ = mysql_storage.update_last_warning(account_hash).await; + } } Ok((true, None, warnings)) diff --git a/src/services/version_service.rs b/src/services/version_service.rs index 1f311fd..4fa2027 100644 --- a/src/services/version_service.rs +++ b/src/services/version_service.rs @@ -219,7 +219,7 @@ impl VersionService for VersionServiceImpl { .map_err(|e| AppError::storage(format!("Failed to get file history: {}", e)))?; let total_versions = files.len() as i32; - let has_more = limit.is_some() && files.len() == limit.unwrap() as usize; + let has_more = limit.map_or(false, |l| files.len() == l as usize); // Convert to FileVersionInfo let versions: Vec = files diff --git a/src/storage/memory.rs b/src/storage/memory.rs index 5a5aa67..b3752e3 100644 --- a/src/storage/memory.rs +++ b/src/storage/memory.rs @@ -10,8 +10,8 @@ use crate::models::file::FileInfo; use crate::models::watcher::WatcherGroup; use crate::storage::{ - DeleteFileResult, RenameFileResult, Result, Storage, StorageError, StorageMetrics, - StoreFileResult, + CleanupStats, DeleteFileResult, RenameFileResult, Result, Storage, StorageError, + StorageMetrics, StoreFileResult, }; use crate::sync::WatcherGroupData; @@ -1562,4 +1562,82 @@ impl Storage for MemoryStorage { async fn trim_old_revisions(&self, _max_revisions: i32) -> Result { Ok(0) } + + // === Soft Delete Methods (stub for MemoryStorage) === + async fn get_group_ids_for_account(&self, _account_hash: &str) -> Result> { + Ok(Vec::new()) + } + + async fn get_watcher_ids_by_group( + &self, + _account_hash: &str, + _server_group_id: i32, + ) -> Result> { + Ok(Vec::new()) + } + + async fn soft_delete_watcher( + &self, + _account_hash: &str, + _server_watcher_id: i32, + ) -> Result { + Ok(0) + } + + async fn soft_delete_watcher_group( + &self, + _account_hash: &str, + _server_group_id: i32, + ) -> Result<(i32, i32)> { + Ok((0, 0)) + } + + async fn cleanup_soft_deleted(&self, _retention_days: i32) -> Result { + // Memory storage doesn't support soft delete cleanup + Ok(CleanupStats::default()) + } + + // === Item-level Last-Write-Wins (LWW) Methods === + + async fn get_group_ids_with_updated_at(&self, _account_hash: &str) -> Result> { + // Memory storage doesn't track updated_at + Ok(Vec::new()) + } + + async fn get_watcher_ids_with_updated_at( + &self, + _account_hash: &str, + _server_group_id: i32, + ) -> Result> { + // Memory storage doesn't track updated_at + Ok(Vec::new()) + } + + async fn get_watcher_group_data( + &self, + _account_hash: &str, + _group_id: i32, + ) -> Result> { + // Memory storage doesn't support this + Ok(None) + } + + async fn get_watcher_deleted_info( + &self, + _account_hash: &str, + _server_watcher_id: i32, + ) -> Result)>> { + // Memory storage doesn't track deleted info + Ok(None) + } + + async fn find_soft_deleted_watcher_by_folder( + &self, + _account_hash: &str, + _group_id: i32, + _folder: &str, + ) -> Result> { + // Memory storage doesn't track soft-deleted watchers + Ok(None) + } } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 6a26a3f..a3bd7e9 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -8,6 +8,7 @@ mod mysql_account; mod mysql_auth; mod mysql_device; mod mysql_file; +pub mod mysql_key; pub mod mysql_quota; pub mod mysql_usage; pub mod mysql_watcher; @@ -218,6 +219,21 @@ pub enum RenameFileResult { FileNotFound, } +/// Statistics for cleanup job execution +#[derive(Debug, Clone, Default, serde::Serialize)] +pub struct CleanupStats { + /// Number of files permanently deleted from database + pub files_deleted: i64, + /// Number of watchers permanently deleted + pub watchers_deleted: i64, + /// Number of watcher groups permanently deleted + pub groups_deleted: i64, + /// Number of S3 objects deleted + pub s3_objects_deleted: i64, + /// Errors encountered during cleanup (non-fatal) + pub errors: Vec, +} + /// Performance metrics for storage operations #[derive(Debug, Clone, Default)] pub struct StorageMetrics { @@ -625,6 +641,72 @@ pub trait Storage: Sync + Send { async fn purge_deleted_files_older_than(&self, ttl_secs: i64) -> Result; /// Trim older revisions per (account_hash, file_path) beyond max_revisions; return affected rows async fn trim_old_revisions(&self, max_revisions: i32) -> Result; + + // === Soft Delete Methods === + /// Get all active group IDs for an account (for set difference calculation) + async fn get_group_ids_for_account(&self, account_hash: &str) -> Result>; + + /// Get all active watcher IDs in a group (for set difference calculation) + async fn get_watcher_ids_by_group( + &self, + account_hash: &str, + server_group_id: i32, + ) -> Result>; + + /// Soft delete a watcher and its associated files + /// Returns the number of files soft deleted + async fn soft_delete_watcher(&self, account_hash: &str, server_watcher_id: i32) -> Result; + + /// Soft delete a watcher group and all its watchers and files + /// Returns (watchers_deleted, files_deleted) + async fn soft_delete_watcher_group( + &self, + account_hash: &str, + server_group_id: i32, + ) -> Result<(i32, i32)>; + + /// Cleanup soft-deleted records older than retention_days + /// Permanently deletes from database and S3 storage + /// Order: files -> watchers -> groups (to respect FK constraints) + async fn cleanup_soft_deleted(&self, retention_days: i32) -> Result; + + // === Item-level Last-Write-Wins (LWW) Methods === + + /// Get all active group IDs with their updated_at timestamps (for Item-level LWW) + /// Returns Vec<(group_id, updated_at_unix_timestamp)> + async fn get_group_ids_with_updated_at(&self, account_hash: &str) -> Result>; + + /// Get all active watcher IDs with their updated_at timestamps for a group (for Item-level LWW) + /// Returns Vec<(watcher_id, updated_at_unix_timestamp)> + async fn get_watcher_ids_with_updated_at( + &self, + account_hash: &str, + server_group_id: i32, + ) -> Result>; + + /// Get full WatcherGroupData for returning to client (for LWW recently_added) + async fn get_watcher_group_data( + &self, + account_hash: &str, + group_id: i32, + ) -> Result>; + + /// Get watcher info including soft-delete status (for UPSERT restoration prevention) + /// Returns (is_deleted, deleted_at_unix_timestamp) + async fn get_watcher_deleted_info( + &self, + account_hash: &str, + server_watcher_id: i32, + ) -> Result)>>; + + /// Find soft-deleted watcher by folder (for UPSERT restoration prevention) + /// Returns (server_watcher_id, is_deleted, deleted_at_unix_timestamp) if found + async fn find_soft_deleted_watcher_by_folder( + &self, + account_hash: &str, + group_id: i32, + folder: &str, + ) -> Result>; } /// Optimized storage factory with connection pooling diff --git a/src/storage/mysql.rs b/src/storage/mysql.rs index 8f143f5..2734ed6 100644 --- a/src/storage/mysql.rs +++ b/src/storage/mysql.rs @@ -11,15 +11,17 @@ use crate::models::file::FileInfo; use crate::models::file::FileNotice; use crate::models::watcher::WatcherCondition; use crate::storage::{ - DeleteFileResult, RenameFileResult, Result, Storage, StorageError, StorageMetrics, - StoreFileResult, + CleanupStats, DeleteFileResult, RenameFileResult, Result, Storage, StorageError, + StorageMetrics, StoreFileResult, }; +use crate::sync::WatcherGroupData; // Using MySQL modules use crate::storage::mysql_account::*; use crate::storage::mysql_auth::*; use crate::storage::mysql_device::*; use crate::storage::mysql_file::*; +use crate::storage::mysql_watcher::MySqlWatcherExt; use crate::storage::mysql_watcher::*; /// MySQL storage implementation @@ -1269,19 +1271,19 @@ END"#; account_hash: &str, group_id: i32, watcher_id: i32, - data: Vec, + data: &[u8], ) -> String { let cfg = crate::server::app_state::AppState::get_config(); if let Some(kv) = cfg.server_encode_key.as_ref() { if kv.len() == 32 { let key: &[u8; 32] = kv.as_slice().try_into().expect("len checked"); let aad = format!("{}:{}:{}", account_hash, group_id, watcher_id); - if let Ok(pt) = crate::utils::crypto::aead_decrypt(key, &data, aad.as_bytes()) { + if let Ok(pt) = crate::utils::crypto::aead_decrypt(key, data, aad.as_bytes()) { return String::from_utf8_lossy(&pt).to_string(); } } } - String::from_utf8_lossy(&data).to_string() + String::from_utf8_lossy(data).to_string() } pub async fn migrate_encrypt_paths(&self, batch_size: usize) -> Result { @@ -1332,8 +1334,14 @@ END"#; (maybe_plain_path.as_ref(), maybe_plain_name.as_ref()) { // Already encrypted; recompute indexes from decrypted plaintext - let plain_path = - String::from_utf8_lossy(maybe_plain_path.as_ref().unwrap()).to_string(); + let plain_path = match maybe_plain_path.as_ref() { + Some(bytes) => String::from_utf8_lossy(bytes).to_string(), + None => { + return Err(StorageError::Database( + "Missing plain_path during encryption migration".into(), + )) + } + }; let salt = crate::utils::crypto::derive_salt(key, "meta-index", &account_hash); ( file_path_b, @@ -2000,9 +2008,11 @@ impl Storage for MySqlStorage { )) })?; - // Tombstone row uses next revision, new active uses next+1 to avoid UNIQUE(file_id, revision) conflicts + // Both tombstone and new active row share the same revision (current+1) + // This represents a single logical operation (rename/move) with one revision increment + // Note: No UNIQUE(file_id, revision) constraint exists, so this is safe let tombstone_revision = current_revision + 1; - let new_active_revision = tombstone_revision + 1; + let new_active_revision = current_revision + 1; sqlx::query( r#"INSERT INTO files ( @@ -2886,6 +2896,75 @@ impl Storage for MySqlStorage { "delete_encryption_key not implemented".to_string(), )) } + + // === Soft Delete Methods (delegated to MySqlWatcherExt) === + async fn get_group_ids_for_account(&self, account_hash: &str) -> Result> { + MySqlWatcherExt::get_group_ids_for_account(self, account_hash).await + } + + async fn get_watcher_ids_by_group( + &self, + account_hash: &str, + server_group_id: i32, + ) -> Result> { + MySqlWatcherExt::get_watcher_ids_by_group(self, account_hash, server_group_id).await + } + + async fn soft_delete_watcher(&self, account_hash: &str, server_watcher_id: i32) -> Result { + MySqlWatcherExt::soft_delete_watcher(self, account_hash, server_watcher_id).await + } + + async fn soft_delete_watcher_group( + &self, + account_hash: &str, + server_group_id: i32, + ) -> Result<(i32, i32)> { + MySqlWatcherExt::soft_delete_watcher_group(self, account_hash, server_group_id).await + } + + async fn cleanup_soft_deleted(&self, retention_days: i32) -> Result { + MySqlWatcherExt::cleanup_soft_deleted(self, retention_days).await + } + + // === Item-level Last-Write-Wins (LWW) Methods === + + async fn get_group_ids_with_updated_at(&self, account_hash: &str) -> Result> { + MySqlWatcherExt::get_group_ids_with_updated_at(self, account_hash).await + } + + async fn get_watcher_ids_with_updated_at( + &self, + account_hash: &str, + server_group_id: i32, + ) -> Result> { + MySqlWatcherExt::get_watcher_ids_with_updated_at(self, account_hash, server_group_id).await + } + + async fn get_watcher_group_data( + &self, + account_hash: &str, + group_id: i32, + ) -> Result> { + MySqlWatcherExt::get_watcher_group_data(self, account_hash, group_id).await + } + + async fn get_watcher_deleted_info( + &self, + account_hash: &str, + server_watcher_id: i32, + ) -> Result)>> { + MySqlWatcherExt::get_watcher_deleted_info(self, account_hash, server_watcher_id).await + } + + async fn find_soft_deleted_watcher_by_folder( + &self, + account_hash: &str, + group_id: i32, + folder: &str, + ) -> Result> { + MySqlWatcherExt::find_soft_deleted_watcher_by_folder(self, account_hash, group_id, folder) + .await + } } // Helper methods for version management diff --git a/src/storage/mysql_file.rs b/src/storage/mysql_file.rs index 17107c4..077b7d1 100644 --- a/src/storage/mysql_file.rs +++ b/src/storage/mysql_file.rs @@ -171,15 +171,8 @@ impl MySqlFileExt for MySqlStorage { })?; debug!( - "📄 Preparing to store new file information: file_id={}, revision={} (key_id: {:?})", - file_info.file_id, new_revision, file_info.key_id - ); - - // Path encryption and index computation omitted as it's handled elsewhere - // Insert new file (using calculated revision) - info!( - "💾 Inserting new file record: file_id={}, revision={}, filename= {}", - file_info.file_id, new_revision, file_info.filename + "store_file: file_id={}, revision={}", + file_info.file_id, new_revision ); // Determine server key; if present and 32 bytes, encrypt path/name and compute indices deterministically @@ -344,13 +337,13 @@ impl MySqlFileExt for MySqlStorage { let file_info = FileInfo { file_id, - filename: self.decrypt_text(&account_hash, group_id, watcher_id, filename_b), + filename: self.decrypt_text(&account_hash, group_id, watcher_id, &filename_b), file_hash, device_hash, group_id, watcher_id, is_encrypted: false, - file_path: self.decrypt_text(&account_hash, group_id, watcher_id, file_path_b), + file_path: self.decrypt_text(&account_hash, group_id, watcher_id, &file_path_b), updated_time: timestamp, revision, account_hash, @@ -424,13 +417,13 @@ impl MySqlFileExt for MySqlStorage { let file_info = FileInfo { file_id, - filename: self.decrypt_text(&account_hash, group_id, watcher_id, filename_b), + filename: self.decrypt_text(&account_hash, group_id, watcher_id, &filename_b), file_hash, device_hash, group_id, watcher_id, is_encrypted: false, - file_path: self.decrypt_text(&account_hash, group_id, watcher_id, file_path_b), + file_path: self.decrypt_text(&account_hash, group_id, watcher_id, &file_path_b), updated_time: timestamp, revision, account_hash, @@ -518,13 +511,13 @@ impl MySqlFileExt for MySqlStorage { let file_info = FileInfo { file_id, - filename: self.decrypt_text(&account_hash, group_id, watcher_id, filename_b), + filename: self.decrypt_text(&account_hash, group_id, watcher_id, &filename_b), file_hash, device_hash, group_id, watcher_id, is_encrypted: false, - file_path: self.decrypt_text(&account_hash, group_id, watcher_id, file_path_b), + file_path: self.decrypt_text(&account_hash, group_id, watcher_id, &file_path_b), updated_time: timestamp, revision, account_hash, @@ -593,13 +586,13 @@ impl MySqlFileExt for MySqlStorage { let file_info = FileInfo { file_id, - filename: self.decrypt_text(&account_hash, group_id, watcher_id, filename_b), + filename: self.decrypt_text(&account_hash, group_id, watcher_id, &filename_b), file_hash, device_hash, group_id, watcher_id, is_encrypted: false, - file_path: self.decrypt_text(&account_hash, group_id, watcher_id, file_path_b), + file_path: self.decrypt_text(&account_hash, group_id, watcher_id, &file_path_b), updated_time: timestamp, revision, account_hash, @@ -625,8 +618,8 @@ impl MySqlFileExt for MySqlStorage { ) -> Result> { use sqlx::Row; debug!( - "🔍 Searching file by path and filename (by updated_time): account_hash={}, file_path={}, filename={}", - account_hash, file_path, filename + "find_file_by_path_and_name: path={}, name={}", + file_path, filename ); // equality by eq_index @@ -671,25 +664,15 @@ impl MySqlFileExt for MySqlStorage { let file_id: u64 = row.try_get("file_id").unwrap_or(0); let revision: i64 = row.try_get("revision").unwrap_or(0); - let updated_ts_value: Option = row.try_get("updated_ts").ok().flatten(); - debug!( - "📊 find_file_by_path_and_name query returned: file_id={}, revision={}, updated_time={:?}, is_deleted={}", - file_id, revision, updated_ts_value, is_deleted - ); - - // Critical validation: Double-check is_deleted + // Critical validation: Double-check is_deleted (should never fail with WHERE clause) if is_deleted { - error!("❌ CRITICAL BUG: find_file_by_path_and_name returned deleted file!"); error!( - " file_id={}, revision={}, is_deleted={}", - file_id, revision, is_deleted + "CRITICAL: Query returned deleted file - file_id={}, revision={}", + file_id, revision ); - error!(" This should NEVER happen - database inconsistency detected"); return Ok(None); } - debug!("✅ Verified file is not deleted: file_id={}", file_id); - let acc_hash: String = row.try_get("account_hash").unwrap_or_default(); let device_hash: String = row.try_get("device_hash").unwrap_or_default(); let file_path_b: Vec = row.try_get("file_path").unwrap_or_default(); @@ -701,8 +684,8 @@ impl MySqlFileExt for MySqlStorage { let size: u64 = row.try_get("size").unwrap_or(0); let key_id_opt: Option = row.try_get("key_id").ok(); - let file_path = self.decrypt_text(&acc_hash, group_id, watcher_id, file_path_b); - let filename = self.decrypt_text(&acc_hash, group_id, watcher_id, filename_b); + let file_path = self.decrypt_text(&acc_hash, group_id, watcher_id, &file_path_b); + let filename = self.decrypt_text(&acc_hash, group_id, watcher_id, &filename_b); let timestamp = prost_types::Timestamp { seconds: updated_ts.unwrap_or(0), @@ -745,9 +728,9 @@ impl MySqlFileExt for MySqlStorage { file_id: u64, client_revision: Option, ) -> Result { - info!( - "Deleting file: account_hash={}, file_id={}, client_revision={:?}", - account_hash, file_id, client_revision + debug!( + "delete_file: file_id={}, client_rev={:?}", + file_id, client_revision ); let mut tx = self.get_sqlx_pool().begin().await.map_err(|e| { StorageError::Database(format!( @@ -774,19 +757,20 @@ impl MySqlFileExt for MySqlStorage { )) })?; - if row_opt.is_none() { - debug!( - "File to delete does not exist or does not belong to the user: file_id={}, account_hash={}", - file_id, account_hash - ); - return Err(StorageError::NotFound(format!( - "Cannot find file: {}", - file_id - ))); - } - use sqlx::Row; - let row = row_opt.unwrap(); + let row = match row_opt { + Some(r) => r, + None => { + debug!( + "File to delete does not exist or does not belong to the user: file_id={}, account_hash={}", + file_id, account_hash + ); + return Err(StorageError::NotFound(format!( + "Cannot find file: {}", + file_id + ))); + } + }; let current_revision: i64 = row.try_get("revision").unwrap_or(0); // Validate revision if client provided one @@ -821,9 +805,9 @@ impl MySqlFileExt for MySqlStorage { // Decrypt for logging purposes only let file_path_for_log = - self.decrypt_text(account_hash, group_id, watcher_id, file_path_bytes.clone()); + self.decrypt_text(account_hash, group_id, watcher_id, &file_path_bytes); let filename_for_log = - self.decrypt_text(account_hash, group_id, watcher_id, filename_bytes.clone()); + self.decrypt_text(account_hash, group_id, watcher_id, &filename_bytes); debug!( "Processing file deletion: file_id={}, file_path={}, filename={}, current_revision={}, new_revision={}", file_id, file_path_for_log, filename_for_log, current_revision, new_revision @@ -889,10 +873,7 @@ impl MySqlFileExt for MySqlStorage { )) })?; - info!( - "File deletion complete: file_id={}, new_revision={}, deletion_history_file_id={}", - file_id, new_revision, deletion_record_id - ); + debug!("delete_file OK: file_id={}, rev={}", file_id, new_revision); Ok(DeleteFileResult::Success { deletion_record_id, new_revision, @@ -901,10 +882,7 @@ impl MySqlFileExt for MySqlStorage { /// Restore deleted file from deletion record async fn restore_file(&self, account_hash: &str, delete_record_id: u64) -> Result { - info!( - "Restoring file: account_hash={}, delete_record_id={}", - account_hash, delete_record_id - ); + debug!("restore_file: record_id={}", delete_record_id); let mut tx = self .get_sqlx_pool() @@ -959,13 +937,13 @@ impl MySqlFileExt for MySqlStorage { // Decrypt for logging let file_path_for_log = - self.decrypt_text(account_hash, group_id, watcher_id, file_path_bytes.clone()); + self.decrypt_text(account_hash, group_id, watcher_id, &file_path_bytes); let filename_for_log = - self.decrypt_text(account_hash, group_id, watcher_id, filename_bytes.clone()); + self.decrypt_text(account_hash, group_id, watcher_id, &filename_bytes); - info!( - "Restoring file: path={}, filename={}, revision={}", - file_path_for_log, filename_for_log, new_revision + debug!( + "restore_file: path={}, rev={}", + file_path_for_log, new_revision ); let now = Utc::now().timestamp(); @@ -1022,8 +1000,8 @@ impl MySqlFileExt for MySqlStorage { ); // Return FileInfo for the restored file - let file_path_str = self.decrypt_text(account_hash, group_id, watcher_id, file_path_bytes); - let filename_str = self.decrypt_text(account_hash, group_id, watcher_id, filename_bytes); + let file_path_str = self.decrypt_text(account_hash, group_id, watcher_id, &file_path_bytes); + let filename_str = self.decrypt_text(account_hash, group_id, watcher_id, &filename_bytes); Ok(FileInfo { file_id: restored_file_id, @@ -1055,10 +1033,7 @@ impl MySqlFileExt for MySqlStorage { _upload_time_from: Option, ) -> Result> { use sqlx::Row; - info!( - "Listing files: account_hash={}, group_id={}", - account_hash, group_id - ); + debug!("list_files: group_id={}", group_id); let rows = sqlx::query( r#"SELECT @@ -1102,13 +1077,13 @@ impl MySqlFileExt for MySqlStorage { }; files.push(FileInfo { file_id, - filename: self.decrypt_text(&acc_hash, group_id, watcher_id, filename_b), + filename: self.decrypt_text(&acc_hash, group_id, watcher_id, &filename_b), file_hash, device_hash, group_id, watcher_id, is_encrypted: false, - file_path: self.decrypt_text(&acc_hash, group_id, watcher_id, file_path_b), + file_path: self.decrypt_text(&acc_hash, group_id, watcher_id, &file_path_b), updated_time: timestamp, revision, account_hash: acc_hash, @@ -1136,10 +1111,7 @@ impl MySqlFileExt for MySqlStorage { _upload_time_from: Option, ) -> Result> { use sqlx::Row; - info!( - "Listing files (excluding device): account_hash={}, group_id={}, exclude_device={}", - account_hash, group_id, exclude_device_hash - ); + debug!("list_files_excluding_device: group_id={}", group_id); let rows = sqlx::query( r#"SELECT @@ -1185,13 +1157,13 @@ impl MySqlFileExt for MySqlStorage { }; files.push(FileInfo { file_id, - filename: self.decrypt_text(&acc_hash, group_id, watcher_id, filename_b), + filename: self.decrypt_text(&acc_hash, group_id, watcher_id, &filename_b), file_hash, device_hash, group_id, watcher_id, is_encrypted: false, - file_path: self.decrypt_text(&acc_hash, group_id, watcher_id, file_path_b), + file_path: self.decrypt_text(&acc_hash, group_id, watcher_id, &file_path_b), updated_time: timestamp, revision, account_hash: acc_hash, @@ -1206,19 +1178,15 @@ impl MySqlFileExt for MySqlStorage { /// Store file data async fn store_file_data(&self, file_id: u64, data_bytes: Vec) -> Result<()> { - info!( - "🔄 Starting MySQL file data storage: file_id={}, data_size={} bytes", + debug!( + "store_file_data: file_id={}, size={}", file_id, data_bytes.len() ); - debug!("📡 Preparing database connection(sqlx)..."); - // Current time (seconds) let now = chrono::Utc::now().timestamp(); - debug!("⏰ Timestamp: {}", now); - debug!("🔍 Checking existing file data..."); // Check if existing data exists let exists: Option = sqlx::query_scalar(r#"SELECT file_id FROM file_data WHERE file_id = ?"#) @@ -1232,7 +1200,6 @@ impl MySqlFileExt for MySqlStorage { if exists.is_some() { // Update - info!("🔄 Updating existing file data: file_id={}", file_id); sqlx::query(r#"UPDATE file_data SET data = ?, updated_at = ? WHERE file_id = ?"#) .bind(data_bytes) .bind(now) @@ -1245,7 +1212,6 @@ impl MySqlFileExt for MySqlStorage { })?; } else { // Insert new - info!("💾 Inserting new file data: file_id={}", file_id); sqlx::query(r#"INSERT INTO file_data (file_id, data, created_at, updated_at) VALUES (?, ?, ?, ?)"#) .bind(file_id) .bind(data_bytes) @@ -1253,10 +1219,9 @@ impl MySqlFileExt for MySqlStorage { .bind(now) .execute(self.get_sqlx_pool()) .await - .map_err(|e| { error!("❌ File data insertion failed(sqlx): {}", e); StorageError::Database(e.to_string()) })?; + .map_err(|e| { error!("File data insertion failed: {}", e); StorageError::Database(e.to_string()) })?; } - info!("🎉 File data storage complete: file_id={}", file_id); Ok(()) } @@ -1326,53 +1291,23 @@ impl MySqlFileExt for MySqlStorage { filename: &str, ) -> Result> { use sqlx::Row; - info!("🔍 [v2025.11.19-FIXED] find_file_by_criteria called"); - info!("🔍 [QUERY] Using: WHERE is_deleted = FALSE"); - info!("🔍 [QUERY] Sorting: ORDER BY revision DESC, id DESC"); - debug!("🔍 find_file_by_criteria called:"); - debug!(" account_hash: {}", account_hash); - debug!(" group_id: {}", group_id); - debug!(" watcher_id: {}", watcher_id); - debug!(" file_path: {}", file_path); - debug!(" filename: {}", filename); - - // Path analysis: check if filename is already included - let (search_path, search_filename) = - if file_path.ends_with(&format!("/{}", filename)) || file_path.ends_with(filename) { - // Case where filename is already included in file path - debug!("Filename already included in file path: {}", file_path); + debug!( + "find_file_by_criteria: group_id={}, watcher_id={}, path={}, name={}", + group_id, watcher_id, file_path, filename + ); - // Extract filename: content after last / or entire path + // Path analysis: check if filename is already included (for legacy compatibility) + let (_search_path, _search_filename) = + if file_path.ends_with(&format!("/{}", filename)) || file_path.ends_with(filename) { let last_slash_pos = file_path.rfind('/'); - match last_slash_pos { - Some(pos) => { - // Path is before the last /, filename is after it - let path = &file_path[0..pos]; - let fname = &file_path[pos + 1..]; - debug!("Extracted path: {}, filename: {}", path, fname); - (path.to_string(), fname.to_string()) - } - None => { - // If no /, the whole thing is the filename - debug!("No path, only filename: {}", file_path); - ("".to_string(), file_path.to_string()) - } + Some(pos) => (&file_path[0..pos], &file_path[pos + 1..]), + None => ("", file_path), } } else { - // Case where path and filename are separated - debug!( - "Path and filename separated: path={}, filename={}", - file_path, filename - ); - (file_path.to_string(), filename.to_string()) + (file_path, filename) }; - debug!( - "🔍 Final search criteria (by updated_time): path='{}', filename='{}'", - search_path, search_filename - ); - // Build deterministic eq_index for encrypted storage lookup let cfg = crate::server::app_state::AppState::get_config(); let eq_index = if let Some(kv) = cfg.server_encode_key.as_ref() { @@ -1414,44 +1349,21 @@ impl MySqlFileExt for MySqlStorage { StorageError::Database(format!("File search query execution failed: {}", e)) })?; - info!("🔍 [QUERY EXECUTED] is_deleted=FALSE filter applied"); - info!( - "🔍 [QUERY RESULT] Row count: {}", - if row.is_some() { "1" } else { "0" } - ); - if let Some(row) = row { // Extract is_deleted first for validation let is_deleted: bool = row.try_get("is_deleted").unwrap_or(false); let file_id: u64 = row.try_get("file_id").unwrap_or(0); let revision: i64 = row.try_get("revision").unwrap_or(0); - let updated_ts_val: Option = row.try_get("updated_ts").ok().flatten(); - debug!("✅ File found!"); - debug!( - "📊 Query returned: file_id={}, revision={}, updated_time={:?}, is_deleted={}", - file_id, revision, updated_ts_val, is_deleted - ); - - // Critical validation: Double-check is_deleted + // Critical validation: Double-check is_deleted (should never fail with WHERE clause) if is_deleted { - error!("❌ CRITICAL BUG: Query returned deleted file!"); - error!("❌ [v2025.11.19] This should NEVER happen with new query!"); error!( - " file_id={}, revision={}, is_deleted={}", - file_id, revision, is_deleted + "CRITICAL: Query returned deleted file - file_id={}, revision={}", + file_id, revision ); - error!(" This should NEVER happen - database inconsistency detected"); - error!(" Query had is_deleted = FALSE filter but returned deleted file"); return Ok(None); } - info!( - "✅ [VALIDATION PASSED] file_id={}, revision={}, is_deleted=false", - file_id, revision - ); - debug!("✅ Verified file is not deleted: file_id={}", file_id); - // Extract remaining fields from Row object let acc_hash: String = row.try_get("account_hash").unwrap_or_default(); let device_hash: String = row.try_get("device_hash").unwrap_or_default(); @@ -1464,11 +1376,6 @@ impl MySqlFileExt for MySqlStorage { let size: u64 = row.try_get("size").unwrap_or(0); let key_id_opt: Option = row.try_get("key_id").ok(); - info!( - "✅ find_file_by_criteria result: file_id={}, filename={}, watcher_id={}, revision={}", - file_id, filename, watcher_id, revision - ); - // Convert datetime to Unix timestamp let timestamp = prost_types::Timestamp { seconds: updated_ts.unwrap_or(0), @@ -1494,25 +1401,23 @@ impl MySqlFileExt for MySqlStorage { operation_type: row.try_get("operation_type").ok(), }; - info!( - "✅ find_file_by_criteria complete: file_id={}, revision={}", + debug!( + "find_file_by_criteria: found file_id={}, revision={}", file_id, revision ); Ok(Some(file_info)) } else { - warn!("❌ File search failed - cannot find file matching criteria:"); - warn!(" account_hash: {}", account_hash); - warn!(" file_path: {}", file_path); - warn!(" filename: {}", filename); - warn!(" search_path: {}", search_path); - warn!(" search_filename: {}", search_filename); + debug!( + "find_file_by_criteria: no file found for path={}, name={}", + file_path, filename + ); Ok(None) } } /// Check file existence and deletion status by file ID async fn check_file_exists(&self, file_id: u64) -> Result<(bool, bool)> { - info!("🔍 check_file_exists called: file_id={}", file_id); + debug!("check_file_exists: file_id={}", file_id); let is_deleted_opt: Option = sqlx::query_scalar( r#"SELECT is_deleted FROM files diff --git a/src/storage/mysql_key.rs b/src/storage/mysql_key.rs new file mode 100644 index 0000000..7589c93 --- /dev/null +++ b/src/storage/mysql_key.rs @@ -0,0 +1,122 @@ +//! Key status management for encryption key invalidation +//! +//! This module provides storage operations for managing encryption key lifecycle, +//! including invalidation and status checks. + +use tracing::{debug, info}; + +use crate::storage::mysql::MySqlStorage; +use crate::storage::{Result, StorageError}; + +/// MySQL key status extension trait +pub trait MySqlKeyExt { + /// Invalidate an encryption key + /// + /// Marks the key as invalidated and records the reason and device that initiated it. + /// Uses INSERT ... ON DUPLICATE KEY UPDATE for idempotent behavior. + async fn invalidate_key( + &self, + account_hash: &str, + key_id: &str, + reason: &str, + device_hash: &str, + ) -> Result<()>; + + /// Check if an encryption key is invalidated + /// + /// Returns true if the key exists in key_status with status='invalidated' + async fn is_key_invalidated(&self, account_hash: &str, key_id: &str) -> Result; + + /// Get the count of files using a specific key_id + async fn get_affected_files_count(&self, account_hash: &str, key_id: &str) -> Result; +} + +impl MySqlKeyExt for MySqlStorage { + async fn invalidate_key( + &self, + account_hash: &str, + key_id: &str, + reason: &str, + device_hash: &str, + ) -> Result<()> { + debug!( + "Invalidating key: account_hash={}, key_id={}, reason={}", + account_hash, key_id, reason + ); + + // Use INSERT ... ON DUPLICATE KEY UPDATE for idempotent behavior + sqlx::query( + r#"INSERT INTO key_status (account_hash, key_id, status, reason, invalidated_at, invalidated_by_device) + VALUES (?, ?, 'invalidated', ?, NOW(), ?) + ON DUPLICATE KEY UPDATE + status = 'invalidated', + reason = VALUES(reason), + invalidated_at = NOW(), + invalidated_by_device = VALUES(invalidated_by_device)"#, + ) + .bind(account_hash) + .bind(key_id) + .bind(reason) + .bind(device_hash) + .execute(self.get_sqlx_pool()) + .await + .map_err(|e| StorageError::Database(format!("Failed to invalidate key: {}", e)))?; + + info!( + "Key invalidated successfully: account_hash={}, key_id={}", + account_hash, key_id + ); + + Ok(()) + } + + async fn is_key_invalidated(&self, account_hash: &str, key_id: &str) -> Result { + debug!( + "Checking key status: account_hash={}, key_id={}", + account_hash, key_id + ); + + let result: Option<(String,)> = sqlx::query_as( + r#"SELECT status FROM key_status + WHERE account_hash = ? AND key_id = ? AND status = 'invalidated'"#, + ) + .bind(account_hash) + .bind(key_id) + .fetch_optional(self.get_sqlx_pool()) + .await + .map_err(|e| StorageError::Database(format!("Failed to check key status: {}", e)))?; + + let is_invalidated = result.is_some(); + debug!( + "Key status check result: account_hash={}, key_id={}, invalidated={}", + account_hash, key_id, is_invalidated + ); + + Ok(is_invalidated) + } + + async fn get_affected_files_count(&self, account_hash: &str, key_id: &str) -> Result { + debug!( + "Counting affected files: account_hash={}, key_id={}", + account_hash, key_id + ); + + let count: (i64,) = sqlx::query_as( + r#"SELECT COUNT(*) FROM files + WHERE account_hash = ? AND key_id = ? AND is_deleted = FALSE"#, + ) + .bind(account_hash) + .bind(key_id) + .fetch_one(self.get_sqlx_pool()) + .await + .map_err(|e| StorageError::Database(format!("Failed to count affected files: {}", e)))?; + + let count = count.0 as i32; + debug!( + "Affected files count: account_hash={}, key_id={}, count={}", + account_hash, key_id, count + ); + + Ok(count) + } +} diff --git a/src/storage/mysql_quota.rs b/src/storage/mysql_quota.rs index 518e2d9..a64c844 100644 --- a/src/storage/mysql_quota.rs +++ b/src/storage/mysql_quota.rs @@ -225,9 +225,11 @@ impl MySqlQuotaExt for super::mysql::MySqlStorage { // Calculate next reset date (first day of next month) let now = Utc::now(); let next_month = if now.month() == 12 { - chrono::NaiveDate::from_ymd_opt(now.year() + 1, 1, 1).unwrap() + chrono::NaiveDate::from_ymd_opt(now.year() + 1, 1, 1) + .expect("Valid date: January 1st of next year") } else { - chrono::NaiveDate::from_ymd_opt(now.year(), now.month() + 1, 1).unwrap() + chrono::NaiveDate::from_ymd_opt(now.year(), now.month() + 1, 1) + .expect("Valid date: 1st of next month") }; sqlx::query( @@ -755,9 +757,11 @@ impl MySqlQuotaExt for super::mysql::MySqlStorage { // Update quota reset date let now = Utc::now(); let next_month = if now.month() == 12 { - chrono::NaiveDate::from_ymd_opt(now.year() + 1, 1, 1).unwrap() + chrono::NaiveDate::from_ymd_opt(now.year() + 1, 1, 1) + .expect("Valid date: January 1st of next year") } else { - chrono::NaiveDate::from_ymd_opt(now.year(), now.month() + 1, 1).unwrap() + chrono::NaiveDate::from_ymd_opt(now.year(), now.month() + 1, 1) + .expect("Valid date: 1st of next month") }; sqlx::query( diff --git a/src/storage/mysql_watcher.rs b/src/storage/mysql_watcher.rs index dfcd2e9..ea37a14 100644 --- a/src/storage/mysql_watcher.rs +++ b/src/storage/mysql_watcher.rs @@ -5,7 +5,7 @@ use tracing::{debug, error, info, warn}; use crate::models::watcher::{ConditionType, WatcherCondition, WatcherGroup}; use crate::storage::mysql::MySqlStorage; -use crate::storage::{Result, StorageError}; +use crate::storage::{CleanupStats, Result, StorageError}; use crate::sync::{WatcherData, WatcherGroupData}; use crate::utils::helpers; use crate::utils::time; @@ -137,6 +137,75 @@ pub trait MySqlWatcherExt { client_group_id: i32, client_watcher_id: i32, ) -> Result>; + + // === Soft Delete Methods === + + /// Get all group IDs for an account (for set difference calculation) + async fn get_group_ids_for_account(&self, account_hash: &str) -> Result>; + + /// Get all watcher IDs in a group (for set difference calculation) + async fn get_watcher_ids_by_group( + &self, + account_hash: &str, + server_group_id: i32, + ) -> Result>; + + /// Get group IDs with updated_at timestamp (for Item-level Last-Write-Wins) + async fn get_group_ids_with_updated_at(&self, account_hash: &str) -> Result>; + + /// Get watcher IDs with updated_at timestamp (for Item-level Last-Write-Wins) + async fn get_watcher_ids_with_updated_at( + &self, + account_hash: &str, + server_group_id: i32, + ) -> Result>; + + /// Soft delete a watcher (also soft deletes associated files) + async fn soft_delete_watcher(&self, account_hash: &str, server_watcher_id: i32) -> Result; + + /// Soft delete a watcher group (also soft deletes all watchers and files in the group) + async fn soft_delete_watcher_group( + &self, + account_hash: &str, + server_group_id: i32, + ) -> Result<(i32, i32)>; + + /// Get watcher with is_deleted status (for checking if recently deleted by another device) + async fn get_watcher_with_deleted_status( + &self, + account_hash: &str, + group_id: i32, + watcher_id: i32, + ) -> Result)>>; + + /// Get watcher deleted info by server_watcher_id only (for UPSERT restoration prevention) + /// Returns (is_deleted, deleted_at_unix_timestamp) + async fn get_watcher_deleted_info( + &self, + account_hash: &str, + server_watcher_id: i32, + ) -> Result)>>; + + /// Find soft-deleted watcher by folder (for UPSERT restoration prevention) + /// Returns (server_watcher_id, deleted_at_unix_timestamp) if found and is_deleted=TRUE + async fn find_soft_deleted_watcher_by_folder( + &self, + account_hash: &str, + group_id: i32, + folder: &str, + ) -> Result>; + + /// Get full WatcherGroupData for a group (for LWW recently_added) + async fn get_watcher_group_data( + &self, + account_hash: &str, + group_id: i32, + ) -> Result>; + + /// Cleanup soft-deleted records older than retention_days + /// Permanently deletes from database and S3 storage + /// Order: files -> watchers -> groups (to respect FK constraints) + async fn cleanup_soft_deleted(&self, retention_days: i32) -> Result; } impl MySqlWatcherExt for MySqlStorage { @@ -144,7 +213,7 @@ impl MySqlWatcherExt for MySqlStorage { async fn get_watcher(&self, watcher_id: i32) -> Result { use sqlx::Row; let row_opt = sqlx::query( - r#"SELECT id, watcher_id, account_hash, folder, is_recursive FROM watchers WHERE id = ?"#, + r#"SELECT id, watcher_id, account_hash, folder, is_recursive FROM watchers WHERE id = ? AND is_deleted = FALSE"#, ) .bind(watcher_id) .fetch_optional(self.get_sqlx_pool()) @@ -248,13 +317,15 @@ impl MySqlWatcherExt for MySqlStorage { // Instead of deleting entire Account, UPSERT the group to maintain stable server_group_id(id) let _ = sqlx::query( r#"INSERT INTO watcher_groups ( - group_id, account_hash, device_hash, title, + group_id, account_hash, device_hash, title, created_at, updated_at, is_active ) VALUES (?, ?, ?, ?, FROM_UNIXTIME(?), FROM_UNIXTIME(?), ?) - ON DUPLICATE KEY UPDATE + ON DUPLICATE KEY UPDATE title = VALUES(title), updated_at = VALUES(updated_at), - is_active = VALUES(is_active)"#, + is_active = VALUES(is_active), + is_deleted = FALSE, + deleted_at = NULL"#, ) .bind(watcher_group.group_id) .bind(account_hash) @@ -268,13 +339,15 @@ impl MySqlWatcherExt for MySqlStorage { let res = sqlx::query( r#"INSERT INTO watcher_groups ( - group_id, account_hash, device_hash, title, + group_id, account_hash, device_hash, title, created_at, updated_at, is_active ) VALUES (?, ?, ?, ?, FROM_UNIXTIME(?), FROM_UNIXTIME(?), ?) - ON DUPLICATE KEY UPDATE + ON DUPLICATE KEY UPDATE title = VALUES(title), updated_at = VALUES(updated_at), - is_active = VALUES(is_active)"#, + is_active = VALUES(is_active), + is_deleted = FALSE, + deleted_at = NULL"#, ) .bind(watcher_group.group_id) .bind(account_hash) @@ -307,7 +380,7 @@ impl MySqlWatcherExt for MySqlStorage { UNIX_TIMESTAMP(updated_at) AS updated_ts, is_active FROM watcher_groups - WHERE account_hash = ? + WHERE account_hash = ? AND is_deleted = FALSE ORDER BY id"#, ) .bind(account_hash) @@ -345,7 +418,7 @@ impl MySqlWatcherExt for MySqlStorage { .unwrap_or_else(Utc::now); let watcher_rows = - sqlx::query(r#"SELECT id FROM watchers WHERE group_id = ? AND account_hash = ?"#) + sqlx::query(r#"SELECT id FROM watchers WHERE group_id = ? AND account_hash = ? AND is_deleted = FALSE"#) .bind(id) .bind(account_hash) .fetch_all(self.get_sqlx_pool()) @@ -398,7 +471,7 @@ impl MySqlWatcherExt for MySqlStorage { UNIX_TIMESTAMP(updated_at) AS updated_ts, is_active FROM watcher_groups - WHERE account_hash = ? AND group_id = ?"#, + WHERE account_hash = ? AND group_id = ? AND is_deleted = FALSE"#, ) .bind(account_hash) .bind(group_id) @@ -435,7 +508,7 @@ impl MySqlWatcherExt for MySqlStorage { .unwrap_or_else(Utc::now); let watcher_rows = - sqlx::query(r#"SELECT id FROM watchers WHERE group_id = ? AND account_hash = ?"#) + sqlx::query(r#"SELECT id FROM watchers WHERE group_id = ? AND account_hash = ? AND is_deleted = FALSE"#) .bind(id) .bind(account_hash) .fetch_all(self.get_sqlx_pool()) @@ -600,7 +673,7 @@ impl MySqlWatcherExt for MySqlStorage { use sqlx::Row; let row_opt = sqlx::query( r#"SELECT id, group_id, title, UNIX_TIMESTAMP(updated_at) AS updated_at_ts - FROM watcher_groups WHERE account_hash = ? AND group_id = ?"#, + FROM watcher_groups WHERE account_hash = ? AND group_id = ? AND is_deleted = FALSE"#, ) .bind(account_hash) .bind(group_id) @@ -627,7 +700,7 @@ impl MySqlWatcherExt for MySqlStorage { // Fetch watcher ids with sqlx let rows = - sqlx::query(r#"SELECT id FROM watchers WHERE group_id = ? AND account_hash = ?"#) + sqlx::query(r#"SELECT id FROM watchers WHERE group_id = ? AND account_hash = ? AND is_deleted = FALSE"#) .bind(id) .bind(account_hash) .fetch_all(self.get_sqlx_pool()) @@ -794,7 +867,7 @@ impl MySqlWatcherExt for MySqlStorage { ); let row_opt = sqlx::query( - r#"SELECT id FROM watchers WHERE account_hash = ? AND group_id = ? AND folder = ?"#, + r#"SELECT id FROM watchers WHERE account_hash = ? AND group_id = ? AND folder = ? AND is_deleted = FALSE"#, ) .bind(account_hash) .bind(group_id) @@ -993,7 +1066,7 @@ impl MySqlWatcherExt for MySqlStorage { let result = sqlx::query( r#"INSERT INTO watchers ( watcher_id, account_hash, group_id, local_group_id, folder, title, - is_recursive, created_at, updated_at, + is_recursive, created_at, updated_at, is_active, extra_json ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE @@ -1003,7 +1076,9 @@ impl MySqlWatcherExt for MySqlStorage { updated_at = VALUES(updated_at), is_active = VALUES(is_active), extra_json = VALUES(extra_json), - group_id = VALUES(group_id)"#, + group_id = VALUES(group_id), + is_deleted = FALSE, + deleted_at = NULL"#, ) .bind(watcher_data.watcher_id) .bind(account_hash) @@ -1196,8 +1271,8 @@ impl MySqlWatcherExt for MySqlStorage { use sqlx::Row; let row_opt = sqlx::query( r#"SELECT id, watcher_id, folder, is_recursive - FROM watchers - WHERE account_hash = ? AND local_group_id = ? AND watcher_id = ?"#, + FROM watchers + WHERE account_hash = ? AND local_group_id = ? AND watcher_id = ? AND is_deleted = FALSE"#, ) .bind(account_hash) .bind(group_id) @@ -1643,4 +1718,609 @@ impl MySqlWatcherExt for MySqlStorage { Ok(None) } } + + // === Soft Delete Methods Implementation === + + /// Get all group IDs for an account (for set difference calculation) + async fn get_group_ids_for_account(&self, account_hash: &str) -> Result> { + debug!("Getting all group IDs for account_hash={}", account_hash); + + let ids: Vec = sqlx::query_scalar( + r#"SELECT group_id FROM watcher_groups + WHERE account_hash = ? AND is_deleted = FALSE"#, + ) + .bind(account_hash) + .fetch_all(self.get_sqlx_pool()) + .await + .map_err(|e| { + error!("Failed to get group IDs for account: {}", e); + StorageError::Database(format!("Failed to get group IDs: {}", e)) + })?; + + debug!("Found {} groups for account", ids.len()); + Ok(ids) + } + + /// Get all watcher IDs in a group (for set difference calculation) + async fn get_watcher_ids_by_group( + &self, + account_hash: &str, + server_group_id: i32, + ) -> Result> { + debug!( + "Getting watcher IDs for group_id={}, account_hash={}", + server_group_id, account_hash + ); + + let ids: Vec = sqlx::query_scalar( + r#"SELECT watcher_id FROM watchers + WHERE group_id = ? AND account_hash = ? AND is_deleted = FALSE"#, + ) + .bind(server_group_id) + .bind(account_hash) + .fetch_all(self.get_sqlx_pool()) + .await + .map_err(|e| { + error!("Failed to get watcher IDs by group: {}", e); + StorageError::Database(format!("Failed to get watcher IDs: {}", e)) + })?; + + debug!("Found {} watchers in group {}", ids.len(), server_group_id); + Ok(ids) + } + + /// Get group IDs with updated_at timestamp (for Item-level Last-Write-Wins) + async fn get_group_ids_with_updated_at(&self, account_hash: &str) -> Result> { + debug!( + "Getting group IDs with updated_at for account_hash={}", + account_hash + ); + + use sqlx::Row; + let rows = sqlx::query( + r#"SELECT group_id, UNIX_TIMESTAMP(updated_at) as updated_ts + FROM watcher_groups + WHERE account_hash = ? AND is_deleted = FALSE"#, + ) + .bind(account_hash) + .fetch_all(self.get_sqlx_pool()) + .await + .map_err(|e| { + error!("Failed to get group IDs with updated_at: {}", e); + StorageError::Database(format!("Failed to get group IDs with updated_at: {}", e)) + })?; + + let mut result = Vec::with_capacity(rows.len()); + for row in rows { + let group_id: i32 = row + .try_get("group_id") + .map_err(|e| StorageError::Database(format!("Row get group_id: {}", e)))?; + let updated_ts: i64 = row.try_get("updated_ts").unwrap_or(0); + result.push((group_id, updated_ts)); + } + + debug!("Found {} groups with updated_at", result.len()); + Ok(result) + } + + /// Get watcher IDs with updated_at timestamp (for Item-level Last-Write-Wins) + async fn get_watcher_ids_with_updated_at( + &self, + account_hash: &str, + server_group_id: i32, + ) -> Result> { + debug!( + "Getting watcher IDs with updated_at for group_id={}, account_hash={}", + server_group_id, account_hash + ); + + use sqlx::Row; + let rows = sqlx::query( + r#"SELECT watcher_id, UNIX_TIMESTAMP(updated_at) as updated_ts + FROM watchers + WHERE group_id = ? AND account_hash = ? AND is_deleted = FALSE"#, + ) + .bind(server_group_id) + .bind(account_hash) + .fetch_all(self.get_sqlx_pool()) + .await + .map_err(|e| { + error!("Failed to get watcher IDs with updated_at: {}", e); + StorageError::Database(format!("Failed to get watcher IDs with updated_at: {}", e)) + })?; + + let mut result = Vec::with_capacity(rows.len()); + for row in rows { + let watcher_id: i32 = row + .try_get("watcher_id") + .map_err(|e| StorageError::Database(format!("Row get watcher_id: {}", e)))?; + let updated_ts: i64 = row.try_get("updated_ts").unwrap_or(0); + result.push((watcher_id, updated_ts)); + } + + debug!("Found {} watchers with updated_at", result.len()); + Ok(result) + } + + /// Soft delete a watcher (also soft deletes associated files) + /// Returns the number of files soft deleted + async fn soft_delete_watcher(&self, account_hash: &str, server_watcher_id: i32) -> Result { + debug!( + "Soft deleting watcher: id={}, account_hash={}", + server_watcher_id, account_hash + ); + + let mut tx = self.get_sqlx_pool().begin().await.map_err(|e| { + error!("Failed to start transaction: {}", e); + StorageError::Database(format!("Failed to start transaction: {}", e)) + })?; + + // 1. Soft delete all files associated with this watcher + let files_result = sqlx::query( + r#"UPDATE files + SET is_deleted = TRUE, deleted_at = NOW() + WHERE server_watcher_id = ? AND account_hash = ? AND is_deleted = FALSE"#, + ) + .bind(server_watcher_id) + .bind(account_hash) + .execute(&mut *tx) + .await + .map_err(|e| { + error!("Failed to soft delete files: {}", e); + StorageError::Database(format!("Failed to soft delete files: {}", e)) + })?; + + let files_deleted = files_result.rows_affected() as i32; + debug!( + "Soft deleted {} files for watcher {}", + files_deleted, server_watcher_id + ); + + // 2. Soft delete the watcher + sqlx::query( + r#"UPDATE watchers + SET is_deleted = TRUE, deleted_at = NOW() + WHERE id = ? AND account_hash = ?"#, + ) + .bind(server_watcher_id) + .bind(account_hash) + .execute(&mut *tx) + .await + .map_err(|e| { + error!("Failed to soft delete watcher: {}", e); + StorageError::Database(format!("Failed to soft delete watcher: {}", e)) + })?; + + tx.commit().await.map_err(|e| { + error!("Failed to commit transaction: {}", e); + StorageError::Database(format!("Failed to commit transaction: {}", e)) + })?; + + info!( + "Soft deleted watcher {} with {} files", + server_watcher_id, files_deleted + ); + Ok(files_deleted) + } + + /// Soft delete a watcher group (also soft deletes all watchers and files in the group) + /// Returns (watchers_deleted, files_deleted) + async fn soft_delete_watcher_group( + &self, + account_hash: &str, + server_group_id: i32, + ) -> Result<(i32, i32)> { + debug!( + "Soft deleting watcher group: id={}, account_hash={}", + server_group_id, account_hash + ); + + let mut tx = self.get_sqlx_pool().begin().await.map_err(|e| { + error!("Failed to start transaction: {}", e); + StorageError::Database(format!("Failed to start transaction: {}", e)) + })?; + + // 1. Get all watcher IDs in this group + let watcher_ids: Vec = sqlx::query_scalar( + r#"SELECT id FROM watchers + WHERE group_id = ? AND account_hash = ? AND is_deleted = FALSE"#, + ) + .bind(server_group_id) + .bind(account_hash) + .fetch_all(&mut *tx) + .await + .map_err(|e| { + error!("Failed to get watcher IDs: {}", e); + StorageError::Database(format!("Failed to get watcher IDs: {}", e)) + })?; + + let watchers_count = watcher_ids.len() as i32; + + // 2. Soft delete all files for watchers in this group + let mut total_files_deleted = 0i32; + for watcher_id in &watcher_ids { + let files_result = sqlx::query( + r#"UPDATE files + SET is_deleted = TRUE, deleted_at = NOW() + WHERE server_watcher_id = ? AND account_hash = ? AND is_deleted = FALSE"#, + ) + .bind(watcher_id) + .bind(account_hash) + .execute(&mut *tx) + .await + .map_err(|e| { + error!( + "Failed to soft delete files for watcher {}: {}", + watcher_id, e + ); + StorageError::Database(format!("Failed to soft delete files: {}", e)) + })?; + total_files_deleted += files_result.rows_affected() as i32; + } + + // 3. Soft delete all watchers in this group + sqlx::query( + r#"UPDATE watchers + SET is_deleted = TRUE, deleted_at = NOW() + WHERE group_id = ? AND account_hash = ? AND is_deleted = FALSE"#, + ) + .bind(server_group_id) + .bind(account_hash) + .execute(&mut *tx) + .await + .map_err(|e| { + error!("Failed to soft delete watchers: {}", e); + StorageError::Database(format!("Failed to soft delete watchers: {}", e)) + })?; + + // 4. Soft delete the group itself + sqlx::query( + r#"UPDATE watcher_groups + SET is_deleted = TRUE, deleted_at = NOW() + WHERE id = ? AND account_hash = ?"#, + ) + .bind(server_group_id) + .bind(account_hash) + .execute(&mut *tx) + .await + .map_err(|e| { + error!("Failed to soft delete watcher group: {}", e); + StorageError::Database(format!("Failed to soft delete watcher group: {}", e)) + })?; + + tx.commit().await.map_err(|e| { + error!("Failed to commit transaction: {}", e); + StorageError::Database(format!("Failed to commit transaction: {}", e)) + })?; + + info!( + "Soft deleted watcher group {} with {} watchers and {} files", + server_group_id, watchers_count, total_files_deleted + ); + Ok((watchers_count, total_files_deleted)) + } + + /// Get watcher with is_deleted status (for checking if recently deleted by another device) + /// Returns (server_watcher_id, is_deleted, deleted_at_timestamp) + async fn get_watcher_with_deleted_status( + &self, + account_hash: &str, + group_id: i32, + watcher_id: i32, + ) -> Result)>> { + debug!( + "Getting watcher with deleted status: account={}, group_id={}, watcher_id={}", + account_hash, group_id, watcher_id + ); + + use sqlx::Row; + let row_opt = sqlx::query( + r#"SELECT id, is_deleted, UNIX_TIMESTAMP(deleted_at) as deleted_ts + FROM watchers + WHERE account_hash = ? AND local_group_id = ? AND watcher_id = ?"#, + ) + .bind(account_hash) + .bind(group_id) + .bind(watcher_id) + .fetch_optional(self.get_sqlx_pool()) + .await + .map_err(|e| { + error!("Failed to get watcher with deleted status: {}", e); + StorageError::Database(format!("Failed to get watcher with deleted status: {}", e)) + })?; + + if let Some(row) = row_opt { + let id: i32 = row + .try_get("id") + .map_err(|e| StorageError::Database(format!("Row get id: {}", e)))?; + let is_deleted: bool = row.try_get("is_deleted").unwrap_or(false); + let deleted_ts: Option = row.try_get("deleted_ts").ok(); + + debug!( + "Found watcher: id={}, is_deleted={}, deleted_at={:?}", + id, is_deleted, deleted_ts + ); + Ok(Some((id, is_deleted, deleted_ts))) + } else { + debug!("Watcher not found"); + Ok(None) + } + } + + /// Get watcher deleted info by server_watcher_id only (for UPSERT restoration prevention) + async fn get_watcher_deleted_info( + &self, + account_hash: &str, + server_watcher_id: i32, + ) -> Result)>> { + use sqlx::Row; + + let row_opt = sqlx::query( + r#"SELECT is_deleted, UNIX_TIMESTAMP(deleted_at) as deleted_ts + FROM watchers + WHERE id = ? AND account_hash = ?"#, + ) + .bind(server_watcher_id) + .bind(account_hash) + .fetch_optional(self.get_sqlx_pool()) + .await?; + + if let Some(row) = row_opt { + let is_deleted: bool = row.try_get("is_deleted")?; + let deleted_ts: Option = row.try_get("deleted_ts")?; + Ok(Some((is_deleted, deleted_ts))) + } else { + Ok(None) + } + } + + /// Find soft-deleted watcher by folder (for UPSERT restoration prevention) + async fn find_soft_deleted_watcher_by_folder( + &self, + account_hash: &str, + group_id: i32, + folder: &str, + ) -> Result> { + use sqlx::Row; + + let row_opt = sqlx::query( + r#"SELECT id, UNIX_TIMESTAMP(deleted_at) as deleted_ts + FROM watchers + WHERE account_hash = ? AND group_id = ? AND folder = ? AND is_deleted = TRUE"#, + ) + .bind(account_hash) + .bind(group_id) + .bind(folder) + .fetch_optional(self.get_sqlx_pool()) + .await?; + + if let Some(row) = row_opt { + let id: i32 = row.try_get("id")?; + let deleted_ts: i64 = row.try_get::, _>("deleted_ts")?.unwrap_or(0); + Ok(Some((id, deleted_ts))) + } else { + Ok(None) + } + } + + /// Get full WatcherGroupData for a group (for LWW recently_added) + async fn get_watcher_group_data( + &self, + account_hash: &str, + group_id: i32, + ) -> Result> { + use sqlx::Row; + + // First, get the group info + let group_row_opt = sqlx::query( + r#"SELECT group_id, group_name + FROM watcher_groups + WHERE id = ? AND account_hash = ? AND is_deleted = FALSE"#, + ) + .bind(group_id) + .bind(account_hash) + .fetch_optional(self.get_sqlx_pool()) + .await?; + + let group_row = match group_row_opt { + Some(row) => row, + None => return Ok(None), + }; + + let client_group_id: i32 = group_row.try_get("group_id")?; + let group_name: String = group_row.try_get("group_name")?; + + // Get all watchers in the group + let watcher_rows = sqlx::query( + r#"SELECT id, watcher_id, folder, is_recursive, preset, custom_type, update_mode, is_active, extra_json + FROM watchers + WHERE group_id = ? AND account_hash = ? AND is_deleted = FALSE"#, + ) + .bind(group_id) + .bind(account_hash) + .fetch_all(self.get_sqlx_pool()) + .await?; + + let mut watchers = Vec::new(); + for row in watcher_rows { + let server_watcher_id: i32 = row.try_get("id")?; + let client_watcher_id: i32 = row.try_get("watcher_id")?; + let folder: String = row.try_get("folder")?; + let is_recursive: bool = row.try_get("is_recursive")?; + let preset: bool = row.try_get::("preset").unwrap_or(false); + let custom_type: String = row.try_get::("custom_type").unwrap_or_default(); + let update_mode: String = row.try_get::("update_mode").unwrap_or_default(); + let is_active: bool = row.try_get::("is_active").unwrap_or(true); + let extra_json: String = row.try_get::("extra_json").unwrap_or_default(); + + // Get conditions for this watcher (union and subtracting) + let condition_rows = sqlx::query( + r#"SELECT condition_type, patterns + FROM watcher_conditions + WHERE watcher_id = ?"#, + ) + .bind(server_watcher_id) + .fetch_all(self.get_sqlx_pool()) + .await?; + + let mut union_conditions = Vec::new(); + let mut subtracting_conditions = Vec::new(); + + for cond_row in condition_rows { + let condition_type: i32 = cond_row.try_get("condition_type")?; + let patterns: String = cond_row + .try_get::("patterns") + .unwrap_or_default(); + + let condition_data = crate::sync::ConditionData { + key: condition_type.to_string(), + value: patterns.split(',').map(|s| s.to_string()).collect(), + }; + + // condition_type 0 = union, 1 = subtracting (convention) + if condition_type >= 100 { + subtracting_conditions.push(condition_data); + } else { + union_conditions.push(condition_data); + } + } + + watchers.push(crate::sync::WatcherData { + watcher_id: client_watcher_id, + folder, + union_conditions, + subtracting_conditions, + recursive_path: is_recursive, + preset, + custom_type, + update_mode, + is_active, + extra_json, + }); + } + + Ok(Some(WatcherGroupData { + group_id: client_group_id, + title: group_name, + watchers, + last_updated: None, + })) + } + + /// Cleanup soft-deleted records older than retention_days + /// Permanently deletes from database (S3 cleanup would need file_storage reference) + async fn cleanup_soft_deleted(&self, retention_days: i32) -> Result { + use sqlx::Row; + const BATCH_SIZE: i64 = 100; + + info!( + "Starting cleanup of soft-deleted records older than {} days", + retention_days + ); + + let mut stats = CleanupStats::default(); + + // 1. Delete files in batches (with S3 path info for future S3 cleanup) + loop { + // Get batch of files to delete + let files_to_delete: Vec<(i64, String)> = sqlx::query( + r#"SELECT id, COALESCE(s3_path, '') as s3_path FROM files + WHERE is_deleted = TRUE + AND deleted_at < DATE_SUB(NOW(), INTERVAL ? DAY) + LIMIT ?"#, + ) + .bind(retention_days) + .bind(BATCH_SIZE) + .fetch_all(self.get_sqlx_pool()) + .await + .map_err(|e| { + error!("Failed to query files for cleanup: {}", e); + StorageError::Database(format!("Failed to query files for cleanup: {}", e)) + })? + .iter() + .map(|row| { + let id: i64 = row.try_get("id").unwrap_or(0); + let s3_path: String = row.try_get("s3_path").unwrap_or_default(); + (id, s3_path) + }) + .collect(); + + if files_to_delete.is_empty() { + break; + } + + // Log S3 paths for manual cleanup or future integration + for (file_id, s3_path) in &files_to_delete { + if !s3_path.is_empty() { + debug!( + "File {} has S3 path: {} (should be deleted from S3)", + file_id, s3_path + ); + // TODO: Integrate with S3 client when available + // self.s3_client.delete_object(s3_path).await?; + stats.s3_objects_deleted += 1; + } + } + + // Delete files from database + let file_ids: Vec = files_to_delete.iter().map(|(id, _)| *id).collect(); + let placeholders = file_ids.iter().map(|_| "?").collect::>().join(","); + let query = format!("DELETE FROM files WHERE id IN ({})", placeholders); + + let mut query_builder = sqlx::query(&query); + for id in &file_ids { + query_builder = query_builder.bind(id); + } + + let result = query_builder + .execute(self.get_sqlx_pool()) + .await + .map_err(|e| { + error!("Failed to delete files: {}", e); + StorageError::Database(format!("Failed to delete files: {}", e)) + })?; + + stats.files_deleted += result.rows_affected() as i64; + + // Brief pause to avoid overwhelming the database + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + } + + // 2. Delete watchers + let watchers_result = sqlx::query( + r#"DELETE FROM watchers + WHERE is_deleted = TRUE + AND deleted_at < DATE_SUB(NOW(), INTERVAL ? DAY)"#, + ) + .bind(retention_days) + .execute(self.get_sqlx_pool()) + .await + .map_err(|e| { + error!("Failed to delete watchers: {}", e); + StorageError::Database(format!("Failed to delete watchers: {}", e)) + })?; + stats.watchers_deleted = watchers_result.rows_affected() as i64; + + // 3. Delete watcher groups + let groups_result = sqlx::query( + r#"DELETE FROM watcher_groups + WHERE is_deleted = TRUE + AND deleted_at < DATE_SUB(NOW(), INTERVAL ? DAY)"#, + ) + .bind(retention_days) + .execute(self.get_sqlx_pool()) + .await + .map_err(|e| { + error!("Failed to delete watcher groups: {}", e); + StorageError::Database(format!("Failed to delete watcher groups: {}", e)) + })?; + stats.groups_deleted = groups_result.rows_affected() as i64; + + info!( + "Cleanup completed: {} files, {} watchers, {} groups deleted", + stats.files_deleted, stats.watchers_deleted, stats.groups_deleted + ); + + Ok(stats) + } } diff --git a/src/utils/response.rs b/src/utils/response.rs index ab9c505..87d38ec 100644 --- a/src/utils/response.rs +++ b/src/utils/response.rs @@ -50,6 +50,28 @@ pub fn file_download_error(message: impl Into) -> DownloadFileResponse { file_size: 0, key_id: String::new(), unix_permissions: None, + error_code: ErrorCode::UnknownError as i32, + } +} + +/// Create error response for file download with specific error code +pub fn file_download_error_with_code( + message: impl Into, + error_code: ErrorCode, +) -> DownloadFileResponse { + DownloadFileResponse { + success: false, + file_data: Vec::new(), + file_hash: String::new(), + is_encrypted: false, + return_message: message.into(), + filename: String::new(), + file_path: String::new(), + updated_time: None, + file_size: 0, + key_id: String::new(), + unix_permissions: None, + error_code: error_code as i32, } }