Skip to content

Commit

Permalink
Add peek_msg_len()
Browse files Browse the repository at this point in the history
Non-Tokio because there's a risk of blocking there.
  • Loading branch information
kotauskas committed Jun 13, 2024
1 parent 316d130 commit 0735668
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 4 deletions.
1 change: 1 addition & 0 deletions src/local_socket/concurrency_detector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ impl<S: ConcurrencyDetectionSite> ConcurrencyDetector<S> {
Self(AtomicBool::new(false), PhantomData)
}
#[track_caller]
#[must_use]
pub fn lock(&self) -> LockDetectorGuard<'_> {
if self
.0
Expand Down
7 changes: 4 additions & 3 deletions src/os/windows/named_pipe/c_wrappers.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::{
os::windows::{
decode_eof,
named_pipe::{PipeMode, WaitTimeout},
winprelude::*,
FileHandle,
Expand Down Expand Up @@ -124,10 +125,9 @@ pub(crate) fn get_np_handle_mode(handle: BorrowedHandle<'_>) -> io::Result<u32>
Ok(mode)
}

#[allow(dead_code)] // TODO(2.3.0) give this thing a public API
pub(crate) fn peek_msg_len(handle: BorrowedHandle<'_>) -> io::Result<usize> {
let mut msglen: u32 = 0;
unsafe {
let rslt = unsafe {
PeekNamedPipe(
handle.as_int_handle(),
ptr::null_mut(),
Expand All @@ -137,7 +137,8 @@ pub(crate) fn peek_msg_len(handle: BorrowedHandle<'_>) -> io::Result<usize> {
msglen.as_mut_ptr(),
)
}
.true_val_or_errno(msglen.to_usize())
.true_val_or_errno(msglen.to_usize());
decode_eof(rslt)
}

fn modes_to_access_flags(recv: Option<PipeMode>, send: Option<PipeMode>) -> u32 {
Expand Down
22 changes: 21 additions & 1 deletion src/os/windows/named_pipe/stream/impl/recv_msg.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::*;
use crate::{os::windows::downgrade_eof, RawOsErrorExt};
use crate::{os::windows::downgrade_eof, RawOsErrorExt as _};
use recvmsg::{prelude::*, NoAddrBuf, RecvResult};
use windows_sys::Win32::Foundation::ERROR_MORE_DATA;

Expand All @@ -13,6 +13,11 @@ pub(crate) const DISCARD_BUF_SIZE: usize = {
};

impl RawPipeStream {
fn peek_msg_len(&self) -> io::Result<usize> {
let _guard = self.concurrency_detector.lock();
c_wrappers::peek_msg_len(self.as_handle())
}

#[track_caller]
fn discard_msg(&self) -> io::Result<()> {
let _guard = self.concurrency_detector.lock();
Expand Down Expand Up @@ -100,6 +105,21 @@ impl RawPipeStream {
}
}

impl<Sm: PipeModeTag> PipeStream<pipe_mode::Messages, Sm> {
/// Returns the length of the next incoming message without receiving it or blocking the
/// thread. Note that a return value of `Ok(0)` does not allow the lack of an incoming message
/// to be distinguished from a zero-length message.
///
/// If the message stream has been closed, this returns a
/// [`BrokenPipe`](io::ErrorKind::BrokenPipe) error.
///
/// Interacts with [concurrency prevention](#concurrency-prevention).
#[inline]
pub fn peek_msg_len(&self) -> io::Result<usize> {
self.raw.peek_msg_len()
}
}

/// Interacts with [concurrency prevention](#concurrency-prevention).
impl<Sm: PipeModeTag> RecvMsg for &PipeStream<pipe_mode::Messages, Sm> {
type Error = io::Error;
Expand Down

0 comments on commit 0735668

Please sign in to comment.