Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GroveDBG v1.1.0 #340

Merged
merged 5 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ let's say 10000, the following snippet should do:

```rust
let db = Arc::new(GroveDb::open("db").unwrap());
db.start_visualzier(10000);
db.start_visualizer(10000);
```

Just remember to use Arc because the HTTP server might outlast the GroveDB instance.
Expand Down
8 changes: 5 additions & 3 deletions grovedb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ indexmap = "2.2.6"
intmap = { version = "2.0.0", optional = true }
grovedb-path = { version = "2.0.3", path = "../path" }
grovedbg-types = { version = "2.0.3", path = "../grovedbg-types", optional = true }
tokio = { version = "1.37.0", features = ["rt-multi-thread", "net"], optional = true }
axum = { version = "0.7.5", features = ["macros"], optional = true }
tower-http = { version = "0.5.2", features = ["fs"], optional = true }
tokio = { version = "1.37", features = ["rt-multi-thread", "net"], optional = true }
axum = { version = "0.7", features = ["macros"], optional = true }
tower-http = { version = "0.5", features = ["fs"], optional = true }
fominok marked this conversation as resolved.
Show resolved Hide resolved
blake3 = "1.4.0"
bitvec = "1"
zip-extensions = { version ="0.6.2", optional = true }
grovedb-version = { path = "../grovedb-version", version = "2.0.3" }
tokio-util = { version = "0.7", optional = true }
fominok marked this conversation as resolved.
Show resolved Hide resolved

