diff --git a/src/network/server.rs b/src/network/server.rs index 8bce7fc4..dc0e912d 100644 --- a/src/network/server.rs +++ b/src/network/server.rs @@ -4,7 +4,9 @@ use crate::core::error::HubError; use crate::proto::hub_event::HubEvent; use crate::proto::msg as message; use crate::proto::rpc::hub_service_server::HubService; -use crate::proto::rpc::{BlocksRequest, ShardChunksRequest, ShardChunksResponse, SubscribeRequest}; +use crate::proto::rpc::{ + BlocksRequest, LegacyMessage, ShardChunksRequest, ShardChunksResponse, SubscribeRequest, +}; use crate::proto::snapchain::Block; use crate::storage::db::PageOptions; use crate::storage::store::engine::{MempoolMessage, Senders}; @@ -12,6 +14,7 @@ use crate::storage::store::stores::Stores; use crate::storage::store::BlockStore; use crate::utils::statsd_wrapper::StatsdClientWrapper; use hex::ToHex; +use prost::Message; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; @@ -47,6 +50,49 @@ impl MyHubService { #[tonic::async_trait] impl HubService for MyHubService { + async fn submit_legacy_message( + &self, + request: Request, + ) -> Result, Status> { + let start_time = std::time::Instant::now(); + + info!("Received call to [submit_legacy_message] RPC"); + + let message = message::Message::decode(request.get_ref().data.as_slice()) + .map_err(|err| Status::from_error(Box::new(err)))?; + + if message.data_bytes.is_none() { + return Err(Status::from_error(Box::new(HubError::validation_failure( + "legacy message must have data_bytes", + )))); + } + + let result = self + .message_tx + .send(MempoolMessage::UserMessage(message.clone())) + .await; + + // Use the same metrics-- we will use legacy messages for performance tests + match result { + Ok(_) => { + self.statsd_client.count("rpc.submit_message.success", 1); + } + Err(_) => { + self.statsd_client.count("rpc.submit_message.failure", 1); + return Err(Status::internal("failed to submit message")); + } + } + + let elapsed = start_time.elapsed().as_millis(); + + let response = Response::new(request.into_inner()); + + self.statsd_client + .time("rpc.submit_message.duration", elapsed as u64); + + Ok(response) + } + async fn submit_message( &self, request: Request, diff --git a/src/proto/rpc.proto b/src/proto/rpc.proto index 4bd63b24..8f42315b 100644 --- a/src/proto/rpc.proto +++ b/src/proto/rpc.proto @@ -29,8 +29,16 @@ message SubscribeRequest { optional uint32 shard_index = 5; } +/** + * This message exists to support uploading messages from existing hubs to new snapchain nodes via HubRpcClient + */ +message LegacyMessage { + bytes data = 1; +} + service HubService { rpc SubmitMessage(msg.Message) returns (msg.Message); + rpc SubmitLegacyMessage(LegacyMessage) returns (LegacyMessage); rpc GetBlocks(BlocksRequest) returns (stream snapchain.Block); rpc GetShardChunks(ShardChunksRequest) returns (ShardChunksResponse); rpc Subscribe(SubscribeRequest) returns (stream hub_event.HubEvent);