Refactor communication code
Signed-off-by: Rafael Ravedutti <[email protected]>
rafaelravedutti committed Jul 14, 2020
1 parent 1a3c014 commit 8f752fc
Showing 2 changed files with 58 additions and 78 deletions.
116 changes: 38 additions & 78 deletions comm/comm.impala
@@ -40,11 +40,11 @@ fn mpi_initialize() -> () { mpi().init(); }
fn mpi_finalize() -> () { mpi().finalize(); }

// Types for condition and communication functions
-type CondFunc = fn(Vector3D, fn(Vector3D, PBCFlags) -> ()) -> ();
-type CommFunc = fn(i32, i32, i32, i32, i32, CondFunc, CondFunc) -> ();
+type CondFn = fn(Vector3D, fn(Vector3D, PBCFlags) -> ()) -> ();
+type CommFn = fn(i32, i32, i32, i32, i32, CondFn, CondFn) -> ();

// Communication pattern using walberla or 6-stencil neighbors
-fn communication_ranks(grid: Grid, body: CommFunc) -> () {
+fn communication_ranks(grid: Grid, body: CommFn) -> () {
let nbh = neighborhood;
let world_aabb = grid.world_aabb;
let spacing = grid.spacing;
@@ -132,23 +132,23 @@ fn communication_ranks(grid: Grid, body: CommFunc) -> () {
}
}
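For orientation: communication_ranks enumerates the neighbor ranks (from the waLBerla neighborhood or a plain 6-stencil) and invokes the CommFn callback once per send/receive pair. A minimal C sketch of the 6-stencil enumeration idea; the names and directions here are illustrative, not code from this commit:

#include <stdio.h>

typedef void (*comm_fn)(int dx, int dy, int dz);

/* Visit the six face neighbors (-x, +x, -y, +y, -z, +z) and hand each
 * direction to a callback, analogous to CommFn above. */
static void for_each_6stencil_neighbor(comm_fn body) {
    static const int dirs[6][3] = {
        {-1, 0, 0}, {1, 0, 0},
        {0, -1, 0}, {0, 1, 0},
        {0, 0, -1}, {0, 0, 1}
    };
    for (int i = 0; i < 6; ++i)
        body(dirs[i][0], dirs[i][1], dirs[i][2]);
}

static void print_dir(int dx, int dy, int dz) {
    printf("neighbor direction (%d, %d, %d)\n", dx, dy, dz);
}

int main(void) {
    for_each_6stencil_neighbor(print_dir);
    return 0;
}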

-fn communication_local(me: i32, grid: Grid, body: CommFunc) -> () {
+fn communication_local(me: i32, grid: Grid, body: CommFn) -> () {
communication_ranks(grid, |send_rank, recv_rank, pbc_x, pbc_y, pbc_z, border_positions, exchange_positions| {
if send_rank == me {
body(send_rank, recv_rank, pbc_x, pbc_y, pbc_z, border_positions, exchange_positions);
}
});
}

-fn communication_remote(me: i32, grid: Grid, body: CommFunc) -> () {
+fn communication_remote(me: i32, grid: Grid, body: CommFn) -> () {
communication_ranks(grid, |send_rank, recv_rank, pbc_x, pbc_y, pbc_z, border_positions, exchange_positions| {
if send_rank != me {
body(send_rank, recv_rank, pbc_x, pbc_y, pbc_z, border_positions, exchange_positions);
}
});
}

-fn communication_sep(me: i32, grid: Grid, body: CommFunc) -> () {
+fn communication_sep(me: i32, grid: Grid, body: CommFn) -> () {
@@communication_remote(me, grid, body);
@@communication_local(me, grid, body);
}
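communication_local and communication_remote split the same rank enumeration with a predicate on send_rank, and communication_sep chains them so remote traffic is issued before the purely local copies. A self-contained C sketch of that filtering, assuming a four-rank ring as a stand-in for communication_ranks (the globals exist only because C lacks closures):

#include <stdio.h>

typedef void (*pair_fn)(int send_rank, int recv_rank);

/* stand-in for communication_ranks(): every rank sends to its ring neighbor */
static void communication_ranks_stub(pair_fn body) {
    for (int r = 0; r < 4; ++r)
        body(r, (r + 3) % 4);
}

static int g_me;
static pair_fn g_body;

static void local_filter(int s, int r)  { if (s == g_me) g_body(s, r); }
static void remote_filter(int s, int r) { if (s != g_me) g_body(s, r); }

static void communication_local(int me, pair_fn body) {
    g_me = me; g_body = body;
    communication_ranks_stub(local_filter);
}

static void communication_remote(int me, pair_fn body) {
    g_me = me; g_body = body;
    communication_ranks_stub(remote_filter);
}

/* remote pairs first (so MPI traffic can proceed), then the local copies */
static void communication_sep(int me, pair_fn body) {
    communication_remote(me, body);
    communication_local(me, body);
}

static void show(int s, int r) { printf("send %d -> recv %d\n", s, r); }

int main(void) { communication_sep(2, show); return 0; }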
@@ -159,39 +159,50 @@ fn @pbc_corrected_position_if(@cond: bool, pos: Vector3D, grid: Grid) -> Vector3D
} else {
let world_aabb = grid.world_aabb;
let mut corrected_pos = pos;

if pos.x < world_aabb.xmin { corrected_pos.x += grid.xlength; }
if pos.x > world_aabb.xmax { corrected_pos.x -= grid.xlength; }
if pos.y < world_aabb.ymin { corrected_pos.y += grid.ylength; }
if pos.y > world_aabb.ymax { corrected_pos.y -= grid.ylength; }
if pos.z < world_aabb.zmin { corrected_pos.z += grid.zlength; }
if pos.z > world_aabb.zmax { corrected_pos.z -= grid.zlength; }

corrected_pos
}
}
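The correction above shifts an out-of-box coordinate by one box length per axis. A standalone C version of the same wrap, with illustrative names (the box lengths are derived from the AABB here, while the commit reads them from Grid):

#include <stdio.h>

typedef struct { double x, y, z; } Vec3;
typedef struct { double xmin, xmax, ymin, ymax, zmin, zmax; } AABB;

/* Wrap a position back into the periodic box, one axis at a time. */
static Vec3 pbc_corrected_position(Vec3 pos, AABB box) {
    double xlen = box.xmax - box.xmin;
    double ylen = box.ymax - box.ymin;
    double zlen = box.zmax - box.zmin;
    if (pos.x < box.xmin) pos.x += xlen;
    if (pos.x > box.xmax) pos.x -= xlen;
    if (pos.y < box.ymin) pos.y += ylen;
    if (pos.y > box.ymax) pos.y -= ylen;
    if (pos.z < box.zmin) pos.z += zlen;
    if (pos.z > box.zmax) pos.z -= zlen;
    return pos;
}

int main(void) {
    AABB box = {0.0, 10.0, 0.0, 10.0, 0.0, 10.0};
    Vec3 p = {-0.5, 3.0, 10.2};
    Vec3 q = pbc_corrected_position(p, box);
    printf("(%g, %g, %g)\n", q.x, q.y, q.z);  /* (9.5, 3, 0.2) */
    return 0;
}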

-// Get world size
-fn get_world_size() -> i32 {
-let mpih = mpi();
-let mut world_size: i32;
-mpih.comm_size(mpih.comms.world, &mut world_size);
-world_size
-}

-// Get process rank
-fn get_process_rank() -> i32 {
+// Initialize grid communication
+fn alloc_comm(grid: Grid) -> Comm {
let mpih = mpi();
-let mut rank: i32;
-mpih.comm_rank(mpih.comms.world, &mut rank);
-rank
-}
+let max_neighs = get_initial_maximum_neighbor_ranks();
+let max_faces_dim = math.max(math.max(grid.nx * grid.ny, grid.nx * grid.nz), grid.ny * grid.nz);
+let send_capacity = max_neighs * max_faces_dim * 20;
+let recv_capacity = max_neighs * max_faces_dim * 20;
+let null_buf = Buffer {
+device: 0,
+data: 0 as &[i8],
+size: 0 as i64
+};

-// MPI barrier
-fn barrier() -> () {
-let mpih = mpi();
-let mut request: MPI_Request;
-mpih.barrier(mpih.comms.world, &mut request);
+Comm {
+me: get_process_rank(),
+local_start: 0,
+send_buffer: allocate_array(send_capacity, 7, sizeof[real_t](), true),
+send_neighbors: alloc_cpu(max_neighs * sizeof[i32]()),
+send_rank_offsets: alloc_cpu(max_neighs * sizeof[i32]()),
+send_rank_lengths: alloc_cpu(max_neighs * sizeof[i32]()),
+send_offsets: allocate_array(send_capacity, 1, sizeof[i32](), true),
+send_pbc: allocate_array(send_capacity, 3, sizeof[i8](), true),
+copy_list: allocate_array(send_capacity, 1, sizeof[i32](), true),
+send_capacity: send_capacity,
+send_noffsets: 0,
+recv_buffer: allocate_array(recv_capacity, 7, sizeof[real_t](), true),
+recv_neighbors: alloc_cpu(max_neighs * sizeof[i32]()),
+recv_rank_offsets: alloc_cpu(max_neighs * sizeof[i32]()),
+recv_rank_lengths: alloc_cpu(max_neighs * sizeof[i32]()),
+recv_capacity: recv_capacity,
+recv_noffsets: 0,
+max_neighs: max_neighs,
+neighs: 0
+}
}
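On the sizing: the largest grid face (in cells) bounds how many cells can lie on one border, so both capacities are max_neighs * max_faces_dim * 20, where 20 presumably leaves slack for several particles per border cell. A small worked example in C, with assumed cell counts:

#include <stdio.h>

/* Worked example of the capacity heuristic above; the cell counts and
 * max_neighs value are assumptions, the factor 20 is the commit's. */
int main(void) {
    int nx = 32, ny = 32, nz = 16;
    int max_neighs = 6;  /* assumed 6-stencil */
    int f1 = nx * ny, f2 = nx * nz, f3 = ny * nz;
    int max_faces_dim = f1 > f2 ? (f1 > f3 ? f1 : f3) : (f2 > f3 ? f2 : f3);
    int send_capacity = max_neighs * max_faces_dim * 20;
    printf("max face = %d cells, send_capacity = %d slots\n",
           max_faces_dim, send_capacity);  /* 1024 cells, 122880 slots */
    return 0;
}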

// Release communication buffers
@@ -248,8 +259,6 @@ fn resize_recv_capacity(comm: &mut Comm, recv_capacity: i32) -> () {
comm.recv_capacity = recv_capacity;
}

-// Get configuration for nodes according to world size and number of
-// cells in each dimension
fn get_node_config(xlength: real_t, ylength: real_t, zlength: real_t, destx: &mut i32, desty: &mut i32, destz: &mut i32) -> () {
let mpih = mpi();
let areax = xlength * ylength;
@@ -330,46 +339,6 @@ fn get_node_bounding_box(aabb: AABB) -> AABB {
}
}
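get_node_config (only partially visible in this diff, but note the areax = xlength * ylength computation above) assigns ranks to a 3D node grid. The usual criterion, sketched below in C under that assumption, is to factor the world size into px * py * pz so that each subdomain's surface area, and hence its communication volume, is minimal:

#include <stdio.h>

/* Hypothetical surface-minimizing decomposition, not the commit's code:
 * try all factorizations of nprocs and keep the one whose subdomain
 * has the smallest surface area for a world of extent lx * ly * lz. */
static void node_config(int nprocs, double lx, double ly, double lz,
                        int *bx, int *by, int *bz) {
    double best = -1.0;
    for (int px = 1; px <= nprocs; ++px) {
        if (nprocs % px) continue;
        for (int py = 1; py <= nprocs / px; ++py) {
            if ((nprocs / px) % py) continue;
            int pz = nprocs / (px * py);
            double sx = lx / px, sy = ly / py, sz = lz / pz;
            double surface = sx * sy + sx * sz + sy * sz;
            if (best < 0.0 || surface < best) {
                best = surface;
                *bx = px; *by = py; *bz = pz;
            }
        }
    }
}

int main(void) {
    int px, py, pz;
    node_config(8, 100.0, 100.0, 100.0, &px, &py, &pz);
    printf("%d x %d x %d\n", px, py, pz);  /* 2 x 2 x 2 for a cube */
    return 0;
}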

-// Initialize grid communication
-fn alloc_comm(grid: Grid) -> Comm {
-let mpih = mpi();
-let max_neighs = get_initial_maximum_neighbor_ranks();
-let max_faces_dim = math.max(math.max(grid.nx * grid.ny, grid.nx * grid.nz), grid.ny * grid.nz);
-let send_capacity = max_neighs * max_faces_dim * 20;
-let recv_capacity = max_neighs * max_faces_dim * 20;
-let null_buf = Buffer {
-device: 0,
-data: 0 as &[i8],
-size: 0 as i64
-};
-
-let mut world_rank: i32;
-
-mpih.comm_rank(mpih.comms.world, &mut world_rank);
-
-Comm {
-me: world_rank,
-local_start: 0,
-send_buffer: allocate_array(send_capacity, 7, sizeof[real_t](), true),
-send_neighbors: alloc_cpu(max_neighs * sizeof[i32]()),
-send_rank_offsets: alloc_cpu(max_neighs * sizeof[i32]()),
-send_rank_lengths: alloc_cpu(max_neighs * sizeof[i32]()),
-send_offsets: allocate_array(send_capacity, 1, sizeof[i32](), true),
-send_pbc: allocate_array(send_capacity, 3, sizeof[i8](), true),
-copy_list: allocate_array(send_capacity, 1, sizeof[i32](), true),
-send_capacity: send_capacity,
-send_noffsets: 0,
-recv_buffer: allocate_array(recv_capacity, 7, sizeof[real_t](), true),
-recv_neighbors: alloc_cpu(max_neighs * sizeof[i32]()),
-recv_rank_offsets: alloc_cpu(max_neighs * sizeof[i32]()),
-recv_rank_lengths: alloc_cpu(max_neighs * sizeof[i32]()),
-recv_capacity: recv_capacity,
-recv_noffsets: 0,
-max_neighs: max_neighs,
-neighs: 0
-}
-}

// Synchronize ghost layer cells with neighbor ranks
fn synchronize_ghost_layer(grid: Grid, comm: Comm) -> () {
let mpih = mpi();
@@ -734,10 +703,7 @@ fn borders(grid: &mut Grid, comm: &mut Comm) -> () {
transfer_array_to_host(comm.send_buffer);
}

-// Requests buffers
let mut nreq = 0;
-
-// Exchange sizes
communication_remote(comm.me, *grid, |send_rank, recv_rank, _, _, _, _, _| {
mpih.irecv(&mut recv_rank_lengths(nreq) as MPI_MutBuf, 1, mpih.int_t, recv_rank, 0, mpih.comms.world, nreq);
mpih.isend(&send_rank_lengths(nreq) as MPI_Buf, 1, mpih.int_t, send_rank, 0, mpih.comms.world, nreq);
@@ -747,10 +713,8 @@
mpih.wait_all(nreq);

range(0, nreq, |r| {
-// Rank receive offset
recv_rank_offsets(r) = irecv;

-// Readjust receive capacity if it is not enough
if irecv + recv_rank_lengths(r) >= comm.recv_capacity {
resize_recv_capacity(comm, (irecv + recv_rank_lengths(r)) * 2);
}
@@ -760,10 +724,7 @@
});
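The resize above grows the receive buffer to twice the required size, so a sequence of growing border exchanges triggers only a logarithmic number of reallocations. A generic C sketch of that policy (illustrative; the commit's resize_recv_capacity manages its own Buffer type):

#include <stdlib.h>

typedef struct { double *data; int capacity; } RecvBuf;

/* Grow-on-demand with overshoot: resizing to needed * 2 keeps the
 * amortized cost of repeated exchanges constant. */
static void ensure_recv_capacity(RecvBuf *buf, int needed) {
    if (needed < buf->capacity) return;
    int new_cap = needed * 2;
    double *p = realloc(buf->data, (size_t)new_cap * sizeof *p);
    if (!p) abort();
    buf->data = p;
    buf->capacity = new_cap;
}

int main(void) {
    RecvBuf buf = { 0, 0 };            /* NULL data, zero capacity */
    ensure_recv_capacity(&buf, 100);   /* grows to capacity 200 */
    free(buf.data);
    return 0;
}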

nreq = 0;
-
-// Synchronize borders with other ranks
communication_remote(comm.me, *grid, |send_rank, recv_rank, _, _, _, _, _| {
-// Offsets to send and receive
let send_offset = send_rank_offsets(nreq) * 4;
let recv_offset = recv_rank_offsets(nreq) * 4;

@@ -775,7 +736,6 @@
&get_array_real_ref(array_host, comm.send_buffer)(send_offset) as MPI_Buf,
send_rank_lengths(nreq) * 4, mpih.double_t, send_rank, 0, mpih.comms.world, nreq);

-// Update ranks to send and receive during synchronization
bitcast[&mut[i32]](comm.send_neighbors.data)(nreq) = send_rank;
bitcast[&mut[i32]](comm.recv_neighbors.data)(nreq) = recv_rank;
nreq++;
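The hunks above implement a two-phase exchange: element counts are swapped first with irecv/isend plus wait_all, receive offsets (and, if needed, capacity) are derived from them, and only then is the packed particle data exchanged. A compact, self-contained C/MPI version of the same pattern over a ring; this is an illustration, not the commit's code:

#include <mpi.h>
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);
    int me, size;
    MPI_Comm_rank(MPI_COMM_WORLD, &me);
    MPI_Comm_size(MPI_COMM_WORLD, &size);

    int right = (me + 1) % size, left = (me + size - 1) % size;
    int send_len = 4 + me, recv_len = 0;
    MPI_Request req[2];

    /* phase 1: exchange lengths */
    MPI_Irecv(&recv_len, 1, MPI_INT, left, 0, MPI_COMM_WORLD, &req[0]);
    MPI_Isend(&send_len, 1, MPI_INT, right, 0, MPI_COMM_WORLD, &req[1]);
    MPI_Waitall(2, req, MPI_STATUSES_IGNORE);

    /* phase 2: size the receive buffer from phase 1, exchange payload */
    double *send_buf = malloc(send_len * sizeof *send_buf);
    double *recv_buf = malloc(recv_len * sizeof *recv_buf);
    for (int i = 0; i < send_len; ++i) send_buf[i] = me + 0.1 * i;
    MPI_Irecv(recv_buf, recv_len, MPI_DOUBLE, left, 0, MPI_COMM_WORLD, &req[0]);
    MPI_Isend(send_buf, send_len, MPI_DOUBLE, right, 0, MPI_COMM_WORLD, &req[1]);
    MPI_Waitall(2, req, MPI_STATUSES_IGNORE);

    printf("rank %d received %d doubles\n", me, recv_len);
    free(send_buf); free(recv_buf);
    MPI_Finalize();
    return 0;
}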
20 changes: 20 additions & 0 deletions comm/mpi.impala
@@ -90,6 +90,26 @@ struct MPI {
finalize : fn() -> i32,
}

+fn get_world_size() -> i32 {
+let mpih = mpi();
+let mut world_size: i32;
+mpih.comm_size(mpih.comms.world, &mut world_size);
+world_size
+}
+
+fn get_process_rank() -> i32 {
+let mpih = mpi();
+let mut rank: i32;
+mpih.comm_rank(mpih.comms.world, &mut rank);
+rank
+}
+
+fn barrier() -> () {
+let mpih = mpi();
+let mut request: MPI_Request;
+mpih.barrier(mpih.comms.world, &mut request);
+}
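These helpers moved here from comm.impala and are thin wrappers over the MPI handle. In plain C they correspond to the calls below (the sketch substitutes the blocking MPI_Barrier for simplicity):

#include <mpi.h>

int get_world_size(void) {
    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
    return world_size;
}

int get_process_rank(void) {
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    return rank;
}

void barrier(void) {
    /* blocking variant; the Impala wrapper's MPI_Request argument hints
     * at a nonblocking barrier (MPI_Ibarrier) in the actual runtime */
    MPI_Barrier(MPI_COMM_WORLD);
}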

fn @mpi() -> MPI {
MPI {
comms : MPIComms {
