diff --git a/crates/alerion_core/Cargo.toml b/crates/alerion_core/Cargo.toml index dc129ab..e83f76f 100644 --- a/crates/alerion_core/Cargo.toml +++ b/crates/alerion_core/Cargo.toml @@ -19,7 +19,6 @@ bytestring = "1.3.1" jsonwebtoken = "9.3.0" actix = "0.13.3" actix-web-actors = "4.3.0" -actix-web = "4.5.1" log = "0.4.21" pin-project-lite = "0.2.14" reqwest = { version = "0.12.3" } @@ -27,3 +26,4 @@ smallvec = { version = "1.13.2", features = ["serde"] } directories = "5.0.1" bollard = "0.16.1" bitflags = "2.5.0" +warp = "0.3.7" diff --git a/crates/alerion_core/src/lib.rs b/crates/alerion_core/src/lib.rs index cc297ae..1cf61ca 100644 --- a/crates/alerion_core/src/lib.rs +++ b/crates/alerion_core/src/lib.rs @@ -8,12 +8,8 @@ pub mod servers; pub mod webserver; pub mod websocket; -use std::sync::Arc; - use config::AlerionConfig; use futures::stream::{FuturesUnordered, StreamExt}; -use servers::ServerPool; -use webserver::Webserver; use crate::filesystem::setup_directories; @@ -27,18 +23,15 @@ pub async fn alerion_main() -> anyhow::Result<()> { let project_dirs = setup_directories().await?; let config = AlerionConfig::load(&project_dirs)?; - let server_pool = Arc::new(ServerPool::builder(&config)?.fetch_servers().await?.build()); + //let server_pool = Arc::new(ServerPool::builder(&config)?.fetch_servers().await?.build()); //server_pool.create_server("0e4059ca-d79b-46a5-8ec4-95bd0736d150".try_into().unwrap()).await; // there is a low likelyhood this will actually block, and if it does // it will block only once for a short amount of time, so it's no big deal. - let webserver = Webserver::make(config, Arc::clone(&server_pool))?; + let webserver = webserver::serve((config.api.host, config.api.port)); - let webserver_handle = tokio::spawn(async move { - let _result = webserver.serve().await; - // handle recovery - }); + let webserver_handle = tokio::spawn(webserver); let mut handles = FuturesUnordered::new(); handles.push(webserver_handle); diff --git a/crates/alerion_core/src/servers.rs b/crates/alerion_core/src/servers.rs index c52059f..68d6bfa 100644 --- a/crates/alerion_core/src/servers.rs +++ b/crates/alerion_core/src/servers.rs @@ -1,22 +1,17 @@ use std::collections::HashMap; -use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::atomic::AtomicU32; use std::sync::Arc; use std::time::Instant; -use actix_web::HttpResponse; use alerion_datamodel::remote::server::{ContainerConfig, ServerSettings}; -use alerion_datamodel::websocket::{NetworkStatistics, PerformanceStatisics, ServerStatus}; use bollard::container::{Config, CreateContainerOptions}; use bollard::Docker; use serde::{Deserialize, Serialize}; use thiserror::Error; -use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; use uuid::Uuid; use crate::config::AlerionConfig; -use crate::websocket::conn::{ConnectionAddr, PanelMessage, ServerMessage}; -use crate::websocket::relay::{AuthTracker, ClientConnection, ServerConnection}; pub struct ServerPoolBuilder { servers: HashMap>, @@ -147,8 +142,6 @@ pub struct Server { uuid: Uuid, container_name: String, websocket_id_counter: AtomicU32, - websockets: RwLock>, - sender_copy: Sender<(u32, PanelMessage)>, server_info: ServerInfo, remote_api: Arc, docker: Arc, @@ -161,23 +154,16 @@ impl Server { remote_api: Arc, docker: Arc, ) -> Result, ServerError> { - let (send, recv) = channel(128); - let server = Arc::new(Self { start_time: Instant::now(), uuid, container_name: format!("{}_container", uuid.as_hyphenated()), websocket_id_counter: AtomicU32::new(0), - websockets: RwLock::new(HashMap::new()), - sender_copy: send, server_info, remote_api, docker, }); - tokio::spawn(task_websocket_receiver(recv)); - tokio::spawn(monitor_performance_metrics(Arc::clone(&server))); - server.create_docker_container().await?; Ok(server) @@ -208,74 +194,9 @@ impl Server { Ok(()) } - pub async fn setup_new_websocket( - &self, - start_websocket: F, - ) -> actix_web::Result - where - F: FnOnce(ServerConnection) -> actix_web::Result<(ConnectionAddr, HttpResponse)>, - { - let id = self.websocket_id_counter.fetch_add(1, Ordering::SeqCst); - - // setup the request channel for the websocket - let auth_tracker = Arc::new(AuthTracker::new(self.server_time())); - let sender = self.sender_copy.clone(); - let server_conn = ServerConnection::new(Arc::clone(&auth_tracker), sender, id); - - // setup a websocket connection through the user-provided closure - let (addr, response) = start_websocket(server_conn)?; - - // add the obtained reply channel to the list of websocket connections - let client_conn = ClientConnection::new(auth_tracker, addr); - let mut websockets = self.websockets.write().await; - websockets.insert(id, client_conn); - - // give back the HTTP 101 response - Ok(response) - } - - pub async fn send_to_available_websockets(&self, msg: ServerMessage) { - let lock = self.websockets.read().await; - - for sender in lock.values() { - sender.send_if_authenticated(|| msg.clone()); - } - } - pub fn server_time(&self) -> u64 { self.start_time.elapsed().as_millis() as u64 } } -async fn monitor_performance_metrics(server: Arc) { - loop { - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - - let stats = PerformanceStatisics { - memory_bytes: server.server_time() as usize, - memory_limit_bytes: 1024usize.pow(3) * 8, - cpu_absolute: 50.11, - network: NetworkStatistics { - rx_bytes: 1024, - tx_bytes: 800, - }, - uptime: 5000 + server.server_time(), - state: ServerStatus::Running, - disk_bytes: 100, - }; - - server - .send_to_available_websockets(ServerMessage::Stats(stats)) - .await; - } -} - -async fn task_websocket_receiver(mut receiver: Receiver<(u32, PanelMessage)>) { - loop { - if let Some(msg) = receiver.recv().await { - log::debug!("Server received websocket message: {msg:?}"); - } - } -} - pub mod remote; diff --git a/crates/alerion_core/src/webserver.rs b/crates/alerion_core/src/webserver.rs index d4c61bd..33a3145 100644 --- a/crates/alerion_core/src/webserver.rs +++ b/crates/alerion_core/src/webserver.rs @@ -1,103 +1,21 @@ -use std::io; use std::net::SocketAddr; -use std::sync::Arc; -use actix_web::http::header; -use actix_web::{dev, guard, middleware, web, App, HttpServer}; -use alerion_datamodel::webserver::SystemOptions; -use utils::bearer_auth::BearerAuth; - -use crate::config::AlerionConfig; -use crate::servers::ServerPool; +use warp::Filter; const ALLOWED_HEADERS: &str = "Accept, Accept-Encoding, Authorization, Cache-Control, Content-Type, Content-Length, Origin, X-Real-IP, X-CSRF-Token"; const ALLOWED_METHODS: &str = "GET, POST, PATCH, PUT, DELETE, OPTIONS"; -fn default_headers(config: &AlerionConfig) -> middleware::DefaultHeaders { - middleware::DefaultHeaders::new() - .add((header::ACCESS_CONTROL_ALLOW_ORIGIN, config.remote.clone())) - .add((header::ACCESS_CONTROL_MAX_AGE, 7200)) - .add((header::ACCESS_CONTROL_ALLOW_HEADERS, ALLOWED_HEADERS)) - .add((header::ACCESS_CONTROL_ALLOW_METHODS, ALLOWED_METHODS)) - .add((header::ACCESS_CONTROL_ALLOW_CREDENTIALS, "true")) -} - -pub struct Webserver { - server_fut: dev::Server, -} - -impl Webserver { - /// Build the webserver. May block if a DNS lookup is required to resolve the host - /// set in the configuration. - pub fn make(config: AlerionConfig, server_pool: Arc) -> io::Result { - let moved_out_config = config.clone(); - - let http_server = HttpServer::new(move || { - let config = moved_out_config.clone(); - let token = &config.auth.token; - - let base_system_options = SystemOptions { - architecture: "amd64", - cpu_count: 8, - kernel_version: "5.14.0-362.8.1.el9_3.x86_64", - os: "linux", - version: "1.11.11", - }; - - App::new() - .app_data(web::Data::new(base_system_options)) - .app_data(web::Data::new(config.clone())) - .app_data(web::Data::new(Arc::clone(&server_pool))) - .wrap(default_headers(&config)) - .wrap(utils::camel_case::CamelCaseHeaders) - .wrap(middleware::Logger::new("%r")) - .route("/", web::get().to(router::root)) - .service({ - use router::api; - - web::scope("/api") - .route( - "servers", - web::post() - .wrap(BearerAuth::new(token.clone())) - .to(api::servers_post), - ) - .route( - "system", - web::get() - .wrap(BearerAuth::new(token.clone())) - .to(api::system_get), - ) - .route( - "system", - web::route().guard(guard::Options()).to(api::system_options), - ) - .route( - "update", - web::post() - .wrap(BearerAuth::new(token.clone())) - .to(api::update_post), - ) - .service(web::scope("/servers/{id}").route("ws", web::get().to(api::ws))) - }) - }); - - let ip = config.api.host; - let port = config.api.port; - - let server_fut = http_server - .worker_max_blocking_threads(16) - .workers(1) - .bind(SocketAddr::new(ip, port))? - .run(); - - Ok(Webserver { server_fut }) - } - - pub async fn serve(self) -> io::Result<()> { - self.server_fut.await - } +//fn default_headers(config: &AlerionConfig) -> middleware::DefaultHeaders { +//middleware::DefaultHeaders::new() +//.add((header::ACCESS_CONTROL_ALLOW_ORIGIN, config.remote.clone())) +//.add((header::ACCESS_CONTROL_MAX_AGE, 7200)) +//.add((header::ACCESS_CONTROL_ALLOW_HEADERS, ALLOWED_HEADERS)) +//.add((header::ACCESS_CONTROL_ALLOW_METHODS, ALLOWED_METHODS)) +//.add((header::ACCESS_CONTROL_ALLOW_CREDENTIALS, "true")) +//} + +pub async fn serve(address: impl Into) { + warp::serve(warp::path!("test" / String).map(|name| format!("Hello {}!", name))) + .run(address) + .await } - -pub mod router; -pub mod utils; diff --git a/crates/alerion_core/src/webserver/router.rs b/crates/alerion_core/src/webserver/router.rs deleted file mode 100644 index 0cb9ffd..0000000 --- a/crates/alerion_core/src/webserver/router.rs +++ /dev/null @@ -1,7 +0,0 @@ -use actix_web::{HttpResponse, Responder}; - -pub async fn root() -> impl Responder { - HttpResponse::Ok().body("alerion 0.1.0") -} - -pub mod api; diff --git a/crates/alerion_core/src/webserver/router/api.rs b/crates/alerion_core/src/webserver/router/api.rs deleted file mode 100644 index 63201a3..0000000 --- a/crates/alerion_core/src/webserver/router/api.rs +++ /dev/null @@ -1,54 +0,0 @@ -use std::sync::Arc; - -use actix_web::{web, HttpRequest, HttpResponse, Responder}; -use alerion_datamodel::webserver::update::{ConfigUpdateRequest, ConfigUpdateResponse}; -use alerion_datamodel::webserver::CreateServerRequest; -use uuid::Uuid; - -use crate::config::AlerionConfig; -use crate::servers::ServerPool; -use crate::webserver::SystemOptions; - -pub async fn servers_post( - opts: web::Json, - server_pool: web::Data>, -) -> impl Responder { - match server_pool.create_server(opts.uuid).await { - Ok(_) => HttpResponse::Accepted(), - Err(_) => HttpResponse::InternalServerError(), - } -} - -pub async fn system_options() -> impl Responder { - HttpResponse::NoContent() -} - -pub async fn system_get(system_options: web::Data) -> impl Responder { - web::Json(system_options) -} - -pub async fn update_post(_payload: web::Json) -> impl Responder { - web::Json(ConfigUpdateResponse { applied: false }) -} - -pub async fn ws( - req: HttpRequest, - payload: web::Payload, - server_uuid: web::Path, - config: web::Data, - server_pool: web::Data>, -) -> actix_web::Result { - let uuid = server_uuid.into_inner(); - let config = config.into_inner(); - - if let Some(server) = server_pool.get_server(uuid).await { - // if the server doesn't exist well we'll see - let fut = server.setup_new_websocket(|conn| { - crate::websocket::start_websocket(uuid, &config, conn, &req, payload) - }); - - fut.await - } else { - Ok(HttpResponse::NotImplemented().into()) - } -} diff --git a/crates/alerion_core/src/webserver/utils.rs b/crates/alerion_core/src/webserver/utils.rs deleted file mode 100644 index 64561d3..0000000 --- a/crates/alerion_core/src/webserver/utils.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod bearer_auth; -pub mod camel_case; diff --git a/crates/alerion_core/src/webserver/utils/bearer_auth.rs b/crates/alerion_core/src/webserver/utils/bearer_auth.rs deleted file mode 100644 index 095b9d1..0000000 --- a/crates/alerion_core/src/webserver/utils/bearer_auth.rs +++ /dev/null @@ -1,121 +0,0 @@ -use std::future::{ready, Future, Ready}; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use actix_web::body::BoxBody; -use actix_web::dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform}; -use actix_web::http::header; -use actix_web::{Error, HttpResponse}; -use futures::ready; -use log::info; -use pin_project_lite::pin_project; - -pin_project! { - #[project = BearerAuthFutureProjected] - pub enum BearerAuthFuture> { - Ok { - #[pin] - ok_fut: S::Future, - }, - Err { - #[pin] - err_fut: Ready>, - } - } -} - -impl Future for BearerAuthFuture -where - S: Service, Error = Error>, - >::Future: Future, -{ - type Output = ::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.project() { - BearerAuthFutureProjected::Ok { ok_fut } => { - let res = ready!(ok_fut.poll(cx)); - Poll::Ready(res) - } - - BearerAuthFutureProjected::Err { err_fut } => { - let res = ready!(err_fut.poll(cx)); - Poll::Ready(res) - } - } - } -} - -pub struct BearerAuth { - token: String, -} - -impl BearerAuth { - pub fn new(token: String) -> Self { - BearerAuth { token } - } -} - -impl Transform for BearerAuth -where - S: Service, Error = Error>, - S::Future: 'static, -{ - type Error = Error; - type Future = Ready>; - type InitError = (); - type Response = ServiceResponse; - type Transform = BearerAuthMiddleware; - - fn new_transform(&self, service: S) -> Self::Future { - ready(Ok(BearerAuthMiddleware { - service, - token: self.token.clone(), - })) - } -} - -pub struct BearerAuthMiddleware { - service: S, - token: String, -} - -impl Service for BearerAuthMiddleware -where - S: Service, Error = Error>, - S::Future: 'static, -{ - type Error = Error; - type Future = BearerAuthFuture; - type Response = ServiceResponse; - - forward_ready!(service); - - fn call(&self, req: ServiceRequest) -> Self::Future { - let headers = req.request().headers(); - - let auth_ok = headers - .get(header::AUTHORIZATION) - .and_then(|content| content.to_str().ok()) - .filter(|s| bearer_matches_token(s, &self.token)) - .is_some(); - - match auth_ok { - true => BearerAuthFuture::Ok { - ok_fut: self.service.call(req), - }, - false => { - let (req, _) = req.into_parts(); - let resp = ServiceResponse::new(req, HttpResponse::Unauthorized().body(())); - let err_fut = ready(Ok(resp)); - BearerAuthFuture::Err { err_fut } - } - } - } -} - -fn bearer_matches_token(bearer: &str, token: &str) -> bool { - info!("trying to match:\n{bearer}\nBearer {token}"); - let expected_bearer_len = token.len() + "Bearer ".len(); - bearer.len() == expected_bearer_len && bearer.get(7..).filter(|t| t == &token).is_some() -} diff --git a/crates/alerion_core/src/webserver/utils/camel_case.rs b/crates/alerion_core/src/webserver/utils/camel_case.rs deleted file mode 100644 index 4f65c45..0000000 --- a/crates/alerion_core/src/webserver/utils/camel_case.rs +++ /dev/null @@ -1,72 +0,0 @@ -use std::future::{ready, Future, Ready}; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use actix_web::dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform}; -use actix_web::Error; -use futures::ready; -use pin_project_lite::pin_project; - -pin_project! { - pub struct DefaultHeaderFuture> { - #[pin] - fut: S::Future, - } -} - -impl Future for DefaultHeaderFuture -where - S: Service, Error = Error>, -{ - type Output = ::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let mut res = ready!(this.fut.poll(cx))?; - - let head = res.response_mut().head_mut(); - head.set_camel_case_headers(true); - - Poll::Ready(Ok(res)) - } -} - -pub struct CamelCaseHeaders; - -impl Transform for CamelCaseHeaders -where - S: Service, Error = Error>, - S::Future: 'static, -{ - type Error = Error; - type Future = Ready>; - type InitError = (); - type Response = ServiceResponse; - type Transform = CamelCaseHeadersMiddleware; - - fn new_transform(&self, service: S) -> Self::Future { - ready(Ok(CamelCaseHeadersMiddleware { service })) - } -} - -pub struct CamelCaseHeadersMiddleware { - service: S, -} - -impl Service for CamelCaseHeadersMiddleware -where - S: Service, Error = Error>, - S::Future: 'static, -{ - type Error = Error; - type Future = DefaultHeaderFuture; - type Response = ServiceResponse; - - forward_ready!(service); - - fn call(&self, req: ServiceRequest) -> Self::Future { - let fut = self.service.call(req); - - DefaultHeaderFuture { fut } - } -} diff --git a/crates/alerion_core/src/websocket.rs b/crates/alerion_core/src/websocket.rs index c00f2c4..e69de29 100644 --- a/crates/alerion_core/src/websocket.rs +++ b/crates/alerion_core/src/websocket.rs @@ -1,21 +0,0 @@ -use actix_web::{web, HttpRequest, HttpResponse}; -use actix_web_actors::ws; -use relay::ServerConnection; -use uuid::Uuid; - -use crate::config::AlerionConfig; - -pub fn start_websocket( - server_uuid: Uuid, - config: &AlerionConfig, - conn: ServerConnection, - req: &HttpRequest, - payload: web::Payload, -) -> actix_web::Result<(conn::ConnectionAddr, HttpResponse)> { - let conn = conn::WebsocketConnectionImpl::new(server_uuid, conn, config); - ws::WsResponseBuilder::new(conn, req, payload).start_with_addr() -} - -pub mod auth; -pub mod conn; -pub mod relay; diff --git a/crates/alerion_core/src/websocket/auth.rs b/crates/alerion_core/src/websocket/auth.rs deleted file mode 100644 index dff5ab6..0000000 --- a/crates/alerion_core/src/websocket/auth.rs +++ /dev/null @@ -1,121 +0,0 @@ -use std::collections::HashSet; - -use bitflags::bitflags; -use jsonwebtoken::{Algorithm, DecodingKey, Validation}; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -use crate::config::AlerionConfig; - -#[derive(Debug, Serialize, Deserialize)] -struct Claims { - iss: String, - aud: Vec, - jti: String, - iat: usize, - nbf: usize, - exp: usize, - server_uuid: Uuid, - permissions: Vec, - user_uuid: Uuid, - user_id: usize, - unique_id: String, -} - -bitflags! { - #[derive(Debug, Clone, Copy)] - pub struct Permissions: u32 { - const CONNECT = 1; - const START = 1 << 1; - const STOP = 1 << 2; - const RESTART = 1 << 3; - const CONSOLE = 1 << 4; - const BACKUP_READ = 1 << 5; - const ADMIN_ERRORS = 1 << 6; - const ADMIN_INSTALL = 1 << 7; - const ADMIN_TRANSFER = 1 << 8; - } -} - -impl Permissions { - pub fn from_strings(strings: &[impl AsRef]) -> Self { - let mut this = Permissions::empty(); - - for s in strings { - match s.as_ref() { - "*" => { - this.insert(Permissions::CONNECT); - this.insert(Permissions::START); - this.insert(Permissions::STOP); - this.insert(Permissions::RESTART); - this.insert(Permissions::CONSOLE); - this.insert(Permissions::BACKUP_READ); - } - "websocket.connect" => { - this.insert(Permissions::CONNECT); - } - "control.start" => { - this.insert(Permissions::START); - } - "control.stop" => { - this.insert(Permissions::STOP); - } - "control.restart" => { - this.insert(Permissions::RESTART); - } - "control.console" => { - this.insert(Permissions::CONSOLE); - } - "backup.read" => { - this.insert(Permissions::BACKUP_READ); - } - "admin.websocket.errors" => { - this.insert(Permissions::ADMIN_ERRORS); - } - "admin.websocket.install" => { - this.insert(Permissions::ADMIN_INSTALL); - } - "admin.websocket.transfer" => { - this.insert(Permissions::ADMIN_TRANSFER); - } - _ => {} - } - } - - this - } -} - -pub struct Auth { - validation: Validation, - key: DecodingKey, -} - -impl Auth { - pub fn from_config(cfg: &AlerionConfig) -> Self { - let mut validation = Validation::new(Algorithm::HS256); - - let spec_claims = ["exp", "nbf", "aud", "iss"].map(ToOwned::to_owned); - - validation.required_spec_claims = HashSet::from(spec_claims); - validation.leeway = 10; - validation.reject_tokens_expiring_in_less_than = 0; - validation.validate_exp = false; - validation.validate_nbf = false; - validation.validate_aud = false; - validation.aud = None; - validation.iss = Some(HashSet::from([cfg.remote.clone()])); - validation.sub = None; - - let key = DecodingKey::from_secret(cfg.auth.token.as_ref()); - - Self { validation, key } - } - - pub fn validate(&self, auth: &str, server_uuid: &Uuid) -> Option { - jsonwebtoken::decode::(auth, &self.key, &self.validation) - .ok() - .filter(|result| &result.claims.server_uuid == server_uuid) - .map(|result| Permissions::from_strings(&result.claims.permissions)) - } -} diff --git a/crates/alerion_core/src/websocket/conn.rs b/crates/alerion_core/src/websocket/conn.rs deleted file mode 100644 index 8905394..0000000 --- a/crates/alerion_core/src/websocket/conn.rs +++ /dev/null @@ -1,190 +0,0 @@ -use std::cell::Cell; -use std::convert::Infallible; - -use actix::{Actor, ActorContext, Addr, Handler, StreamHandler}; -use actix_web_actors::ws; -use alerion_datamodel::websocket::*; -use uuid::Uuid; - -use crate::config::AlerionConfig; -use crate::websocket::auth::{Auth, Permissions}; -use crate::websocket::relay::ServerConnection; - -#[derive(Debug, Clone)] -pub enum ServerMessage { - Kill, - Logs(String), - Stats(PerformanceStatisics), -} - -impl actix::Message for ServerMessage { - type Result = Result<(), Infallible>; -} - -#[derive(Debug, Clone)] -pub enum PanelMessage { - Command(String), - ReceiveLogs, - ReceiveInstallLog, - ReceiveStats, -} - -impl actix::Message for PanelMessage { - type Result = Result<(), Infallible>; -} - -pub type ConnectionAddr = Addr; - -enum MessageError { - InvalidJwt, - Generic(String), -} - -pub struct WebsocketConnectionImpl { - server_uuid: Uuid, - server_conn: ServerConnection, - auth: Auth, - permissions: Cell, -} - -impl Actor for WebsocketConnectionImpl { - type Context = ws::WebsocketContext; -} - -impl Handler for WebsocketConnectionImpl { - type Result = Result<(), Infallible>; - - fn handle(&mut self, msg: ServerMessage, ctx: &mut Self::Context) -> Self::Result { - match msg { - ServerMessage::Kill => { - ctx.close(None); - ctx.stop(); - } - - ServerMessage::Logs(logs) => { - ctx.text(RawMessage::new(EventType::Logs, logs)); - } - - ServerMessage::Stats(stats) => { - let str = - serde_json::to_string(&stats).expect("JSON serialization should not fail"); - ctx.text(RawMessage::new(EventType::Stats, str)) - } - } - - Ok(()) - } -} - -impl StreamHandler> for WebsocketConnectionImpl { - fn handle(&mut self, item: Result, ctx: &mut Self::Context) { - use ws::Message; - - let Ok(msg) = item else { - return; - }; - - match msg { - Message::Text(t) => { - let _result = self.handle_text(&t, ctx); - } - _ => println!("TODO: non-text WS msgs"), - } - } -} - -impl WebsocketConnectionImpl { - pub fn new(server_uuid: Uuid, server_conn: ServerConnection, cfg: &AlerionConfig) -> Self { - Self { - server_uuid, - server_conn, - auth: Auth::from_config(cfg), - permissions: Cell::new(Permissions::empty()), - } - } - - pub fn handle_text(&self, msg: &str, ctx: &mut ::Context) -> Option<()> { - // todo: behavior on bad JSON payload? right now just ignore - let event = serde_json::from_str::(msg).ok()?; - - match event.event() { - EventType::Authentication => { - let maybe_permissions = self - .auth - .validate(&event.into_first_arg()?, &self.server_uuid); - - if let Some(permissions) = maybe_permissions { - if permissions.contains(Permissions::CONNECT) { - self.permissions.set(permissions); - self.server_conn.set_authenticated(); - ctx.text(RawMessage::new_no_args(EventType::AuthenticationSuccess)); - } - } else { - self.send_error(ctx, MessageError::InvalidJwt); - } - - Some(()) - } - - ty => { - if self.server_conn.is_authenticated() { - let permissions = self.permissions.get(); - - match ty { - EventType::SendCommand => { - if permissions.contains(Permissions::CONSOLE) { - if let Some(command) = event.into_first_arg() { - self.server_conn - .send_if_authenticated(PanelMessage::Command(command)); - } else { - self.send_error(ctx, MessageError::InvalidJwt); - } - } - } - - EventType::SendStats => { - if permissions.contains(Permissions::CONSOLE) { - self.server_conn - .send_if_authenticated(PanelMessage::ReceiveStats); - } - } - - EventType::SendLogs => { - if permissions.contains(Permissions::CONSOLE) { - self.server_conn - .send_if_authenticated(PanelMessage::ReceiveLogs); - - if permissions.contains(Permissions::ADMIN_INSTALL) { - self.server_conn.force_send(PanelMessage::ReceiveInstallLog); - } - } - } - - e => todo!("{e:?}"), - } - } - - Some(()) - } - } - } - - #[inline(always)] - fn send_error(&self, ctx: &mut ::Context, err: MessageError) { - let precise_errors = self.permissions.get().contains(Permissions::ADMIN_ERRORS); - - let raw_msg = if precise_errors { - match err { - MessageError::InvalidJwt => RawMessage::new_no_args(EventType::JwtError), - MessageError::Generic(s) => RawMessage::new(EventType::DaemonError, s), - } - } else { - RawMessage::new( - EventType::DaemonError, - "An unexpected error occurred".to_owned(), - ) - }; - - ctx.text(raw_msg) - } -} diff --git a/crates/alerion_core/src/websocket/relay.rs b/crates/alerion_core/src/websocket/relay.rs deleted file mode 100644 index 156c797..0000000 --- a/crates/alerion_core/src/websocket/relay.rs +++ /dev/null @@ -1,121 +0,0 @@ -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use std::sync::Arc; - -use tokio::sync::mpsc::Sender; - -use crate::websocket::conn::{ConnectionAddr, PanelMessage, ServerMessage}; - -pub struct ServerConnection { - auth_tracker: Arc, - sender: Sender<(u32, PanelMessage)>, - id: u32, -} - -impl ServerConnection { - pub fn new( - auth_tracker: Arc, - sender: Sender<(u32, PanelMessage)>, - id: u32, - ) -> Self { - ServerConnection { - auth_tracker, - sender, - id, - } - } - - pub fn set_authenticated(&self) { - self.auth_tracker.set_auth(true); - } - - pub fn is_authenticated(&self) -> bool { - self.auth_tracker.get_auth() - } - - pub fn send_if_authenticated(&self, msg: PanelMessage) { - if self.auth_tracker.get_auth() { - let _ = self.sender.try_send((self.id, msg)); - } - } - - pub fn force_send(&self, msg: PanelMessage) { - let _ = self.sender.try_send((self.id, msg)); - } - - pub fn auth_tracker(&self) -> Arc { - Arc::clone(&self.auth_tracker) - } -} - -pub struct ClientConnection { - auth_tracker: Arc, - ws_sender: ConnectionAddr, -} - -impl ClientConnection { - pub fn new(auth_tracker: Arc, ws_sender: ConnectionAddr) -> Self { - Self { - auth_tracker, - ws_sender, - } - } - - /// Uses a closure because many messages might be expensive to compute. - pub fn send_if_authenticated(&self, msg: F) - where - F: FnOnce() -> ServerMessage, - { - if self.auth_tracker.get_auth() { - let m = msg(); - self.ws_sender.do_send(m); - } - } - - /// Terminate the connection on the server's side. - /// - /// There could be a condition where the server tries to terminate the connection, - /// sets the auth bool to false and tells the websocket to kill itself. Before the - /// websocket connection is actually terminated, the client to re-authenticate and - /// send more messages to the server. This should be a non-issue: who cares if - /// the client manages to send a few more frames; the connection will eventually - /// terminate. - /// - /// This would easily be fixable with another atomic check, but I'd rather avoid - /// seemingly unnecessary atomic loads. - pub fn terminate(&self) { - self.expire_auth(); - self.ws_sender.do_send(ServerMessage::Kill); - } - - pub fn expire_auth(&self) { - self.auth_tracker.set_auth(false); - } - - pub fn is_authenticated(&self) -> bool { - self.auth_tracker.get_auth() - } -} - -/// A middleman between a websocket connection and a server, which keeps track of -/// auth state and the status of the websocket connection. -pub struct AuthTracker { - started_at: AtomicU64, - authenticated: AtomicBool, -} - -impl AuthTracker { - pub fn new(server_time: u64) -> Self { - Self { - started_at: AtomicU64::new(server_time), - authenticated: AtomicBool::new(false), - } - } - - pub fn set_auth(&self, value: bool) { - self.authenticated.store(value, Ordering::SeqCst); - } - - pub fn get_auth(&self) -> bool { - self.authenticated.load(Ordering::SeqCst) - } -} diff --git a/flake.nix b/flake.nix index 8fe90a1..6212609 100644 --- a/flake.nix +++ b/flake.nix @@ -24,6 +24,7 @@ with pkgs; { devShells.default = mkShell { buildInputs = [ + bacon openssl pkg-config (rust-bin.fromRustupToolchainFile ./rust-toolchain.toml)