Skip to content

Commit

Permalink
Merge pull request #120 from White-Oak/oom-based-on-available-memory
Browse files Browse the repository at this point in the history
feat(core/init): use available memory as a trigger for OOM shutdown
  • Loading branch information
loyd authored Feb 1, 2024
2 parents fa81799 + bd4a564 commit 7f8d746
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 32 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
<!-- next-header -->

## [Unreleased] - ReleaseDate
### Changed
- core: changed OOM prevention trigger (that gracefully shuts down the service) from old `used > total * ratio` to new `avail < total * (1 - ratio)`. ([#120])

[#120]: https://github.com/elfo-rs/elfo/pull/120

## [0.2.0-alpha.12] - 2023-12-21
### Changed
Expand Down
17 changes: 14 additions & 3 deletions elfo-core/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use crate::{
context::Context,
demux::Demux,
errors::{RequestError, StartError, StartGroupError},
memory_tracker::MemoryCheckResult,
message,
messages::{StartEntrypoint, Terminate, UpdateConfig},
object::Object,
Expand Down Expand Up @@ -241,9 +242,19 @@ async fn termination(mut ctx: Context, topology: Topology) {
#[cfg(target_os = "linux")]
if envelope.is::<CheckMemoryUsageTick>() {
match memory_tracker.as_ref().map(|mt| mt.check()) {
Some(Ok(true)) | None => {}
Some(Ok(false)) => {
error!("maximum memory usage is reached, forcibly terminating");
Some(Ok(MemoryCheckResult::Passed)) | None => {}
Some(Ok(MemoryCheckResult::Failed(stats))) => {
let percents_of_total =
|x| ((x as f64) / (stats.total as f64) * 100.).round() as u64;
let used = percents_of_total(stats.used);
let available = percents_of_total(stats.available);
error!(
total = stats.total,
used_pct = used,
available_pct = available,
"maximum memory usage is reached, forcibly terminating"
);

let _ = ctx.try_send_to(ctx.addr(), TerminateSystem);
oom_prevented = true;
}
Expand Down
189 changes: 162 additions & 27 deletions elfo-core/src/memory_tracker.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,51 @@
//! Contains `MemoryTracker` that tracks memory usage of the process.

use derive_more::IsVariant;
use metrics::gauge;

/// Checks memory usage to prevent OOM.
pub(crate) struct MemoryTracker {
threshold: f64,
/// A threshold that available memory should not reach (in fractions of
/// total memory).
available_threshold: f64,
}

/// Outcome of a single `MemoryTracker::check` call.
#[derive(Debug, Clone, Copy, IsVariant)]
pub(crate) enum MemoryCheckResult {
/// Available memory is not below the configured threshold.
Passed,
/// Available memory is below the configured threshold; carries the
/// memory stats observed by the check that failed.
Failed(MemoryStats),
}

impl MemoryTracker {
pub(crate) fn new(threshold: f64) -> Result<Self, String> {
let tracker = Self { threshold };
let tracker = Self {
available_threshold: 1. - threshold,
};
tracker.check()?;
Ok(tracker)
}

/// Returns `Ok(false)` if the threshold is reached.
pub(crate) fn check(&self) -> Result<bool, String> {
/// Returns `Ok(MemoryCheckResult::Failed(stats))` if available memory
/// amount is below a threshold. `stats` will include memory stats data
/// that triggered the fail.
pub(crate) fn check(&self) -> Result<MemoryCheckResult, String> {
let stats = get_stats()?;
let used_pct = stats.used as f64 / stats.total as f64;

gauge!("elfo_memory_usage_bytes", stats.used as f64);

Ok(used_pct < self.threshold)
let res = if (stats.available as f64) < stats.total as f64 * self.available_threshold {
MemoryCheckResult::Failed(stats)
} else {
MemoryCheckResult::Passed
};
Ok(res)
}
}

#[derive(Clone, Copy)]
struct MemoryStats {
total: usize,
used: usize,
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct MemoryStats {
pub(crate) total: usize,
pub(crate) used: usize,
pub(crate) available: usize,
}

#[cfg(test)]
Expand All @@ -41,21 +58,16 @@ mod proc_stats {

use super::MemoryStats;

const PROC_MEMINFO: &str = "/proc/meminfo";

pub(super) fn get() -> Result<MemoryStats, String> {
const PROC_SELF_STATM: &str = "/proc/self/statm";
const PROC_MEMINFO: &str = "/proc/meminfo";
const PAGE_SIZE: usize = 4096; // TODO: use `sysconf(_SC_PAGESIZE)`

// Get `total`.
let proc_meminfo = fs::read_to_string(PROC_MEMINFO)
.map_err(|err| format!("cannot read {PROC_MEMINFO}: {err}"))?;

let total = proc_meminfo
.split_ascii_whitespace()
.nth(1)
.and_then(|s| s.parse::<usize>().ok())
.ok_or_else(|| format!("cannot parse {PROC_MEMINFO}"))?
* 1024; // always in KiB
let (total, available) = get_values_from_meminfo(&proc_meminfo)?;

// Get `used`.
let proc_self_statm = fs::read_to_string(PROC_SELF_STATM)
Expand All @@ -68,15 +80,120 @@ mod proc_stats {
.ok_or_else(|| format!("cannot parse {PROC_SELF_STATM}"))?
* PAGE_SIZE;

Ok(MemoryStats { used, total })
Ok(MemoryStats {
total,
used,
available,
})
}

/// Extracts the `MemTotal` and `MemAvailable` values from the textual
/// contents of `/proc/meminfo`.
///
/// Every line is split at its first `:`; the part before it must match the
/// field name exactly and the part after it is handed to `parse_size`.
/// Lines without a colon and lines for other fields are skipped, so the
/// order of the fields in the input does not matter.
///
/// Returns `(total, available)` as produced by `parse_size`.
///
/// # Errors
///
/// Returns a message naming the offending field if either value cannot be
/// parsed, or naming the missing field if it is absent from the input.
fn get_values_from_meminfo(proc_meminfo: &str) -> Result<(usize, usize), String> {
    const MEM_TOTAL_NAME: &str = "MemTotal";
    const MEM_AVAILABLE_NAME: &str = "MemAvailable";

    let mut total = None;
    let mut available = None;

    for line in proc_meminfo.lines() {
        // Lines that are not of the `name: value` shape carry nothing we need.
        let Some((name, value)) = line.split_once(':') else {
            continue;
        };

        match name {
            MEM_TOTAL_NAME => {
                let bytes = parse_size(value)
                    .map_err(|err| format!("failed to parse {MEM_TOTAL_NAME}: {err}"))?;
                total = Some(bytes);
            }
            MEM_AVAILABLE_NAME => {
                let bytes = parse_size(value)
                    .map_err(|err| format!("failed to parse {MEM_AVAILABLE_NAME}: {err}"))?;
                available = Some(bytes);
            }
            _ => continue,
        }

        // Stop scanning as soon as both values are known.
        if total.is_some() && available.is_some() {
            break;
        }
    }

    // Report the field that is actually missing (these two messages used to
    // be swapped, which pointed at the wrong field).
    let total =
        total.ok_or_else(|| format!("failed to find {MEM_TOTAL_NAME} in {PROC_MEMINFO}"))?;
    let available = available
        .ok_or_else(|| format!("failed to find {MEM_AVAILABLE_NAME} in {PROC_MEMINFO}"))?;

    Ok((total, available))
}

fn parse_size(suffix: &str) -> Result<usize, String> {
suffix
.trim_start()
.strip_suffix(" kB")
.map(|n_str| n_str.parse::<usize>().map_err(|err| err.to_string()))
.unwrap_or_else(|| {
Err(format!(
"failed to parse amount in {suffix} from {PROC_MEMINFO}"
))
})
.map(|kb| 1024 * kb)
}

/// Runs `get_values_from_meminfo` over hand-written meminfo snippets: two
/// well-formed ones that differ only in field order, one without `MemTotal`
/// and one whose `MemAvailable` uses an unexpected unit.
#[test]
fn parsing_works() {
    // Both well-formed snippets carry the same numbers (given in kB).
    let expected: Result<(usize, usize), String> = Ok((32446772 * 1024, 27718224 * 1024));

    // The wanted fields come first.
    let fields_first = "MemTotal: 32446772 kB
MemFree: 27748552 kB
MemAvailable: 27718224 kB
Buffers: 22520 kB
Cached: 256296 kB
SwapCached: 23344 kB";
    assert_eq!(get_values_from_meminfo(fields_first), expected);

    // The wanted fields are scattered between unrelated ones.
    let fields_scattered = "MemFree: 27748552 kB
MemTotal: 32446772 kB
Buffers: 22520 kB
Cached: 256296 kB
MemAvailable: 27718224 kB
SwapCached: 23344 kB";
    assert_eq!(get_values_from_meminfo(fields_scattered), expected);

    // `MemTotal` is absent altogether.
    let without_total = "MemFree: 27748552 kB
Buffers: 22520 kB
Cached: 256296 kB
MemAvailable: 27718224 kB
SwapCached: 23344 kB";
    assert!(get_values_from_meminfo(without_total).is_err());

    // `MemAvailable` is present but not expressed in kB.
    let wrong_unit = "MemTotal: 32446772 kB
MemFree: 27748552 kB
MemAvailable: 27718224 GB
Buffers: 22520 kB
Cached: 256296 kB
SwapCached: 23344 kB";
    assert!(get_values_from_meminfo(wrong_unit).is_err());
}

/// Smoke test against the real system: fetches the stats through `get()`
/// (which reads `/proc/meminfo` and `/proc/self/statm`) and checks basic
/// sanity relations between the reported values.
#[test]
fn it_works() {
let stats = get().unwrap();
// Every reported value must be non-zero.
assert!(stats.total > 0);
assert!(stats.available > 0);
assert!(stats.used > 0);
// What this process uses, alone or together with what is still available,
// must not exceed the total.
assert!(stats.used < stats.total);
assert!(stats.available + stats.used <= stats.total);
}
}

Expand Down Expand Up @@ -105,20 +222,38 @@ fn it_works() {
mock_stats::set(Ok(MemoryStats {
total: 1000,
used: 100,
available: 900,
}));

let memory_tracker = MemoryTracker::new(0.5);
assert!(memory_tracker.as_ref().unwrap().check().unwrap());
assert!(memory_tracker
.as_ref()
.unwrap()
.check()
.unwrap()
.is_passed());

mock_stats::set(Ok(MemoryStats {
total: 1000,
used: 499,
used: 200,
available: 500,
}));
assert!(memory_tracker.as_ref().unwrap().check().unwrap());
assert!(memory_tracker
.as_ref()
.unwrap()
.check()
.unwrap()
.is_passed());

mock_stats::set(Ok(MemoryStats {
let expected_stats = MemoryStats {
total: 1000,
used: 500,
}));
assert!(!memory_tracker.as_ref().unwrap().check().unwrap());
used: 100,
available: 499,
};
mock_stats::set(Ok(expected_stats));
let res = memory_tracker.as_ref().unwrap().check().unwrap();
let MemoryCheckResult::Failed(stats) = res else {
panic!("memory check passed when should have failed");
};
assert_eq!(stats, expected_stats);
}
4 changes: 2 additions & 2 deletions elfo-core/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ impl fmt::Debug for AnyMessage {
}
}

// `Serialize` / `Deserialize` impls for `AnyMessage` are not used when sending it by itself,
// only when it's used in other messages.
// `Serialize` / `Deserialize` impls for `AnyMessage` are not used when sending
// it by itself, only when it's used in other messages.
impl Serialize for AnyMessage {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
Expand Down

0 comments on commit 7f8d746

Please sign in to comment.