From f1141e529319459e7c9257f72fa9cd24758c7391 Mon Sep 17 00:00:00 2001 From: upupnoah Date: Wed, 10 Jul 2024 08:14:18 +0700 Subject: [PATCH 1/4] feat(metrics): metrics v1 --- examples/metrics.rs | 61 ++++++++++++++++++++++++++++++++++++++++++ src/lib.rs | 2 ++ src/metrics.rs | 64 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 127 insertions(+) create mode 100644 examples/metrics.rs create mode 100644 src/metrics.rs diff --git a/examples/metrics.rs b/examples/metrics.rs new file mode 100644 index 0000000..4a851a1 --- /dev/null +++ b/examples/metrics.rs @@ -0,0 +1,61 @@ +use std::{thread, time::Duration}; + +use anyhow::Result; +use concurrency::Metrics; +use rand::Rng; + +const N: usize = 2; +const M: usize = 4; +fn main() -> Result<()> { + let metrics = Metrics::new(); + + // region: --- 单线程可用 code + + // for i in 0..100 { + // metrics.inc("req.page.1"); + // metrics.inc("req.page.2"); + // if i & 1 == 0 { + // metrics.inc("req.page.3"); + // } + // } + + // for _ in 0..27 { + // metrics.inc("call.thread.worker.1"); + // } + + // endregion: --- 单线程可用 code + + println!("{:?}", metrics.snapshot()?); + + for idx in 0..N { + task_worker(idx, metrics.clone())?; // Metrics {data: Arc::clone(&metrics.data)} + } + + for _ in 0..M { + request_worker(metrics.clone())?; + } + + loop { + thread::sleep(Duration::from_secs(2)); + println!("{:?}", metrics.snapshot()); + } +} + +fn task_worker(idx: usize, metrics: Metrics) -> Result<()> { + thread::spawn(move || loop{ + let mut rng = rand::thread_rng(); + thread::sleep(Duration::from_millis(rng.gen_range(100..5000))); + metrics.inc(format!("call.thread.worker.{}", idx)).unwrap(); + }); + Ok(()) +} + +fn request_worker(metrics: Metrics) -> Result<()> { + thread::spawn(move || loop { + let mut rng = rand::thread_rng(); + thread::sleep(Duration::from_millis(rng.gen_range(50..800))); + let page = rng.gen_range(1..=256); + metrics.inc(format!("req.page.{}", page)).unwrap(); + }); + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index 08dc35d..76ffa62 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,7 @@ mod matrix; +mod metrics; mod vector; pub use matrix::*; +pub use metrics::*; pub use vector::*; diff --git a/src/metrics.rs b/src/metrics.rs new file mode 100644 index 0000000..dd0e5eb --- /dev/null +++ b/src/metrics.rs @@ -0,0 +1,64 @@ +// metrics data structure +// basic functions: inc/dec/snapshot + +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; + +use anyhow::{anyhow, Result}; + +// metrics table +#[derive(Debug, Default, Clone)] +pub struct Metrics { + // data: HashMap, + data: Arc>>, +} + +// region: --- impls +impl Metrics { + pub fn new() -> Self { + // Self { + // data: HashMap::new(), + // } + Self::default() + } + // pub fn inc(&mut self, key: &str) { + // let counter = self.data.entry(key.to_string()).or_insert(0); + // *counter += 1; + // } + + // pub fn dec(&mut self, key: &str) { + // let counter = self.data.entry(key.to_string()).or_insert(0); + // *counter -= 1; + // } + + pub fn inc(&self, key: impl Into) -> Result<()> { + // let counter = self.data.lock.entry(key.into()).or_insert(0); + // *counter += 1; + // let mut data = self.data.lock().unwrap(); + let mut data = self.data.lock().map_err(|e| anyhow!(e.to_string()))?; + let counter = data.entry(key.into()).or_insert(0); + *counter += 1; + Ok(()) + } + + pub fn dec(&self, key: impl Into) -> Result<()> { + // let counter = self.data.entry(key.into()).or_insert(0); + // *counter -= 1; + let mut data = self.data.lock().unwrap(); + let counter = data.entry(key.into()).or_insert(0); + *counter -= 1; + Ok(()) + } + + pub fn snapshot(&self) -> Result> { + // self.data.clone() + Ok(self + .data + .lock() + .map_err(|e| anyhow!(e.to_string()))? + .clone()) + } +} +// endregion: --- impls From 58b4d387414b7fd2469ff70a3015eea79b0fc1c0 Mon Sep 17 00:00:00 2001 From: upupnoah Date: Wed, 10 Jul 2024 15:25:56 +0700 Subject: [PATCH 2/4] fix: reduce the use of unwrap --- examples/metrics.rs | 39 +++++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/examples/metrics.rs b/examples/metrics.rs index 4a851a1..d6c58b4 100644 --- a/examples/metrics.rs +++ b/examples/metrics.rs @@ -1,4 +1,7 @@ -use std::{thread, time::Duration}; +use std::{ + thread, + time::{Duration, Instant}, +}; use anyhow::Result; use concurrency::Metrics; @@ -8,6 +11,7 @@ const N: usize = 2; const M: usize = 4; fn main() -> Result<()> { let metrics = Metrics::new(); + let start_time = Instant::now(); // region: --- 单线程可用 code @@ -35,27 +39,38 @@ fn main() -> Result<()> { request_worker(metrics.clone())?; } - loop { + while start_time.elapsed() < Duration::from_secs(10) { thread::sleep(Duration::from_secs(2)); - println!("{:?}", metrics.snapshot()); + println!("{:?}", metrics.snapshot()?); } + + Ok(()) } fn task_worker(idx: usize, metrics: Metrics) -> Result<()> { - thread::spawn(move || loop{ - let mut rng = rand::thread_rng(); - thread::sleep(Duration::from_millis(rng.gen_range(100..5000))); - metrics.inc(format!("call.thread.worker.{}", idx)).unwrap(); + thread::spawn(move || { + loop { + let mut rng = rand::thread_rng(); + thread::sleep(Duration::from_millis(rng.gen_range(100..5000))); + // metrics.inc(format!("call.thread.worker.{}", idx)).unwrap(); + metrics.inc(format!("call.thread.worker.{}", idx))?; + } + #[allow(unreachable_code)] + Ok::<_, anyhow::Error>(()) }); Ok(()) } fn request_worker(metrics: Metrics) -> Result<()> { - thread::spawn(move || loop { - let mut rng = rand::thread_rng(); - thread::sleep(Duration::from_millis(rng.gen_range(50..800))); - let page = rng.gen_range(1..=256); - metrics.inc(format!("req.page.{}", page)).unwrap(); + thread::spawn(move || { + loop { + let mut rng = rand::thread_rng(); + thread::sleep(Duration::from_millis(rng.gen_range(50..800))); + let page = rng.gen_range(1..=256); + metrics.inc(format!("req.page.{}", page))?; + } + #[allow(unreachable_code)] + Ok::<_, anyhow::Error>(()) }); Ok(()) } From 0d124cfb780d2738a793192131524171b22fc785 Mon Sep 17 00:00:00 2001 From: upupnoah Date: Wed, 10 Jul 2024 23:41:37 +0700 Subject: [PATCH 3/4] feat: support DashMap --- Cargo.lock | 153 +++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + examples/metrics.rs | 65 +++++++++++------ examples/read_write.rs | 61 ++++++++++++++++ examples/rwlock.rs | 53 ++++++++++++++ src/metrics.rs | 129 ++++++++++++++++++++++++---------- 6 files changed, 404 insertions(+), 58 deletions(-) create mode 100644 examples/read_write.rs create mode 100644 examples/rwlock.rs diff --git a/Cargo.lock b/Cargo.lock index fb7c0c3..92e5967 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,18 @@ version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f538837af36e6f6a9be0faa67f9a314f8119e4e4b5867c6ab40ed60360142519" +[[package]] +name = "autocfg" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" + +[[package]] +name = "bitflags" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" + [[package]] name = "cfg-if" version = "1.0.0" @@ -19,10 +31,31 @@ name = "concurrency" version = "0.1.0" dependencies = [ "anyhow", + "dashmap", "oneshot", "rand", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + +[[package]] +name = "dashmap" +version = "6.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "getrandom" version = "0.2.15" @@ -34,18 +67,53 @@ dependencies = [ "wasi", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "libc" version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "once_cell" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" + [[package]] name = "oneshot" version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e296cf87e61c9cfc1a61c3c63a0f7f286ed4554e0e22be84e8a38e1d264a2a29" +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -82,8 +150,93 @@ dependencies = [ "getrandom", ] +[[package]] +name = "redox_syscall" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c82cf8cff14456045f55ec4241383baeff27af886adb72ffb2162f99911de0fd" +dependencies = [ + "bitflags", +] + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "smallvec" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" diff --git a/Cargo.toml b/Cargo.toml index e85c0b3..ec40a93 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,5 +8,6 @@ authors = ["Noah "] [dependencies] anyhow = "^1.0" +dashmap = "6.0.1" oneshot = "0.1.8" rand = "^0.8.5" diff --git a/examples/metrics.rs b/examples/metrics.rs index d6c58b4..ff58374 100644 --- a/examples/metrics.rs +++ b/examples/metrics.rs @@ -9,43 +9,66 @@ use rand::Rng; const N: usize = 2; const M: usize = 4; -fn main() -> Result<()> { - let metrics = Metrics::new(); - let start_time = Instant::now(); +// region: --- HashMap version +// fn main() -> Result<()> { +// let metrics = Metrics::new(); +// let start_time = Instant::now(); + +// // region: --- 单线程可用 code + +// // for i in 0..100 { +// // metrics.inc("req.page.1"); +// // metrics.inc("req.page.2"); +// // if i & 1 == 0 { +// // metrics.inc("req.page.3"); +// // } +// // } + +// // for _ in 0..27 { +// // metrics.inc("call.thread.worker.1"); +// // } + +// // endregion: --- 单线程可用 code - // region: --- 单线程可用 code +// println!("{:?}", metrics.snapshot()?); - // for i in 0..100 { - // metrics.inc("req.page.1"); - // metrics.inc("req.page.2"); - // if i & 1 == 0 { - // metrics.inc("req.page.3"); - // } - // } +// for idx in 0..N { +// task_worker(idx, metrics.clone())?; // Metrics {data: Arc::clone(&metrics.data)} +// } - // for _ in 0..27 { - // metrics.inc("call.thread.worker.1"); - // } +// for _ in 0..M { +// request_worker(metrics.clone())?; +// } - // endregion: --- 单线程可用 code +// while start_time.elapsed() < Duration::from_secs(10) { +// thread::sleep(Duration::from_secs(2)); +// // println!("{:?}", metrics.snapshot()?); // snapshot 使用 clone 的方式 +// println!("{}", metrics); // 拿到读锁之后, 直接打印 +// } - println!("{:?}", metrics.snapshot()?); +// Ok(()) +// } +// endregion: --- HashMap version + +// region: --- DashMap version +fn main() -> Result<()> { + let metrics = Metrics::new(); + let start_time = Instant::now(); + println!("{}", metrics); for idx in 0..N { - task_worker(idx, metrics.clone())?; // Metrics {data: Arc::clone(&metrics.data)} + task_worker(idx, metrics.clone())?; } - for _ in 0..M { request_worker(metrics.clone())?; } - while start_time.elapsed() < Duration::from_secs(10) { thread::sleep(Duration::from_secs(2)); - println!("{:?}", metrics.snapshot()?); + println!("{}", metrics); } - Ok(()) } +// endregion: --- DashMap version fn task_worker(idx: usize, metrics: Metrics) -> Result<()> { thread::spawn(move || { diff --git a/examples/read_write.rs b/examples/read_write.rs new file mode 100644 index 0000000..dbaf6e8 --- /dev/null +++ b/examples/read_write.rs @@ -0,0 +1,61 @@ +use anyhow::Result; +use std::sync::{Arc, RwLock}; +use std::thread; +use std::time::Duration; + +// 读写不分离的例子, 频繁读和写 + +fn main() -> Result<()> { + let stock_data = Arc::new(RwLock::new(StockData { + price: 100.0, + volume: 1000, + })); + + // 模拟实时写操作,更新股票价格 + let writer = { + // let stock_data = Arc::clone(&stock_data); + let stock_data = stock_data.clone(); + thread::spawn(move || { + for _ in 0..10 { + { + let mut data = stock_data.write().unwrap(); + data.price += 1.0; + data.volume += 100; + println!( + "Updated stock data: price={}, volume={}", + data.price, data.volume + ); + } + thread::sleep(Duration::from_millis(100)); + } + }) + }; + + // 模拟多个读取操作,获取最新的股票价格 + let readers: Vec<_> = (0..5) + .map(|i| { + // let stock_data = Arc::clone(&stock_data); + let stock_data = stock_data.clone(); + thread::spawn(move || { + for _ in 0..10 { + { + let data = stock_data.read().unwrap(); + println!("Reader {}: price={}, volume={}", i, data.price, data.volume); + } + thread::sleep(Duration::from_millis(50)); + } + }) + }) + .collect(); + + writer.join().unwrap(); + for reader in readers { + reader.join().unwrap(); + } + Ok(()) +} + +struct StockData { + price: f64, + volume: u64, +} diff --git a/examples/rwlock.rs b/examples/rwlock.rs new file mode 100644 index 0000000..ccf8aca --- /dev/null +++ b/examples/rwlock.rs @@ -0,0 +1,53 @@ +use anyhow::Result; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use std::thread; +use std::time::Duration; + +// 假设我们有一个网站访问计数器,它统计每个页面的访问次数。网站有大量的读取操作,因为每个访问者都会查看访问次数, +// 但只有少量的写入操作,因为只有在访问页面时才会增加访问次数。 + +fn main() -> Result<()> { + let page_views = Arc::new(RwLock::new(HashMap::new())); + + // 模拟写操作,增加页面访问次数 + let writer = { + // let page_views = Arc::clone(&page_views); + let page_views = page_views.clone(); + thread::spawn(move || { + for _ in 0..10 { + { + let mut views = page_views.write().unwrap(); + let counter = views.entry("home".to_string()).or_insert(0); + *counter += 1; + println!("Page 'home' visited {} times", *counter); + } + thread::sleep(Duration::from_millis(100)); + } + }) + }; + + // 模拟多个读取操作,查看页面访问次数 + let readers: Vec<_> = (0..5) + .map(|i| { + // let page_views = Arc::clone(&page_views); + let page_views = page_views.clone(); + thread::spawn(move || { + for _ in 0..10 { + { + let views = page_views.read().unwrap(); + let counter = views.get("home").unwrap_or(&0); + println!("Reader {}: Page 'home' visited {} times", i, counter); + } + thread::sleep(Duration::from_millis(50)); + } + }) + }) + .collect(); + + writer.join().unwrap(); + for reader in readers { + reader.join().unwrap(); + } + Ok(()) +} diff --git a/src/metrics.rs b/src/metrics.rs index dd0e5eb..0fe64b0 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,64 +1,119 @@ // metrics data structure // basic functions: inc/dec/snapshot -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; +use std::{fmt::Display, sync::Arc}; -use anyhow::{anyhow, Result}; +use anyhow::Result; +use dashmap::DashMap; -// metrics table -#[derive(Debug, Default, Clone)] +// region: --- HashMap Version +// // metrics table +// #[derive(Debug, Default, Clone)] +// pub struct Metrics { +// // data: HashMap, +// // data: Arc>>, +// data: Arc>>, +// } + +// // region: --- impls +// impl Metrics { +// pub fn new() -> Self { +// // Self { +// // data: HashMap::new(), +// // } +// Self::default() +// } +// // pub fn inc(&mut self, key: &str) { +// // let counter = self.data.entry(key.to_string()).or_insert(0); +// // *counter += 1; +// // } + +// // pub fn dec(&mut self, key: &str) { +// // let counter = self.data.entry(key.to_string()).or_insert(0); +// // *counter -= 1; +// // } + +// pub fn inc(&self, key: impl Into) -> Result<()> { +// // let counter = self.data.lock.entry(key.into()).or_insert(0); +// // *counter += 1; +// // let mut data = self.data.lock().unwrap(); +// // let mut data = self.data.lock().map_err(|e| anyhow!(e.to_string()))?; +// let mut data = self.data.write().map_err(|e| anyhow!(e.to_string()))?; +// let counter = data.entry(key.into()).or_insert(0); +// *counter += 1; +// Ok(()) +// } + +// pub fn dec(&self, key: impl Into) -> Result<()> { +// // let counter = self.data.entry(key.into()).or_insert(0); +// // *counter -= 1; +// // let mut data = self.data.lock().unwrap(); +// let mut data = self.data.write().map_err(|e| anyhow!(e.to_string()))?; +// let counter = data.entry(key.into()).or_insert(0); +// *counter -= 1; +// Ok(()) +// } + +// pub fn snapshot(&self) -> Result> { +// // self.data.clone() +// // Ok(self +// // .data +// // .lock() +// // .map_err(|e| anyhow!(e.to_string()))? +// // .clone()) +// Ok(self +// .data +// .read() +// .map_err(|e| anyhow!(e.to_string()))? +// .clone()) +// } +// } + +// impl Display for Metrics { +// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +// let data = self.data.read().map_err(|_e| fmt::Error {})?; +// for (key, value) in data.iter() { +// writeln!(f, "{}: {}", key, value)?; +// } +// Ok(()) +// } +// } +// // endregion: --- impls +// endregion: --- HashMap Version + +// region: --- DashMap Version +#[derive(Default, Clone)] pub struct Metrics { - // data: HashMap, - data: Arc>>, + // Arc>> => Arc> + data: Arc>, // 不需要加锁, 因为 DashMap 本身是线程安全的 } // region: --- impls impl Metrics { pub fn new() -> Self { - // Self { - // data: HashMap::new(), - // } Self::default() } - // pub fn inc(&mut self, key: &str) { - // let counter = self.data.entry(key.to_string()).or_insert(0); - // *counter += 1; - // } - - // pub fn dec(&mut self, key: &str) { - // let counter = self.data.entry(key.to_string()).or_insert(0); - // *counter -= 1; - // } pub fn inc(&self, key: impl Into) -> Result<()> { - // let counter = self.data.lock.entry(key.into()).or_insert(0); - // *counter += 1; - // let mut data = self.data.lock().unwrap(); - let mut data = self.data.lock().map_err(|e| anyhow!(e.to_string()))?; - let counter = data.entry(key.into()).or_insert(0); + let mut counter = self.data.entry(key.into()).or_insert(0); *counter += 1; Ok(()) } pub fn dec(&self, key: impl Into) -> Result<()> { - // let counter = self.data.entry(key.into()).or_insert(0); - // *counter -= 1; - let mut data = self.data.lock().unwrap(); - let counter = data.entry(key.into()).or_insert(0); + let mut counter = self.data.entry(key.into()).or_insert(0); *counter -= 1; Ok(()) } +} - pub fn snapshot(&self) -> Result> { - // self.data.clone() - Ok(self - .data - .lock() - .map_err(|e| anyhow!(e.to_string()))? - .clone()) +impl Display for Metrics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + for entry in self.data.iter() { + writeln!(f, "{}: {}", entry.key(), entry.value())?; + } + Ok(()) } } // endregion: --- impls +// endregion: --- DashMap Version From 240cdea922488d6b97b235545b5d19ee135b7b20 Mon Sep 17 00:00:00 2001 From: upupnoah Date: Thu, 11 Jul 2024 04:32:55 +0700 Subject: [PATCH 4/4] feat(amap): support atomic map for metrics --- README.md | 13 +++ examples/ametrics.rs | 71 ++++++++++++++++ examples/{metrics.rs => cmetrics.rs} | 8 +- src/metrics.rs | 122 +-------------------------- src/metrics/amap.rs | 66 +++++++++++++++ src/metrics/cmap.rs | 119 ++++++++++++++++++++++++++ 6 files changed, 277 insertions(+), 122 deletions(-) create mode 100644 examples/ametrics.rs rename examples/{metrics.rs => cmetrics.rs} (92%) create mode 100644 src/metrics/amap.rs create mode 100644 src/metrics/cmap.rs diff --git a/README.md b/README.md index a894c5a..d832b09 100644 --- a/README.md +++ b/README.md @@ -1 +1,14 @@ # rust-concurrency + +## matrix + +- 矩阵乘法 +- 并发矩阵乘法 + +## cmap metrics(指标监测) + +| 并发 map + +## amap metrics(指标监测) + +| atomic map diff --git a/examples/ametrics.rs b/examples/ametrics.rs new file mode 100644 index 0000000..33c3c42 --- /dev/null +++ b/examples/ametrics.rs @@ -0,0 +1,71 @@ +use anyhow::Result; +use concurrency::AmapMetrics; +use rand::Rng; +use std::{ + thread, + time::{Duration, Instant}, +}; + +const N: usize = 2; +const M: usize = 4; + +fn main() -> Result<()> { + let metrics = AmapMetrics::new(&[ + "call.thread.worker.0", + "call.thread.worker.1", + "req.page.1", + "req.page.2", + "req.page.3", + "req.page.4", + ]); + let start_time = Instant::now(); + + // start N workers and M requesters + + for idx in 0..N { + task_worker(idx, metrics.clone())?; // Metrics {data: Arc::clone(&metrics.data)} + } + + for _ in 0..M { + request_worker(metrics.clone())?; + } + + while start_time.elapsed() < Duration::from_secs(10) { + thread::sleep(Duration::from_secs(2)); + println!("{}", metrics); + } + + Ok(()) +} + +fn task_worker(idx: usize, metrics: AmapMetrics) -> Result<()> { + thread::spawn(move || { + loop { + // do long term stuff + let mut rng = rand::thread_rng(); + + thread::sleep(Duration::from_millis(rng.gen_range(100..5000))); + metrics.inc(format!("call.thread.worker.{}", idx))?; + } + #[allow(unreachable_code)] + Ok::<_, anyhow::Error>(()) + }); + Ok(()) +} + +fn request_worker(metrics: AmapMetrics) -> Result<()> { + thread::spawn(move || { + loop { + // process requests + let mut rng = rand::thread_rng(); + + thread::sleep(Duration::from_millis(rng.gen_range(50..800))); + let page = rng.gen_range(1..5); + metrics.inc(format!("req.page.{}", page))?; + } + #[allow(unreachable_code)] + Ok::<_, anyhow::Error>(()) + }); + + Ok(()) +} diff --git a/examples/metrics.rs b/examples/cmetrics.rs similarity index 92% rename from examples/metrics.rs rename to examples/cmetrics.rs index ff58374..595d1a7 100644 --- a/examples/metrics.rs +++ b/examples/cmetrics.rs @@ -4,7 +4,7 @@ use std::{ }; use anyhow::Result; -use concurrency::Metrics; +use concurrency::CmapMetrics; use rand::Rng; const N: usize = 2; @@ -52,7 +52,7 @@ const M: usize = 4; // region: --- DashMap version fn main() -> Result<()> { - let metrics = Metrics::new(); + let metrics = CmapMetrics::new(); let start_time = Instant::now(); println!("{}", metrics); @@ -70,7 +70,7 @@ fn main() -> Result<()> { } // endregion: --- DashMap version -fn task_worker(idx: usize, metrics: Metrics) -> Result<()> { +fn task_worker(idx: usize, metrics: CmapMetrics) -> Result<()> { thread::spawn(move || { loop { let mut rng = rand::thread_rng(); @@ -84,7 +84,7 @@ fn task_worker(idx: usize, metrics: Metrics) -> Result<()> { Ok(()) } -fn request_worker(metrics: Metrics) -> Result<()> { +fn request_worker(metrics: CmapMetrics) -> Result<()> { thread::spawn(move || { loop { let mut rng = rand::thread_rng(); diff --git a/src/metrics.rs b/src/metrics.rs index 0fe64b0..a563b17 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -1,119 +1,5 @@ -// metrics data structure -// basic functions: inc/dec/snapshot +mod cmap; +mod amap; -use std::{fmt::Display, sync::Arc}; - -use anyhow::Result; -use dashmap::DashMap; - -// region: --- HashMap Version -// // metrics table -// #[derive(Debug, Default, Clone)] -// pub struct Metrics { -// // data: HashMap, -// // data: Arc>>, -// data: Arc>>, -// } - -// // region: --- impls -// impl Metrics { -// pub fn new() -> Self { -// // Self { -// // data: HashMap::new(), -// // } -// Self::default() -// } -// // pub fn inc(&mut self, key: &str) { -// // let counter = self.data.entry(key.to_string()).or_insert(0); -// // *counter += 1; -// // } - -// // pub fn dec(&mut self, key: &str) { -// // let counter = self.data.entry(key.to_string()).or_insert(0); -// // *counter -= 1; -// // } - -// pub fn inc(&self, key: impl Into) -> Result<()> { -// // let counter = self.data.lock.entry(key.into()).or_insert(0); -// // *counter += 1; -// // let mut data = self.data.lock().unwrap(); -// // let mut data = self.data.lock().map_err(|e| anyhow!(e.to_string()))?; -// let mut data = self.data.write().map_err(|e| anyhow!(e.to_string()))?; -// let counter = data.entry(key.into()).or_insert(0); -// *counter += 1; -// Ok(()) -// } - -// pub fn dec(&self, key: impl Into) -> Result<()> { -// // let counter = self.data.entry(key.into()).or_insert(0); -// // *counter -= 1; -// // let mut data = self.data.lock().unwrap(); -// let mut data = self.data.write().map_err(|e| anyhow!(e.to_string()))?; -// let counter = data.entry(key.into()).or_insert(0); -// *counter -= 1; -// Ok(()) -// } - -// pub fn snapshot(&self) -> Result> { -// // self.data.clone() -// // Ok(self -// // .data -// // .lock() -// // .map_err(|e| anyhow!(e.to_string()))? -// // .clone()) -// Ok(self -// .data -// .read() -// .map_err(|e| anyhow!(e.to_string()))? -// .clone()) -// } -// } - -// impl Display for Metrics { -// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { -// let data = self.data.read().map_err(|_e| fmt::Error {})?; -// for (key, value) in data.iter() { -// writeln!(f, "{}: {}", key, value)?; -// } -// Ok(()) -// } -// } -// // endregion: --- impls -// endregion: --- HashMap Version - -// region: --- DashMap Version -#[derive(Default, Clone)] -pub struct Metrics { - // Arc>> => Arc> - data: Arc>, // 不需要加锁, 因为 DashMap 本身是线程安全的 -} - -// region: --- impls -impl Metrics { - pub fn new() -> Self { - Self::default() - } - - pub fn inc(&self, key: impl Into) -> Result<()> { - let mut counter = self.data.entry(key.into()).or_insert(0); - *counter += 1; - Ok(()) - } - - pub fn dec(&self, key: impl Into) -> Result<()> { - let mut counter = self.data.entry(key.into()).or_insert(0); - *counter -= 1; - Ok(()) - } -} - -impl Display for Metrics { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - for entry in self.data.iter() { - writeln!(f, "{}: {}", entry.key(), entry.value())?; - } - Ok(()) - } -} -// endregion: --- impls -// endregion: --- DashMap Version +pub use cmap::*; +pub use amap::*; \ No newline at end of file diff --git a/src/metrics/amap.rs b/src/metrics/amap.rs new file mode 100644 index 0000000..278e366 --- /dev/null +++ b/src/metrics/amap.rs @@ -0,0 +1,66 @@ +use anyhow::Result; +use std::{ + collections::HashMap, + fmt, + sync::{ + atomic::{AtomicI64, Ordering}, + Arc, + }, +}; + +// Rust 标准库提供了一些原子类型,可以在多线程环境下安全地共享和修改数据 +// 不需要使用锁,原子类型的操作是无锁的,因此性能更好 +#[derive(Debug, Clone)] +pub struct AmapMetrics { + data: Arc>, +} + +impl AmapMetrics { + pub fn new(metric_names: &[&'static str]) -> Self { + // 初始化 HashMap,每个 key 对应一个 AtomicI64 类型的值 + let map = metric_names + .iter() + .map(|&name| (name, AtomicI64::new(0))) + .collect(); + AmapMetrics { + data: Arc::new(map), + } + } + + pub fn inc(&self, key: impl AsRef) -> Result<()> { + let key = key.as_ref(); + let counter = self + .data + .get(key) + .ok_or_else(|| anyhow::anyhow!("key {} not found", key))?; + counter.fetch_add(1, Ordering::Relaxed); + Ok(()) + } + + pub fn dec(&self, key: impl AsRef) -> Result<()> { + let key = key.as_ref(); + let counter = self + .data + .get(key) + .ok_or_else(|| anyhow::anyhow!("key {} not found", key))?; + counter.fetch_sub(1, Ordering::Relaxed); + Ok(()) + } +} + +// impl Clone for AmapMetrics { +// fn clone(&self) -> Self { +// AmapMetrics { +// data: Arc::clone(&self.data), +// } +// } +// } + +impl fmt::Display for AmapMetrics { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + for (key, value) in self.data.iter() { + writeln!(f, "{}: {}", key, value.load(Ordering::Relaxed))?; + } + Ok(()) + } +} diff --git a/src/metrics/cmap.rs b/src/metrics/cmap.rs new file mode 100644 index 0000000..3d8bedc --- /dev/null +++ b/src/metrics/cmap.rs @@ -0,0 +1,119 @@ +// metrics data structure +// basic functions: inc/dec/snapshot + +use std::{fmt::Display, sync::Arc}; + +use anyhow::Result; +use dashmap::DashMap; + +// region: --- HashMap Version +// // metrics table +// #[derive(Debug, Default, Clone)] +// pub struct Metrics { +// // data: HashMap, +// // data: Arc>>, +// data: Arc>>, +// } + +// // region: --- impls +// impl Metrics { +// pub fn new() -> Self { +// // Self { +// // data: HashMap::new(), +// // } +// Self::default() +// } +// // pub fn inc(&mut self, key: &str) { +// // let counter = self.data.entry(key.to_string()).or_insert(0); +// // *counter += 1; +// // } + +// // pub fn dec(&mut self, key: &str) { +// // let counter = self.data.entry(key.to_string()).or_insert(0); +// // *counter -= 1; +// // } + +// pub fn inc(&self, key: impl Into) -> Result<()> { +// // let counter = self.data.lock.entry(key.into()).or_insert(0); +// // *counter += 1; +// // let mut data = self.data.lock().unwrap(); +// // let mut data = self.data.lock().map_err(|e| anyhow!(e.to_string()))?; +// let mut data = self.data.write().map_err(|e| anyhow!(e.to_string()))?; +// let counter = data.entry(key.into()).or_insert(0); +// *counter += 1; +// Ok(()) +// } + +// pub fn dec(&self, key: impl Into) -> Result<()> { +// // let counter = self.data.entry(key.into()).or_insert(0); +// // *counter -= 1; +// // let mut data = self.data.lock().unwrap(); +// let mut data = self.data.write().map_err(|e| anyhow!(e.to_string()))?; +// let counter = data.entry(key.into()).or_insert(0); +// *counter -= 1; +// Ok(()) +// } + +// pub fn snapshot(&self) -> Result> { +// // self.data.clone() +// // Ok(self +// // .data +// // .lock() +// // .map_err(|e| anyhow!(e.to_string()))? +// // .clone()) +// Ok(self +// .data +// .read() +// .map_err(|e| anyhow!(e.to_string()))? +// .clone()) +// } +// } + +// impl Display for Metrics { +// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +// let data = self.data.read().map_err(|_e| fmt::Error {})?; +// for (key, value) in data.iter() { +// writeln!(f, "{}: {}", key, value)?; +// } +// Ok(()) +// } +// } +// // endregion: --- impls +// endregion: --- HashMap Version + +// region: --- DashMap Version +#[derive(Default, Clone)] +pub struct CmapMetrics { + // Arc>> => Arc> + data: Arc>, // 不需要加锁, 因为 DashMap 本身是线程安全的 +} + +// region: --- impls +impl CmapMetrics { + pub fn new() -> Self { + Self::default() + } + + pub fn inc(&self, key: impl Into) -> Result<()> { + let mut counter = self.data.entry(key.into()).or_insert(0); + *counter += 1; + Ok(()) + } + + pub fn dec(&self, key: impl Into) -> Result<()> { + let mut counter = self.data.entry(key.into()).or_insert(0); + *counter -= 1; + Ok(()) + } +} + +impl Display for CmapMetrics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + for entry in self.data.iter() { + writeln!(f, "{}: {}", entry.key(), entry.value())?; + } + Ok(()) + } +} +// endregion: --- impls +// endregion: --- DashMap Version