Skip to content

Commit

Permalink
Merge pull request #11 from longfangsong/master
Browse files Browse the repository at this point in the history
Re-merge #8 and fix CI
  • Loading branch information
Yisaer authored Aug 23, 2024
2 parents 66dbd5a + 221f471 commit 66a7515
Show file tree
Hide file tree
Showing 16 changed files with 78 additions and 125 deletions.
18 changes: 3 additions & 15 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,7 @@ jobs:

steps:
- name: Checkout repository
uses: actions/checkout@v2

- name: Set up Rust
uses: actions-rs/toolchain@v1
with:
toolchain: stable
profile: minimal
override: true
uses: actions/checkout@v4

- name: Build binary
run: cargo build --release
Expand All @@ -31,11 +24,6 @@ jobs:
CARGO_TERM_COLOR: always

- name: Upload release asset
uses: actions/upload-release-asset@v1
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
uses: softprops/action-gh-release@v2
with:
upload_url: ${{ github.event.release.upload_url }}
asset_path: ./release.tar.gz
asset_name: release.tar.gz
asset_content_type: application/gzip
files: release.tar.gz
4 changes: 1 addition & 3 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,4 @@ jobs:
steps:
- uses: actions/checkout@v4
- name: Build
run: cargo build --verbose
# - name: Run tests
# run: cargo test --verbose
run: cargo build
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ prometheus = "0.13"
sysinfo = "0.26"
toml = "0.5"
once_cell = "1.8"
prometheus-static-metric = "0.5.1"

[dev-dependencies]
sqllogictest = "0.13.0"
5 changes: 2 additions & 3 deletions src/core/tuple.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use log::info;
use std::{collections::HashMap, fmt::Display};

use crate::sql::planner::binder::ProjItem;
Expand All @@ -18,8 +17,8 @@ impl Tuple {
}

pub fn new_default() -> Self {
let mut values = Vec::new();
let mut columns = Vec::new();
let values = Vec::new();
let columns = Vec::new();
Tuple::new(columns, values)
}

Expand Down
41 changes: 14 additions & 27 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
#![feature(let_chains)]

#[macro_use]
extern crate lazy_static;

use std::{collections::HashMap, mem, sync::Arc, time::Duration};

use axum::{
extract::{self},
response::IntoResponse,
routing::{delete, get, post},
Json, Router,
};
use log::{info, LevelFilter};
use prometheus::{Encoder, IntGauge, Registry, TextEncoder};
use prometheus::{Encoder, Registry, TextEncoder};
use rumqttc::QoS;
use serde::{Deserialize, Serialize};
use sysinfo::{CpuExt, System, SystemExt};
use tokio::{
sync::{mpsc, Mutex},
time,
};
use tokio::{sync::Mutex, time};

use catalog::Catalog;
use core::Tuple;
Expand All @@ -31,6 +29,7 @@ mod catalog;
mod config;
mod connector;
mod core;
mod metrics;
mod sql;
mod storage;
mod util;
Expand All @@ -42,14 +41,11 @@ struct AppState {
views: HashMap<usize, View>,
next_id: usize,
dummy_subscribers: HashMap<usize, Vec<Tuple>>,
registry: Registry,
}

