Skip to content

Commit

Permalink
jsonrpc: remove subscriber field from Request & clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
hozan23 committed May 22, 2024
1 parent 7be7f59 commit d51f212
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 92 deletions.
11 changes: 4 additions & 7 deletions jsonrpc/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ impl Client {
method: &str,
params: T,
) -> Result<V> {
let request = self.send_request(method, params, None).await?;
let request = self.send_request(method, params).await?;
debug!("--> {request}");

let response = match self.timeout {
Expand Down Expand Up @@ -84,7 +84,7 @@ impl Client {
method: &str,
params: T,
) -> Result<(SubscriptionID, Subscription)> {
let request = self.send_request(method, params, Some(json!(true))).await?;
let request = self.send_request(method, params).await?;
debug!("--> {request}");

let response = match self.timeout {
Expand Down Expand Up @@ -117,9 +117,7 @@ impl Client {
/// This function sends an unsubscription request for the specified method
/// and subscription ID. It waits for the response to confirm the unsubscription.
pub async fn unsubscribe(&self, method: &str, sub_id: SubscriptionID) -> Result<()> {
let request = self
.send_request(method, json!(sub_id), Some(json!(true)))
.await?;
let request = self.send_request(method, json!(sub_id)).await?;
debug!("--> {request}");

let response = match self.timeout {
Expand All @@ -144,7 +142,6 @@ impl Client {
&self,
method: &str,
params: T,
subscriber: Option<serde_json::Value>,
) -> Result<message::Request> {
let id = random_64();

Expand All @@ -153,7 +150,6 @@ impl Client {
id: json!(id),
method: method.to_string(),
params: json!(params),
subscriber,
};

let req_json = serde_json::to_value(&request)?;
Expand Down Expand Up @@ -187,6 +183,7 @@ impl Client {
let msg = selfc.conn.recv().await?;
if let Ok(res) = serde_json::from_value::<message::Response>(msg.clone()) {
selfc.chan_tx.send(res).await?;

continue;
}

Expand Down
6 changes: 2 additions & 4 deletions jsonrpc/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ pub struct Request {
pub method: String,
pub params: serde_json::Value,
pub id: serde_json::Value,
#[serde(skip_serializing_if = "Option::is_none")]
pub subscriber: Option<serde_json::Value>,
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down Expand Up @@ -69,8 +67,8 @@ impl std::fmt::Display for Request {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{{jsonrpc: {}, method: {}, params: {:?}, id: {:?}, subscribe: {:?}}}",
self.jsonrpc, self.method, self.params, self.id, self.subscriber
"{{jsonrpc: {}, method: {}, params: {:?}, id: {:?}}}",
self.jsonrpc, self.method, self.params, self.id,
)
}
}
Expand Down
111 changes: 30 additions & 81 deletions jsonrpc/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ impl Server {
let selfc = self.clone();
self.task_group.spawn(
async move {
let response = selfc._handle_request(channel, msg).await;
let response = selfc.handle_request(channel, msg).await;
debug!("--> {response}");
sender.send(serde_json::json!(response)).await?;
Ok(())
Expand All @@ -230,7 +230,7 @@ impl Server {
}

/// Handles a new request
async fn _handle_request(
async fn handle_request(
&self,
channel: ArcChannel,
msg: serde_json::Value,
Expand All @@ -240,92 +240,41 @@ impl Server {
SanityCheckResult::ErrRes(res) => return res,
};

if req.msg.subscriber.is_some() {
match self.pubsub_services.get(&req.srvc_name) {
Some(s) => {
self.handle_pubsub_request(channel, s, &req.method_name, req.msg)
.await
}
None => pack_err_res(
message::METHOD_NOT_FOUND_ERROR_CODE,
METHOD_NOT_FOUND_ERROR_MSG,
Some(req.msg.id),
),
}
} else {
match self.services.get(&req.srvc_name) {
Some(s) => self.handle_call_request(s, &req.method_name, req.msg).await,
None => pack_err_res(
message::METHOD_NOT_FOUND_ERROR_CODE,
METHOD_NOT_FOUND_ERROR_MSG,
Some(req.msg.id),
),
}
}
}

/// Handles a call request
async fn handle_call_request(
&self,
service: &Arc<dyn RPCService + 'static>,
method_name: &str,
rpc_msg: message::Request,
) -> message::Response {
let method = match service.get_method(method_name) {
Some(m) => m,
None => {
return pack_err_res(
message::METHOD_NOT_FOUND_ERROR_CODE,
METHOD_NOT_FOUND_ERROR_MSG,
Some(rpc_msg.id),
);
}
};

let result = match method(rpc_msg.params.clone()).await {
Ok(res) => res,
Err(err) => return self.handle_error(err, rpc_msg.id),
};

message::Response {
let mut response = message::Response {
jsonrpc: message::JSONRPC_VERSION.to_string(),
error: None,
result: Some(result),
id: Some(rpc_msg.id),
}
}
result: None,
id: Some(req.msg.id.clone()),
};

/// Handles a pubsub request
async fn handle_pubsub_request(
&self,
channel: ArcChannel,
service: &Arc<dyn PubSubRPCService + 'static>,
method_name: &str,
rpc_msg: message::Request,
) -> message::Response {
let method = match service.get_pubsub_method(method_name) {
Some(m) => m,
None => {
return pack_err_res(
message::METHOD_NOT_FOUND_ERROR_CODE,
METHOD_NOT_FOUND_ERROR_MSG,
Some(rpc_msg.id),
);
if let Some(service) = self.pubsub_services.get(&req.srvc_name) {
if let Some(method) = service.get_pubsub_method(&req.method_name) {
let name = format!("{}.{}", service.name(), req.method_name);
response.result = match method(channel, name, req.msg.params.clone()).await {
Ok(res) => Some(res),
Err(err) => return self.handle_error(err, req.msg.id),
};

return response;
}
};
}

let name = format!("{}.{}", service.name(), method_name);
let result = match method(channel, name, rpc_msg.params.clone()).await {
Ok(res) => res,
Err(err) => return self.handle_error(err, rpc_msg.id),
};
if let Some(service) = self.services.get(&req.srvc_name) {
if let Some(method) = service.get_method(&req.method_name) {
response.result = match method(req.msg.params.clone()).await {
Ok(res) => Some(res),
Err(err) => return self.handle_error(err, req.msg.id),
};

message::Response {
jsonrpc: message::JSONRPC_VERSION.to_string(),
error: None,
result: Some(result),
id: Some(rpc_msg.id),
return response;
}
}

pack_err_res(
message::METHOD_NOT_FOUND_ERROR_CODE,
METHOD_NOT_FOUND_ERROR_MSG,
Some(req.msg.id),
)
}

fn handle_error(&self, err: Error, msg_id: serde_json::Value) -> message::Response {
Expand Down

0 comments on commit d51f212

Please sign in to comment.