From ca0eba6e6dc4c4ce47edc706804290c5a6b28687 Mon Sep 17 00:00:00 2001 From: white-oak Date: Tue, 30 Jan 2024 19:43:17 +0300 Subject: [PATCH 1/2] feat(core): use available memory as a trigger for OOM shutdown `avail < total * (1 - ratio)` threshold is now used instead of old `used > total * ratio` to shutdown a service when tracking memory usage. Additionally, total memory in bytes, and used/available memory as percentages of total memory are now being logged on OOM. --- CHANGELOG.md | 2 + elfo-core/src/init.rs | 17 ++++- elfo-core/src/memory_tracker.rs | 127 ++++++++++++++++++++++++-------- 3 files changed, 114 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4378a325..663bd7aa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] - ReleaseDate +### Changed +- core: changed OOM prevention trigger (that gracefully shutdowns the service) from old `used > total * ratio` to new `avail < total * (1 - ratio)`. ([#120]) ## [0.2.0-alpha.12] - 2023-12-21 ### Changed diff --git a/elfo-core/src/init.rs b/elfo-core/src/init.rs index fe65fb67..b7957cd8 100644 --- a/elfo-core/src/init.rs +++ b/elfo-core/src/init.rs @@ -19,6 +19,7 @@ use crate::{ context::Context, demux::Demux, errors::{RequestError, StartError, StartGroupError}, + memory_tracker::MemoryCheckResult, message, messages::{StartEntrypoint, Terminate, UpdateConfig}, object::Object, @@ -241,9 +242,19 @@ async fn termination(mut ctx: Context, topology: Topology) { #[cfg(target_os = "linux")] if envelope.is::() { match memory_tracker.as_ref().map(|mt| mt.check()) { - Some(Ok(true)) | None => {} - Some(Ok(false)) => { - error!("maximum memory usage is reached, forcibly terminating"); + Some(Ok(MemoryCheckResult::Passed)) | None => {} + Some(Ok(MemoryCheckResult::Failed(stats))) => { + let percents_of_total = + |x| ((x as f64) / (stats.total as f64) * 100.).round() as u64; + let used = percents_of_total(stats.used); + let available = percents_of_total(stats.available); + error!( + total = stats.total, + used_pct = used, + available_pct = available, + "maximum memory usage is reached, forcibly terminating" + ); + let _ = ctx.try_send_to(ctx.addr(), TerminateSystem); oom_prevented = true; } diff --git a/elfo-core/src/memory_tracker.rs b/elfo-core/src/memory_tracker.rs index 765cfb3c..f1b21ca1 100644 --- a/elfo-core/src/memory_tracker.rs +++ b/elfo-core/src/memory_tracker.rs @@ -1,34 +1,51 @@ //! Contains `MemoryTracker` that tracks memory usage of the process. - +use derive_more::IsVariant; use metrics::gauge; /// Checks memory usage to prevent OOM. pub(crate) struct MemoryTracker { - threshold: f64, + /// A threshold that available memory should not reach (in fractions of + /// total memory). + available_threshold: f64, +} + +#[derive(Debug, Clone, Copy, IsVariant)] +pub(crate) enum MemoryCheckResult { + Passed, + Failed(MemoryStats), } impl MemoryTracker { pub(crate) fn new(threshold: f64) -> Result { - let tracker = Self { threshold }; + let tracker = Self { + available_threshold: 1. - threshold, + }; tracker.check()?; Ok(tracker) } - /// Returns `Ok(false)` if the threshold is reached. - pub(crate) fn check(&self) -> Result { + /// Returns `Ok(MemoryCheckResult::Failed(stats))` if available memory + /// amount is below a threshold. `stats` will include memory stats data + /// that triggered the fail. + pub(crate) fn check(&self) -> Result { let stats = get_stats()?; - let used_pct = stats.used as f64 / stats.total as f64; gauge!("elfo_memory_usage_bytes", stats.used as f64); - Ok(used_pct < self.threshold) + let res = if (stats.available as f64) < stats.total as f64 * self.available_threshold { + MemoryCheckResult::Failed(stats) + } else { + MemoryCheckResult::Passed + }; + Ok(res) } } -#[derive(Clone, Copy)] -struct MemoryStats { - total: usize, - used: usize, +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) struct MemoryStats { + pub(crate) total: usize, + pub(crate) used: usize, + pub(crate) available: usize, } #[cfg(test)] @@ -41,21 +58,24 @@ mod proc_stats { use super::MemoryStats; + const PROC_MEMINFO: &str = "/proc/meminfo"; + pub(super) fn get() -> Result { const PROC_SELF_STATM: &str = "/proc/self/statm"; - const PROC_MEMINFO: &str = "/proc/meminfo"; const PAGE_SIZE: usize = 4096; // TODO: use `sysconf(_SC_PAGESIZE)` - // Get `total`. + // Assumes `MemTotal` is always the 1st line in `/proc/meminfo`. + const MEM_TOTAL_LINE_NO: usize = 0; + // Assumes `MemAvailable` is always the 3rd line in `/proc/meminfo`. + const MEM_AVAILABLE_LINE_NO: usize = 2; + let proc_meminfo = fs::read_to_string(PROC_MEMINFO) .map_err(|err| format!("cannot read {PROC_MEMINFO}: {err}"))?; + let proc_meminfo = proc_meminfo.split_ascii_whitespace(); - let total = proc_meminfo - .split_ascii_whitespace() - .nth(1) - .and_then(|s| s.parse::().ok()) - .ok_or_else(|| format!("cannot parse {PROC_MEMINFO}"))? - * 1024; // always in KiB + let total = get_value_from_meminfo(proc_meminfo.clone(), "MemTotal", MEM_TOTAL_LINE_NO)?; + let available = + get_value_from_meminfo(proc_meminfo, "MemAvailable", MEM_AVAILABLE_LINE_NO)?; // Get `used`. let proc_self_statm = fs::read_to_string(PROC_SELF_STATM) @@ -68,15 +88,46 @@ mod proc_stats { .ok_or_else(|| format!("cannot parse {PROC_SELF_STATM}"))? * PAGE_SIZE; - Ok(MemoryStats { used, total }) + Ok(MemoryStats { + total, + used, + available, + }) + } + + fn get_value_from_meminfo<'a>( + mut proc_meminfo: impl Iterator, + item_name: &str, + line: usize, + ) -> Result { + const MEMINFO_ITEM_PER_LINE: usize = 3; + const MEMINFO_VALUE_INDEX: usize = 1; + let Some(value) = proc_meminfo.nth(line * MEMINFO_ITEM_PER_LINE + MEMINFO_VALUE_INDEX) + else { + return Err(format!( + "failed to find `{item_name}` at line {line}, in {PROC_MEMINFO}" + )); + }; + + let value = match value.parse::() { + Ok(x) => x * 1024, // always in KiB. + Err(err) => { + return Err(format!( + "failed to parse `{item_name}`: {err}, in {PROC_MEMINFO}" + )); + } + }; + Ok(value) } #[test] fn it_works() { let stats = get().unwrap(); assert!(stats.total > 0); + assert!(stats.available > 0); assert!(stats.used > 0); assert!(stats.used < stats.total); + assert!(stats.available + stats.used <= stats.total); } } @@ -87,7 +138,7 @@ mod mock_stats { use super::MemoryStats; thread_local! { - static STATS: Cell> = Cell::new(Err("not exists")); + static STATS: Cell> = const { Cell::new(Err("not exists")) }; } pub(super) fn get() -> Result { @@ -105,20 +156,38 @@ fn it_works() { mock_stats::set(Ok(MemoryStats { total: 1000, used: 100, + available: 900, })); let memory_tracker = MemoryTracker::new(0.5); - assert!(memory_tracker.as_ref().unwrap().check().unwrap()); + assert!(memory_tracker + .as_ref() + .unwrap() + .check() + .unwrap() + .is_passed()); mock_stats::set(Ok(MemoryStats { total: 1000, - used: 499, + used: 200, + available: 500, })); - assert!(memory_tracker.as_ref().unwrap().check().unwrap()); - - mock_stats::set(Ok(MemoryStats { + assert!(memory_tracker + .as_ref() + .unwrap() + .check() + .unwrap() + .is_passed()); + + let expected_stats = MemoryStats { total: 1000, - used: 500, - })); - assert!(!memory_tracker.as_ref().unwrap().check().unwrap()); + used: 100, + available: 499, + }; + mock_stats::set(Ok(expected_stats)); + let res = memory_tracker.as_ref().unwrap().check().unwrap(); + let MemoryCheckResult::Failed(stats) = res else { + panic!("memory check passed when should have failed"); + }; + assert_eq!(stats, expected_stats); } From bd4a564c961169820287cebb841e6c38b0f9a567 Mon Sep 17 00:00:00 2001 From: Suvorov Stanislav Date: Wed, 31 Jan 2024 23:36:48 +0300 Subject: [PATCH 2/2] feat(core): parse meminfo smarter --- CHANGELOG.md | 2 + elfo-core/src/memory_tracker.rs | 124 ++++++++++++++++++++++++-------- elfo-core/src/message.rs | 4 +- 3 files changed, 99 insertions(+), 31 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 663bd7aa..77ae04f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - core: changed OOM prevention trigger (that gracefully shutdowns the service) from old `used > total * ratio` to new `avail < total * (1 - ratio)`. ([#120]) +[#120] - https://github.com/elfo-rs/elfo/pull/120 + ## [0.2.0-alpha.12] - 2023-12-21 ### Changed - **BREAKING** restarting: the default restart policy is `RestartPolicy::never()` now ([#118]). diff --git a/elfo-core/src/memory_tracker.rs b/elfo-core/src/memory_tracker.rs index f1b21ca1..f456e8f9 100644 --- a/elfo-core/src/memory_tracker.rs +++ b/elfo-core/src/memory_tracker.rs @@ -64,18 +64,10 @@ mod proc_stats { const PROC_SELF_STATM: &str = "/proc/self/statm"; const PAGE_SIZE: usize = 4096; // TODO: use `sysconf(_SC_PAGESIZE)` - // Assumes `MemTotal` is always the 1st line in `/proc/meminfo`. - const MEM_TOTAL_LINE_NO: usize = 0; - // Assumes `MemAvailable` is always the 3rd line in `/proc/meminfo`. - const MEM_AVAILABLE_LINE_NO: usize = 2; - let proc_meminfo = fs::read_to_string(PROC_MEMINFO) .map_err(|err| format!("cannot read {PROC_MEMINFO}: {err}"))?; - let proc_meminfo = proc_meminfo.split_ascii_whitespace(); - let total = get_value_from_meminfo(proc_meminfo.clone(), "MemTotal", MEM_TOTAL_LINE_NO)?; - let available = - get_value_from_meminfo(proc_meminfo, "MemAvailable", MEM_AVAILABLE_LINE_NO)?; + let (total, available) = get_values_from_meminfo(&proc_meminfo)?; // Get `used`. let proc_self_statm = fs::read_to_string(PROC_SELF_STATM) @@ -95,29 +87,103 @@ mod proc_stats { }) } - fn get_value_from_meminfo<'a>( - mut proc_meminfo: impl Iterator, - item_name: &str, - line: usize, - ) -> Result { - const MEMINFO_ITEM_PER_LINE: usize = 3; - const MEMINFO_VALUE_INDEX: usize = 1; - let Some(value) = proc_meminfo.nth(line * MEMINFO_ITEM_PER_LINE + MEMINFO_VALUE_INDEX) - else { + fn get_values_from_meminfo(proc_meminfo: &str) -> Result<(usize, usize), String> { + const MEM_TOTAL_NAME: &str = "MemTotal"; + const MEM_AVAILABLE_NAME: &str = "MemAvailable"; + + let mut total = None; + let mut available = None; + + for line in proc_meminfo.split('\n') { + let Some((name, suffix)) = line.split_once(':') else { + continue; + }; + + match name { + MEM_TOTAL_NAME => { + total = Some( + parse_size(suffix) + .map_err(|err| format!("failed to parse {MEM_TOTAL_NAME}: {err}"))?, + ) + } + MEM_AVAILABLE_NAME => { + available = + Some(parse_size(suffix).map_err(|err| { + format!("failed to parse {MEM_AVAILABLE_NAME}: {err}") + })?) + } + _ => continue, + } + + if total.is_some() && available.is_some() { + break; + } + } + + let Some(total) = total else { return Err(format!( - "failed to find `{item_name}` at line {line}, in {PROC_MEMINFO}" + "failed to find {MEM_AVAILABLE_NAME} in {PROC_MEMINFO}" )); }; - - let value = match value.parse::() { - Ok(x) => x * 1024, // always in KiB. - Err(err) => { - return Err(format!( - "failed to parse `{item_name}`: {err}, in {PROC_MEMINFO}" - )); - } + let Some(available) = available else { + return Err(format!("failed to find {MEM_TOTAL_NAME} in {PROC_MEMINFO}")); }; - Ok(value) + + Ok((total, available)) + } + + fn parse_size(suffix: &str) -> Result { + suffix + .trim_start() + .strip_suffix(" kB") + .map(|n_str| n_str.parse::().map_err(|err| err.to_string())) + .unwrap_or_else(|| { + Err(format!( + "failed to parse amount in {suffix} from {PROC_MEMINFO}" + )) + }) + .map(|kb| 1024 * kb) + } + + #[test] + fn parsing_works() { + let linux_correct = "MemTotal: 32446772 kB +MemFree: 27748552 kB +MemAvailable: 27718224 kB +Buffers: 22520 kB +Cached: 256296 kB +SwapCached: 23344 kB"; + + let (total, available) = get_values_from_meminfo(linux_correct).unwrap(); + assert_eq!(total, 32446772 * 1024); + assert_eq!(available, 27718224 * 1024); + + let possible = "MemFree: 27748552 kB +MemTotal: 32446772 kB +Buffers: 22520 kB +Cached: 256296 kB +MemAvailable: 27718224 kB +SwapCached: 23344 kB"; + + let (total, available) = get_values_from_meminfo(possible).unwrap(); + assert_eq!(total, 32446772 * 1024); + assert_eq!(available, 27718224 * 1024); + + let no_total = "MemFree: 27748552 kB +Buffers: 22520 kB +Cached: 256296 kB +MemAvailable: 27718224 kB +SwapCached: 23344 kB"; + get_values_from_meminfo(no_total).unwrap_err(); + + let not_kb = "MemTotal: 32446772 kB +MemFree: 27748552 kB +MemAvailable: 27718224 GB +Buffers: 22520 kB +Cached: 256296 kB +SwapCached: 23344 kB"; + + get_values_from_meminfo(not_kb).unwrap_err(); } #[test] @@ -138,7 +204,7 @@ mod mock_stats { use super::MemoryStats; thread_local! { - static STATS: Cell> = const { Cell::new(Err("not exists")) }; + static STATS: Cell> = Cell::new(Err("not exists")); } pub(super) fn get() -> Result { diff --git a/elfo-core/src/message.rs b/elfo-core/src/message.rs index 8416ca20..bfcab8ad 100644 --- a/elfo-core/src/message.rs +++ b/elfo-core/src/message.rs @@ -145,8 +145,8 @@ impl fmt::Debug for AnyMessage { } } -// `Serialize` / `Deserialize` impls for `AnyMessage` are not used when sending it by itself, -// only when it's used in other messages. +// `Serialize` / `Deserialize` impls for `AnyMessage` are not used when sending +// it by itself, only when it's used in other messages. impl Serialize for AnyMessage { fn serialize(&self, serializer: S) -> Result where