From 1e9595b2d7fb4b2263c705ab4ddd6d797c4ad666 Mon Sep 17 00:00:00 2001 From: Nick Mosher Date: Fri, 10 Jul 2020 15:17:20 -0400 Subject: [PATCH 1/2] Add .idea/ to .gitignore --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 49bdc4424..bff250dca 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,5 @@ Cargo.lock /target/ +# JetBrains +.idea/ From 509a9bb5169b643f51dd730eb56c49cbc7efb4b3 Mon Sep 17 00:00:00 2001 From: Nick Mosher Date: Fri, 10 Jul 2020 15:19:11 -0400 Subject: [PATCH 2/2] WIP: Make capnp-rpc and capnp-futures no_std compatible --- capnp-futures/Cargo.toml | 24 +- capnp-futures/src/async_io/mod.rs | 349 ++++++++++++++++++++++++++++++ capnp-futures/src/lib.rs | 5 + capnp-futures/src/read_stream.rs | 9 +- capnp-futures/src/serialize.rs | 11 +- capnp-futures/src/write_queue.rs | 7 +- capnp-rpc/Cargo.toml | 17 +- capnp-rpc/src/lib.rs | 17 +- capnp-rpc/src/rpc.rs | 3 +- capnp-rpc/src/twoparty.rs | 3 +- capnp/Cargo.toml | 1 - 11 files changed, 414 insertions(+), 32 deletions(-) create mode 100644 capnp-futures/src/async_io/mod.rs diff --git a/capnp-futures/Cargo.toml b/capnp-futures/Cargo.toml index b7046c396..f74513fae 100644 --- a/capnp-futures/Cargo.toml +++ b/capnp-futures/Cargo.toml @@ -11,19 +11,29 @@ edition = "2018" keywords = ["async"] -[dependencies] -capnp = { version = "0.13.0", path = "../capnp" } +[dependencies.capnp] +version = "0.13.0" +path = "../capnp" +default-features = false [dependencies.futures] version = "0.3.0" default-features = false -features = ["std", "executor"] +features = ["alloc"] [dev-dependencies.futures] version = "0.3.0" default-features = false -features = ["executor"] -[dev-dependencies] -capnp = { version = "0.13.0", path = "../capnp", features = ["quickcheck"] } -quickcheck = "0.9" +[dev-dependencies.capnp] +version = "0.13.0" +path = "../capnp" +default-features = false + +#[dev-dependencies] +#capnp = { version = "0.13.0", path = "../capnp", features = ["quickcheck"] } +#quickcheck = "0.9" + +[features] +default = ["std"] +std = ["futures/std", "futures/executor", "capnp/std"] diff --git a/capnp-futures/src/async_io/mod.rs b/capnp-futures/src/async_io/mod.rs new file mode 100644 index 000000000..7899958c6 --- /dev/null +++ b/capnp-futures/src/async_io/mod.rs @@ -0,0 +1,349 @@ + +use core::pin::Pin; +use core::task::Poll; +use core::task::Context; + +use capnp::Result; + +pub use async_ext::AsyncReadExt; +pub use async_ext::AsyncWriteExt; + +pub trait AsyncRead { + fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) + -> Poll>; +} + +pub trait AsyncBufRead: AsyncRead { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) + -> Poll>; + + fn consume(self: Pin<&mut Self>, amt: usize); +} + +pub trait AsyncWrite { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) + -> Poll>; + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; +} + +#[cfg(feature="std")] +mod std_impls { + use crate::async_io::{AsyncRead, AsyncBufRead, AsyncWrite}; + use capnp::Result; + use futures::task::{Context, Poll}; + use std::pin::Pin; + + impl AsyncRead for R where R: futures::AsyncRead { + fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + match futures::AsyncRead::poll_read(self, cx, buf) { + Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())), + Poll::Ready(Ok(n)) => Poll::Ready(Ok(n)), + Poll::Pending => Poll::Pending, + } + } + } + + impl AsyncBufRead for R where R: futures::AsyncBufRead { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match futures::AsyncBufRead::poll_fill_buf(self, cx) { + Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())), + Poll::Ready(Ok(buf)) => Poll::Ready(Ok(buf)), + Poll::Pending => Poll::Pending, + } + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + futures::AsyncBufRead::consume(self, amt) + } + } + + impl AsyncWrite for W where W: futures::AsyncWrite { + fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + match futures::AsyncWrite::poll_write(self, cx, buf) { + Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())), + Poll::Ready(Ok(n)) => Poll::Ready(Ok(n)), + Poll::Pending => Poll::Pending, + } + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match futures::AsyncWrite::poll_flush(self, cx) { + Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())), + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Pending => Poll::Pending, + } + } + + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match futures::AsyncWrite::poll_close(self, cx) { + Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())), + Poll::Ready(Ok(())) => Poll::Ready(Ok(())), + Poll::Pending => Poll::Pending, + } + } + } +} + +#[cfg(not(feature="std"))] +mod no_std_impls { + use crate::async_io::{AsyncRead, AsyncBufRead, AsyncWrite}; + use core::pin::Pin; + use core::task::{Context, Poll}; + use capnp::Result; + + impl AsyncRead for &[u8] { + fn poll_read(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut [u8]) + -> Poll> + { + Poll::Ready(capnp::io::Read::read(&mut *self, buf)) + } + } + + impl AsyncRead for &mut R { + fn poll_read(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut [u8]) -> Poll> { + Pin::new(&mut **self).poll_read(cx, buf) + } + } + + impl AsyncBufRead for &[u8] { + fn poll_fill_buf(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(capnp::io::BufRead::fill_buf(self.get_mut())) + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + capnp::io::BufRead::consume(&mut *self, amt) + } + } + + impl AsyncBufRead for &mut R { + fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut **self.get_mut()).poll_fill_buf(cx) + } + + fn consume(mut self: Pin<&mut Self>, amt: usize) { + Pin::new(&mut **self).consume(amt) + } + } + + impl<'a> AsyncWrite for &'a mut [u8] { + fn poll_write(mut self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll> { + Poll::Ready(capnp::io::Write::write_all(&mut *self, buf).map(|_| buf.len())) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + } + + impl AsyncWrite for &mut W { + fn poll_write(mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll> { + Pin::new(&mut **self).poll_write(cx, buf) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut **self).poll_flush(cx) + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + Pin::new(&mut **self).poll_close(cx) + } + } +} + +#[cfg(feature="std")] +pub mod async_ext { + pub use futures::AsyncReadExt; + pub use futures::AsyncWriteExt; +} + +#[cfg(not(feature="std"))] +pub mod async_ext { + use core::pin::Pin; + use core::future::Future; + use core::task::{Context, Poll}; + use alloc::string::ToString; + use super::{AsyncRead, AsyncWrite}; + + pub trait AsyncReadExt: AsyncRead { + fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self> + where Self: Unpin, + { + Read::new(self, buf) + } + + fn read_exact<'a>( + &'a mut self, + buf: &'a mut [u8], + ) -> ReadExact<'a, Self> + where Self: Unpin, + { + ReadExact::new(self, buf) + } + } + + impl AsyncReadExt for R {} + + pub trait AsyncWriteExt: AsyncWrite { + fn flush(&mut self) -> Flush<'_, Self> + where Self: Unpin, + { + Flush::new(self) + } + + fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self> + where Self: Unpin, + { + WriteAll::new(self, buf) + } + } + + impl AsyncWriteExt for W {} + + /// Future for the [`read`](super::AsyncReadExt::read) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Read<'a, R: ?Sized> { + reader: &'a mut R, + buf: &'a mut [u8], + } + + impl Unpin for Read<'_, R> {} + + impl<'a, R: AsyncRead + ?Sized + Unpin> Read<'a, R> { + pub(super) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> Self { + Read { reader, buf } + } + } + + impl Future for Read<'_, R> { + type Output = capnp::Result; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = &mut *self; + match Pin::new(&mut this.reader).poll_read(cx, this.buf) { + Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())), + other => other, + } + } + } + + /// Future for the [`read_exact`](super::AsyncReadExt::read_exact) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct ReadExact<'a, R: ?Sized> { + reader: &'a mut R, + buf: &'a mut [u8], + } + + impl Unpin for ReadExact<'_, R> {} + + impl<'a, R: AsyncRead + ?Sized + Unpin> ReadExact<'a, R> { + pub(super) fn new(reader: &'a mut R, buf: &'a mut [u8]) -> Self { + ReadExact { reader, buf } + } + } + + impl Future for ReadExact<'_, R> { + type Output = capnp::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = &mut *self; + while !this.buf.is_empty() { + + let n = match Pin::new(&mut this.reader).poll_read(cx, this.buf) { + Poll::Ready(Ok(n)) => n, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())), + Poll::Pending => return Poll::Pending, + }; + + // let n = ready!(Pin::new(&mut this.reader).poll_read(cx, this.buf))?; + { + let (_, rest) = core::mem::replace(&mut this.buf, &mut []).split_at_mut(n); + this.buf = rest; + } + if n == 0 { + return Poll::Ready(Err(capnp::Error::failed("unexpected End of File".to_string()))) + } + } + Poll::Ready(Ok(())) + } + } + + /// Future for the [`flush`](super::AsyncWriteExt::flush) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct Flush<'a, W: ?Sized> { + writer: &'a mut W, + } + + impl Unpin for Flush<'_, W> {} + + impl<'a, W: AsyncWrite + ?Sized + Unpin> Flush<'a, W> { + pub(super) fn new(writer: &'a mut W) -> Self { + Flush { writer } + } + } + + impl Future for Flush<'_, W> + where W: AsyncWrite + ?Sized + Unpin, + { + type Output = capnp::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match Pin::new(&mut *self.writer).poll_flush(cx) { + Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())), + other => other, + } + } + } + + /// Future for the [`write_all`](super::AsyncWriteExt::write_all) method. + #[derive(Debug)] + #[must_use = "futures do nothing unless you `.await` or poll them"] + pub struct WriteAll<'a, W: ?Sized> { + writer: &'a mut W, + buf: &'a [u8], + } + + impl Unpin for WriteAll<'_, W> {} + + impl<'a, W: AsyncWrite + ?Sized + Unpin> WriteAll<'a, W> { + pub(super) fn new(writer: &'a mut W, buf: &'a [u8]) -> Self { + WriteAll { writer, buf } + } + } + + impl Future for WriteAll<'_, W> { + type Output = capnp::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = &mut *self; + while !this.buf.is_empty() { + + let n = match Pin::new(&mut this.writer).poll_write(cx, this.buf) { + Poll::Ready(Ok(n)) => n, + Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())), + Poll::Pending => return Poll::Pending, + }; + + // let n = ready!(Pin::new(&mut this.writer).poll_write(cx, this.buf))?; + { + let (_, rest) = core::mem::replace(&mut this.buf, &[]).split_at(n); + this.buf = rest; + } + if n == 0 { + return Poll::Ready(Err(capnp::Error::failed("Write Zero".to_string()))); + } + } + + Poll::Ready(Ok(())) + } + } +} \ No newline at end of file diff --git a/capnp-futures/src/lib.rs b/capnp-futures/src/lib.rs index d9ef0946c..8ac0607cc 100644 --- a/capnp-futures/src/lib.rs +++ b/capnp-futures/src/lib.rs @@ -18,6 +18,10 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. +#![cfg_attr(not(feature = "std"), no_std)] + +#[macro_use] +extern crate alloc; extern crate futures; pub use read_stream::ReadStream; @@ -26,3 +30,4 @@ pub use write_queue::{write_queue, Sender}; pub mod serialize; mod read_stream; mod write_queue; +pub mod async_io; diff --git a/capnp-futures/src/read_stream.rs b/capnp-futures/src/read_stream.rs index 035030d08..e952e45c8 100644 --- a/capnp-futures/src/read_stream.rs +++ b/capnp-futures/src/read_stream.rs @@ -18,11 +18,14 @@ // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN // THE SOFTWARE. -use std::pin::Pin; -use std::task::{Context, Poll}; +use core::pin::Pin; +use core::task::{Context, Poll}; +use alloc::boxed::Box; + use futures::future::Future; use futures::stream::Stream; -use futures::{AsyncRead}; +use crate::async_io::AsyncRead; +// use futures::{AsyncRead}; use capnp::{Error, message}; diff --git a/capnp-futures/src/serialize.rs b/capnp-futures/src/serialize.rs index f1da8a908..7e8d2d4cf 100644 --- a/capnp-futures/src/serialize.rs +++ b/capnp-futures/src/serialize.rs @@ -21,12 +21,12 @@ //! Asynchronous reading and writing of messages using the //! [standard stream framing](https://capnproto.org/encoding.html#serialization-over-a-stream). -use std::convert::TryInto; +use core::convert::TryInto; use capnp::{message, Error, Result, OutputSegments}; use capnp::serialize::{OwnedSegments, SegmentLengthsBuilder}; -use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; +use crate::async_io::{AsyncRead, AsyncWrite, AsyncReadExt, AsyncWriteExt}; /// Begins an asynchronous read of a message from `reader`. pub async fn read_message(mut reader: R, options: message::ReaderOptions) -> Result>> @@ -144,7 +144,7 @@ impl AsOutputSegments for message::Builder where A: message::Allocator { } }*/ -impl AsOutputSegments for ::std::rc::Rc> where A: message::Allocator { +impl AsOutputSegments for alloc::rc::Rc> where A: message::Allocator { fn as_output_segments<'a>(&'a self) -> OutputSegments<'a> { self.get_segments_for_output() } @@ -160,7 +160,7 @@ pub async fn write_message(mut writer: W, message: M) -> Result<()> Ok(()) } -async fn write_segment_table(mut write: W, segments: &[&[u8]]) -> ::std::io::Result<()> +async fn write_segment_table(mut write: W, segments: &[&[u8]]) -> capnp::Result<()> where W: AsyncWrite + Unpin { let mut buf: [u8; 8] = [0; 8]; @@ -215,7 +215,8 @@ pub mod test { use std::pin::Pin; use std::task::{Context, Poll}; - use futures::{AsyncRead, AsyncWrite}; + // use futures::{AsyncRead, AsyncWrite}; + use crate::async_io::{AsyncRead, AsyncWrite}; use futures::io::Cursor; use quickcheck::{quickcheck, TestResult}; diff --git a/capnp-futures/src/write_queue.rs b/capnp-futures/src/write_queue.rs index 8ae4dcaeb..ebd2f478e 100644 --- a/capnp-futures/src/write_queue.rs +++ b/capnp-futures/src/write_queue.rs @@ -20,11 +20,12 @@ use futures::future::Future; use futures::channel::oneshot; -use futures::{AsyncWrite, AsyncWriteExt, StreamExt, TryFutureExt}; +use futures::{StreamExt, TryFutureExt}; +use crate::async_io::{AsyncWrite, AsyncWriteExt}; -use capnp::{Error}; +use capnp::Error; -use crate::serialize::{AsOutputSegments}; +use crate::serialize::AsOutputSegments; enum Item where M: AsOutputSegments { Message(M, oneshot::Sender), diff --git a/capnp-rpc/Cargo.toml b/capnp-rpc/Cargo.toml index 45f083e41..39103f5af 100644 --- a/capnp-rpc/Cargo.toml +++ b/capnp-rpc/Cargo.toml @@ -21,8 +21,17 @@ path = "src/lib.rs" [dependencies.futures] version = "0.3.0" default-features = false -features = ["std"] -[dependencies] -capnp-futures = { version = "0.13.0", path = "../capnp-futures" } -capnp = {version = "0.13.0", path = "../capnp"} +[dependencies.capnp] +version = "0.13.0" +path = "../capnp" +default-features = false + +[dependencies.capnp-futures] +version = "0.13.0" +path = "../capnp-futures" +default-features = false + +[features] +default = ["std"] +std = ["futures/std", "capnp/std", "capnp-futures/std"] diff --git a/capnp-rpc/src/lib.rs b/capnp-rpc/src/lib.rs index 59f24ae75..e14bde5b4 100644 --- a/capnp-rpc/src/lib.rs +++ b/capnp-rpc/src/lib.rs @@ -58,20 +58,23 @@ //! //! For a more complete example, see https://github.com/capnproto/capnproto-rust/tree/master/capnp-rpc/examples/calculator +#![cfg_attr(not(feature = "std"), no_std)] extern crate capnp; extern crate capnp_futures; extern crate futures; +extern crate alloc; + +use core::pin::Pin; +use core::task::{Context, Poll}; +use core::cell::{RefCell}; +use alloc::rc::Rc; -use std::pin::Pin; -use std::task::{Context, Poll}; use futures::{Future, FutureExt, TryFutureExt}; use futures::channel::oneshot; use capnp::Error; use capnp::capability::Promise; use capnp::private::capability::{ClientHook}; -use std::cell::{RefCell}; -use std::rc::{Rc}; use crate::task_set::TaskSet; pub use crate::rpc::Disconnector; @@ -92,9 +95,9 @@ pub mod rpc_twoparty_capnp; macro_rules! pry { ($expr:expr) => ( match $expr { - ::std::result::Result::Ok(val) => val, - ::std::result::Result::Err(err) => { - return ::capnp::capability::Promise::err(::std::convert::From::from(err)) + ::core::result::Result::Ok(val) => val, + ::core::result::Result::Err(err) => { + return ::capnp::capability::Promise::err(::core::convert::From::from(err)) } }) } diff --git a/capnp-rpc/src/rpc.rs b/capnp-rpc/src/rpc.rs index 11472bc5c..2e7e77914 100644 --- a/capnp-rpc/src/rpc.rs +++ b/capnp-rpc/src/rpc.rs @@ -32,7 +32,8 @@ use futures::{future, Future, FutureExt, TryFutureExt}; use futures::channel::oneshot; use std::vec::Vec; -use std::collections::hash_map::HashMap; +// use std::collections::hash_map::HashMap; +use alloc::collections::BTreeMap as HashMap; use std::collections::binary_heap::BinaryHeap; use std::cell::{Cell, RefCell}; use std::rc::{Rc, Weak}; diff --git a/capnp-rpc/src/twoparty.rs b/capnp-rpc/src/twoparty.rs index a65f64e7b..e712e8839 100644 --- a/capnp-rpc/src/twoparty.rs +++ b/capnp-rpc/src/twoparty.rs @@ -23,7 +23,8 @@ use capnp::message::ReaderOptions; use capnp::capability::Promise; -use futures::{AsyncRead, AsyncWrite, FutureExt, TryFutureExt}; +use futures::{FutureExt, TryFutureExt}; +use capnp_futures::async_io::{AsyncRead, AsyncWrite}; use futures::channel::oneshot; use std::cell::RefCell; diff --git a/capnp/Cargo.toml b/capnp/Cargo.toml index 36629f1ba..f78b0e75d 100644 --- a/capnp/Cargo.toml +++ b/capnp/Cargo.toml @@ -14,7 +14,6 @@ readme = "README.md" keywords = ["encoding", "protocol", "serialization"] [lib] - name = "capnp" path = "src/lib.rs"