Skip to content
This repository has been archived by the owner on May 16, 2024. It is now read-only.

Commit

Permalink
feat(cluster): aggregating cluster vnodes by nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
yahortsaryk committed Aug 10, 2023
1 parent a13fe8a commit d6682bb
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 39 deletions.
9 changes: 8 additions & 1 deletion bucket/ddc_bucket/cluster/entity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,14 @@ impl ink_storage::traits::PackedAllocate for Cluster {
pub struct ClusterInfo {
pub cluster_id: ClusterId,
pub cluster: Cluster,
pub cluster_v_nodes: Vec<VNodeToken>,
pub cluster_v_nodes: Vec<NodeVNodesInfo>,
}

#[derive(Clone, PartialEq, Encode, Decode)]
#[cfg_attr(feature = "std", derive(Debug, scale_info::TypeInfo))]
pub struct NodeVNodesInfo {
pub node_key: NodeKey,
pub v_nodes: Vec<VNodeToken>,
}

pub const CLUSTER_PARAMS_MAX_LEN: usize = 100_000;
Expand Down
17 changes: 14 additions & 3 deletions bucket/ddc_bucket/cluster/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ink_prelude::vec::Vec;
use crate::ddc_bucket::bucket::entity::BucketId;
use crate::ddc_bucket::cash::{Cash, Payable};
use crate::ddc_bucket::cdn_node::entity::{CdnNode, CdnNodeKey};
use crate::ddc_bucket::cluster::entity::{ClusterInfo, KB_PER_GB};
use crate::ddc_bucket::cluster::entity::{ClusterInfo, NodeVNodesInfo, KB_PER_GB};
use crate::ddc_bucket::node::entity::{Node, NodeKey, Resource};
use crate::ddc_bucket::perm::entity::Permission;
use crate::ddc_bucket::topology::store::VNodeToken;
Expand Down Expand Up @@ -412,7 +412,13 @@ impl DdcBucket {

pub fn message_cluster_get(&self, cluster_id: ClusterId) -> Result<ClusterInfo> {
let cluster = self.clusters.get(cluster_id)?;
let cluster_v_nodes = self.topology.get_v_nodes_by_cluster(cluster_id);

let mut cluster_v_nodes: Vec<NodeVNodesInfo> = Vec::new();
for node_key in cluster.nodes_keys.clone() {
let v_nodes = self.topology.get_v_nodes_by_node(node_key.clone());
let v_nodes_info = NodeVNodesInfo { node_key, v_nodes };
cluster_v_nodes.push(v_nodes_info)
}

Ok(ClusterInfo {
cluster_id,
Expand Down Expand Up @@ -440,7 +446,12 @@ impl DdcBucket {
}
}

let cluster_v_nodes = self.topology.get_v_nodes_by_cluster(cluster_id);
let mut cluster_v_nodes: Vec<NodeVNodesInfo> = Vec::new();
for node_key in cluster.nodes_keys.clone() {
let v_nodes = self.topology.get_v_nodes_by_node(node_key.clone());
let v_nodes_info = NodeVNodesInfo { node_key, v_nodes };
cluster_v_nodes.push(v_nodes_info)
}

// Include the complete status of matched items.
let cluster_info = ClusterInfo {
Expand Down
138 changes: 103 additions & 35 deletions bucket/ddc_bucket/tests/test_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ fn cluster_create_ok() {
cluster_id: Some(ctx.cluster_id),
status_in_cluster: Some(NodeStatusInCluster::ADDING),
},
v_nodes: v_nodes0
v_nodes: v_nodes0.clone()
}
);

Expand All @@ -80,7 +80,7 @@ fn cluster_create_ok() {
cluster_id: Some(ctx.cluster_id),
status_in_cluster: Some(NodeStatusInCluster::ADDING),
},
v_nodes: v_nodes1
v_nodes: v_nodes1.clone()
}
);

Expand All @@ -103,7 +103,7 @@ fn cluster_create_ok() {
cluster_id: Some(ctx.cluster_id),
status_in_cluster: Some(NodeStatusInCluster::ADDING),
},
v_nodes: v_nodes2
v_nodes: v_nodes2.clone()
}
);

