Skip to content

Commit

Permalink
add endpoint for message counts by fid (#149)
Browse files Browse the repository at this point in the history
It's useful to have visibility into which message types have differing
counts for reach fid to validate the migration.

```
❯ grpcurl -plaintext -proto src/proto/rpc.proto -import-path src/proto -d '{"fid": 503}' 127.0.0.1:3383 HubService/GetInfoByFid
{
  "numMessagesByMessageType": {
    "1": "100",
    "2": "0",
    "3": "386",
    "4": "17",
    "5": "97",
    "6": "0",
    "7": "0",
    "8": "0",
    "11": "0",
    "12": "0",
    "13": "0",
    "14": "0"
  }
}
```
  • Loading branch information
aditiharini authored Dec 10, 2024
1 parent f9efa4c commit 2305f82
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 0 deletions.
42 changes: 42 additions & 0 deletions src/network/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ use crate::proto;
use crate::proto::hub_service_server::HubService;
use crate::proto::Block;
use crate::proto::DbStats;
use crate::proto::GetInfoByFidRequest;
use crate::proto::GetInfoByFidResponse;
use crate::proto::GetInfoRequest;
use crate::proto::GetInfoResponse;
use crate::proto::HubEvent;
use crate::proto::MessageType;
use crate::proto::{BlocksRequest, ShardChunksRequest, ShardChunksResponse, SubscribeRequest};
use crate::storage::constants::OnChainEventPostfix;
use crate::storage::constants::RootPrefix;
Expand All @@ -15,6 +18,7 @@ use crate::storage::db::RocksDbTransactionBatch;
use crate::storage::store::engine::{MempoolMessage, Senders, ShardEngine};
use crate::storage::store::stores::{StoreLimits, Stores};
use crate::storage::store::BlockStore;
use crate::storage::trie::merkle_trie::TrieKey;
use crate::utils::statsd_wrapper::StatsdClientWrapper;
use hex::ToHex;
use std::collections::HashMap;
Expand Down Expand Up @@ -303,6 +307,44 @@ impl HubService for MyHubService {
}))
}

async fn get_info_by_fid(
&self,
request: Request<GetInfoByFidRequest>,
) -> Result<Response<GetInfoByFidResponse>, Status> {
let message_types = [
MessageType::CastAdd,
MessageType::CastRemove,
MessageType::ReactionAdd,
MessageType::ReactionRemove,
MessageType::LinkAdd,
MessageType::LinkRemove,
MessageType::VerificationAddEthAddress,
MessageType::VerificationRemove,
MessageType::UserDataAdd,
MessageType::UsernameProof,
MessageType::FrameAction,
MessageType::LinkCompactState,
];
let mut num_messages_by_message_type = HashMap::new();
let fid = request.get_ref().fid;
for (_, shard_store) in self.shard_stores.iter() {
for message_type in message_types {
num_messages_by_message_type.insert(
message_type as u32,
shard_store.trie.get_count(
&shard_store.db,
&mut RocksDbTransactionBatch::new(),
&TrieKey::for_message_type(fid, message_type as u8),
),
);
}
}

Ok(Response::new(GetInfoByFidResponse {
num_messages_by_message_type,
}))
}

type SubscribeStream = ReceiverStream<Result<HubEvent, Status>>;

async fn subscribe(
Expand Down
9 changes: 9 additions & 0 deletions src/proto/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,20 @@ message GetInfoResponse {
DbStats db_stats = 5;
}

message GetInfoByFidRequest {
uint32 fid = 1;
}

message GetInfoByFidResponse {
map<uint32, uint64> num_messages_by_message_type = 3;
}

service HubService {
rpc SubmitMessage(Message) returns (Message);
rpc SubmitMessageWithOptions(SubmitMessageRequest) returns (SubmitMessageResponse);
rpc GetBlocks(BlocksRequest) returns (stream Block);
rpc GetShardChunks(ShardChunksRequest) returns (ShardChunksResponse);
rpc Subscribe(SubscribeRequest) returns (stream HubEvent);
rpc GetInfo(GetInfoRequest) returns (GetInfoResponse);
rpc GetInfoByFid(GetInfoByFidRequest) returns (GetInfoByFidResponse);
};

0 comments on commit 2305f82

Please sign in to comment.