diff --git a/source/adios2/toolkit/sst/dp/rdma_dp.c b/source/adios2/toolkit/sst/dp/rdma_dp.c index 2f73bb9e16..f122e16bc2 100644 --- a/source/adios2/toolkit/sst/dp/rdma_dp.c +++ b/source/adios2/toolkit/sst/dp/rdma_dp.c @@ -19,6 +19,12 @@ #include #include +#ifdef SST_HAVE_CRAY_CXI +// Needs to be included before rdma/fi_cxi_ext.h +#include +#include +#endif + #if defined(__has_feature) #if __has_feature(thread_sanitizer) #define NO_SANITIZE_THREAD __attribute__((no_sanitize("thread"))) @@ -130,6 +136,9 @@ struct fabric_state struct fid_cq *cq_signal; struct fid_av *av; pthread_t listener; +#ifdef SST_HAVE_CRAY_CXI + struct cxi_auth_key *cxi_auth_key; +#endif #ifdef SST_HAVE_CRAY_DRC drc_info_handle_t drc_info; uint32_t credential; @@ -173,13 +182,24 @@ struct fabric_state * plane would replace one or both of these with RDMA functionality. */ +static char const *get_preferred_domain(struct _SstParams *Params) +{ + if (Params->DataInterface) + { + return Params->DataInterface; + } + else + { + return getenv("FABRIC_IFACE"); + } +} + static void init_fabric(struct fabric_state *fabric, struct _SstParams *Params, CP_Services Svcs, - void *CP_Stream) + void *CP_Stream, char const *ifname) { struct fi_info *hints, *info, *originfo, *useinfo; struct fi_av_attr av_attr = {FI_AV_UNSPEC}; struct fi_cq_attr cq_attr = {0}; - char *ifname; int result; hints = fi_allocinfo(); @@ -187,24 +207,68 @@ static void init_fabric(struct fabric_state *fabric, struct _SstParams *Params, FI_MSG | FI_SEND | FI_RECV | FI_REMOTE_READ | FI_REMOTE_WRITE | FI_RMA | FI_READ | FI_WRITE; hints->mode = FI_CONTEXT | FI_LOCAL_MR | FI_CONTEXT2 | FI_MSG_PREFIX | FI_ASYNC_IOV | FI_RX_CQ_DATA; - hints->domain_attr->mr_mode = FI_MR_BASIC; - hints->domain_attr->control_progress = FI_PROGRESS_AUTO; - hints->domain_attr->data_progress = FI_PROGRESS_AUTO; hints->ep_attr->type = FI_EP_RDM; - if (Params->DataInterface) + uint32_t fi_version; +#ifdef SST_HAVE_CRAY_CXI + if (fabric->cxi_auth_key) { - ifname = Params->DataInterface; + fi_version = FI_VERSION(1, 11); + + hints->domain_attr->mr_mode = FI_MR_ENDPOINT; + hints->domain_attr->control_progress = FI_PROGRESS_MANUAL; + hints->domain_attr->data_progress = FI_PROGRESS_MANUAL; + + // Authentication is needed + // TODO: the first ID in SLINGSHOT_SVC_IDS is chosen, but we should + // rather choose the one corresponding with the FABRIC_IFACE + // example: + // SLINGSHOT_SVC_IDS=5,5,5,5 + // SLINGSHOT_VNIS=1310,1271 + // SLINGSHOT_DEVICES=cxi0,cxi1,cxi2,cxi3 + // FABRIC_IFACE=cxi2 (user specified) + + hints->ep_attr->auth_key = malloc(sizeof(struct cxi_auth_key)); + memcpy(hints->ep_attr->auth_key, fabric->cxi_auth_key, sizeof(struct cxi_auth_key)); + hints->ep_attr->auth_key_size = sizeof(struct cxi_auth_key); + + hints->domain_attr->auth_key = malloc(sizeof(struct cxi_auth_key)); + memcpy(hints->domain_attr->auth_key, fabric->cxi_auth_key, sizeof(struct cxi_auth_key)); + hints->domain_attr->auth_key_size = sizeof(struct cxi_auth_key); } else { - ifname = getenv("FABRIC_IFACE"); + fi_version = FI_VERSION(1, 5); + + hints->domain_attr->mr_mode = FI_MR_BASIC; + hints->domain_attr->control_progress = FI_PROGRESS_AUTO; + hints->domain_attr->data_progress = FI_PROGRESS_AUTO; + } +#else + fi_version = FI_VERSION(1, 5); + + hints->domain_attr->mr_mode = FI_MR_BASIC; + hints->domain_attr->control_progress = FI_PROGRESS_AUTO; + hints->domain_attr->data_progress = FI_PROGRESS_AUTO; +#endif + + /* + * ifname is passed as a function parameter of init_fabric() if + * a provider-specific key was configured and sent to the reader. + * Since the key is generally domain-specific, we must use that one in this + * case. + * The preferred domain is already considered upon key configuration, + * so this is fine. + */ + if (!ifname) + { + ifname = get_preferred_domain(Params); } fabric->info = NULL; pthread_mutex_lock(&fabric_mutex); - fi_getinfo(FI_VERSION(1, 5), NULL, NULL, 0, hints, &info); + fi_getinfo(fi_version, NULL, NULL, 0, hints, &info); pthread_mutex_unlock(&fabric_mutex); if (!info) { @@ -228,7 +292,8 @@ static void init_fabric(struct fabric_state *fabric, struct _SstParams *Params, break; } if ((((strcmp(prov_name, "verbs") == 0) && info->src_addr) || - (strcmp(prov_name, "gni") == 0) || (strcmp(prov_name, "psm2") == 0)) && + (strcmp(prov_name, "gni") == 0) || (strcmp(prov_name, "psm2") == 0) || + (strcmp(prov_name, "cxi") == 0)) && (!useinfo || !ifname || (strcmp(useinfo->domain_attr->name, ifname) != 0))) { Svcs->verbose(CP_Stream, DPTraceVerbose, @@ -238,7 +303,7 @@ static void init_fabric(struct fabric_state *fabric, struct _SstParams *Params, useinfo = info; } else if (((strstr(prov_name, "verbs") && info->src_addr) || strstr(prov_name, "gni") || - strstr(prov_name, "psm2")) && + strstr(prov_name, "psm2") || strstr(prov_name, "cxi")) && !useinfo) { Svcs->verbose(CP_Stream, DPTraceVerbose, @@ -489,6 +554,12 @@ static void fini_fabric(struct fabric_state *fabric, CP_Services Svcs, void *CP_ free(fabric->ctx); } +#ifdef SST_HAVE_CRAY_CXI + if (fabric->cxi_auth_key) + { + free(fabric->cxi_auth_key); + } +#endif #ifdef SST_HAVE_CRAY_DRC if (Fabric->auth_key) { @@ -677,6 +748,191 @@ static TimestepList GetStep(Rdma_WS_Stream Stream, long Timestep) return (Step); } +#ifdef SST_HAVE_CRAY_CXI +static int get_cxi_auth_key_from_env(CP_Services Svcs, void *CP_Stream, struct _SstParams *Params, + struct cxi_auth_key *key, char **used_device) +{ + int vni, first_vni, second_vni, svc_id; + + // Just some safety against faulty strings in string processing. + size_t const no_infinite_loops = 10000; + + // struct cxi_auth_key { + // /* The CXI service assigned to the Domain and Endpoints. A CXI + // service + // * is associated with a set of local resource limits, VNIs, and + // Traffic + // * Classes. + // * + // * The svc_id used by an OFI Domain must match all Endpoints belonging + // * to the Domain. + // */ + // uint32_t svc_id; + + // /* The Virtual Network ID (VNI) assigned to the Endpoint. Two + // Endpoints + // * must use the same VNI in order to communicate. + // * + // * Note that while the CXI service may define one or more VNIs which a + // * process can access, an Endpoint is assigned to only one. + // */ + // uint16_t vni; + // }; + + // typical value SLINGSHOT_DEVICES=cxi0,cxi1,cxi2,cxi3 + char const *slingshot_devices = getenv("SLINGSHOT_DEVICES"); + char const *preferred_device = get_preferred_domain(Params); + + /* + * In the following loop, find out if the preferred_device is found within + * the slingshot_devices. + * If the preferred_device is NULL, just pick the first. + * Upon success, modifies the output parameter used_device and stores + * the retrieved device index. + */ + size_t device_index = 0; + for (size_t no_infinite_loop_counter = 0;; ++device_index, ++no_infinite_loop_counter) + { + if (no_infinite_loop_counter > no_infinite_loops) + { + return EXIT_FAILURE; + } + + // Are we at the end of the environment variable? + int found_end = 0; + + // Find out the length of the current item in slingshot_devices. + size_t find_end_of_current_string = 0; + for (size_t no_infinite_loop_inner_counter = 0;; + ++find_end_of_current_string, ++no_infinite_loop_inner_counter) + { + if (no_infinite_loop_inner_counter > no_infinite_loops) + { + return EXIT_FAILURE; + } + + switch (slingshot_devices[find_end_of_current_string]) + { + case '\0': + found_end = 1; + goto break_first_loop; + case ',': + goto break_first_loop; + default: + break; + } + } + break_first_loop:; + int use_this_device = !preferred_device || (strncmp(preferred_device, slingshot_devices, + find_end_of_current_string) == 0); + if (use_this_device) + { + char *construct_used_device = malloc(find_end_of_current_string + 1); + memcpy(construct_used_device, slingshot_devices, find_end_of_current_string); + construct_used_device[find_end_of_current_string] = '\0'; + *used_device = construct_used_device; + break; + } + else if (found_end) + { + return EXIT_FAILURE; + } + else + { + // go to next iteration + slingshot_devices += find_end_of_current_string + 1; + } + } + + Svcs->verbose(CP_Stream, DPTraceVerbose, "Found device %s at index %zu\n", *used_device, + device_index); + + // typical value SLINGSHOT_VNIS=4576,4530 + char const *vni_env_str = getenv("SLINGSHOT_VNIS"); + if (!vni_env_str) + { + return EXIT_FAILURE; + } + + // typical value SLINGSHOT_SVC_IDS=5,5,5,5 + char const *svc_ids_env_str = getenv("SLINGSHOT_SVC_IDS"); + if (!svc_ids_env_str) + { + return EXIT_FAILURE; + } + + { + int num_vnis = sscanf(vni_env_str, "%d,%d", &first_vni, &second_vni); + switch (num_vnis) + { + // first VNI is the subjob's VNI + case 1: + Svcs->verbose(CP_Stream, DPTraceVerbose, "Using first vni.\n"); + vni = first_vni; + break; + // if present, the second VNI is the containing job's VNI + // the first VNI belongs to the subjob + case 2: + Svcs->verbose(CP_Stream, DPTraceVerbose, "Using second vni.\n"); + vni = second_vni; + break; + default: + return EXIT_FAILURE; + } + } + + { + // Pick the service ID according to the device_index found above. + for (size_t svc_id_index = 0; svc_id_index < device_index; ++svc_id_index) + { + for (size_t no_infinite_loop_counter = 0;; ++no_infinite_loop_counter) + { + if (no_infinite_loop_counter > no_infinite_loops) + { + return EXIT_FAILURE; + } + + switch (*(svc_ids_env_str++)) + { + case ',': + goto break_second_loop; + case '\0': + return EXIT_FAILURE; + default: + continue; + } + } + break_second_loop:; + } + + int num_svc_ids = sscanf(svc_ids_env_str, "%d", &svc_id); + switch (num_svc_ids) + { + case 1: + break; + default: + return EXIT_FAILURE; + } + } + + key->vni = vni; + key->svc_id = svc_id; + + return EXIT_SUCCESS; +} + +static int get_cxi_auth_key_from_writer(struct cxi_auth_key *key, attr_list WriterContact) +{ + long vni; + if (!get_long_attr(WriterContact, attr_atom_from_string("vni"), &vni)) + { + return EXIT_FAILURE; + } + key->vni = (uint16_t)vni; + return EXIT_SUCCESS; +} +#endif + static DP_RS_Stream RdmaInitReader(CP_Services Svcs, void *CP_Stream, void **ReaderContactInfoPtr, struct _SstParams *Params, attr_list WriterContact, SstStats Stats) @@ -723,6 +979,38 @@ static DP_RS_Stream RdmaInitReader(CP_Services Svcs, void *CP_Stream, void **Rea Stream->PreloadAvail = 0; } + char *required_device = NULL; +#ifdef SST_HAVE_CRAY_CXI + struct + { + struct cxi_auth_key key; + int valid; + } tagged_key; + + /* + * The svc_id of the key must match the device that this particular reader + * connects with. + * The vni (virtual network ID) must be the same across all communicating + * instances (get this from the writer). + */ + + tagged_key.valid = + get_cxi_auth_key_from_env(Svcs, CP_Stream, Params, &tagged_key.key, &required_device); + + if (tagged_key.valid == EXIT_SUCCESS && + get_cxi_auth_key_from_writer(&tagged_key.key, WriterContact) == EXIT_SUCCESS) + { + Svcs->verbose(CP_Stream, DPSummaryVerbose, "Reader found CXI auth key: %d %d\n", + tagged_key.key.vni, tagged_key.key.svc_id); + Stream->Fabric->cxi_auth_key = calloc(1, sizeof(struct cxi_auth_key)); + memcpy(Stream->Fabric->cxi_auth_key, &tagged_key.key, sizeof(struct cxi_auth_key)); + } + else + { + Svcs->verbose(CP_Stream, DPSummaryVerbose, "Reader found no CXI auth key\n"); + } +#endif + #ifdef SST_HAVE_CRAY_DRC int attr_cred, try_left, rc; if (!get_int_attr(WriterContact, attr_atom_from_string("RDMA_DRC_KEY"), &attr_cred)) @@ -753,7 +1041,11 @@ static DP_RS_Stream RdmaInitReader(CP_Services Svcs, void *CP_Stream, void **Rea #endif /* SST_HAVE_CRAY_DRC */ - init_fabric(Stream->Fabric, Stream->Params, Svcs, CP_Stream); + init_fabric(Stream->Fabric, Stream->Params, Svcs, CP_Stream, required_device); + if (required_device) + { + free(required_device); + } if (!Fabric->info) { Svcs->verbose(CP_Stream, DPCriticalVerbose, "Could not find a valid transport fabric.\n"); @@ -858,6 +1150,42 @@ static DP_WS_Stream RdmaInitWriter(CP_Services Svcs, void *CP_Stream, struct _Ss Stream->Fabric = calloc(1, sizeof(struct fabric_state)); Fabric = Stream->Fabric; + + char *required_device = NULL; +#ifdef SST_HAVE_CRAY_CXI + struct + { + struct cxi_auth_key key; + int valid; + } tagged_key; + + /* + * The svc_id of the key must match the device that this particular writer + * connects with. + * The vni (virtual network ID) must be the same across all communicating + * instances (use the one seen by rank 0). + */ + tagged_key.valid = + get_cxi_auth_key_from_env(Svcs, CP_Stream, Params, &tagged_key.key, &required_device); + + // Ensure that all writers use the same virtual network ID + SMPI_Bcast(&tagged_key.key.vni, sizeof(tagged_key.key.vni), SMPI_BYTE, 0, comm); + + if (tagged_key.valid == EXIT_SUCCESS) + { + Svcs->verbose(CP_Stream, DPSummaryVerbose, "Writer found CXI auth key: %d %d\n", + tagged_key.key.vni, tagged_key.key.svc_id); + + set_long_attr(DPAttrs, attr_atom_from_string("vni"), tagged_key.key.vni); + Stream->Fabric->cxi_auth_key = calloc(1, sizeof(struct cxi_auth_key)); + memcpy(Stream->Fabric->cxi_auth_key, &tagged_key.key, sizeof(struct cxi_auth_key)); + } + else + { + Svcs->verbose(CP_Stream, DPSummaryVerbose, "Writer found no CXI auth key"); + } +#endif + #ifdef SST_HAVE_CRAY_DRC int try_left, rc; if (Stream->Rank == 0) @@ -901,7 +1229,11 @@ static DP_WS_Stream RdmaInitWriter(CP_Services Svcs, void *CP_Stream, struct _Ss set_long_attr(DPAttrs, attr_atom_from_string("RDMA_DRC_CRED"), attr_cred); #endif /* SST_HAVE_CRAY_DRC */ - init_fabric(Stream->Fabric, Params, Svcs, CP_Stream); + init_fabric(Stream->Fabric, Params, Svcs, CP_Stream, required_device); + if (required_device) + { + free(required_device); + } Fabric = Stream->Fabric; if (!Fabric->info) { @@ -1735,7 +2067,7 @@ static struct _CP_DP_Interface RdmaDPInterface = {0}; static int RdmaGetPriority(CP_Services Svcs, void *CP_Stream, struct _SstParams *Params) { struct fi_info *hints, *info, *originfo; - char *ifname; + char const *ifname; char *forkunsafe; int Ret = -1; @@ -1744,20 +2076,41 @@ static int RdmaGetPriority(CP_Services Svcs, void *CP_Stream, struct _SstParams FI_MSG | FI_SEND | FI_RECV | FI_REMOTE_READ | FI_REMOTE_WRITE | FI_RMA | FI_READ | FI_WRITE; hints->mode = FI_CONTEXT | FI_LOCAL_MR | FI_CONTEXT2 | FI_MSG_PREFIX | FI_ASYNC_IOV | FI_RX_CQ_DATA; - hints->domain_attr->mr_mode = FI_MR_BASIC; - hints->domain_attr->control_progress = FI_PROGRESS_AUTO; - hints->domain_attr->data_progress = FI_PROGRESS_AUTO; hints->ep_attr->type = FI_EP_RDM; - if (Params->DataInterface) + char const *vni_env_str = getenv("SLINGSHOT_VNIS"); + + uint32_t fi_version; + if (vni_env_str) { - ifname = Params->DataInterface; + // try fishing for the CXI provider + Svcs->verbose(CP_Stream, DPSummaryVerbose, + "RDMA Dataplane trying to check for an available CXI " + "provider since environment variable SLINGSHOT_VNIS is " + "defined (value: '%s').\n", + vni_env_str); + fi_version = FI_VERSION(1, 11); + + hints->domain_attr->mr_mode = FI_MR_ENDPOINT; + hints->domain_attr->control_progress = FI_PROGRESS_MANUAL; + hints->domain_attr->data_progress = FI_PROGRESS_MANUAL; } else { - ifname = getenv("FABRIC_IFACE"); + Svcs->verbose(CP_Stream, DPSummaryVerbose, + "RDMA Dataplane trying to check for an available non-CXI " + "provider since environment variable SLINGSHOT_VNIS is " + "not defined.\n"); + + fi_version = FI_VERSION(1, 5); + + hints->domain_attr->mr_mode = FI_MR_BASIC; + hints->domain_attr->control_progress = FI_PROGRESS_AUTO; + hints->domain_attr->data_progress = FI_PROGRESS_AUTO; } + ifname = get_preferred_domain(Params); + forkunsafe = getenv("FI_FORK_UNSAFE"); if (!forkunsafe) { @@ -1765,7 +2118,7 @@ static int RdmaGetPriority(CP_Services Svcs, void *CP_Stream, struct _SstParams } pthread_mutex_lock(&fabric_mutex); - fi_getinfo(FI_VERSION(1, 5), NULL, NULL, 0, hints, &info); + fi_getinfo(fi_version, NULL, NULL, 0, hints, &info); pthread_mutex_unlock(&fabric_mutex); fi_freeinfo(hints); @@ -1783,6 +2136,10 @@ static int RdmaGetPriority(CP_Services Svcs, void *CP_Stream, struct _SstParams prov_name = info->fabric_attr->prov_name; domain_name = info->domain_attr->name; + Svcs->verbose(CP_Stream, DPPerStepVerbose, + "[RdmaGetPriority] Seeing and evaluating fabric with " + "provider: '%s', domain: '%s'\n", + prov_name, domain_name); if (ifname && strcmp(ifname, domain_name) == 0) { Svcs->verbose(CP_Stream, DPPerStepVerbose, @@ -1793,7 +2150,7 @@ static int RdmaGetPriority(CP_Services Svcs, void *CP_Stream, struct _SstParams break; } if ((strstr(prov_name, "verbs") && info->src_addr) || strstr(prov_name, "gni") || - strstr(prov_name, "psm2")) + strstr(prov_name, "psm2") || strstr(prov_name, "cxi")) { Svcs->verbose(CP_Stream, DPPerStepVerbose,