Expand Down Expand Up @@ -160,7 +160,26 @@ fn cluster_create_ok() {
// Check the cluster

let cluster = ctx.contract.cluster_get(ctx.cluster_id)?;
let cluster_v_nodes = ctx.contract.get_v_nodes_by_cluster(ctx.cluster_id);
let mut cluster_v_nodes: Vec<NodeVNodesInfo> = Vec::new();

let node_v_nodes_0 = NodeVNodesInfo {
node_key: ctx.node_key0,
v_nodes: v_nodes0,
};
cluster_v_nodes.push(node_v_nodes_0);

let node_v_nodes_1 = NodeVNodesInfo {
node_key: ctx.node_key1,
v_nodes: v_nodes1,
};
cluster_v_nodes.push(node_v_nodes_1);

let node_v_nodes_2 = NodeVNodesInfo {
node_key: ctx.node_key2,
v_nodes: v_nodes2,
};
cluster_v_nodes.push(node_v_nodes_2);

let total_rent = ctx.rent_v_node_per_month0 * v_nodes0_len as Balance
+ ctx.rent_v_node_per_month1 * v_nodes1_len as Balance
+ ctx.rent_v_node_per_month2 * v_nodes2_len as Balance;
Expand Down Expand Up @@ -490,8 +509,7 @@ fn cluster_add_node_ok() {
new_v_nodes.clone(),
];

let mut cluster_info = ctx.contract.cluster_get(ctx.cluster_id)?;
cluster_info.cluster_v_nodes.sort();
let cluster_info = ctx.contract.cluster_get(ctx.cluster_id)?;
assert!(matches!(cluster_info.cluster.nodes_keys, _nodes_keys));
assert!(matches!(cluster_info.cluster_v_nodes, _cluster_v_nodes));

Expand Down Expand Up @@ -579,8 +597,7 @@ fn cluster_remove_node_ok_if_node_provider() {

let _cluster_v_nodes = vec![ctx.v_nodes0.clone(), ctx.v_nodes2];

let mut cluster_info = ctx.contract.cluster_get(ctx.cluster_id)?;
cluster_info.cluster_v_nodes.sort();
let cluster_info = ctx.contract.cluster_get(ctx.cluster_id)?;
assert!(matches!(cluster_info.cluster.nodes_keys, _nodes_keys));
assert!(matches!(cluster_info.cluster_v_nodes, _cluster_v_nodes));

Expand Down Expand Up @@ -621,8 +638,7 @@ fn cluster_remove_node_ok_if_cluster_manager() {

let _cluster_v_nodes = vec![ctx.v_nodes0, ctx.v_nodes1];

let mut cluster_info = ctx.contract.cluster_get(ctx.cluster_id)?;
cluster_info.cluster_v_nodes.sort();
let cluster_info = ctx.contract.cluster_get(ctx.cluster_id)?;
assert!(matches!(cluster_info.cluster.nodes_keys, _nodes_keys));
assert!(matches!(cluster_info.cluster_v_nodes, _cluster_v_nodes));

Expand Down Expand Up @@ -1127,15 +1143,27 @@ fn cluster_replace_node_ok() {
}
));

let mut cluster_v_nodes = Vec::<VNodeToken>::new();
cluster_v_nodes.extend(vec![2]);
cluster_v_nodes.extend(ctx.v_nodes1.clone());
cluster_v_nodes.extend(ctx.v_nodes2.clone());
cluster_v_nodes.extend(v_nodes_to_reasign.clone());
cluster_v_nodes.sort();
let mut cluster_v_nodes: Vec<NodeVNodesInfo> = Vec::new();
let node_v_nodes_0 = NodeVNodesInfo {
node_key: ctx.node_key0,
v_nodes: vec![2],
};
cluster_v_nodes.push(node_v_nodes_0);

let node_v_nodes_1 = NodeVNodesInfo {
node_key: ctx.node_key1,
v_nodes: ctx.v_nodes1.clone(),
};
cluster_v_nodes.push(node_v_nodes_1);

let mut node_v_nodes_2 = NodeVNodesInfo {
node_key: ctx.node_key2,
v_nodes: ctx.v_nodes2.clone(),
};
node_v_nodes_2.v_nodes.extend(v_nodes_to_reasign.clone());
cluster_v_nodes.push(node_v_nodes_2);

let mut cluster_info = ctx.contract.cluster_get(ctx.cluster_id)?;
cluster_info.cluster_v_nodes.sort();
let cluster_info = ctx.contract.cluster_get(ctx.cluster_id)?;
assert_eq!(
&cluster_info.cluster_v_nodes, &cluster_v_nodes,
"a v_node must be replaced"
Expand Down Expand Up @@ -1318,14 +1346,26 @@ fn cluster_reset_node_ok() {
}
));

