Skip to content

Commit

Permalink
Revert "Merge pull request #8 from longfangsong/master"
Browse files Browse the repository at this point in the history
This reverts commit a218776, reversing
changes made to 708cc96.
  • Loading branch information
Yisaer committed Aug 22, 2024
1 parent a218776 commit a396948
Show file tree
Hide file tree
Showing 14 changed files with 105 additions and 74 deletions.
13 changes: 0 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ prometheus = "0.13"
sysinfo = "0.26"
toml = "0.5"
once_cell = "1.8"
prometheus-static-metric = "0.5.1"

[dev-dependencies]
sqllogictest = "0.13.0"
5 changes: 3 additions & 2 deletions src/core/tuple.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use log::info;
use std::{collections::HashMap, fmt::Display};

use crate::sql::planner::binder::ProjItem;
Expand All @@ -17,8 +18,8 @@ impl Tuple {
}

pub fn new_default() -> Self {
let values = Vec::new();
let columns = Vec::new();
let mut values = Vec::new();
let mut columns = Vec::new();
Tuple::new(columns, values)
}

Expand Down
41 changes: 27 additions & 14 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
#![feature(let_chains)]

#[macro_use]
extern crate lazy_static;

use std::{collections::HashMap, mem, sync::Arc, time::Duration};

use axum::{
extract::{self},
response::IntoResponse,
routing::{delete, get, post},
Json, Router,
};
use log::{info, LevelFilter};
use prometheus::{Encoder, Registry, TextEncoder};
use prometheus::{Encoder, IntGauge, Registry, TextEncoder};
use rumqttc::QoS;
use serde::{Deserialize, Serialize};
use sysinfo::{CpuExt, System, SystemExt};
use tokio::{sync::Mutex, time};
use tokio::{
sync::{mpsc, Mutex},
time,
};

use catalog::Catalog;
use core::Tuple;
Expand All @@ -29,7 +31,6 @@ mod catalog;
mod config;
mod connector;
mod core;
mod metrics;
mod sql;
mod storage;
mod util;
Expand All @@ -41,11 +42,14 @@ struct AppState {
views: HashMap<usize, View>,
next_id: usize,
dummy_subscribers: HashMap<usize, Vec<Tuple>>,
registry: Registry,
}

