Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 16 additions & 124 deletions cli/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,7 +568,7 @@ impl ApiClient {
info!("Successfully retrieved room state");

// Parse the actual room state from the response
let mut room_state: ChatRoomStateV1 = ciborium::de::from_reader(&state[..])
let room_state: ChatRoomStateV1 = ciborium::de::from_reader(&state[..])
.map_err(|e| anyhow!("Failed to deserialize room state: {}", e))?;

info!(
Expand All @@ -583,51 +583,7 @@ impl ApiClient {
room_state.recent_messages.messages.len()
);

// Apply the invitation's member data to add this user to the members list
let members_delta =
river_core::room_state::member::MembersDelta::new(vec![invitation
.invitee
.clone()]);

// Create parameters for applying delta
let parameters = ChatRoomParametersV1 {
owner: room_owner_vk,
};

// Apply the member delta to add ourselves to the room
room_state
.members
.apply_delta(&room_state.clone(), &parameters, &Some(members_delta))
.map_err(|e| anyhow!("Failed to add member to room: {}", e))?;

info!(
"Added self to members list, total members: {}",
room_state.members.members.len()
);

// Create member info entry with nickname
let member_info = MemberInfo {
member_id: invitation.invitee_signing_key.verifying_key().into(),
version: 0,
preferred_nickname: SealedBytes::public(
nickname.to_string().into_bytes(),
),
};
let authorized_member_info =
AuthorizedMemberInfo::new(member_info, &invitation.invitee_signing_key);

// Add the member info to the room state
room_state
.member_info
.member_info
.push(authorized_member_info.clone());

info!("Added member info with nickname: {}", nickname);

// Validate the room state is properly initialized
let self_member_id = invitation.invitee_signing_key.verifying_key().into();

// Check owner_member_id is set correctly
if room_state.configuration.configuration.owner_member_id
== river_core::room_state::member::MemberId(
freenet_scaffold::util::FastHash(0),
Expand All @@ -636,28 +592,9 @@ impl ApiClient {
return Err(anyhow!("Room state has invalid owner_member_id"));
}

// Check we're in the members list
let is_member = room_state.members.members.iter().any(|m| {
m.member.member_vk == invitation.invitee_signing_key.verifying_key()
});
if !is_member {
return Err(anyhow!("Failed to add self to members list"));
}

// Check we have member info
let has_member_info = room_state
.member_info
.member_info
.iter()
.any(|info| info.member_info.member_id == self_member_id);
if !has_member_info {
return Err(anyhow!("Failed to add member info"));
}

info!("Validation passed: owner_member_id={:?}, is_member={}, has_member_info={}",
room_state.configuration.configuration.owner_member_id, is_member, has_member_info);

// Compute invite chain before storing (need room_state reference)
// Compute invite chain before storing (walks up from invitee
// to owner through existing members — doesn't require the
// invitee to be in the members list)
let params_for_chain = ChatRoomParametersV1 {
owner: room_owner_vk,
};
Expand All @@ -666,77 +603,32 @@ impl ApiClient {
.get_invite_chain(&invitation.invitee, &params_for_chain)
.unwrap_or_default();

// Store the properly initialized room state locally
// Store the room state locally WITHOUT adding ourselves to
// members. Membership will be added to the network atomically
// with our first message via build_rejoin_delta, which bundles
// AuthorizedMember + message in a single delta. This avoids a
// race where post_apply_cleanup prunes a member with no
// messages before their first message arrives.
self.storage.add_room(
&room_owner_vk,
&invitation.invitee_signing_key,
room_state,
&contract_key,
)?;

// Store authorized member and invite chain for future re-join after pruning
// Store authorized member and invite chain so
// build_rejoin_delta can re-add us when we send a message
self.storage.store_authorized_member(
&room_owner_vk,
&invitation.invitee,
&invite_chain,
)?;

// Drop the original lock before update
drop(web_api);

// Note: Subscription removed as riverctl is a one-shot CLI tool
// that exits immediately. Subscription will be re-added when
// streaming functionality is implemented.

// Publish membership and member info to the network (non-blocking)
// Build a delta containing the authorized member and member info we just applied
let membership_delta = ChatRoomStateV1Delta {
members: Some(river_core::room_state::member::MembersDelta::new(vec![
invitation.invitee.clone(),
])),
member_info: Some(vec![authorized_member_info.clone()]),
..Default::default()
};
// Serialize delta
let delta_bytes = {
let mut buf = Vec::new();
ciborium::ser::into_writer(&membership_delta, &mut buf).map_err(
|e| anyhow!("Failed to serialize membership delta: {}", e),
)?;
buf
};
let mut web_api = self.web_api.lock().await;
let update_request = ContractRequest::Update {
key: contract_key,
data: UpdateData::Delta(delta_bytes.into()),
};
let update_client_request = ClientRequest::ContractOp(update_request);
tracing::info!(
"ACCEPT: sending membership UPDATE for contract {}",
contract_key.id()
info!(
"Invitation accepted: stored credentials for room, \
membership will be published with first message"
);
web_api
.send(update_client_request)
.await
.map_err(|e| anyhow!("Failed to send membership update: {}", e))?;
// Try to receive an ack briefly, but don't fail if none arrives quickly
match tokio::time::timeout(
std::time::Duration::from_secs(2),
web_api.recv(),
)
.await
{
Ok(Ok(HostResponse::ContractResponse(
ContractResponse::UpdateResponse { .. },
))) => {
tracing::info!("ACCEPT: received UPDATE ack");
info!("Membership published to network");
}
_ => {
tracing::info!("ACCEPT: no immediate UPDATE ack");
info!("Membership update sent (no immediate ack)");
}
}

drop(web_api);

Ok((room_owner_vk, contract_key))
Expand Down
94 changes: 10 additions & 84 deletions ui/src/components/app/freenet_api/response_handler/get_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ use dioxus::logger::tracing::{error, info, warn};
use dioxus::prelude::ReadableExt;
use freenet_scaffold::ComposableState;
use freenet_stdlib::prelude::ContractKey;
use river_core::room_state::member::{MemberId, MembersDelta};
use river_core::room_state::member::MemberId;
use river_core::room_state::member_info::{AuthorizedMemberInfo, MemberInfo};
use river_core::room_state::message::{MessageId, RoomMessageBody};
use river_core::room_state::privacy::{PrivacyMode, SealedBytes};
use river_core::room_state::{ChatRoomParametersV1, ChatRoomStateV1, ChatRoomStateV1Delta};
use river_core::room_state::{ChatRoomParametersV1, ChatRoomStateV1};
use std::collections::HashMap;
use x25519_dalek::PublicKey as X25519PublicKey;

Expand Down Expand Up @@ -225,69 +225,12 @@ pub async fn handle_get_response(
let authorized_member_info =
AuthorizedMemberInfo::new_with_member_key(member_info.clone(), &self_sk);

// Create a Delta from the invitation and merge it to ensure that the
// relevant information is part of the state

let invitation_delta = ChatRoomStateV1Delta {
configuration: None,
bans: None,
members: Some(MembersDelta::new(vec![authorized_member.clone()])),
member_info: Some(vec![authorized_member_info]),
secrets: None,
recent_messages: None,
upgrade: None,
version: None,
};

// Clone current state to avoid borrow issues during merge
let current_state = room_data.room_state.clone();

room_data
.room_state
.apply_delta(
&current_state,
&ChatRoomParametersV1 { owner: owner_vk },
&Some(invitation_delta),
)
.expect("Failed to apply invitation delta");

// Check if the member survived remove_excess_members.
// When the room is at max_members capacity, apply_delta adds
// the new member then immediately evicts the one with the
// longest invite chain — which is often the new invitee.
let invitee_vk = self_sk.verifying_key();
let member_retained = invitee_vk == owner_vk
|| room_data
.room_state
.members
.members
.iter()
.any(|m| m.member.member_vk == invitee_vk);
if !member_retained {
let max_members = room_data
.room_state
.configuration
.configuration
.max_members;
let err_msg = format!(
"This room is full ({max_members}/{max_members} members). \
The room owner needs to increase the member limit before \
new members can join."
);
error!("{err_msg}");
// Set error status so the invitation modal shows the message
PENDING_INVITES.with_mut(|invites| {
if let Some(join) = invites.map.get_mut(&owner_vk) {
join.status = PendingRoomStatus::Error(err_msg);
}
});
// Remove the incomplete room data
rooms.map.remove(&owner_vk);
return;
}

// Store the authorized member for future re-join after pruning
// Store membership credentials for future rejoin.
// We do NOT apply the member to room_state here — membership
// is published atomically with the first message to avoid
// post_apply_cleanup pruning a member with no messages.
room_data.self_authorized_member = Some(authorized_member.clone());
room_data.self_member_info = Some(authorized_member_info);
// Capture invite chain from current state
if let Ok(chain) = room_data.room_state.members.get_invite_chain(
&authorized_member,
Expand Down Expand Up @@ -411,28 +354,11 @@ pub async fn handle_get_response(
}
});

// Mark room as needing sync so it gets saved to delegate storage
// Mark room as needing sync so it gets saved to delegate storage.
// We do NOT trigger ProcessRooms because we haven't modified the
// room state — membership will be published with the first message.
use crate::components::app::NEEDS_SYNC;
NEEDS_SYNC.write().insert(owner_vk);

// Trigger synchronization to send the member update to the network
// This is critical for other users to see that this user has joined
info!("Triggering synchronization after accepting invitation to propagate member addition");
use crate::components::app::freenet_api::freenet_synchronizer::SynchronizerMessage;
use crate::components::app::SYNCHRONIZER;

if let Err(e) = SYNCHRONIZER
.read()
.get_message_sender()
.unbounded_send(SynchronizerMessage::ProcessRooms)
{
error!(
"Failed to trigger synchronization after joining room: {}",
e
);
} else {
info!("Successfully triggered synchronization after joining room");
}
}
} else if is_existing_room {
// This is a refresh GET for an already-subscribed room (e.g., after wake from suspension)
Expand Down
59 changes: 36 additions & 23 deletions ui/src/room_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,33 +161,46 @@ impl RoomData {
}
}

/// Check if the user can send a message in the room
/// Check if the user can send a message in the room.
/// A user is considered a member if they are the owner, are in the active
/// members list, or have a stored invitation (self_authorized_member).
pub fn can_send_message(&self) -> Result<(), SendMessageError> {
let verifying_key = self.self_sk.verifying_key();
// Must be owner or a member of the room to send a message
if verifying_key == self.owner_vk
|| self
.room_state
.members
.members
.iter()
.any(|m| m.member.member_vk == verifying_key)
let member_id = MemberId::from(&verifying_key);

// Check if banned first
if self
.room_state
.bans
.0
.iter()
.any(|b| b.ban.banned_user == member_id)
{
// Must not be banned from the room to send a message
if self
.room_state
.bans
.0
.iter()
.any(|b| b.ban.banned_user == verifying_key.into())
{
Err(SendMessageError::UserBanned)
} else {
Ok(())
}
} else {
Err(SendMessageError::UserNotMember)
return Err(SendMessageError::UserBanned);
}

// Owner can always send
if verifying_key == self.owner_vk {
return Ok(());
}

// Currently in members list
if self
.room_state
.members
.members
.iter()
.any(|m| m.member.member_vk == verifying_key)
{
return Ok(());
}

// Has stored invite (can re-add with first message)
if self.self_authorized_member.is_some() {
return Ok(());
}

Err(SendMessageError::UserNotMember)
}

/// Check if the user can participate in the room (send messages, edit profile).
Expand Down