Skip to content

Commit

Permalink
MessageBus now aware of connections closing and can close itself
Browse files Browse the repository at this point in the history
  • Loading branch information
bbmena committed Oct 5, 2023
1 parent 2befc3f commit ab615d5
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 63 deletions.
59 changes: 43 additions & 16 deletions common/connection/src/connection_manager.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::message_bus::{retrieve_command_channel, MessageBus, MessageBusHandle};
use crate::messages::{
AddConnection, ConnectionNotification, MessageBusCommand, NodeManagerCommand,
ConnectionManagerCommand, ConnectionNotification, MessageBusCommand, MessageBusConnection,
};
use bytes::BytesMut;
use dashmap::DashMap;
Expand All @@ -10,17 +10,17 @@ use std::{
};
use tachyonix::{channel, Receiver, Sender};
use tokio::net::{TcpListener, TcpStream};
use util::map_access_wrapper::{arc_map_contains_key, arc_map_insert};
use util::map_access_wrapper::{arc_map_contains_key, arc_map_insert, arc_map_remove};

pub struct ConnectionManager {
command_channel: Receiver<NodeManagerCommand>,
command_channel: Receiver<ConnectionManagerCommand>,
address: SocketAddr,
node_map: Arc<DashMap<IpAddr, MessageBusHandle>>,
notifier: Sender<ConnectionNotification>,
}

pub struct ConnectionManagerHandle {
pub command_channel: Sender<NodeManagerCommand>,
pub command_channel: Sender<ConnectionManagerCommand>,
pub notification_receiver: Receiver<ConnectionNotification>,
}

Expand All @@ -40,8 +40,8 @@ impl ConnectionManagerHandle {
notifier,
};

let connection_manager_task =
tokio::spawn(async move { connection_manager.start(output_channel).await });
let sender_clone = command_sender.clone();
tokio::spawn(async move { connection_manager.start(output_channel, sender_clone).await });

ConnectionManagerHandle {
command_channel: command_sender,
Expand All @@ -51,18 +51,24 @@ impl ConnectionManagerHandle {
}

impl ConnectionManager {
pub async fn start(mut self, output_channel: Sender<BytesMut>) {
pub async fn start(
mut self,
output_channel: Sender<BytesMut>,
self_sender: Sender<ConnectionManagerCommand>,
) {
let data_listener = TcpListener::bind(self.address).await.unwrap();
let node_map = self.node_map.clone();
let output_clone = output_channel.clone();
let sender_clone = self_sender.clone();
let listener = tokio::spawn(async move {
loop {
match data_listener.accept().await {
Ok((stream, address)) => {
println!("New connection request from {}", &address);
if !arc_map_contains_key(node_map.clone(), &address.ip()) {
let out = output_clone.clone();
let handle = MessageBusHandle::new(out);
let handle =
MessageBusHandle::new(out, address.clone(), sender_clone.clone());

arc_map_insert(node_map.clone(), address.ip(), handle);

Expand All @@ -78,7 +84,7 @@ impl ConnectionManager {
}
Some(command_channel) => {
command_channel
.send(MessageBusCommand::AddConnection(AddConnection {
.send(MessageBusCommand::AddConnection(MessageBusConnection {
address,
stream,
}))
Expand All @@ -96,19 +102,23 @@ impl ConnectionManager {
match self.command_channel.recv().await {
Ok(command) => {
match command {
NodeManagerCommand::Shutdown() => {
ConnectionManagerCommand::Shutdown() => {
// TODO: graceful shutdown
listener.abort();
break;
}
NodeManagerCommand::Connect(connect) => {
ConnectionManagerCommand::Connect(connect) => {
println!("Attempting to connect to {}", &connect.address);
let stream = TcpStream::connect(connect.address)
.await
.expect("Unable to connect!");
if !arc_map_contains_key(self.node_map.clone(), &connect.address.ip()) {
let out = out_put_clone.clone();
let handle = MessageBusHandle::new(out);
let handle = MessageBusHandle::new(
out,
connect.address.clone(),
self_sender.clone(),
);

arc_map_insert(self.node_map.clone(), self.address.ip(), handle);
}
Expand All @@ -122,15 +132,32 @@ impl ConnectionManager {
}
Some(command_channel) => {
command_channel
.send(MessageBusCommand::AddConnection(AddConnection {
address: connect.address,
stream,
}))
.send(MessageBusCommand::AddConnection(
MessageBusConnection {
address: connect.address,
stream,
},
))
.await
.expect("Unable to send AddConnection command!");
}
}
}
ConnectionManagerCommand::RemoveConnection(remove_connection) => {
match arc_map_remove(
self.node_map.clone(),
&remove_connection.address.ip(),
) {
None => {}
Some((_, handle)) => {
handle.message_bus_task.abort();
println!(
"Connection removed at address: {}",
remove_connection.address
)
}
}
}
}
}
Err(_) => break,
Expand Down
Loading

0 comments on commit ab615d5

Please sign in to comment.