Skip to content

Commit

Permalink
Fix: use unary RPCs instead of server-side streaming (#19)
Browse files Browse the repository at this point in the history
Problem: server-side streaming does not actually change anything related
to connection management (gRPC manages one long-lived HTTP/2 connection
for all
RPCs). Therefore, unary RPCs are simpler and a better fit.
  • Loading branch information
Olivier Desenfans authored Nov 28, 2023
1 parent f7a5428 commit fe41308
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 42 deletions.
26 changes: 9 additions & 17 deletions madara-prover-rpc-client/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,9 @@
use tonic::codegen::tokio_stream::StreamExt;
use tonic::{Status, Streaming};
use tonic::Status;

use crate::prover::prover_client::ProverClient;
use crate::prover::{ExecutionRequest, ExecutionResponse, ProverRequest, ProverResponse};
use madara_prover_common::models::{Proof, ProverConfig, ProverParameters, PublicInput};

async fn wait_for_streamed_response<ResponseType>(
stream: Streaming<ResponseType>,
) -> Result<ResponseType, Status> {
if let Some(response) = stream.take(1).next().await {
return response;
}

Err(Status::cancelled("server-side stream was dropped"))
}
use crate::prover::prover_client::ProverClient;
use crate::prover::{ExecutionRequest, ExecutionResponse, ProverRequest, ProverResponse};

/// Execute a program in proof mode and retrieve the execution artifacts.
pub async fn execute_program(
Expand All @@ -25,8 +15,10 @@ pub async fn execute_program(
prover_config: None,
prover_parameters: None,
});
let execution_stream = client.execute(request).await?.into_inner();
wait_for_streamed_response(execution_stream).await
client
.execute(request)
.await
.map(|response| response.into_inner())
}

fn unpack_prover_response(prover_result: Result<ProverResponse, Status>) -> Result<Proof, Status> {
Expand Down Expand Up @@ -57,8 +49,8 @@ pub async fn prove_execution(
prover_config: prover_config_str,
prover_parameters: prover_parameters_str,
});
let prover_stream = client.prove(request).await?.into_inner();
let prover_result = wait_for_streamed_response(prover_stream).await;
let prover_response = client.prove(request).await;
let prover_result = prover_response.map(|response| response.into_inner());
unpack_prover_response(prover_result)
}

Expand Down
32 changes: 9 additions & 23 deletions madara-prover-rpc-server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::path::Path;

use tokio::net::UnixListener;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::wrappers::UnixListenerStream;
use tonic::{transport::Server, Request, Response, Status};

Expand Down Expand Up @@ -93,30 +92,22 @@ pub struct ProverService {}

#[tonic::async_trait]
impl Prover for ProverService {
type ExecuteStream = ReceiverStream<Result<ExecutionResponse, Status>>;

async fn execute(
&self,
request: Request<ExecutionRequest>,
) -> Result<Response<Self::ExecuteStream>, Status> {
) -> Result<Response<ExecutionResponse>, Status> {
let execution_request = request.into_inner();
let (tx, rx) = tokio::sync::mpsc::channel(1);

tokio::spawn(async move {
let execution_result = run_cairo_program_in_proof_mode(&execution_request.program);
let execution_result = format_execution_result(execution_result);
let _ = tx.send(execution_result).await;
});
let execution_result = run_cairo_program_in_proof_mode(&execution_request.program);
let execution_result = format_execution_result(execution_result);

Ok(Response::new(ReceiverStream::new(rx)))
execution_result.map(Response::new)
}

type ProveStream = ReceiverStream<Result<ProverResponse, Status>>;

async fn prove(
&self,
request: Request<ProverRequest>,
) -> Result<Response<Self::ProveStream>, Status> {
) -> Result<Response<ProverResponse>, Status> {
let ProverRequest {
public_input: public_input_str,
memory,
Expand All @@ -138,16 +129,11 @@ impl Prover for ProverService {
trace,
};

let (tx, rx) = tokio::sync::mpsc::channel(1);

tokio::spawn(async move {
let prover_result =
call_prover(&execution_artifacts, &prover_config, &prover_parameters).await;
let formatted_result = format_prover_result(prover_result);
let _ = tx.send(formatted_result).await;
});
let prover_result =
call_prover(&execution_artifacts, &prover_config, &prover_parameters).await;
let formatted_result = format_prover_result(prover_result);

Ok(Response::new(ReceiverStream::new(rx)))
formatted_result.map(Response::new)
}

async fn execute_and_prove(
Expand Down
4 changes: 2 additions & 2 deletions protocols/prover.proto
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ syntax = "proto3";
package prover;

service Prover {
rpc Execute(ExecutionRequest) returns (stream ExecutionResponse);
rpc Prove (ProverRequest) returns (stream ProverResponse);
rpc Execute(ExecutionRequest) returns (ExecutionResponse);
rpc Prove (ProverRequest) returns (ProverResponse);
rpc ExecuteAndProve(ExecutionRequest) returns (ProverResponse);
}

Expand Down

0 comments on commit fe41308

Please sign in to comment.