diff --git a/.env b/.env new file mode 100644 index 0000000..c4204f2 --- /dev/null +++ b/.env @@ -0,0 +1,2 @@ +# .env +RUST_LOG=info \ No newline at end of file diff --git a/.gitignore b/.gitignore index 6985cf1..196e176 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,8 @@ Cargo.lock # MSVC Windows builds of rustc generate these, which store debugging information *.pdb + + +# Added by cargo + +/target diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..a694f55 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,7 @@ +[workspace] +members = ["server", "client"] + +[profile.release] +lto = true +codegen-units = 1 +panic = "abort" \ No newline at end of file diff --git a/client/Cargo.toml b/client/Cargo.toml new file mode 100644 index 0000000..86d94d1 --- /dev/null +++ b/client/Cargo.toml @@ -0,0 +1,17 @@ +[package] +name = "client" +version = "0.1.0" +edition = "2024" + +[dependencies] +tokio = { version = "1.0", features = ["full"] } +tokio-tungstenite = "0.20" +url = "2.0" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +futures-util = "0.3" +uuid = { version = "1.0", features = ["v4"] } +chrono = "0.4" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +dotenv = "0.15" diff --git a/client/src/main.rs b/client/src/main.rs new file mode 100644 index 0000000..d2918c0 --- /dev/null +++ b/client/src/main.rs @@ -0,0 +1,146 @@ +use std::env; +use std::io::Write; +use std::sync::Arc; +use tokio::sync::Mutex; +use tracing_subscriber::EnvFilter; + +use chrono::TimeZone; +use futures_util::{SinkExt, StreamExt}; +use tokio::io::{AsyncBufReadExt, BufReader, stdin}; +use tokio_tungstenite::{connect_async, tungstenite::Message as WsMessage}; +use url::Url; + +mod shared_types; +use shared_types::*; + +#[tokio::main] +async fn main() -> Result<(), Box> { + dotenv::dotenv().ok(); + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .init(); + + let args: Vec = env::args().collect(); + if args.len() != 4 { + eprintln!("Usage: {} ", args[0]); + return Ok(()); + } + + let host = &args[1]; + let port = &args[2]; + let username = &args[3]; + + let url = Url::parse(&format!("ws://{}:{}/", host, port))?; + let (ws_stream, _) = connect_async(url).await?; + + let (write_half, read_half) = ws_stream.split(); + let shared_write = Arc::new(Mutex::new(write_half)); + let shared_write_for_read_task = shared_write.clone(); + + // Send join message + let join_msg = ClientMessage::Join { + username: username.to_string(), + }; + let json = serde_json::to_string(&join_msg)?; + { + let mut write_guard = shared_write.lock().await; + write_guard.send(WsMessage::Text(json)).await?; + } + + // Spawn task to handle incoming messages + let read_handle = tokio::spawn(async move { + let mut read = read_half; + + while let Some(msg) = read.next().await { + match msg { + Ok(WsMessage::Text(text)) => { + match serde_json::from_str::(&text) { + Ok(ServerMessage::Welcome { username: _ }) => { + println!("✅ Welcome to the chat!"); + println!("➡️ Type 'send ' to chat or 'leave' to exit."); + } + Ok(ServerMessage::Joined { username }) => { + println!("📥 {} joined the chat", username); + } + Ok(ServerMessage::Left { username }) => { + println!("📤 {} left the chat", username); + } + Ok(ServerMessage::NewMessage(msg)) => { + let ts = chrono::Local + .timestamp_opt((msg.timestamp / 1000) as i64, 0) + .single() + .unwrap_or_else(|| chrono::Local::now()) + .format("%H:%M:%S"); + println!("[{}] [{}] {}", ts, msg.sender, msg.content); + + // Send ACK + let ack = ClientMessage::AckReceived { + message_id: msg.id.clone(), + intended_recipient: msg.sender.clone(), + }; + if let Ok(ack_json) = serde_json::to_string(&ack) { + let mut write_guard = shared_write_for_read_task.lock().await; + let _ = write_guard.send(WsMessage::Text(ack_json)).await; + } + } + Ok(ServerMessage::DeliveryReceipt { + message_id, + received_by, + }) => { + println!("✅ Message {} delivered to {}", message_id, received_by); + } + Ok(ServerMessage::Error { message }) => { + eprintln!("Error: {}", message); + } + Err(_) => {} + } + } + Ok(WsMessage::Close(_)) | Err(_) => { + println!("Disconnected from server."); + break; + } + _ => {} + } + } + }); + + // Handle user input + let stdin = stdin(); + let mut lines = BufReader::new(stdin).lines(); + + loop { + print!("> "); + std::io::stdout().flush().unwrap(); + + match lines.next_line().await { + Ok(Some(line)) => { + let line = line.trim(); + if line == "leave" { + let leave_msg = ClientMessage::Leave; + let json = serde_json::to_string(&leave_msg)?; + { + let mut write_guard = shared_write.lock().await; + let _ = write_guard.send(WsMessage::Text(json)).await; + } + println!("Goodbye!"); + break; + } else if line.starts_with("send ") { + let content = line[5..].to_string(); + let msg = ClientMessage::SendMessage { content }; + let json = serde_json::to_string(&msg)?; + { + let mut write_guard = shared_write.lock().await; + let _ = write_guard.send(WsMessage::Text(json)).await; + } + } else { + println!("Unknown command. Use 'send ' or 'leave'"); + } + } + Err(_) => break, + _ => break, + } + } + + let _ = read_handle.await; + Ok(()) +} diff --git a/client/src/shared_types.rs b/client/src/shared_types.rs new file mode 100644 index 0000000..73f2389 --- /dev/null +++ b/client/src/shared_types.rs @@ -0,0 +1,47 @@ +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub enum ClientMessage { + Join { + username: String, + }, + Leave, + SendMessage { + content: String, + }, + AckReceived { + message_id: String, + intended_recipient: String, // 👈 NEW: who sent the original message + }, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct ChatMessage { + pub id: String, + pub sender: String, + pub content: String, + pub timestamp: u64, + pub delivered_to: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum ServerMessage { + Welcome { + username: String, + }, + Joined { + username: String, + }, + Left { + username: String, + }, + NewMessage(ChatMessage), + DeliveryReceipt { + message_id: String, + received_by: String, + }, + Error { + message: String, + }, +} diff --git a/recording_client.webm b/recording_client.webm new file mode 100644 index 0000000..6bc0ca1 Binary files /dev/null and b/recording_client.webm differ diff --git a/server/Cargo.toml b/server/Cargo.toml new file mode 100644 index 0000000..f5f0409 --- /dev/null +++ b/server/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "server" +version = "0.1.0" +edition = "2024" + +[dependencies] +tokio = { version = "1.0", features = ["full"] } +tokio-tungstenite = "0.20" +tungstenite = "0.20" +sled = "0.34" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +futures-util = "0.3" +uuid = { version = "1.0", features = ["v4"] } +chrono = "0.4" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter"] } +dotenv = "0.15" diff --git a/server/src/main.rs b/server/src/main.rs new file mode 100644 index 0000000..2cb614c --- /dev/null +++ b/server/src/main.rs @@ -0,0 +1,254 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::time::SystemTime; + +use futures_util::{SinkExt, StreamExt}; +use tokio::net::TcpListener; +use tokio::sync::{Mutex, broadcast}; +use tokio_tungstenite::{accept_async, tungstenite::Message as WsMessage}; + +mod shared_types; +use shared_types::*; +use tracing_subscriber::EnvFilter; + +type WsSink = futures_util::stream::SplitSink< + tokio_tungstenite::WebSocketStream, + WsMessage, +>; +type SharedUsers = Arc>>>>; + +struct ChatServer { + users: SharedUsers, + tx: broadcast::Sender, + user_store: sled::Db, +} + +impl ChatServer { + async fn new() -> Result> { + let (tx, _) = broadcast::channel(100); + let user_store = sled::open("./chat_data")?; + + let mut cleaned = 0; + for item in user_store.scan_prefix(b"user:") { + let (key, _) = item?; + user_store.remove(key)?; + cleaned += 1; + } + user_store.flush()?; + + let users: SharedUsers = Arc::new(Mutex::new(HashMap::new())); + + let tx_clone = tx.clone(); + let users_clone = users.clone(); + + tokio::spawn(async move { + let mut rx = tx_clone.subscribe(); + loop { + match rx.recv().await { + Ok(msg) => { + let sender = match &msg { + ServerMessage::NewMessage(chat_msg) => Some(chat_msg.sender.clone()), + ServerMessage::Joined { username } => Some(username.clone()), + ServerMessage::Left { username } => Some(username.clone()), + _ => None, + }; + + let sinks: Vec<(String, Arc>)> = { + let users_guard = users_clone.lock().await; + users_guard + .iter() + .filter(|(username, _)| { + sender.as_ref().map_or(true, |s| s != *username) + }) + .map(|(username, sink)| (username.clone(), sink.clone())) + .collect() + }; + + for (username, sink) in sinks { + let json = match serde_json::to_string(&msg) { + Ok(json) => json, + Err(_) => continue, + }; + + let _ = sink.lock().await.send(WsMessage::Text(json)).await; + } + } + Err(_) => break, + } + } + }); + + Ok(Self { + users, + tx, + user_store, + }) + } + + pub async fn handle_connection( + users: SharedUsers, + tx: broadcast::Sender, + user_store: sled::Db, + ws_stream: tokio_tungstenite::WebSocketStream, + ) { + let (write_half, mut read_half) = ws_stream.split(); + let mut current_user: Option = None; + let write_arc = Arc::new(Mutex::new(write_half)); + + while let Some(msg) = read_half.next().await { + match msg { + Ok(WsMessage::Text(text)) => match serde_json::from_str::(&text) { + Ok(ClientMessage::Join { username }) => { + let key = format!("user:{}", username); + if user_store.contains_key(&key).unwrap_or(false) { + let error = ServerMessage::Error { + message: "Username already taken".to_string(), + }; + let json = serde_json::to_string(&error).unwrap(); + let _ = write_arc.lock().await.send(WsMessage::Text(json)).await; + break; + } + + user_store.insert(key.clone(), b"1").unwrap(); + user_store.flush().unwrap(); + + current_user = Some(username.clone()); + + { + let mut users_guard = users.lock().await; + users_guard.insert(username.clone(), write_arc.clone()); + } + + let welcome = ServerMessage::Welcome { + username: username.clone(), + }; + let _ = tx.send(welcome); + + let join_msg = ServerMessage::Joined { + username: username.clone(), + }; + let _ = tx.send(join_msg); + } + Ok(ClientMessage::SendMessage { content }) => { + if let Some(ref sender) = current_user { + let msg = ChatMessage { + id: uuid::Uuid::new_v4().to_string(), + sender: sender.clone(), + content, + timestamp: SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_millis() as u64, + delivered_to: vec![], + }; + + let server_msg = ServerMessage::NewMessage(msg); + let _ = tx.send(server_msg); + } + } + Ok(ClientMessage::AckReceived { + message_id, + intended_recipient, + }) => { + if let Some(ref username) = current_user { + let receipt = ServerMessage::DeliveryReceipt { + message_id, + received_by: username.clone(), + }; + + { + let users_guard = users.lock().await; + if let Some(recipient_sink) = users_guard.get(&intended_recipient) { + let json = match serde_json::to_string(&receipt) { + Ok(json) => json, + Err(_) => return, + }; + + let _ = recipient_sink + .lock() + .await + .send(WsMessage::Text(json)) + .await; + } + } + } + } + Ok(ClientMessage::Leave) => { + if let Some(ref username) = current_user { + let key = format!("user:{}", username); + user_store.remove(key).unwrap(); + user_store.flush().unwrap(); + + { + let mut users_guard = users.lock().await; + users_guard.remove(username); + } + + let leave_msg = ServerMessage::Left { + username: username.clone(), + }; + let _ = tx.send(leave_msg); + } + break; + } + Err(_) => {} + }, + Ok(WsMessage::Close(_)) | Err(_) => { + if let Some(ref username) = current_user { + let key = format!("user:{}", username); + user_store.remove(key).unwrap(); + { + let mut users_guard = users.lock().await; + users_guard.remove(username); + } + let leave_msg = ServerMessage::Left { + username: username.clone(), + }; + let _ = tx.send(leave_msg); + } + break; + } + _ => {} + } + } + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + dotenv::dotenv().ok(); + tracing_subscriber::fmt() + .with_env_filter(EnvFilter::from_default_env()) + .init(); + + let server = Arc::new(Mutex::new(ChatServer::new().await?)); + let addr = "127.0.0.1:8080"; + let listener = TcpListener::bind(addr).await?; + + loop { + match listener.accept().await { + Ok((stream, _)) => { + let server_clone = server.clone(); + + tokio::spawn(async move { + match accept_async(stream).await { + Ok(ws_stream) => { + let (users, tx, user_store) = { + let server = server_clone.lock().await; + ( + server.users.clone(), + server.tx.clone(), + server.user_store.clone(), + ) + }; + + ChatServer::handle_connection(users, tx, user_store, ws_stream).await; + } + Err(_) => {} + } + }); + } + Err(_) => continue, + } + } +} diff --git a/server/src/shared_types.rs b/server/src/shared_types.rs new file mode 100644 index 0000000..c3bed90 --- /dev/null +++ b/server/src/shared_types.rs @@ -0,0 +1,46 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub enum ClientMessage { + Join { + username: String, + }, + Leave, + SendMessage { + content: String, + }, + AckReceived { + message_id: String, + intended_recipient: String, + }, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct ChatMessage { + pub id: String, + pub sender: String, + pub content: String, + pub timestamp: u64, + pub delivered_to: Vec, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub enum ServerMessage { + Welcome { + username: String, + }, + Joined { + username: String, + }, + Left { + username: String, + }, + NewMessage(ChatMessage), + DeliveryReceipt { + message_id: String, + received_by: String, + }, + Error { + message: String, + }, +}