diff --git a/Cargo.lock b/Cargo.lock index fb7c0c3..92e5967 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8,6 +8,18 @@ version = "1.0.82" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f538837af36e6f6a9be0faa67f9a314f8119e4e4b5867c6ab40ed60360142519" +[[package]] +name = "autocfg" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c4b4d0bd25bd0b74681c0ad21497610ce1b7c91b1022cd21c80c6fbdd9476b0" + +[[package]] +name = "bitflags" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" + [[package]] name = "cfg-if" version = "1.0.0" @@ -19,10 +31,31 @@ name = "concurrency" version = "0.1.0" dependencies = [ "anyhow", + "dashmap", "oneshot", "rand", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + +[[package]] +name = "dashmap" +version = "6.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "804c8821570c3f8b70230c2ba75ffa5c0f9a4189b9a432b6656c536712acae28" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "getrandom" version = "0.2.15" @@ -34,18 +67,53 @@ dependencies = [ "wasi", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "libc" version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "once_cell" +version = "1.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" + [[package]] name = "oneshot" version = "0.1.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e296cf87e61c9cfc1a61c3c63a0f7f286ed4554e0e22be84e8a38e1d264a2a29" +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -82,8 +150,93 @@ dependencies = [ "getrandom", ] +[[package]] +name = "redox_syscall" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c82cf8cff14456045f55ec4241383baeff27af886adb72ffb2162f99911de0fd" +dependencies = [ + "bitflags", +] + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "smallvec" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" diff --git a/Cargo.toml b/Cargo.toml index e85c0b3..ec40a93 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,5 +8,6 @@ authors = ["Noah "] [dependencies] anyhow = "^1.0" +dashmap = "6.0.1" oneshot = "0.1.8" rand = "^0.8.5" diff --git a/README.md b/README.md index a894c5a..d832b09 100644 --- a/README.md +++ b/README.md @@ -1 +1,14 @@ # rust-concurrency + +## matrix + +- 矩阵乘法 +- 并发矩阵乘法 + +## cmap metrics(指标监测) + +| 并发 map + +## amap metrics(指标监测) + +| atomic map diff --git a/examples/ametrics.rs b/examples/ametrics.rs new file mode 100644 index 0000000..33c3c42 --- /dev/null +++ b/examples/ametrics.rs @@ -0,0 +1,71 @@ +use anyhow::Result; +use concurrency::AmapMetrics; +use rand::Rng; +use std::{ + thread, + time::{Duration, Instant}, +}; + +const N: usize = 2; +const M: usize = 4; + +fn main() -> Result<()> { + let metrics = AmapMetrics::new(&[ + "call.thread.worker.0", + "call.thread.worker.1", + "req.page.1", + "req.page.2", + "req.page.3", + "req.page.4", + ]); + let start_time = Instant::now(); + + // start N workers and M requesters + + for idx in 0..N { + task_worker(idx, metrics.clone())?; // Metrics {data: Arc::clone(&metrics.data)} + } + + for _ in 0..M { + request_worker(metrics.clone())?; + } + + while start_time.elapsed() < Duration::from_secs(10) { + thread::sleep(Duration::from_secs(2)); + println!("{}", metrics); + } + + Ok(()) +} + +fn task_worker(idx: usize, metrics: AmapMetrics) -> Result<()> { + thread::spawn(move || { + loop { + // do long term stuff + let mut rng = rand::thread_rng(); + + thread::sleep(Duration::from_millis(rng.gen_range(100..5000))); + metrics.inc(format!("call.thread.worker.{}", idx))?; + } + #[allow(unreachable_code)] + Ok::<_, anyhow::Error>(()) + }); + Ok(()) +} + +fn request_worker(metrics: AmapMetrics) -> Result<()> { + thread::spawn(move || { + loop { + // process requests + let mut rng = rand::thread_rng(); + + thread::sleep(Duration::from_millis(rng.gen_range(50..800))); + let page = rng.gen_range(1..5); + metrics.inc(format!("req.page.{}", page))?; + } + #[allow(unreachable_code)] + Ok::<_, anyhow::Error>(()) + }); + + Ok(()) +} diff --git a/examples/cmetrics.rs b/examples/cmetrics.rs new file mode 100644 index 0000000..595d1a7 --- /dev/null +++ b/examples/cmetrics.rs @@ -0,0 +1,99 @@ +use std::{ + thread, + time::{Duration, Instant}, +}; + +use anyhow::Result; +use concurrency::CmapMetrics; +use rand::Rng; + +const N: usize = 2; +const M: usize = 4; +// region: --- HashMap version +// fn main() -> Result<()> { +// let metrics = Metrics::new(); +// let start_time = Instant::now(); + +// // region: --- 单线程可用 code + +// // for i in 0..100 { +// // metrics.inc("req.page.1"); +// // metrics.inc("req.page.2"); +// // if i & 1 == 0 { +// // metrics.inc("req.page.3"); +// // } +// // } + +// // for _ in 0..27 { +// // metrics.inc("call.thread.worker.1"); +// // } + +// // endregion: --- 单线程可用 code + +// println!("{:?}", metrics.snapshot()?); + +// for idx in 0..N { +// task_worker(idx, metrics.clone())?; // Metrics {data: Arc::clone(&metrics.data)} +// } + +// for _ in 0..M { +// request_worker(metrics.clone())?; +// } + +// while start_time.elapsed() < Duration::from_secs(10) { +// thread::sleep(Duration::from_secs(2)); +// // println!("{:?}", metrics.snapshot()?); // snapshot 使用 clone 的方式 +// println!("{}", metrics); // 拿到读锁之后, 直接打印 +// } + +// Ok(()) +// } +// endregion: --- HashMap version + +// region: --- DashMap version +fn main() -> Result<()> { + let metrics = CmapMetrics::new(); + let start_time = Instant::now(); + println!("{}", metrics); + + for idx in 0..N { + task_worker(idx, metrics.clone())?; + } + for _ in 0..M { + request_worker(metrics.clone())?; + } + while start_time.elapsed() < Duration::from_secs(10) { + thread::sleep(Duration::from_secs(2)); + println!("{}", metrics); + } + Ok(()) +} +// endregion: --- DashMap version + +fn task_worker(idx: usize, metrics: CmapMetrics) -> Result<()> { + thread::spawn(move || { + loop { + let mut rng = rand::thread_rng(); + thread::sleep(Duration::from_millis(rng.gen_range(100..5000))); + // metrics.inc(format!("call.thread.worker.{}", idx)).unwrap(); + metrics.inc(format!("call.thread.worker.{}", idx))?; + } + #[allow(unreachable_code)] + Ok::<_, anyhow::Error>(()) + }); + Ok(()) +} + +fn request_worker(metrics: CmapMetrics) -> Result<()> { + thread::spawn(move || { + loop { + let mut rng = rand::thread_rng(); + thread::sleep(Duration::from_millis(rng.gen_range(50..800))); + let page = rng.gen_range(1..=256); + metrics.inc(format!("req.page.{}", page))?; + } + #[allow(unreachable_code)] + Ok::<_, anyhow::Error>(()) + }); + Ok(()) +} diff --git a/examples/read_write.rs b/examples/read_write.rs new file mode 100644 index 0000000..dbaf6e8 --- /dev/null +++ b/examples/read_write.rs @@ -0,0 +1,61 @@ +use anyhow::Result; +use std::sync::{Arc, RwLock}; +use std::thread; +use std::time::Duration; + +// 读写不分离的例子, 频繁读和写 + +fn main() -> Result<()> { + let stock_data = Arc::new(RwLock::new(StockData { + price: 100.0, + volume: 1000, + })); + + // 模拟实时写操作,更新股票价格 + let writer = { + // let stock_data = Arc::clone(&stock_data); + let stock_data = stock_data.clone(); + thread::spawn(move || { + for _ in 0..10 { + { + let mut data = stock_data.write().unwrap(); + data.price += 1.0; + data.volume += 100; + println!( + "Updated stock data: price={}, volume={}", + data.price, data.volume + ); + } + thread::sleep(Duration::from_millis(100)); + } + }) + }; + + // 模拟多个读取操作,获取最新的股票价格 + let readers: Vec<_> = (0..5) + .map(|i| { + // let stock_data = Arc::clone(&stock_data); + let stock_data = stock_data.clone(); + thread::spawn(move || { + for _ in 0..10 { + { + let data = stock_data.read().unwrap(); + println!("Reader {}: price={}, volume={}", i, data.price, data.volume); + } + thread::sleep(Duration::from_millis(50)); + } + }) + }) + .collect(); + + writer.join().unwrap(); + for reader in readers { + reader.join().unwrap(); + } + Ok(()) +} + +struct StockData { + price: f64, + volume: u64, +} diff --git a/examples/rwlock.rs b/examples/rwlock.rs new file mode 100644 index 0000000..ccf8aca --- /dev/null +++ b/examples/rwlock.rs @@ -0,0 +1,53 @@ +use anyhow::Result; +use std::collections::HashMap; +use std::sync::{Arc, RwLock}; +use std::thread; +use std::time::Duration; + +// 假设我们有一个网站访问计数器,它统计每个页面的访问次数。网站有大量的读取操作,因为每个访问者都会查看访问次数, +// 但只有少量的写入操作,因为只有在访问页面时才会增加访问次数。 + +fn main() -> Result<()> { + let page_views = Arc::new(RwLock::new(HashMap::new())); + + // 模拟写操作,增加页面访问次数 + let writer = { + // let page_views = Arc::clone(&page_views); + let page_views = page_views.clone(); + thread::spawn(move || { + for _ in 0..10 { + { + let mut views = page_views.write().unwrap(); + let counter = views.entry("home".to_string()).or_insert(0); + *counter += 1; + println!("Page 'home' visited {} times", *counter); + } + thread::sleep(Duration::from_millis(100)); + } + }) + }; + + // 模拟多个读取操作,查看页面访问次数 + let readers: Vec<_> = (0..5) + .map(|i| { + // let page_views = Arc::clone(&page_views); + let page_views = page_views.clone(); + thread::spawn(move || { + for _ in 0..10 { + { + let views = page_views.read().unwrap(); + let counter = views.get("home").unwrap_or(&0); + println!("Reader {}: Page 'home' visited {} times", i, counter); + } + thread::sleep(Duration::from_millis(50)); + } + }) + }) + .collect(); + + writer.join().unwrap(); + for reader in readers { + reader.join().unwrap(); + } + Ok(()) +} diff --git a/src/lib.rs b/src/lib.rs index 08dc35d..76ffa62 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,7 @@ mod matrix; +mod metrics; mod vector; pub use matrix::*; +pub use metrics::*; pub use vector::*; diff --git a/src/metrics.rs b/src/metrics.rs new file mode 100644 index 0000000..a563b17 --- /dev/null +++ b/src/metrics.rs @@ -0,0 +1,5 @@ +mod cmap; +mod amap; + +pub use cmap::*; +pub use amap::*; \ No newline at end of file diff --git a/src/metrics/amap.rs b/src/metrics/amap.rs new file mode 100644 index 0000000..278e366 --- /dev/null +++ b/src/metrics/amap.rs @@ -0,0 +1,66 @@ +use anyhow::Result; +use std::{ + collections::HashMap, + fmt, + sync::{ + atomic::{AtomicI64, Ordering}, + Arc, + }, +}; + +// Rust 标准库提供了一些原子类型,可以在多线程环境下安全地共享和修改数据 +// 不需要使用锁,原子类型的操作是无锁的,因此性能更好 +#[derive(Debug, Clone)] +pub struct AmapMetrics { + data: Arc>, +} + +impl AmapMetrics { + pub fn new(metric_names: &[&'static str]) -> Self { + // 初始化 HashMap,每个 key 对应一个 AtomicI64 类型的值 + let map = metric_names + .iter() + .map(|&name| (name, AtomicI64::new(0))) + .collect(); + AmapMetrics { + data: Arc::new(map), + } + } + + pub fn inc(&self, key: impl AsRef) -> Result<()> { + let key = key.as_ref(); + let counter = self + .data + .get(key) + .ok_or_else(|| anyhow::anyhow!("key {} not found", key))?; + counter.fetch_add(1, Ordering::Relaxed); + Ok(()) + } + + pub fn dec(&self, key: impl AsRef) -> Result<()> { + let key = key.as_ref(); + let counter = self + .data + .get(key) + .ok_or_else(|| anyhow::anyhow!("key {} not found", key))?; + counter.fetch_sub(1, Ordering::Relaxed); + Ok(()) + } +} + +// impl Clone for AmapMetrics { +// fn clone(&self) -> Self { +// AmapMetrics { +// data: Arc::clone(&self.data), +// } +// } +// } + +impl fmt::Display for AmapMetrics { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + for (key, value) in self.data.iter() { + writeln!(f, "{}: {}", key, value.load(Ordering::Relaxed))?; + } + Ok(()) + } +} diff --git a/src/metrics/cmap.rs b/src/metrics/cmap.rs new file mode 100644 index 0000000..3d8bedc --- /dev/null +++ b/src/metrics/cmap.rs @@ -0,0 +1,119 @@ +// metrics data structure +// basic functions: inc/dec/snapshot + +use std::{fmt::Display, sync::Arc}; + +use anyhow::Result; +use dashmap::DashMap; + +// region: --- HashMap Version +// // metrics table +// #[derive(Debug, Default, Clone)] +// pub struct Metrics { +// // data: HashMap, +// // data: Arc>>, +// data: Arc>>, +// } + +// // region: --- impls +// impl Metrics { +// pub fn new() -> Self { +// // Self { +// // data: HashMap::new(), +// // } +// Self::default() +// } +// // pub fn inc(&mut self, key: &str) { +// // let counter = self.data.entry(key.to_string()).or_insert(0); +// // *counter += 1; +// // } + +// // pub fn dec(&mut self, key: &str) { +// // let counter = self.data.entry(key.to_string()).or_insert(0); +// // *counter -= 1; +// // } + +// pub fn inc(&self, key: impl Into) -> Result<()> { +// // let counter = self.data.lock.entry(key.into()).or_insert(0); +// // *counter += 1; +// // let mut data = self.data.lock().unwrap(); +// // let mut data = self.data.lock().map_err(|e| anyhow!(e.to_string()))?; +// let mut data = self.data.write().map_err(|e| anyhow!(e.to_string()))?; +// let counter = data.entry(key.into()).or_insert(0); +// *counter += 1; +// Ok(()) +// } + +// pub fn dec(&self, key: impl Into) -> Result<()> { +// // let counter = self.data.entry(key.into()).or_insert(0); +// // *counter -= 1; +// // let mut data = self.data.lock().unwrap(); +// let mut data = self.data.write().map_err(|e| anyhow!(e.to_string()))?; +// let counter = data.entry(key.into()).or_insert(0); +// *counter -= 1; +// Ok(()) +// } + +// pub fn snapshot(&self) -> Result> { +// // self.data.clone() +// // Ok(self +// // .data +// // .lock() +// // .map_err(|e| anyhow!(e.to_string()))? +// // .clone()) +// Ok(self +// .data +// .read() +// .map_err(|e| anyhow!(e.to_string()))? +// .clone()) +// } +// } + +// impl Display for Metrics { +// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +// let data = self.data.read().map_err(|_e| fmt::Error {})?; +// for (key, value) in data.iter() { +// writeln!(f, "{}: {}", key, value)?; +// } +// Ok(()) +// } +// } +// // endregion: --- impls +// endregion: --- HashMap Version + +// region: --- DashMap Version +#[derive(Default, Clone)] +pub struct CmapMetrics { + // Arc>> => Arc> + data: Arc>, // 不需要加锁, 因为 DashMap 本身是线程安全的 +} + +// region: --- impls +impl CmapMetrics { + pub fn new() -> Self { + Self::default() + } + + pub fn inc(&self, key: impl Into) -> Result<()> { + let mut counter = self.data.entry(key.into()).or_insert(0); + *counter += 1; + Ok(()) + } + + pub fn dec(&self, key: impl Into) -> Result<()> { + let mut counter = self.data.entry(key.into()).or_insert(0); + *counter -= 1; + Ok(()) + } +} + +impl Display for CmapMetrics { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + for entry in self.data.iter() { + writeln!(f, "{}: {}", entry.key(), entry.value())?; + } + Ok(()) + } +} +// endregion: --- impls +// endregion: --- DashMap Version