Skip to content

Commit

Permalink
Check all return status
Browse files Browse the repository at this point in the history
  • Loading branch information
Franz Poeschel committed Jun 26, 2023
1 parent 1de243d commit 205c9f8
Showing 1 changed file with 101 additions and 18 deletions.
119 changes: 101 additions & 18 deletions source/adios2/toolkit/sst/dp/rdma_dp.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,16 @@ int sst_fi_mr_reg(struct fid_domain *domain, const void *buf, size_t len,
return res;
}

/*
 * Report a failed libfabric call and hand its return code back unchanged.
 *
 * code: return value of the libfabric call being checked
 * Svcs: service table whose verbose() callback emits the message
 * cm:   handle forwarded as the first argument of verbose()
 * msg:  caller-supplied prefix printed before the error description
 *
 * Nothing is logged when code equals FI_SUCCESS. The function always
 * returns code as-is, so a call can be wrapped inline inside a condition.
 */
int guard_fi_return(int code, CP_Services Svcs, CManager cm, char const *msg)
{
    if (code != FI_SUCCESS)
    {
        /* code is an int, so it must be formatted with %d; a %lu conversion
         * does not match the argument type passed through the varargs. */
        Svcs->verbose(cm, DPCriticalVerbose, "%s: %s (%d)\n", msg,
                      fi_strerror(code), code);
    }
    return code;
}

