From ba193a6c6a76e616d37b4b56b6e8665bb34b2e59 Mon Sep 17 00:00:00 2001 From: Rick Richardson Date: Fri, 27 May 2016 00:24:51 -0400 Subject: [PATCH 1/2] added UnixSeqpacket and UnixSeqpacketListener, minor refactoring to consolidate into Inner --- src/lib.rs | 438 +++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 406 insertions(+), 32 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 42cf0ef..3f41988 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -163,6 +163,26 @@ impl Inner { Ok(Some(io::Error::from_raw_os_error(errno))) } } + + pub fn recv(&self, buf: &mut [u8]) -> io::Result { + unsafe { + let count = try!(cvt_s(libc::recv(self.0, + buf.as_mut_ptr() as *mut _, + buf.len(), + 0))); + Ok(count as usize) + } + } + + pub fn send(&self, buf: &[u8]) -> io::Result { + unsafe { + let count = try!(cvt_s(libc::send(self.0, + buf.as_ptr() as *const _, + buf.len(), + 0))); + Ok(count as usize) + } + } } unsafe fn sockaddr_un>(path: P) -> io::Result<(libc::sockaddr_un, libc::socklen_t)> { @@ -579,10 +599,7 @@ impl io::Read for UnixStream { impl<'a> io::Read for &'a UnixStream { fn read(&mut self, buf: &mut [u8]) -> io::Result { - unsafe { - cvt_s(libc::recv(self.inner.0, buf.as_mut_ptr() as *mut _, buf.len(), 0)) - .map(|r| r as usize) - } + self.inner.recv(buf) } } @@ -598,10 +615,7 @@ impl io::Write for UnixStream { impl<'a> io::Write for &'a UnixStream { fn write(&mut self, buf: &[u8]) -> io::Result { - unsafe { - cvt_s(libc::send(self.inner.0, buf.as_ptr() as *const _, buf.len(), 0)) - .map(|r| r as usize) - } + self.inner.send(buf) } fn flush(&mut self) -> io::Result<()> { @@ -629,7 +643,175 @@ impl IntoRawFd for UnixStream { } } -/// A structure representing a Unix domain socket server. + +/// A structure representing a Unix domain seqpacket socket server. +/// +/// # Examples +/// +/// ```rust,no_run +/// use std::thread; +/// use unix_socket::{UnixSeqpacket, UnixSeqpacketListener}; +/// +/// fn handle_client(_stream: UnixSeqpacket) { +/// // ... +/// } +/// +/// let listener = UnixSeqpacketListener::bind("/path/to/the/socket").unwrap(); +/// +/// // accept connections and process them, spawning a new thread for each one +/// for sock in listener.incoming() { +/// match sock { +/// Ok(sock) => { +/// /* connection succeeded */ +/// thread::spawn(|| handle_client(sock)); +/// } +/// Err(_err) => { +/// /* connection failed */ +/// break; +/// } +/// } +/// } +/// +/// // close the listener socket +/// drop(listener); +/// ``` +pub struct UnixSeqpacketListener { + inner: Inner, +} + +impl fmt::Debug for UnixSeqpacketListener { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let mut builder = fmt.debug_struct("UnixSeqpacketListener"); + builder.field("fd", &self.inner.0); + if let Ok(addr) = self.local_addr() { + builder.field("local", &addr); + } + builder.finish() + } +} + +impl UnixSeqpacketListener { + /// Creates a new `UnixSeqpacketListener` bound to the specified socket. + /// + /// Linux provides, as a nonportable extension, a separate "abstract" + /// address namespace as opposed to filesystem-based addressing. If `path` + /// begins with a null byte, it will be interpreted as an "abstract" + /// address. Otherwise, it will be interpreted as a "pathname" address, + /// corresponding to a path on the filesystem. + pub fn bind>(path: P) -> io::Result { + unsafe { + let inner = try!(Inner::new(libc::SOCK_SEQPACKET)); + let (addr, len) = try!(sockaddr_un(path)); + + try!(cvt(libc::bind(inner.0, &addr as *const _ as *const _, len))); + try!(cvt(libc::listen(inner.0, 128))); + + Ok(UnixSeqpacketListener { inner: inner }) + } + } + + /// Accepts a new incoming connection to this listener. + /// + /// This function will block the calling thread until a new Unix connection + /// is established. When established, the corersponding `UnixSeqpacket` and + /// the remote peer's address will be returned. + pub fn accept(&self) -> io::Result<(UnixSeqpacket, SocketAddr)> { + unsafe { + let mut fd = 0; + let addr = try!(SocketAddr::new(|addr, len| { + fd = libc::accept(self.inner.0, addr, len); + fd + })); + + Ok((UnixSeqpacket { inner: Inner(fd) }, addr)) + } + } + + /// Creates a new independently owned handle to the underlying socket. + /// + /// The returned `UnixSeqpacketListener` is a reference to the same socket that this + /// object references. Both handles can be used to accept incoming + /// connections and options set on one listener will affect the other. + pub fn try_clone(&self) -> io::Result { + Ok(UnixSeqpacketListener { inner: try!(self.inner.try_clone()) }) + } + + /// Returns the local socket address of this listener. + pub fn local_addr(&self) -> io::Result { + SocketAddr::new(|addr, len| unsafe { libc::getsockname(self.inner.0, addr, len) }) + } + + /// Moves the socket into or out of nonblocking mode. + pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { + self.inner.set_nonblocking(nonblocking) + } + + /// Returns the value of the `SO_ERROR` option. + pub fn take_error(&self) -> io::Result> { + self.inner.take_error() + } + + /// Returns an iterator over incoming connections. + /// + /// The iterator will never return `None` and will also not yield the + /// peer's `SocketAddr` structure. + pub fn incoming<'a>(&'a self) -> IncomingSeqpacket<'a> { + IncomingSeqpacket { listener: self } + } +} + +impl AsRawFd for UnixSeqpacketListener { + fn as_raw_fd(&self) -> RawFd { + self.inner.0 + } +} + +impl FromRawFd for UnixSeqpacketListener { + unsafe fn from_raw_fd(fd: RawFd) -> UnixSeqpacketListener { + UnixSeqpacketListener { inner: Inner(fd) } + } +} + +impl IntoRawFd for UnixSeqpacketListener { + fn into_raw_fd(self) -> RawFd { + let fd = self.inner.0; + mem::forget(self); + fd + } +} + +impl<'a> IntoIterator for &'a UnixSeqpacketListener { + type Item = io::Result; + type IntoIter = IncomingSeqpacket<'a>; + + fn into_iter(self) -> IncomingSeqpacket<'a> { + self.incoming() + } +} + +/// An iterator over incoming connections to a `UnixSeqpacketListener`. +/// +/// It will never return `None`. +#[derive(Debug)] +pub struct IncomingSeqpacket<'a> { + listener: &'a UnixSeqpacketListener, +} + +impl<'a> Iterator for IncomingSeqpacket<'a> { + type Item = io::Result; + + fn next(&mut self) -> Option> { + Some(self.listener.accept().map(|s| s.0)) + } + + fn size_hint(&self) -> (usize, Option) { + (usize::max_value(), None) + } +} + + + +/// A structure representing a Unix domain stream socket server. /// /// # Examples /// @@ -637,7 +819,7 @@ impl IntoRawFd for UnixStream { /// use std::thread; /// use unix_socket::{UnixStream, UnixListener}; /// -/// fn handle_client(stream: UnixStream) { +/// fn handle_client(_stream: UnixStream) { /// // ... /// } /// @@ -650,7 +832,7 @@ impl IntoRawFd for UnixStream { /// /* connection succeeded */ /// thread::spawn(|| handle_client(stream)); /// } -/// Err(err) => { +/// Err(_err) => { /// /* connection failed */ /// break; /// } @@ -827,8 +1009,8 @@ impl UnixListener { /// } /// } /// ``` - pub fn incoming<'a>(&'a self) -> Incoming<'a> { - Incoming { listener: self } + pub fn incoming<'a>(&'a self) -> IncomingStream<'a> { + IncomingStream { listener: self } } } @@ -854,9 +1036,9 @@ impl IntoRawFd for UnixListener { impl<'a> IntoIterator for &'a UnixListener { type Item = io::Result; - type IntoIter = Incoming<'a>; + type IntoIter = IncomingStream<'a>; - fn into_iter(self) -> Incoming<'a> { + fn into_iter(self) -> IncomingStream<'a> { self.incoming() } } @@ -865,11 +1047,11 @@ impl<'a> IntoIterator for &'a UnixListener { /// /// It will never return `None`. #[derive(Debug)] -pub struct Incoming<'a> { +pub struct IncomingStream<'a> { listener: &'a UnixListener, } -impl<'a> Iterator for Incoming<'a> { +impl<'a> Iterator for IncomingStream<'a> { type Item = io::Result; fn next(&mut self) -> Option> { @@ -1100,13 +1282,7 @@ impl UnixDatagram { /// println!("we received {:?}", &buf[..count]); /// ``` pub fn recv(&self, buf: &mut [u8]) -> io::Result { - unsafe { - let count = try!(cvt_s(libc::recv(self.inner.0, - buf.as_mut_ptr() as *mut _, - buf.len(), - 0))); - Ok(count as usize) - } + self.inner.recv(buf) } /// Sends data on the socket to the specified address. @@ -1174,13 +1350,7 @@ impl UnixDatagram { /// } /// ``` pub fn send(&self, buf: &[u8]) -> io::Result { - unsafe { - let count = try!(cvt_s(libc::send(self.inner.0, - buf.as_ptr() as *const _, - buf.len(), - 0))); - Ok(count as usize) - } + self.inner.send(buf) } /// Sets the read timeout for the socket. @@ -1327,6 +1497,177 @@ impl IntoRawFd for UnixDatagram { } } +/// A Unix seqpacket socket. +/// +/// A Unix Seqpacket socket is connection oriented but sends and receives +/// datagrams with guaranteed ordering. +/// +/// # Examples +/// +/// ```rust,no_run +/// use unix_socket::UnixSeqpacket; +/// +/// let path = "/path/to/my/socket"; +/// let socket = UnixSeqpacket::connect(path).unwrap(); +/// let _count = socket.send(b"hello world").unwrap(); +/// let mut buf = [0; 100]; +/// let count = socket.recv(&mut buf).unwrap(); +/// println!("socket {:?} sent {:?}", path, &buf[..count]); +/// ``` +pub struct UnixSeqpacket { + inner: Inner, +} + +impl fmt::Debug for UnixSeqpacket { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { + let mut builder = fmt.debug_struct("UnixSeqpacket"); + builder.field("fd", &self.inner.0); + if let Ok(addr) = self.local_addr() { + builder.field("local", &addr); + } + if let Ok(addr) = self.peer_addr() { + builder.field("peer", &addr); + } + builder.finish() + } +} + +impl UnixSeqpacket { + /// Connects to the socket named by `path`. + /// + /// Linux provides, as a nonportable extension, a separate "abstract" + /// address namespace as opposed to filesystem-based addressing. If `path` + /// begins with a null byte, it will be interpreted as an "abstract" + /// address. Otherwise, it will be interpreted as a "pathname" address, + /// corresponding to a path on the filesystem. + pub fn connect>(path: P) -> io::Result { + unsafe { + let inner = try!(Inner::new(libc::SOCK_SEQPACKET)); + let (addr, len) = try!(sockaddr_un(path)); + + let ret = libc::connect(inner.0, &addr as *const _ as *const _, len); + if ret < 0 { + Err(io::Error::last_os_error()) + } else { + Ok(UnixSeqpacket { inner: inner }) + } + } + } + + /// Create an unnamed pair of connected sockets. + /// + /// Returns two `UnixSeqpackets`s which are connected to each other. + pub fn pair() -> io::Result<(UnixSeqpacket, UnixSeqpacket)> { + let (i1, i2) = try!(Inner::new_pair(libc::SOCK_SEQPACKET)); + Ok((UnixSeqpacket { inner: i1 }, UnixSeqpacket { inner: i2 })) + } + + /// Creates a new independently owned handle to the underlying socket. + /// + /// The returned `UnixListener` is a reference to the same socket that this + /// object references. Both handles can be used to accept incoming + /// connections and options set on one listener will affect the other. + pub fn try_clone(&self) -> io::Result { + Ok(UnixSeqpacket { inner: try!(self.inner.try_clone()) }) + } + + /// Returns the address of this socket. + pub fn local_addr(&self) -> io::Result { + SocketAddr::new(|addr, len| unsafe { libc::getsockname(self.inner.0, addr, len) }) + } + + /// Returns the address of this socket's peer. + /// + /// Returns the SocketAddr (path) of the peer of this connected socket + pub fn peer_addr(&self) -> io::Result { + SocketAddr::new(|addr, len| unsafe { libc::getpeername(self.inner.0, addr, len) }) + } + + /// Receives data from the socket from the connected peer. + /// + /// On success, returns the number of bytes read. + pub fn recv(&self, buf: &mut [u8]) -> io::Result { + self.inner.recv(buf) + } + + /// Sends data on the socket to the socket's peer. + /// + /// will return an error if the socket has not already been connected. + /// + /// On success, returns the number of bytes written. + pub fn send(&self, buf: &[u8]) -> io::Result { + self.inner.send(buf) + } + + /// Sets the read timeout for the socket. + /// + /// If the provided value is `None`, then `recv` and `recv_from` calls will + /// block indefinitely. It is an error to pass the zero `Duration` to this + /// method. + pub fn set_read_timeout(&self, timeout: Option) -> io::Result<()> { + self.inner.set_timeout(timeout, libc::SO_RCVTIMEO) + } + + /// Sets the write timeout for the socket. + /// + /// If the provided value is `None`, then `send` and `send_to` calls will + /// block indefinitely. It is an error to pass the zero `Duration` to this + /// method. + pub fn set_write_timeout(&self, timeout: Option) -> io::Result<()> { + self.inner.set_timeout(timeout, libc::SO_SNDTIMEO) + } + + /// Returns the read timeout of this socket. + pub fn read_timeout(&self) -> io::Result> { + self.inner.timeout(libc::SO_RCVTIMEO) + } + + /// Returns the write timeout of this socket. + pub fn write_timeout(&self) -> io::Result> { + self.inner.timeout(libc::SO_SNDTIMEO) + } + + /// Moves the socket into or out of nonblocking mode. + pub fn set_nonblocking(&self, nonblocking: bool) -> io::Result<()> { + self.inner.set_nonblocking(nonblocking) + } + + /// Returns the value of the `SO_ERROR` option. + pub fn take_error(&self) -> io::Result> { + self.inner.take_error() + } + + /// Shut down the read, write, or both halves of this connection. + /// + /// This function will cause all pending and future I/O calls on the + /// specified portions to immediately return with an appropriate value + /// (see the documentation of `Shutdown`). + pub fn shutdown(&self, how: Shutdown) -> io::Result<()> { + self.inner.shutdown(how) + } +} + +impl AsRawFd for UnixSeqpacket { + fn as_raw_fd(&self) -> RawFd { + self.inner.0 + } +} + +impl FromRawFd for UnixSeqpacket { + unsafe fn from_raw_fd(fd: RawFd) -> UnixSeqpacket { + UnixSeqpacket { inner: Inner(fd) } + } +} + +impl IntoRawFd for UnixSeqpacket { + fn into_raw_fd(self) -> RawFd { + let fd = self.inner.0; + mem::forget(self); + fd + } +} + + #[cfg(test)] mod test { extern crate tempdir; @@ -1351,7 +1692,7 @@ mod test { } #[test] - fn basic() { + fn basic_stream() { let dir = or_panic!(TempDir::new("unix_socket")); let socket_path = dir.path().join("sock"); let msg1 = b"hello"; @@ -1378,6 +1719,39 @@ mod test { thread.join().unwrap(); } + #[test] + fn basic_seqpacket() { + let dir = or_panic!(TempDir::new("unix_socket")); + let socket_path = dir.path().join("sock"); + let msg1 = b"hello"; + let msg2 = b"world!"; + + let listener = or_panic!(UnixSeqpacketListener::bind(&socket_path)); + let thread = thread::spawn(move || { + let stream = or_panic!(listener.accept()).0; + let mut buf = [0; 5]; + let res = or_panic!(stream.recv(&mut buf)); + println!("recv in thread result was {}", res); + assert_eq!(&msg1[..], &buf[..]); + let res = or_panic!(stream.send(msg2)); + println!("send in thread result was {}", res); + }); + + let stream = or_panic!(UnixSeqpacket::connect(&socket_path)); + assert_eq!(Some(&*socket_path), + stream.peer_addr().unwrap().as_pathname()); + let res = or_panic!(stream.send(msg1)); + println!("outer send result was {}", res); + let mut buf = vec![0,0,0,0,0,0]; + let res = or_panic!(stream.recv(&mut buf)); + println!("outer recv result was {}", res); + assert_eq!(&msg2[..], &buf[..]); + drop(stream); + + thread.join().unwrap(); + } + + #[test] fn pair() { let msg1 = b"hello"; From 25f505760db3adaa25b71500e5d5752eddab92df Mon Sep 17 00:00:00 2001 From: Rick Richardson Date: Fri, 10 Jun 2016 11:39:20 -0400 Subject: [PATCH 2/2] bumped dependency on libc to 0.2.12 --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 186b4f0..5dd1b4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,7 +10,7 @@ readme = "README.md" keywords = ["posix", "unix", "socket", "domain"] [dependencies] -libc = "0.2.1" +libc = "0.2.12" [dev-dependencies] tempdir = "0.3"