let mut cluster_v_nodes = Vec::<VNodeToken>::new();
cluster_v_nodes.extend(ctx.v_nodes0.clone());
cluster_v_nodes.extend(vec![10, 11, 12]);
cluster_v_nodes.extend(ctx.v_nodes2.clone());
cluster_v_nodes.sort();
let mut cluster_v_nodes: Vec<NodeVNodesInfo> = Vec::new();
let node_v_nodes_0 = NodeVNodesInfo {
node_key: ctx.node_key0,
v_nodes: ctx.v_nodes0.clone(),
};
cluster_v_nodes.push(node_v_nodes_0);

let node_v_nodes_1 = NodeVNodesInfo {
node_key: ctx.node_key1,
v_nodes: vec![10, 11, 12],
};
cluster_v_nodes.push(node_v_nodes_1);

let node_v_nodes_2 = NodeVNodesInfo {
node_key: ctx.node_key2,
v_nodes: ctx.v_nodes2.clone(),
};
cluster_v_nodes.push(node_v_nodes_2);

let mut cluster_info = ctx.contract.cluster_get(ctx.cluster_id)?;
cluster_info.cluster_v_nodes.sort();
let cluster_info = ctx.contract.cluster_get(ctx.cluster_id)?;
assert_eq!(
&cluster_info.cluster_v_nodes, &cluster_v_nodes,
"a v_node must be replaced"
Expand Down Expand Up @@ -1686,10 +1726,24 @@ fn cluster_get_err_if_cluster_does_not_exist() {
fn cluster_get_ok() {
let ctx = setup_cluster();

let mut cluster_v_nodes1 = Vec::<VNodeToken>::new();
cluster_v_nodes1.extend(ctx.v_nodes0.clone());
cluster_v_nodes1.extend(ctx.v_nodes1.clone());
cluster_v_nodes1.extend(ctx.v_nodes2.clone());
let mut cluster_v_nodes: Vec<NodeVNodesInfo> = Vec::new();
let node_v_nodes_0 = NodeVNodesInfo {
node_key: ctx.node_key0,
v_nodes: ctx.v_nodes0.clone(),
};
cluster_v_nodes.push(node_v_nodes_0);

let node_v_nodes_1 = NodeVNodesInfo {
node_key: ctx.node_key1,
v_nodes: ctx.v_nodes1.clone(),
};
cluster_v_nodes.push(node_v_nodes_1);

let node_v_nodes_2 = NodeVNodesInfo {
node_key: ctx.node_key2,
v_nodes: ctx.v_nodes2.clone(),
};
cluster_v_nodes.push(node_v_nodes_2);

let total_rent = ctx.rent_v_node_per_month0 * ctx.v_nodes0.len() as Balance
+ ctx.rent_v_node_per_month1 * ctx.v_nodes1.len() as Balance
Expand All @@ -1712,7 +1766,7 @@ fn cluster_get_ok() {
cdn_usd_per_gb: CDN_USD_PER_GB,
cdn_revenues: Cash(0),
},
cluster_v_nodes: cluster_v_nodes1,
cluster_v_nodes,
}
})
);
Expand All @@ -1722,10 +1776,24 @@ fn cluster_get_ok() {
fn cluster_list_ok() {
let mut ctx = setup_cluster();

let mut cluster_v_nodes1 = Vec::<VNodeToken>::new();
cluster_v_nodes1.extend(ctx.v_nodes0.clone());
cluster_v_nodes1.extend(ctx.v_nodes1.clone());
cluster_v_nodes1.extend(ctx.v_nodes2.clone());
let mut cluster_v_nodes: Vec<NodeVNodesInfo> = Vec::new();
let node_v_nodes_0 = NodeVNodesInfo {
node_key: ctx.node_key0,
v_nodes: ctx.v_nodes0.clone(),
};
cluster_v_nodes.push(node_v_nodes_0);

let node_v_nodes_1 = NodeVNodesInfo {
node_key: ctx.node_key1,
v_nodes: ctx.v_nodes1.clone(),
};
cluster_v_nodes.push(node_v_nodes_1);

let node_v_nodes_2 = NodeVNodesInfo {
node_key: ctx.node_key2,
v_nodes: ctx.v_nodes2.clone(),
};
cluster_v_nodes.push(node_v_nodes_2);

let total_rent = ctx.rent_v_node_per_month0 * ctx.v_nodes0.len() as Balance
+ ctx.rent_v_node_per_month1 * ctx.v_nodes1.len() as Balance
Expand All @@ -1745,7 +1813,7 @@ fn cluster_list_ok() {
cdn_usd_per_gb: CDN_USD_PER_GB,
cdn_revenues: Cash(0),
},
cluster_v_nodes: cluster_v_nodes1,
cluster_v_nodes,
};

let cluster_params2 = ClusterParams::from("{}");
Expand Down

0 comments on commit d6682bb

Please sign in to comment.