diff --git a/Cargo.lock b/Cargo.lock
index 30435be..2aa36a4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1280,6 +1280,18 @@ dependencies = [
  "thiserror",
 ]
 
+[[package]]
+name = "prometheus-static-metric"
+version = "0.5.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f8f30cdb09c39930b8fa5e0f23cbb895ab3f766b187403a0ba0956fc1ef4f0e5"
+dependencies = [
+ "lazy_static",
+ "proc-macro2",
+ "quote",
+ "syn 1.0.109",
+]
+
 [[package]]
 name = "protobuf"
 version = "2.28.0"
@@ -1434,6 +1446,7 @@ dependencies = [
  "pgwire",
  "pretty_env_logger",
  "prometheus",
+ "prometheus-static-metric",
  "rumqttc",
  "serde",
  "serde_json",
diff --git a/Cargo.toml b/Cargo.toml
index a891267..260b5f0 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,6 +22,7 @@ prometheus = "0.13"
 sysinfo = "0.26"
 toml = "0.5"
 once_cell = "1.8"
+prometheus-static-metric = "0.5.1"
 
 [dev-dependencies]
 sqllogictest = "0.13.0"
diff --git a/src/core/tuple.rs b/src/core/tuple.rs
index 4c49631..5d6825d 100644
--- a/src/core/tuple.rs
+++ b/src/core/tuple.rs
@@ -1,4 +1,3 @@
-use log::info;
 use std::{collections::HashMap, fmt::Display};
 
 use crate::sql::planner::binder::ProjItem;
@@ -18,8 +17,8 @@ impl Tuple {
     }
 
     pub fn new_default() -> Self {
-        let mut values = Vec::new();
-        let mut columns = Vec::new();
+        let values = Vec::new();
+        let columns = Vec::new();
         Tuple::new(columns, values)
     }
 
diff --git a/src/main.rs b/src/main.rs
index c5855e2..92fefb4 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,3 +1,5 @@
+#![feature(let_chains)]
+
 #[macro_use]
 extern crate lazy_static;
 
@@ -5,19 +7,15 @@ use std::{collections::HashMap, mem, sync::Arc, time::Duration};
 
 use axum::{
     extract::{self},
-    response::IntoResponse,
     routing::{delete, get, post},
     Json, Router,
 };
 use log::{info, LevelFilter};
-use prometheus::{Encoder, IntGauge, Registry, TextEncoder};
+use prometheus::{Encoder, Registry, TextEncoder};
 use rumqttc::QoS;
 use serde::{Deserialize, Serialize};
 use sysinfo::{CpuExt, System, SystemExt};
-use tokio::{
-    sync::{mpsc, Mutex},
-    time,
-};
+use tokio::{sync::Mutex, time};
 
 use catalog::Catalog;
 use core::Tuple;
@@ -31,6 +29,7 @@ mod catalog;
 mod config;
 mod connector;
 mod core;
+mod metrics;
 mod sql;
 mod storage;
 mod util;
@@ -42,14 +41,11 @@ struct AppState {
     views: HashMap,
     next_id: usize,
     dummy_subscribers: HashMap>,
-    registry: Registry,
 }
 
 impl AppState {
-    pub fn new(registry: Registry) -> Self {
-        let mut app_state = Self::default();
-        app_state.registry = registry;
-        return app_state;
+    pub fn new() -> Self {
+        Self::default()
     }
 }
 
@@ -92,11 +88,10 @@
             }
         }
     });
-    let view_manager = state.clone();
     tokio::spawn(async move {
         while let Ok(Ok(result)) = receiver.recv().await {
-            if !result.is_none() {
-                let data = result.unwrap().parse_into_json().unwrap();
+            if let Some(result) = result {
+                let data = result.parse_into_json().unwrap();
                 sender
                     .client
                     .publish("/yisa/data2", QoS::AtLeastOnce, false, data)
@@ -145,11 +140,10 @@ async fn ping() -> &'static str {
     "pong"
 }
 
-async fn metrics_handler(extract::State(state): extract::State>>) -> String {
-    let mut state = state.lock().await;
+async fn metrics_handler() -> String {
     let mut buffer = Vec::new();
     let encoder = TextEncoder::new();
-    let metric_families = state.registry.gather();
+    let metric_families = prometheus::gather();
     encoder.encode(&metric_families, &mut buffer).unwrap();
     String::from_utf8(buffer).unwrap()
 }
@@ -164,13 +158,6 @@ pub async fn main() {
     // initialize Prometheus registry
     let registry = Registry::new();
 
-    // cpu and memory gauge
-    let cpu_gauge = IntGauge::new("cpu_usage", "CPU usage in percentage").unwrap();
-    let memory_gauge = IntGauge::new("memory_usage", "Memory usage in bytes").unwrap();
-
-    // register gauge
-    registry.register(Box::new(cpu_gauge.clone())).unwrap();
-    registry.register(Box::new(memory_gauge.clone())).unwrap();
     let mut sys = System::new();
     tokio::spawn(async move {
         let mut interval = time::interval(Duration::from_secs(1));
@@ -179,12 +166,12 @@
             sys.refresh_all();
             let cpu_usage = sys.global_cpu_info().cpu_usage() as i64;
             let memory_usage = sys.used_memory() as i64;
-            cpu_gauge.set(cpu_usage);
-            memory_gauge.set(memory_usage);
+            metrics::CPU.set(cpu_usage);
+            metrics::MEMORY.set(memory_usage);
         }
     });
 
-    let app_state = AppState::new(registry);
+    let app_state = AppState::new();
 
     // Initialize database
     let catalog = Catalog::new();
diff --git a/src/metrics.rs b/src/metrics.rs
new file mode 100644
index 0000000..da574ce
--- /dev/null
+++ b/src/metrics.rs
@@ -0,0 +1,7 @@
+use prometheus::IntGauge;
+
+lazy_static! {
+    pub static ref CPU: IntGauge = IntGauge::new("cpu_usage", "CPU usage in percentage").unwrap();
+    pub static ref MEMORY: IntGauge =
+        IntGauge::new("memory_usage", "Memory usage in bytes").unwrap();
+}
diff --git a/src/sql/expression/aggregate.rs b/src/sql/expression/aggregate.rs
index 0218f62..b7b3f66 100644
--- a/src/sql/expression/aggregate.rs
+++ b/src/sql/expression/aggregate.rs
@@ -90,7 +90,7 @@ impl AggregateFunctionRegistry {
         });
         self.functions
             .entry(name.to_string())
-            .or_insert_with(Vec::new)
+            .or_default()
             .push(func);
     }
 
diff --git a/src/sql/expression/function.rs b/src/sql/expression/function.rs
index 32bb3e4..b895eaa 100644
--- a/src/sql/expression/function.rs
+++ b/src/sql/expression/function.rs
@@ -52,7 +52,7 @@
 
         self.functions
             .entry(name.to_string())
-            .or_insert_with(Vec::new)
+            .or_default()
             .push(Arc::new(scalar_func));
     }
 
@@ -82,7 +82,7 @@
 
         self.functions
             .entry(name.to_string())
-            .or_insert_with(Vec::new)
+            .or_default()
             .push(Arc::new(scalar_func));
     }
 
diff --git a/src/sql/expression/mod.rs b/src/sql/expression/mod.rs
index 01d101f..ecec920 100644
--- a/src/sql/expression/mod.rs
+++ b/src/sql/expression/mod.rs
@@ -27,7 +27,7 @@ impl Expression {
     pub fn eval(&self, tuple: &Tuple) -> Result {
         match self {
             Expression::Column(column_name, _) => {
-                Ok(tuple.get_by_name(&column_name).ok_or_else(|| {
+                Ok(tuple.get_by_name(column_name).ok_or_else(|| {
                     SQLError::new(
                         ErrorKind::RuntimeError,
                         format!("cannot find column at name: {column_name}"),
diff --git a/src/sql/planner/binder.rs b/src/sql/planner/binder.rs
index 09e9d51..3777214 100644
--- a/src/sql/planner/binder.rs
+++ b/src/sql/planner/binder.rs
@@ -2,11 +2,10 @@ use sqlparser::ast::{
     Expr, Ident, JoinConstraint, JoinOperator, ObjectName, Query, Select, SelectItem, SetExpr,
     Statement, TableAlias, TableFactor, TableWithJoins, Visit,
 };
-use std::any::Any;
 
 use crate::{
     catalog::defs::{ColumnDefinition, TableDefinition},
-    core::{ErrorKind, SQLError, Tuple, Type},
+    core::{ErrorKind, SQLError, Type},
     sql::{
         planner::{scalar::bind_scalar, scope::Scope},
         runtime::DDLJob,
@@ -23,7 +22,6 @@ use super::{
 };
 
 use crate::core::Datum;
-use log::info;
 
 struct FlattenedSelectItem {
     expr: Expr,
@@ -208,7 +206,7 @@ impl<'a> Binder<'a> {
         if alias_name.eq(&String::from("?column?")) {
             return column_name;
         }
-        return alias_name;
+        alias_name
     }
 
     pub fn bind_select_statement(
@@ -250,40 +248,24 @@
 
         let mut group_by = select_stmt.group_by.clone();
         if group_by.len() == 1 {
-            let group_by_expr = group_by.get(0).unwrap();
+            let group_by_expr = group_by.first().unwrap();
             let se = bind_scalar(ctx, &from_scope, group_by_expr)?;
-            match se {
-                ScalarExpr::FunctionCall(name, args) => {
-                    if name.eq(&String::from("tumblingWindow")) {
-                        if args.len() == 2 {
-                            match args[0].clone() {
-                                ScalarExpr::Literal(datum) => match datum {
-                                    Datum::String(s) => {
-                                        if s.eq(&String::from("ss")) {
-                                            match args[1].clone() {
-                                                ScalarExpr::Literal(datum) => match datum {
-                                                    Datum::Int(v) => {
-                                                        plan = Plan::Window {
-                                                            window_type: WindowType::TumblingWindow,
-                                                            length: v,
-                                                            input: Box::new(plan),
-                                                        };
-                                                        group_by.remove(0);
-                                                    }
-                                                    _ => {}
-                                                },
-                                                _ => {}
-                                            }
-                                        }
-                                    }
-                                    _ => {}
-                                },
-                                _ => {}
-                            }
-                        }
+            if let ScalarExpr::FunctionCall(name, args) = se {
+                if name == "tumblingWindow" && args.len() == 2 {
+                    if let (
+                        ScalarExpr::Literal(Datum::String(s)),
+                        ScalarExpr::Literal(Datum::Int(v)),
+                    ) = (&args[0], &args[1])
+                        && s == "ss"
+                    {
+                        plan = Plan::Window {
+                            window_type: WindowType::TumblingWindow,
+                            length: *v,
+                            input: Box::new(plan),
+                        };
+                        group_by.remove(0);
                     }
                 }
-                _ => {}
             }
         }
 
@@ -444,7 +426,7 @@ impl<'a> Binder<'a> {
         // Project the result
         let plan = Plan::Project {
             input: Box::new(plan),
-            projections: output_projections.iter().map(|item| item.clone()).collect(),
+            projections: output_projections.to_vec(),
         };
 
         let output_scope = Scope {
diff --git a/src/sql/planner/scalar.rs b/src/sql/planner/scalar.rs
index d34ffa0..8427a5b 100644
--- a/src/sql/planner/scalar.rs
+++ b/src/sql/planner/scalar.rs
@@ -90,7 +90,7 @@ pub fn bind_aggregate_function(
         ));
     }
 
-    if let Some(arg) = func.args.get(0) {
+    if let Some(arg) = func.args.first() {
         match arg {
             ast::FunctionArg::Unnamed(arg) => match arg {
                 // Rewrite count(*) to count()
diff --git a/src/sql/runtime/builder.rs b/src/sql/runtime/builder.rs
index 4373dec..544280d 100644
--- a/src/sql/runtime/builder.rs
+++ b/src/sql/runtime/builder.rs
@@ -122,7 +122,7 @@ impl<'a> ExecutorBuilder<'a> {
                 ExecuteTreeNode::from(WindowExecutor::new(
                     Box::new(input_executor.unwrap_execute_tree_node()),
                     (*window_type).clone(),
-                    (*length).clone(),
+                    *length,
                 ))
                 .into(),
                 schema,
@@ -242,8 +242,7 @@
                 schema,
             ))
         }
-
-        Plan::Explain(display_str) => {
+        Plan::Explain(_) => {
            let values_exec =
                 ExecuteTreeNode::from(ValuesExecutor::new(vec![Tuple::new_default()])).into();
             Ok((values_exec, Schema::default()))
diff --git a/src/sql/runtime/executor.rs b/src/sql/runtime/executor.rs
index b4fc0b7..8d462f9 100644
--- a/src/sql/runtime/executor.rs
+++ b/src/sql/runtime/executor.rs
@@ -4,7 +4,6 @@ use std::{
 };
 
 use derive_more::From;
-use futures::io::Window;
 use log::info;
 use rumqttc::{Event, Packet, QoS};
 use serde_json::Value;
@@ -15,12 +14,9 @@ use tokio::{
 
 use crate::{
     connector::MqttClient,
-    core::{tuple::Tuple, Datum, ErrorKind, SQLError, Type},
+    core::{tuple::Tuple, Datum, SQLError},
     sql::{
-        expression::{
-            aggregate::{AggregateFunction, AggregateState},
-            Expression,
-        },
+        expression::{aggregate::AggregateFunction, Expression},
         planner::{binder::ProjItem, WindowType},
         session::context::QueryContext,
     },
@@ -64,7 +60,7 @@ impl Executor {
         if let Executor::BuildExecuteTree(t) = self {
             t
         } else {
-            panic!("!!!")
+            panic!("Is not an `ExecuteTreeNode`")
        }
     }
 }
@@ -112,7 +108,7 @@ impl DDLExecutor {
                    ctx.storage_mgr.drop_relation(schema_name, table_name);
                 }
             }
-            DDLJob::ShowTables(schema_name) => {
+            DDLJob::ShowTables(_) => {
                 // I refuse to implement this as an DDL
             }
         }
@@ -320,16 +316,11 @@ impl ScanExecutor {
         }
     }
     pub fn start(&self, ctx: &mut QueryContext) -> Result {
-        let (stop_tx, mut stop_rx) = broadcast::channel(1);
+        let (stop_tx, _) = broadcast::channel(1);
         let (result_tx, result_rx) = broadcast::channel(512);
         let id = String::from("source");
         let mut mqtt_client = MqttClient::new(&id);
         let topic = String::from("/yisa/data");
-        let def = ctx
-            .catalog
-            .find_table_by_name(&*self.schema_name, &*self.table_name)
-            .unwrap()
-            .unwrap();
         tokio::spawn(async move {
             info!("ScanExecutor listening");
             mqtt_client
@@ -338,23 +329,16 @@ impl ScanExecutor {
                 .await
                 .unwrap();
             loop {
-                // let event = mqtt_client.event_loop.poll().await.unwrap();
                 while let Ok(notification) = mqtt_client.event_loop.poll().await {
-                    match notification {
-                        Event::Incoming(Packet::Publish(publish)) => {
-                            // let topic = publish.topic.clone();
-                            let message = String::from_utf8_lossy(&publish.payload);
-                            let parsed: HashMap =
-                                serde_json::from_str(message.as_ref()).unwrap();
-                            let tuple = Tuple::from_hashmap(parsed);
-                            // println!("scan recv {tuple}");
-                            result_tx.send(Ok(Some(tuple))).unwrap();
-                        }
-                        _ => {}
+                    if let Event::Incoming(Packet::Publish(publish)) = notification {
+                        let message = String::from_utf8_lossy(&publish.payload);
+                        let parsed: HashMap =
+                            serde_json::from_str(message.as_ref()).unwrap();
+                        let tuple = Tuple::from_hashmap(parsed);
+                        result_tx.send(Ok(Some(tuple))).unwrap();
                     }
                 }
             }
-            info!("ScanExecutor no longer listening");
         });
         Ok(View {
             result_receiver: result_rx,
diff --git a/src/sql/runtime/mod.rs b/src/sql/runtime/mod.rs
index cf65a0e..d6568aa 100644
--- a/src/sql/runtime/mod.rs
+++ b/src/sql/runtime/mod.rs
@@ -4,7 +4,6 @@ pub mod executor;
 
 pub use ddl::*;
 use executor::View;
-use log::info;
 
 use self::builder::ExecutorBuilder;
 use super::{planner::Plan, session::context::QueryContext};
diff --git a/src/sql/session/mod.rs b/src/sql/session/mod.rs
index 23ffdd2..9df47dd 100644
--- a/src/sql/session/mod.rs
+++ b/src/sql/session/mod.rs
@@ -1,8 +1,6 @@
 pub mod context;
 
 use log::info;
-use pgwire::api::results::FieldInfo;
-use sqlparser::ast::Statement;
 
 use self::context::QueryContext;
 use super::{
@@ -10,7 +8,7 @@ use super::{
     planner::binder::Binder,
     runtime::{execute_plan, executor::View},
 };
-use crate::core::{SQLError, Tuple};
+use crate::core::SQLError;
 
 pub struct Session {
     ctx: QueryContext,
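
Note (reviewer sketch, not part of the patch): `metrics_handler` now serves `prometheus::gather()`, which reads the prometheus crate's default registry, but the gauges in the new `src/metrics.rs` are built with `IntGauge::new`, which only constructs a collector and does not register it anywhere; the `Registry::new()` left behind in `main` is no longer wired to the handler, and the newly added `prometheus-static-metric` dependency is not used yet. Unless registration happens somewhere outside this diff, `cpu_usage` and `memory_usage` will not appear in the `/metrics` output. Below is a minimal sketch of `src/metrics.rs` that keeps the same two gauges but registers them with the default registry via the prometheus crate's `register_int_gauge!` macro (illustrative only; it assumes the default registry is the intended export path):

```rust
// src/metrics.rs (sketch): register_int_gauge! creates the gauge and
// registers it with prometheus's default registry in one step, so the
// existing `prometheus::gather()` call in metrics_handler exports it.
use prometheus::{register_int_gauge, IntGauge};

lazy_static! {
    pub static ref CPU: IntGauge =
        register_int_gauge!("cpu_usage", "CPU usage in percentage").unwrap();
    pub static ref MEMORY: IntGauge =
        register_int_gauge!("memory_usage", "Memory usage in bytes").unwrap();
}
```

An equivalent option is to keep `IntGauge::new` as in the patch and register the statics once early in `main`, e.g. `prometheus::default_registry().register(Box::new(metrics::CPU.clone())).unwrap();` (and likewise for `MEMORY`).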