Skip to content

Commit

Permalink
multi-shard trie test (#174)
Browse files Browse the repository at this point in the history
  • Loading branch information
suurkivi authored Dec 18, 2024
1 parent 35d5878 commit 0e2f2d0
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 26 deletions.
7 changes: 5 additions & 2 deletions src/perf/gen_multi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,16 @@ pub struct MultiUser {
initialized_fids: HashSet<u64>,
private_key: SigningKey,
thread_id: u32,
users_per_shard: u32,
}

impl MultiUser {
pub fn new(thread_id: u32) -> Self {
pub fn new(thread_id: u32, users_per_shard: u32) -> Self {
Self {
initialized_fids: HashSet::new(),
private_key: test_helper::default_signer(),
thread_id,
users_per_shard,
}
}
}
Expand All @@ -27,7 +29,8 @@ impl MessageGenerator for MultiUser {
fn next(&mut self, seq: u64) -> Vec<NextMessage> {
let mut rng = rand::thread_rng();

let fid: u64 = rng.gen_range(1..=5000) + 1_000_000 * self.thread_id as u64;
let fid: u64 =
rng.gen_range(1..=self.users_per_shard as u64) + 1_000_000 * self.thread_id as u64;
let mut messages = Vec::new();

// If the FID has not been initialized, return initial messages
Expand Down
12 changes: 10 additions & 2 deletions src/perf/generate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,17 @@ pub enum GeneratorTypes {
MultiUser,
}

pub fn new_generator(typ: GeneratorTypes, thread_id: u32) -> Box<dyn MessageGenerator> {
pub struct Config {
pub users_per_shard: u32,
}

pub fn new_generator(
typ: GeneratorTypes,
thread_id: u32,
cfg: Config,
) -> Box<dyn MessageGenerator> {
match typ {
GeneratorTypes::SingleUser => Box::new(SingleUser::new(thread_id)),
GeneratorTypes::MultiUser => Box::new(MultiUser::new(thread_id)),
GeneratorTypes::MultiUser => Box::new(MultiUser::new(thread_id, cfg.users_per_shard)),
}
}
8 changes: 7 additions & 1 deletion src/perf/perftest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,13 @@ fn start_submit_messages(
let gen_type = gen_type.clone();

submit_message_handles.push(tokio::spawn(async move {
let mut generator = new_generator(gen_type, thread_id);
let mut generator = new_generator(
gen_type,
thread_id,
generate::Config {
users_per_shard: 5000,
},
);

let mut submit_message_timer = time::interval(config.submit_message.interval);

Expand Down
109 changes: 88 additions & 21 deletions src/perf/trie_only_perftest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,40 +8,99 @@ use clap::Parser;
use std::collections::VecDeque;
use std::error::Error;
use std::net;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tempfile::TempDir;

#[derive(Parser, Debug)]
#[derive(Parser, Debug, Clone)]
struct Args {
#[arg(long)]
branching_factor: u32,

#[arg(long, default_value_t = 150)]
messages_per_block: u32,

#[arg(long, default_value_t = String::from("."))]
db_dir: String,

#[arg(long, default_value_t = 1)]
shard_count: u32,

#[arg(long, default_value_t = 1_000_000)]
users_per_shard: u32,
}

pub fn run() -> Result<(), Box<dyn Error>> {
let args = Args::parse();
let turbohash = 1;
let turbohash = 0;

let host = ("127.0.0.1", cadence::DEFAULT_PORT);
let socket = net::UdpSocket::bind("0.0.0.0:0").unwrap();
let sink = cadence::UdpMetricSink::from(host, socket)?;
let statsd_client = cadence::StatsdClient::builder("trietest", sink).build();
let statsd_client = statsd_wrapper::StatsdClientWrapper::new(statsd_client, true);
let statsd_client = Arc::new(statsd_wrapper::StatsdClientWrapper::new(
statsd_client,
true,
));

let dir = TempDir::new_in(&args.db_dir)?;
let base_path = dir.into_path();

let mut handles = Vec::new();

for shard_id in 0..args.shard_count {
let shard_statsd = statsd_client.clone();
let shard_args = args.clone();
let shard_turbohash = turbohash;
let shard_path = base_path.join(format!("shard_{}", shard_id));
std::fs::create_dir_all(&shard_path)?;

let handle = std::thread::spawn(move || {
if let Err(e) = run_shard(
shard_id,
&shard_args,
shard_statsd,
shard_turbohash,
&shard_path,
) {
eprintln!("Error in shard {}: {:?}", shard_id, e);
}
});
handles.push(handle);
}

let gen_type = GeneratorTypes::MultiUser;
let mut gen = new_generator(gen_type, 1);
// Join all shard threads
for handle in handles {
let _ = handle.join();
}

let dir = TempDir::new_in(".")?; // TODO: config, etc.
let db_path = dir.path().join("a.db");
let db_path = db_path.to_str().unwrap();
Ok(())
}

// TODO: branching factor metric
let mut t = merkle_trie::MerkleTrie::new(args.branching_factor)?;
let db = &db::RocksDB::new(db_path);
fn run_shard(
shard_id: u32,
args: &Args,
statsd_client: Arc<statsd_wrapper::StatsdClientWrapper>,
turbohash: u64,
shard_path: &std::path::Path,
) -> Result<(), Box<dyn Error>> {
let db_path = shard_path.join("db");
let db_path_str = db_path.to_str().unwrap();

let db = db::RocksDB::new(db_path_str);
db.open()?;
t.initialize(db)?;

let mut t = merkle_trie::MerkleTrie::new(args.branching_factor)?;
t.initialize(&db)?;

let gen_type = GeneratorTypes::MultiUser;
let mut gen = new_generator(
gen_type,
shard_id,
generate::Config {
users_per_shard: args.users_per_shard,
},
);

let mut seq = 0u64;
let mut message_queue: VecDeque<generate::NextMessage> = VecDeque::new();
Expand All @@ -51,26 +110,34 @@ pub fn run() -> Result<(), Box<dyn Error>> {

let sdc = statsd_client.clone();
let count_callback = move |read_count: (u64, u64)| {
sdc.count_with_shard(1, "engine.trie.db_get_count.total", read_count.0);
sdc.count_with_shard(1, "engine.trie.mem_get_count.total", read_count.1);
sdc.count_with_shard(shard_id, "engine.trie.db_get_count.total", read_count.0);
sdc.count_with_shard(shard_id, "engine.trie.mem_get_count.total", read_count.1);
};

let mut ctx = Context::with_callback(count_callback.clone());

loop {
if last_tick.elapsed() >= Duration::from_secs(1) {
let elapsed_secs = start_time.elapsed().as_secs();
last_tick = Instant::now();

let items = t.items()?;
println!("Seconds since start: {}, items={}", elapsed_secs, items);
statsd_client.gauge_with_shard(1, "engine.trie.num_items", items as u64);
println!(
"Shard {}: Seconds since start: {}, items={}",
shard_id, elapsed_secs, items
);
statsd_client.gauge_with_shard(shard_id, "engine.trie.num_items", items as u64);
statsd_client.gauge_with_shard(
1,
shard_id,
"engine.trie.branching_factor",
args.branching_factor as u64,
);
statsd_client.gauge_with_shard(1, "messages_per_block", args.branching_factor as u64);
statsd_client.gauge_with_shard(1, "turbohash", turbohash as u64);
statsd_client.gauge_with_shard(
shard_id,
"messages_per_block",
args.messages_per_block as u64,
);
statsd_client.gauge_with_shard(shard_id, "turbohash", turbohash);

ctx = Context::with_callback(count_callback.clone());
}
Expand All @@ -91,7 +158,7 @@ pub fn run() -> Result<(), Box<dyn Error>> {
let msg = msg.unwrap();
match msg {
generate::NextMessage::Message(message) => {
t.insert(&ctx, db, &mut txn_batch, vec![message.hash.clone()])?;
t.insert(&ctx, &db, &mut txn_batch, vec![message.hash.clone()])?;
}
generate::NextMessage::OnChainEvent(_) => {
// Ignore for now
Expand All @@ -100,6 +167,6 @@ pub fn run() -> Result<(), Box<dyn Error>> {
}

db.commit(txn_batch)?;
t.reload(db)?;
t.reload(&db)?;
}
}

0 comments on commit 0e2f2d0

Please sign in to comment.