[dev-dependencies]
rand = "0.8.5"
Expand Down Expand Up @@ -74,6 +75,7 @@ estimated_costs = ["full"]
grovedbg = [
"grovedbg-types",
"tokio",
"tokio-util",
"full",
"grovedb-merk/grovedbg",
"axum",
Expand Down
4 changes: 2 additions & 2 deletions grovedb/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ fn main() {
use sha2::{digest::FixedOutput, Digest, Sha256};

const GROVEDBG_SHA256: [u8; 32] =
hex!("ea7d9258973aa765eaf5064451fc83efa22e0ce6eaf2938507e2703571364e35");
const GROVEDBG_VERSION: &str = "v1.0.0-rc.6";
hex!("5a1ee5a3033190974f580e41047ef9267ba03fafe0a70bbcf7878c1bb4f6126d");
const GROVEDBG_VERSION: &str = "v1.1.0";

let out_dir = PathBuf::from(&env::var_os("OUT_DIR").unwrap());
let grovedbg_zip_path = out_dir.join("grovedbg.zip");
Expand Down
217 changes: 171 additions & 46 deletions grovedb/src/debugger.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
//! GroveDB debugging support module.

use std::{collections::BTreeMap, fs, sync::Weak};
use std::{
collections::{BTreeMap, HashMap},
fs,
sync::{Arc, Weak},
time::{Duration, Instant, SystemTime},
};

use axum::{extract::State, http::StatusCode, response::IntoResponse, routing::post, Json, Router};
use grovedb_merk::{
Expand All @@ -11,14 +16,19 @@ use grovedb_merk::{
use grovedb_path::SubtreePath;
use grovedb_version::version::GroveVersion;
use grovedbg_types::{
MerkProofNode, MerkProofOp, NodeFetchRequest, NodeUpdate, Path, PathQuery, Query, QueryItem,
SizedQuery, SubqueryBranch,
DropSessionRequest, MerkProofNode, MerkProofOp, NewSessionResponse, NodeFetchRequest,
NodeUpdate, Path, PathQuery, Query, QueryItem, SessionId, SizedQuery, SubqueryBranch,
WithSession,
};
use indexmap::IndexMap;
use tempfile::tempdir;
use tokio::{
net::ToSocketAddrs,
sync::mpsc::{self, Sender},
select,
sync::{RwLock, RwLockReadGuard},
time::sleep,
};
use tokio_util::sync::CancellationToken;
use tower_http::services::ServeDir;

use crate::{
Expand All @@ -30,6 +40,8 @@ use crate::{

const GROVEDBG_ZIP: &[u8] = include_bytes!(concat!(env!("OUT_DIR"), "/grovedbg.zip"));

const SESSION_TIMEOUT: Duration = Duration::from_secs(60 * 10);

pub(super) fn start_visualizer<A>(grovedb: Weak<GroveDb>, addr: A)
where
A: ToSocketAddrs + Send + 'static,
Expand All @@ -44,33 +56,132 @@ where
zip_extensions::read::zip_extract(&grovedbg_zip, &grovedbg_www)
.expect("cannot extract grovedbg contents");

let (shutdown_send, mut shutdown_receive) = mpsc::channel::<()>(1);
let cancellation_token = CancellationToken::new();

let state: AppState = AppState {
cancellation_token: cancellation_token.clone(),
grovedb,
sessions: Default::default(),
};

let app = Router::new()
.route("/new_session", post(new_session))
.route("/drop_session", post(drop_session))
.route("/fetch_node", post(fetch_node))
.route("/fetch_root_node", post(fetch_root_node))
.route("/prove_path_query", post(prove_path_query))
.route("/fetch_with_path_query", post(fetch_with_path_query))
.fallback_service(ServeDir::new(grovedbg_www))
.with_state((shutdown_send, grovedb));

tokio::runtime::Runtime::new()
.unwrap()
.block_on(async move {
let listener = tokio::net::TcpListener::bind(addr)
.await
.expect("can't bind visualizer port");
axum::serve(listener, app)
.with_graceful_shutdown(async move {
shutdown_receive.recv().await;
})
.await
.unwrap()
});
.with_state(state.clone());

let rt = tokio::runtime::Runtime::new().unwrap();

let cloned_cancellation_token = cancellation_token.clone();
rt.spawn(async move {
loop {
select! {
_ = cloned_cancellation_token.cancelled() => break,
_ = sleep(Duration::from_secs(10)) => {
let now = Instant::now();
let mut lock = state.sessions.write().await;
let to_delete: Vec<SessionId> = lock.iter().filter_map(
|(id, session)| (session.last_access < now - SESSION_TIMEOUT).then_some(*id)
).collect();

to_delete.into_iter().for_each(|id| { lock.remove(&id); });
}
}
}
});

rt.block_on(async move {
let listener = tokio::net::TcpListener::bind(addr)
.await
.expect("can't bind visualizer port");
axum::serve(listener, app)
.with_graceful_shutdown(async move {
cancellation_token.cancelled().await;
})
.await
.unwrap()
});
});
}

#[derive(Clone)]
struct AppState {
cancellation_token: CancellationToken,
grovedb: Weak<GroveDb>,
sessions: Arc<RwLock<HashMap<SessionId, Session>>>,
}

impl AppState {
fn verify_running(&self) -> Result<(), AppError> {
if self.grovedb.strong_count() == 0 {
self.cancellation_token.cancel();
Err(AppError::Closed)
} else {
Ok(())
}
}

async fn new_session(&self) -> Result<SessionId, AppError> {
let grovedb = self.grovedb.upgrade().ok_or(AppError::Closed)?;
let id = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("time went backwards")
.as_secs();
self.sessions
.write()
.await
.insert(id, Session::new(&grovedb)?);

Ok(id)
}

async fn drop_session(&self, id: SessionId) {
self.sessions.write().await.remove(&id);
}

async fn get_snapshot(&self, id: SessionId) -> Result<RwLockReadGuard<GroveDb>, AppError> {
self.verify_running()?;
let mut lock = self.sessions.write().await;
if let Some(session) = lock.get_mut(&id) {
session.last_access = Instant::now();
Ok(RwLockReadGuard::map(lock.downgrade(), |l| {
&l.get(&id).as_ref().expect("checked above").snapshot
}))
} else {
Err(AppError::NoSession)
}
}
}

struct Session {
last_access: Instant,
_tempdir: tempfile::TempDir,
snapshot: GroveDb,
}

impl Session {
fn new(grovedb: &GroveDb) -> Result<Self, AppError> {
let tempdir = tempdir().map_err(|e| AppError::Any(e.to_string()))?;
let path = tempdir.path().join("grovedbg_session");
grovedb
.create_checkpoint(&path)
.map_err(|e| AppError::Any(e.to_string()))?;
let snapshot = GroveDb::open(path).map_err(|e| AppError::Any(e.to_string()))?;
Ok(Session {
last_access: Instant::now(),
_tempdir: tempdir,
snapshot,
})
}
}

enum AppError {
Closed,
NoSession,
Any(String),
}

Expand All @@ -80,6 +191,9 @@ impl IntoResponse for AppError {
AppError::Closed => {
(StatusCode::SERVICE_UNAVAILABLE, "GroveDB is closed").into_response()
}
AppError::NoSession => {
(StatusCode::UNAUTHORIZED, "No session with this id").into_response()
}
AppError::Any(e) => (StatusCode::INTERNAL_SERVER_ERROR, e).into_response(),
}
}
Expand All @@ -91,16 +205,28 @@ impl<E: std::error::Error> From<E> for AppError {
}
}

async fn new_session(State(state): State<AppState>) -> Result<Json<NewSessionResponse>, AppError> {
Ok(Json(NewSessionResponse {
session_id: state.new_session().await?,
}))
}

async fn drop_session(
State(state): State<AppState>,
Json(DropSessionRequest { session_id }): Json<DropSessionRequest>,
) {
state.drop_session(session_id).await;
}

async fn fetch_node(
State((shutdown, grovedb)): State<(Sender<()>, Weak<GroveDb>)>,
Json(NodeFetchRequest { path, key }): Json<NodeFetchRequest>,
State(state): State<AppState>,
Json(WithSession {
session_id,
request: NodeFetchRequest { path, key },
}): Json<WithSession<NodeFetchRequest>>,
) -> Result<Json<Option<NodeUpdate>>, AppError> {
let Some(db) = grovedb.upgrade() else {
shutdown.send(()).await.ok();
return Err(AppError::Closed);
};
let db = state.get_snapshot(session_id).await?;

// todo: GroveVersion::latest() to actual version
let merk = db
.open_non_transactional_merk_at_path(path.as_slice().into(), None, GroveVersion::latest())
.unwrap()?;
Expand All @@ -115,14 +241,14 @@ async fn fetch_node(
}

async fn fetch_root_node(
State((shutdown, grovedb)): State<(Sender<()>, Weak<GroveDb>)>,
State(state): State<AppState>,
Json(WithSession {
session_id,
request: (),
}): Json<WithSession<()>>,
) -> Result<Json<Option<NodeUpdate>>, AppError> {
let Some(db) = grovedb.upgrade() else {
shutdown.send(()).await.ok();
return Err(AppError::Closed);
};
let db = state.get_snapshot(session_id).await?;

// todo: GroveVersion::latest() to actual version
let merk = db
.open_non_transactional_merk_at_path(SubtreePath::empty(), None, GroveVersion::latest())
.unwrap()?;
Expand All @@ -138,13 +264,13 @@ async fn fetch_root_node(
}

async fn prove_path_query(
State((shutdown, grovedb)): State<(Sender<()>, Weak<GroveDb>)>,
Json(json_path_query): Json<PathQuery>,
State(state): State<AppState>,
Json(WithSession {
session_id,
request: json_path_query,
}): Json<WithSession<PathQuery>>,
) -> Result<Json<grovedbg_types::Proof>, AppError> {
let Some(db) = grovedb.upgrade() else {
shutdown.send(()).await.ok();
return Err(AppError::Closed);
};
let db = state.get_snapshot(session_id).await?;

let path_query = path_query_to_grovedb(json_path_query);

Expand All @@ -155,13 +281,13 @@ async fn prove_path_query(
}

async fn fetch_with_path_query(
State((shutdown, grovedb)): State<(Sender<()>, Weak<GroveDb>)>,
Json(json_path_query): Json<PathQuery>,
State(state): State<AppState>,
Json(WithSession {
session_id,
request: json_path_query,
}): Json<WithSession<PathQuery>>,
) -> Result<Json<Vec<grovedbg_types::NodeUpdate>>, AppError> {
let Some(db) = grovedb.upgrade() else {
shutdown.send(()).await.ok();
return Err(AppError::Closed);
};
let db = state.get_snapshot(session_id).await?;

let path_query = path_query_to_grovedb(json_path_query);

Expand Down Expand Up @@ -487,7 +613,6 @@ fn node_to_update(
feature_type,
}: NodeDbg,
) -> Result<NodeUpdate, crate::Error> {
// todo: GroveVersion::latest() to actual version
let grovedb_element = crate::Element::deserialize(&value, GroveVersion::latest())?;

let element = element_to_grovedbg(grovedb_element);
Expand Down
17 changes: 17 additions & 0 deletions grovedbg-types/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,23 @@ use serde_with::{base64::Base64, serde_as};
pub type Key = Vec<u8>;
pub type Path = Vec<PathSegment>;
pub type PathSegment = Vec<u8>;
pub type SessionId = u64;

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct WithSession<R> {
pub session_id: SessionId,
pub request: R,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct NewSessionResponse {
pub session_id: SessionId,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DropSessionRequest {
pub session_id: SessionId,
}

#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
Expand Down
Loading