Skip to content

Commit

Permalink
Merge pull request #10 from pupbrained/pupbrained/pyro-100-moving-off…
Browse files Browse the repository at this point in the history
…-of-actix-web

PYRO-100: Moving away from actix_web, introducing peom
  • Loading branch information
fetchfern authored Apr 23, 2024
2 parents 91a8581 + 1a573d4 commit 69e2ec9
Show file tree
Hide file tree
Showing 15 changed files with 374 additions and 924 deletions.
6 changes: 3 additions & 3 deletions crates/alerion_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ thiserror = "1.0.58"
uuid = { version = "1.8.0", features = ["serde"] }
bytestring = "1.3.1"
jsonwebtoken = "9.3.0"
actix = "0.13.3"
actix-web-actors = "4.3.0"
actix-web = "4.5.1"
tracing = { version = "0.1.40", features = ["log"] }
pin-project-lite = "0.2.14"
reqwest = { version = "0.12.3" }
smallvec = { version = "1.13.2", features = ["serde"] }
directories = "5.0.1"
bollard = "0.16.1"
bitflags = "2.5.0"
num_cpus = "1.16.0"
sysinfo = "0.30.11"
poem = { version = "3.0.0", features = ["websocket"] }
13 changes: 3 additions & 10 deletions crates/alerion_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,8 @@ pub mod servers;
pub mod webserver;
pub mod websocket;

use std::sync::Arc;

use config::AlerionConfig;
use futures::stream::{FuturesUnordered, StreamExt};
use servers::ServerPool;
use webserver::Webserver;

use crate::filesystem::setup_directories;

Expand All @@ -26,18 +22,15 @@ pub async fn alerion_main() -> anyhow::Result<()> {
let project_dirs = setup_directories().await?;
let config = AlerionConfig::load(&project_dirs)?;

let server_pool = Arc::new(ServerPool::builder(&config)?.fetch_servers().await?.build());
//let server_pool = Arc::new(ServerPool::builder(&config)?.fetch_servers().await?.build());

//server_pool.create_server("0e4059ca-d79b-46a5-8ec4-95bd0736d150".try_into().unwrap()).await;

// there is a low likelyhood this will actually block, and if it does
// it will block only once for a short amount of time, so it's no big deal.
let webserver = Webserver::make(config, Arc::clone(&server_pool))?;
let webserver = webserver::serve(config);

let webserver_handle = tokio::spawn(async move {
let _result = webserver.serve().await;
// handle recovery
});
let webserver_handle = tokio::spawn(webserver);

let mut handles = FuturesUnordered::new();
handles.push(webserver_handle);
Expand Down
88 changes: 1 addition & 87 deletions crates/alerion_core/src/servers.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use std::time::Instant;

use actix_web::HttpResponse;
use alerion_datamodel::remote::server::{ContainerConfig, ServerSettings};
use alerion_datamodel::websocket::{NetworkStatistics, PerformanceStatisics, ServerStatus};
use bollard::container::{Config, CreateContainerOptions};
use bollard::Docker;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock;
use uuid::Uuid;

use crate::config::AlerionConfig;
use crate::websocket::conn::{ConnectionAddr, PanelMessage, ServerMessage};
use crate::websocket::relay::{AuthTracker, ClientConnection, ServerConnection};

pub struct ServerPoolBuilder {
servers: HashMap<Uuid, Arc<Server>>,
Expand Down Expand Up @@ -157,8 +152,6 @@ pub struct Server {
uuid: Uuid,
container_name: String,
websocket_id_counter: AtomicU32,
websockets: RwLock<HashMap<u32, ClientConnection>>,
sender_copy: Sender<(u32, PanelMessage)>,
server_info: ServerInfo,
remote_api: Arc<remote::RemoteClient>,
docker: Arc<Docker>,
Expand All @@ -173,23 +166,17 @@ impl Server {
docker: Arc<Docker>,
) -> Result<Arc<Self>, ServerError> {
tracing::debug!("Creating new server {uuid}");
let (send, recv) = channel(128);

let server = Arc::new(Self {
start_time: Instant::now(),
uuid,
container_name: format!("{}_container", uuid.as_hyphenated()),
websocket_id_counter: AtomicU32::new(0),
websockets: RwLock::new(HashMap::new()),
sender_copy: send,
server_info,
remote_api,
docker,
});

tokio::spawn(task_websocket_receiver(recv));
tokio::spawn(monitor_performance_metrics(Arc::clone(&server)));

server.create_docker_container().await?;

tracing::info!("Server {uuid} created");
Expand Down Expand Up @@ -222,82 +209,9 @@ impl Server {
Ok(())
}

pub async fn setup_new_websocket<F>(
&self,
start_websocket: F,
) -> actix_web::Result<HttpResponse>
where
F: FnOnce(ServerConnection) -> actix_web::Result<(ConnectionAddr, HttpResponse)>,
{
tracing::info!("Setting up new websocket connection");

let id = self.websocket_id_counter.fetch_add(1, Ordering::SeqCst);

// setup the request channel for the websocket
let auth_tracker = Arc::new(AuthTracker::new(self.server_time()));
let sender = self.sender_copy.clone();
let server_conn = ServerConnection::new(Arc::clone(&auth_tracker), sender, id);

// setup a websocket connection through the user-provided closure
let (addr, response) = start_websocket(server_conn)?;

// add the obtained reply channel to the list of websocket connections
let client_conn = ClientConnection::new(auth_tracker, addr);
let mut websockets = self.websockets.write().await;
websockets.insert(id, client_conn);

// give back the HTTP 101 response
Ok(response)
}

pub async fn send_to_available_websockets(&self, msg: ServerMessage) {
tracing::info!("Sending message to all available websockets");
tracing::debug!("message: {:?}", msg);

let lock = self.websockets.read().await;

for sender in lock.values() {
sender.send_if_authenticated(|| msg.clone());
}
}

pub fn server_time(&self) -> u64 {
self.start_time.elapsed().as_millis() as u64
}
}

#[tracing::instrument(skip(server))]
async fn monitor_performance_metrics(server: Arc<Server>) {
tracing::info!("Starting performance metrics monitor for {}", &server.uuid);

loop {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;

let stats = PerformanceStatisics {
memory_bytes: server.server_time() as usize,
memory_limit_bytes: 1024usize.pow(3) * 8,
cpu_absolute: 50.11,
network: NetworkStatistics {
rx_bytes: 1024,
tx_bytes: 800,
},
uptime: 5000 + server.server_time(),
state: ServerStatus::Running,
disk_bytes: 100,
};

server
.send_to_available_websockets(ServerMessage::Stats(stats))
.await;
}
}

async fn task_websocket_receiver(mut receiver: Receiver<(u32, PanelMessage)>) {
loop {
if let Some(msg) = receiver.recv().await {
tracing::debug!("Server received websocket message: {msg:?}");
}
}
}

pub mod remote;
Loading

0 comments on commit 69e2ec9

Please sign in to comment.