struct fabric_state
{
struct fi_context *ctx;
Expand Down Expand Up @@ -812,8 +822,14 @@ static DP_RS_Stream RdmaInitReader(CP_Services Svcs, void *CP_Stream,

ContactInfo->Length = Fabric->info->src_addrlen;
ContactInfo->Address = malloc(ContactInfo->Length);
fi_getname((fid_t)Fabric->signal, ContactInfo->Address,
&ContactInfo->Length);
if (guard_fi_return(fi_getname((fid_t)Fabric->signal, ContactInfo->Address,
&ContactInfo->Length),
Svcs, CP_Stream,
"[RdmaInitReader] fi_getname() failed with:") !=
FI_SUCCESS)
{
return NULL;
}

Stream->PreloadStep = -1;
Stream->ContactInfo = ContactInfo;
Expand Down Expand Up @@ -1122,8 +1138,15 @@ static DP_WSR_Stream RdmaInitWriterPerReader(CP_Services Svcs,

for (i = 0; i < readerCohortSize; i++)
{
fi_av_insert(Fabric->av, providedReaderInfo[i]->Address, 1,
&WSR_Stream->ReaderAddr[i], 0, NULL);
if (fi_av_insert(Fabric->av, providedReaderInfo[i]->Address, 1,
&WSR_Stream->ReaderAddr[i], 0, NULL) < 1)
{

Svcs->verbose(WS_Stream->CP_Stream, DPCriticalVerbose,
"[RdmaInitWRiterPerReader] Failed inserting address "
"into vector\n");
return NULL;
}
Svcs->verbose(WS_Stream->CP_Stream, DPTraceVerbose,
"Received contact info for RS_Stream %p, WSR Rank %d\n",
providedReaderInfo[i]->RS_Stream, i);
Expand All @@ -1145,8 +1168,14 @@ static DP_WSR_Stream RdmaInitWriterPerReader(CP_Services Svcs,

ContactInfo->Length = Fabric->info->src_addrlen;
ContactInfo->Address = malloc(ContactInfo->Length);
fi_getname((fid_t)Fabric->signal, ContactInfo->Address,
&ContactInfo->Length);
if (guard_fi_return(fi_getname((fid_t)Fabric->signal, ContactInfo->Address,
&ContactInfo->Length),
Svcs, WS_Stream->CP_Stream,
"[RdmaInitWriterPerReader] fi_getname() failed with") !=
FI_SUCCESS)
{
return NULL;
}

ReaderRollHandle = &ContactInfo->ReaderRollHandle;
ReaderRollHandle->Block =
Expand Down Expand Up @@ -1204,8 +1233,15 @@ static void RdmaProvideWriterDataToReader(CP_Services Svcs,
{
RS_Stream->WriterContactInfo[i].WS_Stream =
providedWriterInfo[i]->WS_Stream;
fi_av_insert(Fabric->av, providedWriterInfo[i]->Address, 1,
&RS_Stream->WriterAddr[i], 0, NULL);
if (fi_av_insert(Fabric->av, providedWriterInfo[i]->Address, 1,
&RS_Stream->WriterAddr[i], 0, NULL) < 1)
{
Svcs->verbose(RS_Stream->CP_Stream, DPCriticalVerbose,
"[RdmaProvideWriterDataToReader] "
"Failed inserting address "
"into vector\n");
return;
}
RS_Stream->WriterRoll[i] = providedWriterInfo[i]->ReaderRollHandle;
Svcs->verbose(RS_Stream->CP_Stream, DPTraceVerbose,
"Received contact info for WS_stream %p, WSR Rank %d\n",
Expand Down Expand Up @@ -2233,9 +2269,12 @@ static void PostPreload(CP_Services Svcs, Rdma_RS_Stream Stream, long Timestep)
SendBuffer[WRidx].Handle.Key = fi_mr_key(RankLog->preqbmr);
RollDest = (uint64_t)Stream->WriterRoll[i].Block +
(sizeof(struct _RdmaBuffer) * Stream->Rank);
fi_write(Fabric->signal, &SendBuffer[WRidx],
sizeof(struct _RdmaBuffer), sbdesc, Stream->WriterAddr[i],
RollDest, Stream->WriterRoll[i].Key, &SendBuffer[WRidx]);
guard_fi_return(
(int)fi_write(Fabric->signal, &SendBuffer[WRidx],
sizeof(struct _RdmaBuffer), sbdesc,
Stream->WriterAddr[i], RollDest,
Stream->WriterRoll[i].Key, &SendBuffer[WRidx]),
Svcs, Stream->CP_Stream, "[PostPreload] fi_write failed with:");
RankLog->PreloadHandles = malloc(sizeof(void *) * 2);
RankLog->PreloadHandles[0] =
calloc(sizeof(struct _RdmaCompletionHandle), RankLog->Entries);
Expand All @@ -2247,7 +2286,20 @@ static void PostPreload(CP_Services Svcs, Rdma_RS_Stream Stream, long Timestep)

while (WRidx > 0)
{
fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1);
ssize_t rc =
fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1);
if (rc < 1)
{
struct fi_cq_err_entry error;
fi_cq_readerr(Fabric->cq_signal, &error, 0);
Svcs->verbose(Stream->CP_Stream, DPCriticalVerbose,
"[PostPreload] failure while waiting for completions "
"(%d (%s - %s)).\n",
rc, fi_strerror(error.err),
fi_cq_strerror(Fabric->cq_signal, error.err,
error.err_data, NULL, error.len));
return;
}
CQBuffer = CQEntry.op_context;
if (CQBuffer >= SendBuffer && CQBuffer < (SendBuffer + StepLog->WRanks))
{
Expand Down Expand Up @@ -2363,17 +2415,34 @@ static void PullSelection(CP_Services Svcs, Rdma_WSR_Stream Stream)
for (RankReq = Stream->PreloadReq; RankReq; RankReq = RankReq->next)
{
RankReq->ReqLog = (RdmaBuffer)ReadBuffer;
fi_read(Fabric->signal, RankReq->ReqLog, RankReq->BufferSize, rrdesc,
Stream->ReaderAddr[RankReq->Rank],
(uint64_t)ReaderRoll[RankReq->Rank].Handle.Block,
ReaderRoll[RankReq->Rank].Handle.Key, RankReq);
guard_fi_return(
(int)fi_read(Fabric->signal, RankReq->ReqLog, RankReq->BufferSize,
rrdesc, Stream->ReaderAddr[RankReq->Rank],
(uint64_t)ReaderRoll[RankReq->Rank].Handle.Block,
ReaderRoll[RankReq->Rank].Handle.Key, RankReq),
Svcs, WS_Stream->CP_Stream,
"[PullSelection] fi_read() failed with:");
ReadBuffer += RankReq->BufferSize;
}

RankReq = Stream->PreloadReq;
while (RankReq)
{
fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1);
ssize_t rc =
fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1);
if (rc < 1)
{
struct fi_cq_err_entry error;
fi_cq_readerr(Fabric->cq_signal, &error, 0);
Svcs->verbose(
WS_Stream->CP_Stream, DPCriticalVerbose,
"[PullSelection] failure while waiting for completions "
"(%d (%s - %s)).\n",
rc, fi_strerror(error.err),
fi_cq_strerror(Fabric->cq_signal, error.err, error.err_data,
NULL, error.len));
return;
}
CQRankReq = CQEntry.op_context;
if (CQEntry.flags & FI_READ)
{
Expand Down Expand Up @@ -2408,7 +2477,21 @@ static void CompletePush(CP_Services Svcs, Rdma_WSR_Stream Stream,

while (Step->OutstandingWrites > 0)
{
fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1);
ssize_t rc =
fi_cq_sread(Fabric->cq_signal, (void *)(&CQEntry), 1, NULL, -1);
if (rc < 1)
{
struct fi_cq_err_entry error;
fi_cq_readerr(Fabric->cq_signal, &error, 0);
Svcs->verbose(
WS_Stream->CP_Stream, DPCriticalVerbose,
"[CompletePush] failure while waiting for completions "
"(%d (%s - %s)).\n",
rc, fi_strerror(error.err),
fi_cq_strerror(Fabric->cq_signal, error.err, error.err_data,
NULL, error.len));
return;
}
if (CQEntry.flags & FI_WRITE)
{
CQTimestep = (long)CQEntry.op_context;
Expand Down

0 comments on commit 205c9f8

Please sign in to comment.