
Commit

WIP
nazar-pc committed May 10, 2024
1 parent e127680 commit 31a98a9
Showing 17 changed files with 5,212 additions and 7 deletions.
31 changes: 28 additions & 3 deletions Cargo.lock

Generated file; diff not rendered.

2 changes: 1 addition & 1 deletion crates/subspace-farmer/Cargo.toml
@@ -14,7 +14,7 @@ include = [
[dependencies]
anyhow = "1.0.82"
async-lock = "3.3.0"
async-nats = "0.34.0"
async-nats = "0.35.0"
async-trait = "0.1.80"
backoff = { version = "0.4.0", features = ["futures", "tokio"] }
base58 = "0.2.0"
1 change: 1 addition & 0 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands.rs
@@ -1,4 +1,5 @@
pub(crate) mod benchmark;
pub(crate) mod cluster;
pub(crate) mod farm;
mod info;
mod scrub;
140 changes: 140 additions & 0 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster.rs
@@ -0,0 +1,140 @@
mod cache;
mod controller;
mod farmer;
mod plotter;

use crate::commands::cluster::cache::{cache, CacheArgs};
use crate::commands::cluster::controller::{controller, ControllerArgs};
use crate::commands::cluster::farmer::{farmer, FarmerArgs};
use crate::commands::cluster::plotter::{plotter, PlotterArgs};
use crate::utils::shutdown_signal;
use anyhow::anyhow;
use async_nats::ServerAddr;
use backoff::ExponentialBackoff;
use clap::{Parser, Subcommand};
use futures::stream::FuturesUnordered;
use futures::{select, FutureExt, StreamExt};
use prometheus_client::registry::Registry;
use std::net::SocketAddr;
use std::num::NonZeroUsize;
use subspace_farmer::cluster::nats_client::NatsClient;
use subspace_farmer::utils::AsyncJoinOnDrop;
use subspace_metrics::{start_prometheus_metrics_server, RegistryAdapter};
use subspace_proof_of_space::Table;

/// Arguments for cluster
#[derive(Debug, Parser)]
pub(crate) struct ClusterArgs {
    /// Shared arguments for all subcommands
    #[clap(flatten)]
    shared_args: SharedArgs,
    /// Cluster subcommands
    #[clap(subcommand)]
    subcommand: ClusterSubcommand,
}

/// Shared arguments
#[derive(Debug, Parser)]
struct SharedArgs {
    /// NATS server address, typically in `nats://server1:port1` format, can be specified multiple
    /// times.
    ///
    /// NOTE: NATS must be configured for message sizes of 2MiB or larger (1MiB is the default),
    /// which can be done by starting the NATS server with a config file containing
    /// `max_payload = 2MB`.
    #[arg(long, alias = "nats-server", required = true)]
    nats_servers: Vec<ServerAddr>,
    /// Size of the connection pool of NATS clients.
    ///
    /// Pool size can be increased in case of a large number of farms or high plotting capacity of
    /// this instance.
    #[arg(long, default_value = "16")]
    nats_pool_size: NonZeroUsize,
    /// Defines endpoints for the Prometheus metrics server. It doesn't start without at least
    /// one specified endpoint. Format: 127.0.0.1:8080
    #[arg(long, aliases = ["metrics-endpoint", "metrics-endpoints"])]
    prometheus_listen_on: Vec<SocketAddr>,
}

/// Cluster subcommands
#[derive(Debug, Subcommand)]
enum ClusterSubcommand {
    /// Farming cluster controller
    Controller(ControllerArgs),
    /// Farming cluster farmer
    Farmer(FarmerArgs),
    /// Farming cluster plotter
    Plotter(PlotterArgs),
    /// Farming cluster cache
    Cache(CacheArgs),
}

pub(crate) async fn cluster<PosTable>(cluster_args: ClusterArgs) -> anyhow::Result<()>
where
    PosTable: Table,
{
    let signal = shutdown_signal();

    let ClusterArgs {
        shared_args,
        subcommand,
    } = cluster_args;
    let SharedArgs {
        nats_servers,
        nats_pool_size,
        prometheus_listen_on,
    } = shared_args;

    let nats_client = NatsClient::new(
        nats_servers,
        ExponentialBackoff {
            max_elapsed_time: None,
            ..ExponentialBackoff::default()
        },
        nats_pool_size,
    )
    .await
    .map_err(|error| anyhow!("Failed to connect to NATS server: {error}"))?;
    let mut registry = Registry::default();

    let mut tasks = FuturesUnordered::new();

    // TODO: Support running multiple components at once
    tasks.push(match subcommand {
        ClusterSubcommand::Controller(controller_args) => {
            controller(nats_client, &mut registry, controller_args).await?
        }
        ClusterSubcommand::Farmer(farmer_args) => {
            farmer::<PosTable>(nats_client, &mut registry, farmer_args).await?
        }
        ClusterSubcommand::Plotter(plotter_args) => {
            plotter::<PosTable>(nats_client, &mut registry, plotter_args).await?
        }
        ClusterSubcommand::Cache(cache_args) => {
            cache(nats_client, &mut registry, cache_args).await?
        }
    });

    if !prometheus_listen_on.is_empty() {
        let prometheus_task = start_prometheus_metrics_server(
            prometheus_listen_on,
            RegistryAdapter::PrometheusClient(registry),
        )?;

        let join_handle = tokio::spawn(prometheus_task);
        tasks.push(Box::pin(async move {
            Ok(AsyncJoinOnDrop::new(join_handle, true).await??)
        }));
    }

    select! {
        // Signal future
        _ = signal.fuse() => {
            Ok(())
        },

        // Run future
        result = tasks.next() => {
            result.expect("List of tasks is not empty; qed")
        },
    }
}
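
Each subcommand handler awaited in the match above returns a boxed future rather than running the component to completion itself. Below is an illustrative sketch, not part of this commit, of the shape these constructors appear to share, inferred from the `cache()` function in the next file: setup runs eagerly in the async fn, and the returned future is the long-lived service that gets pushed into `tasks`. The name `example_component` is hypothetical.

use std::future::Future;
use std::pin::Pin;

use prometheus_client::registry::Registry;
use subspace_farmer::cluster::nats_client::NatsClient;

// Hypothetical component constructor; mirrors the signature of `cache()` below
async fn example_component(
    _nats_client: NatsClient,
    _registry: &mut Registry,
) -> anyhow::Result<Pin<Box<dyn Future<Output = anyhow::Result<()>>>>> {
    // Eager setup happens here: open disks, register metrics, validate arguments
    Ok(Box::pin(async move {
        // Long-running service; it is driven by `tasks.next()` in `cluster()` until
        // it exits or the shutdown signal wins the `select!`
        Ok(())
    }))
}
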
176 changes: 176 additions & 0 deletions crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/cache.rs
@@ -0,0 +1,176 @@
use anyhow::anyhow;
use bytesize::ByteSize;
use clap::Parser;
use prometheus_client::registry::Registry;
use std::fs;
use std::future::Future;
use std::path::PathBuf;
use std::pin::Pin;
use std::str::FromStr;
use std::time::Duration;
use subspace_farmer::cluster::cache::cache_service;
use subspace_farmer::cluster::nats_client::NatsClient;
use subspace_farmer::piece_cache::PieceCache;

/// Interval between cache self-identification broadcast messages
pub(super) const CACHE_IDENTIFICATION_BROADCAST_INTERVAL: Duration = Duration::from_secs(5);

#[derive(Debug, Clone)]
struct DiskCache {
    /// Path to directory where cache is stored
    directory: PathBuf,
    /// How much space in bytes the cache can use
    allocated_space: u64,
}

impl FromStr for DiskCache {
    type Err = String;

    #[inline]
    fn from_str(s: &str) -> anyhow::Result<Self, Self::Err> {
        let parts = s.split(',').collect::<Vec<_>>();
        if parts.len() != 2 {
            return Err("Must contain 2 comma-separated components".to_string());
        }

        let mut plot_directory = None;
        let mut allocated_space = None;

        for part in parts {
            let part = part.splitn(2, '=').collect::<Vec<_>>();
            if part.len() != 2 {
                return Err("Each component must contain = separating key from value".to_string());
            }

            let key = *part.first().expect("Length checked above; qed");
            let value = *part.get(1).expect("Length checked above; qed");

            match key {
                "path" => {
                    plot_directory.replace(PathBuf::from(value));
                }
                "size" => {
                    allocated_space.replace(
                        value
                            .parse::<ByteSize>()
                            .map_err(|error| {
                                format!("Failed to parse `size` \"{value}\": {error}")
                            })?
                            .as_u64(),
                    );
                }
                key => {
                    return Err(format!(
                        "Key \"{key}\" is not supported, only `path` or `size`"
                    ));
                }
            }
        }

        Ok(DiskCache {
            directory: plot_directory.ok_or(
                "`path` key is required with path to directory where cache will be stored",
            )?,
            allocated_space: allocated_space
                .ok_or("`size` key is required with allocated amount of disk space")?,
        })
    }
}

/// Arguments for cache
#[derive(Debug, Parser)]
pub(super) struct CacheArgs {
    /// One or more caches located at specified path, each with its own allocated space.
    ///
    /// Format for each cache is a comma-separated list of strings like this:
    ///
    ///   path=/path/to/directory,size=5T
    ///
    /// `size` is the maximum allocated size in human-readable format (e.g. 10GB, 2TiB) or just
    /// bytes that the cache will make sure to not exceed (and will pre-allocate all the space on
    /// startup to ensure it will not run out of space at runtime).
    disk_caches: Vec<DiskCache>,
    /// Run a temporary cache with specified farm size in human-readable format (e.g. 10GB, 2TiB)
    /// or just bytes (e.g. 4096); this will create a temporary directory that will be deleted at
    /// the end of the process.
    #[arg(long, conflicts_with = "disk_caches")]
    tmp: Option<ByteSize>,
    /// Cache group to use; the same cache group must also be specified on the corresponding
    /// controller
    #[arg(long, default_value = "default")]
    cache_group: String,
}

pub(super) async fn cache(
    nats_client: NatsClient,
    _registry: &mut Registry,
    cache_args: CacheArgs,
) -> anyhow::Result<Pin<Box<dyn Future<Output = anyhow::Result<()>>>>> {
    let CacheArgs {
        mut disk_caches,
        tmp,
        cache_group,
    } = cache_args;

    let tmp_directory = if let Some(plot_size) = tmp {
        let tmp_directory = tempfile::Builder::new()
            .prefix("subspace-cache-")
            .tempdir()?;

        disk_caches = vec![DiskCache {
            directory: tmp_directory.as_ref().to_path_buf(),
            allocated_space: plot_size.as_u64(),
        }];

        Some(tmp_directory)
    } else {
        if disk_caches.is_empty() {
            return Err(anyhow!("There must be at least one disk cache provided"));
        }

        for cache in &disk_caches {
            if !cache.directory.exists() {
                if let Err(error) = fs::create_dir(&cache.directory) {
                    return Err(anyhow!(
                        "Directory {} doesn't exist and can't be created: {}",
                        cache.directory.display(),
                        error
                    ));
                }
            }
        }

        None
    };

    // TODO: Metrics

    let caches = disk_caches
        .iter()
        .map(|disk_cache| {
            PieceCache::open(
                &disk_cache.directory,
                u32::try_from(disk_cache.allocated_space / PieceCache::element_size() as u64)
                    .unwrap_or(u32::MAX),
            )
            .map_err(|error| {
                anyhow!(
                    "Failed to open piece cache at {}: {error}",
                    disk_cache.directory.display()
                )
            })
        })
        .collect::<Result<Vec<_>, _>>()?;

    Ok(Box::pin(async move {
        cache_service(
            nats_client,
            &caches,
            &cache_group,
            CACHE_IDENTIFICATION_BROADCAST_INTERVAL,
        )
        .await?;

        drop(tmp_directory);

        Ok(())
    }))
}
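
The `DiskCache` argument format above (`path=...,size=...`) is easy to get wrong on the command line, so here is an illustrative test sketch, not part of this commit, of how the parser behaves. It assumes it lives in the same module as `DiskCache`, since the struct's fields are private.

#[cfg(test)]
mod tests {
    use super::DiskCache;
    use std::path::PathBuf;

    #[test]
    fn parses_path_and_size() {
        let cache = "path=/mnt/cache,size=100GB"
            .parse::<DiskCache>()
            .expect("valid cache specification");
        assert_eq!(cache.directory, PathBuf::from("/mnt/cache"));
        // `bytesize` treats `GB` as a decimal unit, so 100GB is 100 * 10^9 bytes
        assert_eq!(cache.allocated_space, 100_000_000_000);
    }

    #[test]
    fn rejects_incomplete_specification() {
        // Both `path` and `size` are required, and exactly two components are expected
        assert!("path=/mnt/cache".parse::<DiskCache>().is_err());
        assert!("path=/mnt/cache,size=oops".parse::<DiskCache>().is_err());
    }
}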
