Skip to content

Commit

Permalink
add submitLegacyMessage endpoint (#124)
Browse files Browse the repository at this point in the history
We'll use this endpoint for submitting existing hub messages to
snapchain nodes. This avoids us needing to remove qualified module
names.
  • Loading branch information
aditiharini authored Dec 5, 2024
1 parent dd80b5e commit d7a2df8
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 1 deletion.
48 changes: 47 additions & 1 deletion src/network/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ use crate::core::error::HubError;
use crate::proto::hub_event::HubEvent;
use crate::proto::msg as message;
use crate::proto::rpc::hub_service_server::HubService;
use crate::proto::rpc::{BlocksRequest, ShardChunksRequest, ShardChunksResponse, SubscribeRequest};
use crate::proto::rpc::{
BlocksRequest, LegacyMessage, ShardChunksRequest, ShardChunksResponse, SubscribeRequest,
};
use crate::proto::snapchain::Block;
use crate::storage::db::PageOptions;
use crate::storage::store::engine::{MempoolMessage, Senders};
use crate::storage::store::stores::Stores;
use crate::storage::store::BlockStore;
use crate::utils::statsd_wrapper::StatsdClientWrapper;
use hex::ToHex;
use prost::Message;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tonic::{Request, Response, Status};
Expand Down Expand Up @@ -47,6 +50,49 @@ impl MyHubService {

#[tonic::async_trait]
impl HubService for MyHubService {
async fn submit_legacy_message(
&self,
request: Request<LegacyMessage>,
) -> Result<Response<LegacyMessage>, Status> {
let start_time = std::time::Instant::now();

info!("Received call to [submit_legacy_message] RPC");

let message = message::Message::decode(request.get_ref().data.as_slice())
.map_err(|err| Status::from_error(Box::new(err)))?;

if message.data_bytes.is_none() {
return Err(Status::from_error(Box::new(HubError::validation_failure(
"legacy message must have data_bytes",
))));
}

let result = self
.message_tx
.send(MempoolMessage::UserMessage(message.clone()))
.await;

// Use the same metrics-- we will use legacy messages for performance tests
match result {
Ok(_) => {
self.statsd_client.count("rpc.submit_message.success", 1);
}
Err(_) => {
self.statsd_client.count("rpc.submit_message.failure", 1);
return Err(Status::internal("failed to submit message"));
}
}

let elapsed = start_time.elapsed().as_millis();

let response = Response::new(request.into_inner());

self.statsd_client
.time("rpc.submit_message.duration", elapsed as u64);

Ok(response)
}

async fn submit_message(
&self,
request: Request<message::Message>,
Expand Down
8 changes: 8 additions & 0 deletions src/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,16 @@ message SubscribeRequest {
optional uint32 shard_index = 5;
}

/**
* This message exists to support uploading messages from existing hubs to new snapchain nodes via HubRpcClient
*/
message LegacyMessage {
bytes data = 1;
}

service HubService {
rpc SubmitMessage(msg.Message) returns (msg.Message);
rpc SubmitLegacyMessage(LegacyMessage) returns (LegacyMessage);
rpc GetBlocks(BlocksRequest) returns (stream snapchain.Block);
rpc GetShardChunks(ShardChunksRequest) returns (ShardChunksResponse);
rpc Subscribe(SubscribeRequest) returns (stream hub_event.HubEvent);
Expand Down

0 comments on commit d7a2df8

Please sign in to comment.