From e2d739121273fa1efbc223aec7774ccc64a3dc42 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Mon, 30 Dec 2024 12:32:57 +0100 Subject: [PATCH 01/19] feat: implement file blockstore --- Cargo.lock | 29 +++- Cargo.toml | 1 + mater/lib/Cargo.toml | 1 + mater/lib/src/lib.rs | 2 +- mater/lib/src/stores/file.rs | 295 +++++++++++++++++++++++++++++++++++ mater/lib/src/stores/mod.rs | 2 + mater/lib/src/v2/mod.rs | 11 +- mater/lib/src/v2/writer.rs | 32 ++-- 8 files changed, 357 insertions(+), 16 deletions(-) create mode 100644 mater/lib/src/stores/file.rs diff --git a/Cargo.lock b/Cargo.lock index 1e09c3ae3..4769ed4f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1692,6 +1692,18 @@ dependencies = [ "piper", ] +[[package]] +name = "blockstore" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0a8962daed8fb337472d9c4215006443acba1e40c6c91c9d4a3f440d1fb30436" +dependencies = [ + "cid 0.11.1", + "dashmap 6.1.0", + "multihash 0.19.3", + "thiserror 1.0.69", +] + [[package]] name = "bls12_381" version = "0.8.0" @@ -4013,6 +4025,20 @@ dependencies = [ "parking_lot_core 0.9.10", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core 0.9.10", +] + [[package]] name = "data-encoding" version = "2.6.0" @@ -6082,7 +6108,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68a7f542ee6b35af73b06abc0dad1c1bae89964e4e253bc4b587b91c9637867b" dependencies = [ "cfg-if", - "dashmap", + "dashmap 5.5.3", "futures", "futures-timer", "no-std-compat", @@ -8615,6 +8641,7 @@ version = "0.1.0" dependencies = [ "async-stream", "bitflags 2.6.0", + "blockstore", "byteorder", "bytes", "criterion", diff --git a/Cargo.toml b/Cargo.toml index f5c3eea0c..f7fd93e0b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ axum = "0.7.5" base64 = "0.22.1" bitflags = "2.5.0" blake2b_simd = { version = "1.0.2", default-features = false } +blockstore = "0.7.1" bls12_381 = "0.8" bs58 = "0.5.1" byteorder = "1.5.0" diff --git a/mater/lib/Cargo.toml b/mater/lib/Cargo.toml index ba72449ea..99dab4f7b 100644 --- a/mater/lib/Cargo.toml +++ b/mater/lib/Cargo.toml @@ -10,6 +10,7 @@ version = "0.1.0" [dependencies] async-stream.workspace = true bitflags.workspace = true +blockstore.workspace = true byteorder = { workspace = true, features = ["i128"] } bytes.workspace = true digest.workspace = true diff --git a/mater/lib/src/lib.rs b/mater/lib/src/lib.rs index e9e6dd62e..0c1d1b214 100644 --- a/mater/lib/src/lib.rs +++ b/mater/lib/src/lib.rs @@ -19,7 +19,7 @@ mod v2; // We need to re-expose this because `read_block` returns `(Cid, Vec)`. pub use ipld_core::cid::Cid; -pub use stores::{create_filestore, Blockstore, Config}; +pub use stores::{create_filestore, Blockstore, Config, FileBlockstore}; pub use v1::{Header as CarV1Header, Reader as CarV1Reader, Writer as CarV1Writer}; pub use v2::{ verify_cid, Characteristics, Header as CarV2Header, Index, IndexEntry, IndexSorted, diff --git a/mater/lib/src/stores/file.rs b/mater/lib/src/stores/file.rs new file mode 100644 index 000000000..37e7cb24a --- /dev/null +++ b/mater/lib/src/stores/file.rs @@ -0,0 +1,295 @@ +use std::{io::SeekFrom, path::Path}; + +use blockstore::Blockstore; +use indexmap::IndexMap; +use ipld_core::cid::{Cid, CidGeneric}; +use sha2::{Digest, Sha256}; +use tokio::{ + fs::File, + io::{AsyncSeekExt, AsyncWriteExt}, + sync::{Mutex, RwLock}, +}; + +use crate::{ + multicodec::SHA_256_CODE, + v1::{self, read_block, write_block}, + v2::{self}, + CarV1Header, CarV2Header, Characteristics, Error, Index, IndexEntry, MultihashIndexSorted, + SingleWidthIndex, +}; + +/// Implements a blockstore that stores blocks in a CARv2 format. Blocks put +/// into the blockstore can be read back once they are successfully written. The +/// blocks are written immediately, while the index is stored in memory and +/// updated incrementally. +/// +/// The blockstore should be closed once the putting blocks is finished. Upon +/// closing the blockstore, the index is written out to underlying file. +pub struct FileBlockstore { + // Inner store + inner: Mutex, + // Index of blocks that will be appended to the file at the finalization + index: RwLock>, +} + +/// Inner file store. Encapsulating state that is locked and used together. +struct FileBlockstoreInner { + // Car file data store + // TODO: Buffered writer and reader? + store: File, + // The byte length of the CARv1 payload. This is used by the indexing, so we + // know the locations of each blocks in the file. + data_size: u64, + // Is true if the blockstore was finalized. + is_finalized: bool, +} + +impl FileBlockstore { + /// Create a new blockstore. If file at the path already exists it is truncated. + pub async fn new

