diff --git a/bindings/js/Cargo.toml b/bindings/js/Cargo.toml new file mode 100644 index 0000000..5215ead --- /dev/null +++ b/bindings/js/Cargo.toml @@ -0,0 +1,40 @@ +[package] +name = "tonbo-js" +version = "0.1.0" +edition = "2021" + +[lib] +crate-type = ["cdylib", "rlib"] + +[workspace] + +[dependencies] + +fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "216eb446fb0a0c6e5e85bfac51a6f6ed8e5ed606", package = "fusio", version = "0.3.3", features = [ + "aws", + "opfs", +] } +fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "216eb446fb0a0c6e5e85bfac51a6f6ed8e5ed606", package = "fusio-dispatch", version = "0.2.1", features = [ + "aws", + "opfs", +] } +futures = { version = "0.3" } +js-sys = { version = "0.3.72" } +tonbo = { version = "0.2.0", path = "../../", default-features = false, features = [ + "bytes", + "wasm", +] } + +wasm-bindgen = "0.2.95" +wasm-bindgen-futures = { version = "0.4.45" } +wasm-streams = "0.4.2" + +[dev-dependencies] +wasm-bindgen = "0.2.95" +wasm-bindgen-futures = { version = "0.4.45" } +wasm-bindgen-test = "0.3.9" +web-sys = { version = "0.3", features = ["console"] } +wasm-streams = "0.4.2" + +[package.metadata.wasm-pack.profile.release] +wasm-opt = false \ No newline at end of file diff --git a/bindings/js/examples/db/index.js b/bindings/js/examples/db/index.js new file mode 100644 index 0000000..acba121 --- /dev/null +++ b/bindings/js/examples/db/index.js @@ -0,0 +1,27 @@ +import { userSchema } from "./schema"; +import { TonboDB, DbOption, Bound } from "./pkg/tonbo_js"; + + +const option = new DbOption("store_dir"); +const db = await new TonboDB(option, userSchema); + +await db.insert({ id: 0, name: "Alice" }); + +const id = await db.get(0, (val) => val.id); +console.log(id); + +await db.transaction(async (txn) => { + txn.insert({ id: 1, name: "Bob" }); + const record1 = await txn.get(1, ["id"]); + const record2 = await txn.get(0, ["id"]); + console.log(record1); + console.log(record2); + // can not read uncommitted change + const uncommitted_name = await db.get(1, (val) => val.name); + console.log("read uncommitted name: ", uncommitted_name); + await txn.commit(); + const name = await db.get(1, (val) => val.name); + console.log("read committed name: ", name); +}); + + diff --git a/bindings/js/examples/db/package.json b/bindings/js/examples/db/package.json new file mode 100644 index 0000000..d9653c6 --- /dev/null +++ b/bindings/js/examples/db/package.json @@ -0,0 +1,13 @@ +{ + "scripts": { + "build": "wasm-pack build", + "serve": "cp -r ../../pkg ./ && webpack serve" + }, + "devDependencies": { + "@wasm-tool/wasm-pack-plugin": "1.5.0", + "html-webpack-plugin": "^5.6.0", + "webpack": "^5.93.0", + "webpack-cli": "^5.1.4", + "webpack-dev-server": "^5.0.4" + } +} diff --git a/bindings/js/examples/db/schema.js b/bindings/js/examples/db/schema.js new file mode 100644 index 0000000..2c3d554 --- /dev/null +++ b/bindings/js/examples/db/schema.js @@ -0,0 +1,11 @@ +export const userSchema = { + id: { + primary: true, + type: "UInt8", + nullable: false, + }, + name: { + type: "String", + nullable: true, + }, +}; diff --git a/bindings/js/examples/db/webpack.config.js b/bindings/js/examples/db/webpack.config.js new file mode 100644 index 0000000..80540e4 --- /dev/null +++ b/bindings/js/examples/db/webpack.config.js @@ -0,0 +1,22 @@ +const path = require("path"); +const HtmlWebpackPlugin = require("html-webpack-plugin"); +const webpack = require("webpack"); +const WasmPackPlugin = require("@wasm-tool/wasm-pack-plugin"); + +module.exports = { + entry: "./index.js", + output: { + path: path.resolve(__dirname, "dist"), + filename: "index.js", + }, + mode: "development", + plugins: [ + new HtmlWebpackPlugin(), + new WasmPackPlugin({ + crateDirectory: path.resolve(__dirname, "."), + }), + ], + experiments: { + asyncWebAssembly: true, + }, +}; diff --git a/bindings/js/src/datatype.rs b/bindings/js/src/datatype.rs new file mode 100644 index 0000000..615422e --- /dev/null +++ b/bindings/js/src/datatype.rs @@ -0,0 +1,53 @@ +use tonbo::record::Datatype; +use wasm_bindgen::prelude::wasm_bindgen; + +#[wasm_bindgen] +#[repr(u8)] +#[derive(Copy, Clone, Debug)] +pub enum DataType { + UInt8 = 0, + UInt16 = 1, + UInt32 = 2, + UInt64 = 3, + Int8 = 4, + Int16 = 5, + Int32 = 6, + Int64 = 7, + String = 8, + Boolean = 9, + Bytes = 10, +} + +impl From for Datatype { + fn from(datatype: DataType) -> Self { + match datatype { + DataType::UInt8 => Datatype::UInt8, + DataType::UInt16 => Datatype::UInt16, + DataType::UInt32 => Datatype::UInt32, + DataType::UInt64 => Datatype::UInt64, + DataType::Int8 => Datatype::Int8, + DataType::Int16 => Datatype::Int16, + DataType::Int32 => Datatype::Int32, + DataType::Int64 => Datatype::Int64, + DataType::String => Datatype::String, + DataType::Boolean => Datatype::Boolean, + _ => todo!(), + } + } +} + +pub(crate) fn to_datatype(datatype: &str) -> Datatype { + match datatype { + "UInt8" => Datatype::UInt8, + "UInt16" => Datatype::UInt16, + "UInt32" => Datatype::UInt32, + "UInt64" => Datatype::UInt64, + "Int8" => Datatype::Int8, + "Int16" => Datatype::Int16, + "Int32" => Datatype::Int32, + "Int64" => Datatype::Int64, + "String" => Datatype::String, + "Boolean" => Datatype::Boolean, + _ => todo!(), + } +} diff --git a/bindings/js/src/db.rs b/bindings/js/src/db.rs new file mode 100644 index 0000000..89e1077 --- /dev/null +++ b/bindings/js/src/db.rs @@ -0,0 +1,359 @@ +use std::{mem::transmute, sync::Arc}; + +use futures::TryStreamExt; +use js_sys::{Array, Function, JsString, Object, Reflect}; +use tonbo::{ + executor::opfs::OpfsExecutor, + record::{ColumnDesc, DynRecord}, + DB, +}; +use wasm_bindgen::prelude::*; + +use crate::{ + datatype::to_datatype, + options::DbOption, + transaction::Transaction, + utils::{parse_key, parse_record, to_record}, + Bound, +}; + +type JsExecutor = OpfsExecutor; + +#[wasm_bindgen] +pub struct TonboDB { + desc: Arc>, + primary_key_index: usize, + db: Arc>, +} + +impl TonboDB { + fn parse_schema(schema: Object) -> (Vec, usize) { + let mut desc = vec![]; + let mut primary_index = None; + + for (i, entry) in Object::entries(&schema).iter().enumerate() { + let pair = entry + .dyn_ref::() + .expect_throw("unexpected entry") + .to_vec(); + let name = pair[0] + .dyn_ref::() + .expect_throw("expected key to be string"); + let col = pair[1] + .dyn_ref::() + .expect_throw("convert to object failed"); + let datatype = Reflect::get(&col, &"type".into()) + .expect("type must be specified") + .as_string() + .unwrap(); + let primary = Reflect::get(&col, &"primary".into()) + .unwrap() + .as_bool() + .unwrap_or(false); + let nullable = Reflect::get(&col, &"nullable".into()) + .unwrap() + .as_bool() + .unwrap_or(false); + + if primary { + if primary_index.is_some() { + panic!("multiply primary keys are not supported!"); + } + primary_index = Some(i); + } + desc.push(ColumnDesc::new( + name.into(), + to_datatype(datatype.as_str()), + nullable, + )); + } + let primary_key_index = primary_index.expect_throw("expected to have one primary key"); + + (desc, primary_key_index) + } +} + +#[wasm_bindgen] +impl TonboDB { + #[wasm_bindgen(constructor)] + pub async fn new(option: DbOption, schema: Object) -> Self { + let (desc, primary_key_index) = Self::parse_schema(schema); + let primary_key_name = desc[primary_key_index].name.clone(); + + let db = DB::with_schema( + option.into_option(primary_key_index, primary_key_name), + JsExecutor::new(), + desc.clone(), + primary_key_index, + ) + .await + .unwrap(); + + Self { + desc: Arc::new(desc), + primary_key_index, + db: Arc::new(db), + } + } + + /// get the record with `key` as the primary key and process it using closure `cb` + pub async fn get(&self, key: JsValue, cb: Function) -> Result { + let key = parse_key(self.desc.get(self.primary_key_index).unwrap(), key, true)?; + let this = JsValue::null(); + + let record = self + .db + .get(&key, |entry| Some(entry.get().columns)) + .await + .map_err(|err| JsValue::from(err.to_string()))?; + + match record { + Some(record) => cb.call1(&this, &to_record(&record, self.primary_key_index).into()), + None => Ok(JsValue::null()), + } + } + + /// insert a single tonbo record + pub async fn insert(&self, record: Object) -> Result<(), JsValue> { + let record = parse_record(&record, &self.desc, self.primary_key_index)?; + self.db + .insert(record) + .await + .map_err(|err| JsValue::from(err.to_string()))?; + Ok(()) + } + + /// insert a sequence of data as a single batch + #[wasm_bindgen(js_name = "insertBatch")] + pub async fn insert_batch(&self, records: Vec) -> Result<(), JsValue> { + let records = records + .iter() + .map(|record| parse_record(&record, &self.desc, self.primary_key_index).unwrap()); + + self.db + .insert_batch(records.into_iter()) + .await + .map_err(|err| JsValue::from(err.to_string()))?; + Ok(()) + } + + /// delete the record with the primary key as the `key` + pub async fn remove(&self, key: JsValue) -> Result<(), JsValue> { + let key = parse_key(self.desc.get(self.primary_key_index).unwrap(), key, true)?; + self.db + .remove(key) + .await + .map_err(|err| JsValue::from(err.to_string()))?; + Ok(()) + } + + pub async fn scan( + &self, + lower: Bound, + high: Bound, + ) -> Result { + let desc = self.desc.get(self.primary_key_index).unwrap(); + let lower = lower.into_bound(desc)?; + let high = high.into_bound(desc)?; + + // FIXME: lifetime + let db = + unsafe { transmute::<&DB<_, _>, &'static DB>(self.db.as_ref()) }; + let stream = db + .scan( + ( + unsafe { + transmute::< + std::ops::Bound<&tonbo::record::Column>, + std::ops::Bound<&'static tonbo::record::Column>, + >(lower.as_ref()) + }, + unsafe { + transmute::< + std::ops::Bound<&tonbo::record::Column>, + std::ops::Bound<&'static tonbo::record::Column>, + >(high.as_ref()) + }, + ), + |entry| { + let record = entry.get(); + // to_record(&record.columns, record.primary_index).into() + to_record(&record.columns, record.primary_index) + }, + ) + .await + .map_err(|err| JsValue::from(err.to_string())); + + Ok(wasm_streams::ReadableStream::from_stream(stream).into_raw()) + } + + /// open an optimistic ACID transaction + pub async fn transaction(&self, cb: Function) -> Result<(), JsValue> { + let txn = self.db.transaction().await; + + let this = JsValue::null(); + let txn = Transaction::new(txn, self.desc.clone(), self.primary_key_index); + + { + let js_txn = JsValue::from(txn); + cb.call1(&this, &js_txn)?; + } + + Ok(()) + } + + pub async fn flush(&self) -> Result<(), JsValue> { + self.db + .flush() + .await + .map_err(|err| JsValue::from(err.to_string()))?; + Ok(()) + } + + /// flush wal to disk + #[wasm_bindgen(js_name = "flushWal")] + pub async fn flush_wal(&self) -> Result<(), JsValue> { + self.db + .flush_wal() + .await + .map_err(|err| JsValue::from(err.to_string()))?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use fusio::{path::Path, DynFs}; + use js_sys::Object; + use wasm_bindgen::JsValue; + use wasm_bindgen_test::*; + + use crate::{options::DbOption, AwsCredential, S3Builder, TonboDB}; + + wasm_bindgen_test_configure!(run_in_browser); + + async fn remove(path: &str) { + let path = Path::from_opfs_path(path).unwrap(); + let fs = fusio::disk::LocalFs {}; + + fs.remove(&path).await.unwrap(); + } + + fn schema() -> Object { + let schema = Object::new(); + let id = Object::new(); + let name = Object::new(); + js_sys::Reflect::set( + &id, + &JsValue::from_str("primary"), + &JsValue::from_bool(true), + ) + .unwrap(); + js_sys::Reflect::set(&id, &JsValue::from_str("type"), &JsValue::from_str("UInt8")).unwrap(); + js_sys::Reflect::set( + &id, + &JsValue::from_str("nullable"), + &JsValue::from_bool(false), + ) + .unwrap(); + + js_sys::Reflect::set( + &name, + &JsValue::from_str("type"), + &JsValue::from_str("String"), + ) + .unwrap(); + js_sys::Reflect::set( + &name, + &JsValue::from_str("nullable"), + &JsValue::from_bool(true), + ) + .unwrap(); + + js_sys::Reflect::set(&schema, &JsValue::from_str("id"), &JsValue::from(id)).unwrap(); + js_sys::Reflect::set(&schema, &JsValue::from_str("name"), &JsValue::from(name)).unwrap(); + + schema + } + + fn test_items() -> Vec { + let mut items = vec![]; + for i in 0..50 { + let item = Object::new(); + js_sys::Reflect::set(&item, &JsValue::from_str("id"), &JsValue::from(i)).unwrap(); + js_sys::Reflect::set( + &item, + &JsValue::from_str("name"), + &JsValue::from_str(i.to_string().as_str()), + ) + .unwrap(); + + items.push(item); + } + + items + } + + #[wasm_bindgen_test] + pub async fn test_open() { + let option = DbOption::new("open".to_string()).expect("cannot open DB"); + + let schema = schema(); + let db = TonboDB::new(option, schema).await; + + drop(db); + remove("open").await; + } + + #[wasm_bindgen_test] + pub async fn test_write() { + let option = DbOption::new("write".to_string()).expect("cannot open DB"); + + let schema = schema(); + let db = TonboDB::new(option, schema).await; + + for item in test_items() { + db.insert(item).await.unwrap(); + } + + drop(db); + remove("write").await; + } + + #[ignore] + #[wasm_bindgen_test] + pub async fn test_write_s3() { + if option_env!("AWS_ACCESS_KEY_ID").is_none() { + return; + } + let key_id = option_env!("AWS_ACCESS_KEY_ID").unwrap().to_string(); + let secret_key = option_env!("AWS_SECRET_ACCESS_KEY").unwrap().to_string(); + + let fs_option = S3Builder::new("wasm-data".to_string()) + .region("ap-southeast-2".to_string()) + .credential(AwsCredential::new(key_id, secret_key, None)) + .build(); + + let option = DbOption::new("write_s3".to_string()) + .expect("cannot open DB") + .level_path(0, "js/l0".to_string(), fs_option.clone()) + .unwrap() + .level_path(1, "js/l1".to_string(), fs_option.clone()) + .unwrap() + .level_path(2, "js/l2".to_string(), fs_option.clone()) + .unwrap(); + + let schema = schema(); + let db = TonboDB::new(option, schema).await; + + for (i, item) in test_items().into_iter().enumerate() { + if i % 5 == 0 { + db.flush().await.unwrap(); + } + db.insert(item).await.unwrap(); + } + + drop(db); + remove("write_s3").await; + } +} diff --git a/bindings/js/src/fs.rs b/bindings/js/src/fs.rs new file mode 100644 index 0000000..1ac4f30 --- /dev/null +++ b/bindings/js/src/fs.rs @@ -0,0 +1,181 @@ +use fusio::path::Path; +use wasm_bindgen::{prelude::wasm_bindgen, JsValue}; + +#[wasm_bindgen] +#[derive(Debug, Clone)] +pub struct AwsCredential { + #[wasm_bindgen(skip)] + pub key_id: String, + #[wasm_bindgen(skip)] + pub secret_key: String, + #[wasm_bindgen(skip)] + pub token: Option, +} + +impl From for fusio::remotes::aws::AwsCredential { + fn from(cred: AwsCredential) -> Self { + fusio::remotes::aws::AwsCredential { + key_id: cred.key_id, + secret_key: cred.secret_key, + token: cred.token, + } + } +} + +#[wasm_bindgen] +impl AwsCredential { + #[wasm_bindgen(constructor)] + pub fn new(key_id: String, secret_key: String, token: Option) -> Self { + Self { + key_id, + secret_key, + token, + } + } +} + +#[wasm_bindgen] +#[derive(Debug, Clone)] +pub struct FsOptions { + inner: FsOptionsInner, +} + +impl FsOptions { + pub(crate) fn path(&self, path: String) -> Result { + match self.inner { + FsOptionsInner::Local => { + Path::from_opfs_path(&path).map_err(|err| JsValue::from(err.to_string())) + } + FsOptionsInner::S3 { .. } => { + Path::from_url_path(&path).map_err(|err| JsValue::from(err.to_string())) + } + } + } +} + +#[derive(Debug, Clone)] +enum FsOptionsInner { + Local, + S3 { + bucket: String, + credential: Option, + region: Option, + sign_payload: Option, + checksum: Option, + endpoint: Option, + }, +} + +#[wasm_bindgen] +pub struct S3Builder { + bucket: String, + credential: Option, + region: Option, + sign_payload: Option, + checksum: Option, + endpoint: Option, +} + +#[wasm_bindgen] +impl S3Builder { + #[wasm_bindgen(constructor)] + pub fn new(bucket: String) -> Self { + Self { + bucket, + credential: None, + region: None, + sign_payload: None, + checksum: None, + endpoint: None, + } + } + + pub fn credential(self, credential: AwsCredential) -> Self { + Self { + credential: Some(credential), + ..self + } + } + + pub fn region(self, region: String) -> Self { + Self { + region: Some(region), + ..self + } + } + + pub fn sign_payload(self, sign_payload: bool) -> Self { + Self { + sign_payload: Some(sign_payload), + ..self + } + } + + pub fn checksum(self, checksum: bool) -> Self { + Self { + checksum: Some(checksum), + ..self + } + } + + pub fn endpoint(self, endpoint: String) -> Self { + Self { + endpoint: Some(endpoint), + ..self + } + } + + pub fn build(self) -> FsOptions { + let S3Builder { + bucket, + credential, + region, + sign_payload, + checksum, + endpoint, + } = self; + + FsOptions { + inner: FsOptionsInner::S3 { + bucket, + credential, + region, + sign_payload, + checksum, + endpoint, + }, + } + } +} + +#[wasm_bindgen] +impl FsOptions { + pub fn local() -> Self { + Self { + inner: FsOptionsInner::Local, + } + } +} + +impl FsOptions { + pub(crate) fn into_fs_options(self) -> fusio_dispatch::FsOptions { + match self.inner { + FsOptionsInner::Local => fusio_dispatch::FsOptions::Local, + FsOptionsInner::S3 { + bucket, + credential, + region, + sign_payload, + checksum, + endpoint, + } => fusio_dispatch::FsOptions::S3 { + bucket, + credential: credential.map(fusio::remotes::aws::AwsCredential::from), + endpoint, + region, + sign_payload, + checksum, + }, + } + } +} diff --git a/bindings/js/src/lib.rs b/bindings/js/src/lib.rs new file mode 100644 index 0000000..8b9edde --- /dev/null +++ b/bindings/js/src/lib.rs @@ -0,0 +1,11 @@ +pub mod datatype; +pub mod db; +pub mod options; +pub mod transaction; +mod utils; +pub use db::*; +pub use transaction::*; +pub mod range; +pub use range::*; +pub mod fs; +pub use fs::*; diff --git a/bindings/js/src/options.rs b/bindings/js/src/options.rs new file mode 100644 index 0000000..a34c18d --- /dev/null +++ b/bindings/js/src/options.rs @@ -0,0 +1,98 @@ +use fusio::path::Path; +use tonbo::record::DynRecord; +use wasm_bindgen::{prelude::wasm_bindgen, JsValue}; + +use crate::FsOptions; + +pub(crate) const MAX_LEVEL: usize = 7; + +#[wasm_bindgen] +#[derive(Debug, Clone)] +pub struct DbOption { + /// cached message size in parquet cleaner + clean_channel_buffer: usize, + /// len threshold of `immutables` when minor compaction is triggered + immutable_chunk_num: usize, + /// magnification that triggers major compaction between different levels + level_sst_magnification: usize, + major_default_oldest_table_num: usize, + /// threshold for the number of `parquet` when major compaction is triggered + major_threshold_with_sst_size: usize, + /// Maximum size of each parquet + max_sst_file_size: usize, + version_log_snapshot_threshold: u32, + use_wal: bool, + /// Maximum size of WAL buffer size + wal_buffer_size: usize, + /// build the `DB` storage directory based on the passed path + path: String, + base_fs: FsOptions, + level_paths: Vec>, +} + +#[wasm_bindgen] +impl DbOption { + #[wasm_bindgen(constructor)] + pub fn new(path: String) -> Result { + let path = Path::from_opfs_path(path) + .map_err(|err| JsValue::from(err.to_string()))? + .to_string(); + Ok(Self { + clean_channel_buffer: 10, + immutable_chunk_num: 3, + level_sst_magnification: 10, + major_default_oldest_table_num: 3, + major_threshold_with_sst_size: 4, + max_sst_file_size: 256 * 1024 * 1024, + version_log_snapshot_threshold: 200, + use_wal: true, + wal_buffer_size: 4 * 1024, + path, + base_fs: FsOptions::local(), + level_paths: vec![None; MAX_LEVEL], + }) + } + + pub fn level_path( + mut self, + level: usize, + path: String, + fs_options: FsOptions, + ) -> Result { + self.level_paths[level] = Some((path.to_string(), fs_options)); + Ok(self) + } +} + +impl DbOption { + pub(crate) fn into_option( + self, + primary_key_index: usize, + primary_key_name: String, + ) -> tonbo::DbOption { + let mut opt = + tonbo::DbOption::with_path(Path::from(self.path), primary_key_name, primary_key_index) + .clean_channel_buffer(self.clean_channel_buffer) + .immutable_chunk_num(self.immutable_chunk_num) + .level_sst_magnification(self.level_sst_magnification) + .major_default_oldest_table_num(self.major_default_oldest_table_num) + .major_threshold_with_sst_size(self.major_threshold_with_sst_size) + .max_sst_file_size(self.max_sst_file_size) + .version_log_snapshot_threshold(self.version_log_snapshot_threshold) + .wal_buffer_size(self.wal_buffer_size) + .base_fs(self.base_fs.into_fs_options()); + + for (level, path) in self.level_paths.into_iter().enumerate() { + if let Some((path, fs_options)) = path { + let path = fs_options.path(path).unwrap(); + opt = opt + .level_path(level, path, fs_options.into_fs_options()) + .unwrap(); + } + } + if !self.use_wal { + opt = opt.disable_wal() + } + opt + } +} diff --git a/bindings/js/src/range.rs b/bindings/js/src/range.rs new file mode 100644 index 0000000..4e670cf --- /dev/null +++ b/bindings/js/src/range.rs @@ -0,0 +1,58 @@ +use tonbo::record::{Column, ColumnDesc}; +use wasm_bindgen::{prelude::wasm_bindgen, JsValue}; + +use crate::utils::parse_key; + +#[wasm_bindgen] +pub struct Bound { + inner: BoundInner, +} + +enum BoundInner { + Included(JsValue), + Exculuded(JsValue), + Unbounded, +} + +#[wasm_bindgen] +impl Bound { + /// represent including bound of range, null or undefined are identical to [`Bound::unbounded`] + pub fn included(key: JsValue) -> Self { + if key.is_null() || key.is_undefined() { + return Self { + inner: BoundInner::Unbounded, + }; + } + Self { + inner: BoundInner::Included(key), + } + } + + /// represent exclusive bound of range, null or undefined are identical to [`Bound::unbounded`] + pub fn excluded(key: JsValue) -> Self { + if key.is_null() || key.is_undefined() { + return Self { + inner: BoundInner::Unbounded, + }; + } + Self { + inner: BoundInner::Exculuded(key), + } + } + + pub fn unbounded() -> Self { + Self { + inner: BoundInner::Unbounded, + } + } +} + +impl Bound { + pub(crate) fn into_bound(self, desc: &ColumnDesc) -> Result, JsValue> { + Ok(match self.inner { + BoundInner::Included(key) => std::ops::Bound::Included(parse_key(desc, key, true)?), + BoundInner::Exculuded(key) => std::ops::Bound::Excluded(parse_key(desc, key, true)?), + BoundInner::Unbounded => std::ops::Bound::Unbounded, + }) + } +} diff --git a/bindings/js/src/transaction.rs b/bindings/js/src/transaction.rs new file mode 100644 index 0000000..584d0c7 --- /dev/null +++ b/bindings/js/src/transaction.rs @@ -0,0 +1,172 @@ +use std::{mem::transmute, sync::Arc}; + +use futures::StreamExt; +use js_sys::Object; +use tonbo::{ + record::{ColumnDesc, DynRecord}, + transaction, Projection, +}; +use wasm_bindgen::{prelude::wasm_bindgen, JsValue}; + +use crate::{ + range::Bound, + utils::{parse_key, parse_record, to_record}, +}; + +#[wasm_bindgen] +pub struct Transaction { + txn: Option>, + desc: Arc>, + primary_key_index: usize, +} + +impl Transaction { + pub(crate) fn new<'txn>( + txn: transaction::Transaction<'txn, DynRecord>, + desc: Arc>, + primary_key_index: usize, + ) -> Self { + Transaction { + txn: Some(unsafe { + transmute::< + transaction::Transaction<'txn, DynRecord>, + transaction::Transaction<'static, DynRecord>, + >(txn) + }), + desc, + primary_key_index, + } + } + + fn projection(&self, projection: Vec) -> Vec { + match projection.contains(&"*".to_string()) { + true => (0..self.desc.len()).collect(), + false => self + .desc + .iter() + .enumerate() + .filter(|(_idx, col)| projection.contains(&col.name)) + .map(|(idx, _col)| idx) + .collect(), + } + } +} + +#[wasm_bindgen] +impl Transaction { + pub async fn get(&mut self, key: JsValue, projection: Vec) -> Result { + if self.txn.is_none() { + return Err("Can not operate a committed transaction".into()); + } + + let key = parse_key(self.desc.get(self.primary_key_index).unwrap(), key, true)?; + let projection = self.projection(projection); + + let entry = self + .txn + .as_ref() + .unwrap() + .get(&key, Projection::Parts(projection)) + .await + .map_err(|err| JsValue::from(err.to_string()))?; + + match entry { + Some(entry) => Ok(to_record(&entry.get().columns, self.primary_key_index).into()), + None => Ok(JsValue::NULL), + } + } + + pub fn insert(&mut self, record: Object) -> Result<(), JsValue> { + if self.txn.is_none() { + return Err("Can not operate a committed transaction".into()); + } + + let record = parse_record(&record, &self.desc, self.primary_key_index)?; + self.txn.as_mut().unwrap().insert(record); + Ok(()) + } + + pub fn remove(&mut self, key: JsValue) -> Result<(), JsValue> { + if self.txn.is_none() { + return Err("Can not operate a committed transaction".into()); + } + + let key = parse_key(self.desc.get(self.primary_key_index).unwrap(), key, true)?; + self.txn.as_mut().unwrap().remove(key); + Ok(()) + } + + pub async fn scan( + &self, + lower: Bound, + high: Bound, + limit: Option, + projection: Vec, + ) -> Result { + if self.txn.is_none() { + return Err("Can not operate a committed transaction".into()); + } + + let projection = self.projection(projection); + let desc = self.desc.get(self.primary_key_index).unwrap(); + let lower = lower.into_bound(desc)?; + let high = high.into_bound(desc)?; + + // FIXME: lifetime + let txn = self.txn.as_ref().unwrap(); + let txn = unsafe { + transmute::< + &transaction::Transaction<'_, DynRecord>, + &'static transaction::Transaction<'_, DynRecord>, + >(txn) + }; + let mut scan = txn + .scan(( + unsafe { + transmute::< + std::ops::Bound<&tonbo::record::Column>, + std::ops::Bound<&'static tonbo::record::Column>, + >(lower.as_ref()) + }, + unsafe { + transmute::< + std::ops::Bound<&tonbo::record::Column>, + std::ops::Bound<&'static tonbo::record::Column>, + >(high.as_ref()) + }, + )) + .projection(projection); + + if let Some(limit) = limit { + scan = scan.limit(limit); + } + + let stream = scan + .take() + .await + .map_err(|err| JsValue::from(err.to_string()))?; + + let stream = stream.map(|res| { + res.map(|entry| match entry.value() { + Some(record) => to_record(&record.columns, record.primary_index).into(), + None => JsValue::NULL, + }) + .map_err(|err| JsValue::from(err.to_string())) + }); + Ok(wasm_streams::ReadableStream::from_stream(stream).into_raw()) + } + + pub async fn commit(&mut self) -> Result<(), JsValue> { + if self.txn.is_none() { + return Err("Can not operate a committed transaction".into()); + } + + let txn = self.txn.take(); + txn.unwrap() + .commit() + .await + .map_err(|err| JsValue::from(err.to_string()))?; + + Ok(()) + } +} diff --git a/bindings/js/src/utils.rs b/bindings/js/src/utils.rs new file mode 100644 index 0000000..294f9f0 --- /dev/null +++ b/bindings/js/src/utils.rs @@ -0,0 +1,199 @@ +use std::{any::Any, sync::Arc}; + +use js_sys::{Object, Reflect, Uint8Array}; +use tonbo::record::{Column, ColumnDesc, Datatype, DynRecord}; +use wasm_bindgen::{JsCast, JsValue}; + +fn to_col_value(value: T, primary: bool) -> Arc { + match primary { + true => Arc::new(value) as Arc, + false => Arc::new(Some(value)) as Arc, + } +} + +fn none_value(datatype: Datatype) -> Arc { + match datatype { + Datatype::UInt8 => Arc::new(Option::::None), + Datatype::UInt16 => Arc::new(Option::::None), + Datatype::UInt32 => Arc::new(Option::::None), + Datatype::UInt64 => Arc::new(Option::::None), + Datatype::Int8 => Arc::new(Option::::None), + Datatype::Int16 => Arc::new(Option::::None), + Datatype::Int32 => Arc::new(Option::::None), + Datatype::Int64 => Arc::new(Option::::None), + Datatype::String => Arc::new(Option::::None), + Datatype::Boolean => Arc::new(Option::::None), + Datatype::Bytes => Arc::new(Option::>::None), + } +} + +pub(crate) fn parse_key(desc: &ColumnDesc, key: JsValue, primary: bool) -> Result { + if key.is_undefined() || key.is_null() { + match primary || !desc.is_nullable { + true => return Err(format!("{} can not be null", &desc.name).into()), + false => { + return Ok(Column::new( + desc.datatype, + desc.name.clone(), + none_value(desc.datatype), + desc.is_nullable, + )) + } + } + } + let value: Arc = match desc.datatype { + Datatype::UInt8 => to_col_value::(key.as_f64().unwrap().round() as u8, primary), + Datatype::UInt16 => to_col_value::(key.as_f64().unwrap().round() as u16, primary), + Datatype::UInt32 => to_col_value::(key.as_f64().unwrap().round() as u32, primary), + Datatype::UInt64 => to_col_value::(key.as_f64().unwrap().round() as u64, primary), + Datatype::Int8 => to_col_value::(key.as_f64().unwrap().round() as i8, primary), + Datatype::Int16 => to_col_value::(key.as_f64().unwrap().round() as i16, primary), + Datatype::Int32 => to_col_value::(key.as_f64().unwrap().round() as i32, primary), + Datatype::Int64 => to_col_value::(key.as_f64().unwrap().round() as i64, primary), + Datatype::String => to_col_value::(key.as_string().unwrap(), primary), + Datatype::Boolean => to_col_value::(key.as_bool().unwrap(), primary), + Datatype::Bytes => { + to_col_value::>(key.dyn_into::().unwrap().to_vec(), primary) + } + }; + + Ok(Column::new( + desc.datatype, + desc.name.clone(), + value, + desc.is_nullable, + )) +} + +pub(crate) fn parse_record( + record: &Object, + schema: &Vec, + primary_key_index: usize, +) -> Result { + let mut cols = Vec::with_capacity(schema.len()); + for (idx, col_desc) in schema.iter().enumerate() { + let name = col_desc.name.as_str(); + let js_val = Reflect::get(record, &name.into())?; + cols.push(parse_key(col_desc, js_val, primary_key_index == idx)?); + } + Ok(DynRecord::new(cols, primary_key_index)) +} + +pub(crate) fn to_record(cols: &Vec, primary_key_index: usize) -> JsValue { + let obj = Object::new(); + for (idx, col) in cols.iter().enumerate() { + let value = match col.datatype { + Datatype::UInt8 => match idx == primary_key_index { + true => (*col.value.as_ref().downcast_ref::().unwrap()).into(), + false => (*col.value.as_ref().downcast_ref::>().unwrap()).into(), + }, + Datatype::UInt16 => match idx == primary_key_index { + true => (*col.value.as_ref().downcast_ref::().unwrap()).into(), + false => (*col.value.as_ref().downcast_ref::>().unwrap()).into(), + }, + Datatype::UInt32 => match idx == primary_key_index { + true => (*col.value.as_ref().downcast_ref::().unwrap()).into(), + false => (*col.value.as_ref().downcast_ref::>().unwrap()).into(), + }, + Datatype::UInt64 => match idx == primary_key_index { + true => (*col.value.as_ref().downcast_ref::().unwrap()).into(), + false => (*col.value.as_ref().downcast_ref::>().unwrap()).into(), + }, + Datatype::Int8 => match idx == primary_key_index { + true => (*col.value.as_ref().downcast_ref::().unwrap()).into(), + false => (*col.value.as_ref().downcast_ref::>().unwrap()).into(), + }, + Datatype::Int16 => match idx == primary_key_index { + true => (*col.value.as_ref().downcast_ref::().unwrap()).into(), + false => (*col.value.as_ref().downcast_ref::>().unwrap()).into(), + }, + Datatype::Int32 => match idx == primary_key_index { + true => (*col.value.as_ref().downcast_ref::().unwrap()).into(), + false => (*col.value.as_ref().downcast_ref::>().unwrap()).into(), + }, + Datatype::Int64 => match idx == primary_key_index { + true => (*col.value.as_ref().downcast_ref::().unwrap()).into(), + false => (*col.value.as_ref().downcast_ref::>().unwrap()).into(), + }, + Datatype::String => match idx == primary_key_index { + true => (col.value.as_ref().downcast_ref::().unwrap()).into(), + false => (col + .value + .as_ref() + .downcast_ref::>() + .unwrap() + .clone()) + .into(), + }, + Datatype::Boolean => match idx == primary_key_index { + true => (*col.value.as_ref().downcast_ref::().unwrap()).into(), + false => (*col.value.as_ref().downcast_ref::>().unwrap()).into(), + }, + Datatype::Bytes => match idx == primary_key_index { + true => Uint8Array::from( + col.value + .as_ref() + .downcast_ref::>() + .as_ref() + .unwrap() + .as_slice(), + ) + .into(), + false => col + .value + .as_ref() + .downcast_ref::>>() + .unwrap() + .as_ref() + .map(|v| Uint8Array::from(v.as_slice()).into()) + .unwrap_or(JsValue::NULL), + }, + }; + + Reflect::set(&obj, &col.name.as_str().into(), &value).unwrap(); + } + obj.into() +} + +#[cfg(test)] +mod tests { + + use tonbo::record::{ColumnDesc, Datatype}; + use wasm_bindgen::JsValue; + use wasm_bindgen_test::wasm_bindgen_test; + + use crate::utils::parse_key; + + #[wasm_bindgen_test] + fn test_parse_key() { + { + let desc = ColumnDesc::new("id".to_string(), Datatype::UInt64, false); + let key_col = parse_key(&desc, JsValue::from(19), true).unwrap(); + assert_eq!(key_col.datatype, Datatype::UInt64); + assert_eq!(key_col.value.as_ref().downcast_ref::(), Some(&19_u64)); + } + { + let desc = ColumnDesc::new("id".to_string(), Datatype::UInt64, false); + let result = parse_key(&desc, JsValue::NULL, true); + assert!(result.is_err()); + } + { + let desc = ColumnDesc::new("name".to_string(), Datatype::String, false); + let key_col = parse_key(&desc, JsValue::from("Hello tonbo"), false).unwrap(); + assert_eq!(key_col.datatype, Datatype::String); + assert_eq!( + key_col.value.as_ref().downcast_ref::>(), + Some(&Some("Hello tonbo".to_string())) + ); + } + { + let desc = ColumnDesc::new("data".to_string(), Datatype::Bytes, false); + let key_col = parse_key(&desc, JsValue::from(b"Hello tonbo".to_vec()), false).unwrap(); + assert_eq!(key_col.datatype, Datatype::Bytes); + assert_eq!( + key_col.value.as_ref().downcast_ref::>>(), + Some(&Some(b"Hello tonbo".to_vec())) + ); + } + } +}