From 310de73e66af76890a71425d2645f13afb2d938e Mon Sep 17 00:00:00 2001 From: Linfeng Qian Date: Sat, 18 Nov 2023 22:10:26 +0800 Subject: [PATCH] fix: flush after webosocket sink send --- akasa-core/src/protocols/mqtt/v5/message.rs | 4 ++-- akasa-core/src/server/mod.rs | 4 ++++ 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/akasa-core/src/protocols/mqtt/v5/message.rs b/akasa-core/src/protocols/mqtt/v5/message.rs index 6bf5af6..111976e 100644 --- a/akasa-core/src/protocols/mqtt/v5/message.rs +++ b/akasa-core/src/protocols/mqtt/v5/message.rs @@ -135,7 +135,7 @@ async fn handle_online< let packet = match Connect::decode_with_protocol(&mut conn, header, protocol) .or(async { - log::info!("connection timeout: {}", peer); + log::info!("connection timeout when decode connect packet: {}", peer); let _ = timeout_receiver.recv_async().await; Err(Error::IoError(io::ErrorKind::TimedOut, String::new()).into()) }) @@ -212,7 +212,7 @@ async fn handle_online< } } .or(async { - log::info!("connection timeout: {}", peer); + log::info!("connection timeout when decode auth packet: {}", peer); let _ = timeout_receiver.recv_async().await; Err(io::Error::from(io::ErrorKind::TimedOut)) }) diff --git a/akasa-core/src/server/mod.rs b/akasa-core/src/server/mod.rs index 361d582..3b703fa 100644 --- a/akasa-core/src/server/mod.rs +++ b/akasa-core/src/server/mod.rs @@ -439,6 +439,10 @@ impl AsyncWrite for WebSocketWrapper { log::debug!("WebSocket write error: {:?}", err); return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into())); } + let _ignore = Pin::new(&mut *stream) + .as_mut() + .poll_flush(cx) + .map_err::(|_| Into::into(io::ErrorKind::BrokenPipe))?; Poll::Ready(Ok(buf.len())) } }