(path: P, roots: Vec) -> Result + where + P: AsRef, + { + let mut file = File::options() + .truncate(true) + .write(true) + .read(true) + .open(path) + .await?; + + // Write headers + v2::write_header(&mut file, &CarV2Header::default()).await?; + let written = v1::write_header(&mut file, &CarV1Header::new(roots.clone())).await?; + + let inner = FileBlockstoreInner { + store: file, + data_size: written as u64, + is_finalized: false, + }; + + Ok(Self { + inner: Mutex::new(inner), + index: RwLock::new(IndexMap::new()), + }) + } + + async fn has(&self, cid: Cid) -> Result { + Ok(self.index.read().await.get(&cid).is_some()) + } + + /// Get specific block from the store + async fn get(&self, cid: Cid) -> Result>, Error> { + // Get the index if exists + let Some(index) = self.index.read().await.get(&cid).copied() else { + return Ok(None); + }; + + // The lock is hold through out the method execution. That way we are + // certain that the file is not used and we are moving the cursor back + // to the correct place after the read. + let mut inner = self.inner.lock().await; + let current_cursor_location = inner.store.stream_position().await?; + + // Move cursor to the location of the block + inner + .store + .seek(SeekFrom::Start(CarV2Header::SIZE + index)) + .await?; + + // Read block + let (block_cid, block_data) = read_block(&mut inner.store).await?; + debug_assert_eq!(block_cid, cid); + + // Move cursor back to the original position + inner + .store + .seek(SeekFrom::Start(current_cursor_location)) + .await?; + + return Ok(Some(block_data)); + } + + /// Put a new block in the store + async fn put(&self, cid: &Cid, data: &[u8]) -> Result<(), Error> { + // Lock writer + let mut inner = self.inner.lock().await; + + // This is a current position of the writer. We save this to the indexer + // so that we know where we wrote the current block. + let current_position = inner.store.stream_position().await?; + let index_location = current_position - CarV2Header::SIZE; + + // Write block + // let buffered_writer = BufWriter::new(inner.store); + let written = write_block(&mut inner.store, &cid, data).await?; + inner.data_size += written as u64; + + // Add current block to the index + self.index.write().await.insert(*cid, index_location); + + Ok(()) + } + + /// Finalize this blockstore by writing the CARv2 header, along with + /// flattened index for more efficient subsequent read. + async fn finalize(self) -> Result<(), Error> { + // Locked underlying file handler + let mut inner = self.inner.lock().await; + + // The blockstore was already finalized + if inner.is_finalized { + return Ok(()); + } + + // Correct CARv2 header + let header = CarV2Header { + characteristics: Characteristics::EMPTY, + data_offset: CarV2Header::SIZE, + data_size: inner.data_size, + index_offset: CarV2Header::SIZE + inner.data_size, + }; + + // Write correct CARv2 header + inner.store.rewind().await?; + v2::write_header(&mut inner.store, &header).await?; + + // Flatten and write the index + inner + .store + .seek(SeekFrom::Start(header.index_offset)) + .await?; + let index = self.index.read().await.clone(); + let count = index.len() as u64; + let entries = index + .into_iter() + .map(|(cid, offset)| IndexEntry::new(cid.hash().digest().to_vec(), offset as u64)) + .collect(); + let index = Index::MultihashIndexSorted(MultihashIndexSorted::from_single_width( + SHA_256_CODE, + SingleWidthIndex::new(Sha256::output_size() as u32, count, entries).into(), + )); + v2::write_index(&mut inner.store, &index).await?; + + // Flush underlying writer + inner.store.flush().await?; + inner.is_finalized = true; + + Ok(()) + } +} + +impl Blockstore for FileBlockstore { + async fn get( + &self, + cid: &CidGeneric, + ) -> Result>, blockstore::Error> { + let cid = Cid::try_from(cid.to_bytes()).map_err(|_err| blockstore::Error::CidTooLarge)?; + + self.get(cid) + .await + .map_err(|err| blockstore::Error::FatalDatabaseError(err.to_string())) + } + + async fn has(&self, cid: &CidGeneric) -> blockstore::Result { + let cid = Cid::try_from(cid.to_bytes()).map_err(|_err| blockstore::Error::CidTooLarge)?; + + self.has(cid) + .await + .map_err(|err| blockstore::Error::FatalDatabaseError(err.to_string())) + } + + async fn put_keyed( + &self, + cid: &CidGeneric, + data: &[u8], + ) -> Result<(), blockstore::Error> { + let cid = Cid::try_from(cid.to_bytes()).map_err(|_err| blockstore::Error::CidTooLarge)?; + + self.put(&cid, data) + .await + .map_err(|err| blockstore::Error::FatalDatabaseError(err.to_string())) + } + + async fn remove(&self, _cid: &CidGeneric) -> Result<(), blockstore::Error> { + unimplemented!("Operation not supported") + } + + async fn close(self) -> Result<(), blockstore::Error> { + self.finalize() + .await + .map_err(|err| blockstore::Error::FatalDatabaseError(err.to_string())) + } +} + +#[cfg(test)] +mod tests { + use std::{io::Cursor, path::PathBuf, str::FromStr}; + + use blockstore::Blockstore; + use tempfile::NamedTempFile; + use tokio::{ + fs::File, + io::{AsyncReadExt, AsyncSeekExt}, + }; + + use crate::{CarV2Reader, Error, FileBlockstore}; + + #[tokio::test] + async fn test_blockstore() { + // Car file + let mut file = + File::open(PathBuf::from_str("tests/fixtures/car_v2/spaceglenda.car").unwrap()) + .await + .unwrap(); + let mut original_archive = Vec::new(); + file.read_to_end(&mut original_archive).await.unwrap(); + + let mut reader = CarV2Reader::new(Cursor::new(original_archive.clone())); + reader.read_pragma().await.unwrap(); + let header = reader.read_header().await.unwrap(); + + let v1_header = reader.read_v1_header().await.unwrap(); + + let blockstore_file_path = NamedTempFile::new().unwrap(); + let blockstore = FileBlockstore::new(&blockstore_file_path, v1_header.roots) + .await + .unwrap(); + + loop { + // NOTE(@jmg-duarte,22/05/2024): review this + match reader.read_block().await { + Ok((cid, data)) => { + // Add block to the store + blockstore.put_keyed(&cid, &data).await.unwrap(); + + // Get the same block back and check if it's the same + let block = blockstore.get(cid).await.unwrap().unwrap(); + assert_eq!(block, data); + + // Kinda hacky, but better than doing a seek later on + let position = reader.get_inner_mut().stream_position().await.unwrap(); + let data_end = header.data_offset + header.data_size; + if position >= data_end { + break; + } + } + else_ => { + // With the length check above this branch should actually be unreachable + assert!(matches!(else_, Err(Error::IoError(_)))); + break; + } + } + } + + // Finalize blockstore + blockstore.finalize().await.unwrap(); + + // Load new archive file to memory + let mut file = File::open(blockstore_file_path).await.unwrap(); + let mut new_archive = Vec::new(); + file.read_to_end(&mut new_archive).await.unwrap(); + + // Compare both files + assert_eq!(original_archive, new_archive); + } +} diff --git a/mater/lib/src/stores/mod.rs b/mater/lib/src/stores/mod.rs index 948023ce1..66920d97d 100644 --- a/mater/lib/src/stores/mod.rs +++ b/mater/lib/src/stores/mod.rs @@ -1,7 +1,9 @@ mod blockstore; +mod file; mod filestore; pub use blockstore::Blockstore; +pub use file::FileBlockstore; pub use filestore::create_filestore; /// The default block size, as defined in diff --git a/mater/lib/src/v2/mod.rs b/mater/lib/src/v2/mod.rs index e9770d896..a884e5b32 100644 --- a/mater/lib/src/v2/mod.rs +++ b/mater/lib/src/v2/mod.rs @@ -3,9 +3,11 @@ mod reader; mod writer; use bitflags::bitflags; -pub use index::{Index, IndexEntry, IndexSorted, MultihashIndexSorted, SingleWidthIndex}; +pub use index::{ + write_index, Index, IndexEntry, IndexSorted, MultihashIndexSorted, SingleWidthIndex, +}; pub use reader::{verify_cid, Reader}; -pub use writer::Writer; +pub use writer::{write_header, Writer}; /// The pragma for a CARv2. This is also a valid CARv1 header, with version 2 and no root CIDs. /// @@ -18,6 +20,9 @@ pub const PRAGMA: [u8; 11] = [ 0x02, // uint(2) ]; +/// Number of bytes in [`PRAGMA`] +pub const PRAGMA_SIZE: u64 = PRAGMA.len() as u64; + bitflags! { /// Characteristics of the enclosed data. #[derive(Debug, PartialEq, Eq)] @@ -82,7 +87,7 @@ impl Header { /// The [`Header`] size in bytes (includes the pragma). /// /// As defined in the [specification](https://ipld.io/specs/transport/car/carv2/#header). - pub const SIZE: usize = PRAGMA.len() + 40; + pub const SIZE: u64 = PRAGMA_SIZE + 40; } impl Default for Header { diff --git a/mater/lib/src/v2/writer.rs b/mater/lib/src/v2/writer.rs index 3622e7a6c..e27c5da44 100644 --- a/mater/lib/src/v2/writer.rs +++ b/mater/lib/src/v2/writer.rs @@ -27,17 +27,7 @@ where /// /// Returns the number of bytes written. pub async fn write_header(&mut self, header: &Header) -> Result { - self.writer.write_all(&PRAGMA).await?; - - let mut buffer = [0; 40]; - let mut handle = &mut buffer[..]; - WriteBytesExt::write_u128::(&mut handle, header.characteristics.bits())?; - WriteBytesExt::write_u64::(&mut handle, header.data_offset)?; - WriteBytesExt::write_u64::(&mut handle, header.data_size)?; - WriteBytesExt::write_u64::(&mut handle, header.index_offset)?; - - self.writer.write_all(&buffer).await?; - Ok(PRAGMA.len() + buffer.len()) + write_header(&mut self.writer, header).await } /// Write a [`crate::v1::Header`]. @@ -86,6 +76,26 @@ where } } +/// Write a [`Header`]. +/// +/// Returns the number of bytes written. +pub async fn write_header(writer: &mut W, header: &Header) -> Result +where + W: AsyncWrite + Unpin, +{ + writer.write_all(&PRAGMA).await?; + + let mut buffer = [0; 40]; + let mut handle = &mut buffer[..]; + WriteBytesExt::write_u128::(&mut handle, header.characteristics.bits())?; + WriteBytesExt::write_u64::(&mut handle, header.data_offset)?; + WriteBytesExt::write_u64::(&mut handle, header.data_size)?; + WriteBytesExt::write_u64::(&mut handle, header.index_offset)?; + + writer.write_all(&buffer).await?; + Ok(PRAGMA.len() + buffer.len()) +} + #[cfg(test)] mod tests { use std::{collections::BTreeMap, io::Cursor}; From bb447153f2430aa7fd4c6c24cb15f819bd1bb19b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Mon, 30 Dec 2024 13:19:16 +0100 Subject: [PATCH 02/19] make blockstore impl optional --- mater/lib/Cargo.toml | 7 ++++++- mater/lib/src/stores/file.rs | 12 +++++++----- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/mater/lib/Cargo.toml b/mater/lib/Cargo.toml index 99dab4f7b..0191772cf 100644 --- a/mater/lib/Cargo.toml +++ b/mater/lib/Cargo.toml @@ -7,10 +7,12 @@ name = "mater" # name WIP repository.workspace = true version = "0.1.0" +[features] +blockstore = ["dep:blockstore"] + [dependencies] async-stream.workspace = true bitflags.workspace = true -blockstore.workspace = true byteorder = { workspace = true, features = ["i128"] } bytes.workspace = true digest.workspace = true @@ -28,6 +30,9 @@ tokio = { workspace = true, features = ["fs", "macros", "rt-multi-thread"] } tokio-stream.workspace = true tokio-util = { workspace = true, features = ["io"] } +# Optional dependencies +blockstore = { workspace = true, optional = true } + [dev-dependencies] criterion = { workspace = true, features = ["async_tokio", "html_reports"] } rand = { workspace = true, default_features = true } diff --git a/mater/lib/src/stores/file.rs b/mater/lib/src/stores/file.rs index 37e7cb24a..751e623fc 100644 --- a/mater/lib/src/stores/file.rs +++ b/mater/lib/src/stores/file.rs @@ -1,8 +1,7 @@ use std::{io::SeekFrom, path::Path}; -use blockstore::Blockstore; use indexmap::IndexMap; -use ipld_core::cid::{Cid, CidGeneric}; +use ipld_core::cid::Cid; use sha2::{Digest, Sha256}; use tokio::{ fs::File, @@ -178,7 +177,8 @@ impl FileBlockstore { } } -impl Blockstore for FileBlockstore { +#[cfg(feature = "blockstore")] +impl blockstore::Blockstore for FileBlockstore { async fn get( &self, cid: &CidGeneric, @@ -225,7 +225,6 @@ impl Blockstore for FileBlockstore { mod tests { use std::{io::Cursor, path::PathBuf, str::FromStr}; - use blockstore::Blockstore; use tempfile::NamedTempFile; use tokio::{ fs::File, @@ -260,7 +259,10 @@ mod tests { match reader.read_block().await { Ok((cid, data)) => { // Add block to the store - blockstore.put_keyed(&cid, &data).await.unwrap(); + blockstore.put(&cid, &data).await.unwrap(); + + // Check if the blockstore has a new block + assert!(blockstore.has(cid).await.unwrap()); // Get the same block back and check if it's the same let block = blockstore.get(cid).await.unwrap().unwrap(); From 83812ecb4024fc8c83ed93819485fd112054229b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Mon, 30 Dec 2024 13:56:24 +0100 Subject: [PATCH 03/19] remove comment --- mater/lib/src/stores/file.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/mater/lib/src/stores/file.rs b/mater/lib/src/stores/file.rs index 751e623fc..23da595d4 100644 --- a/mater/lib/src/stores/file.rs +++ b/mater/lib/src/stores/file.rs @@ -34,7 +34,6 @@ pub struct FileBlockstore { /// Inner file store. Encapsulating state that is locked and used together. struct FileBlockstoreInner { // Car file data store - // TODO: Buffered writer and reader? store: File, // The byte length of the CARv1 payload. This is used by the indexing, so we // know the locations of each blocks in the file. From ba97e70915f049904e4b4ad1523d209df8cbfac5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Tue, 31 Dec 2024 09:15:06 +0100 Subject: [PATCH 04/19] add buffered writter --- mater/lib/src/stores/file.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/mater/lib/src/stores/file.rs b/mater/lib/src/stores/file.rs index 23da595d4..a9a0cc3be 100644 --- a/mater/lib/src/stores/file.rs +++ b/mater/lib/src/stores/file.rs @@ -5,7 +5,7 @@ use ipld_core::cid::Cid; use sha2::{Digest, Sha256}; use tokio::{ fs::File, - io::{AsyncSeekExt, AsyncWriteExt}, + io::{AsyncSeekExt, AsyncWriteExt, BufWriter}, sync::{Mutex, RwLock}, }; @@ -118,8 +118,9 @@ impl FileBlockstore { let index_location = current_position - CarV2Header::SIZE; // Write block - // let buffered_writer = BufWriter::new(inner.store); - let written = write_block(&mut inner.store, &cid, data).await?; + let mut buffered_writer = BufWriter::new(&mut inner.store); + let written = write_block(&mut buffered_writer, &cid, data).await?; + buffered_writer.flush().await?; inner.data_size += written as u64; // Add current block to the index From 7564fc6449eaaf4dd9d90775caaff2d2f7103f0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Tue, 31 Dec 2024 10:02:51 +0100 Subject: [PATCH 05/19] fail if fail exists --- mater/lib/src/stores/file.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/mater/lib/src/stores/file.rs b/mater/lib/src/stores/file.rs index a9a0cc3be..8eded934a 100644 --- a/mater/lib/src/stores/file.rs +++ b/mater/lib/src/stores/file.rs @@ -27,7 +27,10 @@ use crate::{ pub struct FileBlockstore { // Inner store inner: Mutex, - // Index of blocks that will be appended to the file at the finalization + // Index of blocks that will be appended to the file at the finalization. + // Stored number is an offset that locates the first byte of the block + // within the CARv1 payload. The offset is relative to the start of the + // CARv1 payload. index: RwLock>, } @@ -49,7 +52,7 @@ impl FileBlockstore { P: AsRef, { let mut file = File::options() - .truncate(true) + .create_new(true) .write(true) .read(true) .open(path) @@ -233,6 +236,14 @@ mod tests { use crate::{CarV2Reader, Error, FileBlockstore}; + #[tokio::test] + async fn file_exists() { + let existing_path = PathBuf::from_str("tests/fixtures/car_v2/spaceglenda.car").unwrap(); + let blockstore = FileBlockstore::new(&existing_path, vec![]).await; + + assert!(blockstore.is_err()); + } + #[tokio::test] async fn test_blockstore() { // Car file From 88412e090f69323f61fd495ddaf6c941b48e31d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Tue, 31 Dec 2024 10:13:02 +0100 Subject: [PATCH 06/19] fix tests --- mater/lib/src/stores/blockstore.rs | 9 +++------ mater/lib/src/stores/file.rs | 5 +++-- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/mater/lib/src/stores/blockstore.rs b/mater/lib/src/stores/blockstore.rs index 69e6a29c1..16d016ddc 100644 --- a/mater/lib/src/stores/blockstore.rs +++ b/mater/lib/src/stores/blockstore.rs @@ -164,7 +164,7 @@ impl Blockstore { /// Get the [`CarV2Header`] that will be written out. fn header_v2(&self) -> CarV2Header { - let data_offset = CarV2Header::SIZE as u64; + let data_offset = CarV2Header::SIZE; let data_size: u64 = self .blocks .iter() @@ -298,16 +298,13 @@ mod tests { car_reader.read_pragma().await.unwrap(); let car_v2_header = car_reader.read_header().await.unwrap(); - assert_eq!(car_v2_header.data_offset, CarV2Header::SIZE as u64); + assert_eq!(car_v2_header.data_offset, CarV2Header::SIZE); // Extracted with go-car and validated with an hex viewer // to extract the values, run the following commands: // $ car inspect // The dump is necessary because go-car does not support parametrization assert_eq!(car_v2_header.data_size, 1358); - assert_eq!( - car_v2_header.index_offset, - (CarV2Header::SIZE as u64) + 1358 - ); + assert_eq!(car_v2_header.index_offset, CarV2Header::SIZE + 1358); let car_v1_header = car_reader.read_v1_header().await.unwrap(); assert_eq!(car_v1_header.roots.len(), 1); diff --git a/mater/lib/src/stores/file.rs b/mater/lib/src/stores/file.rs index 8eded934a..769908a5e 100644 --- a/mater/lib/src/stores/file.rs +++ b/mater/lib/src/stores/file.rs @@ -228,7 +228,7 @@ impl blockstore::Blockstore for FileBlockstore { mod tests { use std::{io::Cursor, path::PathBuf, str::FromStr}; - use tempfile::NamedTempFile; + use tempfile::TempDir; use tokio::{ fs::File, io::{AsyncReadExt, AsyncSeekExt}, @@ -260,7 +260,8 @@ mod tests { let v1_header = reader.read_v1_header().await.unwrap(); - let blockstore_file_path = NamedTempFile::new().unwrap(); + let tmp_dir = TempDir::new().unwrap(); + let blockstore_file_path = tmp_dir.path().join("blockstore.car"); let blockstore = FileBlockstore::new(&blockstore_file_path, v1_header.roots) .await .unwrap(); From 17fa8fa545cf9f6ff8b93feed38adad7012ee24b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Tue, 31 Dec 2024 10:22:28 +0100 Subject: [PATCH 07/19] continue writing at the correct place --- mater/lib/src/stores/file.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/mater/lib/src/stores/file.rs b/mater/lib/src/stores/file.rs index 769908a5e..11766d74f 100644 --- a/mater/lib/src/stores/file.rs +++ b/mater/lib/src/stores/file.rs @@ -89,7 +89,6 @@ impl FileBlockstore { // certain that the file is not used and we are moving the cursor back // to the correct place after the read. let mut inner = self.inner.lock().await; - let current_cursor_location = inner.store.stream_position().await?; // Move cursor to the location of the block inner @@ -101,11 +100,9 @@ impl FileBlockstore { let (block_cid, block_data) = read_block(&mut inner.store).await?; debug_assert_eq!(block_cid, cid); - // Move cursor back to the original position - inner - .store - .seek(SeekFrom::Start(current_cursor_location)) - .await?; + // Move cursor back to the position where we'll continue writing next blocks. + let writing_position = CarV2Header::SIZE + inner.data_size; + inner.store.seek(SeekFrom::Start(writing_position)).await?; return Ok(Some(block_data)); } From 5231fdf34b7822715bc8555b738ebe0343d11f18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Fri, 3 Jan 2025 10:07:41 +0100 Subject: [PATCH 08/19] fix: pr suggestions --- mater/lib/src/stores/file.rs | 41 ++++++++++++++---------------------- 1 file changed, 16 insertions(+), 25 deletions(-) diff --git a/mater/lib/src/stores/file.rs b/mater/lib/src/stores/file.rs index 11766d74f..5559dc274 100644 --- a/mater/lib/src/stores/file.rs +++ b/mater/lib/src/stores/file.rs @@ -41,16 +41,18 @@ struct FileBlockstoreInner { // The byte length of the CARv1 payload. This is used by the indexing, so we // know the locations of each blocks in the file. data_size: u64, - // Is true if the blockstore was finalized. - is_finalized: bool, } impl FileBlockstore { - /// Create a new blockstore. If file at the path already exists it is truncated. + /// Create a new blockstore. If file at the path already exists the error is thrown. pub async fn new

(path: P, roots: Vec) -> Result where P: AsRef, { + if roots.is_empty() { + return Err(crate::Error::EmptyRootsError); + } + let mut file = File::options() .create_new(true) .write(true) @@ -60,12 +62,11 @@ impl FileBlockstore { // Write headers v2::write_header(&mut file, &CarV2Header::default()).await?; - let written = v1::write_header(&mut file, &CarV1Header::new(roots.clone())).await?; + let written = v1::write_header(&mut file, &CarV1Header::new(roots)).await?; let inner = FileBlockstoreInner { store: file, data_size: written as u64, - is_finalized: false, }; Ok(Self { @@ -135,11 +136,6 @@ impl FileBlockstore { // Locked underlying file handler let mut inner = self.inner.lock().await; - // The blockstore was already finalized - if inner.is_finalized { - return Ok(()); - } - // Correct CARv2 header let header = CarV2Header { characteristics: Characteristics::EMPTY, @@ -152,7 +148,7 @@ impl FileBlockstore { inner.store.rewind().await?; v2::write_header(&mut inner.store, &header).await?; - // Flatten and write the index + // Write the index inner .store .seek(SeekFrom::Start(header.index_offset)) @@ -171,7 +167,6 @@ impl FileBlockstore { // Flush underlying writer inner.store.flush().await?; - inner.is_finalized = true; Ok(()) } @@ -211,7 +206,9 @@ impl blockstore::Blockstore for FileBlockstore { } async fn remove(&self, _cid: &CidGeneric) -> Result<(), blockstore::Error> { - unimplemented!("Operation not supported") + Err(blockstore::Error::FatalDatabaseError( + "remove operation not supported".to_string(), + )) } async fn close(self) -> Result<(), blockstore::Error> { @@ -231,7 +228,7 @@ mod tests { io::{AsyncReadExt, AsyncSeekExt}, }; - use crate::{CarV2Reader, Error, FileBlockstore}; + use crate::{CarV2Reader, FileBlockstore}; #[tokio::test] async fn file_exists() { @@ -244,12 +241,9 @@ mod tests { #[tokio::test] async fn test_blockstore() { // Car file - let mut file = - File::open(PathBuf::from_str("tests/fixtures/car_v2/spaceglenda.car").unwrap()) - .await - .unwrap(); - let mut original_archive = Vec::new(); - file.read_to_end(&mut original_archive).await.unwrap(); + let original_archive = tokio::fs::read("tests/fixtures/car_v2/spaceglenda.car") + .await + .unwrap(); let mut reader = CarV2Reader::new(Cursor::new(original_archive.clone())); reader.read_pragma().await.unwrap(); @@ -264,7 +258,6 @@ mod tests { .unwrap(); loop { - // NOTE(@jmg-duarte,22/05/2024): review this match reader.read_block().await { Ok((cid, data)) => { // Add block to the store @@ -284,10 +277,8 @@ mod tests { break; } } - else_ => { - // With the length check above this branch should actually be unreachable - assert!(matches!(else_, Err(Error::IoError(_)))); - break; + _ => { + unreachable!(); } } } From cc8b8b43509d0c815dc700ddfaea1ae283dc6b36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Fri, 3 Jan 2025 10:22:47 +0100 Subject: [PATCH 09/19] comments --- mater/lib/src/stores/file.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/mater/lib/src/stores/file.rs b/mater/lib/src/stores/file.rs index 5559dc274..1a491cbc4 100644 --- a/mater/lib/src/stores/file.rs +++ b/mater/lib/src/stores/file.rs @@ -75,6 +75,7 @@ impl FileBlockstore { }) } + /// Check if the store contains a block with the cid. async fn has(&self, cid: Cid) -> Result { Ok(self.index.read().await.get(&cid).is_some()) } @@ -130,8 +131,8 @@ impl FileBlockstore { Ok(()) } - /// Finalize this blockstore by writing the CARv2 header, along with - /// flattened index for more efficient subsequent read. + /// Finalize this blockstore by writing the CARv2 header, along with index + /// for more efficient subsequent read. async fn finalize(self) -> Result<(), Error> { // Locked underlying file handler let mut inner = self.inner.lock().await; From c68b14cbcb53e224159598de051cb61be83cd5b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Fri, 3 Jan 2025 10:58:47 +0100 Subject: [PATCH 10/19] move index into the inner --- mater/lib/src/stores/file.rs | 54 ++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/mater/lib/src/stores/file.rs b/mater/lib/src/stores/file.rs index 1a491cbc4..5372bbd92 100644 --- a/mater/lib/src/stores/file.rs +++ b/mater/lib/src/stores/file.rs @@ -6,7 +6,7 @@ use sha2::{Digest, Sha256}; use tokio::{ fs::File, io::{AsyncSeekExt, AsyncWriteExt, BufWriter}, - sync::{Mutex, RwLock}, + sync::RwLock, }; use crate::{ @@ -26,12 +26,7 @@ use crate::{ /// closing the blockstore, the index is written out to underlying file. pub struct FileBlockstore { // Inner store - inner: Mutex, - // Index of blocks that will be appended to the file at the finalization. - // Stored number is an offset that locates the first byte of the block - // within the CARv1 payload. The offset is relative to the start of the - // CARv1 payload. - index: RwLock>, + inner: RwLock, } /// Inner file store. Encapsulating state that is locked and used together. @@ -41,6 +36,11 @@ struct FileBlockstoreInner { // The byte length of the CARv1 payload. This is used by the indexing, so we // know the locations of each blocks in the file. data_size: u64, + // Index of blocks that will be appended to the file at the finalization. + // Stored number is an offset that locates the first byte of the block + // within the CARv1 payload. The offset is relative to the start of the + // CARv1 payload. + index: IndexMap, } impl FileBlockstore { @@ -67,30 +67,30 @@ impl FileBlockstore { let inner = FileBlockstoreInner { store: file, data_size: written as u64, + index: IndexMap::new(), }; Ok(Self { - inner: Mutex::new(inner), - index: RwLock::new(IndexMap::new()), + inner: RwLock::new(inner), }) } /// Check if the store contains a block with the cid. async fn has(&self, cid: Cid) -> Result { - Ok(self.index.read().await.get(&cid).is_some()) + Ok(self.inner.read().await.index.get(&cid).is_some()) } /// Get specific block from the store async fn get(&self, cid: Cid) -> Result>, Error> { - // Get the index if exists - let Some(index) = self.index.read().await.get(&cid).copied() else { - return Ok(None); - }; - // The lock is hold through out the method execution. That way we are // certain that the file is not used and we are moving the cursor back // to the correct place after the read. - let mut inner = self.inner.lock().await; + let mut inner = self.inner.write().await; + + // Get the index if exists + let Some(index) = inner.index.get(&cid).copied() else { + return Ok(None); + }; // Move cursor to the location of the block inner @@ -98,7 +98,7 @@ impl FileBlockstore { .seek(SeekFrom::Start(CarV2Header::SIZE + index)) .await?; - // Read block + // Read the lock let (block_cid, block_data) = read_block(&mut inner.store).await?; debug_assert_eq!(block_cid, cid); @@ -109,10 +109,10 @@ impl FileBlockstore { return Ok(Some(block_data)); } - /// Put a new block in the store + /// Put the new block in the store async fn put(&self, cid: &Cid, data: &[u8]) -> Result<(), Error> { - // Lock writer - let mut inner = self.inner.lock().await; + // The lock is hold through out the method execution + let mut inner = self.inner.write().await; // This is a current position of the writer. We save this to the indexer // so that we know where we wrote the current block. @@ -126,16 +126,16 @@ impl FileBlockstore { inner.data_size += written as u64; // Add current block to the index - self.index.write().await.insert(*cid, index_location); + inner.index.insert(*cid, index_location); Ok(()) } - /// Finalize this blockstore by writing the CARv2 header, along with index + /// Finalize the blockstore by writing the CARv2 header, along with index /// for more efficient subsequent read. async fn finalize(self) -> Result<(), Error> { - // Locked underlying file handler - let mut inner = self.inner.lock().await; + // Owned inner value + let mut inner = self.inner.into_inner(); // Correct CARv2 header let header = CarV2Header { @@ -154,9 +154,9 @@ impl FileBlockstore { .store .seek(SeekFrom::Start(header.index_offset)) .await?; - let index = self.index.read().await.clone(); - let count = index.len() as u64; - let entries = index + let count = inner.index.len() as u64; + let entries = inner + .index .into_iter() .map(|(cid, offset)| IndexEntry::new(cid.hash().digest().to_vec(), offset as u64)) .collect(); From 1159d4462949a5823fd0cd208caff58b84f9e938 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Fri, 3 Jan 2025 10:59:42 +0100 Subject: [PATCH 11/19] small change --- mater/lib/src/stores/file.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mater/lib/src/stores/file.rs b/mater/lib/src/stores/file.rs index 5372bbd92..7724928e8 100644 --- a/mater/lib/src/stores/file.rs +++ b/mater/lib/src/stores/file.rs @@ -279,7 +279,7 @@ mod tests { } } _ => { - unreachable!(); + unreachable!("the length check should avoid this from being reached"); } } } From 8d5650bc61248cbf5cde9887b0db86ea0abcc693 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Fri, 3 Jan 2025 12:35:03 +0100 Subject: [PATCH 12/19] ignore identity cids --- mater/lib/src/multicodec.rs | 10 ++- mater/lib/src/stores/file.rs | 162 ++++++++++++++++++++++++----------- 2 files changed, 120 insertions(+), 52 deletions(-) diff --git a/mater/lib/src/multicodec.rs b/mater/lib/src/multicodec.rs index 34a37f4fc..5b8add0a7 100644 --- a/mater/lib/src/multicodec.rs +++ b/mater/lib/src/multicodec.rs @@ -2,13 +2,16 @@ //! as per the [code table](https://github.com/multiformats/multicodec/blob/c954a787dc6a17d099653e5f90d26fbd177d2074/table.csv). use digest::Digest; -use ipld_core::cid::multihash::Multihash; +use ipld_core::cid::{multihash::Multihash, CidGeneric}; pub const SHA_256_CODE: u64 = 0x12; pub const SHA_512_CODE: u64 = 0x13; pub const RAW_CODE: u64 = 0x55; pub const DAG_PB_CODE: u64 = 0x70; +/// The IDENTITY multicodec code +pub const IDENTITY_CODE: u64 = 0x00; + /// Trait to ease implementing generic multihash generation. pub(crate) trait MultihashCode { /// Multihash code as defined in the [specification](https://github.com/multiformats/multicodec/blob/c954a787dc6a17d099653e5f90d26fbd177d2074/table.csv). @@ -35,3 +38,8 @@ where Multihash::wrap(H::CODE, &hashed_bytes) .expect("the digest should be valid (enforced by the type system)") } + +// Returns Some(data) if the CID is an identity. If not, None is returned. +pub fn is_identity(cid: &CidGeneric) -> Option<&[u8]> { + (cid.hash().code() == IDENTITY_CODE).then_some(cid.hash().digest()) +} diff --git a/mater/lib/src/stores/file.rs b/mater/lib/src/stores/file.rs index 7724928e8..b0ef7c860 100644 --- a/mater/lib/src/stores/file.rs +++ b/mater/lib/src/stores/file.rs @@ -10,7 +10,7 @@ use tokio::{ }; use crate::{ - multicodec::SHA_256_CODE, + multicodec::{is_identity, SHA_256_CODE}, v1::{self, read_block, write_block}, v2::{self}, CarV1Header, CarV2Header, Characteristics, Error, Index, IndexEntry, MultihashIndexSorted, @@ -22,6 +22,8 @@ use crate::{ /// blocks are written immediately, while the index is stored in memory and /// updated incrementally. /// +/// The identity CIDs are not stored. +/// /// The blockstore should be closed once the putting blocks is finished. Upon /// closing the blockstore, the index is written out to underlying file. pub struct FileBlockstore { @@ -75,13 +77,24 @@ impl FileBlockstore { }) } - /// Check if the store contains a block with the cid. + /// Check if the store contains a block with the cid. In case of IDENTITY + /// CID it always returns true. async fn has(&self, cid: Cid) -> Result { + if is_identity(&cid).is_some() { + return Ok(true); + } + Ok(self.inner.read().await.index.get(&cid).is_some()) } - /// Get specific block from the store + /// Get specific block from the store. If the CID is an identity, the digest + /// from the cid is returned. async fn get(&self, cid: Cid) -> Result>, Error> { + // If CID is an identity + if let Some(data) = is_identity(&cid) { + return Ok(Some(data.to_owned())); + } + // The lock is hold through out the method execution. That way we are // certain that the file is not used and we are moving the cursor back // to the correct place after the read. @@ -109,8 +122,14 @@ impl FileBlockstore { return Ok(Some(block_data)); } - /// Put the new block in the store - async fn put(&self, cid: &Cid, data: &[u8]) -> Result<(), Error> { + /// Put the new block in the store. The data integrity is not checked. We + /// expect that the CID correctly represents the data being passed. In case + /// of the identity CID, nothing is written to the store. + async fn put_keyed(&self, cid: &Cid, data: &[u8]) -> Result<(), Error> { + if is_identity(&cid).is_some() { + return Ok(()); + } + // The lock is hold through out the method execution let mut inner = self.inner.write().await; @@ -174,48 +193,52 @@ impl FileBlockstore { } #[cfg(feature = "blockstore")] -impl blockstore::Blockstore for FileBlockstore { - async fn get( - &self, - cid: &CidGeneric, - ) -> Result>, blockstore::Error> { - let cid = Cid::try_from(cid.to_bytes()).map_err(|_err| blockstore::Error::CidTooLarge)?; - - self.get(cid) - .await - .map_err(|err| blockstore::Error::FatalDatabaseError(err.to_string())) - } +mod blockstore { + use blockstore::{Blockstore, Error}; + use ipld_core::cid::{Cid, CidGeneric}; - async fn has(&self, cid: &CidGeneric) -> blockstore::Result { - let cid = Cid::try_from(cid.to_bytes()).map_err(|_err| blockstore::Error::CidTooLarge)?; + use crate::FileBlockstore; - self.has(cid) - .await - .map_err(|err| blockstore::Error::FatalDatabaseError(err.to_string())) - } + impl Blockstore for FileBlockstore { + async fn get(&self, cid: &CidGeneric) -> Result>, Error> { + let cid = Cid::try_from(cid.to_bytes()).map_err(|_err| Error::CidTooLarge)?; - async fn put_keyed( - &self, - cid: &CidGeneric, - data: &[u8], - ) -> Result<(), blockstore::Error> { - let cid = Cid::try_from(cid.to_bytes()).map_err(|_err| blockstore::Error::CidTooLarge)?; + self.get(cid) + .await + .map_err(|err| Error::FatalDatabaseError(err.to_string())) + } - self.put(&cid, data) - .await - .map_err(|err| blockstore::Error::FatalDatabaseError(err.to_string())) - } + async fn has(&self, cid: &CidGeneric) -> blockstore::Result { + let cid = Cid::try_from(cid.to_bytes()).map_err(|_err| Error::CidTooLarge)?; - async fn remove(&self, _cid: &CidGeneric) -> Result<(), blockstore::Error> { - Err(blockstore::Error::FatalDatabaseError( - "remove operation not supported".to_string(), - )) - } + self.has(cid) + .await + .map_err(|err| Error::FatalDatabaseError(err.to_string())) + } - async fn close(self) -> Result<(), blockstore::Error> { - self.finalize() - .await - .map_err(|err| blockstore::Error::FatalDatabaseError(err.to_string())) + async fn put_keyed( + &self, + cid: &CidGeneric, + data: &[u8], + ) -> Result<(), Error> { + let cid = Cid::try_from(cid.to_bytes()).map_err(|_err| Error::CidTooLarge)?; + + self.put_keyed(&cid, data) + .await + .map_err(|err| Error::FatalDatabaseError(err.to_string())) + } + + async fn remove(&self, _cid: &CidGeneric) -> Result<(), Error> { + Err(Error::FatalDatabaseError( + "remove operation not supported".to_string(), + )) + } + + async fn close(self) -> Result<(), Error> { + self.finalize() + .await + .map_err(|err| Error::FatalDatabaseError(err.to_string())) + } } } @@ -223,22 +246,61 @@ impl blockstore::Blockstore for FileBlockstore { mod tests { use std::{io::Cursor, path::PathBuf, str::FromStr}; + use ipld_core::cid::{multihash::Multihash, Cid}; use tempfile::TempDir; use tokio::{ fs::File, io::{AsyncReadExt, AsyncSeekExt}, }; - use crate::{CarV2Reader, FileBlockstore}; + use crate::{ + multicodec::{IDENTITY_CODE, RAW_CODE}, + CarV2Reader, Error, FileBlockstore, + }; + + /// Initialize a new blockstore + async fn init_blockstore(roots: Vec) -> Result<(TempDir, PathBuf, FileBlockstore), Error> { + let tmp_dir = TempDir::new().unwrap(); + let blockstore_file_path = tmp_dir.path().join("blockstore.car"); + let blockstore = FileBlockstore::new(&blockstore_file_path, roots).await?; + + Ok((tmp_dir, blockstore_file_path, blockstore)) + } #[tokio::test] - async fn file_exists() { + async fn test_file_exists_error() { let existing_path = PathBuf::from_str("tests/fixtures/car_v2/spaceglenda.car").unwrap(); let blockstore = FileBlockstore::new(&existing_path, vec![]).await; assert!(blockstore.is_err()); } + #[tokio::test] + async fn test_no_roots_error() { + let tmp_dir = TempDir::new().unwrap(); + let blockstore_file_path = tmp_dir.path().join("blockstore.car"); + let blockstore = FileBlockstore::new(&blockstore_file_path, vec![]).await; + + assert!(blockstore.is_err()); + } + + #[tokio::test] + async fn test_identity_cid() { + let arbitrary_root_cid = + Cid::from_str("bafkreiczsrdrvoybcevpzqmblh3my5fu6ui3tgag3jm3hsxvvhaxhswpyu").unwrap(); + let (_guard, _file, blockstore) = init_blockstore(vec![arbitrary_root_cid]).await.unwrap(); + + let payload = b"Hello World!"; + let multihash = Multihash::wrap(IDENTITY_CODE, payload).unwrap(); + let identity_cid = Cid::new_v1(RAW_CODE, multihash); + + let has_block = blockstore.has(identity_cid).await.unwrap(); + assert!(has_block); + + let content = blockstore.get(identity_cid).await.unwrap().unwrap(); + assert_eq!(payload, content.as_slice()); + } + #[tokio::test] async fn test_blockstore() { // Car file @@ -249,20 +311,15 @@ mod tests { let mut reader = CarV2Reader::new(Cursor::new(original_archive.clone())); reader.read_pragma().await.unwrap(); let header = reader.read_header().await.unwrap(); - let v1_header = reader.read_v1_header().await.unwrap(); - let tmp_dir = TempDir::new().unwrap(); - let blockstore_file_path = tmp_dir.path().join("blockstore.car"); - let blockstore = FileBlockstore::new(&blockstore_file_path, v1_header.roots) - .await - .unwrap(); + let (_guard, blockstore_file, blockstore) = init_blockstore(v1_header.roots).await.unwrap(); loop { match reader.read_block().await { Ok((cid, data)) => { // Add block to the store - blockstore.put(&cid, &data).await.unwrap(); + blockstore.put_keyed(&cid, &data).await.unwrap(); // Check if the blockstore has a new block assert!(blockstore.has(cid).await.unwrap()); @@ -288,11 +345,14 @@ mod tests { blockstore.finalize().await.unwrap(); // Load new archive file to memory - let mut file = File::open(blockstore_file_path).await.unwrap(); + let mut file = File::open(blockstore_file).await.unwrap(); let mut new_archive = Vec::new(); file.read_to_end(&mut new_archive).await.unwrap(); // Compare both files assert_eq!(original_archive, new_archive); } + + #[tokio::test] + async fn test_multiple() {} } From f6001eab26dd2c0ea6e67ac564fc265a986d9396 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Fri, 3 Jan 2025 13:42:58 +0100 Subject: [PATCH 13/19] remove empty test --- mater/lib/src/stores/file.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/mater/lib/src/stores/file.rs b/mater/lib/src/stores/file.rs index b0ef7c860..36b5eea9f 100644 --- a/mater/lib/src/stores/file.rs +++ b/mater/lib/src/stores/file.rs @@ -352,7 +352,4 @@ mod tests { // Compare both files assert_eq!(original_archive, new_archive); } - - #[tokio::test] - async fn test_multiple() {} } From 3a090c863b4d608ef2c0b6486275bb070319e13d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Mon, 6 Jan 2025 14:37:17 +0100 Subject: [PATCH 14/19] feat: load existing archive --- mater/lib/src/lib.rs | 4 + mater/lib/src/stores/blockstore.rs | 8 +- mater/lib/src/stores/file.rs | 291 ++++++++++++++++++++++------- mater/lib/src/v2/index.rs | 43 ++++- mater/lib/src/v2/reader.rs | 18 +- 5 files changed, 281 insertions(+), 83 deletions(-) diff --git a/mater/lib/src/lib.rs b/mater/lib/src/lib.rs index 0c1d1b214..1bce95ecb 100644 --- a/mater/lib/src/lib.rs +++ b/mater/lib/src/lib.rs @@ -47,6 +47,10 @@ pub enum Error { #[error("CAR file must have roots")] EmptyRootsError, + /// Returned when the number of roots is wrong. + #[error("Wrong number of roots")] + WrongNumberOfRoots, + /// Unknown type of index. Supported indexes are /// [`IndexSorted`] and [`MultihashIndexSorted`]. #[error("unknown index type {0}")] diff --git a/mater/lib/src/stores/blockstore.rs b/mater/lib/src/stores/blockstore.rs index 16d016ddc..3b6bb66f3 100644 --- a/mater/lib/src/stores/blockstore.rs +++ b/mater/lib/src/stores/blockstore.rs @@ -340,13 +340,13 @@ mod tests { match index { Index::MultihashIndexSorted(index) => { // There's only Sha256 - assert_eq!(index.0.len(), 1); + assert_eq!(index.len(), 1); - let index_sorted = &index.0[&SHA_256_CODE]; + let index_sorted = &index[&SHA_256_CODE]; // There's only a single length - assert_eq!(index_sorted.0.len(), 1); + assert_eq!(index_sorted.len(), 1); - let single_width_index = &index_sorted.0[0]; + let single_width_index = &index_sorted[0]; assert_eq!(single_width_index.count, 2); // Sha256 output size (32) + the offset size (8) assert_eq!(single_width_index.width, Sha256::output_size() as u32 + 8); diff --git a/mater/lib/src/stores/file.rs b/mater/lib/src/stores/file.rs index 36b5eea9f..57e02ec3a 100644 --- a/mater/lib/src/stores/file.rs +++ b/mater/lib/src/stores/file.rs @@ -27,22 +27,25 @@ use crate::{ /// The blockstore should be closed once the putting blocks is finished. Upon /// closing the blockstore, the index is written out to underlying file. pub struct FileBlockstore { - // Inner store + /// Original roots. + roots: Vec, + /// Inner store inner: RwLock, } /// Inner file store. Encapsulating state that is locked and used together. struct FileBlockstoreInner { - // Car file data store + /// Underlying data store in a car format store: File, - // The byte length of the CARv1 payload. This is used by the indexing, so we - // know the locations of each blocks in the file. + /// The byte length of the CARv1 payload. This is used by the indexing, so we + /// know the locations of each blocks in the file. data_size: u64, - // Index of blocks that will be appended to the file at the finalization. - // Stored number is an offset that locates the first byte of the block - // within the CARv1 payload. The offset is relative to the start of the - // CARv1 payload. - index: IndexMap, + /// Index of blocks part of this store. This index is meant to be fast for + /// the in memory lookups. The key represents a hash digest of the data + /// block. Value is an offset that locates the first byte of the block + /// within the CARv1 payload. The offset is relative to the start of the + /// CARv1 payload. + index: IndexMap, u64>, } impl FileBlockstore { @@ -64,7 +67,7 @@ impl FileBlockstore { // Write headers v2::write_header(&mut file, &CarV2Header::default()).await?; - let written = v1::write_header(&mut file, &CarV1Header::new(roots)).await?; + let written = v1::write_header(&mut file, &CarV1Header::new(roots.clone())).await?; let inner = FileBlockstoreInner { store: file, @@ -74,22 +77,78 @@ impl FileBlockstore { Ok(Self { inner: RwLock::new(inner), + roots, + }) + } + + /// Initialize the blockstore from the existing archive file. Error is + /// thrown if the archive can't be read. + /// + /// Note: The underlying store is opened in read only mode. That means the + /// returned blockstore can only be used to read an existing blocks. + pub async fn from_existing

(path: P) -> Result + where + P: AsRef, + { + let file = File::open(&path).await?; + let mut reader = v2::Reader::new(file); + + // Read the headers + reader.read_pragma().await?; + let v2_header = reader.read_header().await?; + let v1_header = reader.read_v1_header().await?; + + // This blockstore expects index to be used + if v2_header.index_offset == 0 { + return Err(Error::EmptyIndexError); + } + + // Read the index + let inner = reader.get_inner_mut(); + inner.seek(SeekFrom::Start(v2_header.index_offset)).await?; + + let mut index_map = IndexMap::new(); + match reader.read_index().await? { + Index::IndexSorted(index) => { + index.into_iter().flat_map(|a| a.entries).for_each(|index| { + index_map.insert(index.digest, index.offset); + }); + } + Index::MultihashIndexSorted(index) => { + index + .into_iter() + .flat_map(|(_, index_sorted)| index_sorted.into_iter()) + .flat_map(|a| a.entries) + .for_each(|index| { + index_map.insert(index.digest, index.offset); + }); + } + } + + Ok(Self { + roots: v1_header.roots, + inner: RwLock::new(FileBlockstoreInner { + store: File::open(path).await?, + data_size: v2_header.data_size, + index: index_map, + }), }) } /// Check if the store contains a block with the cid. In case of IDENTITY /// CID it always returns true. - async fn has(&self, cid: Cid) -> Result { + pub async fn has(&self, cid: Cid) -> Result { if is_identity(&cid).is_some() { return Ok(true); } - Ok(self.inner.read().await.index.get(&cid).is_some()) + let digest = cid.hash().digest(); + Ok(self.inner.read().await.index.get(digest).is_some()) } /// Get specific block from the store. If the CID is an identity, the digest /// from the cid is returned. - async fn get(&self, cid: Cid) -> Result>, Error> { + pub async fn get(&self, cid: Cid) -> Result>, Error> { // If CID is an identity if let Some(data) = is_identity(&cid) { return Ok(Some(data.to_owned())); @@ -101,7 +160,8 @@ impl FileBlockstore { let mut inner = self.inner.write().await; // Get the index if exists - let Some(index) = inner.index.get(&cid).copied() else { + let digest = cid.hash().digest(); + let Some(index) = inner.index.get(digest).copied() else { return Ok(None); }; @@ -125,7 +185,7 @@ impl FileBlockstore { /// Put the new block in the store. The data integrity is not checked. We /// expect that the CID correctly represents the data being passed. In case /// of the identity CID, nothing is written to the store. - async fn put_keyed(&self, cid: &Cid, data: &[u8]) -> Result<(), Error> { + pub async fn put_keyed(&self, cid: &Cid, data: &[u8]) -> Result<(), Error> { if is_identity(&cid).is_some() { return Ok(()); } @@ -136,7 +196,7 @@ impl FileBlockstore { // This is a current position of the writer. We save this to the indexer // so that we know where we wrote the current block. let current_position = inner.store.stream_position().await?; - let index_location = current_position - CarV2Header::SIZE; + let index_location = current_position.saturating_sub(CarV2Header::SIZE); // Write block let mut buffered_writer = BufWriter::new(&mut inner.store); @@ -145,14 +205,16 @@ impl FileBlockstore { inner.data_size += written as u64; // Add current block to the index - inner.index.insert(*cid, index_location); + let digest = cid.hash().digest(); + inner.index.insert(digest.to_vec(), index_location); Ok(()) } /// Finalize the blockstore by writing the CARv2 header, along with index - /// for more efficient subsequent read. - async fn finalize(self) -> Result<(), Error> { + /// for more efficient subsequent read. If roots are passed they overwrite + /// the ones used to initialize the store. + pub async fn finalize(self, new_roots: Option>) -> Result<(), Error> { // Owned inner value let mut inner = self.inner.into_inner(); @@ -168,6 +230,16 @@ impl FileBlockstore { inner.store.rewind().await?; v2::write_header(&mut inner.store, &header).await?; + // Overwrite CARv1 header if new roots were provided. + if let Some(new_roots) = new_roots { + // If the length is different we would overwrite part of the content. + if self.roots.len() != new_roots.len() { + return Err(Error::WrongNumberOfRoots); + } + + v1::write_header(&mut inner.store, &CarV1Header::new(new_roots)).await?; + } + // Write the index inner .store @@ -177,7 +249,7 @@ impl FileBlockstore { let entries = inner .index .into_iter() - .map(|(cid, offset)| IndexEntry::new(cid.hash().digest().to_vec(), offset as u64)) + .map(|(digest, offset)| IndexEntry::new(digest, offset)) .collect(); let index = Index::MultihashIndexSorted(MultihashIndexSorted::from_single_width( SHA_256_CODE, @@ -235,7 +307,7 @@ mod blockstore { } async fn close(self) -> Result<(), Error> { - self.finalize() + self.finalize(None) .await .map_err(|err| Error::FatalDatabaseError(err.to_string())) } @@ -244,9 +316,14 @@ mod blockstore { #[cfg(test)] mod tests { - use std::{io::Cursor, path::PathBuf, str::FromStr}; + use std::{ + path::{Path, PathBuf}, + str::FromStr, + sync::Arc, + }; use ipld_core::cid::{multihash::Multihash, Cid}; + use sha2::Sha256; use tempfile::TempDir; use tokio::{ fs::File, @@ -254,7 +331,8 @@ mod tests { }; use crate::{ - multicodec::{IDENTITY_CODE, RAW_CODE}, + multicodec::{generate_multihash, IDENTITY_CODE, RAW_CODE}, + test_utils::assert_buffer_eq, CarV2Reader, Error, FileBlockstore, }; @@ -267,6 +345,50 @@ mod tests { Ok((tmp_dir, blockstore_file_path, blockstore)) } + /// Load blockstore from the existing archive. + async fn load_from_existing_archive

( + path: P, + ) -> Result<(TempDir, PathBuf, FileBlockstore), Error> + where + P: AsRef, + { + let file = File::open(path).await.unwrap(); + let mut reader = CarV2Reader::new(file); + reader.read_pragma().await.unwrap(); + let header = reader.read_header().await?; + let v1_header = reader.read_v1_header().await?; + + let (guard, blockstore_file, blockstore) = init_blockstore(v1_header.roots).await?; + + loop { + match reader.read_block().await { + Ok((cid, data)) => { + // Add block to the store + blockstore.put_keyed(&cid, &data).await.unwrap(); + + // Check if the blockstore has a new block + assert!(blockstore.has(cid).await.unwrap()); + + // Get the same block back and check if it's the same + let block = blockstore.get(cid).await.unwrap().unwrap(); + assert_eq!(block, data); + + // Kinda hacky, but better than doing a seek later on + let position = reader.get_inner_mut().stream_position().await.unwrap(); + let data_end = header.data_offset + header.data_size; + if position >= data_end { + break; + } + } + _ => { + unreachable!("the length check should avoid this from being reached"); + } + } + } + + Ok((guard, blockstore_file, blockstore)) + } + #[tokio::test] async fn test_file_exists_error() { let existing_path = PathBuf::from_str("tests/fixtures/car_v2/spaceglenda.car").unwrap(); @@ -298,58 +420,99 @@ mod tests { assert!(has_block); let content = blockstore.get(identity_cid).await.unwrap().unwrap(); - assert_eq!(payload, content.as_slice()); + assert_buffer_eq!(&payload, &content); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 3)] + async fn test_parallel_readers() { + let (_guard, _file, blockstore) = + load_from_existing_archive("tests/fixtures/car_v2/spaceglenda.car") + .await + .unwrap(); + let blockstore = Arc::new(blockstore); + + // CIDs of the content blocks that the spaceglenda.car contains. We are + // only looking at the raw content so that our validation is easier later. + let cids = vec![ + Cid::from_str("bafkreic6kcrue6ms42ykrisq6or24pbrubnyouvmgvk7ft73fjd4ynslxi").unwrap(), + Cid::from_str("bafkreicvuc5rwwjqzix7saaia55du44qqsnphdugvjxlbe446mjmupekl4").unwrap(), + Cid::from_str("bafkreiepxrkqexuff4vhc4vp6co73ubbp2vmskbwwazaihln6wws2z4wly").unwrap(), + ]; + + // Request many blocks + let handles = (0..100) + .into_iter() + .map(|i| { + let requested = cids[i % cids.len()]; + tokio::spawn({ + let blockstore = Arc::clone(&blockstore); + async move { (requested, blockstore.get(requested).await.unwrap().unwrap()) } + }) + }) + .collect::>(); + + // Validate if the blocks received are correct + for handle in handles { + let (requested_cid, block_bytes) = handle.await.expect("Panic in task"); + + // Generate the CID form the bytes. That way we can check if the + // block data returned is correct. + let multihash = generate_multihash::(&block_bytes); + let generated_cid = Cid::new_v1(RAW_CODE, multihash); + + assert_eq!(requested_cid, generated_cid); + } } #[tokio::test] - async fn test_blockstore() { - // Car file - let original_archive = tokio::fs::read("tests/fixtures/car_v2/spaceglenda.car") - .await - .unwrap(); + async fn test_blockstore_finalization() { + let original_archive_path = "tests/fixtures/car_v2/spaceglenda.car"; + let original_archive = tokio::fs::read(original_archive_path).await.unwrap(); - let mut reader = CarV2Reader::new(Cursor::new(original_archive.clone())); - reader.read_pragma().await.unwrap(); - let header = reader.read_header().await.unwrap(); - let v1_header = reader.read_v1_header().await.unwrap(); + let (_guard, blockstore_file, blockstore) = + load_from_existing_archive(original_archive_path) + .await + .unwrap(); - let (_guard, blockstore_file, blockstore) = init_blockstore(v1_header.roots).await.unwrap(); + // We are finalizing the blockstore so that the correct index and + // headers are written out. + blockstore.finalize(None).await.unwrap(); - loop { - match reader.read_block().await { - Ok((cid, data)) => { - // Add block to the store - blockstore.put_keyed(&cid, &data).await.unwrap(); + // Load new archive file to memory + let mut file = File::open(blockstore_file).await.unwrap(); + let mut new_archive = Vec::new(); + file.read_to_end(&mut new_archive).await.unwrap(); - // Check if the blockstore has a new block - assert!(blockstore.has(cid).await.unwrap()); + // Compare both contents + assert_buffer_eq!(&original_archive, &new_archive); + } - // Get the same block back and check if it's the same - let block = blockstore.get(cid).await.unwrap().unwrap(); - assert_eq!(block, data); + #[tokio::test] + async fn test_blockstore_from_existing() { + // Loaded blockstore + let blockstore = FileBlockstore::from_existing("tests/fixtures/car_v2/spaceglenda.car") + .await + .unwrap(); - // Kinda hacky, but better than doing a seek later on - let position = reader.get_inner_mut().stream_position().await.unwrap(); - let data_end = header.data_offset + header.data_size; - if position >= data_end { - break; - } - } - _ => { - unreachable!("the length check should avoid this from being reached"); - } - } - } + // Writing a new block should fail because the Blockstore is in ready + // only mode. + let payload = b"Hello World!"; + let cid = Cid::new_v1(RAW_CODE, generate_multihash::(payload)); + let writing_result = blockstore.put_keyed(&cid, payload).await; + assert!(writing_result.is_err()); - // Finalize blockstore - blockstore.finalize().await.unwrap(); + // Blockstore should have a block + let request_cid = + Cid::from_str("bafkreic6kcrue6ms42ykrisq6or24pbrubnyouvmgvk7ft73fjd4ynslxi").unwrap(); + assert!(blockstore.has(request_cid).await.unwrap()); - // Load new archive file to memory - let mut file = File::open(blockstore_file).await.unwrap(); - let mut new_archive = Vec::new(); - file.read_to_end(&mut new_archive).await.unwrap(); + // We should be able to get a block + let reading_result = blockstore.get(request_cid).await.unwrap().unwrap(); + + let generated_cid = Cid::new_v1(RAW_CODE, generate_multihash::(reading_result)); + assert_eq!(request_cid, generated_cid); - // Compare both files - assert_eq!(original_archive, new_archive); + // Finalization should fail + assert!(blockstore.finalize(None).await.is_err()); } } diff --git a/mater/lib/src/v2/index.rs b/mater/lib/src/v2/index.rs index 92822984b..56e942290 100644 --- a/mater/lib/src/v2/index.rs +++ b/mater/lib/src/v2/index.rs @@ -1,4 +1,4 @@ -use std::{collections::BTreeMap, mem::size_of}; +use std::{collections::BTreeMap, mem::size_of, ops::Deref}; use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; @@ -108,7 +108,24 @@ impl TryFrom> for SingleWidthIndex { /// /// For more details, read the [`Format 0x0400: IndexSorted`](https://ipld.io/specs/transport/car/carv2/#format-0x0400-indexsorted) section in the CARv2 specification. #[derive(Debug, PartialEq, Eq)] -pub struct IndexSorted(pub Vec); +pub struct IndexSorted(Vec); + +impl Deref for IndexSorted { + type Target = Vec; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl IntoIterator for IndexSorted { + type Item = SingleWidthIndex; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} impl From for IndexSorted { fn from(value: IndexEntry) -> Self { @@ -132,10 +149,7 @@ impl From> for IndexSorted { /// /// For more details, read the [`Format 0x0401: MultihashIndexSorted`](https://ipld.io/specs/transport/car/carv2/#format-0x0401-multihashindexsorted) section in the CARv2 specification. #[derive(Debug, PartialEq, Eq)] -pub struct MultihashIndexSorted( - // NOTE(@jmg-duarte,21/05/2024): maybe we should implement Deref where Deref::Target = BTreeMap? - pub BTreeMap, -); +pub struct MultihashIndexSorted(BTreeMap); impl MultihashIndexSorted { /// Create a [`MultihashIndexSorted`] from a [digest code](https://github.com/multiformats/multicodec/blob/c954a787dc6a17d099653e5f90d26fbd177d2074/table.csv) and an [`IndexSorted`]. @@ -146,6 +160,23 @@ impl MultihashIndexSorted { } } +impl Deref for MultihashIndexSorted { + type Target = BTreeMap; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl IntoIterator for MultihashIndexSorted { + type Item = (u64, IndexSorted); + type IntoIter = std::collections::btree_map::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} + impl From> for MultihashIndexSorted { fn from(value: BTreeMap) -> Self { Self(value) diff --git a/mater/lib/src/v2/reader.rs b/mater/lib/src/v2/reader.rs index b4e7db1d3..caa6d689f 100644 --- a/mater/lib/src/v2/reader.rs +++ b/mater/lib/src/v2/reader.rs @@ -301,9 +301,9 @@ mod tests { let index = reader.read_index().await.unwrap(); assert!(matches!(index, Index::MultihashIndexSorted(_))); if let Index::MultihashIndexSorted(mh) = index { - assert_eq!(mh.0.len(), 1); - assert!(mh.0.contains_key(&SHA_256_CODE)); - let fst = &mh.0[&SHA_256_CODE].0; + assert_eq!(mh.len(), 1); + assert!(mh.contains_key(&SHA_256_CODE)); + let fst = &mh[&SHA_256_CODE]; assert_eq!(fst.len(), 1); assert_eq!(fst[0].count, 1); assert_eq!(fst[0].width, 40); @@ -357,9 +357,9 @@ mod tests { let index = reader.read_index().await.unwrap(); assert!(matches!(index, Index::MultihashIndexSorted(_))); if let Index::MultihashIndexSorted(mh) = index { - assert_eq!(mh.0.len(), 1); - assert!(mh.0.contains_key(&SHA_256_CODE)); - let fst = &mh.0[&SHA_256_CODE].0; + assert_eq!(mh.len(), 1); + assert!(mh.contains_key(&SHA_256_CODE)); + let fst = &mh[&SHA_256_CODE]; assert_eq!(fst.len(), 1); assert_eq!(fst[0].count, 1); assert_eq!(fst[0].width, 40); @@ -416,9 +416,9 @@ mod tests { let index = reader.read_index().await.unwrap(); assert!(matches!(index, Index::MultihashIndexSorted(_))); if let Index::MultihashIndexSorted(mh) = index { - assert_eq!(mh.0.len(), 1); - assert!(mh.0.contains_key(&SHA_256_CODE)); - let fst = &mh.0[&SHA_256_CODE].0; + assert_eq!(mh.len(), 1); + assert!(mh.contains_key(&SHA_256_CODE)); + let fst = &mh[&SHA_256_CODE]; assert_eq!(fst.len(), 1); assert_eq!(fst[0].count, 4); assert_eq!(fst[0].width, 40); From 8791b3c501eb836173dfac52a271fe277bd738b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Mon, 6 Jan 2025 18:33:50 +0100 Subject: [PATCH 15/19] pr suggestions --- mater/lib/src/multicodec.rs | 4 ++-- mater/lib/src/stores/file.rs | 15 +++++++-------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/mater/lib/src/multicodec.rs b/mater/lib/src/multicodec.rs index 5b8add0a7..8d06402c4 100644 --- a/mater/lib/src/multicodec.rs +++ b/mater/lib/src/multicodec.rs @@ -40,6 +40,6 @@ where } // Returns Some(data) if the CID is an identity. If not, None is returned. -pub fn is_identity(cid: &CidGeneric) -> Option<&[u8]> { - (cid.hash().code() == IDENTITY_CODE).then_some(cid.hash().digest()) +pub fn get_identity_data(cid: &CidGeneric) -> Option<&[u8]> { + (cid.hash().code() == IDENTITY_CODE).then(|| cid.hash().digest()) } diff --git a/mater/lib/src/stores/file.rs b/mater/lib/src/stores/file.rs index 57e02ec3a..fea9cdf41 100644 --- a/mater/lib/src/stores/file.rs +++ b/mater/lib/src/stores/file.rs @@ -10,7 +10,7 @@ use tokio::{ }; use crate::{ - multicodec::{is_identity, SHA_256_CODE}, + multicodec::{get_identity_data, SHA_256_CODE}, v1::{self, read_block, write_block}, v2::{self}, CarV1Header, CarV2Header, Characteristics, Error, Index, IndexEntry, MultihashIndexSorted, @@ -35,7 +35,7 @@ pub struct FileBlockstore { /// Inner file store. Encapsulating state that is locked and used together. struct FileBlockstoreInner { - /// Underlying data store in a car format + /// Underlying data store in a CAR format store: File, /// The byte length of the CARv1 payload. This is used by the indexing, so we /// know the locations of each blocks in the file. @@ -138,7 +138,7 @@ impl FileBlockstore { /// Check if the store contains a block with the cid. In case of IDENTITY /// CID it always returns true. pub async fn has(&self, cid: Cid) -> Result { - if is_identity(&cid).is_some() { + if get_identity_data(&cid).is_some() { return Ok(true); } @@ -150,11 +150,11 @@ impl FileBlockstore { /// from the cid is returned. pub async fn get(&self, cid: Cid) -> Result>, Error> { // If CID is an identity - if let Some(data) = is_identity(&cid) { + if let Some(data) = get_identity_data(&cid) { return Ok(Some(data.to_owned())); } - // The lock is hold through out the method execution. That way we are + // The lock is held throughout the method execution. That way we are // certain that the file is not used and we are moving the cursor back // to the correct place after the read. let mut inner = self.inner.write().await; @@ -186,7 +186,7 @@ impl FileBlockstore { /// expect that the CID correctly represents the data being passed. In case /// of the identity CID, nothing is written to the store. pub async fn put_keyed(&self, cid: &Cid, data: &[u8]) -> Result<(), Error> { - if is_identity(&cid).is_some() { + if get_identity_data(&cid).is_some() { return Ok(()); } @@ -494,8 +494,7 @@ mod tests { .await .unwrap(); - // Writing a new block should fail because the Blockstore is in ready - // only mode. + // Writing a new block should fail because the Blockstore is in read only mode. let payload = b"Hello World!"; let cid = Cid::new_v1(RAW_CODE, generate_multihash::(payload)); let writing_result = blockstore.put_keyed(&cid, payload).await; From 2bb1f78cf80036ba6be3859ed95f8e5a32f3f965 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Thu, 9 Jan 2025 13:54:01 +0100 Subject: [PATCH 16/19] fix: multihash resize --- mater/lib/src/stores/file.rs | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/mater/lib/src/stores/file.rs b/mater/lib/src/stores/file.rs index fea9cdf41..0aff62863 100644 --- a/mater/lib/src/stores/file.rs +++ b/mater/lib/src/stores/file.rs @@ -266,14 +266,14 @@ impl FileBlockstore { #[cfg(feature = "blockstore")] mod blockstore { - use blockstore::{Blockstore, Error}; + use blockstore::{block::CidError, Blockstore, Error}; use ipld_core::cid::{Cid, CidGeneric}; use crate::FileBlockstore; impl Blockstore for FileBlockstore { async fn get(&self, cid: &CidGeneric) -> Result>, Error> { - let cid = Cid::try_from(cid.to_bytes()).map_err(|_err| Error::CidTooLarge)?; + let cid = to_blockstore_cid(cid)?; self.get(cid) .await @@ -281,7 +281,7 @@ mod blockstore { } async fn has(&self, cid: &CidGeneric) -> blockstore::Result { - let cid = Cid::try_from(cid.to_bytes()).map_err(|_err| Error::CidTooLarge)?; + let cid = to_blockstore_cid(cid)?; self.has(cid) .await @@ -293,7 +293,7 @@ mod blockstore { cid: &CidGeneric, data: &[u8], ) -> Result<(), Error> { - let cid = Cid::try_from(cid.to_bytes()).map_err(|_err| Error::CidTooLarge)?; + let cid = to_blockstore_cid(cid)?; self.put_keyed(&cid, data) .await @@ -312,6 +312,18 @@ mod blockstore { .map_err(|err| Error::FatalDatabaseError(err.to_string())) } } + + /// Convert CID with the generic Multihash size to the CID with the specific + /// Multihash size that the underlying blockstore expects. + fn to_blockstore_cid(cid: &CidGeneric) -> Result { + let hash = cid.hash().resize::<64>().map_err(|err| { + Err(Error::CidError(CidError::InvalidMultihashLength( + cid.hash().size(), + ))) + }); + + Ok(Cid::new(cid.version(), cid.codec(), hash)) + } } #[cfg(test)] From b56d7a24ba67cdf509fa79d3b0eeb4d7f5b591ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Thu, 9 Jan 2025 13:57:54 +0100 Subject: [PATCH 17/19] return early in case of error --- mater/lib/src/stores/file.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mater/lib/src/stores/file.rs b/mater/lib/src/stores/file.rs index 0aff62863..c2cac5402 100644 --- a/mater/lib/src/stores/file.rs +++ b/mater/lib/src/stores/file.rs @@ -320,7 +320,7 @@ mod blockstore { Err(Error::CidError(CidError::InvalidMultihashLength( cid.hash().size(), ))) - }); + })?; Ok(Cid::new(cid.version(), cid.codec(), hash)) } From 3f35b6c79d9ab1fef1ac6dbb7353535e6a6e9112 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Thu, 9 Jan 2025 14:01:38 +0100 Subject: [PATCH 18/19] small change --- mater/lib/src/stores/file.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mater/lib/src/stores/file.rs b/mater/lib/src/stores/file.rs index c2cac5402..d5f7cc414 100644 --- a/mater/lib/src/stores/file.rs +++ b/mater/lib/src/stores/file.rs @@ -316,9 +316,10 @@ mod blockstore { /// Convert CID with the generic Multihash size to the CID with the specific /// Multihash size that the underlying blockstore expects. fn to_blockstore_cid(cid: &CidGeneric) -> Result { + let digest_size = cid.hash().size(); let hash = cid.hash().resize::<64>().map_err(|err| { Err(Error::CidError(CidError::InvalidMultihashLength( - cid.hash().size(), + digest_size, ))) })?; From 3da71f405626d7cf065aa695e5dad84e27821c70 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rok=20=C4=8Cerni=C4=8D?= Date: Thu, 9 Jan 2025 14:11:16 +0100 Subject: [PATCH 19/19] fix: build --- mater/lib/src/stores/file.rs | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/mater/lib/src/stores/file.rs b/mater/lib/src/stores/file.rs index d5f7cc414..c6471929d 100644 --- a/mater/lib/src/stores/file.rs +++ b/mater/lib/src/stores/file.rs @@ -316,14 +316,13 @@ mod blockstore { /// Convert CID with the generic Multihash size to the CID with the specific /// Multihash size that the underlying blockstore expects. fn to_blockstore_cid(cid: &CidGeneric) -> Result { - let digest_size = cid.hash().size(); - let hash = cid.hash().resize::<64>().map_err(|err| { - Err(Error::CidError(CidError::InvalidMultihashLength( - digest_size, - ))) - })?; - - Ok(Cid::new(cid.version(), cid.codec(), hash)) + let digest_size = cid.hash().size() as usize; + let hash = cid + .hash() + .resize::<64>() + .map_err(|_| Error::CidError(CidError::InvalidMultihashLength(digest_size)))?; + + Ok(Cid::new(cid.version(), cid.codec(), hash).expect("we know cid is correct here")) } }