diff --git a/.github/workflows/README.md b/.github/workflows/README.md new file mode 100644 index 0000000..648f62f --- /dev/null +++ b/.github/workflows/README.md @@ -0,0 +1,144 @@ +# GitHub Actions CI/CD + +This directory contains the continuous integration workflows for the Simple Chat application. + +## Workflows + +### `ci.yml` - Continuous Integration + +**Triggers:** +- Push to `main` branch +- Pull requests to `main` branch + +**Jobs:** + +#### 1. Code Formatting Check (`format`) +- **Platform:** Ubuntu Latest +- **Timeout:** 10 minutes +- **Purpose:** Ensures code follows Rust formatting standards +- **Command:** `cargo fmt --all -- --check` + +#### 2. Linting (`lint`) +- **Platform:** Ubuntu Latest +- **Timeout:** 15 minutes +- **Purpose:** Static analysis and linting with Clippy +- **Command:** `cargo clippy --all-targets --all-features -- -D warnings` +- **Features:** Treats warnings as errors for high code quality + +#### 3. Unit and Integration Tests (`test`) +- **Platforms:** Ubuntu, Windows, macOS +- **Timeout:** 30 minutes +- **Purpose:** Comprehensive testing across platforms +- **Tests:** + - Unit tests (`cargo test --lib`) + - Integration tests (`cargo test --tests`) + - Binary compilation verification + +#### 4. End-to-End Integration Test (`e2e`) +- **Platform:** Ubuntu Latest +- **Timeout:** 20 minutes +- **Purpose:** Full system integration testing +- **Tests:** + 1. **Basic Client-Server Communication** + - Starts server in background + - Connects client and sends test message + - Verifies successful communication + + 2. **Multiple Clients Test** + - Spawns multiple concurrent clients + - Tests message exchange between users + - Validates server handles concurrent connections + + 3. **Server Responsiveness** + - Checks server remains responsive after load + - Validates connection acceptance after multiple clients + - Ensures proper resource management + +#### 5. Build Verification (`build-check`) +- **Platforms:** Ubuntu, Windows, macOS +- **Timeout:** 15 minutes +- **Purpose:** Cross-platform build verification +- **Tests:** + - Debug build compilation + - Release build compilation + - Binary help command functionality + +#### 6. CI Success Summary (`ci-success`) +- **Platform:** Ubuntu Latest +- **Purpose:** Aggregates results from all jobs +- **Behavior:** Fails if any dependent job fails + +## Features + +### Robust Error Handling +- **Timeouts:** All jobs have appropriate timeout limits +- **Cleanup:** Proper server process cleanup in E2E tests +- **Trap handling:** Signal handling for graceful shutdowns +- **Exit codes:** Proper error reporting and propagation + +### Performance Optimization +- **Caching:** Rust dependencies cached using `Swatinem/rust-cache` +- **Parallel execution:** Jobs run concurrently where possible +- **Incremental builds:** Cache keys based on `Cargo.lock` checksums + +### Cross-Platform Support +- **Linux (Ubuntu):** Primary development and testing platform +- **Windows:** Compatibility verification +- **macOS:** Apple silicon and x86_64 support +- **Binary verification:** Platform-specific binary testing + +### Comprehensive Testing Strategy +- **Unit Tests:** Core functionality and message protocol +- **Integration Tests:** Server-client interaction +- **End-to-End Tests:** Real-world usage scenarios +- **Build Tests:** Compilation verification across platforms + +## Usage + +### Local Development +```bash +# Run the same checks locally before pushing +cargo fmt --all -- --check +cargo clippy --all-targets --all-features -- -D warnings +cargo test +cargo build --release +``` + +### Monitoring CI +- **Status:** Check the Actions tab in GitHub repository +- **Logs:** View detailed logs for each job step +- **Artifacts:** Download build artifacts if configured +- **Notifications:** Configure GitHub notifications for failures + +### Troubleshooting + +**Common Issues:** +- **Format failures:** Run `cargo fmt` to fix formatting +- **Clippy warnings:** Fix warnings or add `#[allow(...)]` where appropriate +- **Test timeouts:** Check for infinite loops or blocking operations +- **E2E failures:** Verify server starts correctly and ports are available + +**Debug Steps:** +1. Check job logs for specific error messages +2. Reproduce issues locally using the same commands +3. Verify environment variables and dependencies +4. Test on the same platform as failing CI job + +## Configuration + +### Environment Variables +- `CARGO_TERM_COLOR=always` - Colored output in CI logs +- `RUST_BACKTRACE=1` - Full backtraces on panics +- `RUST_LOG=info` - Logging level for integration tests + +### Timeouts +- **Format:** 10 minutes (formatting is fast) +- **Lint:** 15 minutes (clippy analysis takes time) +- **Tests:** 30 minutes (includes multiple platforms) +- **E2E:** 20 minutes (includes server startup and multiple test scenarios) +- **Build:** 15 minutes (cross-platform builds) + +### Dependencies +- **Rust toolchain:** Managed by `dtolnay/rust-toolchain` +- **Caching:** Handled by `Swatinem/rust-cache` +- **Checkout:** Uses `actions/checkout@v4` \ No newline at end of file diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..6ad0ac7 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,268 @@ +name: Continuous Integration + +on: + push: + branches: [main] + pull_request: + branches: [main] + +env: + CARGO_TERM_COLOR: always + RUST_BACKTRACE: 1 + +jobs: + # Code formatting check + format: + name: Code Formatting Check + runs-on: ubuntu-latest + timeout-minutes: 10 + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + with: + components: rustfmt + + - name: Check code formatting + run: cargo fmt --all -- --check + + # Linting with clippy + lint: + name: Linting (Clippy) + runs-on: ubuntu-latest + timeout-minutes: 15 + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + with: + components: clippy + + - name: Cache Rust dependencies + uses: Swatinem/rust-cache@v2 + with: + key: clippy-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }} + + - name: Run clippy lints + run: cargo clippy --all-targets --all-features -- -D warnings + + # Unit and integration tests + test: + name: Unit and Integration Tests + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, windows-latest, macos-latest] + rust-version: [stable] + runs-on: ${{ matrix.os }} + timeout-minutes: 30 + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@master + with: + toolchain: ${{ matrix.rust-version }} + + - name: Cache Rust dependencies + uses: Swatinem/rust-cache@v2 + with: + key: test-${{ runner.os }}-${{ matrix.rust-version }}-${{ hashFiles('**/Cargo.lock') }} + + - name: Run unit tests + run: cargo test --lib + timeout-minutes: 10 + + - name: Run integration tests + run: cargo test --tests + timeout-minutes: 15 + env: + RUST_LOG: info + + - name: Test binary compilation + run: | + cargo build --bin server --release + cargo build --bin client --release + + # End-to-end integration test + e2e: + name: End-to-End Integration Test + runs-on: ubuntu-latest + timeout-minutes: 20 + needs: [format, lint] + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + + - name: Cache Rust dependencies + uses: Swatinem/rust-cache@v2 + with: + key: e2e-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }} + + - name: Build binaries + run: | + cargo build --bin server --release + cargo build --bin client --release + timeout-minutes: 10 + + - name: Run end-to-end test + timeout-minutes: 5 + run: | + set -e # Exit on any error + + echo "๐Ÿš€ Starting end-to-end integration test..." + + # Start server in background + echo "๐Ÿ“ก Starting chat server..." + timeout 300s ./target/release/server --host 127.0.0.1 --port 8080 & + SERVER_PID=$! + echo "Server started with PID: $SERVER_PID" + + # Wait for server to start up + echo "โณ Waiting for server startup..." + sleep 3 + + # Function to cleanup server + cleanup() { + echo "๐Ÿงน Cleaning up server process..." + if kill -0 $SERVER_PID 2>/dev/null; then + kill $SERVER_PID 2>/dev/null || true + sleep 2 + if kill -0 $SERVER_PID 2>/dev/null; then + kill -9 $SERVER_PID 2>/dev/null || true + fi + fi + } + + # Set trap for cleanup + trap cleanup EXIT + + # Test 1: Basic client connection and message sending + echo "๐Ÿ‘ค Testing client connection and message sending..." + timeout 30s bash -c ' + echo "send Hello from CI test!" | ./target/release/client --username ci_user --host 127.0.0.1 --port 8080 || exit 1 + ' & + CLIENT_PID=$! + + # Wait for client with timeout + if ! wait $CLIENT_PID; then + echo "โŒ Client test failed or timed out" + exit 1 + fi + + echo "โœ… Client connection and message test passed" + + # Test 2: Multiple clients test + echo "๐Ÿ‘ฅ Testing multiple clients..." + timeout 30s bash -c ' + echo "send Message from user1" | ./target/release/client --username user1 --host 127.0.0.1 --port 8080 & + CLIENT1_PID=$! + + sleep 1 + + echo "send Message from user2" | ./target/release/client --username user2 --host 127.0.0.1 --port 8080 & + CLIENT2_PID=$! + + wait $CLIENT1_PID && wait $CLIENT2_PID + ' || { + echo "โŒ Multiple clients test failed" + exit 1 + } + + echo "โœ… Multiple clients test passed" + + # Test 3: Server responsiveness check + echo "๐Ÿ” Checking server responsiveness..." + if ! kill -0 $SERVER_PID 2>/dev/null; then + echo "โŒ Server process died unexpectedly" + exit 1 + fi + + # Test connection to server is still possible + timeout 10s bash -c ' + echo "send Final test message" | ./target/release/client --username final_user --host 127.0.0.1 --port 8080 + ' || { + echo "โŒ Server responsiveness test failed" + exit 1 + } + + echo "โœ… Server responsiveness test passed" + echo "๐ŸŽ‰ All end-to-end tests completed successfully!" + + # Build verification on all platforms + build-check: + name: Build Verification + strategy: + fail-fast: false + matrix: + os: [ubuntu-latest, windows-latest, macos-latest] + runs-on: ${{ matrix.os }} + timeout-minutes: 15 + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + + - name: Cache Rust dependencies + uses: Swatinem/rust-cache@v2 + with: + key: build-${{ runner.os }}-${{ hashFiles('**/Cargo.lock') }} + + - name: Build all binaries (debug) + run: cargo build --all-targets + + - name: Build all binaries (release) + run: cargo build --all-targets --release + + - name: Verify binary functionality (Unix) + if: runner.os != 'Windows' + run: | + ./target/release/server --help + ./target/release/client --help + + - name: Verify binary functionality (Windows) + if: runner.os == 'Windows' + run: | + ./target/release/server.exe --help + ./target/release/client.exe --help + + # Summary job that depends on all others + ci-success: + name: CI Success + runs-on: ubuntu-latest + needs: [format, lint, test, e2e, build-check] + if: always() + steps: + - name: Check all jobs + run: | + if [[ "${{ needs.format.result }}" != "success" ]]; then + echo "โŒ Format job failed" + exit 1 + fi + if [[ "${{ needs.lint.result }}" != "success" ]]; then + echo "โŒ Lint job failed" + exit 1 + fi + if [[ "${{ needs.test.result }}" != "success" ]]; then + echo "โŒ Test job failed" + exit 1 + fi + if [[ "${{ needs.e2e.result }}" != "success" ]]; then + echo "โŒ E2E job failed" + exit 1 + fi + if [[ "${{ needs.build-check.result }}" != "success" ]]; then + echo "โŒ Build check job failed" + exit 1 + fi + echo "โœ… All CI jobs passed successfully!" \ No newline at end of file diff --git a/.gitignore b/.gitignore index 6985cf1..196e176 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,8 @@ Cargo.lock # MSVC Windows builds of rustc generate these, which store debugging information *.pdb + + +# Added by cargo + +/target diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..1da6a6d --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "simple-chat" +version = "0.1.0" +edition = "2021" + +[[bin]] +name = "server" +path = "src/bin/server.rs" + +[[bin]] +name = "client" +path = "src/bin/client.rs" + +[dependencies] +tokio = { version = "1.47.1", features = ["full"] } +serde = { version = "1.0.219", features = ["derive"] } +serde_json = "1.0.143" +clap = { version = "4.5.46", features = ["derive", "env"] } +anyhow = "1.0.99" +uuid = { version = "1.18.1", features = ["v4", "serde"] } +bytes = "1.10.1" +tokio-util = { version = "0.7.16", features = ["codec"] } +futures = "0.3.31" +log = "0.4.27" +env_logger = "0.11.8" +tokio-stream = "0.1.17" +crossterm = "0.29.0" + +[dev-dependencies] +tokio-test = "0.4.4" +chrono = { version = "0.4", features = ["std"] } diff --git a/src/bin/client.rs b/src/bin/client.rs new file mode 100644 index 0000000..d2afd4e --- /dev/null +++ b/src/bin/client.rs @@ -0,0 +1,426 @@ +use anyhow::{anyhow, Context, Result}; +use clap::Parser; +use futures::{SinkExt, StreamExt}; +use log::{error, info}; +use simple_chat::Message; +use std::io::{self, Write}; +use std::time::Duration; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::net::TcpStream; +use tokio::sync::mpsc; +use tokio::time::timeout; +use tokio_util::codec::{FramedRead, FramedWrite}; + +#[derive(Parser)] +#[command(author, version, about = "Simple Chat Client", long_about = None)] +struct ClientArgs { + #[arg(short = 'H', long, env = "CHAT_HOST", default_value = "localhost")] + host: String, + + #[arg(short, long, env = "CHAT_PORT", default_value_t = 8080)] + port: u16, + + #[arg(short, long, env = "CHAT_USERNAME")] + username: String, +} + +#[derive(Debug, Clone)] +enum ClientCommand { + Send(String), + Leave, + InvalidCommand(String), +} + +impl ClientCommand { + fn parse(input: &str) -> Self { + let input = input.trim(); + + if input.is_empty() { + return ClientCommand::InvalidCommand("Empty command".to_string()); + } + + if input == "leave" { + return ClientCommand::Leave; + } + + if let Some(message) = input.strip_prefix("send ") { + if message.trim().is_empty() { + return ClientCommand::InvalidCommand("Empty message content".to_string()); + } + return ClientCommand::Send(message.to_string()); + } + + if input == "send" { + return ClientCommand::InvalidCommand("Empty message content".to_string()); + } + + ClientCommand::InvalidCommand(format!("Unknown command: {}", input)) + } +} + +struct ChatClient { + username: String, + server_addr: String, + framed_read: + Option>, + framed_write: + Option>, +} + +impl ChatClient { + fn new(username: String, host: String, port: u16) -> Self { + Self { + username, + server_addr: format!("{}:{}", host, port), + framed_read: None, + framed_write: None, + } + } + + async fn connect(&mut self) -> Result<()> { + info!("Connecting to server at {}", self.server_addr); + + let stream = timeout( + Duration::from_secs(10), + TcpStream::connect(&self.server_addr), + ) + .await + .context("Connection timeout")? + .context("Failed to connect to server")?; + + let (read_half, write_half) = stream.into_split(); + + self.framed_read = Some(FramedRead::new(read_half, simple_chat::codec::MessageCodec)); + self.framed_write = Some(FramedWrite::new( + write_half, + simple_chat::codec::MessageCodec, + )); + + // Send join message + let join_message = Message::Join { + username: self.username.clone(), + }; + + if let Some(ref mut writer) = self.framed_write { + writer + .send(join_message) + .await + .context("Failed to send join message")?; + info!("Sent join request for user '{}'", self.username); + } + + Ok(()) + } + + async fn run(&mut self) -> Result<()> { + self.connect().await?; + + let (cmd_tx, mut cmd_rx) = mpsc::channel::(100); + let (shutdown_tx, shutdown_rx) = mpsc::channel::<()>(1); + let mut main_shutdown_rx = shutdown_rx; + + // Spawn task for reading server messages + let mut reader = self.framed_read.take().unwrap(); + let shutdown_tx_clone = shutdown_tx.clone(); + let server_task = tokio::spawn(async move { + loop { + match reader.next().await { + Some(Ok(message)) => { + Self::display_message(&message); + + // Check for server disconnection messages + if let Message::Error { message: error_msg } = &message { + if error_msg.contains("Server error") + || error_msg.contains("Connection") + { + error!("Server error received: {}", error_msg); + let _ = shutdown_tx_clone.send(()).await; + break; + } + } + } + Some(Err(e)) => { + error!("Error reading from server: {}", e); + let _ = shutdown_tx_clone.send(()).await; + break; + } + None => { + info!("Server disconnected"); + let _ = shutdown_tx_clone.send(()).await; + break; + } + } + } + }); + + // Spawn task for reading user input + let cmd_tx_clone = cmd_tx.clone(); + let shutdown_tx_clone = shutdown_tx.clone(); + let input_task = + tokio::spawn( + async move { Self::handle_user_input(cmd_tx_clone, shutdown_tx_clone).await }, + ); + + // Main command processing loop + let mut writer = self.framed_write.take().unwrap(); + + println!("๐Ÿš€ Connected to chat server!"); + println!("๐Ÿ’ก Commands:"); + println!(" send - Send a message"); + println!(" leave - Leave the chat"); + println!(); + + loop { + tokio::select! { + command = cmd_rx.recv() => { + match command { + Some(ClientCommand::Send(content)) => { + let message = Message::ChatMessage { + username: self.username.clone(), + content, + }; + + if let Err(e) = writer.send(message).await { + error!("Failed to send message: {}", e); + break; + } + }, + Some(ClientCommand::Leave) => { + info!("User requested to leave"); + let leave_message = Message::Leave { + username: self.username.clone(), + }; + let _ = writer.send(leave_message).await; + break; + }, + Some(ClientCommand::InvalidCommand(error)) => { + println!("โŒ Error: {}", error); + println!("๐Ÿ’ก Use 'send ' to send a message or 'leave' to exit"); + }, + None => { + info!("Command channel closed"); + break; + } + } + }, + _ = main_shutdown_rx.recv() => { + info!("Main loop received shutdown signal"); + break; + } + } + } + + // Cleanup + server_task.abort(); + input_task.abort(); + + Ok(()) + } + + fn display_message(message: &Message) { + match message { + Message::ChatMessage { username, content } => { + println!("[{}]: {}", username, content); + } + Message::UserJoined { username } => { + println!("๐ŸŸข {} joined the chat", username); + } + Message::UserLeft { username } => { + println!("๐Ÿ”ด {} left the chat", username); + } + Message::Error { message } => { + println!("โŒ Server error: {}", message); + } + _ => { + // Handle other message types if needed + info!("Received message: {:?}", message); + } + } + + // Flush stdout to ensure immediate display + io::stdout().flush().unwrap_or(()); + } + + async fn handle_user_input(cmd_tx: mpsc::Sender, shutdown_tx: mpsc::Sender<()>) { + let stdin = tokio::io::stdin(); + let mut reader = BufReader::new(stdin); + let mut line = String::new(); + + loop { + print!("> "); + io::stdout().flush().unwrap_or(()); + + line.clear(); + + match reader.read_line(&mut line).await { + Ok(0) => { + // EOF reached (Ctrl+D) + info!("EOF received, shutting down"); + let _ = shutdown_tx.send(()).await; + break; + } + Ok(_) => { + let command = ClientCommand::parse(&line); + + // Check for leave command to trigger shutdown + if matches!(command, ClientCommand::Leave) { + let _ = cmd_tx.send(command).await; + let _ = shutdown_tx.send(()).await; + break; + } + + if cmd_tx.send(command).await.is_err() { + error!("Failed to send command"); + break; + } + } + Err(e) => { + error!("Error reading input: {}", e); + let _ = shutdown_tx.send(()).await; + break; + } + } + } + } +} + +#[tokio::main] +async fn main() -> Result<()> { + // Initialize logging + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); + + let args = ClientArgs::parse(); + + // Validate username + if let Err(e) = simple_chat::validate_username(&args.username) { + return Err(anyhow!("Invalid username: {}", e)); + } + + info!("Starting chat client for user '{}'", args.username); + info!("Connecting to server at {}:{}", args.host, args.port); + + let mut client = ChatClient::new(args.username.clone(), args.host, args.port); + + match client.run().await { + Ok(_) => { + println!("๐Ÿ‘‹ Goodbye, {}!", args.username); + Ok(()) + } + Err(e) => { + error!("Client error: {}", e); + println!("โŒ Connection error: {}", e); + println!("๐Ÿ’ก Please check that the server is running and try again."); + Err(e) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_command_parsing_send() { + let cmd = ClientCommand::parse("send Hello world!"); + match cmd { + ClientCommand::Send(msg) => assert_eq!(msg, "Hello world!"), + _ => panic!("Expected Send command"), + } + } + + #[test] + fn test_command_parsing_send_with_extra_spaces() { + let cmd = ClientCommand::parse(" send Hello world! "); + match cmd { + ClientCommand::Send(msg) => assert_eq!(msg, " Hello world!"), + _ => panic!("Expected Send command"), + } + } + + #[test] + fn test_command_parsing_leave() { + let cmd = ClientCommand::parse("leave"); + matches!(cmd, ClientCommand::Leave); + + let cmd = ClientCommand::parse(" leave "); + matches!(cmd, ClientCommand::Leave); + } + + #[test] + fn test_command_parsing_invalid_empty() { + let cmd = ClientCommand::parse(""); + match cmd { + ClientCommand::InvalidCommand(msg) => assert_eq!(msg, "Empty command"), + _ => panic!("Expected InvalidCommand"), + } + + let cmd = ClientCommand::parse(" "); + match cmd { + ClientCommand::InvalidCommand(msg) => assert_eq!(msg, "Empty command"), + _ => panic!("Expected InvalidCommand"), + } + } + + #[test] + fn test_command_parsing_invalid_unknown() { + let cmd = ClientCommand::parse("unknown command"); + match cmd { + ClientCommand::InvalidCommand(msg) => { + assert_eq!(msg, "Unknown command: unknown command") + } + _ => panic!("Expected InvalidCommand"), + } + } + + #[test] + fn test_command_parsing_send_empty_message() { + let cmd = ClientCommand::parse("send "); + match cmd { + ClientCommand::InvalidCommand(msg) => assert_eq!(msg, "Empty message content"), + _ => panic!("Expected InvalidCommand"), + } + + let cmd = ClientCommand::parse("send "); + match cmd { + ClientCommand::InvalidCommand(msg) => assert_eq!(msg, "Empty message content"), + _ => panic!("Expected InvalidCommand"), + } + } + + #[test] + fn test_message_display_formatting() { + // Test that display_message doesn't panic + // Since it prints to stdout, we can't easily test the output + // but we can ensure it doesn't crash + + let messages = vec![ + Message::ChatMessage { + username: "alice".to_string(), + content: "Hello world!".to_string(), + }, + Message::UserJoined { + username: "bob".to_string(), + }, + Message::UserLeft { + username: "charlie".to_string(), + }, + Message::Error { + message: "Test error".to_string(), + }, + ]; + + for message in messages { + ChatClient::display_message(&message); + } + } + + #[test] + fn test_client_creation() { + let client = ChatClient::new("testuser".to_string(), "localhost".to_string(), 8080); + + assert_eq!(client.username, "testuser"); + assert_eq!(client.server_addr, "localhost:8080"); + assert!(client.framed_read.is_none()); + assert!(client.framed_write.is_none()); + } +} diff --git a/src/bin/server.rs b/src/bin/server.rs new file mode 100644 index 0000000..44047e1 --- /dev/null +++ b/src/bin/server.rs @@ -0,0 +1,337 @@ +use anyhow::Result; +use clap::Parser; +use futures::{SinkExt, StreamExt}; +use log::{error, info, warn}; +use simple_chat::{ChatResult, Message}; +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::Arc; +use tokio::net::{TcpListener, TcpStream}; +use tokio::signal; +use tokio::sync::{broadcast, RwLock}; +use tokio::time::{timeout, Duration}; +use tokio_util::codec::{FramedRead, FramedWrite}; + +#[derive(Parser)] +#[command(author, version, about = "Simple Chat Server", long_about = None)] +struct ServerArgs { + #[arg(short = 'H', long, default_value = "127.0.0.1")] + host: String, + + #[arg(short, long, default_value_t = 8080)] + port: u16, + + #[arg(long, default_value_t = 100)] + max_users: usize, +} + +#[derive(Clone)] +struct UserConnection { + #[allow(dead_code)] + sender: broadcast::Sender, + #[allow(dead_code)] + addr: SocketAddr, +} + +type UserManager = Arc>>; + +#[derive(Clone)] +struct ChatServer { + users: UserManager, + message_tx: broadcast::Sender, + max_users: usize, +} + +impl ChatServer { + fn new(max_users: usize) -> Self { + let (message_tx, _) = broadcast::channel(1000); + + Self { + users: Arc::new(RwLock::new(HashMap::new())), + message_tx, + max_users, + } + } + + async fn handle_connection(&self, stream: TcpStream, addr: SocketAddr) -> Result<()> { + info!("New connection from {}", addr); + + let (reader, writer) = stream.into_split(); + let mut framed_read = FramedRead::new(reader, simple_chat::codec::MessageCodec); + let mut framed_write = FramedWrite::new(writer, simple_chat::codec::MessageCodec); + + let mut username: Option = None; + let mut message_rx = self.message_tx.subscribe(); + + // Set connection timeout for initial handshake + let handshake_timeout = Duration::from_secs(30); + + loop { + tokio::select! { + // Handle incoming messages from client + result = timeout(handshake_timeout, framed_read.next()) => { + + match result { + Ok(Some(Ok(message))) => { + match self.process_client_message(message, &mut username, addr, &mut framed_write).await { + Ok(should_continue) => { + if !should_continue { + break; + } + }, + Err(e) => { + error!("Error processing message from {}: {}", addr, e); + let error_msg = Message::Error { + message: format!("Server error: {}", e) + }; + let _ = framed_write.send(error_msg).await; + break; + } + } + }, + Ok(Some(Err(e))) => { + error!("Decode error from {}: {}", addr, e); + break; + }, + Ok(None) => { + info!("Client {} disconnected", addr); + break; + }, + Err(_) => { + warn!("Connection timeout from {}", addr); + break; + } + } + }, + + // Handle broadcast messages to send to client + result = message_rx.recv() => { + match result { + Ok(message) => { + // Only send messages to authenticated users + if username.is_some() { + if let Err(e) = framed_write.send(message).await { + error!("Failed to send message to {}: {}", addr, e); + break; + } + } + }, + Err(broadcast::error::RecvError::Closed) => { + info!("Message channel closed, shutting down connection {}", addr); + break; + }, + Err(broadcast::error::RecvError::Lagged(_)) => { + warn!("Message channel lagged for {}", addr); + continue; + } + } + } + } + } + + // Cleanup on disconnect + if let Some(ref user) = username { + self.remove_user(user).await; + info!("User '{}' disconnected from {}", user, addr); + + // Broadcast user left message + let leave_message = Message::UserLeft { + username: user.clone(), + }; + let _ = self.message_tx.send(leave_message); + } + + Ok(()) + } + + async fn process_client_message( + &self, + message: Message, + username: &mut Option, + addr: SocketAddr, + writer: &mut FramedWrite, + ) -> Result { + match message { + Message::Join { + username: requested_username, + } => { + if username.is_some() { + let error_msg = Message::Error { + message: "Already authenticated".to_string(), + }; + writer.send(error_msg).await?; + return Ok(true); + } + + match self.add_user(&requested_username, addr).await { + Ok(()) => { + *username = Some(requested_username.clone()); + info!("User '{}' joined from {}", requested_username, addr); + + // Send confirmation to the joining user + let join_confirm = Message::UserJoined { + username: requested_username.clone(), + }; + writer.send(join_confirm).await?; + + // Broadcast to all other users + let broadcast_message = Message::UserJoined { + username: requested_username, + }; + let _ = self.message_tx.send(broadcast_message); + + Ok(true) + } + Err(e) => { + let error_msg = Message::Error { + message: e.to_string(), + }; + writer.send(error_msg).await?; + Ok(false) // Disconnect user + } + } + } + + Message::Leave { .. } => { + // Client requesting to leave + Ok(false) // This will trigger cleanup in the main loop + } + + Message::ChatMessage { + username: msg_username, + content, + } => { + if let Some(ref current_user) = username { + if msg_username != *current_user { + let error_msg = Message::Error { + message: "Username mismatch".to_string(), + }; + writer.send(error_msg).await?; + return Ok(true); + } + + // Broadcast the chat message + let chat_message = Message::ChatMessage { + username: current_user.clone(), + content, + }; + let _ = self.message_tx.send(chat_message); + Ok(true) + } else { + let error_msg = Message::Error { + message: "Not authenticated".to_string(), + }; + writer.send(error_msg).await?; + Ok(true) + } + } + + _ => { + let error_msg = Message::Error { + message: "Invalid message type".to_string(), + }; + writer.send(error_msg).await?; + Ok(true) + } + } + } + + async fn add_user(&self, username: &str, addr: SocketAddr) -> ChatResult<()> { + let mut users = self.users.write().await; + + if users.len() >= self.max_users { + return Err(simple_chat::ChatError::InvalidUsername( + "Server is full".to_string(), + )); + } + + if users.contains_key(username) { + return Err(simple_chat::ChatError::InvalidUsername( + "Username already taken".to_string(), + )); + } + + let (sender, _) = broadcast::channel(100); + let connection = UserConnection { sender, addr }; + users.insert(username.to_string(), connection); + + Ok(()) + } + + async fn remove_user(&self, username: &str) { + let mut users = self.users.write().await; + users.remove(username); + } + + async fn get_user_count(&self) -> usize { + let users = self.users.read().await; + users.len() + } +} + +#[tokio::main] +async fn main() -> Result<()> { + // Initialize logging + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); + + let args = ServerArgs::parse(); + let addr = format!("{}:{}", args.host, args.port); + + info!("Starting chat server on {}", addr); + info!("Maximum users: {}", args.max_users); + + // Create the chat server + let server = ChatServer::new(args.max_users); + + // Bind TCP listener + let listener = TcpListener::bind(&addr).await?; + info!("Server listening on {}", addr); + + // Setup graceful shutdown + let server_clone = server.clone(); + let mut shutdown_task = tokio::spawn(async move { + match signal::ctrl_c().await { + Ok(_) => { + info!("Received SIGINT, shutting down gracefully..."); + let user_count = server_clone.get_user_count().await; + if user_count > 0 { + info!("Disconnecting {} active users...", user_count); + // The broadcast channel will be closed when server is dropped + } + } + Err(e) => { + error!("Failed to listen for shutdown signal: {}", e); + } + } + }); + + // Main server loop + loop { + tokio::select! { + result = listener.accept() => { + match result { + Ok((stream, addr)) => { + let server_clone = server.clone(); + tokio::spawn(async move { + if let Err(e) = server_clone.handle_connection(stream, addr).await { + error!("Connection handler error for {}: {}", addr, e); + } + }); + } + Err(e) => { + error!("Failed to accept connection: {}", e); + continue; + } + } + } + + _ = &mut shutdown_task => { + info!("Shutdown signal received, stopping server..."); + break; + } + } + } + + info!("Chat server shutdown complete"); + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..3db3520 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,430 @@ +use serde::{Deserialize, Serialize}; +use std::fmt; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum ChatError { + InvalidUsername(String), + SerializationError(String), + NetworkError(String), +} + +impl fmt::Display for ChatError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ChatError::InvalidUsername(msg) => write!(f, "Invalid username: {}", msg), + ChatError::SerializationError(msg) => write!(f, "Serialization error: {}", msg), + ChatError::NetworkError(msg) => write!(f, "Network error: {}", msg), + } + } +} + +impl std::error::Error for ChatError {} + +impl From for ChatError { + fn from(error: std::io::Error) -> Self { + ChatError::NetworkError(error.to_string()) + } +} + +pub type ChatResult = Result; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub enum Message { + Join { username: String }, + Leave { username: String }, + ChatMessage { username: String, content: String }, + UserJoined { username: String }, + UserLeft { username: String }, + Error { message: String }, +} + +impl Message { + pub fn validate(&self) -> ChatResult<()> { + match self { + Message::Join { username } + | Message::Leave { username } + | Message::UserJoined { username } + | Message::UserLeft { username } => validate_username(username), + Message::ChatMessage { username, content } => { + validate_username(username)?; + if content.trim().is_empty() { + return Err(ChatError::InvalidUsername( + "Message content cannot be empty".to_string(), + )); + } + if content.len() > 1000 { + return Err(ChatError::InvalidUsername( + "Message content too long (max 1000 characters)".to_string(), + )); + } + Ok(()) + } + Message::Error { .. } => Ok(()), + } + } +} + +pub fn validate_username(username: &str) -> ChatResult<()> { + if username.trim().is_empty() { + return Err(ChatError::InvalidUsername( + "Username cannot be empty".to_string(), + )); + } + + if username.len() < 2 { + return Err(ChatError::InvalidUsername( + "Username must be at least 2 characters long".to_string(), + )); + } + + if username.len() > 20 { + return Err(ChatError::InvalidUsername( + "Username must be at most 20 characters long".to_string(), + )); + } + + if !username + .chars() + .all(|c| c.is_alphanumeric() || c == '_' || c == '-') + { + return Err(ChatError::InvalidUsername( + "Username can only contain letters, numbers, underscores, and hyphens".to_string(), + )); + } + + Ok(()) +} + +pub mod codec { + use super::{ChatError, Message}; + use bytes::{BufMut, BytesMut}; + use tokio_util::codec::{Decoder, Encoder}; + + pub struct MessageCodec; + + impl Decoder for MessageCodec { + type Item = Message; + type Error = ChatError; + + fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { + if src.is_empty() { + return Ok(None); + } + + if let Some(newline_pos) = src.iter().position(|&b| b == b'\n') { + let line = src.split_to(newline_pos + 1); + let line = &line[..line.len() - 1]; + let message: Message = serde_json::from_slice(line) + .map_err(|e| ChatError::SerializationError(e.to_string()))?; + message.validate()?; + Ok(Some(message)) + } else { + Ok(None) + } + } + } + + impl Encoder for MessageCodec { + type Error = ChatError; + + fn encode(&mut self, item: Message, dst: &mut BytesMut) -> Result<(), Self::Error> { + item.validate()?; + let json = serde_json::to_vec(&item) + .map_err(|e| ChatError::SerializationError(e.to_string()))?; + dst.reserve(json.len() + 1); + dst.put_slice(&json); + dst.put_u8(b'\n'); + Ok(()) + } + } +} + +pub mod stream_utils { + use super::codec::MessageCodec; + use super::{ChatError, ChatResult, Message}; + use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader}; + use tokio_util::codec::{FramedRead, FramedWrite}; + + pub async fn send_message(writer: &mut W, message: &Message) -> ChatResult<()> + where + W: AsyncWrite + Unpin, + { + message.validate()?; + let json = serde_json::to_string(message) + .map_err(|e| ChatError::SerializationError(e.to_string()))?; + let data = format!("{}\n", json); + writer + .write_all(data.as_bytes()) + .await + .map_err(|e| ChatError::NetworkError(e.to_string()))?; + writer + .flush() + .await + .map_err(|e| ChatError::NetworkError(e.to_string()))?; + Ok(()) + } + + pub async fn receive_message(reader: &mut BufReader) -> ChatResult + where + R: AsyncRead + Unpin, + { + let mut line = String::new(); + reader + .read_line(&mut line) + .await + .map_err(|e| ChatError::NetworkError(e.to_string()))?; + + if line.is_empty() { + return Err(ChatError::NetworkError("Connection closed".to_string())); + } + + let line = line.trim(); + let message: Message = + serde_json::from_str(line).map_err(|e| ChatError::SerializationError(e.to_string()))?; + message.validate()?; + Ok(message) + } + + pub fn create_framed_read(reader: R) -> FramedRead + where + R: AsyncRead, + { + FramedRead::new(reader, MessageCodec) + } + + pub fn create_framed_write(writer: W) -> FramedWrite + where + W: AsyncWrite, + { + FramedWrite::new(writer, MessageCodec) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_message_serialization() { + let message = Message::ChatMessage { + username: "alice".to_string(), + content: "Hello, world!".to_string(), + }; + + let json = serde_json::to_string(&message).unwrap(); + let deserialized: Message = serde_json::from_str(&json).unwrap(); + + assert_eq!(message, deserialized); + } + + #[test] + fn test_all_message_variants_serialization() { + let messages = vec![ + Message::Join { + username: "alice".to_string(), + }, + Message::Leave { + username: "bob".to_string(), + }, + Message::ChatMessage { + username: "charlie".to_string(), + content: "Hello!".to_string(), + }, + Message::UserJoined { + username: "dave".to_string(), + }, + Message::UserLeft { + username: "eve".to_string(), + }, + Message::Error { + message: "Something went wrong".to_string(), + }, + ]; + + for message in messages { + let json = serde_json::to_string(&message).unwrap(); + let deserialized: Message = serde_json::from_str(&json).unwrap(); + assert_eq!(message, deserialized); + } + } + + #[test] + fn test_username_validation_valid() { + assert!(validate_username("alice").is_ok()); + assert!(validate_username("bob123").is_ok()); + assert!(validate_username("charlie_doe").is_ok()); + assert!(validate_username("dave-smith").is_ok()); + assert!(validate_username("a1").is_ok()); + assert!(validate_username("abcdefghij1234567890").is_ok()); // 20 chars + } + + #[test] + fn test_username_validation_invalid() { + assert!(validate_username("").is_err()); + assert!(validate_username(" ").is_err()); + assert!(validate_username("a").is_err()); + assert!(validate_username("abcdefghij12345678901").is_err()); // 21 chars + assert!(validate_username("alice@domain").is_err()); + assert!(validate_username("bob!").is_err()); + assert!(validate_username("charlie#").is_err()); + assert!(validate_username("dave.smith").is_err()); + assert!(validate_username("eve space").is_err()); + } + + #[test] + fn test_message_validation_valid() { + let valid_messages = vec![ + Message::Join { + username: "alice".to_string(), + }, + Message::Leave { + username: "bob".to_string(), + }, + Message::ChatMessage { + username: "charlie".to_string(), + content: "Hello, world!".to_string(), + }, + Message::UserJoined { + username: "dave".to_string(), + }, + Message::UserLeft { + username: "eve".to_string(), + }, + Message::Error { + message: "Any error message".to_string(), + }, + ]; + + for message in valid_messages { + assert!( + message.validate().is_ok(), + "Message should be valid: {:?}", + message + ); + } + } + + #[test] + fn test_message_validation_invalid_username() { + let invalid_messages = vec![ + Message::Join { + username: "a".to_string(), + }, + Message::Leave { + username: "".to_string(), + }, + Message::ChatMessage { + username: "alice@domain".to_string(), + content: "Hello!".to_string(), + }, + Message::UserJoined { + username: "abcdefghij12345678901".to_string(), + }, + Message::UserLeft { + username: "bob!".to_string(), + }, + ]; + + for message in invalid_messages { + assert!( + message.validate().is_err(), + "Message should be invalid: {:?}", + message + ); + } + } + + #[test] + fn test_message_validation_invalid_content() { + let message = Message::ChatMessage { + username: "alice".to_string(), + content: "".to_string(), + }; + assert!(message.validate().is_err()); + + let message = Message::ChatMessage { + username: "alice".to_string(), + content: " ".to_string(), + }; + assert!(message.validate().is_err()); + + let long_content = "a".repeat(1001); + let message = Message::ChatMessage { + username: "alice".to_string(), + content: long_content, + }; + assert!(message.validate().is_err()); + } + + #[tokio::test] + async fn test_codec_encode_decode() { + use crate::codec::MessageCodec; + use bytes::BytesMut; + use tokio_util::codec::{Decoder, Encoder}; + + let mut codec = MessageCodec; + let message = Message::ChatMessage { + username: "alice".to_string(), + content: "Hello, world!".to_string(), + }; + + let mut buffer = BytesMut::new(); + codec.encode(message.clone(), &mut buffer).unwrap(); + + let decoded = codec.decode(&mut buffer).unwrap().unwrap(); + assert_eq!(message, decoded); + } + + #[tokio::test] + async fn test_codec_invalid_message() { + use crate::codec::MessageCodec; + use bytes::BytesMut; + use tokio_util::codec::Encoder; + + let mut codec = MessageCodec; + let invalid_message = Message::Join { + username: "a".to_string(), + }; // Too short + + let mut buffer = BytesMut::new(); + let result = codec.encode(invalid_message, &mut buffer); + assert!(result.is_err()); + } + + #[test] + fn test_error_display() { + let error = ChatError::InvalidUsername("Test error".to_string()); + assert_eq!(format!("{}", error), "Invalid username: Test error"); + + let error = ChatError::SerializationError("JSON error".to_string()); + assert_eq!(format!("{}", error), "Serialization error: JSON error"); + + let error = ChatError::NetworkError("Connection failed".to_string()); + assert_eq!(format!("{}", error), "Network error: Connection failed"); + } + + #[tokio::test] + async fn test_stream_utils_send_receive() { + use crate::stream_utils::{receive_message, send_message}; + use tokio::io::BufReader; + + let message = Message::ChatMessage { + username: "alice".to_string(), + content: "Hello!".to_string(), + }; + + // Create an in-memory buffer to simulate a stream + let mut buffer = Vec::new(); + + // Send message + send_message(&mut buffer, &message).await.unwrap(); + + // Create a reader from the buffer + let cursor = std::io::Cursor::new(buffer); + let mut reader = BufReader::new(cursor); + + // Receive message + let received = receive_message(&mut reader).await.unwrap(); + assert_eq!(message, received); + } +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..e7a11a9 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,3 @@ +fn main() { + println!("Hello, world!"); +} diff --git a/tests/README.md b/tests/README.md new file mode 100644 index 0000000..9a9124e --- /dev/null +++ b/tests/README.md @@ -0,0 +1,101 @@ +# Integration Tests + +This directory contains comprehensive integration tests for the Simple Chat application. + +## Test Structure + +### Server Integration Tests (`server_integration.rs`) +- **Server startup and basic connection**: Tests server startup and client connections +- **Multiple client connections**: Verifies concurrent client handling +- **Message broadcasting**: Tests message distribution between clients +- **Username uniqueness**: Ensures usernames cannot be duplicated +- **Client disconnect cleanup**: Verifies proper cleanup when clients disconnect +- **Server max users limit**: Tests user capacity limits +- **Invalid message handling**: Tests error handling for malformed messages + +### Client-Server Integration Tests (`client_server_integration.rs`) +- **Full communication flow**: End-to-end message exchange testing +- **Join, send, leave operations**: Tests complete user lifecycle +- **Error handling scenarios**: Tests various error conditions +- **Concurrent client operations**: Tests multiple clients operating simultaneously +- **Connection failure scenarios**: Tests network error handling + +### End-to-End Scenario Tests (`end_to_end_scenarios.rs`) +- **Multiple clients chatting**: Realistic chat scenario with many participants +- **Client disconnect during chat**: Tests mid-conversation disconnections +- **Server shutdown with active clients**: Tests graceful/forced server shutdown +- **High load stress testing**: Performance testing under load + +## Running Tests + +### Run All Tests +```bash +cargo test +``` + +### Run Specific Test Suites +```bash +# Server integration tests only +cargo test --test server_integration + +# Client-server integration tests only +cargo test --test client_server_integration + +# End-to-end scenario tests only +cargo test --test end_to_end_scenarios +``` + +### Run Individual Tests +```bash +# Run a specific test +cargo test test_server_startup_and_basic_connection + +# Run tests matching a pattern +cargo test message_broadcasting +``` + +### Run Tests with Output +```bash +# Show test output (including server logs) +cargo test -- --nocapture + +# Run with debug logging +RUST_LOG=debug cargo test +``` + +## Test Features + +### Automatic Server Management +- Tests automatically start server processes on available ports +- Proper cleanup of server processes after tests complete +- No manual setup required + +### Realistic Scenarios +- Tests use actual TCP connections +- Message serialization/deserialization is fully tested +- Concurrent operations are properly tested + +### Comprehensive Coverage +- **Unit tests**: Message protocol and validation (in `src/lib.rs`) +- **Integration tests**: Server-client interactions +- **End-to-end tests**: Complete application workflows +- **Error scenarios**: Network failures, invalid inputs, edge cases + +### Test Reliability +- Automatic port allocation to avoid conflicts +- Proper message sequencing and timing +- Robust error handling for test flakiness + +## Test Requirements + +- All tests are self-contained and require no external dependencies +- Tests can run in parallel safely +- No manual server startup required +- Compatible with standard `cargo test` workflow + +## Performance Notes + +- Integration tests spawn actual server processes, so they're slower than unit tests +- Some tests include intentional delays for message propagation +- High-load tests may take longer on slower systems +- Tests are designed to be reliable across different system loads \ No newline at end of file diff --git a/tests/client_server_integration.rs b/tests/client_server_integration.rs new file mode 100644 index 0000000..ffcefe8 --- /dev/null +++ b/tests/client_server_integration.rs @@ -0,0 +1,483 @@ +use anyhow::Result; +use futures::{SinkExt, StreamExt}; +use simple_chat::{codec::MessageCodec, Message}; +use std::process::Stdio; +use std::time::Duration; +use tokio::net::{TcpListener, TcpStream}; +use tokio::process::Command; +use tokio::sync::mpsc; +use tokio::time::{sleep, timeout}; +use tokio_util::codec::{FramedRead, FramedWrite}; + +// Test helper to find an available port +async fn find_available_port() -> u16 { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + addr.port() +} + +// Test helper to start server process +async fn start_server_process(port: u16) -> Result { + let child = Command::new("cargo") + .args([ + "run", + "--bin", + "server", + "--", + "--host", + "127.0.0.1", + "--port", + &port.to_string(), + "--max-users", + "20", + ]) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .kill_on_drop(true) + .spawn()?; + + // Wait a bit for server to start + sleep(Duration::from_millis(1500)).await; + + Ok(child) +} + +// Test helper for client connection with full protocol +async fn create_authenticated_client( + host: &str, + port: u16, + username: &str, +) -> Result<( + FramedRead, + FramedWrite, +)> { + let stream = timeout( + Duration::from_secs(5), + TcpStream::connect(format!("{}:{}", host, port)), + ) + .await??; + + let (read_half, write_half) = stream.into_split(); + let mut framed_read = FramedRead::new(read_half, MessageCodec); + let mut framed_write = FramedWrite::new(write_half, MessageCodec); + + // Send join message + let join_msg = Message::Join { + username: username.to_string(), + }; + framed_write.send(join_msg).await?; + + // Wait for join confirmation + match timeout(Duration::from_secs(3), framed_read.next()).await { + Ok(Some(Ok(Message::UserJoined { + username: joined_user, + }))) => { + if joined_user == username { + Ok((framed_read, framed_write)) + } else { + Err(anyhow::anyhow!("Username mismatch in join confirmation")) + } + } + Ok(Some(Ok(Message::Error { message }))) => { + Err(anyhow::anyhow!("Join failed: {}", message)) + } + other => Err(anyhow::anyhow!("Unexpected join response: {:?}", other)), + } +} + +// Test helper to send a chat message +async fn send_chat_message( + writer: &mut FramedWrite, + username: &str, + content: &str, +) -> Result<()> { + let msg = Message::ChatMessage { + username: username.to_string(), + content: content.to_string(), + }; + writer.send(msg).await?; + Ok(()) +} + +// Test helper to expect a specific message +async fn expect_message( + reader: &mut FramedRead, + expected: Message, + timeout_secs: u64, +) -> Result<()> { + match timeout(Duration::from_secs(timeout_secs), reader.next()).await { + Ok(Some(Ok(actual))) => { + if actual == expected { + Ok(()) + } else { + Err(anyhow::anyhow!("Expected {:?}, got {:?}", expected, actual)) + } + } + Ok(Some(Err(e))) => Err(anyhow::anyhow!("Read error: {}", e)), + Ok(None) => Err(anyhow::anyhow!("Connection closed unexpectedly")), + Err(_) => Err(anyhow::anyhow!("Timeout waiting for message")), + } +} + +#[tokio::test] +async fn test_full_client_server_communication_flow() { + let port = find_available_port().await; + let mut server = start_server_process(port) + .await + .expect("Failed to start server"); + + // Create authenticated client + let (mut reader, mut writer) = create_authenticated_client("127.0.0.1", port, "testuser") + .await + .expect("Failed to create authenticated client"); + + // First, we might receive a UserJoined message (the server broadcasts join confirmations) + // Let's clear any pending messages first + tokio::time::sleep(Duration::from_millis(100)).await; + while let Ok(Some(_)) = timeout(Duration::from_millis(50), reader.next()).await {} + + // Send a chat message + send_chat_message(&mut writer, "testuser", "Hello world!") + .await + .expect("Failed to send chat message"); + + // Should receive our own message back (broadcast) + let expected = Message::ChatMessage { + username: "testuser".to_string(), + content: "Hello world!".to_string(), + }; + expect_message(&mut reader, expected, 3) + .await + .expect("Should receive chat message broadcast"); + + // Send leave message + let leave_msg = Message::Leave { + username: "testuser".to_string(), + }; + writer + .send(leave_msg) + .await + .expect("Failed to send leave message"); + + // Cleanup + let _ = server.kill().await; +} + +#[tokio::test] +async fn test_join_send_leave_operations() { + let port = find_available_port().await; + let mut server = start_server_process(port) + .await + .expect("Failed to start server"); + + // Test 1: Join operation + let (mut reader1, mut writer1) = create_authenticated_client("127.0.0.1", port, "alice") + .await + .expect("Alice should be able to join"); + + // Clear any remaining messages from alice's join + tokio::time::sleep(Duration::from_millis(100)).await; + while let Ok(Some(_)) = timeout(Duration::from_millis(50), reader1.next()).await {} + + // Test 2: Second user join - first user should see notification + let (mut reader2, mut writer2) = create_authenticated_client("127.0.0.1", port, "bob") + .await + .expect("Bob should be able to join"); + + // Alice should receive bob's join notification + let expected = Message::UserJoined { + username: "bob".to_string(), + }; + expect_message(&mut reader1, expected, 3) + .await + .expect("Alice should receive bob's join notification"); + + // Clear any remaining messages from bob's join + tokio::time::sleep(Duration::from_millis(100)).await; + while let Ok(Some(_)) = timeout(Duration::from_millis(50), reader2.next()).await {} + + // Test 3: Send operations + send_chat_message(&mut writer1, "alice", "Hi Bob!") + .await + .expect("Alice should be able to send message"); + + // Bob should receive Alice's message + let expected = Message::ChatMessage { + username: "alice".to_string(), + content: "Hi Bob!".to_string(), + }; + expect_message(&mut reader2, expected, 3) + .await + .expect("Bob should receive Alice's message"); + + // Clear Alice's echo of her own message + tokio::time::sleep(Duration::from_millis(100)).await; + while let Ok(Some(_)) = timeout(Duration::from_millis(50), reader1.next()).await {} + + // Bob replies + send_chat_message(&mut writer2, "bob", "Hi Alice!") + .await + .expect("Bob should be able to send message"); + + // Alice should receive Bob's message + let expected = Message::ChatMessage { + username: "bob".to_string(), + content: "Hi Alice!".to_string(), + }; + expect_message(&mut reader1, expected, 3) + .await + .expect("Alice should receive Bob's message"); + + // Clear Bob's echo of his own message + tokio::time::sleep(Duration::from_millis(100)).await; + while let Ok(Some(_)) = timeout(Duration::from_millis(50), reader2.next()).await {} + + // Test 4: Leave operation + let leave_msg = Message::Leave { + username: "alice".to_string(), + }; + writer1 + .send(leave_msg) + .await + .expect("Alice should be able to leave"); + + // Bob should receive leave notification + let expected = Message::UserLeft { + username: "alice".to_string(), + }; + expect_message(&mut reader2, expected, 3) + .await + .expect("Bob should receive Alice's leave notification"); + + // Cleanup + let _ = server.kill().await; +} + +#[tokio::test] +async fn test_error_handling_scenarios() { + let port = find_available_port().await; + let mut server = start_server_process(port) + .await + .expect("Failed to start server"); + + // Test 1: Invalid username + let stream = TcpStream::connect(format!("127.0.0.1:{}", port)) + .await + .expect("Should be able to connect"); + + let (read_half, write_half) = stream.into_split(); + let mut framed_read = FramedRead::new(read_half, MessageCodec); + let mut framed_write = FramedWrite::new(write_half, MessageCodec); + + // Try to join with invalid username + let join_msg = Message::Join { + username: "a".to_string(), + }; // Too short + framed_write + .send(join_msg) + .await + .expect("Should be able to send join message"); + + // Should receive error + match timeout(Duration::from_secs(2), framed_read.next()).await { + Ok(Some(Ok(Message::Error { message }))) => { + assert!( + message.contains("at least 2 characters"), + "Error should mention username length requirement" + ); + } + other => panic!( + "Expected error message for invalid username, got: {:?}", + other + ), + } + + // Test 2: Duplicate username + let (_reader1, _writer1) = create_authenticated_client("127.0.0.1", port, "testuser") + .await + .expect("First user should join successfully"); + + // Try to join with same username + let stream2 = TcpStream::connect(format!("127.0.0.1:{}", port)) + .await + .expect("Should be able to connect"); + + let (read_half2, write_half2) = stream2.into_split(); + let mut framed_read2 = FramedRead::new(read_half2, MessageCodec); + let mut framed_write2 = FramedWrite::new(write_half2, MessageCodec); + + let join_msg2 = Message::Join { + username: "testuser".to_string(), + }; + framed_write2 + .send(join_msg2) + .await + .expect("Should be able to send join message"); + + // Should receive error + match timeout(Duration::from_secs(2), framed_read2.next()).await { + Ok(Some(Ok(Message::Error { message }))) => { + assert!( + message.contains("already taken"), + "Error should mention username is taken" + ); + } + other => panic!( + "Expected error message for duplicate username, got: {:?}", + other + ), + } + + // Cleanup + let _ = server.kill().await; +} + +#[tokio::test] +async fn test_concurrent_client_operations() { + let port = find_available_port().await; + let mut server = start_server_process(port) + .await + .expect("Failed to start server"); + + // Create multiple clients concurrently + let client_count = 5; + let mut join_handles = Vec::new(); + let (tx, mut rx) = mpsc::channel(client_count); + + for i in 0..client_count { + let tx = tx.clone(); + let username = format!("user{}", i); + + let handle = tokio::spawn(async move { + let result = create_authenticated_client("127.0.0.1", port, &username).await; + let _ = tx.send((i, username, result)).await; + }); + + join_handles.push(handle); + } + + // Collect results + let mut successful_clients = Vec::new(); + for _ in 0..client_count { + if let Some((i, username, result)) = rx.recv().await { + match result { + Ok((reader, writer)) => { + successful_clients.push((i, username, reader, writer)); + } + Err(e) => panic!("Client {} failed to connect: {}", i, e), + } + } + } + + assert_eq!( + successful_clients.len(), + client_count, + "All clients should connect successfully" + ); + + // Test concurrent message sending + let message_handles: Vec<_> = successful_clients + .into_iter() + .map(|(i, username, mut reader, mut writer)| { + tokio::spawn(async move { + // Send message + let content = format!("Message from user {}", i); + send_chat_message(&mut writer, &username, &content) + .await + .expect("Should be able to send message"); + + // Count received messages (should receive from all other users) + let mut message_count = 0; + let timeout_duration = Duration::from_secs(3); + + while message_count < client_count - 1 { + // -1 because we don't receive our own + match timeout(timeout_duration, reader.next()).await { + Ok(Some(Ok(Message::ChatMessage { .. }))) => { + message_count += 1; + } + Ok(Some(Ok(_))) => { + // Other message types, continue + continue; + } + _ => break, + } + } + + message_count + }) + }) + .collect(); + + // Wait for all message exchanges + for handle in message_handles { + let received_count = handle.await.expect("Message exchange should complete"); + // Each client should receive messages from other clients + // (allowing some margin for test reliability in concurrent scenarios) + assert!( + received_count >= client_count - 2, + "Should receive most messages from other clients" + ); + } + + // Wait for handles to complete + for handle in join_handles { + let _ = handle.await; + } + + // Cleanup + let _ = server.kill().await; +} + +#[tokio::test] +async fn test_connection_failure_scenarios() { + // Test 1: Connection to non-existent server + let non_existent_port = find_available_port().await; + + let result = timeout( + Duration::from_secs(2), + TcpStream::connect(format!("127.0.0.1:{}", non_existent_port)), + ) + .await; + + assert!(result.is_ok(), "Timeout should complete"); + assert!( + result.unwrap().is_err(), + "Connection should fail to non-existent server" + ); + + // Test 2: Server shutdown during client operation + let port = find_available_port().await; + let mut server = start_server_process(port) + .await + .expect("Failed to start server"); + + // Connect client + let (mut reader, mut writer) = create_authenticated_client("127.0.0.1", port, "testuser") + .await + .expect("Client should connect initially"); + + // Kill server + server.kill().await.expect("Should be able to kill server"); + sleep(Duration::from_millis(500)).await; // Give time for shutdown + + // Try to send message - should fail + let msg = Message::ChatMessage { + username: "testuser".to_string(), + content: "This should fail".to_string(), + }; + + let send_result = writer.send(msg).await; + assert!( + send_result.is_err(), + "Send should fail after server shutdown" + ); + + // Try to read - should fail or return None + match timeout(Duration::from_secs(1), reader.next()).await { + Ok(Some(Ok(_))) => panic!("Should not receive message after server shutdown"), + Ok(Some(Err(_))) => {} // Expected - read error + Ok(None) => {} // Expected - connection closed + Err(_) => {} // Expected - timeout + } +} diff --git a/tests/end_to_end_scenarios.rs b/tests/end_to_end_scenarios.rs new file mode 100644 index 0000000..c27cdfa --- /dev/null +++ b/tests/end_to_end_scenarios.rs @@ -0,0 +1,605 @@ +use anyhow::Result; +use futures::{SinkExt, StreamExt}; +use simple_chat::{codec::MessageCodec, Message}; +use std::process::Stdio; +use std::sync::Arc; +use std::time::Duration; +use tokio::net::{TcpListener, TcpStream}; +use tokio::process::Command; +use tokio::sync::{mpsc, Barrier}; +use tokio::time::{sleep, timeout}; +use tokio_util::codec::{FramedRead, FramedWrite}; + +// Test helper to find an available port +async fn find_available_port() -> u16 { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + addr.port() +} + +// Test helper to start server process +async fn start_server_process(port: u16) -> Result { + let child = Command::new("cargo") + .args([ + "run", + "--bin", + "server", + "--", + "--host", + "127.0.0.1", + "--port", + &port.to_string(), + "--max-users", + "50", + ]) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .kill_on_drop(true) + .spawn()?; + + // Wait for server to start + sleep(Duration::from_millis(2000)).await; + + Ok(child) +} + +// Test helper to create and authenticate a client +async fn create_authenticated_client( + host: &str, + port: u16, + username: &str, +) -> Result<( + FramedRead, + FramedWrite, +)> { + let stream = timeout( + Duration::from_secs(5), + TcpStream::connect(format!("{}:{}", host, port)), + ) + .await??; + + let (read_half, write_half) = stream.into_split(); + let mut framed_read = FramedRead::new(read_half, MessageCodec); + let mut framed_write = FramedWrite::new(write_half, MessageCodec); + + // Send join message + let join_msg = Message::Join { + username: username.to_string(), + }; + framed_write.send(join_msg).await?; + + // Wait for join confirmation + match timeout(Duration::from_secs(3), framed_read.next()).await { + Ok(Some(Ok(Message::UserJoined { + username: joined_user, + }))) => { + if joined_user == username { + Ok((framed_read, framed_write)) + } else { + Err(anyhow::anyhow!("Username mismatch in join confirmation")) + } + } + Ok(Some(Ok(Message::Error { message }))) => { + Err(anyhow::anyhow!("Join failed: {}", message)) + } + other => Err(anyhow::anyhow!("Unexpected join response: {:?}", other)), + } +} + +// Simulate a chat participant +async fn simulate_chat_participant( + port: u16, + username: String, + barrier: Arc, + message_count: usize, + chat_duration_secs: u64, +) -> Result { + // Wait for all participants to be ready + barrier.wait().await; + + let (mut reader, mut writer) = + create_authenticated_client("127.0.0.1", port, &username).await?; + + // Start background task to count received messages + let (tx, mut rx) = mpsc::channel(100); + let reader_handle = tokio::spawn(async move { + let mut received_messages = 0; + let mut user_events = 0; + + loop { + match timeout(Duration::from_secs(1), reader.next()).await { + Ok(Some(Ok(Message::ChatMessage { .. }))) => { + received_messages += 1; + } + Ok(Some(Ok(Message::UserJoined { .. }))) => { + user_events += 1; + } + Ok(Some(Ok(Message::UserLeft { .. }))) => { + user_events += 1; + } + Ok(Some(Ok(Message::Error { .. }))) => { + // Continue on errors + } + Ok(Some(Ok(Message::Join { .. }))) => { + // Shouldn't receive join messages normally, but handle gracefully + continue; + } + Ok(Some(Ok(Message::Leave { .. }))) => { + // Shouldn't receive leave messages normally, but handle gracefully + continue; + } + Ok(Some(Err(_))) => break, // Connection error + Ok(None) => break, // Connection closed + Err(_) => continue, // Timeout, continue reading + } + + // Check if we should stop + if let Ok(()) = rx.try_recv() { + break; + } + } + + (received_messages, user_events) + }); + + // Send messages periodically + let message_interval = + Duration::from_millis((chat_duration_secs * 1000) / message_count as u64); + + for i in 0..message_count { + let content = format!("Message {} from {}", i + 1, username); + + let msg = Message::ChatMessage { + username: username.clone(), + content, + }; + + if (writer.send(msg).await).is_err() { + break; // Connection failed + } + + sleep(message_interval).await; + } + + // Send leave message + let leave_msg = Message::Leave { + username: username.clone(), + }; + let _ = writer.send(leave_msg).await; + + // Signal reader to stop and get results + let _ = tx.send(()).await; + let (received_messages, _user_events) = reader_handle.await.unwrap_or((0, 0)); + + Ok(received_messages) +} + +#[tokio::test] +async fn test_multiple_clients_chatting_scenario() { + let port = find_available_port().await; + let mut server = start_server_process(port) + .await + .expect("Failed to start server"); + + let client_count = 8; + let messages_per_client = 5; + let chat_duration_secs = 10; + + // Create barrier for synchronized start + let barrier = Arc::new(Barrier::new(client_count)); + + // Spawn chat participants + let mut handles = Vec::new(); + + for i in 0..client_count { + let barrier = Arc::clone(&barrier); + let username = format!("user{:02}", i); + + let handle = tokio::spawn(async move { + simulate_chat_participant( + port, + username, + barrier, + messages_per_client, + chat_duration_secs, + ) + .await + }); + + handles.push(handle); + } + + // Wait for all participants to complete + let mut total_messages_received = 0; + let mut successful_participants = 0; + + for handle in handles { + match handle.await { + Ok(Ok(messages_received)) => { + total_messages_received += messages_received; + successful_participants += 1; + } + Ok(Err(e)) => eprintln!("Participant failed: {}", e), + Err(e) => eprintln!("Task failed: {}", e), + } + } + + // Verify results + assert!( + successful_participants >= client_count - 1, + "Most participants should complete successfully" + ); + + // Each client sends messages_per_client messages, and should receive + // messages from other clients. Account for some message loss in concurrent scenarios. + let expected_total_messages = client_count * messages_per_client * (client_count - 1); + let message_delivery_rate = total_messages_received as f64 / expected_total_messages as f64; + + assert!( + message_delivery_rate > 0.7, + "Message delivery rate should be > 70%, got {:.2}%", + message_delivery_rate * 100.0 + ); + + println!("โœ… Multi-client chat test completed:"); + println!( + " Participants: {}/{}", + successful_participants, client_count + ); + println!( + " Messages received: {}/{} ({:.1}%)", + total_messages_received, + expected_total_messages, + message_delivery_rate * 100.0 + ); + + // Cleanup + let _ = server.kill().await; +} + +#[tokio::test] +async fn test_client_disconnect_during_chat() { + let port = find_available_port().await; + let mut server = start_server_process(port) + .await + .expect("Failed to start server"); + + // Create multiple clients + let (mut reader1, mut writer1) = create_authenticated_client("127.0.0.1", port, "alice") + .await + .expect("Alice should connect"); + + let (mut reader2, writer2) = create_authenticated_client("127.0.0.1", port, "bob") + .await + .expect("Bob should connect"); + + let (mut reader3, mut writer3) = create_authenticated_client("127.0.0.1", port, "charlie") + .await + .expect("Charlie should connect"); + + // Clear initial join notifications + sleep(Duration::from_millis(500)).await; + for reader in [&mut reader1, &mut reader2, &mut reader3] { + while let Ok(Some(_)) = timeout(Duration::from_millis(50), reader.next()).await {} + } + + // Start chat between all participants + let msg1 = Message::ChatMessage { + username: "alice".to_string(), + content: "Hello everyone!".to_string(), + }; + writer1 + .send(msg1.clone()) + .await + .expect("Alice should send message"); + + // Bob and Charlie should receive the message + let timeout_duration = Duration::from_secs(2); + + match timeout(timeout_duration, reader2.next()).await { + Ok(Some(Ok(received))) => assert_eq!(received, msg1), + other => panic!("Bob should receive Alice's message, got: {:?}", other), + } + + match timeout(timeout_duration, reader3.next()).await { + Ok(Some(Ok(received))) => assert_eq!(received, msg1), + other => panic!("Charlie should receive Alice's message, got: {:?}", other), + } + + // Bob suddenly disconnects (simulated by dropping connection) + drop(writer2); + drop(reader2); + + // Wait for disconnect to be processed + sleep(Duration::from_millis(500)).await; + + // Alice and Charlie should receive disconnect notification + let expected_leave = Message::UserLeft { + username: "bob".to_string(), + }; + + let mut found_leave_alice = false; + let mut found_leave_charlie = false; + + // Check multiple messages for leave notification + for _ in 0..5 { + if let Ok(Some(Ok(msg))) = timeout(Duration::from_millis(300), reader1.next()).await { + if msg == expected_leave { + found_leave_alice = true; + break; + } + } + } + + for _ in 0..5 { + if let Ok(Some(Ok(msg))) = timeout(Duration::from_millis(300), reader3.next()).await { + if msg == expected_leave { + found_leave_charlie = true; + break; + } + } + } + + assert!( + found_leave_alice, + "Alice should receive Bob's leave notification" + ); + assert!( + found_leave_charlie, + "Charlie should receive Bob's leave notification" + ); + + // Chat should continue between remaining participants + let msg3 = Message::ChatMessage { + username: "charlie".to_string(), + content: "Bob left, but we can still chat".to_string(), + }; + writer3 + .send(msg3.clone()) + .await + .expect("Charlie should send message"); + + match timeout(Duration::from_secs(2), reader1.next()).await { + Ok(Some(Ok(received))) => assert_eq!(received, msg3), + other => panic!("Alice should receive Charlie's message, got: {:?}", other), + } + + // New user should be able to join with Bob's old username + let (_reader4, _writer4) = create_authenticated_client("127.0.0.1", port, "bob") + .await + .expect("New Bob should be able to join"); + + // Cleanup + let _ = server.kill().await; +} + +#[tokio::test] +async fn test_server_shutdown_with_active_clients() { + let port = find_available_port().await; + let mut server = start_server_process(port) + .await + .expect("Failed to start server"); + + // Create multiple active clients + let mut clients = Vec::new(); + for i in 0..5 { + let username = format!("user{}", i); + match create_authenticated_client("127.0.0.1", port, &username).await { + Ok((reader, writer)) => clients.push((username, reader, writer)), + Err(e) => panic!("Failed to create client {}: {}", i, e), + } + } + + assert_eq!(clients.len(), 5, "All clients should connect successfully"); + + // Start some chat activity + for (username, _, writer) in &mut clients { + let msg = Message::ChatMessage { + username: username.clone(), + content: "Active before shutdown".to_string(), + }; + let _ = writer.send(msg).await; // Ignore errors + } + + // Give some time for messages to propagate + sleep(Duration::from_millis(200)).await; + + // Shutdown server abruptly + server.kill().await.expect("Should be able to kill server"); + + // Wait for shutdown to propagate + sleep(Duration::from_millis(500)).await; + + // All clients should detect server shutdown + let mut clients_detected_shutdown = 0; + + for (username, mut reader, mut writer) in clients { + // Try to send a message - should fail + let msg = Message::ChatMessage { + username: username.clone(), + content: "This should fail".to_string(), + }; + + let send_failed = writer.send(msg).await.is_err(); + + // Try to read - should fail or timeout + let read_failed = match timeout(Duration::from_millis(500), reader.next()).await { + Ok(Some(Ok(_))) => false, // Unexpectedly got a message + Ok(Some(Err(_))) => true, // Read error (expected) + Ok(None) => true, // Connection closed (expected) + Err(_) => true, // Timeout (expected) + }; + + if send_failed || read_failed { + clients_detected_shutdown += 1; + } + } + + assert!( + clients_detected_shutdown >= 4, + "Most clients should detect server shutdown (detected: {})", + clients_detected_shutdown + ); +} + +#[tokio::test] +async fn test_high_load_stress_scenario() { + let port = find_available_port().await; + let mut server = start_server_process(port) + .await + .expect("Failed to start server"); + + let client_count = 20; + let messages_per_client = 10; + + // Create all clients first + let mut clients = Vec::new(); + + for i in 0..client_count { + let username = format!("stress_user_{:03}", i); + match timeout( + Duration::from_secs(10), + create_authenticated_client("127.0.0.1", port, &username), + ) + .await + { + Ok(Ok((reader, writer))) => clients.push((username, reader, writer)), + Ok(Err(e)) => { + eprintln!("Failed to create client {}: {}", i, e); + break; // Stop if we can't create more clients + } + Err(_) => { + eprintln!("Timeout creating client {}", i); + break; + } + } + } + + let actual_client_count = clients.len(); + assert!( + actual_client_count >= client_count / 2, + "Should be able to create at least half the intended clients" + ); + + // Split into senders and receivers for concurrent operations + let (senders, receivers): (Vec<_>, Vec<_>) = clients + .into_iter() + .enumerate() + .map(|(i, (username, reader, writer))| { + ((i, username.clone(), writer), (i, username, reader)) + }) + .unzip(); + + // Start message sending tasks + let send_handles: Vec<_> = senders + .into_iter() + .map(|(i, username, mut writer)| { + tokio::spawn(async move { + let mut sent_count = 0; + for msg_num in 0..messages_per_client { + let content = format!("Stress message {} from {}", msg_num, username); + let msg = Message::ChatMessage { + username: username.clone(), + content, + }; + + if writer.send(msg).await.is_ok() { + sent_count += 1; + } else { + break; // Stop on send error + } + + // Small delay to avoid overwhelming + sleep(Duration::from_millis(10)).await; + } + + // Send leave message + let leave_msg = Message::Leave { username }; + let _ = writer.send(leave_msg).await; + + (i, sent_count) + }) + }) + .collect(); + + // Start message receiving tasks + let receive_handles: Vec<_> = receivers + .into_iter() + .map(|(i, username, mut reader)| { + tokio::spawn(async move { + let mut received_count = 0; + let start_time = std::time::Instant::now(); + let max_duration = Duration::from_secs(30); // Max test duration + + while start_time.elapsed() < max_duration { + match timeout(Duration::from_millis(100), reader.next()).await { + Ok(Some(Ok(Message::ChatMessage { .. }))) => { + received_count += 1; + } + Ok(Some(Ok(Message::UserLeft { .. }))) => { + // User left, might be end of test + continue; + } + Ok(Some(Ok(_))) => { + // Other message types + continue; + } + Ok(Some(Err(_))) => break, // Connection error + Ok(None) => break, // Connection closed + Err(_) => continue, // Timeout, continue + } + } + + (i, username, received_count) + }) + }) + .collect(); + + // Wait for all senders to complete + let mut total_sent = 0; + for handle in send_handles { + if let Ok((_, sent_count)) = handle.await { + total_sent += sent_count; + } + } + + // Wait for receivers (with timeout) + let mut total_received = 0; + for handle in receive_handles { + if let Ok(Ok((_, _, count))) = timeout(Duration::from_secs(5), handle).await { + total_received += count; + } + } + + // Verify stress test results + let expected_total = actual_client_count * messages_per_client; + let send_success_rate = total_sent as f64 / expected_total as f64; + + assert!( + send_success_rate > 0.8, + "Send success rate should be > 80%, got {:.2}%", + send_success_rate * 100.0 + ); + + // In high load scenarios, some message loss is acceptable + assert!(total_received > 0, "Should receive at least some messages"); + + println!("โœ… High load stress test completed:"); + println!( + " Clients created: {}/{}", + actual_client_count, client_count + ); + println!( + " Messages sent: {}/{} ({:.1}%)", + total_sent, + expected_total, + send_success_rate * 100.0 + ); + println!( + " Messages received: {} (delivery varies under load)", + total_received + ); + + // Cleanup + let _ = server.kill().await; +} diff --git a/tests/server_integration.rs b/tests/server_integration.rs new file mode 100644 index 0000000..f3ba91b --- /dev/null +++ b/tests/server_integration.rs @@ -0,0 +1,370 @@ +use anyhow::Result; +use futures::{SinkExt, StreamExt}; +use simple_chat::{codec::MessageCodec, Message}; +use std::time::Duration; +use tokio::net::{TcpListener, TcpStream}; +use tokio::time::{sleep, timeout}; +use tokio_util::codec::{FramedRead, FramedWrite}; + +// Test helper to find an available port +async fn find_available_port() -> u16 { + let listener = TcpListener::bind("127.0.0.1:0").await.unwrap(); + let addr = listener.local_addr().unwrap(); + addr.port() +} + +// Test helper to create a test client connection +async fn create_test_client( + host: &str, + port: u16, +) -> Result<( + FramedRead, + FramedWrite, +)> { + let stream = TcpStream::connect(format!("{}:{}", host, port)).await?; + let (read_half, write_half) = stream.into_split(); + + let framed_read = FramedRead::new(read_half, MessageCodec); + let framed_write = FramedWrite::new(write_half, MessageCodec); + + Ok((framed_read, framed_write)) +} + +// Test helper to join a user and return success/failure +async fn join_user( + writer: &mut FramedWrite, + reader: &mut FramedRead, + username: &str, +) -> Result { + let join_msg = Message::Join { + username: username.to_string(), + }; + writer.send(join_msg).await?; + + match timeout(Duration::from_secs(2), reader.next()).await { + Ok(Some(Ok(Message::UserJoined { + username: joined_user, + }))) => Ok(joined_user == username), + Ok(Some(Ok(Message::Error { .. }))) => Ok(false), + _ => Ok(false), + } +} + +// Test helper to start server in background +async fn start_test_server(port: u16) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + let result = tokio::process::Command::new("cargo") + .args([ + "run", + "--bin", + "server", + "--", + "--port", + &port.to_string(), + "--max-users", + "10", + ]) + .status() + .await; + + if let Err(e) = result { + eprintln!("Server failed to start: {}", e); + } + }) +} + +#[tokio::test] +async fn test_server_startup_and_basic_connection() { + let port = find_available_port().await; + let _server_handle = start_test_server(port).await; + + // Wait for server to start + sleep(Duration::from_millis(1000)).await; + + // Test basic connection + let result = timeout( + Duration::from_secs(5), + TcpStream::connect(format!("127.0.0.1:{}", port)), + ) + .await; + + assert!(result.is_ok(), "Should be able to connect to server"); + let stream = result.unwrap(); + assert!(stream.is_ok(), "Connection should succeed"); +} + +#[tokio::test] +async fn test_multiple_client_connections() { + let port = find_available_port().await; + let _server_handle = start_test_server(port).await; + + // Wait for server to start + sleep(Duration::from_millis(1000)).await; + + // Create multiple clients + let mut clients = Vec::new(); + for i in 0..3 { + match create_test_client("127.0.0.1", port).await { + Ok(client) => clients.push((format!("user{}", i), client)), + Err(e) => panic!("Failed to create client {}: {}", i, e), + } + } + + assert_eq!(clients.len(), 3, "Should have 3 connected clients"); + + // Test that all clients can join successfully + for (username, (mut reader, mut writer)) in clients.into_iter() { + let joined = join_user(&mut writer, &mut reader, &username) + .await + .unwrap_or(false); + assert!(joined, "User {} should be able to join", username); + } +} + +#[tokio::test] +async fn test_message_broadcasting() { + let port = find_available_port().await; + let _server_handle = start_test_server(port).await; + + // Wait for server to start + sleep(Duration::from_millis(1000)).await; + + // Create two clients + let (mut reader1, mut writer1) = create_test_client("127.0.0.1", port) + .await + .expect("Failed to create client 1"); + let (mut reader2, mut writer2) = create_test_client("127.0.0.1", port) + .await + .expect("Failed to create client 2"); + + // Join both users + assert!(join_user(&mut writer1, &mut reader1, "alice") + .await + .unwrap_or(false)); + assert!(join_user(&mut writer2, &mut reader2, "bob") + .await + .unwrap_or(false)); + + // Clear any pending join notifications + sleep(Duration::from_millis(100)).await; + while let Ok(Some(_)) = timeout(Duration::from_millis(50), reader1.next()).await {} + while let Ok(Some(_)) = timeout(Duration::from_millis(50), reader2.next()).await {} + + // Send message from alice + let chat_msg = Message::ChatMessage { + username: "alice".to_string(), + content: "Hello everyone!".to_string(), + }; + writer1 + .send(chat_msg.clone()) + .await + .expect("Failed to send message"); + + // Bob should receive the message + match timeout(Duration::from_secs(2), reader2.next()).await { + Ok(Some(Ok(Message::ChatMessage { username, content }))) => { + assert_eq!(username, "alice"); + assert_eq!(content, "Hello everyone!"); + } + other => panic!("Expected chat message, got: {:?}", other), + } +} + +#[tokio::test] +async fn test_username_uniqueness_enforcement() { + let port = find_available_port().await; + let _server_handle = start_test_server(port).await; + + // Wait for server to start + sleep(Duration::from_millis(1000)).await; + + // Create two clients + let (mut reader1, mut writer1) = create_test_client("127.0.0.1", port) + .await + .expect("Failed to create client 1"); + let (mut reader2, mut writer2) = create_test_client("127.0.0.1", port) + .await + .expect("Failed to create client 2"); + + // First user joins successfully + assert!(join_user(&mut writer1, &mut reader1, "testuser") + .await + .unwrap_or(false)); + + // Second user tries to join with same username - should fail + let join_msg = Message::Join { + username: "testuser".to_string(), + }; + writer2 + .send(join_msg) + .await + .expect("Failed to send join message"); + + match timeout(Duration::from_secs(2), reader2.next()).await { + Ok(Some(Ok(Message::Error { message }))) => { + assert!( + message.contains("already taken"), + "Error should mention username is taken" + ); + } + other => panic!("Expected error message, got: {:?}", other), + } +} + +#[tokio::test] +async fn test_client_disconnect_cleanup() { + let port = find_available_port().await; + let _server_handle = start_test_server(port).await; + + // Wait for server to start + sleep(Duration::from_millis(1000)).await; + + // Create three clients + let (mut reader1, mut writer1) = create_test_client("127.0.0.1", port) + .await + .expect("Failed to create client 1"); + let (mut reader2, mut writer2) = create_test_client("127.0.0.1", port) + .await + .expect("Failed to create client 2"); + let (mut reader3, mut writer3) = create_test_client("127.0.0.1", port) + .await + .expect("Failed to create client 3"); + + // All users join + assert!(join_user(&mut writer1, &mut reader1, "alice") + .await + .unwrap_or(false)); + assert!(join_user(&mut writer2, &mut reader2, "bob") + .await + .unwrap_or(false)); + assert!(join_user(&mut writer3, &mut reader3, "charlie") + .await + .unwrap_or(false)); + + // Clear join notifications + sleep(Duration::from_millis(200)).await; + while let Ok(Some(_)) = timeout(Duration::from_millis(50), reader1.next()).await {} + while let Ok(Some(_)) = timeout(Duration::from_millis(50), reader3.next()).await {} + + // Bob leaves gracefully + let leave_msg = Message::Leave { + username: "bob".to_string(), + }; + writer2 + .send(leave_msg) + .await + .expect("Failed to send leave message"); + + // Drop bob's connection to simulate disconnect + drop(writer2); + drop(reader2); + + // Alice and Charlie should receive leave notification + sleep(Duration::from_millis(100)).await; + + let mut found_leave_notification = false; + for _ in 0..3 { + // Check a few messages + if let Ok(Some(Ok(Message::UserLeft { username }))) = + timeout(Duration::from_millis(500), reader1.next()).await + { + if username == "bob" { + found_leave_notification = true; + break; + } + } + } + + assert!( + found_leave_notification, + "Should receive leave notification for bob" + ); + + // New user should be able to take bob's username + let (mut reader4, mut writer4) = create_test_client("127.0.0.1", port) + .await + .expect("Failed to create client 4"); + assert!(join_user(&mut writer4, &mut reader4, "bob") + .await + .unwrap_or(false)); +} + +#[tokio::test] +async fn test_server_max_users_limit() { + let port = find_available_port().await; + let _server_handle = start_test_server(port).await; + + // Wait for server to start + sleep(Duration::from_millis(1000)).await; + + // Try to connect more users than the limit (server started with --max-users 10) + let mut successful_joins = 0; + let mut clients = Vec::new(); + + for i in 0..12 { + // Try to create 12 users (limit is 10) + if let Ok((mut reader, mut writer)) = create_test_client("127.0.0.1", port).await { + let username = format!("user{}", i); + if join_user(&mut writer, &mut reader, &username) + .await + .unwrap_or(false) + { + successful_joins += 1; + clients.push((reader, writer)); + } else { + // Check if we got a "server is full" error + break; + } + } + } + + // Should be able to join up to the limit + assert!(successful_joins <= 10, "Should not exceed max user limit"); + assert!( + successful_joins >= 8, + "Should be able to join at least 8 users" + ); // Allow some margin for test reliability +} + +#[tokio::test] +async fn test_invalid_message_handling() { + let port = find_available_port().await; + let _server_handle = start_test_server(port).await; + + // Wait for server to start + sleep(Duration::from_millis(1000)).await; + + let (mut reader, mut writer) = create_test_client("127.0.0.1", port) + .await + .expect("Failed to create client"); + + // Join first + assert!(join_user(&mut writer, &mut reader, "testuser") + .await + .unwrap_or(false)); + + // Clear join notification + sleep(Duration::from_millis(100)).await; + while let Ok(Some(_)) = timeout(Duration::from_millis(50), reader.next()).await {} + + // Try to send chat message with wrong username + let invalid_msg = Message::ChatMessage { + username: "wronguser".to_string(), + content: "This should fail".to_string(), + }; + writer + .send(invalid_msg) + .await + .expect("Failed to send message"); + + // Should receive error message + match timeout(Duration::from_secs(2), reader.next()).await { + Ok(Some(Ok(Message::Error { message }))) => { + assert!( + message.contains("mismatch"), + "Error should mention username mismatch" + ); + } + other => panic!("Expected error message, got: {:?}", other), + } +}