diff --git a/Cargo.lock b/Cargo.lock index 24234de9d5..0fc5dda42b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2338,6 +2338,20 @@ dependencies = [ "tonic", ] +[[package]] +name = "oak_client_tonic" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "futures", + "log", + "oak_client", + "oak_proto_rust", + "prost", + "tonic", +] + [[package]] name = "oak_containers_agent" version = "0.1.0" @@ -2390,6 +2404,7 @@ dependencies = [ "hyper-util", "log", "oak_client", + "oak_client_tonic", "oak_containers_launcher", "oak_crypto", "oak_functions_test_utils", @@ -2698,6 +2713,7 @@ dependencies = [ "log", "micro_rpc", "oak_client", + "oak_client_tonic", "oak_functions_abi", "oak_grpc", "prost", diff --git a/Cargo.toml b/Cargo.toml index 64d3a15b70..4ff17c09eb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ members = [ "oak_attestation_verification", "oak_channel", "oak_client", + "oak_client/tonic", "oak_containers/agent", "oak_containers/examples/hello_world/untrusted_app", "oak_containers/examples/hello_world/web_client", @@ -103,6 +104,7 @@ oak_attestation_explain = { path = "./oak_attestation_explain" } oak_attestation_verification = { path = "./oak_attestation_verification" } oak_channel = { path = "./oak_channel" } oak_client = { path = "./oak_client" } +oak_client_tonic = { path = "./oak_client/tonic" } oak_containers_agent = { path = "./oak_containers/agent" } oak_containers_orchestrator = { path = "./oak_containers/orchestrator" } oak_containers_attestation = { path = "./oak_containers/attestation" } diff --git a/oak_client/BUILD b/oak_client/BUILD index 606ecc85b9..e19c04f613 100644 --- a/oak_client/BUILD +++ b/oak_client/BUILD @@ -29,9 +29,7 @@ rust_library( "src/transport.rs", "src/verifier.rs", ], - crate_features = [ - "grpc_streaming_transport_implementation", - ], + crate_features = [], proc_macro_deps = [ "@oak_crates_index//:async-trait", ], @@ -43,7 +41,5 @@ rust_library( "@oak_crates_index//:anyhow", "@oak_crates_index//:futures", "@oak_crates_index//:futures-util", - "@oak_crates_index//:prost", - "@oak_crates_index//:tonic", ], ) diff --git a/oak_client/Cargo.toml b/oak_client/Cargo.toml index 625ca39d6a..00f3a9d44a 100644 --- a/oak_client/Cargo.toml +++ b/oak_client/Cargo.toml @@ -5,10 +5,6 @@ authors = ["Ivan Petrov "] edition = "2021" license = "Apache-2.0" -[features] -default = ["grpc_streaming_transport_implementation"] -grpc_streaming_transport_implementation = ["tonic"] - [dependencies] anyhow = "*" async-trait = "*" diff --git a/oak_client/src/transport.rs b/oak_client/src/transport.rs index 0eb60763e4..0bc301cf8a 100644 --- a/oak_client/src/transport.rs +++ b/oak_client/src/transport.rs @@ -14,88 +14,11 @@ // limitations under the License. // -#[cfg(feature = "grpc_streaming_transport_implementation")] -use std::future::Future; - -#[cfg(feature = "grpc_streaming_transport_implementation")] -use anyhow::Context; -#[cfg(feature = "grpc_streaming_transport_implementation")] -use futures::channel::mpsc; -#[cfg(feature = "grpc_streaming_transport_implementation")] -use oak_proto_rust::oak::session::v1::{ - request_wrapper, response_wrapper, RequestWrapper, ResponseWrapper, -}; -#[cfg(feature = "grpc_streaming_transport_implementation")] -use oak_proto_rust::oak::session::v1::{ - GetEndorsedEvidenceRequest, GetEndorsedEvidenceResponse, InvokeRequest, InvokeResponse, -}; use oak_proto_rust::oak::{ crypto::v1::{EncryptedRequest, EncryptedResponse}, session::v1::EndorsedEvidence, }; -/// A [Transport] implementation that uses a single gRPC streaming session to -/// get the evidence and then invokve the desired request. -#[cfg(feature = "grpc_streaming_transport_implementation")] -pub struct GrpcStreamingTransport { - response_stream: tonic::Streaming, - request_tx_channel: mpsc::Sender, -} - -#[cfg(feature = "grpc_streaming_transport_implementation")] -impl GrpcStreamingTransport { - /// Create a new [GrpcStreamingTransport]. - /// - /// The provided stream_creator will be immediately invokved to create a new - /// stream session, and all actions on the newly created instance will use - /// that stream. If the stream dies, a new client/transport pair will need - /// to be created. - /// - /// The `stream_creator` is a closure to create a new stream. Typically this - /// will be used with a gRPC client instance that exposes a - /// bi-directional streaming method. - /// - /// For example, if you have a gRPC service like: - /// ``` - /// service { - /// rpc MyMethod(stream RequestWrapper) returns (stream ResponseWrapper); - /// } - /// ``` - /// - /// You will call: - /// ``` - /// let transport = - /// GrpcStreamingTransport::new(|rx| client.my_method(rx)) - /// ``` - pub async fn new( - stream_creator: impl FnOnce(mpsc::Receiver) -> Fut, - ) -> anyhow::Result - where - Fut: Future>>>, - { - let (tx, rx) = mpsc::channel(10); - - let response_stream = - stream_creator(rx).await.context("couldn't send stream request")?.into_inner(); - - Ok(Self { response_stream, request_tx_channel: tx }) - } - - async fn send_and_receive( - &mut self, - request: request_wrapper::Request, - ) -> anyhow::Result { - self.request_tx_channel - .try_send(RequestWrapper { request: Some(request) }) - .context("Couldn't send request")?; - self.response_stream - .message() - .await - .context("received empty response stream")? - .context("empty response") - } -} - #[async_trait::async_trait(?Send)] pub trait Transport { async fn invoke( @@ -104,54 +27,7 @@ pub trait Transport { ) -> anyhow::Result; } -#[cfg(feature = "grpc_streaming_transport_implementation")] -#[async_trait::async_trait(?Send)] -impl Transport for GrpcStreamingTransport { - async fn invoke( - &mut self, - encrypted_request: &EncryptedRequest, - ) -> anyhow::Result { - let response_wrapper: ResponseWrapper = self - .send_and_receive(request_wrapper::Request::InvokeRequest(InvokeRequest { - encrypted_request: Some(encrypted_request.clone()), - })) - .await - .context("Sending invoke request")?; - - match response_wrapper.response { - Some(response_wrapper::Response::InvokeResponse(InvokeResponse { - encrypted_response: Some(encrypted_response), - })) => Ok(encrypted_response), - _ => Err(anyhow::anyhow!( - "response_wrapper does not have a valid invoke_response message" - )), - } - } -} - #[async_trait::async_trait(?Send)] pub trait EvidenceProvider { async fn get_endorsed_evidence(&mut self) -> anyhow::Result; } - -#[cfg(feature = "grpc_streaming_transport_implementation")] -#[async_trait::async_trait(?Send)] -impl EvidenceProvider for GrpcStreamingTransport { - async fn get_endorsed_evidence(&mut self) -> anyhow::Result { - let response_wrapper = self - .send_and_receive(request_wrapper::Request::GetEndorsedEvidenceRequest( - GetEndorsedEvidenceRequest {}, - )) - .await - .context("Sending get_endorsed_evidence request")?; - - match response_wrapper.response { - Some(response_wrapper::Response::GetEndorsedEvidenceResponse( - GetEndorsedEvidenceResponse { endorsed_evidence: Some(endorsed_evidence) }, - )) => Ok(endorsed_evidence), - _ => Err(anyhow::anyhow!( - "response_wrapper does not have a valid invoke_response message" - )), - } - } -} diff --git a/oak_client/tonic/BUILD b/oak_client/tonic/BUILD new file mode 100644 index 0000000000..2646701b43 --- /dev/null +++ b/oak_client/tonic/BUILD @@ -0,0 +1,44 @@ +# +# Copyright 2024 The Project Oak Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +load("@rules_rust//rust:defs.bzl", "rust_library") + +package( + default_visibility = ["//:internal"], + licenses = ["notice"], +) + +rust_library( + name = "oak_client_tonic", + srcs = [ + "lib.rs", + "transport.rs", + ], + proc_macro_deps = [ + "@oak_crates_index//:async-trait", + ], + deps = [ + "//oak_attestation_verification", + "//oak_client", + "//oak_crypto", + "//oak_proto_rust", + "//oak_proto_rust/grpc", + "@oak_crates_index//:anyhow", + "@oak_crates_index//:futures", + "@oak_crates_index//:futures-util", + "@oak_crates_index//:tonic", + ], +) diff --git a/oak_client/tonic/Cargo.toml b/oak_client/tonic/Cargo.toml new file mode 100644 index 0000000000..24dc1f64a8 --- /dev/null +++ b/oak_client/tonic/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "oak_client_tonic" +version = "0.1.0" +authors = ["Ivan Petrov "] +edition = "2021" +license = "Apache-2.0" + +[lib] +path = "lib.rs" + +[dependencies] +anyhow = "*" +async-trait = "*" +futures = "*" +log = "*" +oak_client = { workspace = true } +oak_proto_rust = { workspace = true } +prost = { workspace = true } +tonic = { workspace = true } diff --git a/oak_client/tonic/lib.rs b/oak_client/tonic/lib.rs new file mode 100644 index 0000000000..6df089053f --- /dev/null +++ b/oak_client/tonic/lib.rs @@ -0,0 +1,16 @@ +// +// Copyright 2024 The Project Oak Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +pub mod transport; diff --git a/oak_client/tonic/transport.rs b/oak_client/tonic/transport.rs new file mode 100644 index 0000000000..8dd0de6e95 --- /dev/null +++ b/oak_client/tonic/transport.rs @@ -0,0 +1,132 @@ +// +// Copyright 2024 The Project Oak Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::future::Future; + +use anyhow::Context; +use futures::channel::mpsc; +use oak_client::transport::{EvidenceProvider, Transport}; +use oak_proto_rust::oak::{ + crypto::v1::{EncryptedRequest, EncryptedResponse}, + session::v1::{ + request_wrapper, response_wrapper, EndorsedEvidence, GetEndorsedEvidenceRequest, + GetEndorsedEvidenceResponse, InvokeRequest, InvokeResponse, RequestWrapper, + ResponseWrapper, + }, +}; + +/// A [Transport] implementation that uses a single gRPC streaming session +/// to get the evidence and then invokve the desired request. +pub struct GrpcStreamingTransport { + response_stream: tonic::Streaming, + request_tx_channel: mpsc::Sender, +} + +impl GrpcStreamingTransport { + /// Create a new [GrpcStreamingTransport]. + /// + /// The provided stream_creator will be immediately invokved to create a + /// new stream session, and all actions on the newly created + /// instance will use that stream. If the stream dies, a new + /// client/transport pair will need to be created. + /// + /// The `stream_creator` is a closure to create a new stream. Typically + /// this will be used with a gRPC client instance that exposes a + /// bi-directional streaming method. + /// + /// For example, if you have a gRPC service like: + /// ``` + /// service { + /// rpc MyMethod(stream RequestWrapper) returns (stream ResponseWrapper); + /// } + /// ``` + /// + /// You will call: + /// ``` + /// let transport = + /// GrpcStreamingTransport::new(|rx| client.my_method(rx)) + /// ``` + pub async fn new( + stream_creator: impl FnOnce(mpsc::Receiver) -> Fut, + ) -> anyhow::Result + where + Fut: Future>>>, + { + let (tx, rx) = mpsc::channel(10); + + let response_stream = + stream_creator(rx).await.context("couldn't send stream request")?.into_inner(); + + Ok(Self { response_stream, request_tx_channel: tx }) + } + + async fn send_and_receive( + &mut self, + request: request_wrapper::Request, + ) -> anyhow::Result { + self.request_tx_channel + .try_send(RequestWrapper { request: Some(request) }) + .context("Couldn't send request")?; + self.response_stream + .message() + .await + .context("received empty response stream")? + .context("empty response") + } +} +#[async_trait::async_trait(?Send)] +impl Transport for GrpcStreamingTransport { + async fn invoke( + &mut self, + encrypted_request: &EncryptedRequest, + ) -> anyhow::Result { + let response_wrapper: ResponseWrapper = self + .send_and_receive(request_wrapper::Request::InvokeRequest(InvokeRequest { + encrypted_request: Some(encrypted_request.clone()), + })) + .await + .context("Sending invoke request")?; + + match response_wrapper.response { + Some(response_wrapper::Response::InvokeResponse(InvokeResponse { + encrypted_response: Some(encrypted_response), + })) => Ok(encrypted_response), + _ => Err(anyhow::anyhow!( + "response_wrapper does not have a valid invoke_response message" + )), + } + } +} + +#[async_trait::async_trait(?Send)] +impl EvidenceProvider for GrpcStreamingTransport { + async fn get_endorsed_evidence(&mut self) -> anyhow::Result { + let response_wrapper = self + .send_and_receive(request_wrapper::Request::GetEndorsedEvidenceRequest( + GetEndorsedEvidenceRequest {}, + )) + .await + .context("Sending get_endorsed_evidence request")?; + + match response_wrapper.response { + Some(response_wrapper::Response::GetEndorsedEvidenceResponse( + GetEndorsedEvidenceResponse { endorsed_evidence: Some(endorsed_evidence) }, + )) => Ok(endorsed_evidence), + _ => Err(anyhow::anyhow!( + "response_wrapper does not have a valid invoke_response message" + )), + } + } +} diff --git a/oak_containers/examples/hello_world/trusted_app/BUILD b/oak_containers/examples/hello_world/trusted_app/BUILD index f82f91f7ca..eddbfcc4ea 100644 --- a/oak_containers/examples/hello_world/trusted_app/BUILD +++ b/oak_containers/examples/hello_world/trusted_app/BUILD @@ -54,6 +54,7 @@ rust_test( deps = [ ":lib", "//oak_client", + "//oak_client/tonic:oak_client_tonic", "//oak_containers/examples/hello_world/proto:hello_world_rust_proto", "//oak_containers_sdk", "//oak_crypto", diff --git a/oak_containers/examples/hello_world/trusted_app/tests/standalone_test.rs b/oak_containers/examples/hello_world/trusted_app/tests/standalone_test.rs index 030e3bd291..bc79d777be 100644 --- a/oak_containers/examples/hello_world/trusted_app/tests/standalone_test.rs +++ b/oak_containers/examples/hello_world/trusted_app/tests/standalone_test.rs @@ -16,9 +16,8 @@ use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use anyhow::{Context, Result}; -use oak_client::{ - client::OakClient, transport::GrpcStreamingTransport, verifier::InsecureAttestationVerifier, -}; +use oak_client::{client::OakClient, verifier::InsecureAttestationVerifier}; +use oak_client_tonic::transport::GrpcStreamingTransport; use oak_containers_sdk::{ standalone::StandaloneOrchestrator, OakSessionContext, OrchestratorInterface, }; diff --git a/oak_containers/examples/hello_world/untrusted_app/Cargo.toml b/oak_containers/examples/hello_world/untrusted_app/Cargo.toml index e2388b3438..8485dec245 100644 --- a/oak_containers/examples/hello_world/untrusted_app/Cargo.toml +++ b/oak_containers/examples/hello_world/untrusted_app/Cargo.toml @@ -21,6 +21,7 @@ hyper = { version = "1.4.1", features = ["full"] } hyper-util = { version = "*", features = ["full"] } log = "*" oak_client = { workspace = true } +oak_client_tonic = { workspace = true } oak_containers_launcher = { workspace = true } oak_crypto = { workspace = true } oak_grpc = { workspace = true } diff --git a/oak_containers/examples/hello_world/untrusted_app/tests/integration_test.rs b/oak_containers/examples/hello_world/untrusted_app/tests/integration_test.rs index 84dda0a5e7..5297249cb3 100644 --- a/oak_containers/examples/hello_world/untrusted_app/tests/integration_test.rs +++ b/oak_containers/examples/hello_world/untrusted_app/tests/integration_test.rs @@ -25,9 +25,10 @@ use std::{ use oak_client::{ client::OakClient, - transport::{EvidenceProvider, GrpcStreamingTransport, Transport}, + transport::{EvidenceProvider, Transport}, verifier::InsecureAttestationVerifier, }; +use oak_client_tonic::transport::GrpcStreamingTransport; use oak_containers_hello_world_untrusted_app::demo_transport::{self, DemoTransport}; use oak_containers_launcher::Args; use oak_grpc::oak::session::v1::streaming_session_client::StreamingSessionClient; diff --git a/oak_functions_client/BUILD b/oak_functions_client/BUILD index 65f6d4f199..51bc8e24ad 100644 --- a/oak_functions_client/BUILD +++ b/oak_functions_client/BUILD @@ -28,6 +28,7 @@ rust_library( deps = [ "//micro_rpc", "//oak_client", + "//oak_client/tonic:oak_client_tonic", "//oak_proto_rust/grpc", "@oak_crates_index//:anyhow", "@oak_crates_index//:prost", diff --git a/oak_functions_client/Cargo.toml b/oak_functions_client/Cargo.toml index 183d045873..172893dd5f 100644 --- a/oak_functions_client/Cargo.toml +++ b/oak_functions_client/Cargo.toml @@ -14,6 +14,7 @@ http = "*" log = "*" micro_rpc = { workspace = true } oak_client = { workspace = true } +oak_client_tonic = { workspace = true } oak_functions_abi = { workspace = true } oak_grpc = { workspace = true } prost = { workspace = true } diff --git a/oak_functions_client/src/lib.rs b/oak_functions_client/src/lib.rs index 7b761f81be..a3e0850e50 100644 --- a/oak_functions_client/src/lib.rs +++ b/oak_functions_client/src/lib.rs @@ -14,9 +14,8 @@ // limitations under the License. use anyhow::Context; -use oak_client::{ - client::OakClient, transport::GrpcStreamingTransport, verifier::AttestationVerifier, -}; +use oak_client::{client::OakClient, verifier::AttestationVerifier}; +use oak_client_tonic::transport::GrpcStreamingTransport; use oak_grpc::oak::session::v1::streaming_session_client::StreamingSessionClient; use prost::Message; use tonic::transport::Channel;