diff --git a/src/network/server.rs b/src/network/server.rs index fc947290..8e534a11 100644 --- a/src/network/server.rs +++ b/src/network/server.rs @@ -4,9 +4,12 @@ use crate::proto; use crate::proto::hub_service_server::HubService; use crate::proto::Block; use crate::proto::DbStats; +use crate::proto::GetInfoByFidRequest; +use crate::proto::GetInfoByFidResponse; use crate::proto::GetInfoRequest; use crate::proto::GetInfoResponse; use crate::proto::HubEvent; +use crate::proto::MessageType; use crate::proto::{BlocksRequest, ShardChunksRequest, ShardChunksResponse, SubscribeRequest}; use crate::storage::constants::OnChainEventPostfix; use crate::storage::constants::RootPrefix; @@ -15,6 +18,7 @@ use crate::storage::db::RocksDbTransactionBatch; use crate::storage::store::engine::{MempoolMessage, Senders, ShardEngine}; use crate::storage::store::stores::{StoreLimits, Stores}; use crate::storage::store::BlockStore; +use crate::storage::trie::merkle_trie::TrieKey; use crate::utils::statsd_wrapper::StatsdClientWrapper; use hex::ToHex; use std::collections::HashMap; @@ -303,6 +307,44 @@ impl HubService for MyHubService { })) } + async fn get_info_by_fid( + &self, + request: Request, + ) -> Result, Status> { + let message_types = [ + MessageType::CastAdd, + MessageType::CastRemove, + MessageType::ReactionAdd, + MessageType::ReactionRemove, + MessageType::LinkAdd, + MessageType::LinkRemove, + MessageType::VerificationAddEthAddress, + MessageType::VerificationRemove, + MessageType::UserDataAdd, + MessageType::UsernameProof, + MessageType::FrameAction, + MessageType::LinkCompactState, + ]; + let mut num_messages_by_message_type = HashMap::new(); + let fid = request.get_ref().fid; + for (_, shard_store) in self.shard_stores.iter() { + for message_type in message_types { + num_messages_by_message_type.insert( + message_type as u32, + shard_store.trie.get_count( + &shard_store.db, + &mut RocksDbTransactionBatch::new(), + &TrieKey::for_message_type(fid, message_type as u8), + ), + ); + } + } + + Ok(Response::new(GetInfoByFidResponse { + num_messages_by_message_type, + })) + } + type SubscribeStream = ReceiverStream>; async fn subscribe( diff --git a/src/proto/rpc.proto b/src/proto/rpc.proto index 977e1f7b..e849c82a 100644 --- a/src/proto/rpc.proto +++ b/src/proto/rpc.proto @@ -51,6 +51,14 @@ message GetInfoResponse { DbStats db_stats = 5; } +message GetInfoByFidRequest { + uint32 fid = 1; +} + +message GetInfoByFidResponse { + map num_messages_by_message_type = 3; +} + service HubService { rpc SubmitMessage(Message) returns (Message); rpc SubmitMessageWithOptions(SubmitMessageRequest) returns (SubmitMessageResponse); @@ -58,4 +66,5 @@ service HubService { rpc GetShardChunks(ShardChunksRequest) returns (ShardChunksResponse); rpc Subscribe(SubscribeRequest) returns (stream HubEvent); rpc GetInfo(GetInfoRequest) returns (GetInfoResponse); + rpc GetInfoByFid(GetInfoByFidRequest) returns (GetInfoByFidResponse); };