Skip to content
56 changes: 28 additions & 28 deletions cli/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,10 @@ impl ApiClient {

match response {
HostResponse::ContractResponse(ContractResponse::PutResponse { key }) => {
info!("Room republished successfully with contract key: {}", key.id());
info!(
"Room republished successfully with contract key: {}",
key.id()
);
if key != contract_key {
return Err(anyhow!(
"Contract key mismatch: expected {}, got {}",
Expand All @@ -311,7 +314,7 @@ impl ApiClient {
info!("Getting room state for contract: {}", contract_key.id());

let get_request = ContractRequest::Get {
key: *contract_key.id(), // GET uses ContractInstanceId
key: *contract_key.id(), // GET uses ContractInstanceId
return_contract_code: true, // Request full contract to enable caching
subscribe: false, // Always false, we'll subscribe separately if needed
};
Expand Down Expand Up @@ -487,7 +490,7 @@ impl ApiClient {

// Perform a GET request to fetch the room state
let get_request = ContractRequest::Get {
key: *contract_key.id(), // GET uses ContractInstanceId
key: *contract_key.id(), // GET uses ContractInstanceId
return_contract_code: true, // Request full contract to enable caching
subscribe: false, // We'll subscribe separately after GET succeeds
};
Expand Down Expand Up @@ -886,7 +889,11 @@ impl ApiClient {
match tokio::time::timeout(std::time::Duration::from_secs(60), web_api.recv()).await {
Ok(Ok(response)) => response,
Ok(Err(e)) => return Err(anyhow!("Failed to receive response: {}", e)),
Err(_) => return Err(anyhow!("Timeout waiting for update response after 60 seconds")),
Err(_) => {
return Err(anyhow!(
"Timeout waiting for update response after 60 seconds"
))
}
};

match response {
Expand Down Expand Up @@ -1126,8 +1133,7 @@ impl ApiClient {
}

// Save the updated state locally
self.storage
.update_room_state(room_owner_key, room_state)?;
self.storage.update_room_state(room_owner_key, room_state)?;

// Create delta with just the member info update
let delta = ChatRoomStateV1Delta {
Expand Down Expand Up @@ -1173,10 +1179,7 @@ impl ApiClient {

match response {
HostResponse::ContractResponse(ContractResponse::UpdateResponse { key, .. }) => {
info!(
"Nickname updated successfully for contract: {}",
key.id()
);
info!("Nickname updated successfully for contract: {}", key.id());
Ok(())
}
_ => Err(anyhow!("Unexpected response type: {:?}", response)),
Expand Down Expand Up @@ -1272,9 +1275,9 @@ impl ApiClient {
return Err(anyhow!("Circular invite chain detected"));
}

let inviter = members_by_id.get(&current_id).ok_or_else(|| {
anyhow!("Invite chain broken: inviter not found")
})?;
let inviter = members_by_id
.get(&current_id)
.ok_or_else(|| anyhow!("Invite chain broken: inviter not found"))?;
current_id = inviter.member.invited_by;
}

Expand All @@ -1285,10 +1288,7 @@ impl ApiClient {
}
}

info!(
"Banning member with ID: {}",
banned_member_id.to_string()
);
info!("Banning member with ID: {}", banned_member_id.to_string());

// Create the ban
let user_ban = UserBan {
Expand Down Expand Up @@ -1474,11 +1474,8 @@ impl ApiClient {

// Wait for next message with a short timeout to allow checking shutdown
let mut web_api = self.web_api.lock().await;
let recv_result = tokio::time::timeout(
std::time::Duration::from_millis(500),
web_api.recv(),
)
.await;
let recv_result =
tokio::time::timeout(std::time::Duration::from_millis(500), web_api.recv()).await;

match recv_result {
Ok(Ok(HostResponse::ContractResponse(ContractResponse::UpdateNotification {
Expand All @@ -1492,14 +1489,16 @@ impl ApiClient {
match update {
UpdateData::Delta(delta_bytes) => {
// Parse the delta
if let Ok(delta) =
ciborium::de::from_reader::<ChatRoomStateV1Delta, _>(&delta_bytes[..])
{
if let Ok(delta) = ciborium::de::from_reader::<ChatRoomStateV1Delta, _>(
&delta_bytes[..],
) {
// Check for new messages in the delta
if let Some(messages) = &delta.recent_messages {
for msg in messages {
let msg_id =
format!("{:?}:{:?}", msg.message.author, msg.message.time);
let msg_id = format!(
"{:?}:{:?}",
msg.message.author, msg.message.time
);

if seen_messages.insert(msg_id.clone()) {
// Need to get current room state for nickname lookup
Expand All @@ -1517,7 +1516,8 @@ impl ApiClient {
new_message_count += 1;
web_api = self.web_api.lock().await;

if max_messages > 0 && new_message_count >= max_messages {
if max_messages > 0 && new_message_count >= max_messages
{
return Ok(());
}
}
Expand Down
5 changes: 4 additions & 1 deletion cli/src/commands/member.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,10 @@ pub async fn execute(command: MemberCommands, api: ApiClient, format: OutputForm
match api.ban_member(&owner_vk, &member_id).await {
Ok(()) => match format {
OutputFormat::Human => {
println!("{}", format!("Member '{}' has been banned.", member_id).green());
println!(
"{}",
format!("Member '{}' has been banned.", member_id).green()
);
}
OutputFormat::Json => {
println!(
Expand Down
51 changes: 39 additions & 12 deletions cli/tests/message_flow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,19 @@ fn owner_key_to_contract_key(owner_key_str: &str) -> Result<ContractKey> {
.context("Invalid owner key encoding")?
.try_into()
.map_err(|_| anyhow!("Owner key must be 32 bytes"))?;
let owner_vk = VerifyingKey::from_bytes(&owner_bytes)
.context("Invalid owner verifying key")?;
let owner_vk = VerifyingKey::from_bytes(&owner_bytes).context("Invalid owner verifying key")?;

let params = ChatRoomParametersV1 { owner: owner_vk };
let params_bytes = {
let mut buf = Vec::new();
ciborium::ser::into_writer(&params, &mut buf)
.context("Failed to serialize parameters")?;
ciborium::ser::into_writer(&params, &mut buf).context("Failed to serialize parameters")?;
buf
};
let contract_code = ContractCode::from(ROOM_CONTRACT_WASM);
Ok(ContractKey::from_params_and_code(Parameters::from(params_bytes), &contract_code))
Ok(ContractKey::from_params_and_code(
Parameters::from(params_bytes),
&contract_code,
))
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -1417,7 +1418,10 @@ async fn run_late_joiner_test() -> Result<()> {
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(30);
println!("Waiting {}s for topology stabilization...", stabilization_secs);
println!(
"Waiting {}s for topology stabilization...",
stabilization_secs
);
sleep(Duration::from_secs(stabilization_secs)).await;

dump_topology(&network).await;
Expand All @@ -1435,7 +1439,14 @@ async fn run_late_joiner_test() -> Result<()> {
let create_stdout = run_riverctl(
owner_config.path(),
&owner_url,
&["room", "create", "--name", "Late Joiner Test Room", "--nickname", "owner"],
&[
"room",
"create",
"--name",
"Late Joiner Test Room",
"--nickname",
"owner",
],
)
.await
.context("Failed to create room")?;
Expand Down Expand Up @@ -1465,7 +1476,13 @@ async fn run_late_joiner_test() -> Result<()> {
run_riverctl(
early_joiner_config.path(),
&early_url,
&["invite", "accept", &invite1.invitation_code, "--nickname", "early_joiner"],
&[
"invite",
"accept",
&invite1.invitation_code,
"--nickname",
"early_joiner",
],
)
.await
.context("Failed to accept invite for early joiner")?;
Expand All @@ -1482,8 +1499,12 @@ async fn run_late_joiner_test() -> Result<()> {
Ok(handle) => break handle,
Err(err) => {
if subscribe_attempts >= 3 {
return Err(err)
.with_context(|| format!("Failed to subscribe peer {} after {} attempts", peer_idx, subscribe_attempts));
return Err(err).with_context(|| {
format!(
"Failed to subscribe peer {} after {} attempts",
peer_idx, subscribe_attempts
)
});
}
println!(
"Subscribe attempt {} failed for peer {}, retrying: {}",
Expand Down Expand Up @@ -1558,7 +1579,13 @@ async fn run_late_joiner_test() -> Result<()> {
run_riverctl(
late_joiner_config.path(),
&late_url,
&["invite", "accept", &invite2.invitation_code, "--nickname", "late_joiner"],
&[
"invite",
"accept",
&invite2.invitation_code,
"--nickname",
"late_joiner",
],
)
.await
.context("Failed to accept invite for late joiner")?;
Expand Down Expand Up @@ -1628,7 +1655,7 @@ async fn run_late_joiner_test() -> Result<()> {
"REGRESSION: Late joiner failed to receive post-join messages. \
This indicates the contract key mismatch bug (PR #2360) may have regressed. \
The late joiner fetched the contract via GET, but subsequent UPDATEs failed \
because the contract store couldn't find the contract with the key used in the UPDATE."
because the contract store couldn't find the contract with the key used in the UPDATE.",
)?;

// Also verify early joiner received all messages
Expand Down
Loading