From 2305f8220c0b723ffcd10a70609a6f3c643300ad Mon Sep 17 00:00:00 2001 From: Aditi Srinivasan Date: Tue, 10 Dec 2024 13:05:59 -0500 Subject: [PATCH] add endpoint for message counts by fid (#149) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit It's useful to have visibility into which message types have differing counts for reach fid to validate the migration. ``` ❯ grpcurl -plaintext -proto src/proto/rpc.proto -import-path src/proto -d '{"fid": 503}' 127.0.0.1:3383 HubService/GetInfoByFid { "numMessagesByMessageType": { "1": "100", "2": "0", "3": "386", "4": "17", "5": "97", "6": "0", "7": "0", "8": "0", "11": "0", "12": "0", "13": "0", "14": "0" } } ``` --- src/network/server.rs | 42 ++++++++++++++++++++++++++++++++++++++++++ src/proto/rpc.proto | 9 +++++++++ 2 files changed, 51 insertions(+) diff --git a/src/network/server.rs b/src/network/server.rs index fc947290..8e534a11 100644 --- a/src/network/server.rs +++ b/src/network/server.rs @@ -4,9 +4,12 @@ use crate::proto; use crate::proto::hub_service_server::HubService; use crate::proto::Block; use crate::proto::DbStats; +use crate::proto::GetInfoByFidRequest; +use crate::proto::GetInfoByFidResponse; use crate::proto::GetInfoRequest; use crate::proto::GetInfoResponse; use crate::proto::HubEvent; +use crate::proto::MessageType; use crate::proto::{BlocksRequest, ShardChunksRequest, ShardChunksResponse, SubscribeRequest}; use crate::storage::constants::OnChainEventPostfix; use crate::storage::constants::RootPrefix; @@ -15,6 +18,7 @@ use crate::storage::db::RocksDbTransactionBatch; use crate::storage::store::engine::{MempoolMessage, Senders, ShardEngine}; use crate::storage::store::stores::{StoreLimits, Stores}; use crate::storage::store::BlockStore; +use crate::storage::trie::merkle_trie::TrieKey; use crate::utils::statsd_wrapper::StatsdClientWrapper; use hex::ToHex; use std::collections::HashMap; @@ -303,6 +307,44 @@ impl HubService for MyHubService { })) } + async fn get_info_by_fid( + &self, + request: Request, + ) -> Result, Status> { + let message_types = [ + MessageType::CastAdd, + MessageType::CastRemove, + MessageType::ReactionAdd, + MessageType::ReactionRemove, + MessageType::LinkAdd, + MessageType::LinkRemove, + MessageType::VerificationAddEthAddress, + MessageType::VerificationRemove, + MessageType::UserDataAdd, + MessageType::UsernameProof, + MessageType::FrameAction, + MessageType::LinkCompactState, + ]; + let mut num_messages_by_message_type = HashMap::new(); + let fid = request.get_ref().fid; + for (_, shard_store) in self.shard_stores.iter() { + for message_type in message_types { + num_messages_by_message_type.insert( + message_type as u32, + shard_store.trie.get_count( + &shard_store.db, + &mut RocksDbTransactionBatch::new(), + &TrieKey::for_message_type(fid, message_type as u8), + ), + ); + } + } + + Ok(Response::new(GetInfoByFidResponse { + num_messages_by_message_type, + })) + } + type SubscribeStream = ReceiverStream>; async fn subscribe( diff --git a/src/proto/rpc.proto b/src/proto/rpc.proto index 977e1f7b..e849c82a 100644 --- a/src/proto/rpc.proto +++ b/src/proto/rpc.proto @@ -51,6 +51,14 @@ message GetInfoResponse { DbStats db_stats = 5; } +message GetInfoByFidRequest { + uint32 fid = 1; +} + +message GetInfoByFidResponse { + map num_messages_by_message_type = 3; +} + service HubService { rpc SubmitMessage(Message) returns (Message); rpc SubmitMessageWithOptions(SubmitMessageRequest) returns (SubmitMessageResponse); @@ -58,4 +66,5 @@ service HubService { rpc GetShardChunks(ShardChunksRequest) returns (ShardChunksResponse); rpc Subscribe(SubscribeRequest) returns (stream HubEvent); rpc GetInfo(GetInfoRequest) returns (GetInfoResponse); + rpc GetInfoByFid(GetInfoByFidRequest) returns (GetInfoByFidResponse); };