Skip to content

Commit

Permalink
fix: flush after webosocket sink send
Browse files Browse the repository at this point in the history
  • Loading branch information
TheWaWaR committed Nov 18, 2023
1 parent b29749a commit 310de73
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 2 deletions.
4 changes: 2 additions & 2 deletions akasa-core/src/protocols/mqtt/v5/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ async fn handle_online<

let packet = match Connect::decode_with_protocol(&mut conn, header, protocol)
.or(async {
log::info!("connection timeout: {}", peer);
log::info!("connection timeout when decode connect packet: {}", peer);
let _ = timeout_receiver.recv_async().await;
Err(Error::IoError(io::ErrorKind::TimedOut, String::new()).into())
})
Expand Down Expand Up @@ -212,7 +212,7 @@ async fn handle_online<
}
}
.or(async {
log::info!("connection timeout: {}", peer);
log::info!("connection timeout when decode auth packet: {}", peer);
let _ = timeout_receiver.recv_async().await;
Err(io::Error::from(io::ErrorKind::TimedOut))
})
Expand Down
4 changes: 4 additions & 0 deletions akasa-core/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,10 @@ impl<S: AsyncRead + AsyncWrite + Unpin> AsyncWrite for WebSocketWrapper<S> {
log::debug!("WebSocket write error: {:?}", err);
return Poll::Ready(Err(io::ErrorKind::BrokenPipe.into()));
}
let _ignore = Pin::new(&mut *stream)
.as_mut()
.poll_flush(cx)
.map_err::<io::Error, _>(|_| Into::into(io::ErrorKind::BrokenPipe))?;
Poll::Ready(Ok(buf.len()))
}
}
Expand Down

0 comments on commit 310de73

Please sign in to comment.