Treat requests in C++ code
Signed-off-by: Rafael Ravedutti <[email protected]>
rafaelravedutti committed Jul 2, 2020
1 parent 7ebe891 commit ea52704
Showing 3 changed files with 45 additions and 54 deletions.
63 changes: 19 additions & 44 deletions comm/comm.impala
@@ -11,7 +11,6 @@ struct CommOffsets {
send_neighbors: Buffer,
send_rank_offsets: Buffer,
send_rank_lengths: Buffer,
-send_requests: Buffer,
send_offsets: Buffer,
send_pbc: Buffer,
copy_list: Buffer,
@@ -23,7 +22,6 @@ struct CommOffsets {
recv_neighbors: Buffer,
recv_rank_offsets: Buffer,
recv_rank_lengths: Buffer,
-recv_requests: Buffer,
recv_capacity: i32,
recv_noffsets: i32,

@@ -221,11 +219,9 @@ fn barrier() -> () {
// Release communication buffers
fn release_comm_offsets(comm_offsets: CommOffsets) -> () {
release(comm_offsets.send_buffer);
-release(comm_offsets.send_requests);
release(comm_offsets.send_offsets);
release(comm_offsets.send_pbc);
release(comm_offsets.recv_buffer);
-release(comm_offsets.recv_requests);
release(comm_offsets.copy_list);

if comm_offsets.send_capacity > 0 {
@@ -252,11 +248,9 @@ fn resize_max_neighbors_capacity(comm_offsets: &mut CommOffsets, max_neighs: i32
reallocate_buffer(&mut comm_offsets.send_neighbors, max_neighs, sizeof[i32](), true, alloc_cpu);
reallocate_buffer(&mut comm_offsets.send_rank_offsets, max_neighs, sizeof[i32](), true, alloc_cpu);
reallocate_buffer(&mut comm_offsets.send_rank_lengths, max_neighs, sizeof[i32](), true, alloc_cpu);
-reallocate_buffer(&mut comm_offsets.send_requests, max_neighs, sizeof[MPI_Request](), false, alloc_cpu);
reallocate_buffer(&mut comm_offsets.recv_neighbors, max_neighs, sizeof[i32](), true, alloc_cpu);
reallocate_buffer(&mut comm_offsets.recv_rank_offsets, max_neighs, sizeof[i32](), true, alloc_cpu);
reallocate_buffer(&mut comm_offsets.recv_rank_lengths, max_neighs, sizeof[i32](), true, alloc_cpu);
-reallocate_buffer(&mut comm_offsets.recv_requests, max_neighs, sizeof[MPI_Request](), false, alloc_cpu);

comm_offsets.max_neighs = max_neighs;
}
@@ -405,7 +399,6 @@ fn alloc_comm_offsets(grid: Grid) -> CommOffsets {
send_neighbors: alloc_cpu(max_neighs * sizeof[i32]()),
send_rank_offsets: alloc_cpu(max_neighs * sizeof[i32]()),
send_rank_lengths: alloc_cpu(max_neighs * sizeof[i32]()),
-send_requests: alloc_cpu(max_neighs * sizeof[MPI_Request]()),
send_offsets: alloc_cpu(send_capacity * sizeof[i32]()),
send_pbc: alloc_cpu(send_capacity * 3 * sizeof[i8]()),
copy_list: alloc_cpu(send_capacity * sizeof[i32]()),
@@ -417,7 +410,6 @@ fn alloc_comm_offsets(grid: Grid) -> CommOffsets {
recv_neighbors: alloc_cpu(max_neighs * sizeof[i32]()),
recv_rank_offsets: alloc_cpu(max_neighs * sizeof[i32]()),
recv_rank_lengths: alloc_cpu(max_neighs * sizeof[i32]()),
-recv_requests: alloc_cpu(max_neighs * sizeof[MPI_Request]()),
recv_capacity: recv_capacity,
recv_noffsets: 0,

@@ -449,8 +441,6 @@ fn synchronize_ghost_layer(grid: Grid, comm_offsets: CommOffsets) -> () {
let send_rank_lengths = get_array_of_i32(comm_offsets.send_rank_lengths);
let recv_rank_lengths = get_array_of_i32(comm_offsets.recv_rank_lengths);
let nlocal = comm_offsets.send_noffsets - comm_offsets.local_start;
-let send_requests = bitcast[&mut[MPI_Request]](comm_offsets.send_requests.data);
-let recv_requests = bitcast[&mut[MPI_Request]](comm_offsets.recv_requests.data);
let mut nreq = 0;

comm_buffer_iterate(comm_offsets, comm_offsets.send_noffsets, |index, send_data, _| {
@@ -475,7 +465,7 @@

mpih.irecv(
&recv_buffer.data(recv_offset) as MPI_MutBuf, recv_rank_lengths(neigh) * 3,
-mpih.double_t, recv_neighbors(neigh), 0, mpih.comms.world, &mut recv_requests(nreq));
+mpih.double_t, recv_neighbors(neigh), 0, mpih.comms.world, nreq);

nreq++;
}
@@ -489,14 +479,13 @@

mpih.isend(
&send_buffer.data(send_offset) as MPI_Buf, send_rank_lengths(neigh) * 3,
-mpih.double_t, send_neighbors(neigh), 0, mpih.comms.world, &mut send_requests(nreq));
+mpih.double_t, send_neighbors(neigh), 0, mpih.comms.world, nreq);

nreq++;
}
});

-mpih.wait_all(nreq, recv_requests, mpih.status.ignore);
-mpih.wait_all(nreq, send_requests, mpih.status.ignore);
+mpih.wait_all(nreq);

if comm_offsets.recv_noffsets > 0 {
dev.transfer(comm_offsets.recv_buffer, comm_offsets.recv_buffer_accelerator);
@@ -668,29 +657,20 @@ fn exchange_particles(grid: &mut Grid, comm_offsets: &mut CommOffsets) -> () {
neigh = 0;

// Requests buffers
-let send_requests = bitcast[&mut[MPI_Request]](comm_offsets.send_requests.data);
-let recv_requests = bitcast[&mut[MPI_Request]](comm_offsets.recv_requests.data);
let mut nreq = 0;

// Exchange sizes
communication_ranks(*grid, |send_rank, recv_rank, _, _, _, _, _| {
if send_rank != comm_offsets.me {
-mpih.irecv(
-&mut exchg_rank_recv_lengths(neigh) as MPI_MutBuf, 1, mpih.int_t, recv_rank, 0, mpih.comms.world,
-&mut recv_requests(nreq));
-
-mpih.isend(
-&exchg_rank_send_lengths(neigh) as MPI_Buf, 1, mpih.int_t, send_rank, 0, mpih.comms.world,
-&mut send_requests(nreq));
-
+mpih.irecv(&mut exchg_rank_recv_lengths(neigh) as MPI_MutBuf, 1, mpih.int_t, recv_rank, 0, mpih.comms.world, nreq);
+mpih.isend(&exchg_rank_send_lengths(neigh) as MPI_Buf, 1, mpih.int_t, send_rank, 0, mpih.comms.world, nreq);
nreq++;
}

neigh++;
});

-mpih.wait_all(nreq, recv_requests, mpih.status.ignore);
-mpih.wait_all(nreq, send_requests, mpih.status.ignore);
+mpih.wait_all(nreq);

range(0, nreq, |r| {
// Readjust receive capacity if it is not enough
@@ -713,20 +693,19 @@ fn exchange_particles(grid: &mut Grid, comm_offsets: &mut CommOffsets) -> () {

mpih.irecv(
&comm_offsets.recv_buffer.data(exchg_recv_offset) as MPI_MutBuf, exchg_rank_recv_lengths(neigh) * 7,
-mpih.double_t, recv_rank, 0, mpih.comms.world, &mut recv_requests(nreq));
+mpih.double_t, recv_rank, 0, mpih.comms.world, nreq);

mpih.isend(
&comm_offsets.send_buffer.data(exchg_send_offset) as MPI_Buf, exchg_rank_send_lengths(neigh) * 7,
-mpih.double_t, send_rank, 0, mpih.comms.world, &mut send_requests(nreq));
+mpih.double_t, send_rank, 0, mpih.comms.world, nreq);

nreq++;
}

neigh++;
});

-mpih.wait_all(nreq, recv_requests, mpih.status.ignore);
-mpih.wait_all(nreq, send_requests, mpih.status.ignore);
+mpih.wait_all(nreq);

dev.transfer(comm_offsets.recv_buffer, comm_offsets.recv_buffer_accelerator);

@@ -873,20 +852,17 @@ fn borders(grid: &mut Grid, comm_offsets: &mut CommOffsets) -> () {
}

// Requests buffers
-let send_requests = bitcast[&mut[MPI_Request]](comm_offsets.send_requests.data);
-let recv_requests = bitcast[&mut[MPI_Request]](comm_offsets.recv_requests.data);
let mut nreq = 0;

// Exchange sizes
communication_remote(comm_offsets.me, *grid, |send_rank, recv_rank, _, _, _, _, _| {
let n = nreq;
-mpih.irecv(&mut recv_rank_lengths(n) as MPI_MutBuf, 1, mpih.int_t, recv_rank, 0, mpih.comms.world, &mut recv_requests(nreq));
-mpih.isend(&send_rank_lengths(n) as MPI_Buf, 1, mpih.int_t, send_rank, 0, mpih.comms.world, &mut send_requests(nreq));
+mpih.irecv(&mut recv_rank_lengths(n) as MPI_MutBuf, 1, mpih.int_t, recv_rank, 0, mpih.comms.world, nreq);
+mpih.isend(&send_rank_lengths(n) as MPI_Buf, 1, mpih.int_t, send_rank, 0, mpih.comms.world, nreq);
nreq++;
});

-mpih.wait_all(nreq, recv_requests, mpih.status.ignore);
-mpih.wait_all(nreq, send_requests, mpih.status.ignore);
+mpih.wait_all(nreq);

range(0, nreq, |r| {
// Rank receive offset
@@ -912,11 +888,11 @@ fn borders(grid: &mut Grid, comm_offsets: &mut CommOffsets) -> () {

mpih.irecv(
&comm_offsets.recv_buffer.data(recv_offset) as MPI_MutBuf, recv_rank_lengths(n) * 4,
-mpih.double_t, recv_rank, 0, mpih.comms.world, &mut recv_requests(nreq));
+mpih.double_t, recv_rank, 0, mpih.comms.world, nreq);

mpih.isend(
&comm_offsets.send_buffer.data(send_offset) as MPI_Buf, send_rank_lengths(n) * 4,
-mpih.double_t, send_rank, 0, mpih.comms.world, &mut send_requests(nreq));
+mpih.double_t, send_rank, 0, mpih.comms.world, nreq);

// Update ranks to send and receive during synchronization
send_neighbors(n) = send_rank;
@@ -925,8 +901,7 @@ fn borders(grid: &mut Grid, comm_offsets: &mut CommOffsets) -> () {
nreq++;
});

-mpih.wait_all(nreq, recv_requests, mpih.status.ignore);
-mpih.wait_all(nreq, send_requests, mpih.status.ignore);
+mpih.wait_all(nreq);

let const_grid = *grid;
let const_comm_offsets = *comm_offsets;
@@ -977,23 +952,23 @@ fn borders(grid: &mut Grid, comm_offsets: &mut CommOffsets) -> () {
fn reduce_f64_sum(local_value: f64, global_value: &mut f64) -> () {
let mpih = mpi();
let mut local = local_value;
-mpih.allreduce(&mut local as MPI_MutBuf, global_value as MPI_MutBuf, 1, mpih.double_t, mpih.ops.sum, mpih.comms.world);
+mpih.allreduce(&mut local as MPI_Buf, global_value as MPI_MutBuf, 1, mpih.double_t, mpih.ops.sum, mpih.comms.world);
}

fn reduce_f64_max(local_value: f64, global_value: &mut f64) -> () {
let mpih = mpi();
let mut local = local_value;
-mpih.allreduce(&mut local as MPI_MutBuf, global_value as MPI_MutBuf, 1, mpih.double_t, mpih.ops.max, mpih.comms.world);
+mpih.allreduce(&mut local as MPI_Buf, global_value as MPI_MutBuf, 1, mpih.double_t, mpih.ops.max, mpih.comms.world);
}

fn reduce_i32_sum(local_value: i32, global_value: &mut i32) -> () {
let mpih = mpi();
let mut local = local_value;
-mpih.allreduce(&mut local as MPI_MutBuf, global_value as MPI_MutBuf, 1, mpih.int_t, mpih.ops.sum, mpih.comms.world);
+mpih.allreduce(&mut local as MPI_Buf, global_value as MPI_MutBuf, 1, mpih.int_t, mpih.ops.sum, mpih.comms.world);
}

fn reduce_i64_sum(local_value: i64, global_value: &mut i64) -> () {
let mpih = mpi();
let mut local = local_value;
-mpih.allreduce(&mut local as MPI_MutBuf, global_value as MPI_MutBuf, 1, mpih.int64_t, mpih.ops.sum, mpih.comms.world);
+mpih.allreduce(&mut local as MPI_Buf, global_value as MPI_MutBuf, 1, mpih.int64_t, mpih.ops.sum, mpih.comms.world);
}
16 changes: 16 additions & 0 deletions comm/mpi.cpp
@@ -71,4 +71,20 @@ void sync_ghost_layer_loop(
}
}

+static MPI_Request send_reqs[128];
+static MPI_Request recv_reqs[128];
+
+int MPI_Isend_ex(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, int req_id) {
+return MPI_Isend(buf, count, datatype, dest, tag, comm, &send_reqs[req_id]);
+}
+
+int MPI_Irecv_ex(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, int req_id) {
+return MPI_Irecv(buf, count, datatype, source, tag, comm, &recv_reqs[req_id]);
+}
+
+void MPI_Waitall_ex(int count) {
+MPI_Waitall(count, recv_reqs, MPI_STATUSES_IGNORE);
+MPI_Waitall(count, send_reqs, MPI_STATUSES_IGNORE);
+}
+
}
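
The helpers added above replace explicit MPI_Request handles with an integer request id indexing the two static arrays, so a caller can post at most 128 send/receive pairs before waiting. The sketch below is a minimal, hypothetical C++ caller showing that pattern; the prototypes, the function name exchange_chunks, and the buffer layout are illustrative assumptions, not part of this commit.

#include <mpi.h>
#include <cstddef>
#include <vector>

// Prototypes for the helpers defined in comm/mpi.cpp; the commit adds no
// header, so C linkage is assumed here to match the enclosing extern "C" block.
extern "C" {
int MPI_Isend_ex(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, int req_id);
int MPI_Irecv_ex(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, int req_id);
void MPI_Waitall_ex(int count);
}

// Hypothetical caller: exchange one chunk of doubles with each neighbor rank.
void exchange_chunks(const std::vector<int> &neighbors,
                     const std::vector<double> &send_buf,
                     std::vector<double> &recv_buf, int chunk) {
    int nreq = 0;
    for (std::size_t n = 0; n < neighbors.size(); ++n) {
        // The receive and the matching send share one request id; each helper
        // stores its MPI_Request in its own static array (recv_reqs / send_reqs).
        MPI_Irecv_ex(&recv_buf[n * chunk], chunk, MPI_DOUBLE, neighbors[n], 0, MPI_COMM_WORLD, nreq);
        MPI_Isend_ex(&send_buf[n * chunk], chunk, MPI_DOUBLE, neighbors[n], 0, MPI_COMM_WORLD, nreq);
        nreq++;
    }
    // Completes the first nreq receives, then the first nreq sends.
    MPI_Waitall_ex(nreq);
}

This mirrors the Impala call sites changed in this commit (synchronize_ghost_layer, exchange_particles, borders): post irecv/isend under a shared nreq counter, then complete everything with a single wait_all(nreq).
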
20 changes: 10 additions & 10 deletions comm/mpi.impala
@@ -17,10 +17,10 @@ extern "C" {
fn MPI_Allreduce(MPI_Buf, MPI_MutBuf, i32, MPI_Datatype, MPI_Op, MPI_Comm) -> i32;
fn MPI_Send(MPI_Buf, i32, MPI_Datatype, i32, i32, MPI_Comm) -> i32;
fn MPI_Recv(MPI_MutBuf, i32, MPI_Datatype, i32, i32, MPI_Comm, MPI_Status) -> i32;
-fn MPI_Isend(MPI_Buf, i32, MPI_Datatype, i32, i32, MPI_Comm, &MPI_Request) -> i32;
-fn MPI_Irecv(MPI_MutBuf, i32, MPI_Datatype, i32, i32, MPI_Comm, &MPI_Request) -> i32;
+fn MPI_Isend_ex(MPI_Buf, i32, MPI_Datatype, i32, i32, MPI_Comm, i32) -> i32;
+fn MPI_Irecv_ex(MPI_MutBuf, i32, MPI_Datatype, i32, i32, MPI_Comm, i32) -> i32;
fn MPI_Wait(&MPI_Request, MPI_Status) -> i32;
-fn MPI_Waitall(i32, &[MPI_Request], MPI_Status) -> i32;
+fn MPI_Waitall_ex(i32) -> i32;
fn MPI_Probe(i32, i32, MPI_Comm, MPI_Status) -> i32;
fn MPI_Get_count(MPI_Status, MPI_Datatype, &mut i32) -> i32;
fn MPI_Gather(MPI_Buf, i32, MPI_Datatype, MPI_MutBuf, i32, MPI_Datatype, i32, MPI_Comm) -> i32;
@@ -78,10 +78,10 @@ struct MPI {
allreduce : fn(MPI_Buf, MPI_MutBuf, i32, MPI_Datatype, MPI_Op, MPI_Comm) -> i32,
send : fn(MPI_Buf, i32, MPI_Datatype, i32, i32, MPI_Comm) -> i32,
recv : fn(MPI_MutBuf, i32, MPI_Datatype, i32, i32, MPI_Comm, &mut MPIStatus) -> i32,
-isend : fn(MPI_Buf, i32, MPI_Datatype, i32, i32, MPI_Comm, &MPI_Request) -> i32,
-irecv : fn(MPI_MutBuf, i32, MPI_Datatype, i32, i32, MPI_Comm, &MPI_Request) -> i32,
+isend : fn(MPI_Buf, i32, MPI_Datatype, i32, i32, MPI_Comm, i32) -> i32,
+irecv : fn(MPI_MutBuf, i32, MPI_Datatype, i32, i32, MPI_Comm, i32) -> i32,
wait : fn(&MPI_Request, &mut MPIStatus) -> i32,
-wait_all : fn(i32, &[MPI_Request], &mut MPIStatus) -> i32,
+wait_all : fn(i32) -> i32,
probe: fn(i32, i32, MPI_Comm, &mut MPIStatus) -> i32,
get_count: fn(&mut MPIStatus, MPI_Datatype, &mut i32) -> i32,
gather: fn(MPI_Buf, i32, MPI_Datatype, MPI_MutBuf, i32, MPI_Datatype, i32, MPI_Comm) -> i32,
@@ -115,13 +115,13 @@ fn @mpi() -> MPI {
recv : @|buf, count, datatype, source, tag, comm, status| {
MPI_Recv(buf, count, datatype, source, tag, comm, status as MPI_Status)
},
-isend : MPI_Isend,
-irecv : MPI_Irecv,
+isend : MPI_Isend_ex,
+irecv : MPI_Irecv_ex,
wait : @|request, status| {
MPI_Wait(request, status as MPI_Status)
},
-wait_all : @|count, requests, status| {
-MPI_Waitall(count, requests, status as MPI_Status)
+wait_all : @|count| {
+MPI_Waitall_ex(count)
},
probe: @|source, tag, comm, status| {
MPI_Probe(source, tag, comm, status as MPI_Status)