From 6b7afe54ed530d3f3d8e0cad1aca71ab34450ec2 Mon Sep 17 00:00:00 2001 From: xy Date: Tue, 17 Sep 2024 00:01:26 +0900 Subject: [PATCH] feat(websocket): add client examples for test --- websocket/Cargo.lock | 36 +++++++- websocket/Cargo.toml | 5 +- websocket/crates/infra/Cargo.toml | 2 + .../infra/examples/authenticated_client.rs | 89 +++++++++++++++++++ .../crates/infra/examples/simple_client.rs | 37 ++++++++ 5 files changed, 166 insertions(+), 3 deletions(-) create mode 100644 websocket/crates/infra/examples/authenticated_client.rs create mode 100644 websocket/crates/infra/examples/simple_client.rs diff --git a/websocket/Cargo.lock b/websocket/Cargo.lock index 3c5854a96..3e8fe0c71 100644 --- a/websocket/Cargo.lock +++ b/websocket/Cargo.lock @@ -304,7 +304,7 @@ dependencies = [ "sha1", "sync_wrapper 1.0.1", "tokio", - "tokio-tungstenite", + "tokio-tungstenite 0.21.0", "tower", "tower-layer", "tower-service", @@ -1359,6 +1359,7 @@ dependencies = [ "dashmap", "dotenv", "flow-websocket-domain", + "futures-util", "google-cloud-storage", "http", "http-body-util", @@ -1375,6 +1376,7 @@ dependencies = [ "services", "thiserror", "tokio", + "tokio-tungstenite 0.24.0", "tower", "tower-http", "tracing", @@ -2696,7 +2698,19 @@ dependencies = [ "futures-util", "log", "tokio", - "tungstenite", + "tungstenite 0.21.0", +] + +[[package]] +name = "tokio-tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edc5f74e248dc973e0dbb7b74c7e0d6fcc301c694ff50049504004ef4d0cdcd9" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.24.0", ] [[package]] @@ -2852,6 +2866,24 @@ dependencies = [ "utf-8", ] +[[package]] +name = "tungstenite" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "18e5b8366ee7a95b16d32197d0b2604b43a0be89dc5fac9f8e96ccafbaedda8a" +dependencies = [ + "byteorder", + "bytes", + "data-encoding", + "http", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "utf-8", +] + [[package]] name = "typenum" version = "1.17.0" diff --git a/websocket/Cargo.toml b/websocket/Cargo.toml index 12baa44b1..75623c872 100644 --- a/websocket/Cargo.toml +++ b/websocket/Cargo.toml @@ -35,7 +35,7 @@ strip = true [workspace.dependencies] flow-websocket-domain = { path = "crates/domain" } -async-trait = "0.1.82" +tokio_tungsteniteasync-trait = "0.1.82" axum = { version = "0.7", features = ["ws"] } axum-extra = { version = "0.9", features = ["typed-header"] } axum-macros = "0.4" @@ -72,3 +72,6 @@ url = "2.5.2" http = "1.1.0" dashmap = "6.1.0" cached = "0.53.1" +tokio-tungstenite = "0.24.0" +futures-util = "0.3" +async-trait = "0.1.82" diff --git a/websocket/crates/infra/Cargo.toml b/websocket/crates/infra/Cargo.toml index 5c259ea67..8bf796f04 100644 --- a/websocket/crates/infra/Cargo.toml +++ b/websocket/crates/infra/Cargo.toml @@ -48,3 +48,5 @@ url.workspace = true http.workspace = true dashmap.workspace = true cached.workspace = true +tokio-tungstenite.workspace = true +futures-util.workspace = true diff --git a/websocket/crates/infra/examples/authenticated_client.rs b/websocket/crates/infra/examples/authenticated_client.rs new file mode 100644 index 000000000..1700cf94d --- /dev/null +++ b/websocket/crates/infra/examples/authenticated_client.rs @@ -0,0 +1,89 @@ +use futures_util::{SinkExt, StreamExt}; +use http::Request; +use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; +use url::Url; + +#[tokio::main] +async fn main() -> Result<(), Box> { + println!("Starting authenticated client..."); + + let jwt_token = "your_jwt_token_here"; + println!("Using JWT token: {}", jwt_token); + + let url = Url::parse("ws://localhost:8000/room1?123")?; + println!("Connecting to URL: {}", url); + + // Extract the host from the URL + let host = url.host_str().ok_or("Invalid URL: missing host")?; + + // Create a request with the Authorization header + let request = Request::builder() + .uri(url.as_str()) + .header("Authorization", format!("Bearer {}", jwt_token)) + .header("Host", host) + .header("Connection", "Upgrade") + .header("Upgrade", "websocket") + .header("Sec-WebSocket-Version", "13") + .header("Sec-WebSocket-Key", generate_key()) + .body(())?; + + println!("Attempting to connect..."); + match connect_async(request).await { + Ok((ws_stream, response)) => { + println!("WebSocket handshake has been successfully completed"); + println!("Response status: {}", response.status()); + println!("Response headers: {:#?}", response.headers()); + + let (mut write, mut read) = ws_stream.split(); + + // Spawn a task to handle incoming messages + let read_task = tokio::spawn(async move { + println!("Starting to listen for messages..."); + while let Some(message) = read.next().await { + match message { + Ok(msg) => println!("Received: {}", msg), + Err(e) => eprintln!("Error receiving message: {}", e), + } + } + println!("Stopped listening for messages."); + }); + + // Send messages in the main task + println!("Ready to send messages. Type 'exit' to quit."); + loop { + let mut input = String::new(); + std::io::stdin().read_line(&mut input)?; + let input = input.trim(); + + if input == "exit" { + println!("Exiting..."); + break; + } + + match write.send(Message::Text(input.to_string())).await { + Ok(_) => println!("Sent message: {}", input), + Err(e) => eprintln!("Error sending message: {}", e), + } + } + + // Ensure the read task is properly closed + read_task.abort(); + } + Err(e) => { + eprintln!("Failed to connect: {:?}", e); + return Err(e.into()); + } + } + + println!("Client shutting down."); + Ok(()) +} + +fn generate_key() -> String { + use base64::{engine::general_purpose::STANDARD, Engine}; + use rand::Rng; + + let mut key = [0u8; 16]; + rand::thread_rng().fill(&mut key); + STANDARD.encode(key) +} diff --git a/websocket/crates/infra/examples/simple_client.rs b/websocket/crates/infra/examples/simple_client.rs new file mode 100644 index 000000000..e9598ab8b --- /dev/null +++ b/websocket/crates/infra/examples/simple_client.rs @@ -0,0 +1,37 @@ +use futures_util::{SinkExt, StreamExt}; +use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let url = "ws://localhost:8000/room1"; + + let (ws_stream, _) = connect_async(url).await?; + println!("WebSocket handshake has been successfully completed"); + + let (mut write, mut read) = ws_stream.split(); + + // Spawn a task to handle incoming messages + tokio::spawn(async move { + while let Some(message) = read.next().await { + match message { + Ok(msg) => println!("Received: {}", msg), + Err(e) => eprintln!("Error receiving message: {}", e), + } + } + }); + + // Send messages in the main task + loop { + let mut input = String::new(); + std::io::stdin().read_line(&mut input)?; + let input = input.trim(); + + if input == "exit" { + break; + } + + write.send(Message::Text(input.to_string())).await?; + } + + Ok(()) +}