Skip to content

Commit

Permalink
tests: add rpc client test
Browse files Browse the repository at this point in the history
  • Loading branch information
Mauve Signweaver committed Nov 28, 2024
1 parent 612cc9d commit 2929b20
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 53 deletions.
8 changes: 8 additions & 0 deletions src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,14 @@ impl Backend {
self.join_group(keys).await
}

pub async fn get_route_id_blob(&self) -> Result<Vec<u8>> {
if let Some(blobs) = self.get_iroh_blobs().await {
Ok(blobs.route_id_blob().await)
} else {
Err(anyhow!("Veilid not initialized"))
}
}

pub async fn join_group(&self, keys: CommonKeypair) -> Result<Box<Group>> {
let mut inner = self.inner.lock().await;

Expand Down
38 changes: 38 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ mod tests {
use anyhow::Result;
use bytes::Bytes;
use common::init_veilid;
use rpc::RpcClient;
use rpc::RpcService;
use std::path::Path;
use std::result;
Expand Down Expand Up @@ -1351,4 +1352,41 @@ mod tests {
backend.stop().await.expect("Unable to stop backend");
Ok(())
}

#[tokio::test]
#[serial]
async fn test_rpc_client() -> Result<()> {
// Setup temporary directory and initialize the backend
let path = TmpDir::new("test_backend_collection_hash_consistency")
.await
.unwrap();
fs::create_dir_all(path.as_ref())
.await
.expect("Failed to create base directory");

let (veilid2, _) = init_veilid(
&path.to_path_buf().join("client"),
"save-dweb-backup".to_string(),
)
.await?;

let mut backend = Backend::new(path.as_ref()).expect("Unable to create Backend");
backend.start().await.expect("Unable to start");

let rpc_instance = RpcService::from_backend(&backend).await?;

let url = rpc_instance.get_descriptor_url();

tokio::time::sleep(Duration::from_secs(2)).await;

let client = RpcClient::from_veilid(veilid2.clone(), &url).await?;

let list = client.list_groups().await?;

assert_eq!(list.group_ids.len(), 0, "No groups on init");

backend.stop().await.expect("Unable to stop backend");
veilid2.shutdown().await;
Ok(())
}
}
59 changes: 29 additions & 30 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use crate::backend::Backend;
use crate::rpc::{RpcService, RpcClient};
use crate::rpc::{JoinGroupRequest, RemoveGroupRequest};
use crate::common::{CommonKeypair, DHTEntity, init_veilid};
use crate::common::{init_veilid, CommonKeypair, DHTEntity};
use crate::constants::{UNABLE_TO_GET_GROUP_NAME, UNABLE_TO_SET_GROUP_NAME};
use crate::group::Group;
use crate::repo::Repo;
use crate::rpc::{JoinGroupRequest, RemoveGroupRequest};
use crate::rpc::{RpcClient, RpcService};
use anyhow::{anyhow, Result};
use clap::{Arg, Command, ArgAction, Subcommand};
use clap::{Arg, ArgAction, Command, Subcommand};
use std::sync::Arc;
use tokio::fs;
use tokio::sync::Mutex;
use std::sync::Arc;
use xdg::BaseDirectories;
use tracing::error;
use xdg::BaseDirectories;

mod backend;
mod common;
Expand All @@ -38,11 +38,11 @@ async fn setup_rpc_client(
base_dir: &std::path::Path,
backend_url: &str,
) -> anyhow::Result<RpcClient> {
let (veilid_api, _update_rx) = init_veilid(base_dir, "save-dweb-backup".to_string()).await?;
let (veilid_api, _update_rx) =
init_veilid(base_dir, "save-dweb-backup-client".to_string()).await?;
RpcClient::from_veilid(veilid_api, backend_url).await
}


#[tokio::main]
async fn main() -> anyhow::Result<()> {
let matches = Command::new("Save DWeb Backend")
Expand All @@ -54,24 +54,20 @@ async fn main() -> anyhow::Result<()> {
.global(true),
)
.subcommand(
Command::new("join")
.about("Join a group")
.arg(
Arg::new("group_url")
.long("group-url")
.help("URL of the group to join")
.required(true),
),
Command::new("join").about("Join a group").arg(
Arg::new("group_url")
.long("group-url")
.help("URL of the group to join")
.required(true),
),
)
.subcommand(
Command::new("remove")
.about("Remove a group")
.arg(
Arg::new("group_id")
.long("group-id")
.help("ID of the group to remove")
.required(true),
),
Command::new("remove").about("Remove a group").arg(
Arg::new("group_id")
.long("group-id")
.help("ID of the group to remove")
.required(true),
),
)
.subcommand(Command::new("list").about("List known groups"))
.subcommand(Command::new("start").about("Start the RPC service and log the URL"))
Expand All @@ -93,20 +89,20 @@ async fn main() -> anyhow::Result<()> {
let backend_url = matches.get_one::<String>("backend_url").ok_or_else(|| {
anyhow!("Error: --backend-url is required for the 'join' command")
})?;

let group_url = sub_matches.get_one::<String>("group_url").unwrap();
println!("Joining group: {}", group_url);

let rpc_client = setup_rpc_client(&base_dir, backend_url).await?;

rpc_client.join_group(group_url.to_string()).await?;
println!("Successfully joined group.");
}
Some(("list", _)) => {
let backend_url = matches.get_one::<String>("backend_url").ok_or_else(|| {
anyhow!("Error: --backend-url is required for the 'list' command")
})?;

println!("Listing all groups...");

let rpc_client = setup_rpc_client(&base_dir, backend_url).await?;
Expand All @@ -120,7 +116,7 @@ async fn main() -> anyhow::Result<()> {
let backend_url = matches.get_one::<String>("backend_url").ok_or_else(|| {
anyhow!("Error: --backend-url is required for the 'remove' command")
})?;

let group_id = sub_matches.get_one::<String>("group_id").unwrap();
println!("Removing group: {}", group_id);

Expand All @@ -132,7 +128,10 @@ async fn main() -> anyhow::Result<()> {
Some(("start", _)) => {
backend.start().await?;
let rpc_service = RpcService::from_backend(&backend).await?;
println!("RPC service started at URL: {}", rpc_service.get_descriptor_url());
println!(
"RPC service started at URL: {}",
rpc_service.get_descriptor_url()
);
rpc_service.start_update_listener().await?;
}
_ => {
Expand All @@ -143,4 +142,4 @@ async fn main() -> anyhow::Result<()> {
}
}
Ok(())
}
}
53 changes: 30 additions & 23 deletions src/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::backend::{Backend, crypto_key_from_query};
use crate::common::{DHTEntity};
use crate::backend::{crypto_key_from_query, Backend};
use crate::common::DHTEntity;
use crate::group::Group;
use crate::repo::{Repo, ROUTE_SUBKEY};
use crate::repo::Repo;
use crate::{
constants::ROUTE_ID_DHT_KEY,
group::{PROTOCOL_SCHEME, URL_DHT_KEY, URL_ENCRYPTION_KEY},
Expand All @@ -23,8 +23,8 @@ use tracing::{error, info};
use url::Url;
use veilid_core::{
vld0_generate_keypair, CryptoKey, CryptoSystem, CryptoSystemVLD0, DHTRecordDescriptor,
DHTSchema, RoutingContext, SharedSecret, Target, VeilidAPI, VeilidAppCall, VeilidUpdate,
CRYPTO_KIND_VLD0, KeyPair, TypedKey,
DHTSchema, KeyPair, RoutingContext, SharedSecret, Target, TypedKey, VeilidAPI, VeilidAppCall,
VeilidUpdate, CRYPTO_KIND_VLD0,
};
use veilid_iroh_blobs::tunnels::OnNewRouteCallback;

Expand All @@ -33,6 +33,8 @@ const MESSAGE_TYPE_LIST_GROUPS: u8 = 0x01;
const MESSAGE_TYPE_REMOVE_GROUP: u8 = 0x02;
const MESSAGE_TYPE_ERROR: u8 = 0xFF;

const ROUTE_SUBKEY: u32 = 1;

#[repr(u8)]
#[derive(Serialize, Deserialize)]
enum MessageType {
Expand Down Expand Up @@ -102,7 +104,6 @@ pub fn parse_url_for_rpc(url_string: &str) -> Result<RpcKeys> {
})
}


impl RpcClient {
pub async fn from_veilid(veilid: VeilidAPI, url: &str) -> Result<Self> {
let routing_context = veilid.routing_context()?;
Expand Down Expand Up @@ -157,19 +158,21 @@ impl RpcClient {

pub async fn join_group(&self, group_url: String) -> Result<JoinGroupResponse> {
let request = JoinGroupRequest { group_url };
self.send_rpc_request(&request, MESSAGE_TYPE_JOIN_GROUP).await
self.send_rpc_request(&request, MESSAGE_TYPE_JOIN_GROUP)
.await
}

pub async fn list_groups(&self) -> Result<ListGroupsResponse> {
let request = ListGroupsRequest;
self.send_rpc_request(&request, MESSAGE_TYPE_LIST_GROUPS).await
self.send_rpc_request(&request, MESSAGE_TYPE_LIST_GROUPS)
.await
}

pub async fn remove_group(&self, group_id: String) -> Result<RemoveGroupResponse> {
let request = RemoveGroupRequest { group_id };
self.send_rpc_request(&request, MESSAGE_TYPE_REMOVE_GROUP).await
self.send_rpc_request(&request, MESSAGE_TYPE_REMOVE_GROUP)
.await
}

}

#[derive(Clone)]
Expand Down Expand Up @@ -302,8 +305,8 @@ impl RpcService {
});
});

// Log the descriptor URL
let descriptor_url = descriptor.get_url();
let route_id_blob = backend.get_route_id_blob().await?;
descriptor.update_route_on_dht(route_id_blob).await?;

Ok(RpcService {
backend,
Expand Down Expand Up @@ -333,19 +336,23 @@ impl RpcService {

if let Err(e) = self.handle_app_call(*app_call).await {
error!("Error processing AppCall: {}", e);

// Wrap the error in RpcResponse and send it
let error_response: RpcResponse<()> = RpcResponse {
success: None,
error: Some(e.to_string()),
};
if let Err(err) = self
.send_response(app_call_clone.id().into(), MESSAGE_TYPE_ERROR, &error_response)
.send_response(
app_call_clone.id().into(),
MESSAGE_TYPE_ERROR,
&error_response,
)
.await
{
error!("Failed to send error response: {}", err);
}
}
}
}
}
Err(RecvError::Lagged(count)) => {
Expand All @@ -360,11 +367,11 @@ impl RpcService {

Ok(())
}

async fn handle_app_call(&self, app_call: VeilidAppCall) -> Result<()> {
let call_id = app_call.id();
let message = app_call.message();

if message.is_empty() {
let error_response: RpcResponse<()> = RpcResponse {
success: None,
Expand All @@ -374,10 +381,10 @@ impl RpcService {
.await?;
return Err(anyhow!("Empty message"));
}

let message_type_byte = message[0];
let payload = &message[1..];

match message_type_byte {
MESSAGE_TYPE_JOIN_GROUP => {
let request: JoinGroupRequest = serde_cbor::from_slice(payload)?;
Expand Down Expand Up @@ -406,10 +413,10 @@ impl RpcService {
.await?;
}
}

Ok(())
}

async fn send_response<T: Serialize>(
&self,
call_id: u64,
Expand Down Expand Up @@ -634,7 +641,7 @@ impl DHTEntity for RpcServiceDescriptor {
}

fn get_id(&self) -> CryptoKey {
self.keypair.dht_key.clone()
self.dht_record.key().value.clone()
}

fn get_secret_key(&self) -> Option<CryptoKey> {
Expand Down

0 comments on commit 2929b20

Please sign in to comment.