Skip to content

Commit

Permalink
initial work for switching to warp
Browse files Browse the repository at this point in the history
i'll fix this commit message later
  • Loading branch information
pupbrained committed Apr 21, 2024
1 parent 8d8c19a commit 64bc6f9
Show file tree
Hide file tree
Showing 14 changed files with 20 additions and 896 deletions.
2 changes: 1 addition & 1 deletion crates/alerion_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ bytestring = "1.3.1"
jsonwebtoken = "9.3.0"
actix = "0.13.3"
actix-web-actors = "4.3.0"
actix-web = "4.5.1"
log = "0.4.21"
pin-project-lite = "0.2.14"
reqwest = { version = "0.12.3" }
smallvec = { version = "1.13.2", features = ["serde"] }
directories = "5.0.1"
bollard = "0.16.1"
bitflags = "2.5.0"
warp = "0.3.7"
13 changes: 3 additions & 10 deletions crates/alerion_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,8 @@ pub mod servers;
pub mod webserver;
pub mod websocket;

use std::sync::Arc;

use config::AlerionConfig;
use futures::stream::{FuturesUnordered, StreamExt};
use servers::ServerPool;
use webserver::Webserver;

use crate::filesystem::setup_directories;

Expand All @@ -27,18 +23,15 @@ pub async fn alerion_main() -> anyhow::Result<()> {
let project_dirs = setup_directories().await?;
let config = AlerionConfig::load(&project_dirs)?;

let server_pool = Arc::new(ServerPool::builder(&config)?.fetch_servers().await?.build());
//let server_pool = Arc::new(ServerPool::builder(&config)?.fetch_servers().await?.build());

//server_pool.create_server("0e4059ca-d79b-46a5-8ec4-95bd0736d150".try_into().unwrap()).await;

// there is a low likelyhood this will actually block, and if it does
// it will block only once for a short amount of time, so it's no big deal.
let webserver = Webserver::make(config, Arc::clone(&server_pool))?;
let webserver = webserver::serve((config.api.host, config.api.port));

let webserver_handle = tokio::spawn(async move {
let _result = webserver.serve().await;
// handle recovery
});
let webserver_handle = tokio::spawn(webserver);

let mut handles = FuturesUnordered::new();
handles.push(webserver_handle);
Expand Down
81 changes: 1 addition & 80 deletions crates/alerion_core/src/servers.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::atomic::AtomicU32;
use std::sync::Arc;
use std::time::Instant;

use actix_web::HttpResponse;
use alerion_datamodel::remote::server::{ContainerConfig, ServerSettings};
use alerion_datamodel::websocket::{NetworkStatistics, PerformanceStatisics, ServerStatus};
use bollard::container::{Config, CreateContainerOptions};
use bollard::Docker;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::RwLock;
use uuid::Uuid;

use crate::config::AlerionConfig;
use crate::websocket::conn::{ConnectionAddr, PanelMessage, ServerMessage};
use crate::websocket::relay::{AuthTracker, ClientConnection, ServerConnection};

pub struct ServerPoolBuilder {
servers: HashMap<Uuid, Arc<Server>>,
Expand Down Expand Up @@ -147,8 +142,6 @@ pub struct Server {
uuid: Uuid,
container_name: String,
websocket_id_counter: AtomicU32,
websockets: RwLock<HashMap<u32, ClientConnection>>,
sender_copy: Sender<(u32, PanelMessage)>,
server_info: ServerInfo,
remote_api: Arc<remote::RemoteClient>,
docker: Arc<Docker>,
Expand All @@ -161,23 +154,16 @@ impl Server {
remote_api: Arc<remote::RemoteClient>,
docker: Arc<Docker>,
) -> Result<Arc<Self>, ServerError> {
let (send, recv) = channel(128);

let server = Arc::new(Self {
start_time: Instant::now(),
uuid,
container_name: format!("{}_container", uuid.as_hyphenated()),
websocket_id_counter: AtomicU32::new(0),
websockets: RwLock::new(HashMap::new()),
sender_copy: send,
server_info,
remote_api,
docker,
});

tokio::spawn(task_websocket_receiver(recv));
tokio::spawn(monitor_performance_metrics(Arc::clone(&server)));

server.create_docker_container().await?;

Ok(server)
Expand Down Expand Up @@ -208,74 +194,9 @@ impl Server {
Ok(())
}

pub async fn setup_new_websocket<F>(
&self,
start_websocket: F,
) -> actix_web::Result<HttpResponse>
where
F: FnOnce(ServerConnection) -> actix_web::Result<(ConnectionAddr, HttpResponse)>,
{
let id = self.websocket_id_counter.fetch_add(1, Ordering::SeqCst);

// setup the request channel for the websocket
let auth_tracker = Arc::new(AuthTracker::new(self.server_time()));
let sender = self.sender_copy.clone();
let server_conn = ServerConnection::new(Arc::clone(&auth_tracker), sender, id);

// setup a websocket connection through the user-provided closure
let (addr, response) = start_websocket(server_conn)?;

// add the obtained reply channel to the list of websocket connections
let client_conn = ClientConnection::new(auth_tracker, addr);
let mut websockets = self.websockets.write().await;
websockets.insert(id, client_conn);

// give back the HTTP 101 response
Ok(response)
}

pub async fn send_to_available_websockets(&self, msg: ServerMessage) {
let lock = self.websockets.read().await;

for sender in lock.values() {
sender.send_if_authenticated(|| msg.clone());
}
}

pub fn server_time(&self) -> u64 {
self.start_time.elapsed().as_millis() as u64
}
}

async fn monitor_performance_metrics(server: Arc<Server>) {
loop {
tokio::time::sleep(std::time::Duration::from_secs(1)).await;

let stats = PerformanceStatisics {
memory_bytes: server.server_time() as usize,
memory_limit_bytes: 1024usize.pow(3) * 8,
cpu_absolute: 50.11,
network: NetworkStatistics {
rx_bytes: 1024,
tx_bytes: 800,
},
uptime: 5000 + server.server_time(),
state: ServerStatus::Running,
disk_bytes: 100,
};

server
.send_to_available_websockets(ServerMessage::Stats(stats))
.await;
}
}

async fn task_websocket_receiver(mut receiver: Receiver<(u32, PanelMessage)>) {
loop {
if let Some(msg) = receiver.recv().await {
log::debug!("Server received websocket message: {msg:?}");
}
}
}

pub mod remote;
110 changes: 14 additions & 96 deletions crates/alerion_core/src/webserver.rs
Original file line number Diff line number Diff line change
@@ -1,103 +1,21 @@
use std::io;
use std::net::SocketAddr;
use std::sync::Arc;

use actix_web::http::header;
use actix_web::{dev, guard, middleware, web, App, HttpServer};
use alerion_datamodel::webserver::SystemOptions;
use utils::bearer_auth::BearerAuth;

use crate::config::AlerionConfig;
use crate::servers::ServerPool;
use warp::Filter;

const ALLOWED_HEADERS: &str = "Accept, Accept-Encoding, Authorization, Cache-Control, Content-Type, Content-Length, Origin, X-Real-IP, X-CSRF-Token";
const ALLOWED_METHODS: &str = "GET, POST, PATCH, PUT, DELETE, OPTIONS";

fn default_headers(config: &AlerionConfig) -> middleware::DefaultHeaders {
middleware::DefaultHeaders::new()
.add((header::ACCESS_CONTROL_ALLOW_ORIGIN, config.remote.clone()))
.add((header::ACCESS_CONTROL_MAX_AGE, 7200))
.add((header::ACCESS_CONTROL_ALLOW_HEADERS, ALLOWED_HEADERS))
.add((header::ACCESS_CONTROL_ALLOW_METHODS, ALLOWED_METHODS))
.add((header::ACCESS_CONTROL_ALLOW_CREDENTIALS, "true"))
}

pub struct Webserver {
server_fut: dev::Server,
}

impl Webserver {
/// Build the webserver. May block if a DNS lookup is required to resolve the host
/// set in the configuration.
pub fn make(config: AlerionConfig, server_pool: Arc<ServerPool>) -> io::Result<Self> {
let moved_out_config = config.clone();

let http_server = HttpServer::new(move || {
let config = moved_out_config.clone();
let token = &config.auth.token;

let base_system_options = SystemOptions {
architecture: "amd64",
cpu_count: 8,
kernel_version: "5.14.0-362.8.1.el9_3.x86_64",
os: "linux",
version: "1.11.11",
};

App::new()
.app_data(web::Data::new(base_system_options))
.app_data(web::Data::new(config.clone()))
.app_data(web::Data::new(Arc::clone(&server_pool)))
.wrap(default_headers(&config))
.wrap(utils::camel_case::CamelCaseHeaders)
.wrap(middleware::Logger::new("%r"))
.route("/", web::get().to(router::root))
.service({
use router::api;

web::scope("/api")
.route(
"servers",
web::post()
.wrap(BearerAuth::new(token.clone()))
.to(api::servers_post),
)
.route(
"system",
web::get()
.wrap(BearerAuth::new(token.clone()))
.to(api::system_get),
)
.route(
"system",
web::route().guard(guard::Options()).to(api::system_options),
)
.route(
"update",
web::post()
.wrap(BearerAuth::new(token.clone()))
.to(api::update_post),
)
.service(web::scope("/servers/{id}").route("ws", web::get().to(api::ws)))
})
});

let ip = config.api.host;
let port = config.api.port;

let server_fut = http_server
.worker_max_blocking_threads(16)
.workers(1)
.bind(SocketAddr::new(ip, port))?
.run();

Ok(Webserver { server_fut })
}

pub async fn serve(self) -> io::Result<()> {
self.server_fut.await
}
//fn default_headers(config: &AlerionConfig) -> middleware::DefaultHeaders {
//middleware::DefaultHeaders::new()
//.add((header::ACCESS_CONTROL_ALLOW_ORIGIN, config.remote.clone()))
//.add((header::ACCESS_CONTROL_MAX_AGE, 7200))
//.add((header::ACCESS_CONTROL_ALLOW_HEADERS, ALLOWED_HEADERS))
//.add((header::ACCESS_CONTROL_ALLOW_METHODS, ALLOWED_METHODS))
//.add((header::ACCESS_CONTROL_ALLOW_CREDENTIALS, "true"))
//}

pub async fn serve(address: impl Into<SocketAddr>) {
warp::serve(warp::path!("test" / String).map(|name| format!("Hello {}!", name)))
.run(address)
.await
}

pub mod router;
pub mod utils;
7 changes: 0 additions & 7 deletions crates/alerion_core/src/webserver/router.rs

This file was deleted.

54 changes: 0 additions & 54 deletions crates/alerion_core/src/webserver/router/api.rs

This file was deleted.

2 changes: 0 additions & 2 deletions crates/alerion_core/src/webserver/utils.rs

This file was deleted.

Loading

0 comments on commit 64bc6f9

Please sign in to comment.