From 8f752fcc019579a2f7d4f787f9feb5749cb027c6 Mon Sep 17 00:00:00 2001
From: Rafael Ravedutti
Date: Tue, 14 Jul 2020 17:47:29 +0200
Subject: [PATCH] Refactor communication code

Signed-off-by: Rafael Ravedutti
---
 comm/comm.impala | 116 ++++++++++++++++-------------------
 comm/mpi.impala  |  20 ++++++++
 2 files changed, 58 insertions(+), 78 deletions(-)
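
Some notes on the new interface before the diff (commentary only, not
applied by git am):

The communication_* entry points now take a CommFn callback. Its last two
arguments (border_positions, exchange_positions) are CondFn values: each
takes a position plus a continuation and, for positions that fall into the
corresponding region, calls the continuation with the PBC-corrected
position and its PBC flags. A minimal caller sketch, where
for_each_position() is a hypothetical particle iterator that is not part
of this patch:

    fn example_pack_borders(comm: Comm, grid: Grid) -> () {
        communication_remote(comm.me, grid,
            |send_rank, recv_rank, pbc_x, pbc_y, pbc_z, border_positions, exchange_positions| {
            for_each_position(grid, |pos| {
                // border_positions() filters for the border region of
                // send_rank and hands back the PBC-corrected position
                // together with its PBC flags, ready to be packed.
                border_positions(pos, |corrected_pos, pbc| {
                    // append corrected_pos and pbc to comm.send_buffer
                });
            });
        });
    }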
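
The two-phase handshake in borders() stays as before, just with less
comment noise: neighbor ranks first trade element counts, the counts are
turned into receive offsets (resizing recv_capacity when a count would
overflow it), and only then are the payload transfers posted.
Schematically, using the same mpih handle and request-slot convention as
the diff below (the nreq++ placement in phase 1 is paraphrased from
context, not quoted from the diff):

    let mpih = mpi();
    let mut nreq = 0;

    // Phase 1: exchange how many elements each neighbor pair will transfer.
    communication_remote(comm.me, grid, |send_rank, recv_rank, _, _, _, _, _| {
        mpih.irecv(&mut recv_rank_lengths(nreq) as MPI_MutBuf, 1, mpih.int_t,
                   recv_rank, 0, mpih.comms.world, nreq);
        mpih.isend(&send_rank_lengths(nreq) as MPI_Buf, 1, mpih.int_t,
                   send_rank, 0, mpih.comms.world, nreq);
        nreq++;
    });
    mpih.wait_all(nreq);
    // Phase 2: compute recv_rank_offsets from the received counts, grow the
    // receive buffer if needed, then irecv/isend the actual payload data.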
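
get_world_size(), get_process_rank() and barrier() move next to the MPI
struct in comm/mpi.impala, so comm.impala stops poking at the raw handle
(alloc_comm() now just calls get_process_rank() instead of keeping its own
world_rank temporary). Typical use, assuming the usual print intrinsics
from the runtime:

    fn report_world() -> () {
        // only rank 0 reports the communicator size
        if get_process_rank() == 0 {
            print_string("MPI ranks: ");
            print_i32(get_world_size());
            print_string("\n");
        }
    }
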
diff --git a/comm/comm.impala b/comm/comm.impala
index 1007327..08ae2c9 100644
--- a/comm/comm.impala
+++ b/comm/comm.impala
@@ -40,11 +40,11 @@ fn mpi_initialize() -> () { mpi().init(); }
 fn mpi_finalize() -> () { mpi().finalize(); }
 
 // Types for condition and communication functions
-type CondFunc = fn(Vector3D, fn(Vector3D, PBCFlags) -> ()) -> ();
-type CommFunc = fn(i32, i32, i32, i32, i32, CondFunc, CondFunc) -> ();
+type CondFn = fn(Vector3D, fn(Vector3D, PBCFlags) -> ()) -> ();
+type CommFn = fn(i32, i32, i32, i32, i32, CondFn, CondFn) -> ();
 
 // Communication pattern using walberla or 6-stencil neighbors
-fn communication_ranks(grid: Grid, body: CommFunc) -> () {
+fn communication_ranks(grid: Grid, body: CommFn) -> () {
     let nbh = neighborhood;
     let world_aabb = grid.world_aabb;
     let spacing = grid.spacing;
@@ -132,7 +132,7 @@ fn communication_ranks(grid: Grid, body: CommFunc) -> () {
     }
 }
 
-fn communication_local(me: i32, grid: Grid, body: CommFunc) -> () {
+fn communication_local(me: i32, grid: Grid, body: CommFn) -> () {
     communication_ranks(grid, |send_rank, recv_rank, pbc_x, pbc_y, pbc_z, border_positions, exchange_positions| {
         if send_rank == me {
             body(send_rank, recv_rank, pbc_x, pbc_y, pbc_z, border_positions, exchange_positions);
@@ -140,7 +140,7 @@ fn communication_local(me: i32, grid: Grid, body: CommFunc) -> () {
     });
 }
 
-fn communication_remote(me: i32, grid: Grid, body: CommFunc) -> () {
+fn communication_remote(me: i32, grid: Grid, body: CommFn) -> () {
     communication_ranks(grid, |send_rank, recv_rank, pbc_x, pbc_y, pbc_z, border_positions, exchange_positions| {
         if send_rank != me {
             body(send_rank, recv_rank, pbc_x, pbc_y, pbc_z, border_positions, exchange_positions);
@@ -148,7 +148,7 @@ fn communication_remote(me: i32, grid: Grid, body: CommFunc) -> () {
     });
 }
 
-fn communication_sep(me: i32, grid: Grid, body: CommFunc) -> () {
+fn communication_sep(me: i32, grid: Grid, body: CommFn) -> () {
     @@communication_remote(me, grid, body);
     @@communication_local(me, grid, body);
 }
@@ -159,39 +159,50 @@ fn @pbc_corrected_position_if(@cond: bool, pos: Vector3D, grid: Grid) -> Vector3
     } else {
         let world_aabb = grid.world_aabb;
         let mut corrected_pos = pos;
-
         if pos.x < world_aabb.xmin { corrected_pos.x += grid.xlength; }
         if pos.x > world_aabb.xmax { corrected_pos.x -= grid.xlength; }
         if pos.y < world_aabb.ymin { corrected_pos.y += grid.ylength; }
         if pos.y > world_aabb.ymax { corrected_pos.y -= grid.ylength; }
         if pos.z < world_aabb.zmin { corrected_pos.z += grid.zlength; }
         if pos.z > world_aabb.zmax { corrected_pos.z -= grid.zlength; }
-
         corrected_pos
     }
 }
 
-// Get world size
-fn get_world_size() -> i32 {
-    let mpih = mpi();
-    let mut world_size: i32;
-    mpih.comm_size(mpih.comms.world, &mut world_size);
-    world_size
-}
-
-// Get process rank
-fn get_process_rank() -> i32 {
+// Initialize grid communication
+fn alloc_comm(grid: Grid) -> Comm {
     let mpih = mpi();
-    let mut rank: i32;
-    mpih.comm_rank(mpih.comms.world, &mut rank);
-    rank
-}
+    let max_neighs = get_initial_maximum_neighbor_ranks();
+    let max_faces_dim = math.max(math.max(grid.nx * grid.ny, grid.nx * grid.nz), grid.ny * grid.nz);
+    let send_capacity = max_neighs * max_faces_dim * 20;
+    let recv_capacity = max_neighs * max_faces_dim * 20;
+    let null_buf = Buffer {
+        device: 0,
+        data: 0 as &[i8],
+        size: 0 as i64
+    };
 
-// MPI barrier
-fn barrier() -> () {
-    let mpih = mpi();
-    let mut request: MPI_Request;
-    mpih.barrier(mpih.comms.world, &mut request);
+    Comm {
+        me: get_process_rank(),
+        local_start: 0,
+        send_buffer: allocate_array(send_capacity, 7, sizeof[real_t](), true),
+        send_neighbors: alloc_cpu(max_neighs * sizeof[i32]()),
+        send_rank_offsets: alloc_cpu(max_neighs * sizeof[i32]()),
+        send_rank_lengths: alloc_cpu(max_neighs * sizeof[i32]()),
+        send_offsets: allocate_array(send_capacity, 1, sizeof[i32](), true),
+        send_pbc: allocate_array(send_capacity, 3, sizeof[i8](), true),
+        copy_list: allocate_array(send_capacity, 1, sizeof[i32](), true),
+        send_capacity: send_capacity,
+        send_noffsets: 0,
+        recv_buffer: allocate_array(recv_capacity, 7, sizeof[real_t](), true),
+        recv_neighbors: alloc_cpu(max_neighs * sizeof[i32]()),
+        recv_rank_offsets: alloc_cpu(max_neighs * sizeof[i32]()),
+        recv_rank_lengths: alloc_cpu(max_neighs * sizeof[i32]()),
+        recv_capacity: recv_capacity,
+        recv_noffsets: 0,
+        max_neighs: max_neighs,
+        neighs: 0
+    }
 }
 
 // Release communication buffers
@@ -248,8 +259,6 @@ fn resize_recv_capacity(comm: &mut Comm, recv_capacity: i32) -> () {
     comm.recv_capacity = recv_capacity;
 }
 
-// Get configuration for nodes according to world size and number of
-// cells in each dimension
 fn get_node_config(xlength: real_t, ylength: real_t, zlength: real_t, destx: &mut i32, desty: &mut i32, destz: &mut i32) -> () {
     let mpih = mpi();
     let areax = xlength * ylength;
@@ -330,46 +339,6 @@ fn get_node_bounding_box(aabb: AABB) -> AABB {
     }
 }
 
-// Initialize grid communication
-fn alloc_comm(grid: Grid) -> Comm {
-    let mpih = mpi();
-    let max_neighs = get_initial_maximum_neighbor_ranks();
-    let max_faces_dim = math.max(math.max(grid.nx * grid.ny, grid.nx * grid.nz), grid.ny * grid.nz);
-    let send_capacity = max_neighs * max_faces_dim * 20;
-    let recv_capacity = max_neighs * max_faces_dim * 20;
-    let null_buf = Buffer {
-        device: 0,
-        data: 0 as &[i8],
-        size: 0 as i64
-    };
-
-    let mut world_rank: i32;
-
-    mpih.comm_rank(mpih.comms.world, &mut world_rank);
-
-    Comm {
-        me: world_rank,
-        local_start: 0,
-        send_buffer: allocate_array(send_capacity, 7, sizeof[real_t](), true),
-        send_neighbors: alloc_cpu(max_neighs * sizeof[i32]()),
-        send_rank_offsets: alloc_cpu(max_neighs * sizeof[i32]()),
-        send_rank_lengths: alloc_cpu(max_neighs * sizeof[i32]()),
-        send_offsets: allocate_array(send_capacity, 1, sizeof[i32](), true),
-        send_pbc: allocate_array(send_capacity, 3, sizeof[i8](), true),
-        copy_list: allocate_array(send_capacity, 1, sizeof[i32](), true),
-        send_capacity: send_capacity,
-        send_noffsets: 0,
-        recv_buffer: allocate_array(recv_capacity, 7, sizeof[real_t](), true),
-        recv_neighbors: alloc_cpu(max_neighs * sizeof[i32]()),
-        recv_rank_offsets: alloc_cpu(max_neighs * sizeof[i32]()),
-        recv_rank_lengths: alloc_cpu(max_neighs * sizeof[i32]()),
-        recv_capacity: recv_capacity,
-        recv_noffsets: 0,
-        max_neighs: max_neighs,
-        neighs: 0
-    }
-}
-
 // Synchronize ghost layer cells with neighbor ranks
 fn synchronize_ghost_layer(grid: Grid, comm: Comm) -> () {
     let mpih = mpi();
@@ -734,10 +703,7 @@ fn borders(grid: &mut Grid, comm: &mut Comm) -> () {
         transfer_array_to_host(comm.send_buffer);
     }
 
-    // Requests buffers
     let mut nreq = 0;
-
-    // Exchange sizes
     communication_remote(comm.me, *grid, |send_rank, recv_rank, _, _, _, _, _| {
         mpih.irecv(&mut recv_rank_lengths(nreq) as MPI_MutBuf, 1, mpih.int_t, recv_rank, 0, mpih.comms.world, nreq);
         mpih.isend(&send_rank_lengths(nreq) as MPI_Buf, 1, mpih.int_t, send_rank, 0, mpih.comms.world, nreq);
@@ -747,10 +713,8 @@ fn borders(grid: &mut Grid, comm: &mut Comm) -> () {
     mpih.wait_all(nreq);
 
     range(0, nreq, |r| {
-        // Rank receive offset
         recv_rank_offsets(r) = irecv;
-
-        // Readjust receive capacity if it is not enough
         if irecv + recv_rank_lengths(r) >= comm.recv_capacity {
             resize_recv_capacity(comm, (irecv + recv_rank_lengths(r)) * 2);
         }
@@ -760,10 +724,7 @@ fn borders(grid: &mut Grid, comm: &mut Comm) -> () {
     });
 
     nreq = 0;
-
-    // Synchronize borders with other ranks
     communication_remote(comm.me, *grid, |send_rank, recv_rank, _, _, _, _, _| {
-        // Offsets to send and receive
         let send_offset = send_rank_offsets(nreq) * 4;
         let recv_offset = recv_rank_offsets(nreq) * 4;
 
@@ -775,7 +736,6 @@
                    &get_array_real_ref(array_host, comm.send_buffer)(send_offset) as MPI_Buf,
                    send_rank_lengths(nreq) * 4, mpih.double_t, send_rank, 0, mpih.comms.world, nreq);
 
-        // Update ranks to send and receive during synchronization
         bitcast[&mut[i32]](comm.send_neighbors.data)(nreq) = send_rank;
         bitcast[&mut[i32]](comm.recv_neighbors.data)(nreq) = recv_rank;
         nreq++;
diff --git a/comm/mpi.impala b/comm/mpi.impala
index 59b8064..04b8e46 100644
--- a/comm/mpi.impala
+++ b/comm/mpi.impala
@@ -90,6 +90,26 @@ struct MPI {
     finalize : fn() -> i32,
 }
 
+fn get_world_size() -> i32 {
+    let mpih = mpi();
+    let mut world_size: i32;
+    mpih.comm_size(mpih.comms.world, &mut world_size);
+    world_size
+}
+
+fn get_process_rank() -> i32 {
+    let mpih = mpi();
+    let mut rank: i32;
+    mpih.comm_rank(mpih.comms.world, &mut rank);
+    rank
+}
+
+fn barrier() -> () {
+    let mpih = mpi();
+    let mut request: MPI_Request;
+    mpih.barrier(mpih.comms.world, &mut request);
+}
+
 fn @mpi() -> MPI {
     MPI {
         comms : MPIComms {