Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions proto/sync.proto
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ message UploadFileRequest {
int64 expected_revision = 16; // Expected server revision (0 for new files, or current revision for updates)
ConflictInfo.ResolutionStrategy conflict_resolution = 17; // How to resolve conflicts (default: MANUAL)
int64 client_timestamp = 18; // Client file modification time (Unix epoch seconds)
optional string operation_type = 19; // "CREATE", "UPDATE", "DELETE", "RENAME", "MOVE"
}

message UploadFileResponse {
Expand Down Expand Up @@ -430,8 +431,9 @@ message UploadFileChunk {
uint64 seq = 14; // sequential index starting at 0
bool last = 15; // true for the final chunk
optional uint32 unix_permissions = 16;
int64 expected_revision = 17; // Expected server revision for streaming uploads
optional string operation_type = 17; // "CREATE", "UPDATE", "DELETE", "RENAME", "MOVE" (first chunk only)
optional int64 client_timestamp = 18; // Client modification time (first chunk only)
optional int64 expected_revision = 19; // Expected server revision for streaming uploads
}

message DownloadFileRequest {
Expand Down Expand Up @@ -549,8 +551,9 @@ message RenameFileRequest {
int32 watcher_id = 8; // Watcher ID
int64 timestamp = 9; // Rename timestamp (Unix seconds)
int64 expected_revision = 10; // Expected server revision (0 to skip check, or current revision for validation)
ConflictInfo.ResolutionStrategy conflict_resolution = 11; // How to resolve conflicts (default: MANUAL)
google.protobuf.Timestamp updated_time = 12; // Client's last known update time for timestamp-based conflict resolution
optional string operation_type = 11; // e.g., "RENAME" or "MOVE"
ConflictInfo.ResolutionStrategy conflict_resolution = 12; // How to resolve conflicts (default: MANUAL)
google.protobuf.Timestamp updated_time = 13; // Client's last known update time for timestamp-based conflict resolution
}

// NEW: Rename file response
Expand Down Expand Up @@ -1141,4 +1144,3 @@ message BatchOperationsResponse {
int32 successful_operations = 5;
int32 failed_operations = 6;
}

4 changes: 2 additions & 2 deletions src/domain/entities.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ impl From<crate::models::device::Device> for Device {
Self {
device_hash: model.device_hash,
account_hash: model.account_hash,
device_name: model.user_id.clone(), // Use user_id as device name
device_name: model.device_name.clone(),
device_type,
last_seen: model.last_sync.timestamp(),
is_active: model.is_active,
Expand All @@ -458,9 +458,9 @@ impl From<Device> for crate::models::device::Device {
};

Self {
user_id: entity.device_name,
account_hash: entity.account_hash,
device_hash: entity.device_hash,
device_name: entity.device_name,
updated_at: chrono::Utc::now(), // Default to now
registered_at: chrono::Utc
.timestamp_opt(entity.created_at, 0)
Expand Down
3 changes: 2 additions & 1 deletion src/handlers/auth_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,8 @@ impl AuthHandler {
let new_device = Device::new(
account_hash.to_string(),
device_hash.to_string(),
true, // activate
device_hash.to_string(), // Use device_hash as default device_name
true, // activate
os_version.to_string(),
app_version.to_string(),
);
Expand Down
4 changes: 3 additions & 1 deletion src/handlers/device_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ impl DeviceHandler {
let device = Device::new(
server_account_hash.clone(),
req.device_hash.clone(),
req.device_name.clone(),
req.is_active,
req.os_version.clone(),
req.app_version.clone(),
Expand Down Expand Up @@ -302,6 +303,7 @@ impl DeviceHandler {
let new_device = Device::new(
req.account_hash.clone(),
req.device_hash.clone(),
req.device_name.clone(),
req.is_active,
req.os_version.clone(),
req.app_version.clone(),
Expand Down Expand Up @@ -374,7 +376,7 @@ impl DeviceHandler {
seconds: device.last_sync.timestamp(),
nanos: 0,
}),
device_name: device.user_id.clone(),
device_name: device.device_name.clone(),
})
.collect();

Expand Down
124 changes: 112 additions & 12 deletions src/handlers/file/rename.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,46 @@ pub async fn handle_rename_file(
current_revision: 0,
}));
}
// Use normalized account hash for all downstream operations/broadcasts
let account_hash = server_account_hash;

// Map client group/watcher IDs to server IDs for all downstream operations
let (server_group_id, server_watcher_id) = match handler
.app_state
.file
.convert_client_ids_to_server(&account_hash, req.group_id, req.watcher_id)
.await
{
Ok(Some(ids)) => ids,
Ok(None) => {
error!(
"Failed to map client watcher IDs to server IDs: account={}, client_group={}, client_watcher={}",
account_hash, req.group_id, req.watcher_id
);
return Ok(Response::new(RenameFileResponse {
success: false,
return_message: "Watcher mapping not found on server".to_string(),
new_revision: 0,
conflict: None,
error_code: ErrorCode::DbError as i32,
current_revision: 0,
}));
}
Err(e) => {
error!(
"Error converting client IDs to server IDs: account={}, client_group={}, client_watcher={}, error={}",
account_hash, req.group_id, req.watcher_id, e
);
return Ok(Response::new(RenameFileResponse {
success: false,
return_message: format!("Failed to resolve watcher mapping: {}", e),
new_revision: 0,
conflict: None,
error_code: ErrorCode::DbError as i32,
current_revision: 0,
}));
}
};

// 2. Check if file exists and verify old_file_path matches
let file_info = match handler.app_state.file.get_file_info(req.file_id).await {
Expand Down Expand Up @@ -108,13 +148,37 @@ pub async fn handle_rename_file(
.app_state
.file
.find_file_by_path(
&req.account_hash,
req.group_id,
req.watcher_id,
&account_hash,
server_group_id,
server_watcher_id,
&req.new_file_path,
)
.await
{
// Fix P2 #5: Idempotency check
// If file at target path is the SAME file_id, consider it a successful retry
if existing.file_id == req.file_id {
info!(
"Idempotency check: File {} is already at target path {}",
req.file_id, req.new_file_path
);

// If client provided expected_revision, check if we already met it or exceeded it
if req.expected_revision > 0 && existing.revision != req.expected_revision {
warn!("Idempotency warning: File is at target path but revision mismatch (current={}, expected={})", existing.revision, req.expected_revision);
// We can still treat it as success if we assume the client missed the response
}

return Ok(Response::new(RenameFileResponse {
success: true,
return_message: "File renamed successfully (idempotent)".to_string(),
new_revision: existing.revision,
conflict: None,
error_code: ErrorCode::Success as i32,
current_revision: existing.revision,
}));
}

if existing.file_id != req.file_id {
warn!(
"Target path '{}' already exists (file_id: {})",
Expand All @@ -138,12 +202,12 @@ pub async fn handle_rename_file(

// 4. Determine if this is a cross-watcher MOVE
let is_cross_watcher_move =
req.group_id != file_info.group_id || req.watcher_id != file_info.watcher_id;
server_group_id != file_info.group_id || server_watcher_id != file_info.watcher_id;

if is_cross_watcher_move {
info!(
"Cross-watcher MOVE request: file_id={}, from group={}/watcher={} to group={}/watcher={}",
req.file_id, file_info.group_id, file_info.watcher_id, req.group_id, req.watcher_id
req.file_id, file_info.group_id, file_info.watcher_id, server_group_id, server_watcher_id
);
}

Expand All @@ -157,12 +221,12 @@ pub async fn handle_rename_file(

// Pass new group_id/watcher_id for cross-watcher MOVE, None for same-watcher rename
let new_group_id = if is_cross_watcher_move {
Some(req.group_id)
Some(server_group_id)
} else {
None
};
let new_watcher_id = if is_cross_watcher_move {
Some(req.watcher_id)
Some(server_watcher_id)
} else {
None
};
Expand Down Expand Up @@ -204,7 +268,7 @@ pub async fn handle_rename_file(
req.file_id,
&req.old_file_path,
&req.new_file_path,
&req.account_hash,
&account_hash,
&req.device_hash,
req.timestamp,
new_revision,
Expand All @@ -216,15 +280,30 @@ pub async fn handle_rename_file(
}

// 6. Broadcast rename notification to other devices
// Fix P0 #3: Include updated file info with Server IDs
let mut broadcast_file_info = file_info.clone();
broadcast_file_info.file_path = req.new_file_path.clone();
broadcast_file_info.revision = new_revision;
broadcast_file_info.updated_time.seconds = req.timestamp;
broadcast_file_info.updated_time.nanos = 0;

if let Some(gid) = new_group_id {
broadcast_file_info.group_id = gid;
}
if let Some(wid) = new_watcher_id {
broadcast_file_info.watcher_id = wid;
}

if let Err(e) = handler
.broadcast_rename_notification(
&req.account_hash,
&account_hash,
&req.device_hash,
req.file_id,
&req.old_file_path,
&req.new_file_path,
new_revision,
req.timestamp,
Some(broadcast_file_info),
)
.await
{
Expand Down Expand Up @@ -313,7 +392,7 @@ pub async fn handle_rename_file(
req.file_id,
&req.old_file_path,
&req.new_file_path,
&req.account_hash,
&account_hash,
&req.device_hash,
req.timestamp,
new_revision,
Expand All @@ -323,16 +402,31 @@ pub async fn handle_rename_file(
warn!("Failed to record rename history: {}", e);
}

// Fix P0 #3: Include updated file info with Server IDs
let mut broadcast_file_info = file_info.clone();
broadcast_file_info.file_path = req.new_file_path.clone();
broadcast_file_info.revision = new_revision;
broadcast_file_info.updated_time.seconds = req.timestamp;
broadcast_file_info.updated_time.nanos = 0;

if let Some(gid) = new_group_id {
broadcast_file_info.group_id = gid;
}
if let Some(wid) = new_watcher_id {
broadcast_file_info.watcher_id = wid;
}

// Broadcast rename notification to other devices
if let Err(e) = handler
.broadcast_rename_notification(
&req.account_hash,
&account_hash,
&req.device_hash,
req.file_id,
&req.old_file_path,
&req.new_file_path,
new_revision,
req.timestamp,
Some(broadcast_file_info),
)
.await
{
Expand Down Expand Up @@ -414,19 +508,23 @@ pub async fn handle_rename_file(
|| error_msg.contains("File not found")
|| error_msg.contains("already deleted");

let is_path_conflict = error_msg.contains("Path conflict");

error!(
"Rename operation failed - file_id: {}, old_path: {}, new_path: {}, error: {}, error_type: {:?}, is_permanent: {}",
req.file_id,
req.old_file_path,
req.new_file_path,
error_msg,
if is_permanent_error { "permanent" } else if is_not_found { "not_found" } else { "temporary" },
if is_permanent_error { "permanent" } else if is_not_found { "not_found" } else if is_path_conflict { "path_conflict" } else { "temporary" },
is_permanent_error
);

// Format error message for client
let client_error_msg = if is_not_found {
format!("File not found: already deleted")
} else if is_path_conflict {
format!("Target path already exists (concurrent operation)")
} else if is_permanent_error {
format!("DB schema error: column type mismatch - {}", error_msg)
} else {
Expand All @@ -435,6 +533,8 @@ pub async fn handle_rename_file(

let error_code = if is_not_found {
ErrorCode::FileNotFound
} else if is_path_conflict {
ErrorCode::PathConflict
} else if is_permanent_error {
ErrorCode::DbSchemaError
} else {
Expand Down
7 changes: 6 additions & 1 deletion src/handlers/file_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ impl FileHandler {
info!(" watcher_id: {}", req.watcher_id);
info!(" file_path: {}", req.file_path);
info!(" file_data length: {} bytes", req.file_data.len());
info!(
" operation_type: {}",
req.operation_type.as_deref().unwrap_or("<unspecified>")
);
let key_id_log = if req.key_id.is_empty() {
"<empty>"
} else {
Expand Down Expand Up @@ -455,6 +459,7 @@ impl FileHandler {
new_path: &str,
revision: i64,
timestamp: i64,
file_info: Option<crate::models::file::FileInfo>,
) -> Result<(), String> {
use crate::sync::{FileUpdateNotification, RenameInfo};

Expand All @@ -470,7 +475,7 @@ impl FileHandler {
let notification = FileUpdateNotification {
account_hash: account_hash.to_string(),
device_hash: device_hash.to_string(),
file_info: None, // Rename doesn't need full file info
file_info: file_info.map(|fi| fi.to_sync_file()), // Use provided file_info (with Server IDs)
update_type: crate::sync::file_update_notification::UpdateType::Renamed as i32,
timestamp,
rename_info: Some(rename_info),
Expand Down
Loading