diff --git a/comm/comm.impala b/comm/comm.impala index b5632c1..ce74e3f 100644 --- a/comm/comm.impala +++ b/comm/comm.impala @@ -11,7 +11,6 @@ struct CommOffsets { send_neighbors: Buffer, send_rank_offsets: Buffer, send_rank_lengths: Buffer, - send_requests: Buffer, send_offsets: Buffer, send_pbc: Buffer, copy_list: Buffer, @@ -23,7 +22,6 @@ struct CommOffsets { recv_neighbors: Buffer, recv_rank_offsets: Buffer, recv_rank_lengths: Buffer, - recv_requests: Buffer, recv_capacity: i32, recv_noffsets: i32, @@ -221,11 +219,9 @@ fn barrier() -> () { // Release communication buffers fn release_comm_offsets(comm_offsets: CommOffsets) -> () { release(comm_offsets.send_buffer); - release(comm_offsets.send_requests); release(comm_offsets.send_offsets); release(comm_offsets.send_pbc); release(comm_offsets.recv_buffer); - release(comm_offsets.recv_requests); release(comm_offsets.copy_list); if comm_offsets.send_capacity > 0 { @@ -252,11 +248,9 @@ fn resize_max_neighbors_capacity(comm_offsets: &mut CommOffsets, max_neighs: i32 reallocate_buffer(&mut comm_offsets.send_neighbors, max_neighs, sizeof[i32](), true, alloc_cpu); reallocate_buffer(&mut comm_offsets.send_rank_offsets, max_neighs, sizeof[i32](), true, alloc_cpu); reallocate_buffer(&mut comm_offsets.send_rank_lengths, max_neighs, sizeof[i32](), true, alloc_cpu); - reallocate_buffer(&mut comm_offsets.send_requests, max_neighs, sizeof[MPI_Request](), false, alloc_cpu); reallocate_buffer(&mut comm_offsets.recv_neighbors, max_neighs, sizeof[i32](), true, alloc_cpu); reallocate_buffer(&mut comm_offsets.recv_rank_offsets, max_neighs, sizeof[i32](), true, alloc_cpu); reallocate_buffer(&mut comm_offsets.recv_rank_lengths, max_neighs, sizeof[i32](), true, alloc_cpu); - reallocate_buffer(&mut comm_offsets.recv_requests, max_neighs, sizeof[MPI_Request](), false, alloc_cpu); comm_offsets.max_neighs = max_neighs; } @@ -405,7 +399,6 @@ fn alloc_comm_offsets(grid: Grid) -> CommOffsets { send_neighbors: alloc_cpu(max_neighs * sizeof[i32]()), send_rank_offsets: alloc_cpu(max_neighs * sizeof[i32]()), send_rank_lengths: alloc_cpu(max_neighs * sizeof[i32]()), - send_requests: alloc_cpu(max_neighs * sizeof[MPI_Request]()), send_offsets: alloc_cpu(send_capacity * sizeof[i32]()), send_pbc: alloc_cpu(send_capacity * 3 * sizeof[i8]()), copy_list: alloc_cpu(send_capacity * sizeof[i32]()), @@ -417,7 +410,6 @@ fn alloc_comm_offsets(grid: Grid) -> CommOffsets { recv_neighbors: alloc_cpu(max_neighs * sizeof[i32]()), recv_rank_offsets: alloc_cpu(max_neighs * sizeof[i32]()), recv_rank_lengths: alloc_cpu(max_neighs * sizeof[i32]()), - recv_requests: alloc_cpu(max_neighs * sizeof[MPI_Request]()), recv_capacity: recv_capacity, recv_noffsets: 0, @@ -449,8 +441,6 @@ fn synchronize_ghost_layer(grid: Grid, comm_offsets: CommOffsets) -> () { let send_rank_lengths = get_array_of_i32(comm_offsets.send_rank_lengths); let recv_rank_lengths = get_array_of_i32(comm_offsets.recv_rank_lengths); let nlocal = comm_offsets.send_noffsets - comm_offsets.local_start; - let send_requests = bitcast[&mut[MPI_Request]](comm_offsets.send_requests.data); - let recv_requests = bitcast[&mut[MPI_Request]](comm_offsets.recv_requests.data); let mut nreq = 0; comm_buffer_iterate(comm_offsets, comm_offsets.send_noffsets, |index, send_data, _| { @@ -475,7 +465,7 @@ fn synchronize_ghost_layer(grid: Grid, comm_offsets: CommOffsets) -> () { mpih.irecv( &recv_buffer.data(recv_offset) as MPI_MutBuf, recv_rank_lengths(neigh) * 3, - mpih.double_t, recv_neighbors(neigh), 0, mpih.comms.world, &mut recv_requests(nreq)); + mpih.double_t, recv_neighbors(neigh), 0, mpih.comms.world, nreq); nreq++; } @@ -489,14 +479,13 @@ fn synchronize_ghost_layer(grid: Grid, comm_offsets: CommOffsets) -> () { mpih.isend( &send_buffer.data(send_offset) as MPI_Buf, send_rank_lengths(neigh) * 3, - mpih.double_t, send_neighbors(neigh), 0, mpih.comms.world, &mut send_requests(nreq)); + mpih.double_t, send_neighbors(neigh), 0, mpih.comms.world, nreq); nreq++; } }); - mpih.wait_all(nreq, recv_requests, mpih.status.ignore); - mpih.wait_all(nreq, send_requests, mpih.status.ignore); + mpih.wait_all(nreq); if comm_offsets.recv_noffsets > 0 { dev.transfer(comm_offsets.recv_buffer, comm_offsets.recv_buffer_accelerator); @@ -668,29 +657,20 @@ fn exchange_particles(grid: &mut Grid, comm_offsets: &mut CommOffsets) -> () { neigh = 0; // Requests buffers - let send_requests = bitcast[&mut[MPI_Request]](comm_offsets.send_requests.data); - let recv_requests = bitcast[&mut[MPI_Request]](comm_offsets.recv_requests.data); let mut nreq = 0; // Exchange sizes communication_ranks(*grid, |send_rank, recv_rank, _, _, _, _, _| { if send_rank != comm_offsets.me { - mpih.irecv( - &mut exchg_rank_recv_lengths(neigh) as MPI_MutBuf, 1, mpih.int_t, recv_rank, 0, mpih.comms.world, - &mut recv_requests(nreq)); - - mpih.isend( - &exchg_rank_send_lengths(neigh) as MPI_Buf, 1, mpih.int_t, send_rank, 0, mpih.comms.world, - &mut send_requests(nreq)); - + mpih.irecv(&mut exchg_rank_recv_lengths(neigh) as MPI_MutBuf, 1, mpih.int_t, recv_rank, 0, mpih.comms.world, nreq); + mpih.isend(&exchg_rank_send_lengths(neigh) as MPI_Buf, 1, mpih.int_t, send_rank, 0, mpih.comms.world, nreq); nreq++; } neigh++; }); - mpih.wait_all(nreq, recv_requests, mpih.status.ignore); - mpih.wait_all(nreq, send_requests, mpih.status.ignore); + mpih.wait_all(nreq); range(0, nreq, |r| { // Readjust receive capacity if it is not enough @@ -713,11 +693,11 @@ fn exchange_particles(grid: &mut Grid, comm_offsets: &mut CommOffsets) -> () { mpih.irecv( &comm_offsets.recv_buffer.data(exchg_recv_offset) as MPI_MutBuf, exchg_rank_recv_lengths(neigh) * 7, - mpih.double_t, recv_rank, 0, mpih.comms.world, &mut recv_requests(nreq)); + mpih.double_t, recv_rank, 0, mpih.comms.world, nreq); mpih.isend( &comm_offsets.send_buffer.data(exchg_send_offset) as MPI_Buf, exchg_rank_send_lengths(neigh) * 7, - mpih.double_t, send_rank, 0, mpih.comms.world, &mut send_requests(nreq)); + mpih.double_t, send_rank, 0, mpih.comms.world, nreq); nreq++; } @@ -725,8 +705,7 @@ fn exchange_particles(grid: &mut Grid, comm_offsets: &mut CommOffsets) -> () { neigh++; }); - mpih.wait_all(nreq, recv_requests, mpih.status.ignore); - mpih.wait_all(nreq, send_requests, mpih.status.ignore); + mpih.wait_all(nreq); dev.transfer(comm_offsets.recv_buffer, comm_offsets.recv_buffer_accelerator); @@ -873,20 +852,17 @@ fn borders(grid: &mut Grid, comm_offsets: &mut CommOffsets) -> () { } // Requests buffers - let send_requests = bitcast[&mut[MPI_Request]](comm_offsets.send_requests.data); - let recv_requests = bitcast[&mut[MPI_Request]](comm_offsets.recv_requests.data); let mut nreq = 0; // Exchange sizes communication_remote(comm_offsets.me, *grid, |send_rank, recv_rank, _, _, _, _, _| { let n = nreq; - mpih.irecv(&mut recv_rank_lengths(n) as MPI_MutBuf, 1, mpih.int_t, recv_rank, 0, mpih.comms.world, &mut recv_requests(nreq)); - mpih.isend(&send_rank_lengths(n) as MPI_Buf, 1, mpih.int_t, send_rank, 0, mpih.comms.world, &mut send_requests(nreq)); + mpih.irecv(&mut recv_rank_lengths(n) as MPI_MutBuf, 1, mpih.int_t, recv_rank, 0, mpih.comms.world, nreq); + mpih.isend(&send_rank_lengths(n) as MPI_Buf, 1, mpih.int_t, send_rank, 0, mpih.comms.world, nreq); nreq++; }); - mpih.wait_all(nreq, recv_requests, mpih.status.ignore); - mpih.wait_all(nreq, send_requests, mpih.status.ignore); + mpih.wait_all(nreq); range(0, nreq, |r| { // Rank receive offset @@ -912,11 +888,11 @@ fn borders(grid: &mut Grid, comm_offsets: &mut CommOffsets) -> () { mpih.irecv( &comm_offsets.recv_buffer.data(recv_offset) as MPI_MutBuf, recv_rank_lengths(n) * 4, - mpih.double_t, recv_rank, 0, mpih.comms.world, &mut recv_requests(nreq)); + mpih.double_t, recv_rank, 0, mpih.comms.world, nreq); mpih.isend( &comm_offsets.send_buffer.data(send_offset) as MPI_Buf, send_rank_lengths(n) * 4, - mpih.double_t, send_rank, 0, mpih.comms.world, &mut send_requests(nreq)); + mpih.double_t, send_rank, 0, mpih.comms.world, nreq); // Update ranks to send and receive during synchronization send_neighbors(n) = send_rank; @@ -925,8 +901,7 @@ fn borders(grid: &mut Grid, comm_offsets: &mut CommOffsets) -> () { nreq++; }); - mpih.wait_all(nreq, recv_requests, mpih.status.ignore); - mpih.wait_all(nreq, send_requests, mpih.status.ignore); + mpih.wait_all(nreq); let const_grid = *grid; let const_comm_offsets = *comm_offsets; @@ -977,23 +952,23 @@ fn borders(grid: &mut Grid, comm_offsets: &mut CommOffsets) -> () { fn reduce_f64_sum(local_value: f64, global_value: &mut f64) -> () { let mpih = mpi(); let mut local = local_value; - mpih.allreduce(&mut local as MPI_MutBuf, global_value as MPI_MutBuf, 1, mpih.double_t, mpih.ops.sum, mpih.comms.world); + mpih.allreduce(&mut local as MPI_Buf, global_value as MPI_MutBuf, 1, mpih.double_t, mpih.ops.sum, mpih.comms.world); } fn reduce_f64_max(local_value: f64, global_value: &mut f64) -> () { let mpih = mpi(); let mut local = local_value; - mpih.allreduce(&mut local as MPI_MutBuf, global_value as MPI_MutBuf, 1, mpih.double_t, mpih.ops.max, mpih.comms.world); + mpih.allreduce(&mut local as MPI_Buf, global_value as MPI_MutBuf, 1, mpih.double_t, mpih.ops.max, mpih.comms.world); } fn reduce_i32_sum(local_value: i32, global_value: &mut i32) -> () { let mpih = mpi(); let mut local = local_value; - mpih.allreduce(&mut local as MPI_MutBuf, global_value as MPI_MutBuf, 1, mpih.int_t, mpih.ops.sum, mpih.comms.world); + mpih.allreduce(&mut local as MPI_Buf, global_value as MPI_MutBuf, 1, mpih.int_t, mpih.ops.sum, mpih.comms.world); } fn reduce_i64_sum(local_value: i64, global_value: &mut i64) -> () { let mpih = mpi(); let mut local = local_value; - mpih.allreduce(&mut local as MPI_MutBuf, global_value as MPI_MutBuf, 1, mpih.int64_t, mpih.ops.sum, mpih.comms.world); + mpih.allreduce(&mut local as MPI_Buf, global_value as MPI_MutBuf, 1, mpih.int64_t, mpih.ops.sum, mpih.comms.world); } diff --git a/comm/mpi.cpp b/comm/mpi.cpp index 783736a..c6c7894 100644 --- a/comm/mpi.cpp +++ b/comm/mpi.cpp @@ -71,4 +71,20 @@ void sync_ghost_layer_loop( } } +static MPI_Request send_reqs[128]; +static MPI_Request recv_reqs[128]; + +int MPI_Isend_ex(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, int req_id) { + return MPI_Isend(buf, count, datatype, dest, tag, comm, &send_reqs[req_id]); +} + +int MPI_Irecv_ex(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, int req_id) { + return MPI_Irecv(buf, count, datatype, source, tag, comm, &recv_reqs[req_id]); +} + +void MPI_Waitall_ex(int count) { + MPI_Waitall(count, recv_reqs, MPI_STATUSES_IGNORE); + MPI_Waitall(count, send_reqs, MPI_STATUSES_IGNORE); +} + } diff --git a/comm/mpi.impala b/comm/mpi.impala index efb2c48..59b8064 100644 --- a/comm/mpi.impala +++ b/comm/mpi.impala @@ -17,10 +17,10 @@ extern "C" { fn MPI_Allreduce(MPI_Buf, MPI_MutBuf, i32, MPI_Datatype, MPI_Op, MPI_Comm) -> i32; fn MPI_Send(MPI_Buf, i32, MPI_Datatype, i32, i32, MPI_Comm) -> i32; fn MPI_Recv(MPI_MutBuf, i32, MPI_Datatype, i32, i32, MPI_Comm, MPI_Status) -> i32; - fn MPI_Isend(MPI_Buf, i32, MPI_Datatype, i32, i32, MPI_Comm, &MPI_Request) -> i32; - fn MPI_Irecv(MPI_MutBuf, i32, MPI_Datatype, i32, i32, MPI_Comm, &MPI_Request) -> i32; + fn MPI_Isend_ex(MPI_Buf, i32, MPI_Datatype, i32, i32, MPI_Comm, i32) -> i32; + fn MPI_Irecv_ex(MPI_MutBuf, i32, MPI_Datatype, i32, i32, MPI_Comm, i32) -> i32; fn MPI_Wait(&MPI_Request, MPI_Status) -> i32; - fn MPI_Waitall(i32, &[MPI_Request], MPI_Status) -> i32; + fn MPI_Waitall_ex(i32) -> i32; fn MPI_Probe(i32, i32, MPI_Comm, MPI_Status) -> i32; fn MPI_Get_count(MPI_Status, MPI_Datatype, &mut i32) -> i32; fn MPI_Gather(MPI_Buf, i32, MPI_Datatype, MPI_MutBuf, i32, MPI_Datatype, i32, MPI_Comm) -> i32; @@ -78,10 +78,10 @@ struct MPI { allreduce : fn(MPI_Buf, MPI_MutBuf, i32, MPI_Datatype, MPI_Op, MPI_Comm) -> i32, send : fn(MPI_Buf, i32, MPI_Datatype, i32, i32, MPI_Comm) -> i32, recv : fn(MPI_MutBuf, i32, MPI_Datatype, i32, i32, MPI_Comm, &mut MPIStatus) -> i32, - isend : fn(MPI_Buf, i32, MPI_Datatype, i32, i32, MPI_Comm, &MPI_Request) -> i32, - irecv : fn(MPI_MutBuf, i32, MPI_Datatype, i32, i32, MPI_Comm, &MPI_Request) -> i32, + isend : fn(MPI_Buf, i32, MPI_Datatype, i32, i32, MPI_Comm, i32) -> i32, + irecv : fn(MPI_MutBuf, i32, MPI_Datatype, i32, i32, MPI_Comm, i32) -> i32, wait : fn(&MPI_Request, &mut MPIStatus) -> i32, - wait_all : fn(i32, &[MPI_Request], &mut MPIStatus) -> i32, + wait_all : fn(i32) -> i32, probe: fn(i32, i32, MPI_Comm, &mut MPIStatus) -> i32, get_count: fn(&mut MPIStatus, MPI_Datatype, &mut i32) -> i32, gather: fn(MPI_Buf, i32, MPI_Datatype, MPI_MutBuf, i32, MPI_Datatype, i32, MPI_Comm) -> i32, @@ -115,13 +115,13 @@ fn @mpi() -> MPI { recv : @|buf, count, datatype, source, tag, comm, status| { MPI_Recv(buf, count, datatype, source, tag, comm, status as MPI_Status) }, - isend : MPI_Isend, - irecv : MPI_Irecv, + isend : MPI_Isend_ex, + irecv : MPI_Irecv_ex, wait : @|request, status| { MPI_Wait(request, status as MPI_Status) }, - wait_all : @|count, requests, status| { - MPI_Waitall(count, requests, status as MPI_Status) + wait_all : @|count| { + MPI_Waitall_ex(count) }, probe: @|source, tag, comm, status| { MPI_Probe(source, tag, comm, status as MPI_Status)