From 64bc6f9e376724b49b12f6909748169eb537b530 Mon Sep 17 00:00:00 2001 From: pupbrained Date: Sun, 21 Apr 2024 03:28:45 -0400 Subject: [PATCH 1/6] initial work for switching to warp i'll fix this commit message later --- crates/alerion_core/Cargo.toml | 2 +- crates/alerion_core/src/lib.rs | 13 +- crates/alerion_core/src/servers.rs | 81 +------- crates/alerion_core/src/webserver.rs | 110 ++-------- crates/alerion_core/src/webserver/router.rs | 7 - .../alerion_core/src/webserver/router/api.rs | 54 ----- crates/alerion_core/src/webserver/utils.rs | 2 - .../src/webserver/utils/bearer_auth.rs | 121 ----------- .../src/webserver/utils/camel_case.rs | 72 ------- crates/alerion_core/src/websocket.rs | 21 -- crates/alerion_core/src/websocket/auth.rs | 121 ----------- crates/alerion_core/src/websocket/conn.rs | 190 ------------------ crates/alerion_core/src/websocket/relay.rs | 121 ----------- flake.nix | 1 + 14 files changed, 20 insertions(+), 896 deletions(-) delete mode 100644 crates/alerion_core/src/webserver/router.rs delete mode 100644 crates/alerion_core/src/webserver/router/api.rs delete mode 100644 crates/alerion_core/src/webserver/utils.rs delete mode 100644 crates/alerion_core/src/webserver/utils/bearer_auth.rs delete mode 100644 crates/alerion_core/src/webserver/utils/camel_case.rs delete mode 100644 crates/alerion_core/src/websocket/auth.rs delete mode 100644 crates/alerion_core/src/websocket/conn.rs delete mode 100644 crates/alerion_core/src/websocket/relay.rs diff --git a/crates/alerion_core/Cargo.toml b/crates/alerion_core/Cargo.toml index dc129ab..e83f76f 100644 --- a/crates/alerion_core/Cargo.toml +++ b/crates/alerion_core/Cargo.toml @@ -19,7 +19,6 @@ bytestring = "1.3.1" jsonwebtoken = "9.3.0" actix = "0.13.3" actix-web-actors = "4.3.0" -actix-web = "4.5.1" log = "0.4.21" pin-project-lite = "0.2.14" reqwest = { version = "0.12.3" } @@ -27,3 +26,4 @@ smallvec = { version = "1.13.2", features = ["serde"] } directories = "5.0.1" bollard = "0.16.1" bitflags = "2.5.0" +warp = "0.3.7" diff --git a/crates/alerion_core/src/lib.rs b/crates/alerion_core/src/lib.rs index cc297ae..1cf61ca 100644 --- a/crates/alerion_core/src/lib.rs +++ b/crates/alerion_core/src/lib.rs @@ -8,12 +8,8 @@ pub mod servers; pub mod webserver; pub mod websocket; -use std::sync::Arc; - use config::AlerionConfig; use futures::stream::{FuturesUnordered, StreamExt}; -use servers::ServerPool; -use webserver::Webserver; use crate::filesystem::setup_directories; @@ -27,18 +23,15 @@ pub async fn alerion_main() -> anyhow::Result<()> { let project_dirs = setup_directories().await?; let config = AlerionConfig::load(&project_dirs)?; - let server_pool = Arc::new(ServerPool::builder(&config)?.fetch_servers().await?.build()); + //let server_pool = Arc::new(ServerPool::builder(&config)?.fetch_servers().await?.build()); //server_pool.create_server("0e4059ca-d79b-46a5-8ec4-95bd0736d150".try_into().unwrap()).await; // there is a low likelyhood this will actually block, and if it does // it will block only once for a short amount of time, so it's no big deal. - let webserver = Webserver::make(config, Arc::clone(&server_pool))?; + let webserver = webserver::serve((config.api.host, config.api.port)); - let webserver_handle = tokio::spawn(async move { - let _result = webserver.serve().await; - // handle recovery - }); + let webserver_handle = tokio::spawn(webserver); let mut handles = FuturesUnordered::new(); handles.push(webserver_handle); diff --git a/crates/alerion_core/src/servers.rs b/crates/alerion_core/src/servers.rs index c52059f..68d6bfa 100644 --- a/crates/alerion_core/src/servers.rs +++ b/crates/alerion_core/src/servers.rs @@ -1,22 +1,17 @@ use std::collections::HashMap; -use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::atomic::AtomicU32; use std::sync::Arc; use std::time::Instant; -use actix_web::HttpResponse; use alerion_datamodel::remote::server::{ContainerConfig, ServerSettings}; -use alerion_datamodel::websocket::{NetworkStatistics, PerformanceStatisics, ServerStatus}; use bollard::container::{Config, CreateContainerOptions}; use bollard::Docker; use serde::{Deserialize, Serialize}; use thiserror::Error; -use tokio::sync::mpsc::{channel, Receiver, Sender}; use tokio::sync::RwLock; use uuid::Uuid; use crate::config::AlerionConfig; -use crate::websocket::conn::{ConnectionAddr, PanelMessage, ServerMessage}; -use crate::websocket::relay::{AuthTracker, ClientConnection, ServerConnection}; pub struct ServerPoolBuilder { servers: HashMap>, @@ -147,8 +142,6 @@ pub struct Server { uuid: Uuid, container_name: String, websocket_id_counter: AtomicU32, - websockets: RwLock>, - sender_copy: Sender<(u32, PanelMessage)>, server_info: ServerInfo, remote_api: Arc, docker: Arc, @@ -161,23 +154,16 @@ impl Server { remote_api: Arc, docker: Arc, ) -> Result, ServerError> { - let (send, recv) = channel(128); - let server = Arc::new(Self { start_time: Instant::now(), uuid, container_name: format!("{}_container", uuid.as_hyphenated()), websocket_id_counter: AtomicU32::new(0), - websockets: RwLock::new(HashMap::new()), - sender_copy: send, server_info, remote_api, docker, }); - tokio::spawn(task_websocket_receiver(recv)); - tokio::spawn(monitor_performance_metrics(Arc::clone(&server))); - server.create_docker_container().await?; Ok(server) @@ -208,74 +194,9 @@ impl Server { Ok(()) } - pub async fn setup_new_websocket( - &self, - start_websocket: F, - ) -> actix_web::Result - where - F: FnOnce(ServerConnection) -> actix_web::Result<(ConnectionAddr, HttpResponse)>, - { - let id = self.websocket_id_counter.fetch_add(1, Ordering::SeqCst); - - // setup the request channel for the websocket - let auth_tracker = Arc::new(AuthTracker::new(self.server_time())); - let sender = self.sender_copy.clone(); - let server_conn = ServerConnection::new(Arc::clone(&auth_tracker), sender, id); - - // setup a websocket connection through the user-provided closure - let (addr, response) = start_websocket(server_conn)?; - - // add the obtained reply channel to the list of websocket connections - let client_conn = ClientConnection::new(auth_tracker, addr); - let mut websockets = self.websockets.write().await; - websockets.insert(id, client_conn); - - // give back the HTTP 101 response - Ok(response) - } - - pub async fn send_to_available_websockets(&self, msg: ServerMessage) { - let lock = self.websockets.read().await; - - for sender in lock.values() { - sender.send_if_authenticated(|| msg.clone()); - } - } - pub fn server_time(&self) -> u64 { self.start_time.elapsed().as_millis() as u64 } } -async fn monitor_performance_metrics(server: Arc) { - loop { - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - - let stats = PerformanceStatisics { - memory_bytes: server.server_time() as usize, - memory_limit_bytes: 1024usize.pow(3) * 8, - cpu_absolute: 50.11, - network: NetworkStatistics { - rx_bytes: 1024, - tx_bytes: 800, - }, - uptime: 5000 + server.server_time(), - state: ServerStatus::Running, - disk_bytes: 100, - }; - - server - .send_to_available_websockets(ServerMessage::Stats(stats)) - .await; - } -} - -async fn task_websocket_receiver(mut receiver: Receiver<(u32, PanelMessage)>) { - loop { - if let Some(msg) = receiver.recv().await { - log::debug!("Server received websocket message: {msg:?}"); - } - } -} - pub mod remote; diff --git a/crates/alerion_core/src/webserver.rs b/crates/alerion_core/src/webserver.rs index d4c61bd..33a3145 100644 --- a/crates/alerion_core/src/webserver.rs +++ b/crates/alerion_core/src/webserver.rs @@ -1,103 +1,21 @@ -use std::io; use std::net::SocketAddr; -use std::sync::Arc; -use actix_web::http::header; -use actix_web::{dev, guard, middleware, web, App, HttpServer}; -use alerion_datamodel::webserver::SystemOptions; -use utils::bearer_auth::BearerAuth; - -use crate::config::AlerionConfig; -use crate::servers::ServerPool; +use warp::Filter; const ALLOWED_HEADERS: &str = "Accept, Accept-Encoding, Authorization, Cache-Control, Content-Type, Content-Length, Origin, X-Real-IP, X-CSRF-Token"; const ALLOWED_METHODS: &str = "GET, POST, PATCH, PUT, DELETE, OPTIONS"; -fn default_headers(config: &AlerionConfig) -> middleware::DefaultHeaders { - middleware::DefaultHeaders::new() - .add((header::ACCESS_CONTROL_ALLOW_ORIGIN, config.remote.clone())) - .add((header::ACCESS_CONTROL_MAX_AGE, 7200)) - .add((header::ACCESS_CONTROL_ALLOW_HEADERS, ALLOWED_HEADERS)) - .add((header::ACCESS_CONTROL_ALLOW_METHODS, ALLOWED_METHODS)) - .add((header::ACCESS_CONTROL_ALLOW_CREDENTIALS, "true")) -} - -pub struct Webserver { - server_fut: dev::Server, -} - -impl Webserver { - /// Build the webserver. May block if a DNS lookup is required to resolve the host - /// set in the configuration. - pub fn make(config: AlerionConfig, server_pool: Arc) -> io::Result { - let moved_out_config = config.clone(); - - let http_server = HttpServer::new(move || { - let config = moved_out_config.clone(); - let token = &config.auth.token; - - let base_system_options = SystemOptions { - architecture: "amd64", - cpu_count: 8, - kernel_version: "5.14.0-362.8.1.el9_3.x86_64", - os: "linux", - version: "1.11.11", - }; - - App::new() - .app_data(web::Data::new(base_system_options)) - .app_data(web::Data::new(config.clone())) - .app_data(web::Data::new(Arc::clone(&server_pool))) - .wrap(default_headers(&config)) - .wrap(utils::camel_case::CamelCaseHeaders) - .wrap(middleware::Logger::new("%r")) - .route("/", web::get().to(router::root)) - .service({ - use router::api; - - web::scope("/api") - .route( - "servers", - web::post() - .wrap(BearerAuth::new(token.clone())) - .to(api::servers_post), - ) - .route( - "system", - web::get() - .wrap(BearerAuth::new(token.clone())) - .to(api::system_get), - ) - .route( - "system", - web::route().guard(guard::Options()).to(api::system_options), - ) - .route( - "update", - web::post() - .wrap(BearerAuth::new(token.clone())) - .to(api::update_post), - ) - .service(web::scope("/servers/{id}").route("ws", web::get().to(api::ws))) - }) - }); - - let ip = config.api.host; - let port = config.api.port; - - let server_fut = http_server - .worker_max_blocking_threads(16) - .workers(1) - .bind(SocketAddr::new(ip, port))? - .run(); - - Ok(Webserver { server_fut }) - } - - pub async fn serve(self) -> io::Result<()> { - self.server_fut.await - } +//fn default_headers(config: &AlerionConfig) -> middleware::DefaultHeaders { +//middleware::DefaultHeaders::new() +//.add((header::ACCESS_CONTROL_ALLOW_ORIGIN, config.remote.clone())) +//.add((header::ACCESS_CONTROL_MAX_AGE, 7200)) +//.add((header::ACCESS_CONTROL_ALLOW_HEADERS, ALLOWED_HEADERS)) +//.add((header::ACCESS_CONTROL_ALLOW_METHODS, ALLOWED_METHODS)) +//.add((header::ACCESS_CONTROL_ALLOW_CREDENTIALS, "true")) +//} + +pub async fn serve(address: impl Into) { + warp::serve(warp::path!("test" / String).map(|name| format!("Hello {}!", name))) + .run(address) + .await } - -pub mod router; -pub mod utils; diff --git a/crates/alerion_core/src/webserver/router.rs b/crates/alerion_core/src/webserver/router.rs deleted file mode 100644 index 0cb9ffd..0000000 --- a/crates/alerion_core/src/webserver/router.rs +++ /dev/null @@ -1,7 +0,0 @@ -use actix_web::{HttpResponse, Responder}; - -pub async fn root() -> impl Responder { - HttpResponse::Ok().body("alerion 0.1.0") -} - -pub mod api; diff --git a/crates/alerion_core/src/webserver/router/api.rs b/crates/alerion_core/src/webserver/router/api.rs deleted file mode 100644 index 63201a3..0000000 --- a/crates/alerion_core/src/webserver/router/api.rs +++ /dev/null @@ -1,54 +0,0 @@ -use std::sync::Arc; - -use actix_web::{web, HttpRequest, HttpResponse, Responder}; -use alerion_datamodel::webserver::update::{ConfigUpdateRequest, ConfigUpdateResponse}; -use alerion_datamodel::webserver::CreateServerRequest; -use uuid::Uuid; - -use crate::config::AlerionConfig; -use crate::servers::ServerPool; -use crate::webserver::SystemOptions; - -pub async fn servers_post( - opts: web::Json, - server_pool: web::Data>, -) -> impl Responder { - match server_pool.create_server(opts.uuid).await { - Ok(_) => HttpResponse::Accepted(), - Err(_) => HttpResponse::InternalServerError(), - } -} - -pub async fn system_options() -> impl Responder { - HttpResponse::NoContent() -} - -pub async fn system_get(system_options: web::Data) -> impl Responder { - web::Json(system_options) -} - -pub async fn update_post(_payload: web::Json) -> impl Responder { - web::Json(ConfigUpdateResponse { applied: false }) -} - -pub async fn ws( - req: HttpRequest, - payload: web::Payload, - server_uuid: web::Path, - config: web::Data, - server_pool: web::Data>, -) -> actix_web::Result { - let uuid = server_uuid.into_inner(); - let config = config.into_inner(); - - if let Some(server) = server_pool.get_server(uuid).await { - // if the server doesn't exist well we'll see - let fut = server.setup_new_websocket(|conn| { - crate::websocket::start_websocket(uuid, &config, conn, &req, payload) - }); - - fut.await - } else { - Ok(HttpResponse::NotImplemented().into()) - } -} diff --git a/crates/alerion_core/src/webserver/utils.rs b/crates/alerion_core/src/webserver/utils.rs deleted file mode 100644 index 64561d3..0000000 --- a/crates/alerion_core/src/webserver/utils.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod bearer_auth; -pub mod camel_case; diff --git a/crates/alerion_core/src/webserver/utils/bearer_auth.rs b/crates/alerion_core/src/webserver/utils/bearer_auth.rs deleted file mode 100644 index 095b9d1..0000000 --- a/crates/alerion_core/src/webserver/utils/bearer_auth.rs +++ /dev/null @@ -1,121 +0,0 @@ -use std::future::{ready, Future, Ready}; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use actix_web::body::BoxBody; -use actix_web::dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform}; -use actix_web::http::header; -use actix_web::{Error, HttpResponse}; -use futures::ready; -use log::info; -use pin_project_lite::pin_project; - -pin_project! { - #[project = BearerAuthFutureProjected] - pub enum BearerAuthFuture> { - Ok { - #[pin] - ok_fut: S::Future, - }, - Err { - #[pin] - err_fut: Ready>, - } - } -} - -impl Future for BearerAuthFuture -where - S: Service, Error = Error>, - >::Future: Future, -{ - type Output = ::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.project() { - BearerAuthFutureProjected::Ok { ok_fut } => { - let res = ready!(ok_fut.poll(cx)); - Poll::Ready(res) - } - - BearerAuthFutureProjected::Err { err_fut } => { - let res = ready!(err_fut.poll(cx)); - Poll::Ready(res) - } - } - } -} - -pub struct BearerAuth { - token: String, -} - -impl BearerAuth { - pub fn new(token: String) -> Self { - BearerAuth { token } - } -} - -impl Transform for BearerAuth -where - S: Service, Error = Error>, - S::Future: 'static, -{ - type Error = Error; - type Future = Ready>; - type InitError = (); - type Response = ServiceResponse; - type Transform = BearerAuthMiddleware; - - fn new_transform(&self, service: S) -> Self::Future { - ready(Ok(BearerAuthMiddleware { - service, - token: self.token.clone(), - })) - } -} - -pub struct BearerAuthMiddleware { - service: S, - token: String, -} - -impl Service for BearerAuthMiddleware -where - S: Service, Error = Error>, - S::Future: 'static, -{ - type Error = Error; - type Future = BearerAuthFuture; - type Response = ServiceResponse; - - forward_ready!(service); - - fn call(&self, req: ServiceRequest) -> Self::Future { - let headers = req.request().headers(); - - let auth_ok = headers - .get(header::AUTHORIZATION) - .and_then(|content| content.to_str().ok()) - .filter(|s| bearer_matches_token(s, &self.token)) - .is_some(); - - match auth_ok { - true => BearerAuthFuture::Ok { - ok_fut: self.service.call(req), - }, - false => { - let (req, _) = req.into_parts(); - let resp = ServiceResponse::new(req, HttpResponse::Unauthorized().body(())); - let err_fut = ready(Ok(resp)); - BearerAuthFuture::Err { err_fut } - } - } - } -} - -fn bearer_matches_token(bearer: &str, token: &str) -> bool { - info!("trying to match:\n{bearer}\nBearer {token}"); - let expected_bearer_len = token.len() + "Bearer ".len(); - bearer.len() == expected_bearer_len && bearer.get(7..).filter(|t| t == &token).is_some() -} diff --git a/crates/alerion_core/src/webserver/utils/camel_case.rs b/crates/alerion_core/src/webserver/utils/camel_case.rs deleted file mode 100644 index 4f65c45..0000000 --- a/crates/alerion_core/src/webserver/utils/camel_case.rs +++ /dev/null @@ -1,72 +0,0 @@ -use std::future::{ready, Future, Ready}; -use std::pin::Pin; -use std::task::{Context, Poll}; - -use actix_web::dev::{forward_ready, Service, ServiceRequest, ServiceResponse, Transform}; -use actix_web::Error; -use futures::ready; -use pin_project_lite::pin_project; - -pin_project! { - pub struct DefaultHeaderFuture> { - #[pin] - fut: S::Future, - } -} - -impl Future for DefaultHeaderFuture -where - S: Service, Error = Error>, -{ - type Output = ::Output; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.project(); - let mut res = ready!(this.fut.poll(cx))?; - - let head = res.response_mut().head_mut(); - head.set_camel_case_headers(true); - - Poll::Ready(Ok(res)) - } -} - -pub struct CamelCaseHeaders; - -impl Transform for CamelCaseHeaders -where - S: Service, Error = Error>, - S::Future: 'static, -{ - type Error = Error; - type Future = Ready>; - type InitError = (); - type Response = ServiceResponse; - type Transform = CamelCaseHeadersMiddleware; - - fn new_transform(&self, service: S) -> Self::Future { - ready(Ok(CamelCaseHeadersMiddleware { service })) - } -} - -pub struct CamelCaseHeadersMiddleware { - service: S, -} - -impl Service for CamelCaseHeadersMiddleware -where - S: Service, Error = Error>, - S::Future: 'static, -{ - type Error = Error; - type Future = DefaultHeaderFuture; - type Response = ServiceResponse; - - forward_ready!(service); - - fn call(&self, req: ServiceRequest) -> Self::Future { - let fut = self.service.call(req); - - DefaultHeaderFuture { fut } - } -} diff --git a/crates/alerion_core/src/websocket.rs b/crates/alerion_core/src/websocket.rs index c00f2c4..e69de29 100644 --- a/crates/alerion_core/src/websocket.rs +++ b/crates/alerion_core/src/websocket.rs @@ -1,21 +0,0 @@ -use actix_web::{web, HttpRequest, HttpResponse}; -use actix_web_actors::ws; -use relay::ServerConnection; -use uuid::Uuid; - -use crate::config::AlerionConfig; - -pub fn start_websocket( - server_uuid: Uuid, - config: &AlerionConfig, - conn: ServerConnection, - req: &HttpRequest, - payload: web::Payload, -) -> actix_web::Result<(conn::ConnectionAddr, HttpResponse)> { - let conn = conn::WebsocketConnectionImpl::new(server_uuid, conn, config); - ws::WsResponseBuilder::new(conn, req, payload).start_with_addr() -} - -pub mod auth; -pub mod conn; -pub mod relay; diff --git a/crates/alerion_core/src/websocket/auth.rs b/crates/alerion_core/src/websocket/auth.rs deleted file mode 100644 index dff5ab6..0000000 --- a/crates/alerion_core/src/websocket/auth.rs +++ /dev/null @@ -1,121 +0,0 @@ -use std::collections::HashSet; - -use bitflags::bitflags; -use jsonwebtoken::{Algorithm, DecodingKey, Validation}; -use serde::{Deserialize, Serialize}; -use uuid::Uuid; - -use crate::config::AlerionConfig; - -#[derive(Debug, Serialize, Deserialize)] -struct Claims { - iss: String, - aud: Vec, - jti: String, - iat: usize, - nbf: usize, - exp: usize, - server_uuid: Uuid, - permissions: Vec, - user_uuid: Uuid, - user_id: usize, - unique_id: String, -} - -bitflags! { - #[derive(Debug, Clone, Copy)] - pub struct Permissions: u32 { - const CONNECT = 1; - const START = 1 << 1; - const STOP = 1 << 2; - const RESTART = 1 << 3; - const CONSOLE = 1 << 4; - const BACKUP_READ = 1 << 5; - const ADMIN_ERRORS = 1 << 6; - const ADMIN_INSTALL = 1 << 7; - const ADMIN_TRANSFER = 1 << 8; - } -} - -impl Permissions { - pub fn from_strings(strings: &[impl AsRef]) -> Self { - let mut this = Permissions::empty(); - - for s in strings { - match s.as_ref() { - "*" => { - this.insert(Permissions::CONNECT); - this.insert(Permissions::START); - this.insert(Permissions::STOP); - this.insert(Permissions::RESTART); - this.insert(Permissions::CONSOLE); - this.insert(Permissions::BACKUP_READ); - } - "websocket.connect" => { - this.insert(Permissions::CONNECT); - } - "control.start" => { - this.insert(Permissions::START); - } - "control.stop" => { - this.insert(Permissions::STOP); - } - "control.restart" => { - this.insert(Permissions::RESTART); - } - "control.console" => { - this.insert(Permissions::CONSOLE); - } - "backup.read" => { - this.insert(Permissions::BACKUP_READ); - } - "admin.websocket.errors" => { - this.insert(Permissions::ADMIN_ERRORS); - } - "admin.websocket.install" => { - this.insert(Permissions::ADMIN_INSTALL); - } - "admin.websocket.transfer" => { - this.insert(Permissions::ADMIN_TRANSFER); - } - _ => {} - } - } - - this - } -} - -pub struct Auth { - validation: Validation, - key: DecodingKey, -} - -impl Auth { - pub fn from_config(cfg: &AlerionConfig) -> Self { - let mut validation = Validation::new(Algorithm::HS256); - - let spec_claims = ["exp", "nbf", "aud", "iss"].map(ToOwned::to_owned); - - validation.required_spec_claims = HashSet::from(spec_claims); - validation.leeway = 10; - validation.reject_tokens_expiring_in_less_than = 0; - validation.validate_exp = false; - validation.validate_nbf = false; - validation.validate_aud = false; - validation.aud = None; - validation.iss = Some(HashSet::from([cfg.remote.clone()])); - validation.sub = None; - - let key = DecodingKey::from_secret(cfg.auth.token.as_ref()); - - Self { validation, key } - } - - pub fn validate(&self, auth: &str, server_uuid: &Uuid) -> Option { - jsonwebtoken::decode::(auth, &self.key, &self.validation) - .ok() - .filter(|result| &result.claims.server_uuid == server_uuid) - .map(|result| Permissions::from_strings(&result.claims.permissions)) - } -} diff --git a/crates/alerion_core/src/websocket/conn.rs b/crates/alerion_core/src/websocket/conn.rs deleted file mode 100644 index 8905394..0000000 --- a/crates/alerion_core/src/websocket/conn.rs +++ /dev/null @@ -1,190 +0,0 @@ -use std::cell::Cell; -use std::convert::Infallible; - -use actix::{Actor, ActorContext, Addr, Handler, StreamHandler}; -use actix_web_actors::ws; -use alerion_datamodel::websocket::*; -use uuid::Uuid; - -use crate::config::AlerionConfig; -use crate::websocket::auth::{Auth, Permissions}; -use crate::websocket::relay::ServerConnection; - -#[derive(Debug, Clone)] -pub enum ServerMessage { - Kill, - Logs(String), - Stats(PerformanceStatisics), -} - -impl actix::Message for ServerMessage { - type Result = Result<(), Infallible>; -} - -#[derive(Debug, Clone)] -pub enum PanelMessage { - Command(String), - ReceiveLogs, - ReceiveInstallLog, - ReceiveStats, -} - -impl actix::Message for PanelMessage { - type Result = Result<(), Infallible>; -} - -pub type ConnectionAddr = Addr; - -enum MessageError { - InvalidJwt, - Generic(String), -} - -pub struct WebsocketConnectionImpl { - server_uuid: Uuid, - server_conn: ServerConnection, - auth: Auth, - permissions: Cell, -} - -impl Actor for WebsocketConnectionImpl { - type Context = ws::WebsocketContext; -} - -impl Handler for WebsocketConnectionImpl { - type Result = Result<(), Infallible>; - - fn handle(&mut self, msg: ServerMessage, ctx: &mut Self::Context) -> Self::Result { - match msg { - ServerMessage::Kill => { - ctx.close(None); - ctx.stop(); - } - - ServerMessage::Logs(logs) => { - ctx.text(RawMessage::new(EventType::Logs, logs)); - } - - ServerMessage::Stats(stats) => { - let str = - serde_json::to_string(&stats).expect("JSON serialization should not fail"); - ctx.text(RawMessage::new(EventType::Stats, str)) - } - } - - Ok(()) - } -} - -impl StreamHandler> for WebsocketConnectionImpl { - fn handle(&mut self, item: Result, ctx: &mut Self::Context) { - use ws::Message; - - let Ok(msg) = item else { - return; - }; - - match msg { - Message::Text(t) => { - let _result = self.handle_text(&t, ctx); - } - _ => println!("TODO: non-text WS msgs"), - } - } -} - -impl WebsocketConnectionImpl { - pub fn new(server_uuid: Uuid, server_conn: ServerConnection, cfg: &AlerionConfig) -> Self { - Self { - server_uuid, - server_conn, - auth: Auth::from_config(cfg), - permissions: Cell::new(Permissions::empty()), - } - } - - pub fn handle_text(&self, msg: &str, ctx: &mut ::Context) -> Option<()> { - // todo: behavior on bad JSON payload? right now just ignore - let event = serde_json::from_str::(msg).ok()?; - - match event.event() { - EventType::Authentication => { - let maybe_permissions = self - .auth - .validate(&event.into_first_arg()?, &self.server_uuid); - - if let Some(permissions) = maybe_permissions { - if permissions.contains(Permissions::CONNECT) { - self.permissions.set(permissions); - self.server_conn.set_authenticated(); - ctx.text(RawMessage::new_no_args(EventType::AuthenticationSuccess)); - } - } else { - self.send_error(ctx, MessageError::InvalidJwt); - } - - Some(()) - } - - ty => { - if self.server_conn.is_authenticated() { - let permissions = self.permissions.get(); - - match ty { - EventType::SendCommand => { - if permissions.contains(Permissions::CONSOLE) { - if let Some(command) = event.into_first_arg() { - self.server_conn - .send_if_authenticated(PanelMessage::Command(command)); - } else { - self.send_error(ctx, MessageError::InvalidJwt); - } - } - } - - EventType::SendStats => { - if permissions.contains(Permissions::CONSOLE) { - self.server_conn - .send_if_authenticated(PanelMessage::ReceiveStats); - } - } - - EventType::SendLogs => { - if permissions.contains(Permissions::CONSOLE) { - self.server_conn - .send_if_authenticated(PanelMessage::ReceiveLogs); - - if permissions.contains(Permissions::ADMIN_INSTALL) { - self.server_conn.force_send(PanelMessage::ReceiveInstallLog); - } - } - } - - e => todo!("{e:?}"), - } - } - - Some(()) - } - } - } - - #[inline(always)] - fn send_error(&self, ctx: &mut ::Context, err: MessageError) { - let precise_errors = self.permissions.get().contains(Permissions::ADMIN_ERRORS); - - let raw_msg = if precise_errors { - match err { - MessageError::InvalidJwt => RawMessage::new_no_args(EventType::JwtError), - MessageError::Generic(s) => RawMessage::new(EventType::DaemonError, s), - } - } else { - RawMessage::new( - EventType::DaemonError, - "An unexpected error occurred".to_owned(), - ) - }; - - ctx.text(raw_msg) - } -} diff --git a/crates/alerion_core/src/websocket/relay.rs b/crates/alerion_core/src/websocket/relay.rs deleted file mode 100644 index 156c797..0000000 --- a/crates/alerion_core/src/websocket/relay.rs +++ /dev/null @@ -1,121 +0,0 @@ -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use std::sync::Arc; - -use tokio::sync::mpsc::Sender; - -use crate::websocket::conn::{ConnectionAddr, PanelMessage, ServerMessage}; - -pub struct ServerConnection { - auth_tracker: Arc, - sender: Sender<(u32, PanelMessage)>, - id: u32, -} - -impl ServerConnection { - pub fn new( - auth_tracker: Arc, - sender: Sender<(u32, PanelMessage)>, - id: u32, - ) -> Self { - ServerConnection { - auth_tracker, - sender, - id, - } - } - - pub fn set_authenticated(&self) { - self.auth_tracker.set_auth(true); - } - - pub fn is_authenticated(&self) -> bool { - self.auth_tracker.get_auth() - } - - pub fn send_if_authenticated(&self, msg: PanelMessage) { - if self.auth_tracker.get_auth() { - let _ = self.sender.try_send((self.id, msg)); - } - } - - pub fn force_send(&self, msg: PanelMessage) { - let _ = self.sender.try_send((self.id, msg)); - } - - pub fn auth_tracker(&self) -> Arc { - Arc::clone(&self.auth_tracker) - } -} - -pub struct ClientConnection { - auth_tracker: Arc, - ws_sender: ConnectionAddr, -} - -impl ClientConnection { - pub fn new(auth_tracker: Arc, ws_sender: ConnectionAddr) -> Self { - Self { - auth_tracker, - ws_sender, - } - } - - /// Uses a closure because many messages might be expensive to compute. - pub fn send_if_authenticated(&self, msg: F) - where - F: FnOnce() -> ServerMessage, - { - if self.auth_tracker.get_auth() { - let m = msg(); - self.ws_sender.do_send(m); - } - } - - /// Terminate the connection on the server's side. - /// - /// There could be a condition where the server tries to terminate the connection, - /// sets the auth bool to false and tells the websocket to kill itself. Before the - /// websocket connection is actually terminated, the client to re-authenticate and - /// send more messages to the server. This should be a non-issue: who cares if - /// the client manages to send a few more frames; the connection will eventually - /// terminate. - /// - /// This would easily be fixable with another atomic check, but I'd rather avoid - /// seemingly unnecessary atomic loads. - pub fn terminate(&self) { - self.expire_auth(); - self.ws_sender.do_send(ServerMessage::Kill); - } - - pub fn expire_auth(&self) { - self.auth_tracker.set_auth(false); - } - - pub fn is_authenticated(&self) -> bool { - self.auth_tracker.get_auth() - } -} - -/// A middleman between a websocket connection and a server, which keeps track of -/// auth state and the status of the websocket connection. -pub struct AuthTracker { - started_at: AtomicU64, - authenticated: AtomicBool, -} - -impl AuthTracker { - pub fn new(server_time: u64) -> Self { - Self { - started_at: AtomicU64::new(server_time), - authenticated: AtomicBool::new(false), - } - } - - pub fn set_auth(&self, value: bool) { - self.authenticated.store(value, Ordering::SeqCst); - } - - pub fn get_auth(&self) -> bool { - self.authenticated.load(Ordering::SeqCst) - } -} diff --git a/flake.nix b/flake.nix index 8fe90a1..6212609 100644 --- a/flake.nix +++ b/flake.nix @@ -24,6 +24,7 @@ with pkgs; { devShells.default = mkShell { buildInputs = [ + bacon openssl pkg-config (rust-bin.fromRustupToolchainFile ./rust-toolchain.toml) From 09db42cb8890098b01531a5920fcc9cf047c30c1 Mon Sep 17 00:00:00 2001 From: GitHub Actions Date: Sun, 21 Apr 2024 07:31:56 +0000 Subject: [PATCH 2/6] refactor: rustfmt [skip ci] --- crates/alerion_core/src/websocket.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/alerion_core/src/websocket.rs b/crates/alerion_core/src/websocket.rs index e69de29..8b13789 100644 --- a/crates/alerion_core/src/websocket.rs +++ b/crates/alerion_core/src/websocket.rs @@ -0,0 +1 @@ + From 51e14739ca937f4147c2cbaab7acebbb9e3f9af1 Mon Sep 17 00:00:00 2001 From: pupbrained Date: Sun, 21 Apr 2024 20:37:53 -0400 Subject: [PATCH 3/6] chore: remove (more) unneeded stuff --- crates/alerion_core/src/webserver.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/crates/alerion_core/src/webserver.rs b/crates/alerion_core/src/webserver.rs index 33a3145..3e68554 100644 --- a/crates/alerion_core/src/webserver.rs +++ b/crates/alerion_core/src/webserver.rs @@ -2,8 +2,8 @@ use std::net::SocketAddr; use warp::Filter; -const ALLOWED_HEADERS: &str = "Accept, Accept-Encoding, Authorization, Cache-Control, Content-Type, Content-Length, Origin, X-Real-IP, X-CSRF-Token"; -const ALLOWED_METHODS: &str = "GET, POST, PATCH, PUT, DELETE, OPTIONS"; +//const ALLOWED_HEADERS: &str = "Accept, Accept-Encoding, Authorization, Cache-Control, Content-Type, Content-Length, Origin, X-Real-IP, X-CSRF-Token"; +//const ALLOWED_METHODS: &str = "GET, POST, PATCH, PUT, DELETE, OPTIONS"; //fn default_headers(config: &AlerionConfig) -> middleware::DefaultHeaders { //middleware::DefaultHeaders::new() From 1d487474583ea2df03d644e985244176164cb533 Mon Sep 17 00:00:00 2001 From: pupbrained Date: Mon, 22 Apr 2024 02:29:32 -0400 Subject: [PATCH 4/6] update: we're doing poem im just gonna squash these commits when im done so the commit messages arent terrible --- crates/alerion_core/Cargo.toml | 4 +- crates/alerion_core/src/webserver.rs | 196 ++++++++++++++++++++++++++- 2 files changed, 195 insertions(+), 5 deletions(-) diff --git a/crates/alerion_core/Cargo.toml b/crates/alerion_core/Cargo.toml index fb8ea98..c0bb853 100644 --- a/crates/alerion_core/Cargo.toml +++ b/crates/alerion_core/Cargo.toml @@ -24,4 +24,6 @@ smallvec = { version = "1.13.2", features = ["serde"] } directories = "5.0.1" bollard = "0.16.1" bitflags = "2.5.0" -warp = "0.3.7" +num_cpus = "1.16.0" +sysinfo = "0.30.11" +poem = { version = "3.0.0", features = ["websocket"] } diff --git a/crates/alerion_core/src/webserver.rs b/crates/alerion_core/src/webserver.rs index 3e68554..a95aec2 100644 --- a/crates/alerion_core/src/webserver.rs +++ b/crates/alerion_core/src/webserver.rs @@ -1,6 +1,15 @@ +use std::env::consts::{ARCH, OS}; use std::net::SocketAddr; -use warp::Filter; +use futures::{SinkExt, StreamExt}; +use poem::listener::TcpListener; +use poem::web::websocket::{Message, WebSocket}; +use poem::web::{Path, Query}; +use poem::{get, handler, Body, IntoResponse, Response, Route, Server}; +use reqwest::StatusCode; +use serde::{Deserialize, Serialize}; +use sysinfo::System; +use uuid::Uuid; //const ALLOWED_HEADERS: &str = "Accept, Accept-Encoding, Authorization, Cache-Control, Content-Type, Content-Length, Origin, X-Real-IP, X-CSRF-Token"; //const ALLOWED_METHODS: &str = "GET, POST, PATCH, PUT, DELETE, OPTIONS"; @@ -14,8 +23,187 @@ use warp::Filter; //.add((header::ACCESS_CONTROL_ALLOW_CREDENTIALS, "true")) //} +#[derive(Debug, Serialize, Deserialize)] +struct SystemQuery { + v: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +struct SystemResponseV1 { + architecture: String, + cpu_count: usize, + kernel_version: String, + os: String, + version: String, +} + +#[derive(Debug, Serialize, Deserialize)] +struct SystemResponseV2 { + version: String, + docker: DockerInfo, + system: SystemInfo, +} + +#[derive(Debug, Serialize, Deserialize)] +struct DockerInfo { + version: String, + cgroups: CGroupsInfo, + containers: ContainersInfo, + storage: StorageInfo, + runc: RunCInfo, +} + +#[derive(Debug, Serialize, Deserialize)] +struct CGroupsInfo { + driver: String, + version: String, +} + +#[derive(Debug, Serialize, Deserialize)] +struct ContainersInfo { + total: u32, + running: u32, + paused: u32, + stopped: u32, +} + +#[derive(Debug, Serialize, Deserialize)] +struct StorageInfo { + driver: String, + filesystem: String, +} + +#[derive(Debug, Serialize, Deserialize)] +struct RunCInfo { + version: String, +} + +#[derive(Debug, Serialize, Deserialize)] +struct SystemInfo { + architecture: String, + cpu_threads: usize, + memory_bytes: usize, + kernel_version: String, + os: String, + os_type: String, +} + +#[derive(Debug, Serialize, Deserialize)] +struct WebSocketEvent { + event: String, + args: Option>, +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum EventType { + // Send + #[serde(rename = "auth")] + Auth, + #[serde(rename = "set state")] + SetState, + #[serde(rename = "send command")] + SendCommand, + #[serde(rename = "send logs")] + SendLogs, + #[serde(rename = "send stats")] + SendStats, + + // Recieve + #[serde(rename = "auth success")] + AuthSuccess, + #[serde(rename = "backup complete")] + BackupComplete, + #[serde(rename = "backup restore completed")] + BackupRestoreCompleted, + #[serde(rename = "console output")] + ConsoleOutput, + #[serde(rename = "daemon error")] + DaemonError, + #[serde(rename = "daemon message")] + DaemonMessage, + #[serde(rename = "install completed")] + InstallCompleted, + #[serde(rename = "install output")] + InstallOutput, + #[serde(rename = "install started")] + InstallStarted, + #[serde(rename = "jwt error")] + JwtError, + #[serde(rename = "stats")] + Stats, + #[serde(rename = "status")] + Status, + #[serde(rename = "token expired")] + TokenExpired, + #[serde(rename = "token expiring")] + TokenExpiring, + #[serde(rename = "transfer logs")] + TransferLogs, + #[serde(rename = "transfer status")] + TransferStatus, +} + +#[handler] +fn process_system_query(Query(params): Query) -> impl IntoResponse { + match params.v.as_deref() { + Some("2") => Response::builder() + .status(StatusCode::NOT_IMPLEMENTED) + .finish(), + Some(_) => Response::builder() + .status(StatusCode::BAD_REQUEST) + .body("Invalid version"), + None => { + let Some(kernel_version) = System::kernel_version() else { + return Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body("The system information could not be fetched."); + }; + + let Ok(response) = Body::from_json(SystemResponseV1 { + architecture: ARCH.to_owned(), + cpu_count: num_cpus::get(), + kernel_version, + os: OS.to_owned(), + version: env!("CARGO_PKG_VERSION").to_owned(), + }) else { + return Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .finish(); + }; + + Response::builder().body(response) + } + } +} + +#[handler] +async fn initialize_websocket(Path(uuid): Path, ws: WebSocket) -> impl IntoResponse { + ws.on_upgrade(move |mut socket| async move { + loop { + if let Some(Ok(Message::Text(text))) = socket.next().await { + let _ = socket.send(Message::Text(format!("{uuid}"))).await; + let _ = socket + .send(Message::Text( + match serde_json::from_str::(text.as_str()) { + Ok(json) => format!("{json:?}"), + Err(e) => format!("error: {e}"), + }, + )) + .await; + } + } + }) +} + pub async fn serve(address: impl Into) { - warp::serve(warp::path!("test" / String).map(|name| format!("Hello {}!", name))) - .run(address) - .await + let api = Route::new().nest( + "/api", + Route::new() + .at("system", get(process_system_query)) + .at("servers/:uuid/ws", get(initialize_websocket)), + ); + + let _ = Server::new(TcpListener::bind(address.into())) + .run(api) + .await; } From 0e67354d27a5d96c776208f2f7319042c20c5751 Mon Sep 17 00:00:00 2001 From: Mars Date: Mon, 22 Apr 2024 20:13:48 -0400 Subject: [PATCH 5/6] some more websocket stuff --- crates/alerion_core/src/lib.rs | 2 +- crates/alerion_core/src/webserver.rs | 347 +++++++++++++++++++-------- flake.nix | 41 +++- 3 files changed, 285 insertions(+), 105 deletions(-) diff --git a/crates/alerion_core/src/lib.rs b/crates/alerion_core/src/lib.rs index fc356e0..0032578 100644 --- a/crates/alerion_core/src/lib.rs +++ b/crates/alerion_core/src/lib.rs @@ -28,7 +28,7 @@ pub async fn alerion_main() -> anyhow::Result<()> { // there is a low likelyhood this will actually block, and if it does // it will block only once for a short amount of time, so it's no big deal. - let webserver = webserver::serve((config.api.host, config.api.port)); + let webserver = webserver::serve(config); let webserver_handle = tokio::spawn(webserver); diff --git a/crates/alerion_core/src/webserver.rs b/crates/alerion_core/src/webserver.rs index a95aec2..6bc7927 100644 --- a/crates/alerion_core/src/webserver.rs +++ b/crates/alerion_core/src/webserver.rs @@ -1,16 +1,25 @@ +#![deny(dead_code)] +use std::collections::HashSet; use std::env::consts::{ARCH, OS}; use std::net::SocketAddr; +use bitflags::bitflags; use futures::{SinkExt, StreamExt}; +use jsonwebtoken::{Algorithm, DecodingKey, Validation}; use poem::listener::TcpListener; +use poem::middleware::Cors; use poem::web::websocket::{Message, WebSocket}; -use poem::web::{Path, Query}; -use poem::{get, handler, Body, IntoResponse, Response, Route, Server}; -use reqwest::StatusCode; +use poem::web::Path; +use poem::{ + get, handler, Body, Endpoint, EndpointExt, IntoResponse, Middleware, Request, Response, Route, Server +}; +use reqwest::{Method, StatusCode}; use serde::{Deserialize, Serialize}; use sysinfo::System; use uuid::Uuid; +use crate::config::AlerionConfig; + //const ALLOWED_HEADERS: &str = "Accept, Accept-Encoding, Authorization, Cache-Control, Content-Type, Content-Length, Origin, X-Real-IP, X-CSRF-Token"; //const ALLOWED_METHODS: &str = "GET, POST, PATCH, PUT, DELETE, OPTIONS"; @@ -24,79 +33,157 @@ use uuid::Uuid; //} #[derive(Debug, Serialize, Deserialize)] -struct SystemQuery { - v: Option, +struct Claims { + iss: String, + aud: Vec, + jti: String, + iat: usize, + nbf: usize, + exp: usize, + server_uuid: Uuid, + permissions: Vec, + user_uuid: Uuid, + user_id: usize, + unique_id: String, } -#[derive(Debug, Serialize, Deserialize)] -struct SystemResponseV1 { - architecture: String, - cpu_count: usize, - kernel_version: String, - os: String, - version: String, +bitflags! { + #[derive(Debug, Clone, Copy)] + pub struct Permissions: u32 { + const CONNECT = 1; + const START = 1 << 1; + const STOP = 1 << 2; + const RESTART = 1 << 3; + const CONSOLE = 1 << 4; + const BACKUP_READ = 1 << 5; + const ADMIN_ERRORS = 1 << 6; + const ADMIN_INSTALL = 1 << 7; + const ADMIN_TRANSFER = 1 << 8; + } } -#[derive(Debug, Serialize, Deserialize)] -struct SystemResponseV2 { - version: String, - docker: DockerInfo, - system: SystemInfo, +impl Permissions { + pub fn from_strings(strings: &[impl AsRef]) -> Self { + let mut this = Permissions::empty(); + + for s in strings { + match s.as_ref() { + "*" => { + this.insert(Permissions::CONNECT); + this.insert(Permissions::START); + this.insert(Permissions::STOP); + this.insert(Permissions::RESTART); + this.insert(Permissions::CONSOLE); + this.insert(Permissions::BACKUP_READ); + } + "websocket.connect" => { + this.insert(Permissions::CONNECT); + } + "control.start" => { + this.insert(Permissions::START); + } + "control.stop" => { + this.insert(Permissions::STOP); + } + "control.restart" => { + this.insert(Permissions::RESTART); + } + "control.console" => { + this.insert(Permissions::CONSOLE); + } + "backup.read" => { + this.insert(Permissions::BACKUP_READ); + } + "admin.websocket.errors" => { + this.insert(Permissions::ADMIN_ERRORS); + } + "admin.websocket.install" => { + this.insert(Permissions::ADMIN_INSTALL); + } + "admin.websocket.transfer" => { + this.insert(Permissions::ADMIN_TRANSFER); + } + _ => {} + } + } + + this + } } -#[derive(Debug, Serialize, Deserialize)] -struct DockerInfo { - version: String, - cgroups: CGroupsInfo, - containers: ContainersInfo, - storage: StorageInfo, - runc: RunCInfo, +pub struct Auth { + validation: Validation, + key: DecodingKey, +} + +impl Auth { + pub fn from_config(cfg: &AlerionConfig) -> Self { + let mut validation = Validation::new(Algorithm::HS256); + + let spec_claims = ["exp", "nbf", "aud", "iss"].map(ToOwned::to_owned); + + validation.required_spec_claims = HashSet::from(spec_claims); + validation.leeway = 10; + validation.reject_tokens_expiring_in_less_than = 0; + validation.validate_exp = false; + validation.validate_nbf = false; + validation.validate_aud = false; + validation.aud = None; + validation.iss = Some(HashSet::from([cfg.remote.clone()])); + validation.sub = None; + + let key = DecodingKey::from_secret(cfg.auth.token.as_ref()); + + Self { validation, key } + } + + pub fn validate(&self, auth: &str, server_uuid: &Uuid) -> Option { + jsonwebtoken::decode::(auth, &self.key, &self.validation) + .ok() + .filter(|result| &result.claims.server_uuid == server_uuid) + .map(|result| Permissions::from_strings(&result.claims.permissions)) + } } #[derive(Debug, Serialize, Deserialize)] -struct CGroupsInfo { - driver: String, - version: String, +struct SystemQuery { + v: Option, } #[derive(Debug, Serialize, Deserialize)] -struct ContainersInfo { - total: u32, - running: u32, - paused: u32, - stopped: u32, +struct SystemResponseV1 { + architecture: String, + cpu_count: usize, + kernel_version: String, + os: String, + version: String, } #[derive(Debug, Serialize, Deserialize)] -struct StorageInfo { - driver: String, - filesystem: String, +struct WebsocketEvent { + event: ServerEventType, + args: Option>, } #[derive(Debug, Serialize, Deserialize)] -struct RunCInfo { - version: String, +struct ClientWebSocketEvent { + event: ClientEventType, + args: Option>, } #[derive(Debug, Serialize, Deserialize)] -struct SystemInfo { - architecture: String, - cpu_threads: usize, - memory_bytes: usize, - kernel_version: String, - os: String, - os_type: String, +struct AuthDetails { + data: AuthDetailsInner, } #[derive(Debug, Serialize, Deserialize)] -struct WebSocketEvent { - event: String, - args: Option>, +struct AuthDetailsInner { + token: String, + socket: String, } #[derive(Debug, Serialize, Deserialize)] -pub enum EventType { - // Send +pub enum ClientEventType { #[serde(rename = "auth")] Auth, #[serde(rename = "set state")] @@ -107,8 +194,10 @@ pub enum EventType { SendLogs, #[serde(rename = "send stats")] SendStats, +} - // Recieve +#[derive(Debug, Serialize, Deserialize)] +pub enum ServerEventType { #[serde(rename = "auth success")] AuthSuccess, #[serde(rename = "backup complete")] @@ -144,66 +233,132 @@ pub enum EventType { } #[handler] -fn process_system_query(Query(params): Query) -> impl IntoResponse { - match params.v.as_deref() { - Some("2") => Response::builder() - .status(StatusCode::NOT_IMPLEMENTED) - .finish(), - Some(_) => Response::builder() - .status(StatusCode::BAD_REQUEST) - .body("Invalid version"), - None => { - let Some(kernel_version) = System::kernel_version() else { - return Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body("The system information could not be fetched."); - }; +async fn process_system_query() -> impl IntoResponse { + let Some(kernel_version) = System::kernel_version() else { + return Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body("The system information could not be fetched."); + }; + + let Ok(response) = Body::from_json(SystemResponseV1 { + architecture: ARCH.to_owned(), + cpu_count: num_cpus::get(), + kernel_version, + os: OS.to_owned(), + version: env!("CARGO_PKG_VERSION").to_owned(), + }) else { + return Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .finish(); + }; - let Ok(response) = Body::from_json(SystemResponseV1 { - architecture: ARCH.to_owned(), - cpu_count: num_cpus::get(), - kernel_version, - os: OS.to_owned(), - version: env!("CARGO_PKG_VERSION").to_owned(), - }) else { - return Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .finish(); + Response::builder().body(response) +} + +#[handler] +async fn initialize_websocket(Path(_uuid): Path, ws: WebSocket) -> impl IntoResponse { + ws.on_upgrade(move |mut socket| async move { + while let Some(Ok(Message::Text(text))) = socket.next().await { + let data = serde_json::from_str::(text.as_str()); + + let response = match data { + Ok(json) => Message::Text(match json.event { + ClientEventType::Auth => "auth success".to_owned(), + _ => "not implemented".to_owned(), + }), + Err(e) => Message::Text(format!("error: {e}")), }; - Response::builder().body(response) + let _ = socket.send(response).await; } - } + }) } #[handler] -async fn initialize_websocket(Path(uuid): Path, ws: WebSocket) -> impl IntoResponse { - ws.on_upgrade(move |mut socket| async move { - loop { - if let Some(Ok(Message::Text(text))) = socket.next().await { - let _ = socket.send(Message::Text(format!("{uuid}"))).await; - let _ = socket - .send(Message::Text( - match serde_json::from_str::(text.as_str()) { - Ok(json) => format!("{json:?}"), - Err(e) => format!("error: {e}"), - }, - )) - .await; +async fn return_auth_details(Path(_identifier): Path) -> impl IntoResponse { + todo!() +} + +struct TokenMiddleware { + token: String, +} + +impl Middleware for TokenMiddleware { + type Output = TokenMiddlewareImpl; + + fn transform(&self, ep: E) -> Self::Output { + TokenMiddlewareImpl { + ep, + token: self.token.clone(), + } + } +} + +/// The new endpoint type generated by the TokenMiddleware. +struct TokenMiddlewareImpl { + ep: E, + token: String, +} + +/// Token data +impl Endpoint for TokenMiddlewareImpl { + type Output = E::Output; + + async fn call(&self, req: Request) -> poem::Result { + if req.method() == Method::OPTIONS { + return self.ep.call(req).await; + } + + if let Some(value) = req + .headers() + .get("Authorization") + .and_then(|value| value.to_str().ok()) + { + let token = value.to_string(); + + if token == format!("Bearer {}", self.token) { + self.ep.call(req).await + } else { + Err(poem::Error::from_string( + "Token does not match", + StatusCode::UNAUTHORIZED, + )) } + } else { + Err(poem::Error::from_string( + "No token provided", + StatusCode::UNAUTHORIZED, + )) } - }) + } } -pub async fn serve(address: impl Into) { +#[handler] +async fn system_query_options() -> impl IntoResponse { + Response::builder().status(StatusCode::NO_CONTENT).finish() +} + +pub async fn serve(config: AlerionConfig) { + let system_endpoint = get(process_system_query) + .options(system_query_options) + .with( + Cors::new() + .allow_methods(["GET", "OPTIONS"]) + .allow_credentials(true), + ) + .with(TokenMiddleware { + token: config.auth.token, + }); + let api = Route::new().nest( - "/api", - Route::new() - .at("system", get(process_system_query)) - .at("servers/:uuid/ws", get(initialize_websocket)), + "api", + Route::new().at("system", system_endpoint).nest( + "servers", + Route::new().at(":uuid/ws", get(initialize_websocket)), + ), ); - let _ = Server::new(TcpListener::bind(address.into())) + let _ = Server::new(TcpListener::bind((config.api.host, config.api.port))) .run(api) .await; } diff --git a/flake.nix b/flake.nix index 6212609..442956b 100644 --- a/flake.nix +++ b/flake.nix @@ -20,16 +20,41 @@ pkgs = import nixpkgs { inherit system overlays; }; + darwinPkgs = nixpkgs.lib.optionals pkgs.stdenv.isDarwin (with pkgs.darwin; [ + apple_sdk.frameworks.AppKit + apple_sdk.frameworks.Carbon + apple_sdk.frameworks.Cocoa + apple_sdk.frameworks.CoreFoundation + apple_sdk.frameworks.IOKit + apple_sdk.frameworks.WebKit + apple_sdk.frameworks.Security + apple_sdk.frameworks.DisplayServices + ]); in with pkgs; { - devShells.default = mkShell { - buildInputs = [ - bacon - openssl - pkg-config - (rust-bin.fromRustupToolchainFile ./rust-toolchain.toml) - (rust-bin.nightly."2024-04-19".rustfmt) - ]; + devShells = { + default = mkShell { + buildInputs = + [ + bacon + openssl + pkg-config + (rust-bin.fromRustupToolchainFile ./rust-toolchain.toml) + (rust-bin.nightly."2024-04-19".rustfmt) + ] + ++ darwinPkgs; + }; + + nightly = mkShell { + buildInputs = + [ + bacon + openssl + pkg-config + (rust-bin.nightly."2024-04-19".default) + ] + ++ darwinPkgs; + }; }; } ); From 1a573d42bbbf31935c8272187a7e46d47f287b67 Mon Sep 17 00:00:00 2001 From: pupbrained Date: Mon, 22 Apr 2024 22:14:10 -0400 Subject: [PATCH 6/6] feat: implement more endpoints --- crates/alerion_core/src/webserver.rs | 103 +++--------------- .../alerion_core/src/webserver/middleware.rs | 1 + .../src/webserver/middleware/bearer_auth.rs | 64 +++++++++++ 3 files changed, 78 insertions(+), 90 deletions(-) create mode 100644 crates/alerion_core/src/webserver/middleware.rs create mode 100644 crates/alerion_core/src/webserver/middleware/bearer_auth.rs diff --git a/crates/alerion_core/src/webserver.rs b/crates/alerion_core/src/webserver.rs index 6bc7927..f70292e 100644 --- a/crates/alerion_core/src/webserver.rs +++ b/crates/alerion_core/src/webserver.rs @@ -1,7 +1,6 @@ #![deny(dead_code)] use std::collections::HashSet; use std::env::consts::{ARCH, OS}; -use std::net::SocketAddr; use bitflags::bitflags; use futures::{SinkExt, StreamExt}; @@ -9,15 +8,14 @@ use jsonwebtoken::{Algorithm, DecodingKey, Validation}; use poem::listener::TcpListener; use poem::middleware::Cors; use poem::web::websocket::{Message, WebSocket}; -use poem::web::Path; -use poem::{ - get, handler, Body, Endpoint, EndpointExt, IntoResponse, Middleware, Request, Response, Route, Server -}; -use reqwest::{Method, StatusCode}; +use poem::web::{Json, Path}; +use poem::{endpoint, get, handler, EndpointExt, IntoResponse, Route, Server}; +use reqwest::StatusCode; use serde::{Deserialize, Serialize}; use sysinfo::System; use uuid::Uuid; +use self::middleware::bearer_auth::BearerAuthMiddleware; use crate::config::AlerionConfig; //const ALLOWED_HEADERS: &str = "Accept, Accept-Encoding, Authorization, Cache-Control, Content-Type, Content-Length, Origin, X-Real-IP, X-CSRF-Token"; @@ -145,11 +143,6 @@ impl Auth { } } -#[derive(Debug, Serialize, Deserialize)] -struct SystemQuery { - v: Option, -} - #[derive(Debug, Serialize, Deserialize)] struct SystemResponseV1 { architecture: String, @@ -235,24 +228,17 @@ pub enum ServerEventType { #[handler] async fn process_system_query() -> impl IntoResponse { let Some(kernel_version) = System::kernel_version() else { - return Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .body("The system information could not be fetched."); + return StatusCode::INTERNAL_SERVER_ERROR.into_response(); }; - let Ok(response) = Body::from_json(SystemResponseV1 { + Json(SystemResponseV1 { architecture: ARCH.to_owned(), cpu_count: num_cpus::get(), kernel_version, os: OS.to_owned(), version: env!("CARGO_PKG_VERSION").to_owned(), - }) else { - return Response::builder() - .status(StatusCode::INTERNAL_SERVER_ERROR) - .finish(); - }; - - Response::builder().body(response) + }) + .into_response() } #[handler] @@ -279,76 +265,11 @@ async fn return_auth_details(Path(_identifier): Path) -> impl IntoRespon todo!() } -struct TokenMiddleware { - token: String, -} - -impl Middleware for TokenMiddleware { - type Output = TokenMiddlewareImpl; - - fn transform(&self, ep: E) -> Self::Output { - TokenMiddlewareImpl { - ep, - token: self.token.clone(), - } - } -} - -/// The new endpoint type generated by the TokenMiddleware. -struct TokenMiddlewareImpl { - ep: E, - token: String, -} - -/// Token data -impl Endpoint for TokenMiddlewareImpl { - type Output = E::Output; - - async fn call(&self, req: Request) -> poem::Result { - if req.method() == Method::OPTIONS { - return self.ep.call(req).await; - } - - if let Some(value) = req - .headers() - .get("Authorization") - .and_then(|value| value.to_str().ok()) - { - let token = value.to_string(); - - if token == format!("Bearer {}", self.token) { - self.ep.call(req).await - } else { - Err(poem::Error::from_string( - "Token does not match", - StatusCode::UNAUTHORIZED, - )) - } - } else { - Err(poem::Error::from_string( - "No token provided", - StatusCode::UNAUTHORIZED, - )) - } - } -} - -#[handler] -async fn system_query_options() -> impl IntoResponse { - Response::builder().status(StatusCode::NO_CONTENT).finish() -} - pub async fn serve(config: AlerionConfig) { let system_endpoint = get(process_system_query) - .options(system_query_options) - .with( - Cors::new() - .allow_methods(["GET", "OPTIONS"]) - .allow_credentials(true), - ) - .with(TokenMiddleware { - token: config.auth.token, - }); + .options(endpoint::make_sync(|_| StatusCode::NO_CONTENT)) + .with(Cors::new().allow_credentials(true)) + .with(BearerAuthMiddleware::new(config.auth.token)); let api = Route::new().nest( "api", @@ -362,3 +283,5 @@ pub async fn serve(config: AlerionConfig) { .run(api) .await; } + +pub mod middleware; diff --git a/crates/alerion_core/src/webserver/middleware.rs b/crates/alerion_core/src/webserver/middleware.rs new file mode 100644 index 0000000..2720c5e --- /dev/null +++ b/crates/alerion_core/src/webserver/middleware.rs @@ -0,0 +1 @@ +pub mod bearer_auth; diff --git a/crates/alerion_core/src/webserver/middleware/bearer_auth.rs b/crates/alerion_core/src/webserver/middleware/bearer_auth.rs new file mode 100644 index 0000000..44e4a28 --- /dev/null +++ b/crates/alerion_core/src/webserver/middleware/bearer_auth.rs @@ -0,0 +1,64 @@ +use poem::{Endpoint, Middleware, Request}; +use reqwest::{Method, StatusCode}; + +pub struct BearerAuthMiddleware { + token: String, +} + +impl BearerAuthMiddleware { + pub fn new(token: String) -> Self { + Self { token } + } +} + +impl Middleware for BearerAuthMiddleware { + type Output = BearerAuthMiddlewareImpl; + + fn transform(&self, ep: E) -> Self::Output { + BearerAuthMiddlewareImpl { + ep, + token: self.token.clone(), + } + } +} + +/// The new endpoint type generated by the TokenMiddleware. +pub struct BearerAuthMiddlewareImpl { + ep: E, + token: String, +} + +/// Token data +impl Endpoint for BearerAuthMiddlewareImpl { + type Output = E::Output; + + async fn call(&self, req: Request) -> poem::Result { + println!("{req:#?}"); + + if req.method() == Method::OPTIONS { + return self.ep.call(req).await; + } + + if let Some(value) = req + .headers() + .get("Authorization") + .and_then(|value| value.to_str().ok()) + { + let token = value.to_string(); + + if token == format!("Bearer {}", self.token) { + self.ep.call(req).await + } else { + Err(poem::Error::from_string( + "Token does not match", + StatusCode::UNAUTHORIZED, + )) + } + } else { + Err(poem::Error::from_string( + "No token provided", + StatusCode::UNAUTHORIZED, + )) + } + } +}