Stack overflow on renoir's distributed environment #3
Hi, thank you for your interest in Renoir. Could you please share some code and instructions on how to replicate the issue you are facing?
The "dataset" is just 100 1920x1280 images, nothing special. This is the relevant code:
let config = RuntimeConfig::local(threads as u64 * 5).unwrap();
let env = StreamContext::new(config);
let result = env
.stream_par_iter(|this, total| {
all_images
.into_iter()
.skip(this as usize)
.step_by(total as usize)
})
.batch_mode(BatchMode::fixed(threads * 5))
.map(|mut image| {
filter::saturation(&mut image.0, 0.2).unwrap();
image
})
.map(|mut image| {
filter::emboss(&mut image.0).unwrap();
image
})
.map(|mut image| {
filter::gamma(&mut image.0, 2.0).unwrap();
image
})
.map(|mut image| {
filter::sharpen(&mut image.0).unwrap();
image
})
.map(|mut image| {
filter::grayscale(&mut image.0).unwrap();
image
})
.collect_vec();
env.execute_blocking(); // Start execution (blocking)
let _results = result.get().unwrap();
We just apply a bunch of filters from the raster crate. I am currently comparing several parallel programming libraries/frameworks in Rust against each other for my Master's. When running locally, Renoir works pretty well, except with the above code, where it stack overflows. When running non-locally, I've been having a lot more trouble: it takes forever to start, it's pretty slow even after it has started, and it stack overflows on pretty trivial code.
One of my applications is a simple fluid simulation using the Lattice Boltzmann method. Here's the entire code:
use super::common::*;
use renoir::prelude::*;
use serde::{Deserialize, Serialize};
#[repr(transparent)]
#[derive(Clone, Serialize, Deserialize)]
struct FArr(#[serde(with = "serde_arrays")] [[f32; NDIR]; YDIM]);
#[repr(transparent)]
#[derive(Clone, Serialize, Deserialize)]
struct UArr(#[serde(with = "serde_arrays")] [(f32, f32); YDIM]);
#[repr(transparent)]
#[derive(Clone, Serialize, Deserialize)]
struct RhoArr(#[serde(with = "serde_arrays")] [f32; YDIM]);
/// Note: because `renoir` demands that its types implement `Serialize`, we cannot use the same
/// trick we used in the other implementations (passing a transmuted mutable reference), since we
/// cannot send raw pointers through the pipeline. The only way around this that I know of would
/// be to transform the pointer into an integer and then transform it back later.
pub fn latbol(f: F, u: U, rho: Rho, steps: usize, threads: usize) -> F {
let config = RuntimeConfig::local(threads as u64 * 4).unwrap();
let mut f: Vec<FArr> = unsafe { std::mem::transmute(f) };
let mut u: Vec<UArr> = unsafe { std::mem::transmute(u) };
let mut rho: Vec<RhoArr> = unsafe { std::mem::transmute(rho) };
for _ in 0..steps {
let env = StreamContext::new(config.clone());
let result = env
.stream_par_iter(|this, total| {
f.into_iter()
.zip(u.into_iter())
.zip(rho.into_iter())
.enumerate()
.skip(this as usize)
.step_by(total as usize)
})
.map(|(i, ((f, u), rho))| clean_boundaries((i, f.0, u.0, rho.0)))
.map(update_macro)
.map(collide)
.map(|(i, f, u, rho)| (i, FArr(f), UArr(u), RhoArr(rho)))
.collect_vec();
env.execute_blocking();
f = Vec::new();
u = Vec::new();
rho = Vec::new();
let mut elems = result.get().unwrap();
elems.sort_unstable_by_key(|(i, _, _, _)| *i);
for elem in elems {
f.push(elem.1);
u.push(elem.2);
rho.push(elem.3);
}
stream(&mut f);
}
unsafe { std::mem::transmute(f) }
}
type StreamElem = (usize, [[f32; NDIR]; YDIM], [(f32, f32); YDIM], [f32; YDIM]);
fn clean_boundaries(elem: StreamElem) -> StreamElem {
let (i, mut f, mut u, mut rho) = elem;
for j in 0..YDIM {
if contains(i, j) {
f[j] = [0.0; NDIR];
u[j] = (0.0, 0.0);
rho[j] = 0.0;
}
}
(i, f, u, rho)
}
fn update_macro(elem: StreamElem) -> StreamElem {
let (i, f, mut u, mut rho) = elem;
for j in 0..YDIM {
// density is the sum of the particle distribution within the lattice
let rho_sum: f32 = f[j].iter().sum();
// velocity of the lattice is the weighted sum of each 9 lattice sites
// incorporating their respective velocities
let x: f32 = f[j]
.iter()
.enumerate()
.map(|(k, s)| SITE_VECS[k].0 * s)
.sum();
let y: f32 = f[j]
.iter()
.enumerate()
.map(|(k, s)| SITE_VECS[k].1 * s)
.sum();
rho[j] = rho_sum;
u[j] = if rho_sum == 0.0 {
(0.0, 0.0)
} else {
(x / rho_sum, y / rho_sum)
};
}
(i, f, u, rho)
}
fn collide(elem: StreamElem) -> StreamElem {
let (i, mut f, u, rho) = elem;
for j in 0..YDIM {
for k in 0..NDIR {
let c = SITE_VECS[k];
let ui = u[j];
let p1 = 3.0 * (c.0 * ui.0 + c.1 * ui.1);
// p2 = 3 / 2 * c.dot(u) ^ 2
let p2 = p1.powi(2) / (3.0 * 2.0);
let p3 = (9.0 / 2.0) * (ui.0 * ui.0 + ui.1 * ui.1);
// equilibrium state lattice point
let f_eq_ijk = rho[j] * WEIGHTS[k] * (1.0 + p1 + p2 - p3);
// update the lattice site using the LBM step
f[j][k] += -(f[j][k] - f_eq_ijk) / TAU;
}
}
(i, f, u, rho)
}
fn stream(f: &mut Vec<FArr>) {
let mut f_new = vec![FArr([[0.0; NDIR]; YDIM]); XDIM];
for k in 0..NDIR {
let (vx, vy) = SITE_VECS[k];
for i in 0..XDIM {
for j in 0..YDIM {
// ignore processing points inside of boundaries,
// where fluid is unable to move (or doesn't exist)
if contains(i, j) {
continue;
}
let i_new = i as f32 + vx;
let j_new = j as f32 + vy;
// in-bounds safety pass.
let i_new = i_new.rem_euclid(XDIM as _) as usize;
let j_new = j_new.rem_euclid(YDIM as _) as usize;
// use particle bounce-back when computing streaming operations
// that collide with a boundary.
// this involves reversing the vector within the current lattice (i,j),
// rather than propagating to the point within the boundary (i_new, j_new).
if contains(i_new, j_new) {
f_new[i].0[j][SITE_REV[k]] = f[i].0[j][k];
continue;
}
// normal streaming update
f_new[i_new].0[j_new][k] = f[i].0[j][k];
}
}
}
*f = f_new;
}
There is the issue that I have one idea about: the line
EDIT: if you want to try reproducing it, I can also share the initialization code that I have omitted above.
EDIT2: the above Lattice Boltzmann code also stack overflows when running locally.
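As an aside, one generic workaround for stack overflows caused by large per-thread stack frames (an assumption on my part, not advice given in this thread) is to run the heavy work on a thread with an enlarged stack. A minimal std-only sketch:

```rust
use std::thread;

fn main() {
    // Generic workaround sketch: give the worker thread a larger stack so
    // big array-valued frames fit. 64 MiB instead of the small default.
    let handle = thread::Builder::new()
        .stack_size(64 * 1024 * 1024)
        .spawn(|| {
            // A ~1 MiB stack array, comfortably within the enlarged stack.
            let big = [[0.0f32; 64]; 4096];
            big.len()
        })
        .expect("failed to spawn thread");
    assert_eq!(handle.join().unwrap(), 4096);
}
```

This only helps when the frames themselves are the problem; it does not fix unbounded recursion.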
Hi, I gave a quick look at the code, and I will first address the second one. My recommendation would be to use a dynamically allocated structure instead of stack arrays. If you don't feel like adding new dependencies, this could be done using std's Vec; however, if I were to implement it myself, I would probably use something like ndarray, which has an optimized layout for n-dimensional matrices and also has integrated support for serde.
Concerning the first one, I think having the initialization code would probably help with diagnosing the problem, as I cannot see an immediate issue. Still, concerning stack overflows, I find that most of the time they are related to deeply nested function calls with large stack frames, or to recursion problems. I checked the code of the operators you are using and I see no bad recursion candidates, so having the code may help me discover the cause.
Greetings!
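The heap-allocation suggestion can be sketched in plain std Rust. The dimensions below are hypothetical stand-ins for the simulation's NDIR/YDIM/XDIM constants, whose real values are not shown in the thread:

```rust
// Hypothetical dimensions; stand-ins for the NDIR/YDIM/XDIM constants
// used in the simulation (the real values are not shown in the thread).
const NDIR: usize = 9;
const YDIM: usize = 512;
const XDIM: usize = 512;

fn main() {
    // One row of the lattice as a stack array: NDIR * YDIM f32s.
    let row_bytes = std::mem::size_of::<[[f32; NDIR]; YDIM]>();
    assert_eq!(row_bytes, NDIR * YDIM * 4); // 18 KiB per row

    // A fully stack-allocated lattice would need XDIM of these in a single
    // frame (~9 MiB here), which can exceed default thread stack sizes.
    // Storing the rows in a Vec keeps only a small header on the stack
    // while the row data lives on the heap.
    let lattice: Vec<[[f32; NDIR]; YDIM]> = vec![[[0.0; NDIR]; YDIM]; XDIM];
    assert_eq!(lattice.len(), XDIM);
    assert!(std::mem::size_of::<Vec<[[f32; NDIR]; YDIM]>>() < row_bytes);
}
```

The same reasoning motivates the ndarray suggestion: its arrays are heap-backed, so only a thin handle ever crosses a stack frame.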
Thank you for taking the time to look into this! The full code for the first case looks like this:
use raster::filter;
use serde::{de::Visitor, ser::SerializeStruct, Deserialize, Serialize};
use std::time::SystemTime;
use renoir::prelude::*;
#[derive(Debug, Clone)]
#[repr(transparent)]
struct Image(raster::Image);
impl Serialize for Image {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let raster::Image {
width,
height,
bytes,
} = &self.0;
let mut state = serializer.serialize_struct("Image", 3)?;
state.serialize_field("width", width)?;
state.serialize_field("height", height)?;
state.serialize_field("bytes", bytes)?;
state.end()
}
}
impl<'de> Deserialize<'de> for Image {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct ImageVisitor;
impl<'de> Visitor<'de> for ImageVisitor {
type Value = Image;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("struct Image")
}
fn visit_seq<A>(self, mut seq: A) -> Result<Self::Value, A::Error>
where
A: serde::de::SeqAccess<'de>,
{
let width = seq.next_element()?.unwrap();
let height = seq.next_element()?.unwrap();
let bytes = seq.next_element()?.unwrap();
Ok(Image(raster::Image {
width,
height,
bytes,
}))
}
}
deserializer.deserialize_struct("Image", &["width", "height", "bytes"], ImageVisitor)
}
}
pub fn renoir(dir_name: &str, threads: usize) {
let dir_entries = std::fs::read_dir(dir_name);
let mut all_images: Vec<Image> = Vec::new();
for entry in dir_entries.unwrap() {
let entry = entry.unwrap();
let path = entry.path();
if path.extension().is_none() {
continue;
}
all_images.push(Image(raster::open(path.to_str().unwrap()).unwrap()));
}
let start = SystemTime::now();
let config = RuntimeConfig::local(threads as u64 * 5).unwrap();
let env = StreamContext::new(config);
let result = env
.stream_par_iter(|this, total| {
all_images
.into_iter()
.skip(this as usize)
.step_by(total as usize)
})
.batch_mode(BatchMode::fixed(threads * 5))
.map(|mut image| {
filter::saturation(&mut image.0, 0.2).unwrap();
image
})
.map(|mut image| {
filter::emboss(&mut image.0).unwrap();
image
})
.map(|mut image| {
filter::gamma(&mut image.0, 2.0).unwrap();
image
})
.map(|mut image| {
filter::sharpen(&mut image.0).unwrap();
image
})
.map(|mut image| {
filter::grayscale(&mut image.0).unwrap();
image
})
.collect_vec();
env.execute_blocking(); // Start execution (blocking)
let _results = result.get().unwrap();
let system_duration = start.elapsed().unwrap();
let in_sec = system_duration.as_secs_f64();
println!("{in_sec:.4}");
}
You need to add
I will try your suggestion of using
UPDATE: ah, my apologies. With the
I am currently in a bit of a hurry, running experiments on three machines at once, so I got a little confused. I apologize.
I haven't had time to look at it during the weekend, so as of now: is there any other problem you are encountering with the system?
Concerning the memory issues: by default Renoir uses a batching strategy, so multiple inputs are kept in memory. If you are dealing with large elements and have RAM constraints, you can try lowering the batch size using the
Let me know of any further issues you encounter, best regards.
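The batching trade-off the maintainer describes can be illustrated with a std-only sketch. `process_in_batches` here is a made-up helper, not part of renoir's API; it just shows that with batch size B, at most B elements are handed downstream at once:

```rust
// `process_in_batches` is a hypothetical helper illustrating the trade-off,
// not part of renoir's API.
fn process_in_batches<T, F: FnMut(&[T])>(items: Vec<T>, batch: usize, mut f: F) {
    // Hand the consumer at most `batch` elements at a time, so peak
    // memory pressure scales with the batch size, not the input size.
    for chunk in items.chunks(batch) {
        f(chunk);
    }
}

fn main() {
    let items: Vec<u32> = (0..10).collect();
    let mut max_in_flight = 0;
    process_in_batches(items, 3, |chunk| {
        max_in_flight = max_in_flight.max(chunk.len());
    });
    assert_eq!(max_in_flight, 3); // never more than the batch size at once
}
```

With elements as large as the images and lattice rows in this thread, shrinking the batch size bounds how many of them are resident per operator at any moment.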
I tried using
Hello, I am trying to use renoir in a distributed environment with 4 machines. I've configured it to fire 1 process per machine. However, after a while, all threads exit saying they have overflowed their stack.
This does not happen with an equivalent MPI code, nor with a different programming abstraction I am currently developing. My goal was to compare renoir to just using raw MPI or to my own abstraction. However, as things currently stand, I simply cannot execute renoir to make this comparison.
I am currently using renoir's latest published version on crates.io.
EDIT: I also get a stack overflow when executing locally, if I use a large enough dataset. Note that, once again, neither raw MPI nor my own abstraction gets a stack overflow, so it must be a problem with renoir.