Skip to content

Commit

Permalink
feat: impl checkpoint for SnapShot
Browse files Browse the repository at this point in the history
  • Loading branch information
KKould committed Nov 14, 2024
1 parent f3d29c6 commit 532e0cd
Show file tree
Hide file tree
Showing 14 changed files with 424 additions and 93 deletions.
6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,19 +58,19 @@ crc32fast = "1"
crossbeam-skiplist = "0.1"
datafusion = { version = "42", optional = true }
flume = { version = "0.11", features = ["async"] }
fusio = { package = "fusio", version = "0.3.3", features = [
fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "e94f40271498364a12e5dc9f08bec965a63a5b2f", package = "fusio", version = "0.3.3", features = [
"aws",
"dyn",
"fs",
"object_store",
"tokio",
"tokio-http",
] }
fusio-dispatch = { package = "fusio-dispatch", version = "0.2.1", features = [
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "e94f40271498364a12e5dc9f08bec965a63a5b2f", package = "fusio-dispatch", version = "0.2.1", features = [
"aws",
"tokio",
] }
fusio-parquet = { package = "fusio-parquet", version = "0.2.1" }
fusio-parquet = { git = "https://github.com/tonbo-io/fusio.git", rev = "e94f40271498364a12e5dc9f08bec965a63a5b2f", package = "fusio-parquet", version = "0.2.1" }
futures-core = "0.3"
futures-io = "0.3"
futures-util = "0.3"
Expand Down
4 changes: 2 additions & 2 deletions bindings/python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ crate-type = ["cdylib"]
[workspace]

[dependencies]
fusio = { package = "fusio", version = "0.3.1", features = ["aws", "tokio"] }
fusio-dispatch = { package = "fusio-dispatch", version = "0.2.0", features = [
fusio = { git = "https://github.com/tonbo-io/fusio.git", rev = "e94f40271498364a12e5dc9f08bec965a63a5b2f", package = "fusio", version = "0.3.1", features = ["aws", "tokio"] }
fusio-dispatch = { git = "https://github.com/tonbo-io/fusio.git", rev = "e94f40271498364a12e5dc9f08bec965a63a5b2f", package = "fusio-dispatch", version = "0.2.0", features = [
"aws",
"tokio",
] }
Expand Down
2 changes: 1 addition & 1 deletion src/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ pub(crate) mod tests {
where
R: Record + Send,
{
let trigger = Arc::new(TriggerFactory::create(option.trigger_type));
let trigger = TriggerFactory::create(option.trigger_type);

let mutable: Mutable<R> = Mutable::new(option, trigger, fs).await?;

Expand Down
2 changes: 1 addition & 1 deletion src/inmem/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
timestamp::{Timestamp, Timestamped, TimestampedRef, EPOCH},
};

pub trait ArrowArrays: Sized + Sync {
pub trait ArrowArrays: Sized + Send + Sync {
type Record: Record;

type Builder: Builder<Self>;
Expand Down
18 changes: 13 additions & 5 deletions src/inmem/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ where
{
pub(crate) data: SkipMap<Timestamped<R::Key>, Option<R>>,
wal: Option<Mutex<WalFile<Box<dyn DynWrite>, R>>>,
pub(crate) trigger: Arc<Box<dyn Trigger<R> + Send + Sync>>,
pub(crate) trigger: Arc<dyn Trigger<R> + Send + Sync>,
}

impl<R> Mutable<R>
Expand All @@ -47,7 +47,7 @@ where
{
pub async fn new(
option: &DbOption<R>,
trigger: Arc<Box<dyn Trigger<R> + Send + Sync>>,
trigger: Arc<dyn Trigger<R> + Send + Sync>,
fs: &Arc<dyn DynFs>,
) -> Result<Self, fusio::Error> {
let mut wal = None;
Expand Down Expand Up @@ -193,6 +193,14 @@ where
}
Ok(())
}

pub(crate) async fn wal_id(&self) -> Option<FileId> {
if let Some(wal) = self.wal.as_ref() {
let wal_guard = wal.lock().await;
return Some(wal_guard.file_id());
}
None
}
}

impl<R> Mutable<R>
Expand Down Expand Up @@ -231,7 +239,7 @@ mod tests {
let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap());
fs.create_dir_all(&option.wal_dir_path()).await.unwrap();

let trigger = Arc::new(TriggerFactory::create(option.trigger_type));
let trigger = TriggerFactory::create(option.trigger_type);
let mem_table = Mutable::<Test>::new(&option, trigger, &fs).await.unwrap();

mem_table
Expand Down Expand Up @@ -279,7 +287,7 @@ mod tests {
let option = DbOption::from(Path::from_filesystem_path(temp_dir.path()).unwrap());
fs.create_dir_all(&option.wal_dir_path()).await.unwrap();

let trigger = Arc::new(TriggerFactory::create(option.trigger_type));
let trigger = TriggerFactory::create(option.trigger_type);

let mutable = Mutable::<String>::new(&option, trigger, &fs).await.unwrap();

Expand Down Expand Up @@ -367,7 +375,7 @@ mod tests {
let fs = Arc::new(TokioFs) as Arc<dyn DynFs>;
fs.create_dir_all(&option.wal_dir_path()).await.unwrap();

let trigger = Arc::new(TriggerFactory::create(option.trigger_type));
let trigger = TriggerFactory::create(option.trigger_type);

let mutable = Mutable::<DynRecord>::new(&option, trigger, &fs)
.await
Expand Down
Loading

0 comments on commit 532e0cd

Please sign in to comment.