impl AppState {
pub fn new(registry: Registry) -> Self {
let mut app_state = Self::default();
app_state.registry = registry;
return app_state;
pub fn new() -> Self {
Self::default()
}
}

Expand Down Expand Up @@ -92,11 +88,10 @@ async fn execute_sql(
}
}
});
let view_manager = state.clone();
tokio::spawn(async move {
while let Ok(Ok(result)) = receiver.recv().await {
if !result.is_none() {
let data = result.unwrap().parse_into_json().unwrap();
if let Some(result) = result {
let data = result.parse_into_json().unwrap();
sender
.client
.publish("/yisa/data2", QoS::AtLeastOnce, false, data)
Expand Down Expand Up @@ -145,11 +140,10 @@ async fn ping() -> &'static str {
"pong"
}

async fn metrics_handler(extract::State(state): extract::State<Arc<Mutex<AppState>>>) -> String {
let mut state = state.lock().await;
async fn metrics_handler() -> String {
let mut buffer = Vec::new();
let encoder = TextEncoder::new();
let metric_families = state.registry.gather();
let metric_families = prometheus::gather();
encoder.encode(&metric_families, &mut buffer).unwrap();
String::from_utf8(buffer).unwrap()
}
Expand All @@ -164,13 +158,6 @@ pub async fn main() {
// initialize Prometheus registry
let registry = Registry::new();

// cpu and memory gauge
let cpu_gauge = IntGauge::new("cpu_usage", "CPU usage in percentage").unwrap();
let memory_gauge = IntGauge::new("memory_usage", "Memory usage in bytes").unwrap();

// register gauge
registry.register(Box::new(cpu_gauge.clone())).unwrap();
registry.register(Box::new(memory_gauge.clone())).unwrap();
let mut sys = System::new();
tokio::spawn(async move {
let mut interval = time::interval(Duration::from_secs(1));
Expand All @@ -179,12 +166,12 @@ pub async fn main() {
sys.refresh_all();
let cpu_usage = sys.global_cpu_info().cpu_usage() as i64;
let memory_usage = sys.used_memory() as i64;
cpu_gauge.set(cpu_usage);
memory_gauge.set(memory_usage);
metrics::CPU.set(cpu_usage);
metrics::MEMORY.set(memory_usage);
}
});

let app_state = AppState::new(registry);
let app_state = AppState::new();

// Initialize database
let catalog = Catalog::new();
Expand Down
7 changes: 7 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
use prometheus::IntGauge;

lazy_static! {
pub static ref CPU: IntGauge = IntGauge::new("cpu_usage", "CPU usage in percentage").unwrap();
pub static ref MEMORY: IntGauge =
IntGauge::new("memory_usage", "Memory usage in bytes").unwrap();
}
2 changes: 1 addition & 1 deletion src/sql/expression/aggregate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl AggregateFunctionRegistry {
});
self.functions
.entry(name.to_string())
.or_insert_with(Vec::new)
.or_default()
.push(func);
}

Expand Down
4 changes: 2 additions & 2 deletions src/sql/expression/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl ScalarFunctionRegistry {

self.functions
.entry(name.to_string())
.or_insert_with(Vec::new)
.or_default()
.push(Arc::new(scalar_func));
}

Expand Down Expand Up @@ -82,7 +82,7 @@ impl ScalarFunctionRegistry {

self.functions
.entry(name.to_string())
.or_insert_with(Vec::new)
.or_default()
.push(Arc::new(scalar_func));
}

Expand Down
2 changes: 1 addition & 1 deletion src/sql/expression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ impl Expression {
pub fn eval(&self, tuple: &Tuple) -> Result<Datum, SQLError> {
match self {
Expression::Column(column_name, _) => {
Ok(tuple.get_by_name(&column_name).ok_or_else(|| {
Ok(tuple.get_by_name(column_name).ok_or_else(|| {
SQLError::new(
ErrorKind::RuntimeError,
format!("cannot find column at name: {column_name}"),
Expand Down
52 changes: 15 additions & 37 deletions src/sql/planner/binder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@ use sqlparser::ast::{
Expr, Ident, JoinConstraint, JoinOperator, ObjectName, Query, Select, SelectItem, SetExpr,
Statement, TableAlias, TableFactor, TableWithJoins, Visit,
};
use std::any::Any;

use crate::{
catalog::defs::{ColumnDefinition, TableDefinition},
core::{ErrorKind, SQLError, Tuple, Type},
core::{ErrorKind, SQLError, Type},
sql::{
planner::{scalar::bind_scalar, scope::Scope},
runtime::DDLJob,
Expand All @@ -23,7 +22,6 @@ use super::{
};

use crate::core::Datum;
use log::info;

struct FlattenedSelectItem {
expr: Expr,
Expand Down Expand Up @@ -208,7 +206,7 @@ impl<'a> Binder<'a> {
if alias_name.eq(&String::from("?column?")) {
return column_name;
}
return alias_name;
alias_name
}

pub fn bind_select_statement(
Expand Down Expand Up @@ -250,40 +248,20 @@ impl<'a> Binder<'a> {
let mut group_by = select_stmt.group_by.clone();

if group_by.len() == 1 {
let group_by_expr = group_by.get(0).unwrap();
let group_by_expr = group_by.first().unwrap();
let se = bind_scalar(ctx, &from_scope, group_by_expr)?;
match se {
ScalarExpr::FunctionCall(name, args) => {
if name.eq(&String::from("tumblingWindow")) {
if args.len() == 2 {
match args[0].clone() {
ScalarExpr::Literal(datum) => match datum {
Datum::String(s) => {
if s.eq(&String::from("ss")) {
match args[1].clone() {
ScalarExpr::Literal(datum) => match datum {
Datum::Int(v) => {
plan = Plan::Window {
window_type: WindowType::TumblingWindow,
length: v,
input: Box::new(plan),
};
group_by.remove(0);
}
_ => {}
},
_ => {}
}
}
}
_ => {}
},
_ => {}
}
}
}
if let ScalarExpr::FunctionCall(name, args) = se {
if name == "tumblingWindow" && args.len() == 2 && let (
ScalarExpr::Literal(Datum::String(s)),
ScalarExpr::Literal(Datum::Int(v)),
) = (&args[0], &args[1]) && s == "ss" {
plan = Plan::Window {
window_type: WindowType::TumblingWindow,
length: *v,
input: Box::new(plan),
};
group_by.remove(0);
}
_ => {}
}
}

Expand Down Expand Up @@ -445,7 +423,7 @@ impl<'a> Binder<'a> {
// Project the result
let plan = Plan::Project {
input: Box::new(plan),
projections: output_projections.iter().map(|item| item.clone()).collect(),
projections: output_projections.to_vec(),
is_wildcard,
};

Expand Down
2 changes: 1 addition & 1 deletion src/sql/planner/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ pub fn bind_aggregate_function(
));
}

if let Some(arg) = func.args.get(0) {
if let Some(arg) = func.args.first() {
match arg {
ast::FunctionArg::Unnamed(arg) => match arg {
// Rewrite count(*) to count()
Expand Down
7 changes: 3 additions & 4 deletions src/sql/runtime/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl<'a> ExecutorBuilder<'a> {
ExecuteTreeNode::from(ProjectExecutor::new(
Box::new(input_executor.unwrap_execute_tree_node()),
projections.clone(),
is_wildcard.clone(),
*is_wildcard,
))
.into(),
schema.project(projections),
Expand Down Expand Up @@ -127,7 +127,7 @@ impl<'a> ExecutorBuilder<'a> {
ExecuteTreeNode::from(WindowExecutor::new(
Box::new(input_executor.unwrap_execute_tree_node()),
(*window_type).clone(),
(*length).clone(),
*length,
))
.into(),
schema,
Expand Down Expand Up @@ -247,8 +247,7 @@ impl<'a> ExecutorBuilder<'a> {
schema,
))
}

Plan::Explain(display_str) => {
Plan::Explain(_) => {
let values_exec =
ExecuteTreeNode::from(ValuesExecutor::new(vec![Tuple::new_default()])).into();
Ok((values_exec, Schema::default()))
Expand Down
Loading

0 comments on commit 66a7515

Please sign in to comment.