impl AppState {
pub fn new() -> Self {
Self::default()
pub fn new(registry: Registry) -> Self {
let mut app_state = Self::default();
app_state.registry = registry;
return app_state;
}
}

Expand Down Expand Up @@ -88,10 +92,11 @@ async fn execute_sql(
}
}
});
let view_manager = state.clone();
tokio::spawn(async move {
while let Ok(Ok(result)) = receiver.recv().await {
if let Some(result) = result {
let data = result.parse_into_json().unwrap();
if !result.is_none() {
let data = result.unwrap().parse_into_json().unwrap();
sender
.client
.publish("/yisa/data2", QoS::AtLeastOnce, false, data)
Expand Down Expand Up @@ -140,10 +145,11 @@ async fn ping() -> &'static str {
"pong"
}

async fn metrics_handler() -> String {
async fn metrics_handler(extract::State(state): extract::State<Arc<Mutex<AppState>>>) -> String {
let mut state = state.lock().await;
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
let metric_families = prometheus::gather();
let metric_families = state.registry.gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
String::from_utf8(buffer).unwrap()
}
Expand All @@ -158,6 +164,13 @@ pub async fn main() {
// initialize Prometheus registry
let registry = Registry::new();

// cpu and memory gauge
let cpu_gauge = IntGauge::new("cpu_usage", "CPU usage in percentage").unwrap();
let memory_gauge = IntGauge::new("memory_usage", "Memory usage in bytes").unwrap();

// register gauge
registry.register(Box::new(cpu_gauge.clone())).unwrap();
registry.register(Box::new(memory_gauge.clone())).unwrap();
let mut sys = System::new();
tokio::spawn(async move {
let mut interval = time::interval(Duration::from_secs(1));
Expand All @@ -166,12 +179,12 @@ pub async fn main() {
sys.refresh_all();
let cpu_usage = sys.global_cpu_info().cpu_usage() as i64;
let memory_usage = sys.used_memory() as i64;
metrics::CPU.set(cpu_usage);
metrics::MEMORY.set(memory_usage);
cpu_gauge.set(cpu_usage);
memory_gauge.set(memory_usage);
}
});

let app_state = AppState::new();
let app_state = AppState::new(registry);

// Initialize database
let catalog = Catalog::new();
Expand Down
7 changes: 0 additions & 7 deletions src/metrics.rs

This file was deleted.

2 changes: 1 addition & 1 deletion src/sql/expression/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl AggregateFunctionRegistry {
});
self.functions
.entry(name.to_string())
.or_default()
.or_insert_with(Vec::new)
.push(func);
}

Expand Down
4 changes: 2 additions & 2 deletions src/sql/expression/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl ScalarFunctionRegistry {

self.functions
.entry(name.to_string())
.or_default()
.or_insert_with(Vec::new)
.push(Arc::new(scalar_func));
}

Expand Down Expand Up @@ -82,7 +82,7 @@ impl ScalarFunctionRegistry {

self.functions
.entry(name.to_string())
.or_default()
.or_insert_with(Vec::new)
.push(Arc::new(scalar_func));
}

Expand Down
2 changes: 1 addition & 1 deletion src/sql/expression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl Expression {
pub fn eval(&self, tuple: &Tuple) -> Result<Datum, SQLError> {
match self {
Expression::Column(column_name, _) => {
Ok(tuple.get_by_name(column_name).ok_or_else(|| {
Ok(tuple.get_by_name(&column_name).ok_or_else(|| {
SQLError::new(
ErrorKind::RuntimeError,
format!("cannot find column at name: {column_name}"),
Expand Down
54 changes: 36 additions & 18 deletions src/sql/planner/binder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ use sqlparser::ast::{
Expr, Ident, JoinConstraint, JoinOperator, ObjectName, Query, Select, SelectItem, SetExpr,
Statement, TableAlias, TableFactor, TableWithJoins, Visit,
};
use std::any::Any;

use crate::{
catalog::defs::{ColumnDefinition, TableDefinition},
core::{ErrorKind, SQLError, Type},
core::{ErrorKind, SQLError, Tuple, Type},
sql::{
planner::{scalar::bind_scalar, scope::Scope},
runtime::DDLJob,
Expand All @@ -22,6 +23,7 @@ use super::{
};

use crate::core::Datum;
use log::info;

struct FlattenedSelectItem {
expr: Expr,
Expand Down Expand Up @@ -206,7 +208,7 @@ impl<'a> Binder<'a> {
if alias_name.eq(&String::from("?column?")) {
return column_name;
}
alias_name
return alias_name;
}

pub fn bind_select_statement(
Expand Down Expand Up @@ -248,24 +250,40 @@ impl<'a> Binder<'a> {
let mut group_by = select_stmt.group_by.clone();

if group_by.len() == 1 {
let group_by_expr = group_by.first().unwrap();
let group_by_expr = group_by.get(0).unwrap();
let se = bind_scalar(ctx, &from_scope, group_by_expr)?;
if let ScalarExpr::FunctionCall(name, args) = se {
if name == "tumblingWindow" && args.len() == 2 {
if let (
ScalarExpr::Literal(Datum::String(s)),
ScalarExpr::Literal(Datum::Int(v)),
) = (&args[0], &args[1])
&& s == "ss"
{
plan = Plan::Window {
window_type: WindowType::TumblingWindow,
length: *v,
input: Box::new(plan),
};
group_by.remove(0);
match se {
ScalarExpr::FunctionCall(name, args) => {
if name.eq(&String::from("tumblingWindow")) {
if args.len() == 2 {
match args[0].clone() {
ScalarExpr::Literal(datum) => match datum {
Datum::String(s) => {
if s.eq(&String::from("ss")) {
match args[1].clone() {
ScalarExpr::Literal(datum) => match datum {
Datum::Int(v) => {
plan = Plan::Window {
window_type: WindowType::TumblingWindow,
length: v,
input: Box::new(plan),
};
group_by.remove(0);
}
_ => {}
},
_ => {}
}
}
}
_ => {}
},
_ => {}
}
}
}
}
_ => {}
}
}

Expand Down Expand Up @@ -426,7 +444,7 @@ impl<'a> Binder<'a> {
// Project the result
let plan = Plan::Project {
input: Box::new(plan),
projections: output_projections.to_vec(),
projections: output_projections.iter().map(|item| item.clone()).collect(),
};

let output_scope = Scope {
Expand Down
2 changes: 1 addition & 1 deletion src/sql/planner/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub fn bind_aggregate_function(
));
}

if let Some(arg) = func.args.first() {
if let Some(arg) = func.args.get(0) {
match arg {
ast::FunctionArg::Unnamed(arg) => match arg {
// Rewrite count(*) to count()
Expand Down
5 changes: 3 additions & 2 deletions src/sql/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl<'a> ExecutorBuilder<'a> {
ExecuteTreeNode::from(WindowExecutor::new(
Box::new(input_executor.unwrap_execute_tree_node()),
(*window_type).clone(),
*length,
(*length).clone(),
))
.into(),
schema,
Expand Down Expand Up @@ -242,7 +242,8 @@ impl<'a> ExecutorBuilder<'a> {
schema,
))
}
Plan::Explain(_) => {

Plan::Explain(display_str) => {
let values_exec =
ExecuteTreeNode::from(ValuesExecutor::new(vec![Tuple::new_default()])).into();
Ok((values_exec, Schema::default()))
Expand Down
Loading

0 comments on commit a396948

Please sign in to comment.