diff --git a/source/adios2/toolkit/sst/dp/rdma_dp.c b/source/adios2/toolkit/sst/dp/rdma_dp.c index eb96f32bdd..12d8f7c27a 100644 --- a/source/adios2/toolkit/sst/dp/rdma_dp.c +++ b/source/adios2/toolkit/sst/dp/rdma_dp.c @@ -89,6 +89,16 @@ int sst_fi_mr_reg(struct fid_domain *domain, const void *buf, size_t len, return res; } +int guard_fi_return(int code, CP_Services Svcs, CManager cm, char const *msg) +{ + if (code != FI_SUCCESS) + { + Svcs->verbose(cm, DPCriticalVerbose, "%s: %s (%lu)\n", msg, + fi_strerror(code), code); + } + return code; +} + struct fabric_state { struct fi_context *ctx; @@ -812,8 +822,14 @@ static DP_RS_Stream RdmaInitReader(CP_Services Svcs, void *CP_Stream, ContactInfo->Length = Fabric->info->src_addrlen; ContactInfo->Address = malloc(ContactInfo->Length); - fi_getname((fid_t)Fabric->signal, ContactInfo->Address, - &ContactInfo->Length); + if (guard_fi_return(fi_getname((fid_t)Fabric->signal, ContactInfo->Address, + &ContactInfo->Length), + Svcs, CP_Stream, + "[RdmaInitReader] fi_getname() failed with:") != + FI_SUCCESS) + { + return NULL; + } Stream->PreloadStep = -1; Stream->ContactInfo = ContactInfo; @@ -1122,8 +1138,15 @@ static DP_WSR_Stream RdmaInitWriterPerReader(CP_Services Svcs, for (i = 0; i < readerCohortSize; i++) { - fi_av_insert(Fabric->av, providedReaderInfo[i]->Address, 1, - &WSR_Stream->ReaderAddr[i], 0, NULL); + if (fi_av_insert(Fabric->av, providedReaderInfo[i]->Address, 1, + &WSR_Stream->ReaderAddr[i], 0, NULL) < 1) + { + + Svcs->verbose(WS_Stream->CP_Stream, DPCriticalVerbose, + "[RdmaInitWRiterPerReader] Failed inserting address " + "into vector\n"); + return NULL; + } Svcs->verbose(WS_Stream->CP_Stream, DPTraceVerbose, "Received contact info for RS_Stream %p, WSR Rank %d\n", providedReaderInfo[i]->RS_Stream, i); @@ -1145,8 +1168,14 @@ static DP_WSR_Stream RdmaInitWriterPerReader(CP_Services Svcs, ContactInfo->Length = Fabric->info->src_addrlen; ContactInfo->Address = malloc(ContactInfo->Length); - fi_getname((fid_t)Fabric->signal, ContactInfo->Address, - &ContactInfo->Length); + if (guard_fi_return(fi_getname((fid_t)Fabric->signal, ContactInfo->Address, + &ContactInfo->Length), + Svcs, WS_Stream->CP_Stream, + "[RdmaInitWriterPerReader] fi_getname() failed with") != + FI_SUCCESS) + { + return NULL; + } ReaderRollHandle = &ContactInfo->ReaderRollHandle; ReaderRollHandle->Block = @@ -1204,8 +1233,15 @@ static void RdmaProvideWriterDataToReader(CP_Services Svcs, { RS_Stream->WriterContactInfo[i].WS_Stream = providedWriterInfo[i]->WS_Stream; - fi_av_insert(Fabric->av, providedWriterInfo[i]->Address, 1, - &RS_Stream->WriterAddr[i], 0, NULL); + if (fi_av_insert(Fabric->av, providedWriterInfo[i]->Address, 1, + &RS_Stream->WriterAddr[i], 0, NULL) < 1) + { + Svcs->verbose(RS_Stream->CP_Stream, DPCriticalVerbose, + "[RdmaProvideWriterDataToReader] " + "Failed inserting address " + "into vector\n"); + return; + } RS_Stream->WriterRoll[i] = providedWriterInfo[i]->ReaderRollHandle; Svcs->verbose(RS_Stream->CP_Stream, DPTraceVerbose, "Received contact info for WS_stream %p, WSR Rank %d\n", @@ -2233,9 +2269,12 @@ static void PostPreload(CP_Services Svcs, Rdma_RS_Stream Stream, long Timestep) SendBuffer[WRidx].Handle.Key = fi_mr_key(RankLog->preqbmr); RollDest = (uint64_t)Stream->WriterRoll[i].Block + (sizeof(struct _RdmaBuffer) * Stream->Rank); - fi_write(Fabric->signal, &SendBuffer[WRidx], - sizeof(struct _RdmaBuffer), sbdesc, Stream->WriterAddr[i], - RollDest, Stream->WriterRoll[i].Key, &SendBuffer[WRidx]); + guard_fi_return( + (int)fi_write(Fabric->signal, &SendBuffer[WRidx], + sizeof(struct _RdmaBuffer), sbdesc, + Stream->WriterAddr[i], RollDest, + Stream->WriterRoll[i].Key, &SendBuffer[WRidx]), + Svcs, Stream->CP_Stream, "[PostPreload] fi_write failed with:"); RankLog->PreloadHandles = malloc(sizeof(void *) * 2); RankLog->PreloadHandles[0] = calloc(sizeof(struct _RdmaCompletionHandle), RankLog->Entries); @@ -2247,7 +2286,20 @@ static void PostPreload(CP_Services Svcs, Rdma_RS_Stream Stream, long Timestep) while (WRidx > 0) { - fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1); + ssize_t rc = + fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1); + if (rc < 1) + { + struct fi_cq_err_entry error; + fi_cq_readerr(Fabric->cq_signal, &error, 0); + Svcs->verbose(Stream->CP_Stream, DPCriticalVerbose, + "[PostPreload] failure while waiting for completions " + "(%d (%s - %s)).\n", + rc, fi_strerror(error.err), + fi_cq_strerror(Fabric->cq_signal, error.err, + error.err_data, NULL, error.len)); + return; + } CQBuffer = CQEntry.op_context; if (CQBuffer >= SendBuffer && CQBuffer < (SendBuffer + StepLog->WRanks)) { @@ -2363,17 +2415,34 @@ static void PullSelection(CP_Services Svcs, Rdma_WSR_Stream Stream) for (RankReq = Stream->PreloadReq; RankReq; RankReq = RankReq->next) { RankReq->ReqLog = (RdmaBuffer)ReadBuffer; - fi_read(Fabric->signal, RankReq->ReqLog, RankReq->BufferSize, rrdesc, - Stream->ReaderAddr[RankReq->Rank], - (uint64_t)ReaderRoll[RankReq->Rank].Handle.Block, - ReaderRoll[RankReq->Rank].Handle.Key, RankReq); + guard_fi_return( + (int)fi_read(Fabric->signal, RankReq->ReqLog, RankReq->BufferSize, + rrdesc, Stream->ReaderAddr[RankReq->Rank], + (uint64_t)ReaderRoll[RankReq->Rank].Handle.Block, + ReaderRoll[RankReq->Rank].Handle.Key, RankReq), + Svcs, WS_Stream->CP_Stream, + "[PullSelection] fi_read() failed with:"); ReadBuffer += RankReq->BufferSize; } RankReq = Stream->PreloadReq; while (RankReq) { - fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1); + ssize_t rc = + fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1); + if (rc < 1) + { + struct fi_cq_err_entry error; + fi_cq_readerr(Fabric->cq_signal, &error, 0); + Svcs->verbose( + WS_Stream->CP_Stream, DPCriticalVerbose, + "[PullSelection] failure while waiting for completions " + "(%d (%s - %s)).\n", + rc, fi_strerror(error.err), + fi_cq_strerror(Fabric->cq_signal, error.err, error.err_data, + NULL, error.len)); + return; + } CQRankReq = CQEntry.op_context; if (CQEntry.flags & FI_READ) { @@ -2408,7 +2477,21 @@ static void CompletePush(CP_Services Svcs, Rdma_WSR_Stream Stream, while (Step->OutstandingWrites > 0) { - fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1); + ssize_t rc = + fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1); + if (rc < 1) + { + struct fi_cq_err_entry error; + fi_cq_readerr(Fabric->cq_signal, &error, 0); + Svcs->verbose( + WS_Stream->CP_Stream, DPCriticalVerbose, + "[CompletePush] failure while waiting for completions " + "(%d (%s - %s)).\n", + rc, fi_strerror(error.err), + fi_cq_strerror(Fabric->cq_signal, error.err, error.err_data, + NULL, error.len)); + return; + } if (CQEntry.flags & FI_WRITE) { CQTimestep = (long)CQEntry.op_context;