From 8165074f95395ae043f3343386d388b357b5b8f7 Mon Sep 17 00:00:00 2001 From: Evgeny Fomin Date: Mon, 16 Sep 2024 23:51:08 +0200 Subject: [PATCH 1/5] wip --- grovedb/src/debugger.rs | 166 +++++++++++++++++++++++++++++++------- grovedbg-types/src/lib.rs | 17 ++++ 2 files changed, 152 insertions(+), 31 deletions(-) diff --git a/grovedb/src/debugger.rs b/grovedb/src/debugger.rs index da4657ec..c67f901e 100644 --- a/grovedb/src/debugger.rs +++ b/grovedb/src/debugger.rs @@ -1,6 +1,11 @@ //! GroveDB debugging support module. -use std::{collections::BTreeMap, fs, sync::Weak}; +use std::{ + collections::{BTreeMap, HashMap}, + fs, + sync::{Arc, Weak}, + time::{Instant, SystemTime}, +}; use axum::{extract::State, http::StatusCode, response::IntoResponse, routing::post, Json, Router}; use grovedb_merk::{ @@ -11,13 +16,18 @@ use grovedb_merk::{ use grovedb_path::SubtreePath; use grovedb_version::version::GroveVersion; use grovedbg_types::{ - MerkProofNode, MerkProofOp, NodeFetchRequest, NodeUpdate, Path, PathQuery, Query, QueryItem, - SizedQuery, SubqueryBranch, + DropSessionRequest, MerkProofNode, MerkProofOp, NewSessionResponse, NodeFetchRequest, + NodeUpdate, Path, PathQuery, Query, QueryItem, SessionId, SizedQuery, SubqueryBranch, + WithSession, }; use indexmap::IndexMap; +use tempfile::tempdir; use tokio::{ net::ToSocketAddrs, - sync::mpsc::{self, Sender}, + sync::{ + mpsc::{self, Sender}, + RwLock, RwLockReadGuard, + }, }; use tower_http::services::ServeDir; @@ -45,13 +55,22 @@ where .expect("cannot extract grovedbg contents"); let (shutdown_send, mut shutdown_receive) = mpsc::channel::<()>(1); + + let state: AppState = AppState { + shutdown: shutdown_send, + grovedb, + sessions: Default::default(), + }; + let app = Router::new() + .route("/new_session", post(new_session)) + .route("/drop_session", post(drop_session)) .route("/fetch_node", post(fetch_node)) .route("/fetch_root_node", post(fetch_root_node)) .route("/prove_path_query", post(prove_path_query)) .route("/fetch_with_path_query", post(fetch_with_path_query)) .fallback_service(ServeDir::new(grovedbg_www)) - .with_state((shutdown_send, grovedb)); + .with_state(state); tokio::runtime::Runtime::new() .unwrap() @@ -69,8 +88,79 @@ where }); } +#[derive(Clone)] +struct AppState { + shutdown: Sender<()>, + grovedb: Weak, + sessions: Arc>>, +} + +impl AppState { + async fn verify_running(&self) -> Result<(), AppError> { + if self.grovedb.strong_count() == 0 { + self.shutdown.send(()).await.ok(); + Err(AppError::Closed) + } else { + Ok(()) + } + } + + async fn new_session(&self) -> Result { + let grovedb = self.grovedb.upgrade().ok_or(AppError::Closed)?; + let id = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .expect("time went backwards") + .as_secs(); + self.sessions + .write() + .await + .insert(id, Session::new(&grovedb)?); + + Ok(id) + } + + async fn drop_session(&self, id: SessionId) { + self.sessions.write().await.remove(&id); + } + + async fn get_snapshot(&self, id: SessionId) -> Result, AppError> { + self.verify_running().await?; + let mut lock = self.sessions.write().await; + if let Some(session) = lock.get_mut(&id) { + session.last_access = Instant::now(); + Ok(RwLockReadGuard::map(lock.downgrade(), |l| { + &l.get(&id).as_ref().expect("checked above").snapshot + })) + } else { + Err(AppError::NoSession) + } + } +} + +struct Session { + last_access: Instant, + _tempdir: tempfile::TempDir, + snapshot: GroveDb, +} + +impl Session { + fn new(grovedb: &GroveDb) -> Result { + let tempdir = tempdir().map_err(|e| AppError::Any(e.to_string()))?; + grovedb + .create_checkpoint(tempdir.path()) + .map_err(|e| AppError::Any(e.to_string()))?; + let snapshot = GroveDb::open(tempdir.path()).map_err(|e| AppError::Any(e.to_string()))?; + Ok(Session { + last_access: Instant::now(), + _tempdir: tempdir, + snapshot, + }) + } +} + enum AppError { Closed, + NoSession, Any(String), } @@ -80,6 +170,9 @@ impl IntoResponse for AppError { AppError::Closed => { (StatusCode::SERVICE_UNAVAILABLE, "GroveDB is closed").into_response() } + AppError::NoSession => { + (StatusCode::UNAUTHORIZED, "No session with this id").into_response() + } AppError::Any(e) => (StatusCode::INTERNAL_SERVER_ERROR, e).into_response(), } } @@ -91,16 +184,28 @@ impl From for AppError { } } +async fn new_session(State(state): State) -> Result, AppError> { + Ok(Json(NewSessionResponse { + session_id: state.new_session().await?, + })) +} + +async fn drop_session( + State(state): State, + Json(DropSessionRequest { session_id }): Json, +) { + state.drop_session(session_id).await; +} + async fn fetch_node( - State((shutdown, grovedb)): State<(Sender<()>, Weak)>, - Json(NodeFetchRequest { path, key }): Json, + State(state): State, + Json(WithSession { + session_id, + request: NodeFetchRequest { path, key }, + }): Json>, ) -> Result>, AppError> { - let Some(db) = grovedb.upgrade() else { - shutdown.send(()).await.ok(); - return Err(AppError::Closed); - }; + let db = state.get_snapshot(session_id).await?; - // todo: GroveVersion::latest() to actual version let merk = db .open_non_transactional_merk_at_path(path.as_slice().into(), None, GroveVersion::latest()) .unwrap()?; @@ -115,14 +220,14 @@ async fn fetch_node( } async fn fetch_root_node( - State((shutdown, grovedb)): State<(Sender<()>, Weak)>, + State(state): State, + Json(WithSession { + session_id, + request: (), + }): Json>, ) -> Result>, AppError> { - let Some(db) = grovedb.upgrade() else { - shutdown.send(()).await.ok(); - return Err(AppError::Closed); - }; + let db = state.get_snapshot(session_id).await?; - // todo: GroveVersion::latest() to actual version let merk = db .open_non_transactional_merk_at_path(SubtreePath::empty(), None, GroveVersion::latest()) .unwrap()?; @@ -138,13 +243,13 @@ async fn fetch_root_node( } async fn prove_path_query( - State((shutdown, grovedb)): State<(Sender<()>, Weak)>, - Json(json_path_query): Json, + State(state): State, + Json(WithSession { + session_id, + request: json_path_query, + }): Json>, ) -> Result, AppError> { - let Some(db) = grovedb.upgrade() else { - shutdown.send(()).await.ok(); - return Err(AppError::Closed); - }; + let db = state.get_snapshot(session_id).await?; let path_query = path_query_to_grovedb(json_path_query); @@ -155,13 +260,13 @@ async fn prove_path_query( } async fn fetch_with_path_query( - State((shutdown, grovedb)): State<(Sender<()>, Weak)>, - Json(json_path_query): Json, + State(state): State, + Json(WithSession { + session_id, + request: json_path_query, + }): Json>, ) -> Result>, AppError> { - let Some(db) = grovedb.upgrade() else { - shutdown.send(()).await.ok(); - return Err(AppError::Closed); - }; + let db = state.get_snapshot(session_id).await?; let path_query = path_query_to_grovedb(json_path_query); @@ -487,7 +592,6 @@ fn node_to_update( feature_type, }: NodeDbg, ) -> Result { - // todo: GroveVersion::latest() to actual version let grovedb_element = crate::Element::deserialize(&value, GroveVersion::latest())?; let element = element_to_grovedbg(grovedb_element); diff --git a/grovedbg-types/src/lib.rs b/grovedbg-types/src/lib.rs index f1cc53bd..dd9fc007 100644 --- a/grovedbg-types/src/lib.rs +++ b/grovedbg-types/src/lib.rs @@ -6,6 +6,23 @@ use serde_with::{base64::Base64, serde_as}; pub type Key = Vec; pub type Path = Vec; pub type PathSegment = Vec; +pub type SessionId = u64; + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct WithSession { + pub session_id: SessionId, + pub request: R, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct NewSessionResponse { + pub session_id: SessionId, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct DropSessionRequest { + pub session_id: SessionId, +} #[serde_as] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] From 5fc422919027abaf0a16b6c60f351dd89d40c05b Mon Sep 17 00:00:00 2001 From: Evgeny Fomin Date: Tue, 17 Sep 2024 18:41:50 +0200 Subject: [PATCH 2/5] bump grovedbg zip --- grovedb/Cargo.toml | 8 +++-- grovedb/build.rs | 4 +-- grovedb/src/debugger.rs | 77 ++++++++++++++++++++++++++--------------- 3 files changed, 56 insertions(+), 33 deletions(-) diff --git a/grovedb/Cargo.toml b/grovedb/Cargo.toml index 2aeeae9a..e8c009aa 100644 --- a/grovedb/Cargo.toml +++ b/grovedb/Cargo.toml @@ -27,13 +27,14 @@ indexmap = "2.2.6" intmap = { version = "2.0.0", optional = true } grovedb-path = { version = "2.0.3", path = "../path" } grovedbg-types = { version = "2.0.3", path = "../grovedbg-types", optional = true } -tokio = { version = "1.37.0", features = ["rt-multi-thread", "net"], optional = true } -axum = { version = "0.7.5", features = ["macros"], optional = true } -tower-http = { version = "0.5.2", features = ["fs"], optional = true } +tokio = { version = "1.37", features = ["rt-multi-thread", "net"], optional = true } +axum = { version = "0.7", features = ["macros"], optional = true } +tower-http = { version = "0.5", features = ["fs"], optional = true } blake3 = "1.4.0" bitvec = "1" zip-extensions = { version ="0.6.2", optional = true } grovedb-version = { path = "../grovedb-version", version = "2.0.3" } +tokio-util = { version = "0.7", optional = true } [dev-dependencies] rand = "0.8.5" @@ -74,6 +75,7 @@ estimated_costs = ["full"] grovedbg = [ "grovedbg-types", "tokio", + "tokio-util", "full", "grovedb-merk/grovedbg", "axum", diff --git a/grovedb/build.rs b/grovedb/build.rs index 66e72d15..ba614eff 100644 --- a/grovedb/build.rs +++ b/grovedb/build.rs @@ -6,8 +6,8 @@ fn main() { use sha2::{digest::FixedOutput, Digest, Sha256}; const GROVEDBG_SHA256: [u8; 32] = - hex!("ea7d9258973aa765eaf5064451fc83efa22e0ce6eaf2938507e2703571364e35"); - const GROVEDBG_VERSION: &str = "v1.0.0-rc.6"; + hex!("4c05d47613b1a68caaf5229e460720d7958b2fe0148fd0f5edd04f827052db64"); + const GROVEDBG_VERSION: &str = "v1.0.0"; let out_dir = PathBuf::from(&env::var_os("OUT_DIR").unwrap()); let grovedbg_zip_path = out_dir.join("grovedbg.zip"); diff --git a/grovedb/src/debugger.rs b/grovedb/src/debugger.rs index c67f901e..1c07504f 100644 --- a/grovedb/src/debugger.rs +++ b/grovedb/src/debugger.rs @@ -4,7 +4,7 @@ use std::{ collections::{BTreeMap, HashMap}, fs, sync::{Arc, Weak}, - time::{Instant, SystemTime}, + time::{Duration, Instant, SystemTime}, }; use axum::{extract::State, http::StatusCode, response::IntoResponse, routing::post, Json, Router}; @@ -24,11 +24,11 @@ use indexmap::IndexMap; use tempfile::tempdir; use tokio::{ net::ToSocketAddrs, - sync::{ - mpsc::{self, Sender}, - RwLock, RwLockReadGuard, - }, + select, + sync::{RwLock, RwLockReadGuard}, + time::sleep, }; +use tokio_util::sync::CancellationToken; use tower_http::services::ServeDir; use crate::{ @@ -40,6 +40,8 @@ use crate::{ const GROVEDBG_ZIP: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/grovedbg.zip")); +const SESSION_TIMEOUT: Duration = Duration::from_secs(60 * 10); + pub(super) fn start_visualizer(grovedb: Weak, addr: A) where A: ToSocketAddrs + Send + 'static, @@ -54,10 +56,10 @@ where zip_extensions::read::zip_extract(&grovedbg_zip, &grovedbg_www) .expect("cannot extract grovedbg contents"); - let (shutdown_send, mut shutdown_receive) = mpsc::channel::<()>(1); + let cancellation_token = CancellationToken::new(); let state: AppState = AppState { - shutdown: shutdown_send, + cancellation_token: cancellation_token.clone(), grovedb, sessions: Default::default(), }; @@ -70,35 +72,53 @@ where .route("/prove_path_query", post(prove_path_query)) .route("/fetch_with_path_query", post(fetch_with_path_query)) .fallback_service(ServeDir::new(grovedbg_www)) - .with_state(state); - - tokio::runtime::Runtime::new() - .unwrap() - .block_on(async move { - let listener = tokio::net::TcpListener::bind(addr) - .await - .expect("can't bind visualizer port"); - axum::serve(listener, app) - .with_graceful_shutdown(async move { - shutdown_receive.recv().await; - }) - .await - .unwrap() - }); + .with_state(state.clone()); + + let rt = tokio::runtime::Runtime::new().unwrap(); + + let cloned_cancellation_token = cancellation_token.clone(); + rt.spawn(async move { + loop { + select! { + _ = cloned_cancellation_token.cancelled() => break, + _ = sleep(Duration::from_secs(10)) => { + let now = Instant::now(); + let mut lock = state.sessions.write().await; + let to_delete: Vec = lock.iter().filter_map( + |(id, session)| (session.last_access < now - SESSION_TIMEOUT).then_some(*id) + ).collect(); + + to_delete.into_iter().for_each(|id| { lock.remove(&id); }); + } + } + } + }); + + rt.block_on(async move { + let listener = tokio::net::TcpListener::bind(addr) + .await + .expect("can't bind visualizer port"); + axum::serve(listener, app) + .with_graceful_shutdown(async move { + cancellation_token.cancelled().await; + }) + .await + .unwrap() + }); }); } #[derive(Clone)] struct AppState { - shutdown: Sender<()>, + cancellation_token: CancellationToken, grovedb: Weak, sessions: Arc>>, } impl AppState { - async fn verify_running(&self) -> Result<(), AppError> { + fn verify_running(&self) -> Result<(), AppError> { if self.grovedb.strong_count() == 0 { - self.shutdown.send(()).await.ok(); + self.cancellation_token.cancel(); Err(AppError::Closed) } else { Ok(()) @@ -124,7 +144,7 @@ impl AppState { } async fn get_snapshot(&self, id: SessionId) -> Result, AppError> { - self.verify_running().await?; + self.verify_running()?; let mut lock = self.sessions.write().await; if let Some(session) = lock.get_mut(&id) { session.last_access = Instant::now(); @@ -146,10 +166,11 @@ struct Session { impl Session { fn new(grovedb: &GroveDb) -> Result { let tempdir = tempdir().map_err(|e| AppError::Any(e.to_string()))?; + let path = tempdir.path().join("grovedbg_session"); grovedb - .create_checkpoint(tempdir.path()) + .create_checkpoint(&path) .map_err(|e| AppError::Any(e.to_string()))?; - let snapshot = GroveDb::open(tempdir.path()).map_err(|e| AppError::Any(e.to_string()))?; + let snapshot = GroveDb::open(path).map_err(|e| AppError::Any(e.to_string()))?; Ok(Session { last_access: Instant::now(), _tempdir: tempdir, From a4fffb66fad249f137e21bd5454aa09651b0e9fa Mon Sep 17 00:00:00 2001 From: Evgeny Fomin Date: Wed, 18 Sep 2024 11:52:22 +0200 Subject: [PATCH 3/5] typo --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index d55137b1..ee88b547 100644 --- a/README.md +++ b/README.md @@ -242,7 +242,7 @@ let's say 10000, the following snippet should do: ```rust let db = Arc::new(GroveDb::open("db").unwrap()); - db.start_visualzier(10000); + db.start_visualizer(10000); ``` Just remember to use Arc because the HTTP server might outlast the GroveDB instance. From 180aeb73b76a06cce35806f58cd3831cf0ea9aba Mon Sep 17 00:00:00 2001 From: Evgeny Fomin Date: Tue, 24 Sep 2024 15:26:40 +0200 Subject: [PATCH 4/5] update grovedbg version --- grovedb/build.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/grovedb/build.rs b/grovedb/build.rs index ba614eff..9c0441b7 100644 --- a/grovedb/build.rs +++ b/grovedb/build.rs @@ -6,8 +6,8 @@ fn main() { use sha2::{digest::FixedOutput, Digest, Sha256}; const GROVEDBG_SHA256: [u8; 32] = - hex!("4c05d47613b1a68caaf5229e460720d7958b2fe0148fd0f5edd04f827052db64"); - const GROVEDBG_VERSION: &str = "v1.0.0"; + hex!("5a1ee5a3033190974f580e41047ef9267ba03fafe0a70bbcf7878c1bb4f6126d"); + const GROVEDBG_VERSION: &str = "v1.1.0"; let out_dir = PathBuf::from(&env::var_os("OUT_DIR").unwrap()); let grovedbg_zip_path = out_dir.join("grovedbg.zip"); From 3bddcffa7b47de29d6d1295ec7641a3324af9e88 Mon Sep 17 00:00:00 2001 From: Evgeny Fomin Date: Tue, 24 Sep 2024 15:39:23 +0200 Subject: [PATCH 5/5] lock versions' patch --- grovedb/Cargo.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/grovedb/Cargo.toml b/grovedb/Cargo.toml index e8c009aa..680a0e15 100644 --- a/grovedb/Cargo.toml +++ b/grovedb/Cargo.toml @@ -27,14 +27,14 @@ indexmap = "2.2.6" intmap = { version = "2.0.0", optional = true } grovedb-path = { version = "2.0.3", path = "../path" } grovedbg-types = { version = "2.0.3", path = "../grovedbg-types", optional = true } -tokio = { version = "1.37", features = ["rt-multi-thread", "net"], optional = true } -axum = { version = "0.7", features = ["macros"], optional = true } -tower-http = { version = "0.5", features = ["fs"], optional = true } +tokio = { version = "1.40.0", features = ["rt-multi-thread", "net"], optional = true } +axum = { version = "0.7.5", features = ["macros"], optional = true } +tower-http = { version = "0.5.2", features = ["fs"], optional = true } blake3 = "1.4.0" bitvec = "1" zip-extensions = { version ="0.6.2", optional = true } grovedb-version = { path = "../grovedb-version", version = "2.0.3" } -tokio-util = { version = "0.7", optional = true } +tokio-util = { version = "0.7.12", optional = true } [dev-dependencies] rand = "0.8.5"