feat: retrieval server & client setup #654

Closed; wants to merge 2 commits.
622 changes: 551 additions & 71 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions Cargo.toml
@@ -22,6 +22,7 @@ members = [
  "storage-provider/client",
  "storage-provider/common",
  "storage-provider/server",
+  "storage-retrieval",
  "storage/polka-index",
  "storagext/cli",
  "storagext/lib",
@@ -51,8 +52,10 @@ async-stream = "0.3.6"
async-trait = "0.1.80"
axum = "0.7.5"
base64 = "0.22.1"
+beetswap = "0.4.0"
bitflags = "2.5.0"
blake2b_simd = { version = "1.0.2", default-features = false }
+blockstore = "0.7.1"
bls12_381 = "0.8"
bs58 = "0.5.1"
byteorder = "1.5.0"
@@ -78,6 +81,9 @@ ipld-core = "0.4.1"
ipld-dagpb = "0.2.1"
itertools = "0.13.0"
jsonrpsee = { version = "0.24.7" }
+libp2p = "0.54.1"
+libp2p-core = "0.42.0"
+libp2p-swarm = "0.45.1"
log = { version = "0.4.21", default-features = false }
multihash-codetable = { version = "0.1.1", default-features = false }
num-bigint = { version = "0.4.5", default-features = false }
@@ -136,6 +142,7 @@ pallet-market = { path = "pallets/market", default-features = false }
pallet-proofs = { path = "pallets/proofs", default-features = false }
pallet-randomness = { path = "pallets/randomness", default-features = false }
pallet-storage-provider = { path = "pallets/storage-provider", default-features = false }
+polka-index = { path = "storage/polka-index" }

Review comment (Contributor): What is it?

polka-storage-proofs = { path = "lib/polka-storage-proofs", default-features = false }
polka-storage-provider-common = { path = "storage-provider/common" }
polka-storage-runtime = { path = "runtime" }
1 change: 1 addition & 0 deletions mater/lib/Cargo.toml
@@ -10,6 +10,7 @@ version = "0.1.0"
[dependencies]
async-stream.workspace = true
bitflags.workspace = true
+blockstore = { workspace = true }
byteorder = { workspace = true, features = ["i128"] }
bytes.workspace = true
digest.workspace = true
2 changes: 1 addition & 1 deletion mater/lib/src/lib.rs
@@ -19,7 +19,7 @@ mod v2;

// We need to re-expose this because `read_block` returns `(Cid, Vec<u8>)`.
pub use ipld_core::cid::Cid;
-pub use stores::{create_filestore, Blockstore, Config};
+pub use stores::{create_filestore, Blockstore, Config, InMemory};
pub use v1::{Header as CarV1Header, Reader as CarV1Reader, Writer as CarV1Writer};
pub use v2::{
    verify_cid, Characteristics, Header as CarV2Header, Index, IndexEntry, IndexSorted,
91 changes: 87 additions & 4 deletions mater/lib/src/stores/blockstore.rs
@@ -1,23 +1,89 @@
// NOTE(@jmg-duarte,28/05/2024): the blockstore can (and should) evolve to support other backends.
// At the time of writing, there is no need to invest more time in it because the current PR (#25) is delayed enough.

-use std::collections::{HashMap, HashSet};
+use std::{
+    collections::{HashMap, HashSet},
+    sync::Mutex,
+};

use bytes::Bytes;
use indexmap::IndexMap;
use integer_encoding::VarInt;
-use ipld_core::cid::Cid;
+use ipld_core::cid::{Cid, CidGeneric};
use sha2::{Digest, Sha256};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_stream::StreamExt;
use tokio_util::io::ReaderStream;

use super::{DEFAULT_BLOCK_SIZE, DEFAULT_TREE_WIDTH};
use crate::{
-    multicodec::SHA_256_CODE, unixfs::stream_balanced_tree, CarV1Header, CarV2Header, CarV2Writer,
-    Error, Index, IndexEntry, MultihashIndexSorted, SingleWidthIndex,
+    multicodec::SHA_256_CODE, unixfs::stream_balanced_tree, CarV1Header, CarV2Header, CarV2Reader,
+    CarV2Writer, Error, Index, IndexEntry, MultihashIndexSorted, SingleWidthIndex,
};

+/// Thread-safe, in-memory blockstore.
+pub struct InMemory(Mutex<Blockstore>);
+
+impl From<Blockstore> for InMemory {
+    fn from(value: Blockstore) -> Self {
+        Self(Mutex::new(value))
+    }
+}
+
+impl InMemory {
+    /// Create a new [`InMemory`] blockstore.
+    pub fn new() -> Self {
+        Self(Mutex::new(Blockstore::new()))
+    }
+}
+
+impl blockstore::Blockstore for InMemory {
+    async fn get<const S: usize>(
+        &self,
+        cid: &CidGeneric<S>,
+    ) -> Result<Option<Vec<u8>>, blockstore::Error> {
+        // Convert to the CID type that the store uses
+        let cid = Cid::try_from(cid.to_bytes()).map_err(|_err| blockstore::Error::CidTooLarge)?;
+
+        let inner_lock = self
+            .0
+            .lock()
+            .map_err(|err| blockstore::Error::FatalDatabaseError(err.to_string()))?;
+
+        let block = inner_lock.blocks.get(&cid).map(|b| b.to_vec());
+
+        Ok(block)
+    }
+
+    async fn put_keyed<const S: usize>(
+        &self,
+        cid: &CidGeneric<S>,
+        data: &[u8],
+    ) -> Result<(), blockstore::Error> {
+        // Convert to the CID type that the store uses
+        let cid = Cid::try_from(cid.to_bytes()).map_err(|_err| blockstore::Error::CidTooLarge)?;
+
+        let mut inner_lock = self
+            .0
+            .lock()
+            .map_err(|err| blockstore::Error::FatalDatabaseError(err.to_string()))?;
+
+        inner_lock.blocks.insert(cid, Bytes::from(data.to_vec()));
+
+        Ok(())
+    }
+
+    async fn remove<const S: usize>(&self, _cid: &CidGeneric<S>) -> Result<(), blockstore::Error> {
+        // Remove is not supported
+        unreachable!()
+    }
+
+    async fn close(self) -> Result<(), blockstore::Error> {
+        // Nothing to do here
+        Ok(())
+    }
+}

/// The [`Blockstore`] stores pairs of [`Cid`] and [`Bytes`] in memory.
///
/// The store will chunk data blocks into `chunk_size` and "gather" nodes in groups with at most `tree_width` children.
@@ -81,6 +147,23 @@ impl Blockstore {
        }
    }

+    /// Read the contents of a CARv2 file into the [`Blockstore`].
+    pub async fn read_car<R>(&mut self, mut reader: CarV2Reader<R>) -> Result<(), Error>
+    where
+        R: AsyncRead + Unpin,
+    {
+        reader.read_pragma().await?;
+        reader.read_header().await?;
+        let car_v1_header = reader.read_v1_header().await?;
+        self.root = car_v1_header.roots.first().cloned();
+
+        while let Ok((cid, data)) = reader.read_block().await {
+            self.insert(cid, data.into(), true);
+        }
+
+        Ok(())
+    }

    /// Fully read the contents of an arbitrary `reader` into the [`Blockstore`],
    /// converting the contents into a CARv2 file.
    pub async fn read<R>(&mut self, reader: R) -> Result<(), Error>
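Taken together, the two additions above let a CARv2 archive be parsed into the mutable Blockstore and then converted into the thread-safe InMemory wrapper for sharing across tasks. A minimal sketch of that flow (the test.car path is an illustrative placeholder, not a fixture from this PR):

use std::sync::Arc;

use anyhow::Result;
use mater::{Blockstore, CarV2Reader, InMemory};
use tokio::fs::File;

#[tokio::main]
async fn main() -> Result<()> {
    // Parse a CARv2 file into the mutable, single-owner store.
    let file = File::open("test.car").await?;
    let mut store = Blockstore::new();
    store.read_car(CarV2Reader::new(file)).await?;

    // Convert into the Mutex-guarded wrapper via the new From impl,
    // then share it across tasks behind an Arc.
    let shared: Arc<InMemory> = Arc::new(store.into());
    let _ = shared;
    Ok(())
}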
2 changes: 1 addition & 1 deletion mater/lib/src/stores/mod.rs
@@ -1,7 +1,7 @@
mod blockstore;
mod filestore;

-pub use blockstore::Blockstore;
+pub use blockstore::{Blockstore, InMemory};
pub use filestore::create_filestore;

/// The default block size, as defined in
32 changes: 32 additions & 0 deletions storage-retrieval/Cargo.toml
@@ -0,0 +1,32 @@
[package]
authors.workspace = true
edition.workspace = true
homepage.workspace = true
license-file.workspace = true
name = "storage-retrieval"
repository.workspace = true
version = "0.1.0"

[lints]
workspace = true

[dependencies]
anyhow = { workspace = true }
beetswap = { workspace = true }
blockstore = { workspace = true }
cid = { workspace = true }
futures = { workspace = true }
libp2p = { workspace = true, features = ["macros", "noise", "tcp", "tokio", "yamux"] }
libp2p-core = { workspace = true }
libp2p-swarm = { workspace = true }
mater = { workspace = true }
polka-index = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "sync", "time"] }
tracing = { workspace = true }

[dev-dependencies]
# multihash = "0.19.3"

Review comment (Contributor), suggested change: remove the commented-out # multihash = "0.19.3" line above.

multihash-codetable = { workspace = true, features = ["sha2"] }
tracing-appender = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
Empty file added: storage-retrieval/README.md
97 changes: 97 additions & 0 deletions storage-retrieval/examples/simple_retrieval.rs
@@ -0,0 +1,97 @@
use std::{str::FromStr, sync::Arc, time::Duration};

use anyhow::Result;
use blockstore::{
    block::{Block, CidError},
    InMemoryBlockstore,
};

Review comment (Contributor): nice, didn't know eiger had this library

use cid::Cid;
use libp2p::Multiaddr;
use mater::{Blockstore, CarV2Reader, InMemory};
use multihash_codetable::{Code, MultihashDigest};
use storage_retrieval::{client::Client, server::Server};
use tokio::{fs::File, time::sleep};

#[tokio::main]
async fn main() -> Result<()> {
    // Init tracing
    let _guard = init_tracing();

    // Setup indexer
    // let indexer_path = temp_dir();
    // let indexer = Arc::new(RocksDBLid::new(RocksDBStateStoreConfig {
    //     path: indexer_path,
    // })?);

    // TODO: Blocks should not be held in memory. Implement a blockstore that can
    // source blocks directly from sectors on disk with the help of an index.
    let file = File::open("./examples/test-data-big.car").await?;
    let car_reader = CarV2Reader::new(file);
    let mut blockstore = Blockstore::new();
    blockstore.read_car(car_reader).await?;

    let blockstore: InMemory = blockstore.into();

    // let blockstore = Arc::new(InMemoryBlockstore::<64>::new());
    // blockstore.put(StringBlock("12345".to_string())).await?;

    // Setup server
    let server = Server::new(Arc::new(blockstore))?;
    let address: Multiaddr = format!("/ip4/127.0.0.1/tcp/8989").parse()?;

    tokio::spawn({
        let address = address.clone();
        async move {
            let _ = server.run(vec![address]).await;
        }
    });

    // TODO: Implement a blockstore that persists blocks directly to disk as a CAR file.
    let blockstore = Arc::new(InMemoryBlockstore::<64>::new());
    let client = Client::new(blockstore, vec![address])?;

    // Payload CID of the CAR file we want to fetch
    let payload_cid =
        Cid::from_str("bafkreiechz74drg7tg5zswmxf4g2dnwhemlwdv7e3l5ypehdqdwaoyz3dy").unwrap();
    // let payload_cid =
    //     Cid::from_str("bafkreiczsrdrvoybcevpzqmblh3my5fu6ui3tgag3jm3hsxvvhaxhswpyu").unwrap();
    client
        .download(payload_cid, sleep(Duration::from_secs(10)))
        .await?;

    Ok(())
}

struct StringBlock(pub String);

impl Block<64> for StringBlock {
    fn cid(&self) -> Result<Cid, CidError> {
        const RAW_CODEC: u64 = 0x55;
        let hash = Code::Sha2_256.digest(self.0.as_ref());
        Ok(Cid::new_v1(RAW_CODEC, hash))
    }

    fn data(&self) -> &[u8] {
        self.0.as_ref()
    }
}

fn init_tracing() -> tracing_appender::non_blocking::WorkerGuard {
    let (non_blocking, guard) = tracing_appender::non_blocking(std::io::stdout());

    let filter = tracing_subscriber::EnvFilter::builder()
        .with_default_directive(tracing_subscriber::filter::LevelFilter::INFO.into())
        .from_env_lossy();

    tracing_subscriber::fmt()
        .event_format(
            tracing_subscriber::fmt::format()
                .with_file(true)
                .with_line_number(true),
        )
        .with_env_filter(filter)
        .with_writer(non_blocking)
        .init();

    guard
}
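
Assuming the test-data-big.car fixture referenced above is present, the example should be runnable from the repository root with something like cargo run -p storage-retrieval --example simple_retrieval (the package and example names are taken from the manifest and file name in this diff; the fixture itself is not part of it).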