diff --git a/capnp-rpc/src/lib.rs b/capnp-rpc/src/lib.rs index 61828a6c3..48767990b 100644 --- a/capnp-rpc/src/lib.rs +++ b/capnp-rpc/src/lib.rs @@ -462,7 +462,7 @@ where /// Converts a promise for a client into a client that queues up any calls that arrive /// before the promise resolves. -// TODO: figure out a better way to allow construction of promise clients. +#[deprecated(since = "0.20.2", note = "use `new_deferred_client()` instead")] pub fn new_promise_client(client_promise: F) -> T where T: ::capnp::capability::FromClientHook, @@ -482,6 +482,28 @@ where T::new(Box::new(queued_client)) } +/// Creates a `Client` from a future that resolves to a `Client`. +/// +/// Any calls that arrive before the resolution are accumulated in a queue. +pub fn new_deferred_client( + client_future: impl ::futures::Future> + 'static, +) -> T +where + T: ::capnp::capability::FromClientHook, +{ + let mut queued_client = crate::queued::Client::new(None); + let weak_client = Rc::downgrade(&queued_client.inner); + + queued_client.drive(client_future.then(move |r| { + if let Some(queued_inner) = weak_client.upgrade() { + crate::queued::ClientInner::resolve(&queued_inner, r.map(|c| c.into_client_hook())); + } + Promise::ok(()) + })); + + T::new(Box::new(queued_client)) +} + struct SystemTaskReaper; impl crate::task_set::TaskReaper for SystemTaskReaper { fn task_failed(&mut self, error: Error) { diff --git a/capnp-rpc/src/queued.rs b/capnp-rpc/src/queued.rs index 701c0b3bc..506becc8f 100644 --- a/capnp-rpc/src/queued.rs +++ b/capnp-rpc/src/queued.rs @@ -241,7 +241,7 @@ impl Client { pub fn drive(&mut self, promise: F) where - F: Future> + 'static + Unpin, + F: Future> + 'static, { assert!(self.inner.borrow().promise_to_drive.is_none()); self.inner.borrow_mut().promise_to_drive = Some(Promise::from_future(promise).shared()); diff --git a/capnp-rpc/test/impls.rs b/capnp-rpc/test/impls.rs index 88174a7d1..e8f92dc73 100644 --- a/capnp-rpc/test/impls.rs +++ b/capnp-rpc/test/impls.rs @@ -745,9 +745,8 @@ impl test_promise_resolve::resolver::Server for ResolverImpl { return Promise::err(Error::failed("no sender".into())); }; let (snd, rcv) = oneshot::channel(); - let _ = sender.send(capnp_rpc::new_promise_client( - rcv.map_err(|_| Error::failed("oneshot was canceled".to_string())) - .map_ok(|x: test_interface::Client| x.client), + let _ = sender.send(capnp_rpc::new_deferred_client( + rcv.map_err(|_| Error::failed("oneshot was canceled".to_string())), )); self.sender = Some(snd); Promise::ok(()) @@ -777,9 +776,8 @@ impl test_promise_resolve::Server for TestPromiseResolveImpl { let (snd, rcv) = oneshot::channel(); let resolver = ResolverImpl { sender: Some(snd) }; let mut results_root = results.get(); - results_root.set_cap(capnp_rpc::new_promise_client( - rcv.map_err(|_| Error::failed("oneshot was canceled".to_string())) - .map_ok(|x| x.client), + results_root.set_cap(capnp_rpc::new_deferred_client( + rcv.map_err(|_| Error::failed("oneshot was canceled".to_string())), )); results_root.set_resolver(capnp_rpc::new_client(resolver)); Promise::ok(()) diff --git a/capnp-rpc/test/reconnect_test.rs b/capnp-rpc/test/reconnect_test.rs index e42164edb..4c0160b56 100644 --- a/capnp-rpc/test/reconnect_test.rs +++ b/capnp-rpc/test/reconnect_test.rs @@ -5,7 +5,7 @@ use std::rc::Rc; use capnp::capability::{Promise, Response}; use capnp::Error; use capnp_rpc::{ - auto_reconnect, lazy_auto_reconnect, new_client, new_promise_client, pry, rpc_twoparty_capnp, + auto_reconnect, lazy_auto_reconnect, new_client, new_deferred_client, pry, rpc_twoparty_capnp, twoparty, RpcSystem, }; use futures::channel::oneshot; @@ -314,8 +314,8 @@ fn auto_reconnect_rpc_call() { do_autoconnect_test(&mut pool, |c| { b.set_interface(c); let req = client.test_interface_request(); - new_promise_client(req.send().promise.map(|resp| match resp { - Ok(resp) => Ok(resp.get()?.get_cap()?.client), + new_deferred_client(req.send().promise.map(|resp| match resp { + Ok(resp) => Ok(resp.get()?.get_cap()?), Err(err) => Err(err), })) }) diff --git a/capnp-rpc/test/test.rs b/capnp-rpc/test/test.rs index 4fb0d360f..a6c203707 100644 --- a/capnp-rpc/test/test.rs +++ b/capnp-rpc/test/test.rs @@ -605,7 +605,7 @@ fn promise_resolve() { let (paf_fulfiller, paf_promise) = oneshot::channel(); let cap: crate::test_capnp::test_interface::Client = - ::capnp_rpc::new_promise_client(paf_promise.map_err(canceled_to_error)); + ::capnp_rpc::new_deferred_client(paf_promise.map_err(canceled_to_error)); request.get().set_cap(cap.clone()); request2.get().set_cap(cap); @@ -620,9 +620,7 @@ fn promise_resolve() { let _response = client2.get_call_sequence_request().send().promise.await?; let server = impls::TestInterface::new(); - let _ = paf_fulfiller.send( - capnp_rpc::new_client::(server).client, - ); + let _ = paf_fulfiller.send(capnp_rpc::new_client(server)); let response = promise.await?; if response.get()?.get_s()? != "bar" { @@ -815,7 +813,7 @@ fn dont_hold() { let (fulfiller, promise) = oneshot::channel(); let cap: crate::test_capnp::test_interface::Client = - ::capnp_rpc::new_promise_client(promise.map_err(canceled_to_error)); + ::capnp_rpc::new_deferred_client(promise.map_err(canceled_to_error)); let mut request = client.dont_hold_request(); request.get().set_cap(cap.clone()); @@ -913,7 +911,7 @@ fn embargo_error() { let (fulfiller, promise) = oneshot::channel(); let cap: crate::test_capnp::test_call_order::Client = - ::capnp_rpc::new_promise_client(promise.map_err(canceled_to_error)); + ::capnp_rpc::new_deferred_client(promise.map_err(canceled_to_error)); let client2: crate::test_capnp::test_call_order::Client = client.clone().cast_to(); let early_call = client2.get_call_sequence_request().send(); @@ -958,7 +956,7 @@ fn echo_destruction() { let (fulfiller, promise) = oneshot::channel(); let cap: crate::test_capnp::test_call_order::Client = - ::capnp_rpc::new_promise_client(promise.map_err(canceled_to_error)); + ::capnp_rpc::new_deferred_client(promise.map_err(canceled_to_error)); let client2: crate::test_capnp::test_call_order::Client = client.clone().cast_to(); let early_call = client2.get_call_sequence_request().send(); @@ -1104,15 +1102,15 @@ fn capability_server_set() { // Also works if the client is a promise. let (fulfiller, promise) = oneshot::channel(); let client_promise: test_interface::Client = - ::capnp_rpc::new_promise_client(promise.map_err(canceled_to_error)); + ::capnp_rpc::new_deferred_client(promise.map_err(canceled_to_error)); let client_promise2: test_interface::Client = client_promise.clone(); let (error_fulfiller, error_promise) = oneshot::channel(); let error_promise: test_interface::Client = - ::capnp_rpc::new_promise_client(error_promise.map_err(canceled_to_error)); + ::capnp_rpc::new_deferred_client(error_promise.map_err(canceled_to_error)); - assert!(fulfiller.send(client1.client).is_ok()); + assert!(fulfiller.send(client1).is_ok()); let own_server1_again2 = futures::executor::block_on(set1.get_local_server(&client_promise)).unwrap(); assert_eq!(