Skip to content

Commit

Permalink
check examples/websocket's behavior & fix on ohkami way
Browse files Browse the repository at this point in the history
  • Loading branch information
kanarus committed Oct 29, 2024
1 parent 4cdad6e commit a25781e
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 33 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,10 @@ Currently, WebSocket on `rt_worker` is *not* supported.

```rust,no_run
use ohkami::prelude::*;
use ohkami::ws::{WebSocketContext, WebSocket, Message, Connection};
use ohkami::ws::{WebSocketContext, WebSocket, Message};
async fn echo_text(ctx: WebSocketContext<'_>) -> WebSocket {
ctx.upgrade(|mut conn: Connection| async move {
ctx.upgrade(|mut conn| async move {
while let Ok(Some(Message::Text(text))) = conn.recv().await {
conn.send(text).await.expect("failed to send text");
}
Expand Down
12 changes: 7 additions & 5 deletions examples/websocket/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use ohkami::prelude::*;
use ohkami::ws::{WebSocketContext, WebSocket, Message, Connection, ReadHalf, WriteHalf};
use ohkami::ws::{WebSocketContext, WebSocket, Message};


async fn echo_text(c: WebSocketContext<'_>) -> WebSocket {
c.upgrade(|mut c: Connection| async move {
c.upgrade(|mut c| async move {
while let Ok(Some(Message::Text(text))) = c.recv().await {
if text == "close" {
break
Expand All @@ -27,7 +27,7 @@ struct EchoTextSession<'ws> {
}
impl IntoResponse for EchoTextSession<'_> {
fn into_response(self) -> Response {
self.ctx.upgrade(|mut c: Connection| async move {
self.ctx.upgrade(|mut c| async move {
c.send(format!("Hello, {}!", self.name)).await.expect("failed to send");

while let Ok(Some(Message::Text(text))) = c.recv().await {
Expand All @@ -44,7 +44,9 @@ impl IntoResponse for EchoTextSession<'_> {
async fn echo_text_3(name: String,
ctx: WebSocketContext<'_>
) -> WebSocket {
ctx.upgrade(|mut r: ReadHalf, mut w: WriteHalf| async {
ctx.upgrade(|c| async {
let (mut r, mut w) = c.split();

let incoming = std::sync::Arc::new(tokio::sync::RwLock::new(std::collections::VecDeque::new()));
let (close_tx, close_rx) = tokio::sync::watch::channel(());

Expand Down Expand Up @@ -110,7 +112,7 @@ async fn echo_text_3(name: String,


async fn echo4(name: String, ws: WebSocketContext<'_>) -> WebSocket {
ws.upgrade(|mut c: Connection| async {
ws.upgrade(|mut c| async {
/* spawn but not join the handle */
tokio::spawn(async move {
#[cfg(feature="DEBUG")] println!("\n{c:#?}");
Expand Down
14 changes: 11 additions & 3 deletions ohkami/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,9 @@ impl Session {
}
}).await {
None => crate::warning!("[WARNING] \
Session timeouted. In Ohkami, Keep-Alive timeout \
Session timeouted. In Ohkami, Keep-Alive timeout \
is set to 42 seconds by default and is configurable \
via `OHKAMI_KEEPALIVE_TIMEOUT` environment variable. \
by `OHKAMI_KEEPALIVE_TIMEOUT` environment variable.\
"),

Some(Upgrade::None) => crate::DEBUG!("about to shutdown connection"),
Expand All @@ -103,10 +103,18 @@ impl Session {
Some(Upgrade::WebSocket(ws)) => {
crate::DEBUG!("WebSocket session started");

ws.manage_with_timeout(
let aborted = ws.manage_with_timeout(
Duration::from_secs(env::OHKAMI_WEBSOCKET_TIMEOUT()),
self.connection
).await;
if aborted {
crate::warning!("[WARNING] \
WebSocket session aborted by timeout. In Ohkami, \
WebSocket timeout is set to 3600 seconds (1 hour) \
by default and is configurable by `OHKAMI_WEBSOCKET_TIMEOUT` \
environment variable.\
");
}

crate::DEBUG!("WebSocket session finished");
}
Expand Down
63 changes: 40 additions & 23 deletions ohkami/src/ws/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ use crate::{__rt__, FromRequest, IntoResponse, Request, Response};
///
/// *example.rs*
/// ```
/// use ohkami::ws::{WebSocketContext, WebSocket, Connection};
/// use ohkami::ws::{WebSocketContext, WebSocket};
///
/// async fn ws(ctx: WebSocketContext<'_>) -> WebSocket {
/// ctx.upgrade(|mut conn: Connection| async move {
/// ctx.upgrade(|mut conn| async move {
/// conn.send("Hello, WebSocket! and bye...").await
/// .expect("failed to send")
/// })
/// }
/// ```
Expand All @@ -41,10 +42,10 @@ const _: () = {
} {
return Some(Err((|| Response::BadRequest().with_text("upgrade request must have `Connection: Upgrade`"))()))
}
if req.headers.Upgrade()?.eq_ignore_ascii_case("websocket") {
if !(req.headers.Upgrade()?.eq_ignore_ascii_case("websocket")) {
return Some(Err((|| Response::BadRequest().with_text("upgrade request must have `Upgrade: websocket`"))()))
}
if req.headers.SecWebSocketVersion()? != "13" {
if !(req.headers.SecWebSocketVersion()? == "13") {
return Some(Err((|| Response::BadRequest().with_text("upgrade request must have `Sec-WebSocket-Version: 13`"))()))
}

Expand All @@ -60,28 +61,31 @@ const _: () = {
///
/// ## handler
///
/// any `FnOnce + Send + Sync` returning `Future + Send` with following signature:
///
/// * `(Connection) -> () | std::io::Result<()>`
/// * `(ReadHalf, WriteHalf) -> () | std::io::Result<()>`
pub fn upgrade<T, C: mews::connection::UnderlyingConnection>(self,
handler: impl mews::handler::IntoHandler<C, T>
) -> WebSocket<C> {
/// any `FnOnce(Connection) -> {impl Future<Output = ()> + Send} + Send + Sync`
pub fn upgrade<H, F, C: mews::connection::UnderlyingConnection>(
self,
handler: H
) -> WebSocket<C>
where
H: FnOnce(Connection<C>) -> F + Send + Sync + 'static,
F: std::future::Future<Output = ()> + Send + 'static,
{
self.upgrade_with(Config::default(), handler)
}

/// create a `WebSocket` with the config and handler.
///
/// ## handler
///
/// any `FnOnce + Send + Sync` returning `Future + Send` with following signature:
///
/// * `(Connection) -> () | std::io::Result<()>`
/// * `(ReadHalf, WriteHalf) -> () | std::io::Result<()>`
pub fn upgrade_with<T, C: mews::connection::UnderlyingConnection>(self,
/// any `FnOnce(Connection) -> {impl Future<Output = ()> + Send} + Send + Sync`
pub fn upgrade_with<H, F, C: mews::connection::UnderlyingConnection>(self,
config: Config,
handler: impl mews::handler::IntoHandler<C, T>
) -> WebSocket<C> {
handler: H
) -> WebSocket<C>
where
H: FnOnce(Connection<C>) -> F + Send + Sync + 'static,
F: std::future::Future<Output = ()> + Send + 'static,
{
let (sign, session) = mews::WebSocketContext::new(self.sec_websocket_key)
.with(config)
.on_upgrade(handler);
Expand All @@ -100,11 +104,12 @@ const _: () = {
///
/// *example.rs*
/// ```
/// use ohkami::ws::{WebSocketContext, WebSocket, Connection};
/// use ohkami::ws::{WebSocketContext, WebSocket};
///
/// async fn ws(ctx: WebSocketContext<'_>) -> WebSocket {
/// ctx.upgrade(|mut conn: Connection| async move {
/// ctx.upgrade(|mut conn| async move {
/// conn.send("Hello, WebSocket! and bye...").await
/// .expect("failed to send")
/// })
/// }
/// ```
Expand All @@ -116,20 +121,32 @@ const _: () = {
/// # use tokio::{join, spawn};
/// # use tokio::time::{Duration, sleep};
/// #
/// use ohkami::ws::{WebSocketContext, WebSocket, Message, ReadHalf, WriteHalf};
/// use ohkami::ws::{WebSocketContext, WebSocket, Message};
///
/// async fn ws(ctx: WebSocketContext<'_>) -> WebSocket {
/// ctx.upgrade(|mut r: ReadHalf, mut w: WriteHalf| async {
/// ctx.upgrade(|c| async {
/// let (mut r, mut w) = c.split();
/// tokio::join!( /* joining is necessary to prevent resource leak or unsafe situations */
/// tokio::spawn(async move {
/// while let Some(Message::Text(
/// text
/// )) = r.recv().await.expect("failed to recieve") {
/// println!("[->] {text}");
/// if text == "close" {break}
/// }
/// }),
/// tokio::spawn(async move {
/// for text in ["abc", "def", "ghi", "jk", "lmno", "pqr", "stuvw", "xyz"] {
/// for text in [
/// "abc",
/// "def",
/// "ghi",
/// "jk",
/// "lmno",
/// "pqr",
/// "stuvw",
/// "xyz"
/// ] {
/// println!("[<-] {text}");
/// w.send(text).await.expect("failed to send text");
/// sleep(Duration::from_secs(1)).await;
/// }
Expand Down

0 comments on commit a25781e

Please sign in to comment.