From 67ced7dffd4fb86f0033720791aa4bb391df7c24 Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Tue, 14 Jan 2025 18:18:05 +0800 Subject: [PATCH 1/3] refactor: replace log with fusio-log --- Cargo.toml | 5 +- src/compaction/mod.rs | 2 + src/inmem/mutable.rs | 49 ++++---- src/lib.rs | 89 +++++++------- src/record/key/mod.rs | 3 +- src/record/mod.rs | 6 +- src/record/runtime/record.rs | 6 +- src/record/runtime/record_ref.rs | 2 +- src/record/runtime/value.rs | 6 +- src/scope.rs | 6 +- src/serdes/arc.rs | 65 ----------- src/serdes/boolean.rs | 58 ---------- src/serdes/bytes.rs | 73 ------------ src/serdes/list.rs | 63 ---------- src/serdes/mod.rs | 102 ---------------- src/serdes/num.rs | 98 ---------------- src/serdes/option.rs | 109 ------------------ src/serdes/string.rs | 79 ------------- src/timestamp/mod.rs | 2 +- src/timestamp/timestamped.rs | 6 +- src/version/edit.rs | 37 +++--- src/version/mod.rs | 4 +- src/version/set.rs | 99 ++++++++-------- src/wal/checksum.rs | 104 ----------------- src/wal/log.rs | 116 ++++++++++++++----- src/wal/mod.rs | 192 +++++++++++++++---------------- src/wal/record_entry.rs | 106 ----------------- tests/macros_correctness.rs | 2 +- tonbo_macros/src/record.rs | 6 +- 29 files changed, 353 insertions(+), 1142 deletions(-) delete mode 100644 src/serdes/arc.rs delete mode 100644 src/serdes/boolean.rs delete mode 100644 src/serdes/bytes.rs delete mode 100644 src/serdes/list.rs delete mode 100644 src/serdes/mod.rs delete mode 100644 src/serdes/num.rs delete mode 100644 src/serdes/option.rs delete mode 100644 src/serdes/string.rs delete mode 100644 src/wal/checksum.rs delete mode 100644 src/wal/record_entry.rs diff --git a/Cargo.toml b/Cargo.toml index a61c0eda..b4bacc1d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ version = "0.2.0" msrv = "1.79.0" [features] -aws = ["fusio-dispatch/aws", "fusio/aws"] +aws = ["fusio-dispatch/aws", "fusio/aws", "fusio-log/aws"] bench = ["redb", "rocksdb", "sled"] bytes = ["dep:bytes"] datafusion = ["dep:async-trait", "dep:datafusion"] @@ -24,6 +24,7 @@ object-store = ["fusio/object_store"] opfs = [ "dep:wasm-bindgen-futures", "fusio-dispatch/opfs", + "fusio-log/web", "fusio-parquet/web", "fusio/opfs", ] @@ -33,6 +34,7 @@ rocksdb = ["dep:rocksdb"] sled = ["dep:sled"] tokio = [ "fusio-dispatch/tokio", + "fusio-log/tokio", "fusio-parquet/tokio", "fusio/tokio", "parquet/default", @@ -86,6 +88,7 @@ fusio = { version = "0.3.4", features = [ "fs", ] } fusio-dispatch = "0.3.4" +fusio-log = {git = "https://github.com/tonbo-io/fusio-log", default-features = false, features = ["bytes"]} fusio-parquet = "0.3.4" futures-core = "0.3" futures-io = "0.3" diff --git a/src/compaction/mod.rs b/src/compaction/mod.rs index c6e1c0c4..acab5866 100644 --- a/src/compaction/mod.rs +++ b/src/compaction/mod.rs @@ -543,6 +543,8 @@ where Fusio(#[from] fusio::Error), #[error("compaction version error: {0}")] Version(#[from] VersionError), + #[error("compaction logger error: {0}")] + Logger(#[from] fusio_log::error::LogError), #[error("compaction channel is closed")] ChannelClose, #[error("database error: {0}")] diff --git a/src/inmem/mutable.rs b/src/inmem/mutable.rs index 0212bf35..790e3066 100644 --- a/src/inmem/mutable.rs +++ b/src/inmem/mutable.rs @@ -1,22 +1,25 @@ -use std::{intrinsics::transmute, ops::Bound, sync::Arc}; +use std::{ops::Bound, sync::Arc}; use async_lock::Mutex; use crossbeam_skiplist::{ map::{Entry, Range}, SkipMap, }; -use fusio::{buffered::BufWriter, DynFs, DynWrite}; +use fusio::DynFs; use crate::{ - fs::{generate_file_id, FileId, FileType}, + fs::{generate_file_id, FileId}, inmem::immutable::Immutable, - record::{Key, KeyRef, Record, Schema}, + record::{KeyRef, Record, Schema}, timestamp::{ timestamped::{Timestamped, TimestampedRef}, Timestamp, EPOCH, }, trigger::Trigger, - wal::{log::LogType, WalFile}, + wal::{ + log::{Log, LogType}, + WalFile, + }, DbError, DbOption, }; @@ -36,7 +39,7 @@ where R: Record, { pub(crate) data: SkipMap::Key>, Option>, - wal: Option, R>>>, + wal: Option>>, pub(crate) trigger: Arc + Send + Sync>>, pub(super) schema: Arc, @@ -56,13 +59,15 @@ where if option.use_wal { let file_id = generate_file_id(); - let file = Box::new(BufWriter::new( - fs.open_options(&option.wal_path(file_id), FileType::Wal.open_options(false)) - .await?, - option.wal_buffer_size, - )) as Box; - - wal = Some(Mutex::new(WalFile::new(file, file_id))); + wal = Some(Mutex::new( + WalFile::::new( + fs.clone(), + option.wal_path(file_id), + option.wal_buffer_size, + file_id, + ) + .await, + )); }; Ok(Self { @@ -106,21 +111,18 @@ where ) -> Result> { let timestamped_key = Timestamped::new(key, ts); - if let (Some(log_ty), Some(wal)) = (log_ty, &self.wal) { + let record_entry = Log::new(timestamped_key, value, log_ty); + if let (Some(_log_ty), Some(wal)) = (log_ty, &self.wal) { let mut wal_guard = wal.lock().await; wal_guard - .write( - log_ty, - timestamped_key.map(|key| unsafe { transmute(key.as_key_ref()) }), - value.as_ref().map(R::as_record_ref), - ) + .write(&record_entry) .await .map_err(|e| DbError::WalWrite(Box::new(e)))?; } - let is_exceeded = self.trigger.item(&value); - self.data.insert(timestamped_key, value); + let is_exceeded = self.trigger.item(&record_entry.value); + self.data.insert(record_entry.key, record_entry.value); Ok(is_exceeded) } @@ -176,7 +178,10 @@ where pub(crate) async fn into_immutable( self, - ) -> Result<(Option, Immutable<::Columns>), fusio::Error> { + ) -> Result< + (Option, Immutable<::Columns>), + fusio_log::error::LogError, + > { let mut file_id = None; if let Some(wal) = self.wal { diff --git a/src/lib.rs b/src/lib.rs index a89799a3..dedf544f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -126,7 +126,6 @@ mod ondisk; pub mod option; pub mod record; mod scope; -pub mod serdes; pub mod snapshot; pub mod stream; pub mod timestamp; @@ -135,15 +134,14 @@ mod trigger; mod version; mod wal; -use std::{ - collections::HashMap, io, io::Cursor, marker::PhantomData, mem, ops::Bound, pin::pin, sync::Arc, -}; +use std::{collections::HashMap, io, marker::PhantomData, mem, ops::Bound, pin::pin, sync::Arc}; pub use arrow; use async_lock::RwLock; use async_stream::stream; use flume::{bounded, Sender}; use fs::FileId; +use fusio_log::Decode; use futures_core::Stream; use futures_util::StreamExt; use inmem::{immutable::Immutable, mutable::Mutable}; @@ -163,6 +161,7 @@ use tokio::sync::oneshot; pub use tonbo_macros::{KeyAttributes, Record}; use tracing::error; use transaction::{CommitError, Transaction, TransactionEntry}; +use wal::log::Log; pub use crate::option::*; use crate::{ @@ -170,13 +169,11 @@ use crate::{ executor::Executor, fs::{manager::StoreManager, parse_file_id, FileType}, record::Schema, - serdes::Decode, snapshot::Snapshot, stream::{ mem_projection::MemProjectionStream, merge::MergeStream, package::PackageStream, Entry, ScanStream, }, - timestamp::Timestamped, trigger::{Trigger, TriggerFactory}, version::{cleaner::Cleaner, set::VersionSet, TransactionTs, Version, VersionError}, wal::{log::LogType, RecoverError, WalFile}, @@ -522,49 +519,53 @@ where for wal_meta in wal_metas { let wal_path = wal_meta.path; - let file = base_fs - .open_options(&wal_path, FileType::Wal.open_options(false)) - .await?; // SAFETY: wal_stream return only file name let wal_id = parse_file_id(&wal_path, FileType::Wal)?.unwrap(); - let mut wal = WalFile::new(Cursor::new(file), wal_id); wal_ids.push(wal_id); - let mut recover_stream = pin!(wal.recover()); + let mut recover_stream = + pin!(WalFile::::recover(option.base_fs.clone(), wal_path).await); while let Some(record) = recover_stream.next().await { - let (log_type, Timestamped { ts, value: key }, value_option) = record?; - - let is_excess = match log_type { - LogType::Full => { - schema - .recover_append(key, version_set.increase_ts(), value_option) - .await? - } - LogType::First => { - transaction_map.insert(ts, vec![(key, value_option)]); - false - } - LogType::Middle => { - transaction_map - .get_mut(&ts) - .unwrap() - .push((key, value_option)); - false - } - LogType::Last => { - let mut is_excess = false; - let mut records = transaction_map.remove(&ts).unwrap(); - records.push((key, value_option)); - - let ts = version_set.increase_ts(); - for (key, value_option) in records { - is_excess = schema.recover_append(key, ts, value_option).await?; + let record_batch = record?; + + for entry in record_batch { + let Log { + key, + value, + log_type, + } = entry; + let ts = key.ts; + let key = key.value; + + let is_excess = match log_type.unwrap() { + LogType::Full => { + schema + .recover_append(key, version_set.increase_ts(), value) + .await? } - is_excess + LogType::First => { + transaction_map.insert(ts, vec![(key, value)]); + false + } + LogType::Middle => { + transaction_map.get_mut(&ts).unwrap().push((key, value)); + false + } + LogType::Last => { + let mut is_excess = false; + let mut records = transaction_map.remove(&ts).unwrap(); + records.push((key, value)); + + let ts = version_set.increase_ts(); + for (key, value_option) in records { + is_excess = schema.recover_append(key, ts, value_option).await?; + } + is_excess + } + }; + if is_excess { + let _ = schema.compaction_tx.try_send(CompactTask::Freeze); } - }; - if is_excess { - let _ = schema.compaction_tx.try_send(CompactTask::Freeze); } } } @@ -882,6 +883,8 @@ where WalWrite(Box), #[error("exceeds the maximum level(0-6)")] ExceedsMaxLevel, + #[error("write log error: {0}")] + Logger(#[from] fusio_log::error::LogError), } type LockMap = Arc>; @@ -909,6 +912,7 @@ pub(crate) mod tests { use flume::{bounded, Receiver}; use fusio::{disk::TokioFs, path::Path, DynFs, SeqRead, Write}; use fusio_dispatch::FsOptions; + use fusio_log::{Decode, Encode}; use futures::StreamExt; use parquet::arrow::ProjectionMask; use parquet_lru::NoCache; @@ -926,7 +930,6 @@ pub(crate) mod tests { Datatype, DynRecord, Key, RecordDecodeError, RecordEncodeError, RecordRef, Schema as RecordSchema, Value, }, - serdes::{Decode, Encode}, trigger::{TriggerFactory, TriggerType}, version::{cleaner::Cleaner, set::tests::build_version_set, Version}, wal::log::LogType, diff --git a/src/record/key/mod.rs b/src/record/key/mod.rs index e69051fb..c0f61246 100644 --- a/src/record/key/mod.rs +++ b/src/record/key/mod.rs @@ -4,8 +4,7 @@ mod str; use std::{hash::Hash, sync::Arc}; use arrow::array::Datum; - -use crate::serdes::{Decode, Encode}; +use fusio_log::{Decode, Encode}; pub trait Key: 'static + Encode + Decode + Ord + Clone + Send + Sync + Hash + std::fmt::Debug diff --git a/src/record/mod.rs b/src/record/mod.rs index 416ea976..af7bbdaf 100644 --- a/src/record/mod.rs +++ b/src/record/mod.rs @@ -7,16 +7,14 @@ pub(crate) mod test; use std::{error::Error, fmt::Debug, io, sync::Arc}; use arrow::{array::RecordBatch, datatypes::Schema as ArrowSchema}; +use fusio_log::{Decode, Encode}; use internal::InternalRecordRef; pub use key::{Key, KeyRef}; use parquet::{arrow::ProjectionMask, format::SortingColumn, schema::types::ColumnPath}; pub use runtime::*; use thiserror::Error; -use crate::{ - inmem::immutable::ArrowArrays, - serdes::{Decode, Encode}, -}; +use crate::inmem::immutable::ArrowArrays; pub trait Schema: Debug + Send + Sync { type Record: Record; diff --git a/src/record/runtime/record.rs b/src/record/runtime/record.rs index 81b5e900..a8d5a72e 100644 --- a/src/record/runtime/record.rs +++ b/src/record/runtime/record.rs @@ -1,12 +1,10 @@ use std::sync::Arc; use fusio::SeqRead; +use fusio_log::{Decode, Encode}; use super::{schema::DynSchema, Datatype, DynRecordRef, Value}; -use crate::{ - record::{Record, RecordDecodeError}, - serdes::{Decode, Encode}, -}; +use crate::record::{Record, RecordDecodeError}; #[derive(Debug)] pub struct DynRecord { diff --git a/src/record/runtime/record_ref.rs b/src/record/runtime/record_ref.rs index b129212f..cfea930b 100644 --- a/src/record/runtime/record_ref.rs +++ b/src/record/runtime/record_ref.rs @@ -8,12 +8,12 @@ use arrow::{ }, }; use fusio::Write; +use fusio_log::Encode; use super::{Datatype, DynRecord, Value}; use crate::{ magic::USER_COLUMN_OFFSET, record::{internal::InternalRecordRef, Key, Record, RecordEncodeError, RecordRef, Schema}, - serdes::Encode, }; #[derive(Clone)] diff --git a/src/record/runtime/value.rs b/src/record/runtime/value.rs index f1216b55..946ba3ef 100644 --- a/src/record/runtime/value.rs +++ b/src/record/runtime/value.rs @@ -8,12 +8,10 @@ use arrow::{ datatypes::{DataType, Field}, }; use fusio::{SeqRead, Write}; +use fusio_log::{Decode, DecodeError, Encode}; use super::Datatype; -use crate::{ - record::{Key, KeyRef}, - serdes::{option::DecodeError, Decode, Encode}, -}; +use crate::record::{Key, KeyRef}; #[derive(Debug, Clone)] pub struct ValueDesc { diff --git a/src/scope.rs b/src/scope.rs index 1f618119..ba54e273 100644 --- a/src/scope.rs +++ b/src/scope.rs @@ -1,11 +1,9 @@ use std::ops::Bound; use fusio::{SeqRead, Write}; +use fusio_log::{Decode, Encode}; -use crate::{ - fs::FileId, - serdes::{Decode, Encode}, -}; +use crate::fs::FileId; #[derive(Debug, Eq, PartialEq)] pub(crate) struct Scope { diff --git a/src/serdes/arc.rs b/src/serdes/arc.rs deleted file mode 100644 index f8104a28..00000000 --- a/src/serdes/arc.rs +++ /dev/null @@ -1,65 +0,0 @@ -use std::sync::Arc; - -use fusio::{SeqRead, Write}; - -use super::{Decode, Encode}; - -impl Decode for Arc -where - T: Decode, -{ - type Error = T::Error; - - async fn decode(reader: &mut R) -> Result - where - R: SeqRead, - { - Ok(Arc::from(T::decode(reader).await?)) - } -} - -impl Encode for Arc -where - T: Encode + Send + Sync, -{ - type Error = T::Error; - - async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> - where - W: Write, - { - self.as_ref().encode(writer).await - } - - fn size(&self) -> usize { - Encode::size(self.as_ref()) - } -} - -#[cfg(test)] -mod tests { - use std::{io::Cursor, sync::Arc}; - - use tokio::io::AsyncSeekExt; - - use crate::serdes::{Decode, Encode}; - - #[tokio::test] - async fn test_encode_decode() { - let source_0 = Arc::new(1u64); - let source_1 = Arc::new("Hello! Tonbo".to_string()); - - let mut bytes = Vec::new(); - let mut cursor = Cursor::new(&mut bytes); - - source_0.encode(&mut cursor).await.unwrap(); - source_1.encode(&mut cursor).await.unwrap(); - - cursor.seek(std::io::SeekFrom::Start(0)).await.unwrap(); - let decoded_0 = Arc::::decode(&mut cursor).await.unwrap(); - let decoded_1 = Arc::::decode(&mut cursor).await.unwrap(); - - assert_eq!(source_0, decoded_0); - assert_eq!(source_1, decoded_1); - } -} diff --git a/src/serdes/boolean.rs b/src/serdes/boolean.rs deleted file mode 100644 index 8240a69e..00000000 --- a/src/serdes/boolean.rs +++ /dev/null @@ -1,58 +0,0 @@ -use std::mem::size_of; - -use fusio::{SeqRead, Write}; - -use crate::serdes::{Decode, Encode}; - -impl Encode for bool { - type Error = fusio::Error; - - async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> { - if *self { 1u8 } else { 0u8 }.encode(writer).await - } - - fn size(&self) -> usize { - size_of::() - } -} - -impl Decode for bool { - type Error = fusio::Error; - - async fn decode(reader: &mut R) -> Result { - Ok(u8::decode(reader).await? == 1u8) - } -} - -#[cfg(test)] -mod tests { - use std::io::Cursor; - - use tokio::io::AsyncSeekExt; - - use crate::serdes::{Decode, Encode}; - - #[tokio::test] - async fn test_encode_decode() { - let source_0 = true; - let source_1 = false; - let source_2 = true; - - let mut bytes = Vec::new(); - let mut cursor = Cursor::new(&mut bytes); - - source_0.encode(&mut cursor).await.unwrap(); - source_1.encode(&mut cursor).await.unwrap(); - source_2.encode(&mut cursor).await.unwrap(); - - cursor.seek(std::io::SeekFrom::Start(0)).await.unwrap(); - - let decoded_0 = bool::decode(&mut cursor).await.unwrap(); - let decoded_1 = bool::decode(&mut cursor).await.unwrap(); - let decoded_2 = bool::decode(&mut cursor).await.unwrap(); - - assert_eq!(source_0, decoded_0); - assert_eq!(source_1, decoded_1); - assert_eq!(source_2, decoded_2); - } -} diff --git a/src/serdes/bytes.rs b/src/serdes/bytes.rs deleted file mode 100644 index fca06460..00000000 --- a/src/serdes/bytes.rs +++ /dev/null @@ -1,73 +0,0 @@ -use bytes::Bytes; -use fusio::{IoBuf, SeqRead, Write}; - -use crate::serdes::{Decode, Encode}; - -impl Encode for &[u8] { - type Error = fusio::Error; - - async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> { - (self.len() as u32).encode(writer).await?; - let (result, _) = writer.write_all(*self).await; - result?; - - Ok(()) - } - - fn size(&self) -> usize { - self.len() - } -} - -impl Encode for Bytes { - type Error = fusio::Error; - - async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> { - (self.len() as u32).encode(writer).await?; - let (result, _) = writer.write_all(self.as_slice()).await; - result?; - - Ok(()) - } - - fn size(&self) -> usize { - self.len() - } -} - -impl Decode for Bytes { - type Error = fusio::Error; - - async fn decode(reader: &mut R) -> Result { - let len = u32::decode(reader).await?; - let (result, buf) = reader.read_exact(vec![0u8; len as usize]).await; - result?; - - Ok(buf.as_bytes()) - } -} - -#[cfg(test)] -mod tests { - use std::io::Cursor; - - use bytes::Bytes; - use tokio::io::AsyncSeekExt; - - use crate::serdes::{Decode, Encode}; - - #[tokio::test] - async fn test_encode_decode() { - let source = Bytes::from_static(b"hello! Tonbo"); - - let mut bytes = Vec::new(); - let mut cursor = Cursor::new(&mut bytes); - - source.encode(&mut cursor).await.unwrap(); - - cursor.seek(std::io::SeekFrom::Start(0)).await.unwrap(); - let decoded = Bytes::decode(&mut cursor).await.unwrap(); - - assert_eq!(source, decoded); - } -} diff --git a/src/serdes/list.rs b/src/serdes/list.rs deleted file mode 100644 index 039cf824..00000000 --- a/src/serdes/list.rs +++ /dev/null @@ -1,63 +0,0 @@ -use fusio::{SeqRead, Write}; - -use super::{Decode, Encode}; - -impl Decode for Vec { - type Error = fusio::Error; - - async fn decode(reader: &mut R) -> Result - where - R: SeqRead, - { - let len = u32::decode(reader).await?; - let (result, buf) = reader - .read_exact(vec![0u8; len as usize * size_of::()]) - .await; - result?; - - Ok(buf) - } -} - -impl Encode for Vec { - type Error = fusio::Error; - - async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> - where - W: Write, - { - (self.len() as u32).encode(writer).await?; - let (result, _) = writer.write_all(self.as_slice()).await; - result?; - - Ok(()) - } - - fn size(&self) -> usize { - size_of::() + size_of::() * self.len() - } -} - -#[cfg(test)] -mod tests { - use std::io::Cursor; - - use tokio::io::AsyncSeekExt; - - use crate::serdes::{Decode, Encode}; - - #[tokio::test] - async fn test_u8_encode_decode() { - let source = b"hello! Tonbo".to_vec(); - - let mut bytes = Vec::new(); - let mut cursor = Cursor::new(&mut bytes); - - source.encode(&mut cursor).await.unwrap(); - - cursor.seek(std::io::SeekFrom::Start(0)).await.unwrap(); - let decoded = Vec::::decode(&mut cursor).await.unwrap(); - - assert_eq!(source, decoded); - } -} diff --git a/src/serdes/mod.rs b/src/serdes/mod.rs deleted file mode 100644 index 6962ae54..00000000 --- a/src/serdes/mod.rs +++ /dev/null @@ -1,102 +0,0 @@ -mod arc; -mod boolean; -#[cfg(feature = "bytes")] -mod bytes; -mod list; -mod num; -pub(crate) mod option; -mod string; - -use std::future::Future; - -use fusio::{MaybeSend, SeqRead, Write}; - -pub trait Encode { - type Error: From + std::error::Error + Send + Sync + 'static; - - fn encode( - &self, - writer: &mut W, - ) -> impl Future> + MaybeSend - where - W: Write; - - fn size(&self) -> usize; -} - -impl Encode for &T { - type Error = T::Error; - - async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> - where - W: Write, - { - Encode::encode(*self, writer).await - } - - fn size(&self) -> usize { - Encode::size(*self) - } -} - -pub trait Decode: Sized { - type Error: From + std::error::Error + Send + Sync + 'static; - - fn decode(reader: &mut R) -> impl Future> - where - R: SeqRead; -} - -#[cfg(test)] -mod tests { - use std::io; - - use tokio::io::AsyncSeekExt; - - use super::*; - - #[tokio::test] - async fn test_encode_decode() { - // Implement a simple struct that implements Encode and Decode - struct TestStruct(u32); - - impl Encode for TestStruct { - type Error = fusio::Error; - - async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> - where - W: Write, - { - self.0.encode(writer).await?; - - Ok(()) - } - - fn size(&self) -> usize { - std::mem::size_of::() - } - } - - impl Decode for TestStruct { - type Error = fusio::Error; - - async fn decode(reader: &mut R) -> Result - where - R: SeqRead, - { - Ok(TestStruct(u32::decode(reader).await?)) - } - } - - // Test encoding and decoding - let original = TestStruct(42); - let mut buf = Vec::new(); - let mut cursor = io::Cursor::new(&mut buf); - original.encode(&mut cursor).await.unwrap(); - - cursor.seek(std::io::SeekFrom::Start(0)).await.unwrap(); - let decoded = TestStruct::decode(&mut cursor).await.unwrap(); - - assert_eq!(original.0, decoded.0); - } -} diff --git a/src/serdes/num.rs b/src/serdes/num.rs deleted file mode 100644 index c165005a..00000000 --- a/src/serdes/num.rs +++ /dev/null @@ -1,98 +0,0 @@ -use std::mem::size_of; - -use fusio::{SeqRead, Write}; - -use super::{Decode, Encode}; - -#[macro_export] -macro_rules! implement_encode_decode { - ($struct_name:ident) => { - impl Encode for $struct_name { - type Error = fusio::Error; - - async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> { - let (result, _) = writer.write_all(&self.to_le_bytes()[..]).await; - result?; - - Ok(()) - } - - fn size(&self) -> usize { - size_of::() - } - } - - impl Decode for $struct_name { - type Error = fusio::Error; - - async fn decode(reader: &mut R) -> Result { - let mut bytes = [0u8; size_of::()]; - let (result, _) = reader.read_exact(&mut bytes[..]).await; - result?; - - Ok(Self::from_le_bytes(bytes)) - } - } - }; -} - -implement_encode_decode!(i8); -implement_encode_decode!(i16); -implement_encode_decode!(i32); -implement_encode_decode!(i64); -implement_encode_decode!(u8); -implement_encode_decode!(u16); -implement_encode_decode!(u32); -implement_encode_decode!(u64); - -#[cfg(test)] -mod tests { - use std::io::Cursor; - - use tokio::io::AsyncSeekExt; - - use crate::serdes::{Decode, Encode}; - - #[tokio::test] - async fn test_encode_decode() { - let source_0 = 8u8; - let source_1 = 16u16; - let source_2 = 32u32; - let source_3 = 64u64; - let source_4 = 8i8; - let source_5 = 16i16; - let source_6 = 32i32; - let source_7 = 64i64; - - let mut bytes = Vec::new(); - let mut cursor = Cursor::new(&mut bytes); - - source_0.encode(&mut cursor).await.unwrap(); - source_1.encode(&mut cursor).await.unwrap(); - source_2.encode(&mut cursor).await.unwrap(); - source_3.encode(&mut cursor).await.unwrap(); - source_4.encode(&mut cursor).await.unwrap(); - source_5.encode(&mut cursor).await.unwrap(); - source_6.encode(&mut cursor).await.unwrap(); - source_7.encode(&mut cursor).await.unwrap(); - - cursor.seek(std::io::SeekFrom::Start(0)).await.unwrap(); - let decoded_0 = u8::decode(&mut cursor).await.unwrap(); - let decoded_1 = u16::decode(&mut cursor).await.unwrap(); - let decoded_2 = u32::decode(&mut cursor).await.unwrap(); - let decoded_3 = u64::decode(&mut cursor).await.unwrap(); - let decoded_4 = i8::decode(&mut cursor).await.unwrap(); - let decoded_5 = i16::decode(&mut cursor).await.unwrap(); - let decoded_6 = i32::decode(&mut cursor).await.unwrap(); - let decoded_7 = i64::decode(&mut cursor).await.unwrap(); - - assert_eq!(source_0, decoded_0); - assert_eq!(source_1, decoded_1); - assert_eq!(source_2, decoded_2); - assert_eq!(source_3, decoded_3); - assert_eq!(source_4, decoded_4); - assert_eq!(source_5, decoded_5); - assert_eq!(source_6, decoded_6); - assert_eq!(source_7, decoded_7); - } -} diff --git a/src/serdes/option.rs b/src/serdes/option.rs deleted file mode 100644 index 5fa660f8..00000000 --- a/src/serdes/option.rs +++ /dev/null @@ -1,109 +0,0 @@ -use std::io; - -use fusio::{SeqRead, Write}; -use thiserror::Error; - -use super::{Decode, Encode}; - -#[derive(Debug, Error)] -#[error("option encode error")] -pub enum EncodeError -where - E: std::error::Error, -{ - #[error("io error: {0}")] - Io(#[from] io::Error), - #[error("fusio error: {0}")] - Fusio(#[from] fusio::Error), - #[error("inner error: {0}")] - Inner(#[source] E), -} - -#[derive(Debug, Error)] -#[error("option decode error")] -pub enum DecodeError -where - E: std::error::Error, -{ - #[error("io error: {0}")] - Io(#[from] io::Error), - #[error("fusio error: {0}")] - Fusio(#[from] fusio::Error), - #[error("inner error: {0}")] - Inner(#[source] E), -} - -impl Encode for Option -where - V: Encode + Sync, -{ - type Error = EncodeError; - - async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> - where - W: Write, - { - match self { - None => 0u8.encode(writer).await?, - Some(v) => { - 1u8.encode(writer).await?; - v.encode(writer).await.map_err(EncodeError::Inner)?; - } - } - Ok(()) - } - - fn size(&self) -> usize { - match self { - None => 1, - Some(v) => 1 + v.size(), - } - } -} - -impl Decode for Option -where - V: Decode, -{ - type Error = DecodeError; - - async fn decode(reader: &mut R) -> Result { - match u8::decode(reader).await? { - 0 => Ok(None), - 1 => Ok(Some(V::decode(reader).await.map_err(DecodeError::Inner)?)), - _ => panic!("invalid option tag"), - } - } -} - -#[cfg(test)] -mod tests { - use std::io::Cursor; - - use tokio::io::AsyncSeekExt; - - use crate::serdes::{Decode, Encode}; - - #[tokio::test] - async fn test_encode_decode() { - let source_0 = Some(1u64); - let source_1 = None; - let source_2 = Some("Hello! Tonbo".to_string()); - - let mut bytes = Vec::new(); - let mut cursor = Cursor::new(&mut bytes); - - source_0.encode(&mut cursor).await.unwrap(); - source_1.encode(&mut cursor).await.unwrap(); - source_2.encode(&mut cursor).await.unwrap(); - - cursor.seek(std::io::SeekFrom::Start(0)).await.unwrap(); - let decoded_0 = Option::::decode(&mut cursor).await.unwrap(); - let decoded_1 = Option::::decode(&mut cursor).await.unwrap(); - let decoded_2 = Option::::decode(&mut cursor).await.unwrap(); - - assert_eq!(source_0, decoded_0); - assert_eq!(source_1, decoded_1); - assert_eq!(source_2, decoded_2); - } -} diff --git a/src/serdes/string.rs b/src/serdes/string.rs deleted file mode 100644 index 7c65f833..00000000 --- a/src/serdes/string.rs +++ /dev/null @@ -1,79 +0,0 @@ -use std::mem::size_of; - -use fusio::{SeqRead, Write}; - -use super::{Decode, Encode}; - -impl<'r> Encode for &'r str { - type Error = fusio::Error; - - async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> - where - W: Write, - { - (self.len() as u16).encode(writer).await?; - let (result, _) = writer.write_all(self.as_bytes()).await; - result?; - - Ok(()) - } - - fn size(&self) -> usize { - size_of::() + self.len() - } -} - -impl Encode for String { - type Error = fusio::Error; - - async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> - where - W: Write, - { - self.as_str().encode(writer).await - } - - fn size(&self) -> usize { - self.as_str().size() - } -} - -impl Decode for String { - type Error = fusio::Error; - - async fn decode(reader: &mut R) -> Result { - let len = u16::decode(reader).await?; - let (result, buf) = reader.read_exact(vec![0u8; len as usize]).await; - result?; - - Ok(unsafe { String::from_utf8_unchecked(buf.as_slice().to_vec()) }) - } -} - -#[cfg(test)] -mod tests { - use std::io::Cursor; - - use tokio::io::AsyncSeekExt; - - use crate::serdes::{Decode, Encode}; - - #[tokio::test] - async fn test_encode_decode() { - let source_0 = "Hello! World"; - let source_1 = "Hello! Tonbo".to_string(); - - let mut bytes = Vec::new(); - let mut cursor = Cursor::new(&mut bytes); - - source_0.encode(&mut cursor).await.unwrap(); - source_1.encode(&mut cursor).await.unwrap(); - - cursor.seek(std::io::SeekFrom::Start(0)).await.unwrap(); - let decoded_0 = String::decode(&mut cursor).await.unwrap(); - let decoded_1 = String::decode(&mut cursor).await.unwrap(); - - assert_eq!(source_0, decoded_0); - assert_eq!(source_1, decoded_1); - } -} diff --git a/src/timestamp/mod.rs b/src/timestamp/mod.rs index 54ca09a7..8cc78ab2 100644 --- a/src/timestamp/mod.rs +++ b/src/timestamp/mod.rs @@ -5,9 +5,9 @@ use arrow::{ datatypes::UInt32Type, }; use fusio::{SeqRead, Write}; +use fusio_log::{Decode, Encode}; pub(crate) use self::timestamped::*; -use crate::serdes::{Decode, Encode}; #[repr(transparent)] #[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Hash)] diff --git a/src/timestamp/timestamped.rs b/src/timestamp/timestamped.rs index 05138e8d..54c60e3c 100644 --- a/src/timestamp/timestamped.rs +++ b/src/timestamp/timestamped.rs @@ -1,11 +1,9 @@ use std::{borrow::Borrow, cmp::Ordering, marker::PhantomData, mem::size_of, ptr}; use fusio::{SeqRead, Write}; +use fusio_log::{Decode, Encode}; -use crate::{ - serdes::{Decode, Encode}, - timestamp::Timestamp, -}; +use crate::timestamp::Timestamp; #[derive(PartialEq, Eq, Debug, Clone)] pub struct Timestamped { diff --git a/src/version/edit.rs b/src/version/edit.rs index bc51a8e9..603c5958 100644 --- a/src/version/edit.rs +++ b/src/version/edit.rs @@ -1,13 +1,10 @@ use std::mem::size_of; use fusio::{SeqRead, Write}; +use fusio_log::{Decode, Encode, Options, Path}; +use futures_util::TryStreamExt; -use crate::{ - fs::FileId, - scope::Scope, - serdes::{Decode, Encode}, - timestamp::Timestamp, -}; +use crate::{fs::FileId, scope::Scope, timestamp::Timestamp}; #[derive(Debug, Clone, Eq, PartialEq)] pub(crate) enum VersionEdit { @@ -21,11 +18,19 @@ impl VersionEdit where K: Decode, { - pub(crate) async fn recover(reader: &mut R) -> Vec> { - let mut edits = Vec::new(); - - while let Ok(edit) = VersionEdit::decode(reader).await { - edits.push(edit) + pub(crate) async fn recover(path: Path) -> Vec> { + let mut edits = vec![]; + + let mut edits_stream = Options::new(path) + .disable_buf() + .recover::>() + .await + .unwrap(); + while let Ok(batch) = edits_stream.try_next().await { + match batch { + Some(mut batch) => edits.append(&mut batch), + None => break, + } } edits } @@ -121,9 +126,10 @@ where mod tests { use std::io::Cursor; + use fusio_log::{Decode, Encode}; use tokio::io::AsyncSeekExt; - use crate::{fs::generate_file_id, scope::Scope, serdes::Encode, version::edit::VersionEdit}; + use crate::{fs::generate_file_id, scope::Scope, version::edit::VersionEdit}; #[tokio::test] async fn encode_and_decode() { @@ -153,7 +159,12 @@ mod tests { } cursor.seek(std::io::SeekFrom::Start(0)).await.unwrap(); - let decode_edits = { VersionEdit::::recover(&mut cursor).await }; + + let mut decode_edits = Vec::new(); + + while let Ok(edit) = VersionEdit::decode(&mut cursor).await { + decode_edits.push(edit); + } assert_eq!(edits, decode_edits); } diff --git a/src/version/mod.rs b/src/version/mod.rs index 49478035..2f9ae259 100644 --- a/src/version/mod.rs +++ b/src/version/mod.rs @@ -12,6 +12,7 @@ use std::{ use flume::{SendError, Sender}; use fusio::DynFs; +use fusio_log::{error::LogError, Encode}; use parquet::arrow::ProjectionMask; use thiserror::Error; use tracing::error; @@ -21,7 +22,6 @@ use crate::{ ondisk::sstable::SsTable, record::{Record, Schema}, scope::Scope, - serdes::Encode, stream::{level::LevelStream, record_batch::RecordBatchEntry, ScanStream}, timestamp::{Timestamp, TimestampedRef}, version::{cleaner::CleanTag, edit::VersionEdit}, @@ -342,4 +342,6 @@ where UlidDecode(#[from] ulid::DecodeError), #[error("version send error: {0}")] Send(#[from] SendError), + #[error("recover error: {0}")] + Logger(#[from] LogError), } diff --git a/src/version/set.rs b/src/version/set.rs index b7e9b25d..35497975 100644 --- a/src/version/set.rs +++ b/src/version/set.rs @@ -1,6 +1,5 @@ use std::{ collections::BinaryHeap, - io::Cursor, mem, sync::{ atomic::{AtomicU32, Ordering}, @@ -10,14 +9,14 @@ use std::{ use async_lock::RwLock; use flume::Sender; -use fusio::{dynamic::DynFile, fs::FileMeta}; +use fusio::{fs::FileMeta, DynFs}; +use fusio_log::{Logger, Options}; use futures_util::StreamExt; use super::{TransactionTs, MAX_LEVEL}; use crate::{ fs::{generate_file_id, manager::StoreManager, parse_file_id, FileId, FileType}, record::{Record, Schema}, - serdes::Encode, timestamp::Timestamp, version::{cleaner::CleanTag, edit::VersionEdit, Version, VersionError, VersionRef}, DbOption, @@ -50,7 +49,7 @@ where R: Record, { current: VersionRef, - log_with_id: (Box, FileId), + log_with_id: (Logger::Key>>, FileId), } pub(crate) struct VersionSet @@ -130,21 +129,26 @@ where fs.remove(&log_id.0.path).await?; } - let log_id = second_log_id + let mut edits = vec![]; + + let log_id = match second_log_id .or(latest_log_id) .map(|file_meta| parse_file_id(&file_meta.0.path, FileType::Log)) .transpose()? .flatten() - .unwrap_or_else(generate_file_id); - - let mut log = fs - .open_options( - &option.version_log_path(log_id), - FileType::Log.open_options(false), - ) - .await?; + { + Some(log_id) => { + let recover_edits = VersionEdit::<::Key>::recover( + option.version_log_path(log_id), + ) + .await; + edits = recover_edits; + log_id + } + None => generate_file_id(), + }; - let edits = VersionEdit::recover(&mut Cursor::new(&mut log)).await; + let log = Self::open_version_log(&option, fs.clone(), log_id).await?; let timestamp = Arc::new(AtomicU32::default()); drop(log_stream); @@ -189,14 +193,12 @@ where if !is_recover { version_edits.push(VersionEdit::NewLogLength { len: edit_len }); + log.write_batch(version_edits.iter()) + .await + .map_err(VersionError::Logger)?; } + for version_edit in version_edits { - if !is_recover { - version_edit - .encode(log) - .await - .map_err(VersionError::Encode)?; - } match version_edit { VersionEdit::Add { mut scope, level } => { if let Some(wal_ids) = scope.wal_ids.take() { @@ -264,40 +266,47 @@ where if edit_len >= option.version_log_snapshot_threshold { let fs = self.manager.base_fs(); let old_log_id = mem::replace(log_id, generate_file_id()); - let new_log = fs - .open_options( - &option.version_log_path(*log_id), - FileType::Log.open_options(false), - ) - .await?; + let new_log = Self::open_version_log(option, fs.clone(), *log_id).await?; let _old_log = mem::replace(log, new_log); new_version.log_length = 0; - for new_edit in new_version.to_edits() { - new_edit.encode(log).await.map_err(VersionError::Encode)?; - } + log.write_batch(new_version.to_edits().iter()) + .await + .map_err(VersionError::Logger)?; log.close().await?; fs.remove(&option.version_log_path(old_log_id)).await?; } guard.current = Arc::new(new_version); Ok(()) } + + async fn open_version_log( + option: &DbOption, + fs: Arc, + gen: FileId, + ) -> Result::Key>>, VersionError> { + Options::new(option.version_log_path(gen)) + .build_with_fs(fs) + .await + .map_err(VersionError::Logger) + } } #[cfg(all(test, feature = "tokio"))] pub(crate) mod tests { - use std::{io::Cursor, sync::Arc}; + use std::sync::Arc; use async_lock::RwLock; use flume::{bounded, Sender}; use fusio::path::Path; use fusio_dispatch::FsOptions; + use fusio_log::Options; use futures_util::StreamExt; use tempfile::TempDir; use crate::{ - fs::{generate_file_id, manager::StoreManager, FileType}, - record::{test::StringSchema, Record}, + fs::{generate_file_id, manager::StoreManager}, + record::{test::StringSchema, Record, Schema}, scope::Scope, version::{ cleaner::CleanTag, @@ -318,13 +327,12 @@ pub(crate) mod tests { R: Record, { let log_id = generate_file_id(); - let log = manager - .base_fs() - .open_options( - &option.version_log_path(log_id), - FileType::Log.open_options(false), - ) - .await?; + + let log = Options::new(option.version_log_path(log_id)) + .disable_buf() + .build_with_fs::::Key>>(manager.base_fs().clone()) + .await + .map_err(VersionError::Logger)?; let timestamp = version.timestamp.clone(); Ok(VersionSet:: { @@ -448,10 +456,10 @@ pub(crate) mod tests { .await .unwrap(); - let mut guard = version_set.inner.write().await; - let log = &mut guard.log_with_id.0; + let guard = version_set.inner.write().await; - let edits = VersionEdit::::recover(&mut Cursor::new(log)).await; + let edits = + VersionEdit::::recover(option.version_log_path(guard.log_with_id.1)).await; assert_eq!(edits.len(), 3); assert_eq!( @@ -482,12 +490,7 @@ pub(crate) mod tests { } logs.sort_by(|meta_a, meta_b| meta_a.path.cmp(&meta_b.path)); - let log = manager - .base_fs() - .open_options(&logs.pop().unwrap().path, FileType::Log.open_options(false)) - .await - .unwrap(); - let edits = VersionEdit::::recover(&mut Cursor::new(log)).await; + let edits = VersionEdit::::recover(logs.pop().unwrap().path).await; assert_eq!(edits.len(), 3); assert_eq!( diff --git a/src/wal/checksum.rs b/src/wal/checksum.rs deleted file mode 100644 index db40e7fd..00000000 --- a/src/wal/checksum.rs +++ /dev/null @@ -1,104 +0,0 @@ -use std::{future::Future, hash::Hasher}; - -use fusio::{Error, IoBuf, IoBufMut, MaybeSend, SeqRead, Write}; - -use crate::serdes::{Decode, Encode}; - -pub(crate) struct HashWriter { - hasher: crc32fast::Hasher, - writer: W, -} - -impl HashWriter { - pub(crate) fn new(writer: W) -> Self { - Self { - hasher: crc32fast::Hasher::new(), - writer, - } - } - - pub(crate) async fn eol(mut self) -> Result<(), fusio::Error> { - let i = self.hasher.finish(); - i.encode(&mut self.writer).await - } -} - -impl Write for HashWriter { - async fn write_all(&mut self, buf: B) -> (Result<(), Error>, B) { - let (result, buf) = self.writer.write_all(buf).await; - self.hasher.write(buf.as_slice()); - - (result, buf) - } - - fn flush(&mut self) -> impl Future> + MaybeSend { - self.writer.flush() - } - - fn close(&mut self) -> impl Future> + MaybeSend { - self.writer.close() - } -} - -pub(crate) struct HashReader { - hasher: crc32fast::Hasher, - reader: R, -} - -impl HashReader { - pub(crate) fn new(reader: R) -> Self { - Self { - hasher: crc32fast::Hasher::new(), - reader, - } - } - - pub(crate) async fn checksum(mut self) -> Result { - let checksum = u64::decode(&mut self.reader).await?; - - Ok(self.hasher.finish() == checksum) - } -} - -impl SeqRead for HashReader { - async fn read_exact(&mut self, buf: B) -> (Result<(), Error>, B) { - let (result, buf) = self.reader.read_exact(buf).await; - if result.is_ok() { - self.hasher.write(buf.as_slice()); - } - (result, buf) - } -} - -#[cfg(test)] -pub(crate) mod tests { - use std::io::Cursor; - - use tokio::io::AsyncSeekExt; - - use crate::{ - serdes::{Decode, Encode}, - wal::checksum::{HashReader, HashWriter}, - }; - - #[tokio::test] - async fn test_encode_decode() { - let mut bytes = Vec::new(); - let mut cursor = Cursor::new(&mut bytes); - - let mut writer = HashWriter::new(&mut cursor); - 4_u64.encode(&mut writer).await.unwrap(); - 3_u32.encode(&mut writer).await.unwrap(); - 2_u16.encode(&mut writer).await.unwrap(); - 1_u8.encode(&mut writer).await.unwrap(); - writer.eol().await.unwrap(); - - cursor.seek(std::io::SeekFrom::Start(0)).await.unwrap(); - let mut reader = HashReader::new(&mut cursor); - assert_eq!(u64::decode(&mut reader).await.unwrap(), 4); - assert_eq!(u32::decode(&mut reader).await.unwrap(), 3); - assert_eq!(u16::decode(&mut reader).await.unwrap(), 2); - assert_eq!(u8::decode(&mut reader).await.unwrap(), 1); - assert!(reader.checksum().await.unwrap()); - } -} diff --git a/src/wal/log.rs b/src/wal/log.rs index fda52c9e..ca95de24 100644 --- a/src/wal/log.rs +++ b/src/wal/log.rs @@ -1,20 +1,10 @@ -use std::mem::size_of; - use fusio::{SeqRead, Write}; +use fusio_log::{Decode, Encode}; -use crate::serdes::{Decode, Encode}; - -#[derive(Debug)] -pub struct Log { - pub log_type: LogType, - pub record: Re, -} - -impl Log { - pub fn new(log_type: LogType, record: Re) -> Self { - Self { log_type, record } - } -} +use crate::{ + record::{Record, Schema}, + timestamp::Timestamped, +}; #[derive(Debug, Clone, Copy)] #[repr(u8)] @@ -37,41 +27,111 @@ impl From for LogType { } } -impl Encode for Log +pub(crate) struct Log +where + R: Record, +{ + pub(crate) key: Timestamped<::Key>, + pub(crate) value: Option, + pub(crate) log_type: Option, +} + +impl Log where - Re: Encode + Sync, + R: Record, { - type Error = Re::Error; + pub(crate) fn new( + ts: Timestamped<::Key>, + value: Option, + log_type: Option, + ) -> Self { + Self { + key: ts, + value, + log_type, + } + } +} + +impl Encode for Log +where + R: Record, +{ + type Error = fusio::Error; async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> where W: Write, { - (self.log_type as u8).encode(writer).await?; - self.record.encode(writer).await + if let Some(log_type) = self.log_type { + (log_type as u8).encode(writer).await?; + } else { + unreachable!() + } + self.key.encode(writer).await.unwrap(); + self.value + .as_ref() + .map(R::as_record_ref) + .encode(writer) + .await + .unwrap(); + Ok(()) } fn size(&self) -> usize { - size_of::() + self.record.size() + self.key.size() + self.value.as_ref().map(R::as_record_ref).size() + size_of::() } } impl Decode for Log where - Re: Decode, + Re: Record, { - type Error = Re::Error; + type Error = fusio::Error; async fn decode(reader: &mut R) -> Result where R: SeqRead, { let log_type = LogType::from(u8::decode(reader).await?); - let log = Re::decode(reader).await?; + let key = Timestamped::<::Key>::decode(reader) + .await + .unwrap(); + let record = Option::::decode(reader).await.unwrap(); - Ok(Self { - log_type, - record: log, - }) + Ok(Log::new(key, record, Some(log_type))) + } +} + +#[cfg(test)] +mod tests { + use std::io::Cursor; + + use fusio_log::{Decode, Encode}; + use tokio::io::AsyncSeekExt; + + use crate::{ + timestamp::Timestamped, + wal::log::{Log, LogType}, + }; + + #[tokio::test] + async fn encode_and_decode() { + let entry: Log = Log::new( + Timestamped::new("hello".into(), 1.into()), + Some("hello".into()), + Some(LogType::Middle), + ); + let mut bytes = Vec::new(); + let mut cursor = Cursor::new(&mut bytes); + entry.encode(&mut cursor).await.unwrap(); + + let decode_entry = { + cursor.seek(std::io::SeekFrom::Start(0)).await.unwrap(); + Log::::decode(&mut cursor).await.unwrap() + }; + + assert_eq!(entry.value, decode_entry.value); + assert_eq!(entry.key, entry.key); } } diff --git a/src/wal/mod.rs b/src/wal/mod.rs index 4fea3778..2f5b6905 100644 --- a/src/wal/mod.rs +++ b/src/wal/mod.rs @@ -1,38 +1,41 @@ -mod checksum; pub(crate) mod log; -pub(crate) mod record_entry; -use std::marker::PhantomData; +use std::sync::Arc; use async_stream::stream; -use checksum::{HashReader, HashWriter}; -use fusio::{SeqRead, Write}; +use fusio::DynFs; +use fusio_log::{error::LogError, Decode, FsOptions, Logger, Options, Path}; use futures_core::Stream; -use log::Log; +use futures_util::TryStreamExt; use thiserror::Error; -use crate::{ - fs::FileId, - record::{Key, Record, Schema}, - serdes::{Decode, Encode}, - timestamp::Timestamped, - wal::{log::LogType, record_entry::RecordEntry}, -}; - -#[derive(Debug)] -pub(crate) struct WalFile { - file: F, +use crate::{fs::FileId, record::Record, wal::log::Log}; + +pub(crate) struct WalFile +where + R: Record, +{ + file: Logger>, file_id: FileId, - _marker: PhantomData, } -impl WalFile { - pub(crate) fn new(file: F, file_id: FileId) -> Self { - Self { - file, - file_id, - _marker: PhantomData, - } +impl WalFile +where + R: Record, +{ + pub(crate) async fn new( + fs: Arc, + path: Path, + wal_buffer_size: usize, + file_id: FileId, + ) -> Self { + let file = Options::new(path) + .buf_size(wal_buffer_size) + .build_with_fs::>(fs) + .await + .unwrap(); + + Self { file, file_id } } pub(crate) fn file_id(&self) -> FileId { @@ -40,61 +43,39 @@ impl WalFile { } } -impl WalFile +impl WalFile where - F: Write, R: Record, { - pub(crate) async fn write<'r>( - &mut self, - log_ty: LogType, - key: Timestamped<<::Key as Key>::Ref<'r>>, - value: Option>, - ) -> Result<(), as Encode>::Error> { - let mut writer = HashWriter::new(&mut self.file); - Log::new(log_ty, RecordEntry::::Encode((key, value))) - .encode(&mut writer) - .await?; - writer.eol().await?; - Ok(()) + pub(crate) async fn write<'r>(&mut self, data: &Log) -> Result<(), LogError> { + self.file.write(data).await } - pub(crate) async fn flush(&mut self) -> Result<(), fusio::Error> { + pub(crate) async fn flush(&mut self) -> Result<(), LogError> { self.file.close().await } } -impl WalFile +impl WalFile where - F: SeqRead, R: Record, { - pub(crate) fn recover( - &mut self, - ) -> impl Stream< - Item = Result< - (LogType, Timestamped<::Key>, Option), - RecoverError<::Error>, - >, - > + '_ { + pub(crate) async fn recover( + fs_option: FsOptions, + path: Path, + ) -> impl Stream>, RecoverError<::Error>>> { stream! { - loop { - let mut reader = HashReader::new(&mut self.file); - - let record = match Log::>::decode(&mut reader).await { - Ok(record) => record, - Err(_) => return, - }; - if !reader.checksum().await? { - yield Err(RecoverError::Checksum); - return; - } - if let RecordEntry::Decode((key, value)) = record.record { - yield Ok((record.log_type, key, value)); - } else { - unreachable!() + let mut stream = Options::new(path) + .fs(fs_option) + .recover::>() + .await + .unwrap(); + while let Ok(batch) = stream.try_next().await { + match batch { + Some(batch) => yield Ok(batch), + None => break, + } } - } } } } @@ -109,67 +90,76 @@ pub enum RecoverError { Io(#[from] std::io::Error), #[error("wal recover fusio error")] Fusio(#[from] fusio::Error), + #[error("wal recover log error")] + Logger(#[from] LogError), } #[cfg(test)] mod tests { - use std::{io::Cursor, pin::pin}; + use std::{pin::pin, sync::Arc}; + use fusio::disk::TokioFs; + use fusio_log::Path; use futures_util::StreamExt; - use tokio::io::AsyncSeekExt; + use tempfile::TempDir; use super::{log::LogType, WalFile}; - use crate::{fs::generate_file_id, timestamp::Timestamped}; + use crate::{ + fs::{generate_file_id, FileType}, + timestamp::Timestamped, + wal::log::Log, + }; #[tokio::test] async fn write_and_recover() { - let mut bytes = Vec::new(); - let mut file = Cursor::new(&mut bytes); + let temp_dir = TempDir::new().unwrap(); + + let wal_id = generate_file_id(); + let wal_path = Path::from_filesystem_path(temp_dir.path()) + .unwrap() + .child(format!("{}.{}", wal_id, FileType::Wal)); + let fs_option = fusio_log::FsOptions::Local; + let mut wal = WalFile::::new(Arc::new(TokioFs), wal_path.clone(), 0, wal_id).await; { - let mut wal = WalFile::<_, String>::new(&mut file, generate_file_id()); - wal.write( - LogType::Full, - Timestamped::new("hello", 0.into()), - Some("hello"), - ) + wal.write(&Log::new( + Timestamped::new("hello".into(), 0.into()), + Some("hello".into()), + Some(LogType::Full), + )) .await .unwrap(); wal.flush().await.unwrap(); } { - file.seek(std::io::SeekFrom::Start(0)).await.unwrap(); - let mut wal = WalFile::<_, String>::new(&mut file, generate_file_id()); - { - let mut stream = pin!(wal.recover()); - let (_, key, value) = stream.next().await.unwrap().unwrap(); - assert_eq!(key.ts, 0.into()); - assert_eq!(value, Some("hello".to_string())); + let mut stream = + pin!(WalFile::::recover(fs_option.clone(), wal_path.clone()).await); + for log in stream.next().await.unwrap().unwrap() { + assert_eq!(log.key.ts, 0.into()); + assert_eq!(log.value, Some("hello".to_string())); + } } - let mut wal = WalFile::<_, String>::new(&mut file, generate_file_id()); - - wal.write( - LogType::Full, - Timestamped::new("world", 1.into()), - Some("world"), - ) + wal.write(&Log::new( + Timestamped::new("world".into(), 1.into()), + Some("world".into()), + Some(LogType::Full), + )) .await .unwrap(); } { - file.seek(std::io::SeekFrom::Start(0)).await.unwrap(); - let mut wal = WalFile::<_, String>::new(&mut file, generate_file_id()); - { - let mut stream = pin!(wal.recover()); - let (_, key, value) = stream.next().await.unwrap().unwrap(); - assert_eq!(key.ts, 0.into()); - assert_eq!(value, Some("hello".to_string())); - let (_, key, value) = stream.next().await.unwrap().unwrap(); - assert_eq!(key.ts, 1.into()); - assert_eq!(value, Some("world".to_string())); + let mut stream = pin!(WalFile::::recover(fs_option, wal_path).await); + for log in stream.next().await.unwrap().unwrap() { + assert_eq!(log.key.ts, 0.into()); + assert_eq!(log.value, Some("hello".to_string())); + } + for log in stream.next().await.unwrap().unwrap() { + assert_eq!(log.key.ts, 1.into()); + assert_eq!(log.value, Some("world".to_string())); + } } } } diff --git a/src/wal/record_entry.rs b/src/wal/record_entry.rs deleted file mode 100644 index 07929553..00000000 --- a/src/wal/record_entry.rs +++ /dev/null @@ -1,106 +0,0 @@ -use fusio::{SeqRead, Write}; - -use crate::{ - record::{Key, Record, Schema}, - serdes::{Decode, Encode}, - timestamp::Timestamped, -}; - -pub(crate) enum RecordEntry<'r, R> -where - R: Record, -{ - Encode( - ( - Timestamped<<::Key as Key>::Ref<'r>>, - Option>, - ), - ), - Decode((Timestamped<::Key>, Option)), -} - -impl Encode for RecordEntry<'_, R> -where - R: Record, -{ - type Error = fusio::Error; - - async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> - where - W: Write, - { - if let RecordEntry::Encode((key, recode_ref)) = self { - key.encode(writer).await.unwrap(); - recode_ref.encode(writer).await.unwrap(); - - return Ok(()); - } - unreachable!() - } - - fn size(&self) -> usize { - if let RecordEntry::Encode((key, recode_ref)) = self { - return key.size() + recode_ref.size(); - } - unreachable!() - } -} - -impl Decode for RecordEntry<'_, Re> -where - Re: Record, -{ - type Error = fusio::Error; - - async fn decode(reader: &mut R) -> Result - where - R: SeqRead, - { - let key = Timestamped::<::Key>::decode(reader) - .await - .unwrap(); - let record = Option::::decode(reader).await.unwrap(); - - Ok(RecordEntry::Decode((key, record))) - } -} - -#[cfg(test)] -mod tests { - use std::io::Cursor; - - use tokio::io::AsyncSeekExt; - - use crate::{ - serdes::{Decode, Encode}, - timestamp::Timestamped, - wal::record_entry::RecordEntry, - }; - - #[tokio::test] - async fn encode_and_decode() { - let entry: RecordEntry<'static, String> = - RecordEntry::Encode((Timestamped::new("hello", 0.into()), Some("hello"))); - let mut bytes = Vec::new(); - let mut cursor = Cursor::new(&mut bytes); - entry.encode(&mut cursor).await.unwrap(); - - let decode_entry = { - cursor.seek(std::io::SeekFrom::Start(0)).await.unwrap(); - RecordEntry::<'static, String>::decode(&mut cursor) - .await - .unwrap() - }; - - if let (RecordEntry::Encode((key_1, value_1)), RecordEntry::Decode((key_2, value_2))) = - (entry, decode_entry) - { - assert_eq!(key_1.value, key_2.value.as_str()); - assert_eq!(key_1.ts, key_2.ts); - assert_eq!(value_1, value_2.as_deref()); - - return; - } - unreachable!() - } -} diff --git a/tests/macros_correctness.rs b/tests/macros_correctness.rs index cd7b0fea..a1361abd 100644 --- a/tests/macros_correctness.rs +++ b/tests/macros_correctness.rs @@ -13,6 +13,7 @@ mod tests { use std::{io::Cursor, sync::Arc}; use arrow::array::{BooleanArray, RecordBatch, StringArray, UInt32Array, UInt8Array}; + use fusio_log::{Decode, Encode}; use parquet::{ arrow::{arrow_to_parquet_schema, ProjectionMask}, format::SortingColumn, @@ -23,7 +24,6 @@ mod tests { inmem::immutable::{ArrowArrays, Builder}, magic, record::{Record, RecordRef, Schema}, - serdes::{Decode, Encode}, timestamp::timestamped::Timestamped, }; diff --git a/tonbo_macros/src/record.rs b/tonbo_macros/src/record.rs index 8b2703e1..c181dc56 100644 --- a/tonbo_macros/src/record.rs +++ b/tonbo_macros/src/record.rs @@ -273,7 +273,7 @@ fn trait_decode_codegen(struct_name: &Ident, fields: &[RecordStructFieldOpt]) -> } quote! { - impl ::tonbo::serdes::Decode for #struct_name { + impl ::fusio_log::Decode for #struct_name { type Error = ::tonbo::record::RecordDecodeError; async fn decode(reader: &mut R) -> Result @@ -514,7 +514,7 @@ fn trait_encode_codegen(struct_name: &Ident, fields: &[RecordStructFieldOpt]) -> let field_name = field.ident.as_ref().unwrap(); encode_method_fields.push(quote! { - ::tonbo::serdes::Encode::encode(&self.#field_name, writer).await.map_err(|err| ::tonbo::record::RecordEncodeError::Encode { + ::fusio_log::Encode::encode(&self.#field_name, writer).await.map_err(|err| ::tonbo::record::RecordEncodeError::Encode { field_name: stringify!(#field_name).to_string(), error: Box::new(err), })?; @@ -527,7 +527,7 @@ fn trait_encode_codegen(struct_name: &Ident, fields: &[RecordStructFieldOpt]) -> let struct_ref_name = struct_name.to_ref_ident(); quote! { - impl<'r> ::tonbo::serdes::Encode for #struct_ref_name<'r> { + impl<'r> ::fusio_log::Encode for #struct_ref_name<'r> { type Error = ::tonbo::record::RecordEncodeError; async fn encode(&self, writer: &mut W) -> Result<(), Self::Error> From e71d3e2d7af9880b06639ec458dbc96c100a395d Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Tue, 14 Jan 2025 18:32:59 +0800 Subject: [PATCH 2/3] fix: add missing pattern in python --- bindings/python/src/error.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/bindings/python/src/error.rs b/bindings/python/src/error.rs index 8e91373a..35c194ce 100644 --- a/bindings/python/src/error.rs +++ b/bindings/python/src/error.rs @@ -51,6 +51,7 @@ impl From for PyErr { tonbo::DbError::Recover(err) => RecoverError::new_err(err.to_string()), tonbo::DbError::WalWrite(err) => PyIOError::new_err(err.to_string()), tonbo::DbError::ExceedsMaxLevel => ExceedsMaxLevelError::new_err("Exceeds max level"), + tonbo::DbError::Logger(err) => PyIOError::new_err(err.to_string()), } } } From 62ec87489f7cf6379acf47aecb0b8b7d57c109d3 Mon Sep 17 00:00:00 2001 From: crwen <1543720935@qq.com> Date: Thu, 16 Jan 2025 14:56:02 +0800 Subject: [PATCH 3/3] chore: update fusio deps --- Cargo.toml | 8 ++++---- bindings/python/Cargo.toml | 4 ++-- src/wal/mod.rs | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index b4bacc1d..5ef1c4e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,13 +83,13 @@ crc32fast = "1" crossbeam-skiplist = "0.1" datafusion = { version = "42.2.0", optional = true } flume = { version = "0.11", features = ["async"] } -fusio = { version = "0.3.4", features = [ +fusio = { git = "https://github.com/tonbo-io/fusio", rev = "1fb503916a4945e5ff5fcc136f3d65e56375fb3d", version = "0.3.4", package = "fusio", features = [ "dyn", "fs", ] } -fusio-dispatch = "0.3.4" -fusio-log = {git = "https://github.com/tonbo-io/fusio-log", default-features = false, features = ["bytes"]} -fusio-parquet = "0.3.4" +fusio-dispatch = { git = "https://github.com/tonbo-io/fusio", rev = "1fb503916a4945e5ff5fcc136f3d65e56375fb3d", version = "0.3.4", package = "fusio-dispatch" } +fusio-log = { git = "https://github.com/tonbo-io/fusio", rev = "1fb503916a4945e5ff5fcc136f3d65e56375fb3d", version = "0.3.4", package = "fusio-log" , default-features = false, features = ["bytes"] } +fusio-parquet = { git = "https://github.com/tonbo-io/fusio", rev = "1fb503916a4945e5ff5fcc136f3d65e56375fb3d", version = "0.3.4", package = "fusio-parquet" } futures-core = "0.3" futures-io = "0.3" futures-util = "0.3" diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index 19da871a..d19f3d82 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -9,11 +9,11 @@ crate-type = ["cdylib"] [workspace] [dependencies] -fusio = { version = "0.3.4", features = [ +fusio = { git = "https://github.com/tonbo-io/fusio", rev = "1fb503916a4945e5ff5fcc136f3d65e56375fb3d", version = "0.3.4", package = "fusio", features = [ "aws", "tokio", ] } -fusio-dispatch = { version = "0.3.4", features = [ +fusio-dispatch = { git = "https://github.com/tonbo-io/fusio", rev = "1fb503916a4945e5ff5fcc136f3d65e56375fb3d", version = "0.3.4", package = "fusio-dispatch", features = [ "aws", "tokio", ] } diff --git a/src/wal/mod.rs b/src/wal/mod.rs index 2f5b6905..f8b913fc 100644 --- a/src/wal/mod.rs +++ b/src/wal/mod.rs @@ -94,7 +94,7 @@ pub enum RecoverError { Logger(#[from] LogError), } -#[cfg(test)] +#[cfg(all(test, feature = "tokio"))] mod tests { use std::{pin::pin, sync::Arc};