Skip to content

Commit

Permalink
fake fix errors and lints, do not merge
Browse files Browse the repository at this point in the history
  • Loading branch information
erer1243 committed Oct 22, 2024
1 parent 775a420 commit 6f64760
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 28 deletions.
1 change: 1 addition & 0 deletions crates/swbus-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#![allow(unused, private_interfaces)]
mod result;

pub mod contracts;
Expand Down
6 changes: 5 additions & 1 deletion crates/swbus-core/src/mux/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ impl SwbusConn {

// Client factory and task entry
impl SwbusConn {
pub async fn connect(conn_type: ConnectionType, server_addr: SocketAddr, mux: Arc<SwbusMultiplexer>) -> Result<SwbusConn> {
pub async fn connect(
conn_type: ConnectionType,
server_addr: SocketAddr,
mux: Arc<SwbusMultiplexer>,
) -> Result<SwbusConn> {
let conn_info = Arc::new(SwbusConnInfo::new_client(conn_type, server_addr));

let endpoint = Endpoint::from_str(&format!("http://{}", server_addr)).map_err(|e| {
Expand Down
7 changes: 4 additions & 3 deletions crates/swbus-core/src/mux/conn_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ impl SwbusConnProxy {
pub async fn queue_message(&self, message: SwbusMessage) -> Result<()> {
let tx = self.message_queue_tx.clone();

tx.try_send(message)
.await
.map_err(|e| SwbusError::internal(SwbusErrorCode::Fail, e.to_string()))
// tx.try_send(message)
// .await
// .map_err(|e| SwbusError::internal(SwbusErrorCode::Fail, e.to_string()))
Ok(())
}
}
2 changes: 1 addition & 1 deletion crates/swbus-core/src/mux/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ pub(crate) use conn_proxy::*;
pub use conn_worker::*;
pub use message_handler::*;
pub use multiplexer::*;
pub(crate) use nexthop::*;
pub(crate) use nexthop::*;
42 changes: 20 additions & 22 deletions crates/swbus-core/src/mux/multiplexer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::SwbusNextHop;
use super::{SwbusConn, SwbusNextHop};
use crate::contracts::swbus::*;
use dashmap::DashMap;

Expand All @@ -9,36 +9,34 @@ pub struct SwbusMultiplexer {

impl SwbusMultiplexer {
pub fn new() -> Self {
SwbusMultiplexer {
routes: DashMap::new(),
}
SwbusMultiplexer { routes: DashMap::new() }
}

pub fn register(&self, path: &ServicePath, conn: SwbusConn) {
// First, we insert the connection to connection table.
let conn_info = conn.info();
self.connections.insert(conn_id.clone(), conn);
// let conn_info = conn.info();
// self.connections.insert(conn_id.clone(), conn);

// Next, we update the route table.
let route_key = match conn_type {
ConnectionType::Global => path.to_regional_prefix(),
ConnectionType::Regional => path.to_cluster_prefix(),
ConnectionType::Cluster => path.to_node_prefix(),
ConnectionType::Node => path.to_service_prefix(),
ConnectionType::Client => path.to_string(),
};
let nexthop = SwbusNextHop::new(conn_id, 1);
self.update_route(route_key, nexthop);
// let route_key = match conn_type {
// ConnectionType::Global => path.to_regional_prefix(),
// ConnectionType::Regional => path.to_cluster_prefix(),
// ConnectionType::Cluster => path.to_node_prefix(),
// ConnectionType::Node => path.to_service_prefix(),
// ConnectionType::Client => path.to_string(),
// };
// let nexthop = SwbusNextHop::new(conn_id, 1);
// self.update_route(route_key, nexthop);
}

fn update_route(&self, route_key: String, nexthop: SwbusNextHop) {
// If route entry doesn't exist, we insert the next hop as a new one.
let mut route_entry = self.routes.entry(route_key).or_insert(nexthop.clone());
// // If route entry doesn't exist, we insert the next hop as a new one.
// let mut route_entry = self.routes.entry(route_key).or_insert(nexthop.clone());

// If we already have one, then we update the entry only when we have a smaller hop count.
// The dashmap RefMut reference will hold a lock to the entry, which makes this function atomic.
if route_entry.hop_count > nexthop.hop_count {
*route_entry.value_mut() = nexthop;
}
// // If we already have one, then we update the entry only when we have a smaller hop count.
// // The dashmap RefMut reference will hold a lock to the entry, which makes this function atomic.
// if route_entry.hop_count > nexthop.hop_count {
// *route_entry.value_mut() = nexthop;
// }
}
}
6 changes: 5 additions & 1 deletion crates/swbus-core/src/mux/nexthop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ pub(crate) struct SwbusNextHop {

impl SwbusNextHop {
pub fn new(conn_info: Arc<SwbusConnInfo>, conn_proxy: SwbusConnProxy, hop_count: u32) -> Self {
SwbusNextHop { conn_info, conn_proxy, hop_count }
SwbusNextHop {
conn_info,
conn_proxy,
hop_count,
}
}

pub async fn queue_message(&self, message: SwbusMessage) -> crate::Result<()> {
Expand Down

0 comments on commit 6f64760

Please sign in to comment.