diff --git a/Cargo.lock b/Cargo.lock index 3140fa39b7..6b62a3a2c1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12519,12 +12519,15 @@ dependencies = [ name = "subspace-data-retrieval" version = "0.1.0" dependencies = [ + "async-lock 3.4.0", "async-trait", + "futures", "parity-scale-codec", "subspace-archiving", "subspace-core-primitives", "subspace-erasure-coding", "thiserror", + "tokio", "tracing", ] diff --git a/crates/subspace-archiving/src/reconstructor.rs b/crates/subspace-archiving/src/reconstructor.rs index de45fd6bbc..3a4f3f198f 100644 --- a/crates/subspace-archiving/src/reconstructor.rs +++ b/crates/subspace-archiving/src/reconstructor.rs @@ -72,15 +72,13 @@ impl Reconstructor { /// Given a set of pieces of a segment of the archived history (any half of all pieces are /// required to be present, the rest will be recovered automatically due to use of erasure - /// coding if needed), reconstructs and returns segment header and a list of encoded blocks with - /// corresponding block numbers. + /// coding if needed), reconstructs and returns the segment itself. /// - /// It is possible to start with any segment, but when next segment is pushed, it needs to - /// follow the previous one or else error will be returned. - pub fn add_segment( - &mut self, + /// Does not modify the internal state of the reconstructor. + pub fn reconstruct_segment( + &self, segment_pieces: &[Option], - ) -> Result { + ) -> Result { let mut segment_data = RecordedHistorySegment::new_boxed(); if !segment_pieces @@ -151,9 +149,24 @@ impl Reconstructor { } } - let Segment::V0 { items } = - Segment::decode(&mut AsRef::<[u8]>::as_ref(segment_data.as_ref())) - .map_err(ReconstructorError::SegmentDecoding)?; + let segment = Segment::decode(&mut AsRef::<[u8]>::as_ref(segment_data.as_ref())) + .map_err(ReconstructorError::SegmentDecoding)?; + + Ok(segment) + } + + /// Given a set of pieces of a segment of the archived history (any half of all pieces are + /// required to be present, the rest will be recovered automatically due to use of erasure + /// coding if needed), reconstructs and returns segment header and a list of encoded blocks with + /// corresponding block numbers. + /// + /// It is possible to start with any segment, but when next segment is pushed, it needs to + /// follow the previous one or else error will be returned. + pub fn add_segment( + &mut self, + segment_pieces: &[Option], + ) -> Result { + let Segment::V0 { items } = self.reconstruct_segment(segment_pieces)?; let mut reconstructed_contents = ReconstructedContents::default(); let mut next_block_number = 0; diff --git a/crates/subspace-service/src/sync_from_dsn/import_blocks.rs b/crates/subspace-service/src/sync_from_dsn/import_blocks.rs index d7faaab027..8d719ef4d3 100644 --- a/crates/subspace-service/src/sync_from_dsn/import_blocks.rs +++ b/crates/subspace-service/src/sync_from_dsn/import_blocks.rs @@ -234,6 +234,7 @@ where Ok(imported_blocks) } +/// Downloads and reconstructs blocks from a DSN segment, by concurrently downloading its pieces. pub(super) async fn download_and_reconstruct_blocks( segment_index: SegmentIndex, piece_getter: &PG, diff --git a/shared/subspace-data-retrieval/Cargo.toml b/shared/subspace-data-retrieval/Cargo.toml index 5cf2d86348..442bdff8e7 100644 --- a/shared/subspace-data-retrieval/Cargo.toml +++ b/shared/subspace-data-retrieval/Cargo.toml @@ -12,12 +12,15 @@ include = [ ] [dependencies] +async-lock = "3.3.0" async-trait = "0.1.83" +futures = "0.3.29" parity-scale-codec = { version = "3.6.12", features = ["derive"] } subspace-archiving = { version = "0.1.0", path = "../../crates/subspace-archiving" } subspace-core-primitives = { version = "0.1.0", path = "../../crates/subspace-core-primitives" } subspace-erasure-coding = { version = "0.1.0", path = "../../crates/subspace-erasure-coding" } thiserror = "1.0.63" +tokio = { version = "1.39.2", features = ["sync"] } tracing = "0.1.40" [dev-dependencies] diff --git a/shared/subspace-data-retrieval/src/lib.rs b/shared/subspace-data-retrieval/src/lib.rs index f552571a58..534f2d2adf 100644 --- a/shared/subspace-data-retrieval/src/lib.rs +++ b/shared/subspace-data-retrieval/src/lib.rs @@ -17,4 +17,5 @@ pub mod object_fetcher; pub mod piece_fetcher; +pub mod piece_getter; pub mod segment_fetcher; diff --git a/shared/subspace-data-retrieval/src/object_fetcher.rs b/shared/subspace-data-retrieval/src/object_fetcher.rs index 34a79556a2..26b67d95fa 100644 --- a/shared/subspace-data-retrieval/src/object_fetcher.rs +++ b/shared/subspace-data-retrieval/src/object_fetcher.rs @@ -15,14 +15,16 @@ //! Fetching objects stored in the archived history of Subspace Network. -use async_trait::async_trait; +use crate::piece_fetcher::download_pieces; +use crate::piece_getter::{BoxError, ObjectPieceGetter}; +use crate::segment_fetcher::{download_segment, SegmentGetterError}; use parity_scale_codec::{Compact, CompactLen, Decode, Encode}; -use std::fmt; use std::sync::Arc; -use subspace_archiving::archiver::{NewArchivedSegment, Segment, SegmentItem}; +use subspace_archiving::archiver::{Segment, SegmentItem}; use subspace_core_primitives::{ Piece, PieceIndex, RawRecord, RecordedHistorySegment, SegmentIndex, }; +use subspace_erasure_coding::ErasureCoding; use tracing::{debug, trace}; /// Object fetching errors. @@ -35,6 +37,13 @@ pub enum Error { piece_offset: u32, }, + /// Supplied piece offset is too large + #[error("Piece offset {piece_offset} is too large, must be less than {}, piece index: {piece_index}", RawRecord::SIZE)] + PieceOffsetTooLarge { + piece_index: PieceIndex, + piece_offset: u32, + }, + /// No item in segment at offset #[error("Offset {offset_in_segment} in segment {segment_index} is not an item, current progress: {progress}, object: {piece_index:?}, {piece_offset}")] NoSegmentItem { @@ -112,119 +121,15 @@ pub enum Error { }, /// Piece getter error - #[error("Getting piece failed temporarily: {source:?}")] - PieceGetterTemporary { + #[error("Getting piece caused an error: {source:?}")] + PieceGetterError { #[from] - source: PieceGetterError, + source: BoxError, }, - /// Piece getter custom error type - #[error("Getting piece failed permanently: {source:?}")] - PieceGetterPermanent { source: BoxError }, -} - -/// Segment getter errors. -#[derive(Debug, thiserror::Error)] -pub enum SegmentGetterError { - /// Segment not found - #[error("Segment index {segment_index} is not available")] - NotFound { segment_index: PieceIndex }, - - /// Segment decoding error - #[error("Segment data decoding error: {source:?}")] - SegmentDecoding { - #[from] - source: parity_scale_codec::Error, - }, - - /// Piece getter error - #[error("Getting piece failed: {source:?}")] - PieceGetter { - #[from] - source: PieceGetterError, - }, -} - -/// Piece getter errors. -#[derive(Debug, thiserror::Error)] -pub enum PieceGetterError { - /// Piece not found - #[error("Piece index {piece_index} is not available from this provider")] - NotFound { piece_index: PieceIndex }, - - /// Piece decoding error - #[error("Piece data decoding error: {source:?}")] - PieceDecoding { - #[from] - source: parity_scale_codec::Error, - }, -} - -/// A type-erased error -pub type BoxError = Box; - -/// Something that can be used to get decoded pieces by index -#[async_trait] -pub trait ObjectPieceGetter: fmt::Debug { - /// Get piece by index. - /// - /// Returns `Ok(None)` for temporary errors: the piece is not found, but immediately retrying - /// this provider might return it. - /// Returns `Err(_)` for permanent errors: this provider can't provide the piece at this time, - /// and another provider should be attempted. - async fn get_piece(&self, piece_index: PieceIndex) -> Result, BoxError>; -} - -#[async_trait] -impl ObjectPieceGetter for Arc -where - PG: ObjectPieceGetter + Send + Sync + ?Sized, -{ - async fn get_piece(&self, piece_index: PieceIndex) -> Result, BoxError> { - self.as_ref().get_piece(piece_index).await - } -} - -// Convenience methods, mainly used in testing -#[async_trait] -impl ObjectPieceGetter for NewArchivedSegment { - async fn get_piece(&self, piece_index: PieceIndex) -> Result, BoxError> { - if piece_index.segment_index() == self.segment_header.segment_index() { - return Ok(Some( - self.pieces - .pieces() - .nth(piece_index.position() as usize) - .expect("checked segment index in if; piece must be present; qed"), - )); - } - - Err(PieceGetterError::NotFound { piece_index }.into()) - } -} - -#[async_trait] -impl ObjectPieceGetter for (PieceIndex, Piece) { - async fn get_piece(&self, piece_index: PieceIndex) -> Result, BoxError> { - if self.0 == piece_index { - return Ok(Some(self.1.clone())); - } - - Err(PieceGetterError::NotFound { piece_index }.into()) - } -} - -// TODO: impl for IntoIterator instead? -#[async_trait] -impl ObjectPieceGetter for Vec<(PieceIndex, Piece)> { - async fn get_piece(&self, piece_index: PieceIndex) -> Result, BoxError> { - for (index, piece) in self.iter() { - if *index == piece_index { - return Ok(Some(piece.clone())); - } - } - - Err(PieceGetterError::NotFound { piece_index }.into()) - } + /// Piece getter couldn't find the piece + #[error("Piece {piece_index:?} was not found by piece getter")] + PieceGetterNotFound { piece_index: PieceIndex }, } /// Object fetcher for the Subspace DSN. @@ -232,21 +137,29 @@ pub struct ObjectFetcher { /// The piece getter used to fetch pieces. piece_getter: Arc, + /// The erasure coding configuration of those pieces. + erasure_coding: ErasureCoding, + /// The maximum number of data bytes we'll read for a single object. max_object_len: usize, } impl ObjectFetcher { - /// Create a new object fetcher with the given piece getter. + /// Create a new object fetcher with the given configuration. /// /// `max_object_len` is the amount of data bytes we'll read for a single object before giving /// up and returning an error, or `None` for no limit (`usize::MAX`). - pub fn new(piece_getter: PG, max_object_len: Option) -> Self + pub fn new( + piece_getter: PG, + erasure_coding: ErasureCoding, + max_object_len: Option, + ) -> Self where PG: ObjectPieceGetter + Send + Sync + 'static, { Self { piece_getter: Arc::new(piece_getter), + erasure_coding, max_object_len: max_object_len.unwrap_or(usize::MAX), } } @@ -260,6 +173,7 @@ impl ObjectFetcher { piece_index: PieceIndex, piece_offset: u32, ) -> Result, Error> { + // Validate parameters if !piece_index.is_source() { tracing::debug!( %piece_index, @@ -274,6 +188,20 @@ impl ObjectFetcher { }); } + if piece_offset >= RawRecord::SIZE as u32 { + tracing::debug!( + %piece_index, + piece_offset, + RawRecord_SIZE = RawRecord::SIZE, + "Invalid piece offset for object: must be less than the size of a raw record", + ); + + return Err(Error::PieceOffsetTooLarge { + piece_index, + piece_offset, + }); + } + // Try fast object assembling from individual pieces if let Some(data) = self.fetch_object_fast(piece_index, piece_offset).await? { tracing::debug!( @@ -317,6 +245,14 @@ impl ObjectFetcher { let last_data_piece_in_segment = piece_position_in_segment >= data_shards - 1; if last_data_piece_in_segment && !before_last_two_bytes { + trace!( + piece_position_in_segment, + %piece_index, + piece_offset, + "Fast object retrieval not possible: last source piece in segment, \ + and start of object length bytes is in potential segment padding", + ); + // Fast retrieval possibility is not guaranteed return Ok(None); } @@ -354,6 +290,15 @@ impl ObjectFetcher { } else if !last_data_piece_in_segment { // Need the next piece to read the length of data, but we can only use it if there was // no segment padding + trace!( + %next_source_piece_index, + piece_position_in_segment, + bytes_available_in_segment, + %piece_index, + piece_offset, + "Part of object length bytes is in next piece, fetching", + ); + let piece = self .read_piece(next_source_piece_index, piece_index, piece_offset) .await?; @@ -367,12 +312,31 @@ impl ObjectFetcher { )? .expect("Extra RawRecord is larger than the length encoding; qed") } else { + trace!( + piece_position_in_segment, + bytes_available_in_segment, + %piece_index, + piece_offset, + "Fast object retrieval not possible: last source piece in segment, \ + and part of object length bytes is in potential segment padding", + ); + // Super fast read is not possible, because we removed potential segment padding, so // the piece bytes are not guaranteed to be continuous return Ok(None); }; if data_length > bytes_available_in_segment as usize { + trace!( + data_length, + bytes_available_in_segment, + piece_position_in_segment, + %piece_index, + piece_offset, + "Fast object retrieval not possible: part of object data bytes is in \ + potential segment padding", + ); + // Not enough data without crossing segment boundary return Ok(None); } @@ -382,16 +346,20 @@ impl ObjectFetcher { drop(read_records_data); // Read more pieces until we have enough data - let remaining_piece_count = (data_length as usize - data.len()) / RawRecord::SIZE; - let remaining_piece_indexes = (next_source_piece_index..) - .filter(|i| i.is_source()) - .take(remaining_piece_count); - self.read_pieces(remaining_piece_indexes, piece_index, piece_offset) - .await? - .into_iter() - .for_each(|piece| { - data.extend(piece.record().to_raw_record_chunks().flatten().copied()) - }); + if data_length as usize > data.len() { + let remaining_piece_count = + (data_length as usize - data.len()).div_ceil(RawRecord::SIZE); + let remaining_piece_indexes = (next_source_piece_index..) + .filter(|i| i.is_source()) + .take(remaining_piece_count) + .collect::>(); + self.read_pieces(&remaining_piece_indexes) + .await? + .into_iter() + .for_each(|piece| { + data.extend(piece.record().to_raw_record_chunks().flatten().copied()) + }); + } // Decode the data, and return it if it's valid let data = Vec::::decode(&mut data.as_slice())?; @@ -406,16 +374,23 @@ impl ObjectFetcher { piece_index: PieceIndex, piece_offset: u32, ) -> Result, Error> { - let segment_index = piece_index.segment_index(); + let mut segment_index = piece_index.segment_index(); let piece_position_in_segment = piece_index.position(); // Used to access the data after it is converted to raw bytes let offset_in_segment = piece_position_in_segment as usize * RawRecord::SIZE + piece_offset as usize; + tracing::trace!( + %segment_index, + offset_in_segment, + piece_position_in_segment, + %piece_index, + piece_offset, + "Fetching object from segment(s)", + ); + let mut data = { - let Segment::V0 { items } = self - .read_segment(segment_index, piece_index, piece_offset) - .await?; + let Segment::V0 { items } = self.read_segment(segment_index).await?; // Go through the segment until we reach the offset. // Unconditional progress is enum variant + compact encoding of number of elements let mut progress = 1 + Compact::compact_len(&(items.len() as u64)); @@ -448,14 +423,25 @@ impl ObjectFetcher { } })?; + tracing::trace!( + progress, + %segment_index, + offset_in_segment, + piece_position_in_segment, + %piece_index, + piece_offset, + segment_item = format!("{segment_item:?}").chars().take(50).collect::(), + "Found item at offset in first segment", + ); + // Look at the item after the offset, collecting block bytes match segment_item { SegmentItem::Block { bytes, .. } | SegmentItem::BlockStart { bytes, .. } | SegmentItem::BlockContinuation { bytes, .. } => { - // Rewind back progress to the beginning of the number of bytes + // Rewind back progress to the beginning of this item progress -= bytes.len(); - // Get a chunk of the bytes starting at the position we care about + // Get a chunk of the bytes starting at the offset in this item Vec::from(&bytes[offset_in_segment - progress..]) } segment_item @ SegmentItem::Padding @@ -468,7 +454,7 @@ impl ObjectFetcher { %segment_index, %piece_index, piece_offset, - ?segment_item, + segment_item = format!("{segment_item:?}").chars().take(50).collect::(), "Unexpected segment item in first segment", ); @@ -484,6 +470,16 @@ impl ObjectFetcher { } }; + tracing::trace!( + %segment_index, + offset_in_segment, + piece_position_in_segment, + %piece_index, + piece_offset, + data_len = data.len(), + "Got data at offset in first segment", + ); + // Return an error if the length is unreasonably large, before we get the next segment if let Some(data_length) = self.decode_data_length(data.as_slice(), piece_index, piece_offset)? @@ -499,9 +495,8 @@ impl ObjectFetcher { // We don't attempt to calculate the number of segments needed, because it involves // headers and optional padding. loop { - let Segment::V0 { items } = self - .read_segment(segment_index + SegmentIndex::ONE, piece_index, piece_offset) - .await?; + segment_index += SegmentIndex::ONE; + let Segment::V0 { items } = self.read_segment(segment_index).await?; for segment_item in items { match segment_item { SegmentItem::BlockContinuation { bytes, .. } => { @@ -528,7 +523,7 @@ impl ObjectFetcher { %segment_index, %piece_index, piece_offset, - ?segment_item, + segment_item = format!("{segment_item:?}").chars().take(50).collect::(), "Unexpected segment item in continuing segment", ); @@ -546,29 +541,20 @@ impl ObjectFetcher { } /// Read the whole segment by its index (just records, skipping witnesses). - /// - /// The mapping piece index and offset are only used for error reporting. - // TODO: replace with a refactored subspace-service::sync_from_dsn::import_blocks::download_and_reconstruct_blocks() - async fn read_segment( - &self, - _segment_index: SegmentIndex, - _mapping_piece_index: PieceIndex, - _mapping_piece_offset: u32, - ) -> Result { - unimplemented!("read_segment will be implemented as part of a refactoring") + async fn read_segment(&self, segment_index: SegmentIndex) -> Result { + Ok(download_segment( + segment_index, + &self.piece_getter, + self.erasure_coding.clone(), + ) + .await?) } - /// Concurrently read multiple pieces by their indexes - /// - /// The mapping piece index and offset are only used for error reporting. - // TODO: replace with a refactored method that fetches pieces - async fn read_pieces( - &self, - _piece_indexes: impl IntoIterator, - _mapping_piece_index: PieceIndex, - _mapping_piece_offset: u32, - ) -> Result, Error> { - unimplemented!("read_pieces will be implemented as part of a refactoring") + /// Concurrently read multiple pieces, and return them in the supplied order. + async fn read_pieces(&self, piece_indexes: &[PieceIndex]) -> Result, Error> { + download_pieces(piece_indexes, &self.piece_getter) + .await + .map_err(|source| Error::PieceGetterError { source }) } /// Read and return a single piece. @@ -584,16 +570,14 @@ impl ObjectFetcher { .piece_getter .get_piece(piece_index) .await - .map_err(|source| { + .inspect_err(|source| { debug!( %piece_index, error = ?source, %mapping_piece_index, mapping_piece_offset, - "Permanent error fetching piece during object assembling" + "Error fetching piece during object assembling" ); - - Error::PieceGetterPermanent { source } })?; if let Some(piece) = piece { @@ -610,10 +594,10 @@ impl ObjectFetcher { %piece_index, %mapping_piece_index, mapping_piece_offset, - "Temporary error fetching piece during object assembling" + "Piece not found during object assembling" ); - Err(PieceGetterError::NotFound { + Err(Error::PieceGetterNotFound { piece_index: mapping_piece_index, })? } diff --git a/shared/subspace-data-retrieval/src/piece_fetcher.rs b/shared/subspace-data-retrieval/src/piece_fetcher.rs index bf1f4517bf..acd9829cd3 100644 --- a/shared/subspace-data-retrieval/src/piece_fetcher.rs +++ b/shared/subspace-data-retrieval/src/piece_fetcher.rs @@ -14,5 +14,74 @@ // limitations under the License. //! Fetching pieces of the archived history of Subspace Network. -//! -//! TODO: move piece fetching here + +use crate::object_fetcher::Error; +use crate::piece_getter::{BoxError, ObjectPieceGetter}; +use futures::stream::FuturesOrdered; +use futures::TryStreamExt; +use subspace_core_primitives::{Piece, PieceIndex}; +use tracing::{debug, trace}; + +/// Concurrently downloads the exact pieces in `piece_indexes`, returning them in that order. +/// Each piece index must be unique. +/// +/// If any piece can't be downloaded, returns an error. +// This code was copied and modified from subspace_service::sync_from_dsn::download_and_reconstruct_blocks(): +// +pub async fn download_pieces( + piece_indexes: &[PieceIndex], + piece_getter: &PG, +) -> Result, BoxError> +where + PG: ObjectPieceGetter, +{ + debug!( + count = piece_indexes.len(), + ?piece_indexes, + "Retrieving exact pieces" + ); + + // TODO: + // - consider using a semaphore to limit the number of concurrent requests, like + // download_segment_pieces() + // - if we're close to the number of pieces in a segment, use segment downloading and piece + // reconstruction instead + // Currently most objects are limited to 4 pieces, so this isn't needed yet. + let received_pieces = piece_indexes + .iter() + .map(|piece_index| async move { + match piece_getter.get_piece(*piece_index).await { + Ok(Some(piece)) => { + trace!(?piece_index, "Piece request succeeded",); + Ok(piece) + } + Ok(None) => { + trace!(?piece_index, "Piece not found"); + Err(Error::PieceGetterNotFound { + piece_index: *piece_index, + } + .into()) + } + Err(error) => { + trace!( + %error, + ?piece_index, + "Piece request caused an error", + ); + Err(error) + } + } + }) + .collect::>(); + + // We want exact pieces, so any errors are fatal. + let received_pieces: Vec = received_pieces.try_collect().await?; + + trace!( + count = piece_indexes.len(), + ?piece_indexes, + "Successfully retrieved exact pieces" + ); + + Ok(received_pieces) +} diff --git a/shared/subspace-data-retrieval/src/piece_getter.rs b/shared/subspace-data-retrieval/src/piece_getter.rs new file mode 100644 index 0000000000..7f0fa894ba --- /dev/null +++ b/shared/subspace-data-retrieval/src/piece_getter.rs @@ -0,0 +1,74 @@ +// Copyright (C) 2024 Subspace Labs, Inc. +// SPDX-License-Identifier: Apache-2.0 + +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Getting object pieces from the Subspace Distributed Storage Network, or various caches. + +use async_trait::async_trait; +use std::fmt; +use std::sync::Arc; +use subspace_archiving::archiver::NewArchivedSegment; +use subspace_core_primitives::{Piece, PieceIndex}; + +/// A type-erased error +pub type BoxError = Box; + +/// Trait representing a way to get pieces from the DSN for object reconstruction +// TODO: make ObjectPieceGetter impls retry before failing, if that is useful +#[async_trait] +pub trait ObjectPieceGetter: fmt::Debug { + /// Get piece by index. + /// + /// Returns `Ok(None)` if the piece is not found. + /// Returns `Err(_)` if trying to get the piece caused an error. + async fn get_piece(&self, piece_index: PieceIndex) -> Result, BoxError>; +} + +#[async_trait] +impl ObjectPieceGetter for Arc +where + T: ObjectPieceGetter + Send + Sync + ?Sized, +{ + async fn get_piece(&self, piece_index: PieceIndex) -> Result, BoxError> { + self.as_ref().get_piece(piece_index).await + } +} + +// Convenience methods, mainly used in testing +#[async_trait] +impl ObjectPieceGetter for NewArchivedSegment { + async fn get_piece(&self, piece_index: PieceIndex) -> Result, BoxError> { + if piece_index.segment_index() == self.segment_header.segment_index() { + return Ok(Some( + self.pieces + .pieces() + .nth(piece_index.position() as usize) + .expect("Piece position always exists in a segment; qed"), + )); + } + + Ok(None) + } +} + +#[async_trait] +impl ObjectPieceGetter for (PieceIndex, Piece) { + async fn get_piece(&self, piece_index: PieceIndex) -> Result, BoxError> { + if self.0 == piece_index { + return Ok(Some(self.1.clone())); + } + + Ok(None) + } +} diff --git a/shared/subspace-data-retrieval/src/segment_fetcher.rs b/shared/subspace-data-retrieval/src/segment_fetcher.rs index 600a84738f..99b28aa2ec 100644 --- a/shared/subspace-data-retrieval/src/segment_fetcher.rs +++ b/shared/subspace-data-retrieval/src/segment_fetcher.rs @@ -14,5 +14,156 @@ // limitations under the License. //! Fetching segments of the archived history of Subspace Network. -//! -//! TODO: move segment fetching here + +use crate::piece_getter::ObjectPieceGetter; +use async_lock::Semaphore; +use futures::stream::FuturesUnordered; +use futures::StreamExt; +use subspace_archiving::archiver::Segment; +use subspace_archiving::reconstructor::{Reconstructor, ReconstructorError}; +use subspace_core_primitives::{ + ArchivedHistorySegment, Piece, RecordedHistorySegment, SegmentIndex, +}; +use subspace_erasure_coding::ErasureCoding; +use tokio::task::spawn_blocking; +use tracing::{debug, trace}; + +/// Segment getter errors. +#[derive(Debug, thiserror::Error)] +pub enum SegmentGetterError { + /// Piece getter error + #[error("Failed to get enough segment pieces")] + PieceGetter { segment_index: SegmentIndex }, + + /// Segment reconstruction error + #[error("Segment reconstruction error: {source:?}")] + SegmentReconstruction { + #[from] + source: ReconstructorError, + }, + + /// Segment decoding error + #[error("Segment data decoding error: {source:?}")] + SegmentDecoding { + #[from] + source: parity_scale_codec::Error, + }, +} + +/// Concurrently downloads the pieces for `segment_index`, and reconstructs the segment. +pub async fn download_segment( + segment_index: SegmentIndex, + piece_getter: &PG, + erasure_coding: ErasureCoding, +) -> Result +where + PG: ObjectPieceGetter, +{ + let reconstructor = Reconstructor::new(erasure_coding); + + let segment_pieces = download_segment_pieces(segment_index, piece_getter).await?; + + let segment = spawn_blocking(move || reconstructor.reconstruct_segment(&segment_pieces)) + .await + .expect("Panic if blocking task panicked")?; + + Ok(segment) +} + +/// Concurrently downloads the pieces for `segment_index`. +// This code was copied and modified from subspace_service::sync_from_dsn::download_and_reconstruct_blocks(): +// +// +// TODO: pass a lower concurrency limit into this function, to avoid overwhelming residential routers or slow connections +pub async fn download_segment_pieces( + segment_index: SegmentIndex, + piece_getter: &PG, +) -> Result>, SegmentGetterError> +where + PG: ObjectPieceGetter, +{ + debug!(%segment_index, "Retrieving pieces of the segment"); + + let semaphore = &Semaphore::new(RecordedHistorySegment::NUM_RAW_RECORDS); + + let mut received_segment_pieces = segment_index + .segment_piece_indexes_source_first() + .into_iter() + .map(|piece_index| { + // Source pieces will acquire permit here right away + let maybe_permit = semaphore.try_acquire(); + + async move { + let permit = match maybe_permit { + Some(permit) => permit, + None => { + // Other pieces will acquire permit here instead + semaphore.acquire().await + } + }; + let piece = match piece_getter.get_piece(piece_index).await { + Ok(Some(piece)) => piece, + Ok(None) => { + trace!(?piece_index, "Piece not found"); + return None; + } + Err(error) => { + trace!( + %error, + ?piece_index, + "Piece request failed", + ); + return None; + } + }; + + trace!(?piece_index, "Piece request succeeded"); + + // Piece was received successfully, "remove" this slot from semaphore + permit.forget(); + Some((piece_index, piece)) + } + }) + .collect::>(); + + let mut segment_pieces = vec![None::; ArchivedHistorySegment::NUM_PIECES]; + let mut pieces_received = 0; + + while let Some(maybe_piece) = received_segment_pieces.next().await { + let Some((piece_index, piece)) = maybe_piece else { + continue; + }; + + segment_pieces + .get_mut(piece_index.position() as usize) + .expect("Piece position is by definition within segment; qed") + .replace(piece); + + pieces_received += 1; + + if pieces_received >= RecordedHistorySegment::NUM_RAW_RECORDS { + trace!(%segment_index, "Received half of the segment."); + break; + } + } + + if pieces_received < RecordedHistorySegment::NUM_RAW_RECORDS { + debug!( + %segment_index, + pieces_received, + pieces_needed = RecordedHistorySegment::NUM_RAW_RECORDS, + "Failed to get half of the pieces in the segment" + ); + + Err(SegmentGetterError::PieceGetter { segment_index }) + } else { + trace!( + %segment_index, + pieces_received, + pieces_needed = RecordedHistorySegment::NUM_RAW_RECORDS, + "Successfully retrieved enough pieces of the segment" + ); + + Ok(segment_pieces) + } +}