Skip to content

Commit

Permalink
Merge pull request #1768 from subspace/providers-cleanup
Browse files Browse the repository at this point in the history
Providers cleanup
  • Loading branch information
nazar-pc authored Aug 7, 2023
2 parents f0aee46 + 172e137 commit 8c078b1
Show file tree
Hide file tree
Showing 19 changed files with 129 additions and 1,291 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,16 @@ use subspace_core_primitives::SegmentIndex;
use subspace_farmer::piece_cache::PieceCache;
use subspace_farmer::utils::archival_storage_info::ArchivalStorageInfo;
use subspace_farmer::utils::archival_storage_pieces::ArchivalStoragePieces;
use subspace_farmer::utils::farmer_provider_storage::FarmerProviderStorage;
use subspace_farmer::utils::readers_and_pieces::ReadersAndPieces;
use subspace_farmer::{NodeClient, NodeRpcClient};
use subspace_networking::libp2p::identity::Keypair;
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::libp2p::multiaddr::Protocol;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::{
create, peer_id, Config, NetworkingParametersManager, Node, NodeRunner,
ParityDbProviderStorage, PeerInfo, PeerInfoProvider, PieceByHashRequest,
PieceByHashRequestHandler, PieceByHashResponse, SegmentHeaderBySegmentIndexesRequestHandler,
SegmentHeaderRequest, SegmentHeaderResponse,
create, Config, NetworkingParametersManager, Node, NodeRunner, PeerInfo, PeerInfoProvider,
PieceByHashRequest, PieceByHashRequestHandler, PieceByHashResponse,
SegmentHeaderBySegmentIndexesRequestHandler, SegmentHeaderRequest, SegmentHeaderResponse,
};
use subspace_rpc_primitives::MAX_SEGMENT_HEADERS_PER_REQUEST;
use tracing::{debug, error, info, Instrument};
Expand All @@ -37,7 +35,6 @@ pub(super) fn configure_dsn(
DsnArgs {
listen_on,
bootstrap_nodes,
provided_keys_limit,
enable_private_ips,
reserved_peers,
in_connections,
Expand All @@ -52,32 +49,13 @@ pub(super) fn configure_dsn(
archival_storage_pieces: ArchivalStoragePieces,
archival_storage_info: ArchivalStorageInfo,
piece_cache: PieceCache,
) -> Result<(Node, NodeRunner<FarmerProviderStorage>), anyhow::Error> {
let peer_id = peer_id(&keypair);

) -> Result<(Node, NodeRunner<PieceCache>), anyhow::Error> {
let networking_parameters_registry = {
let known_addresses_db_path = base_path.join("known_addresses_db");

NetworkingParametersManager::new(&known_addresses_db_path).map(|manager| manager.boxed())?
};

let provider_db_path = base_path.join("providers_db");

info!(
db_path = ?provider_db_path,
keys_limit = ?provided_keys_limit,
"Initializing provider storage..."
);
let persistent_provider_storage =
ParityDbProviderStorage::new(&provider_db_path, provided_keys_limit, peer_id)
.map_err(|err| anyhow::anyhow!(err.to_string()))?;
info!(
current_size = ?persistent_provider_storage.size(),
"Provider storage initialized successfully"
);

let farmer_provider_storage = FarmerProviderStorage::new(peer_id, piece_cache.clone());

// TODO: Consider introducing and using global in-memory segment header cache (this comment is
// in multiple files)
let last_archived_segment_index = Arc::new(AtomicU64::default());
Expand Down Expand Up @@ -115,7 +93,7 @@ pub(super) fn configure_dsn(
let default_config = Config::new(
protocol_prefix,
keypair,
farmer_provider_storage.clone(),
piece_cache.clone(),
Some(PeerInfoProvider::new_farmer(Box::new(
archival_storage_pieces,
))),
Expand Down
5 changes: 2 additions & 3 deletions crates/subspace-farmer/src/bin/subspace-farmer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,6 @@ struct DsnArgs {
/// multiple are supported.
#[arg(long, default_value = "/ip4/0.0.0.0/tcp/30533")]
listen_on: Vec<Multiaddr>,
/// Number of provided keys (by other peers) that will be stored.
#[arg(long, default_value = "655360")]
provided_keys_limit: NonZeroUsize,
/// Determines whether we allow keeping non-global (private, shared, loopback..) addresses in Kademlia DHT.
#[arg(long, default_value_t = false)]
enable_private_ips: bool,
Expand Down Expand Up @@ -301,6 +298,8 @@ async fn main() -> anyhow::Result<()> {
// TODO: Remove this in the future once `base_path` can be removed
// Wipe legacy caching directory that is no longer used
let _ = fs::remove_file(base_path.join("piece_cache_db"));
// TODO: Remove this in the future after enough upgrade time that this no longer exist
let _ = fs::remove_file(base_path.join("providers_db"));

let disk_farms = if command.farm.is_empty() {
if !base_path.exists() {
Expand Down
39 changes: 25 additions & 14 deletions crates/subspace-farmer/src/piece_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ use std::mem;
use std::sync::Arc;
use subspace_core_primitives::{Piece, PieceIndex, SegmentIndex};
use subspace_farmer_components::plotting::{PieceGetter, PieceGetterRetryPolicy};
use subspace_networking::libp2p::kad::RecordKey;
use subspace_networking::libp2p::kad::{ProviderRecord, RecordKey};
use subspace_networking::libp2p::PeerId;
use subspace_networking::utils::multihash::ToMultihash;
use subspace_networking::{KeyWrapper, UniqueRecordBinaryHeap};
use subspace_networking::{KeyWrapper, LocalRecordProvider, UniqueRecordBinaryHeap};
use tokio::sync::mpsc;
use tracing::{debug, error, info, trace, warn};

Expand Down Expand Up @@ -559,6 +559,7 @@ where
/// Piece cache that aggregates caches of multiple disks
#[derive(Debug, Clone)]
pub struct PieceCache {
peer_id: PeerId,
/// Individual disk caches where pieces are stored
caches: Arc<RwLock<Vec<DiskPieceCacheState>>>,
// We do not want to increase capacity unnecessarily on clone
Expand All @@ -578,6 +579,7 @@ impl PieceCache {
let (worker_sender, worker_receiver) = mpsc::channel(WORKER_CHANNEL_CAPACITY);

let instance = Self {
peer_id,
caches: Arc::clone(&caches),
worker_sender,
};
Expand All @@ -591,18 +593,6 @@ impl PieceCache {
(instance, worker)
}

/// Check whether piece exists in cache
pub fn contains_piece(&self, key: RecordKey) -> bool {
// It is okay to take read lock here, writes locks are very infrequent and very short
for cache in self.caches.read().iter() {
if cache.stored_pieces.contains_key(&key) {
return true;
};
}

false
}

/// Get piece from cache
pub async fn get_piece(&self, key: RecordKey) -> Option<Piece> {
let caches = Arc::clone(&self.caches);
Expand Down Expand Up @@ -663,3 +653,24 @@ impl PieceCache {
}
}
}

impl LocalRecordProvider for PieceCache {
fn record(&self, key: &RecordKey) -> Option<ProviderRecord> {
// It is okay to take read lock here, writes locks are very infrequent and very short
for cache in self.caches.read().iter() {
if cache.stored_pieces.contains_key(key) {
// Note: We store our own provider records locally without local addresses
// to avoid redundant storage and outdated addresses. Instead these are
// acquired on demand when returning a `ProviderRecord` for the local node.
return Some(ProviderRecord {
key: key.clone(),
provider: self.peer_id,
expires: None,
addresses: Vec::new(),
});
};
}

None
}
}
1 change: 0 additions & 1 deletion crates/subspace-farmer/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
pub mod archival_storage_info;
pub mod archival_storage_pieces;
pub mod farmer_piece_getter;
pub mod farmer_provider_storage;
pub mod piece_validator;
pub mod readers_and_pieces;
#[cfg(test)]
Expand Down
62 changes: 0 additions & 62 deletions crates/subspace-farmer/src/utils/farmer_provider_storage.rs

This file was deleted.

1 change: 0 additions & 1 deletion crates/subspace-networking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ anyhow = "1.0.71"
async-trait = "0.1.68"
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
bytes = "1.4.0"
bytesize = "1.2.0"
chrono = {version = "0.4.26", features = ["clock", "serde", "std",]}
clap = { version = "4.2.1", features = ["color", "derive"] }
derive_more = "0.99.17"
Expand Down
81 changes: 0 additions & 81 deletions crates/subspace-networking/examples/custom-store.rs

This file was deleted.

1 change: 0 additions & 1 deletion crates/subspace-networking/src/behavior.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
pub(crate) mod persistent_parameters;
pub(crate) mod provider_storage;
#[cfg(test)]
mod tests;

Expand Down
34 changes: 0 additions & 34 deletions crates/subspace-networking/src/behavior/provider_storage.rs

This file was deleted.

Loading

0 comments on commit 8c078b1

Please sign in to comment.