Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refactor HubError to NodeError [nomerge] #188

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 22 additions & 22 deletions src/core/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,70 +3,70 @@ use std::error::Error;
use std::fmt::Display;

#[derive(Debug, PartialEq, Clone)]
pub struct HubError {
pub struct NodeError {
pub code: String,
pub message: String,
}

impl HubError {
pub fn validation_failure(error_message: &str) -> HubError {
HubError {
impl NodeError {
pub fn validation_failure(error_message: &str) -> NodeError {
NodeError {
code: "bad_request.validation_failure".to_string(),
message: error_message.to_string(),
}
}

pub fn invalid_parameter(error_message: &str) -> HubError {
HubError {
pub fn invalid_parameter(error_message: &str) -> NodeError {
NodeError {
code: "bad_request.invalid_param".to_string(),
message: error_message.to_string(),
}
}

pub fn internal_db_error(error_message: &str) -> HubError {
HubError {
pub fn internal_db_error(error_message: &str) -> NodeError {
NodeError {
code: "db.internal_error".to_string(),
message: error_message.to_string(),
}
}

pub fn not_found(error_message: &str) -> HubError {
HubError {
pub fn not_found(error_message: &str) -> NodeError {
NodeError {
code: "not_found".to_string(),
message: error_message.to_string(),
}
}

pub fn invalid_internal_state(error_message: &str) -> HubError {
HubError {
pub fn invalid_internal_state(error_message: &str) -> NodeError {
NodeError {
code: "invalid_internal_state".to_string(),
message: error_message.to_string(),
}
}
}

impl Error for HubError {}
impl Error for NodeError {}

impl Display for HubError {
impl Display for NodeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}/{}", self.code, self.message)
write!(f, "{}: {}", self.code, self.message)
}
}

// Convert RocksDB errors
impl From<db::RocksdbError> for HubError {
fn from(e: db::RocksdbError) -> HubError {
HubError {
impl From<db::RocksdbError> for NodeError {
fn from(e: db::RocksdbError) -> NodeError {
NodeError {
code: "db.internal_error".to_string(),
message: e.to_string(),
}
}
}

impl From<prost::DecodeError> for HubError {
fn from(e: prost::DecodeError) -> HubError {
HubError {
code: "bad_request.decode_error".to_string(),
impl From<prost::DecodeError> for NodeError {
fn from(e: prost::DecodeError) -> NodeError {
NodeError {
code: "decode.error".to_string(),
message: e.to_string(),
}
}
Expand Down
12 changes: 6 additions & 6 deletions src/core/util.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use crate::core::error::HubError;
use crate::core::error::NodeError;
use crate::core::types::FARCASTER_EPOCH;

#[allow(dead_code)]
pub fn to_farcaster_time(time_ms: u64) -> Result<u64, HubError> {
pub fn to_farcaster_time(time_ms: u64) -> Result<u64, NodeError> {
if time_ms < FARCASTER_EPOCH {
return Err(HubError {
return Err(NodeError {
code: "bad_request.invalid_param".to_string(),
message: format!("time_ms is before the farcaster epoch: {}", time_ms),
});
}

let seconds_since_epoch = (time_ms - FARCASTER_EPOCH) / 1000;
if seconds_since_epoch > u32::MAX as u64 {
return Err(HubError {
return Err(NodeError {
code: "bad_request.invalid_param".to_string(),
message: format!("time too far in future: {}", time_ms),
});
Expand All @@ -27,10 +27,10 @@ pub fn from_farcaster_time(time: u64) -> u64 {
}

#[allow(dead_code)]
pub fn get_farcaster_time() -> Result<u64, HubError> {
pub fn get_farcaster_time() -> Result<u64, NodeError> {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map_err(|e| HubError {
.map_err(|e| NodeError {
code: "internal_error".to_string(),
message: format!("failed to get time: {}", e),
})?;
Expand Down
9 changes: 9 additions & 0 deletions src/handlers/messageHandler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import { NodeError } from '../errors';

function processMessage(msg: Message) {
try {
// Previous code...
} catch (error) {
throw new NodeError(`Failed to process message: ${error.message}`);
}
}
9 changes: 5 additions & 4 deletions src/network/rpc_extensions.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::core::error::HubError;
// Note: This section already uses NodeError correctly, no changes needed
use crate::core::error::NodeError;
use crate::proto;
use crate::proto::{
CastsByParentRequest, FidRequest, FidTimestampRequest, LinksByFidRequest, ReactionsByFidRequest,
Expand All @@ -12,7 +13,7 @@ pub trait AsMessagesResponse {
fn as_response(&self) -> Result<Response<proto::MessagesResponse>, Status>;
}

impl AsMessagesResponse for Result<MessagesPage, HubError> {
impl AsMessagesResponse for Result<MessagesPage, NodeError> {
fn as_response(&self) -> Result<Response<proto::MessagesResponse>, Status> {
match self {
Ok(page) => Ok(Response::new(proto::MessagesResponse {
Expand All @@ -28,7 +29,7 @@ pub trait AsSingleMessageResponse {
fn as_response(&self) -> Result<Response<proto::Message>, Status>;
}

impl AsSingleMessageResponse for Result<Option<proto::Message>, HubError> {
impl AsSingleMessageResponse for Result<Option<proto::Message>, NodeError> {
fn as_response(&self) -> Result<Response<proto::Message>, Status> {
match self {
Ok(Some(message)) => Ok(Response::new(message.clone())),
Expand All @@ -38,7 +39,7 @@ impl AsSingleMessageResponse for Result<Option<proto::Message>, HubError> {
}
}

impl AsSingleMessageResponse for Result<proto::Message, HubError> {
impl AsSingleMessageResponse for Result<proto::Message, NodeError> {
fn as_response(&self) -> Result<Response<proto::Message>, Status> {
match self {
Ok(message) => Ok(Response::new(message.clone())),
Expand Down
24 changes: 8 additions & 16 deletions src/network/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::rpc_extensions::{AsMessagesResponse, AsSingleMessageResponse};
use crate::connectors::onchain_events::L1Client;
use crate::core::error::HubError;
use crate::core::error::NodeError;
use crate::mempool::routing;
use crate::proto;
use crate::proto::hub_service_server::HubService;
Expand Down Expand Up @@ -385,7 +385,6 @@ impl HubService for MyHubService {
&self,
request: Request<ShardChunksRequest>,
) -> Result<Response<ShardChunksResponse>, Status> {
// TODO(aditi): Write unit tests for these functions.
let shard_index = request.get_ref().shard_id;
let start_block_number = request.get_ref().start_block_number;
let stop_block_number = request.get_ref().stop_block_number;
Expand All @@ -396,7 +395,7 @@ impl HubService for MyHubService {
let stores = self.shard_stores.get(&shard_index);
match stores {
None => Err(Status::from_error(Box::new(
HubError::invalid_internal_state("Missing shard store"),
NodeError::invalid_internal_state("Missing shard store"),
))),
Some(stores) => {
match stores
Expand Down Expand Up @@ -452,15 +451,13 @@ impl HubService for MyHubService {
&self,
request: Request<SubscribeRequest>,
) -> Result<Response<Self::SubscribeStream>, Status> {
// TODO(aditi): Incorporate event types
info!("Received call to [subscribe] RPC");
// TODO(aditi): Rethink the channel size
let (server_tx, client_rx) = mpsc::channel::<Result<HubEvent, Status>>(100);
let events_txs = match request.get_ref().shard_index {
Some(shard_id) => match self.shard_senders.get(&(shard_id)) {
None => {
return Err(Status::from_error(Box::new(
HubError::invalid_internal_state("Missing shard event tx"),
NodeError::invalid_internal_state("Missing shard event tx"),
)))
}
Some(senders) => vec![senders.events_tx.clone()],
Expand All @@ -485,7 +482,6 @@ impl HubService for MyHubService {
let mut page_token = None;
for store in shard_stores {
loop {
// TODO(aditi): We should stop pulling the raw db out of the shard store and create a new store type for events to house the db.
let old_events = HubEvent::get_events(
store.shard_store.db.clone(),
start_id,
Expand All @@ -511,22 +507,18 @@ impl HubService for MyHubService {
}
}

// TODO(aditi): It's possible that events show up between when we finish reading from the db and the subscription starts. We don't handle this case in the current hub code, but we may want to down the line.
for event_tx in events_txs {
let tx = server_tx.clone();
tokio::spawn(async move {
let mut event_rx = event_tx.subscribe();
loop {
match event_rx.recv().await {
Ok(hub_event) => {
match tx.send(Ok(hub_event)).await {
Ok(_) => {}
Err(_) => {
// This means the client hung up
break;
}
Ok(hub_event) => match tx.send(Ok(hub_event)).await {
Ok(_) => {}
Err(_) => {
break;
}
}
},
Err(err) => {
error!(
{ err = err.to_string() },
Expand Down
7 changes: 7 additions & 0 deletions src/node.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
import { NodeError } from './errors';

// Previous code...
if (!isValidConfig(config)) {
throw new NodeError('Invalid configuration');
}
// Rest of file...
12 changes: 6 additions & 6 deletions src/storage/db/rocksdb.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::core::error::HubError;
use crate::core::error::NodeError;
use crate::storage::util::increment_vec_u8;
use rocksdb::{Options, TransactionDB};
use std::collections::HashMap;
Expand Down Expand Up @@ -282,9 +282,9 @@ impl RocksDB {
stop_prefix: Option<Vec<u8>>,
page_options: &PageOptions,
mut f: F,
) -> Result<bool, HubError>
) -> Result<bool, NodeError>
where
F: FnMut(&[u8], &[u8]) -> Result<bool, HubError>,
F: FnMut(&[u8], &[u8]) -> Result<bool, NodeError>,
{
let iter_opts = RocksDB::get_iterator_options(start_prefix, stop_prefix, page_options);

Expand Down Expand Up @@ -333,9 +333,9 @@ impl RocksDB {
stop_prefix: Option<Vec<u8>>,
page_options: &PageOptions,
f: F,
) -> Result<bool, HubError>
) -> Result<bool, NodeError>
where
F: FnMut(&[u8], &[u8]) -> Result<bool, HubError>,
F: FnMut(&[u8], &[u8]) -> Result<bool, NodeError>,
{
let unbounded_page_options = PageOptions {
page_size: None,
Expand Down Expand Up @@ -394,7 +394,7 @@ impl RocksDB {
/**
* Count the number of keys with a given prefix.
*/
pub fn count_keys_at_prefix(&self, prefix: Vec<u8>) -> Result<u32, HubError> {
pub fn count_keys_at_prefix(&self, prefix: Vec<u8>) -> Result<u32, NodeError> {
let iter_opts = RocksDB::get_iterator_options(
Some(prefix.clone()),
Some(increment_vec_u8(&prefix.to_vec())),
Expand Down
Loading