diff --git a/ompi/mca/mtl/ofi/mtl_ofi.c b/ompi/mca/mtl/ofi/mtl_ofi.c index fcabc55baa4..35cb52443d9 100644 --- a/ompi/mca/mtl/ofi/mtl_ofi.c +++ b/ompi/mca/mtl/ofi/mtl_ofi.c @@ -1,6 +1,6 @@ /* * Copyright (c) 2013-2020 Intel, Inc. All rights reserved. - * Copyright (c) 2021-2024 Triad National Security, LLC. All rights + * Copyright (c) 2021-2022 Triad National Security, LLC. All rights * reserved. * * $COPYRIGHT$ @@ -14,6 +14,8 @@ OMPI_DECLSPEC extern mca_mtl_ofi_component_t mca_mtl_ofi_component; +OBJ_CLASS_INSTANCE(mca_mtl_comm_t, opal_object_t, NULL, NULL); + mca_mtl_ofi_module_t ompi_mtl_ofi = { { (int)((1ULL << MTL_OFI_CID_BIT_COUNT_1) - 1), /* max cid */ @@ -344,10 +346,43 @@ int ompi_mtl_ofi_add_comm(struct mca_mtl_base_module_t *mtl, struct ompi_communicator_t *comm) { int ret = OMPI_SUCCESS; + uint32_t comm_size; + mca_mtl_comm_t* mtl_comm; mca_mtl_ofi_ep_type ep_type = (0 == ompi_mtl_ofi.enable_sep) ? OFI_REGULAR_EP : OFI_SCALABLE_EP; + if (!OMPI_COMM_IS_GLOBAL_INDEX(comm)) { + mtl_comm = OBJ_NEW(mca_mtl_comm_t); + + if (OMPI_COMM_IS_INTER(comm)) { + comm_size = ompi_comm_remote_size(comm); + } else { + comm_size = ompi_comm_size(comm); + } + mtl_comm->c_index_vec = (c_index_vec_t *)malloc(sizeof(c_index_vec_t) * comm_size); + if (NULL == mtl_comm->c_index_vec) { + ret = OMPI_ERR_OUT_OF_RESOURCE; + OBJ_RELEASE(mtl_comm); + goto error; + } else { + for (uint32_t i=0; i < comm_size; i++) { + mtl_comm->c_index_vec[i].c_index_state = MCA_MTL_OFI_CID_NOT_EXCHANGED; + } + } + if (OMPI_COMM_IS_INTRA(comm)) { + mtl_comm->c_index_vec[comm->c_my_rank].c_index = comm->c_index; + mtl_comm->c_index_vec[comm->c_my_rank].c_index_state = MCA_MTL_OFI_CID_EXCHANGED; + } + + comm->c_mtl_comm = mtl_comm; + + } else { + + comm->c_mtl_comm = NULL; + + } + /* * If thread grouping enabled, add new OFI context for each communicator * other than MPI_COMM_SELF. @@ -377,6 +412,12 @@ int ompi_mtl_ofi_del_comm(struct mca_mtl_base_module_t *mtl, mca_mtl_ofi_ep_type ep_type = (0 == ompi_mtl_ofi.enable_sep) ? OFI_REGULAR_EP : OFI_SCALABLE_EP; + if(NULL != comm->c_mtl_comm) { + free(comm->c_mtl_comm->c_index_vec); + OBJ_RELEASE(comm->c_mtl_comm); + comm->c_mtl_comm = NULL; + } + /* * Clean up OFI contexts information. */ diff --git a/ompi/mca/mtl/ofi/mtl_ofi.h b/ompi/mca/mtl/ofi/mtl_ofi.h index ee8a24fca69..aae756b0518 100644 --- a/ompi/mca/mtl/ofi/mtl_ofi.h +++ b/ompi/mca/mtl/ofi/mtl_ofi.h @@ -77,6 +77,32 @@ int ompi_mtl_ofi_progress_no_inline(void); extern opal_thread_local int ompi_mtl_ofi_per_thread_ctx; #endif +#define MCA_MTL_OFI_CID_NOT_EXCHANGED 2 +#define MCA_MTL_OFI_CID_EXCHANGING 1 +#define MCA_MTL_OFI_CID_EXCHANGED 0 + +typedef struct { + uint32_t c_index:30; + uint32_t c_index_state:2; +} c_index_vec_t; + +typedef struct mca_mtl_comm_t { + opal_object_t super; + c_index_vec_t *c_index_vec; +} mca_mtl_comm_t; + +OBJ_CLASS_DECLARATION(mca_mtl_comm_t); + +struct mca_mtl_ofi_cid_hdr_t { + ompi_comm_extended_cid_t hdr_cid; + int16_t hdr_src_c_index; + int32_t hdr_src; + bool need_response; + bool ofi_cq_data; +}; + +typedef struct mca_mtl_ofi_cid_hdr_t mca_mtl_ofi_cid_hdr_t; + /* Set OFI context for operations which generate completion events */ __opal_attribute_always_inline__ static inline void set_thread_context(int ctxt) @@ -487,6 +513,135 @@ ompi_mtl_ofi_map_comm_to_ctxt(uint32_t comm_id) return ompi_mtl_ofi.comm_to_context[comm_id]; } +__opal_attribute_always_inline__ static inline int +ompi_mtl_ofi_post_recv_excid_buffer(bool blocking, struct ompi_communicator_t *comm, int src); + +__opal_attribute_always_inline__ static inline int +ompi_mtl_ofi_send_excid(struct mca_mtl_base_module_t *mtl, + struct ompi_communicator_t *comm, + int dest, + bool ofi_cq_data, + bool is_send); + +__opal_attribute_always_inline__ static inline int +ompi_mtl_ofi_recv_excid_error_callback(struct fi_cq_err_entry *error, + ompi_mtl_ofi_request_t *ofi_req) +{ + ompi_status_public_t *status; + assert(ofi_req->super.ompi_req); + status = &ofi_req->super.ompi_req->req_status; + status->MPI_TAG = MTL_OFI_GET_TAG(ofi_req->match_bits); + status->MPI_SOURCE = mtl_ofi_get_source((struct fi_cq_tagged_entry *) error); + + switch (error->err) { + case FI_ETRUNC: + status->MPI_ERROR = MPI_ERR_TRUNCATE; + break; + case FI_ECANCELED: + status->_cancelled = true; + break; + default: + status->MPI_ERROR = MPI_ERR_INTERN; + } + + ofi_req->super.completion_callback(&ofi_req->super); + return OMPI_SUCCESS; +} + +__opal_attribute_always_inline__ static inline int +ompi_mtl_ofi_post_recv_excid_buffer_callback(struct fi_cq_tagged_entry *wc, + ompi_mtl_ofi_request_t *ofi_req) +{ + ofi_req->completion_count--; + int ret; + mca_mtl_ofi_cid_hdr_t *buffer = (mca_mtl_ofi_cid_hdr_t *)ofi_req->buffer; + ompi_comm_extended_cid_t excid; + ompi_communicator_t *comm; + int src = buffer->hdr_src; + mca_mtl_comm_t *mtl_comm; + + excid.cid_base = buffer->hdr_cid.cid_base; + excid.cid_sub.u64 = buffer->hdr_cid.cid_sub.u64; + for (int i = 0; i < 8; i++) { + excid.cid_sub.u8[i] = buffer->hdr_cid.cid_sub.u8[i]; + } + + comm = ompi_comm_lookup_cid(excid); + if (comm == NULL) { + comm = ompi_comm_lookup(buffer->hdr_src_c_index); + } + + if (comm == NULL) { + return OMPI_SUCCESS; + } + + mtl_comm = comm->c_mtl_comm; + + if (mtl_comm->c_index_vec[src].c_index_state == MCA_MTL_OFI_CID_NOT_EXCHANGED + && buffer->need_response) { + mtl_comm->c_index_vec[src].c_index = buffer->hdr_src_c_index; + mtl_comm->c_index_vec[src].c_index_state = MCA_MTL_OFI_CID_EXCHANGED; + ret = ompi_mtl_ofi_send_excid(ofi_req->mtl, comm, src, buffer->ofi_cq_data, false); + } else { + mtl_comm->c_index_vec[src].c_index_state = MCA_MTL_OFI_CID_EXCHANGED; + mtl_comm->c_index_vec[src].c_index = buffer->hdr_src_c_index; + } + + ret = ompi_mtl_ofi_post_recv_excid_buffer(false, comm, -1); + return ret; +} + +__opal_attribute_always_inline__ static inline int +ompi_mtl_ofi_post_recv_excid_buffer(bool blocking, struct ompi_communicator_t *comm, int src) +{ + int ctxt_id = 0; + ssize_t ret; + ompi_mtl_ofi_request_t *ofi_req = malloc(sizeof(ompi_mtl_ofi_request_t)); + mca_mtl_ofi_cid_hdr_t *start = malloc(sizeof(mca_mtl_ofi_cid_hdr_t)); + size_t length = sizeof(mca_mtl_ofi_cid_hdr_t); + mca_mtl_comm_t *mtl_comm; + + mtl_comm = comm->c_mtl_comm; + + set_thread_context(ctxt_id); + + ofi_req->type = OMPI_MTL_OFI_RECV; + ofi_req->event_callback = ompi_mtl_ofi_post_recv_excid_buffer_callback; + ofi_req->error_callback = ompi_mtl_ofi_recv_excid_error_callback; + ofi_req->buffer = start; + ofi_req->length = length; + ofi_req->convertor = NULL; + ofi_req->req_started = false; + ofi_req->status.MPI_ERROR = OMPI_SUCCESS; + ofi_req->remote_addr = 0UL; + ofi_req->match_bits = 0UL; + ofi_req->completion_count = 1; + ofi_req->comm = comm; + + OFI_RETRY_UNTIL_DONE(fi_recv(ompi_mtl_ofi.ofi_ctxt[0].rx_ep, + start, + length, + NULL, + FI_ADDR_UNSPEC, + (void *)&ofi_req->ctx), ret); + if (OPAL_UNLIKELY(0 > ret)) { + if (NULL != ofi_req->buffer) { + free(ofi_req->buffer); + } + MTL_OFI_LOG_FI_ERR(ret, "fi_recv failed"); + return ompi_mtl_ofi_get_error(ret); + } + + if (blocking) { + assert(src != -1); + while (mtl_comm->c_index_vec[src].c_index_state > MCA_MTL_OFI_CID_EXCHANGED) { + ompi_mtl_ofi_progress(); + } + } + + return OMPI_SUCCESS; +} + __opal_attribute_always_inline__ static inline int ompi_mtl_ofi_ssend_recv(ompi_mtl_ofi_request_t *ack_req, struct ompi_communicator_t *comm, @@ -537,6 +692,139 @@ ompi_mtl_ofi_ssend_recv(ompi_mtl_ofi_request_t *ack_req, return OMPI_SUCCESS; } +/* + * this routine is invoked in the case of communicators which are not using a + * global cid, i.e. those created using MPI_Comm_create_from_group/ + * MPI_Intercomm_create_from_groups in order to exchange the local cid used + * by the sender for this supplied communicator. This function is only invoked + * for the first message sent to a given receiver. + */ +static int +ompi_mtl_ofi_send_excid(struct mca_mtl_base_module_t *mtl, + struct ompi_communicator_t *comm, + int dest, + bool ofi_cq_data, + bool is_send) +{ + ssize_t ret = OMPI_SUCCESS; + ompi_mtl_ofi_request_t *ofi_req = NULL; + int ctxt_id = 0; + mca_mtl_ofi_cid_hdr_t *start = NULL; + ompi_proc_t *ompi_proc = NULL; + mca_mtl_ofi_endpoint_t *endpoint = NULL; + fi_addr_t sep_peer_fiaddr = 0; + mca_mtl_comm_t *mtl_comm; + + ofi_req = (ompi_mtl_ofi_request_t *)malloc(sizeof(ompi_mtl_ofi_request_t)); + if (NULL == ofi_req) { + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto fn_exit; + } + + start = (mca_mtl_ofi_cid_hdr_t *)malloc(sizeof(mca_mtl_ofi_cid_hdr_t)); + if (NULL == start) { + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto fn_exit; + } + + mtl_comm = comm->c_mtl_comm; + + ctxt_id = 0; + set_thread_context(ctxt_id); + + /** + * Create a send request, start it and wait until it completes. + */ + ofi_req->type = OMPI_MTL_OFI_SEND; + ofi_req->event_callback = ompi_mtl_ofi_send_excid_callback; + ofi_req->error_callback = ompi_mtl_ofi_send_error_callback; + ofi_req->buffer = start; + + ompi_proc = ompi_comm_peer_lookup(comm, dest); + endpoint = ompi_mtl_ofi_get_endpoint(mtl, ompi_proc); + + /* For Scalable Endpoints, gather target receive context */ + sep_peer_fiaddr = fi_rx_addr(endpoint->peer_fiaddr, ctxt_id, ompi_mtl_ofi.rx_ctx_bits); + + start->hdr_cid = comm->c_contextid; + start->hdr_src = comm->c_my_rank; + start->hdr_src_c_index = comm->c_index; + start->ofi_cq_data = ofi_cq_data; + if (mtl_comm->c_index_vec[dest].c_index_state > MCA_MTL_OFI_CID_EXCHANGED) { + start->need_response = true; + } else { + start->need_response = false; + } + size_t length = sizeof(mca_mtl_ofi_cid_hdr_t); + + ofi_req->length = length; + ofi_req->status.MPI_ERROR = OMPI_SUCCESS; + ofi_req->completion_count = 0; + if (OPAL_UNLIKELY(length > endpoint->mtl_ofi_module->max_msg_size)) { + opal_show_help("help-mtl-ofi.txt", + "message too big", false, + length, endpoint->mtl_ofi_module->max_msg_size); + ret = OMPI_ERROR; + goto fn_exit; + } + + if (ompi_mtl_ofi.max_inject_size >= length) { + if (ofi_cq_data) { + OFI_RETRY_UNTIL_DONE(fi_injectdata(ompi_mtl_ofi.ofi_ctxt[0].tx_ep, + start, + length, + comm->c_my_rank, + sep_peer_fiaddr), ret); + } else { + OFI_RETRY_UNTIL_DONE(fi_inject(ompi_mtl_ofi.ofi_ctxt[0].tx_ep, + start, + length, + sep_peer_fiaddr), ret); + } + if (OPAL_UNLIKELY(0 > ret)) { + MTL_OFI_LOG_FI_ERR(ret, + ofi_cq_data ? "fi_injectdata failed" + : "fi_inject failed"); + + } + } else { + ofi_req->completion_count = 1; + if (ofi_cq_data) { + OFI_RETRY_UNTIL_DONE(fi_senddata(ompi_mtl_ofi.ofi_ctxt[0].tx_ep, + start, + length, + NULL, + comm->c_my_rank, + sep_peer_fiaddr, + (void *) &ofi_req->ctx), ret); + } else { + OFI_RETRY_UNTIL_DONE(fi_send(ompi_mtl_ofi.ofi_ctxt[0].tx_ep, + start, + length, + NULL, + sep_peer_fiaddr, + (void *) &ofi_req->ctx), ret); + } + if (OPAL_UNLIKELY(0 > ret)) { + MTL_OFI_LOG_FI_ERR(ret, + ofi_cq_data ? "fi_tsenddata failed" + : "fi_tsend failed"); + } + } + + ret = ompi_mtl_ofi_get_error(ret); + ofi_req->status.MPI_ERROR = ret; + +fn_exit: + + if ((OMPI_SUCCESS != ret) || (ofi_req->completion_count == 0)) { + if (NULL != ofi_req) free(ofi_req); + if (NULL != start) free(start); + } + + return ret; +} + __opal_attribute_always_inline__ static inline int ompi_mtl_ofi_send_generic(struct mca_mtl_base_module_t *mtl, struct ompi_communicator_t *comm, @@ -548,8 +836,7 @@ ompi_mtl_ofi_send_generic(struct mca_mtl_base_module_t *mtl, { ssize_t ret = OMPI_SUCCESS; ompi_mtl_ofi_request_t ofi_req; - int ompi_ret, ctxt_id = 0; - uint32_t c_index_for_tag; + int ompi_ret, ctxt_id = 0, c_index_for_tag; void *start; bool free_after; size_t length; @@ -559,10 +846,29 @@ ompi_mtl_ofi_send_generic(struct mca_mtl_base_module_t *mtl, ompi_mtl_ofi_request_t *ack_req = NULL; /* For synchronous send */ fi_addr_t src_addr = 0; fi_addr_t sep_peer_fiaddr = 0; + mca_mtl_comm_t *mtl_comm; - ompi_ret = ompi_comm_get_remote_cid(comm, dest, &c_index_for_tag); - if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) { - return ompi_ret; + if (OPAL_LIKELY(OMPI_COMM_IS_GLOBAL_INDEX(comm))) { + c_index_for_tag = comm->c_index; + } else { + mtl_comm = comm->c_mtl_comm; + if (mtl_comm->c_index_vec[dest].c_index_state == MCA_MTL_OFI_CID_NOT_EXCHANGED) { + mtl_comm->c_index_vec[dest].c_index_state = MCA_MTL_OFI_CID_EXCHANGING; + ompi_ret = ompi_mtl_ofi_send_excid(mtl, comm, dest, ofi_cq_data, true); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) { + return ompi_ret; + } + } + + if (mtl_comm->c_index_vec[dest].c_index_state > MCA_MTL_OFI_CID_EXCHANGED) { + while (mtl_comm->c_index_vec[dest].c_index_state > MCA_MTL_OFI_CID_EXCHANGED) { + ompi_ret = ompi_mtl_ofi_post_recv_excid_buffer(true, comm, dest); + if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) { + return ompi_ret; + } + } + } + c_index_for_tag = mtl_comm->c_index_vec[dest].c_index; } ompi_mtl_ofi_set_mr_null(&ofi_req); @@ -790,8 +1096,7 @@ ompi_mtl_ofi_isend_generic(struct mca_mtl_base_module_t *mtl, { ssize_t ret = OMPI_SUCCESS; ompi_mtl_ofi_request_t *ofi_req = (ompi_mtl_ofi_request_t *) mtl_request; - int ompi_ret, ctxt_id = 0; - uint32_t c_index_for_tag; + int ompi_ret, ctxt_id = 0, c_index_for_tag; void *start; size_t length; bool free_after; @@ -800,12 +1105,24 @@ ompi_mtl_ofi_isend_generic(struct mca_mtl_base_module_t *mtl, mca_mtl_ofi_endpoint_t *endpoint = NULL; ompi_mtl_ofi_request_t *ack_req = NULL; /* For synchronous send */ fi_addr_t sep_peer_fiaddr = 0; + mca_mtl_comm_t *mtl_comm; ompi_mtl_ofi_set_mr_null(ofi_req); - ompi_ret = ompi_comm_get_remote_cid(comm, dest, &c_index_for_tag); - if (OPAL_UNLIKELY(OMPI_SUCCESS != ompi_ret)) { - return ompi_ret; + if (OMPI_COMM_IS_GLOBAL_INDEX(comm)) { + c_index_for_tag = comm->c_index; + } else { + mtl_comm = comm->c_mtl_comm; + if (mtl_comm->c_index_vec[dest].c_index_state == MCA_MTL_OFI_CID_NOT_EXCHANGED) { + mtl_comm->c_index_vec[dest].c_index_state = MCA_MTL_OFI_CID_EXCHANGING; + ompi_ret = ompi_mtl_ofi_send_excid(mtl, comm, dest, ofi_cq_data, true); + } + if (mtl_comm->c_index_vec[dest].c_index_state > MCA_MTL_OFI_CID_EXCHANGED) { + while (mtl_comm->c_index_vec[dest].c_index_state > MCA_MTL_OFI_CID_EXCHANGED) { + ompi_ret = ompi_mtl_ofi_post_recv_excid_buffer(true, comm, dest); + } + } + c_index_for_tag = mtl_comm->c_index_vec[dest].c_index; } if (ompi_mtl_ofi.total_ctxts_used > 0) { @@ -1056,9 +1373,23 @@ ompi_mtl_ofi_irecv_generic(struct mca_mtl_base_module_t *mtl, void *start; size_t length; bool free_after; + mca_mtl_comm_t *mtl_comm; ompi_mtl_ofi_set_mr_null(ofi_req); + if (!OMPI_COMM_IS_GLOBAL_INDEX(comm)) { + mtl_comm = comm->c_mtl_comm; + if ((src == MPI_ANY_SOURCE || mtl_comm->c_index_vec[src].c_index_state > MCA_MTL_OFI_CID_EXCHANGED) && + !ompi_mtl_ofi.has_posted_initial_buffer) { + ompi_mtl_ofi.has_posted_initial_buffer = true; + ompi_ret = ompi_mtl_ofi_post_recv_excid_buffer(false, comm, -1); + } + if (src >= 0 && mtl_comm->c_index_vec[src].c_index_state == MCA_MTL_OFI_CID_NOT_EXCHANGED) { + mtl_comm->c_index_vec[src].c_index_state = MCA_MTL_OFI_CID_EXCHANGING; + ompi_ret = ompi_mtl_ofi_send_excid(mtl, comm, src, ofi_cq_data, false); + } + } + if (ompi_mtl_ofi.total_ctxts_used > 0) { ctxt_id = comm->c_contextid.cid_sub.u64 % ompi_mtl_ofi.total_ctxts_used; } else { @@ -1330,6 +1661,20 @@ ompi_mtl_ofi_iprobe_generic(struct mca_mtl_base_module_t *mtl, struct fi_msg_tagged msg; uint64_t msgflags = FI_PEEK | FI_COMPLETION; int ctxt_id = 0; + mca_mtl_comm_t *mtl_comm; + + if (!OMPI_COMM_IS_GLOBAL_INDEX(comm)) { + mtl_comm = comm->c_mtl_comm; + if ((src == MPI_ANY_SOURCE || mtl_comm->c_index_vec[src].c_index_state > MCA_MTL_OFI_CID_EXCHANGED) && + !ompi_mtl_ofi.has_posted_initial_buffer) { + ompi_mtl_ofi.has_posted_initial_buffer = true; + ret = ompi_mtl_ofi_post_recv_excid_buffer(false, comm, -1); + } + if (src >= 0 && mtl_comm->c_index_vec[src].c_index_state == MCA_MTL_OFI_CID_NOT_EXCHANGED) { + mtl_comm->c_index_vec[src].c_index_state = MCA_MTL_OFI_CID_EXCHANGING; + ret = ompi_mtl_ofi_send_excid(mtl, comm, src, ofi_cq_data, false); + } + } if (ompi_mtl_ofi.total_ctxts_used > 0) { ctxt_id = comm->c_contextid.cid_sub.u64 % ompi_mtl_ofi.total_ctxts_used; @@ -1416,6 +1761,20 @@ ompi_mtl_ofi_improbe_generic(struct mca_mtl_base_module_t *mtl, struct fi_msg_tagged msg; uint64_t msgflags = FI_PEEK | FI_CLAIM | FI_COMPLETION; int ctxt_id = 0; + mca_mtl_comm_t *mtl_comm; + + if (!OMPI_COMM_IS_GLOBAL_INDEX(comm)) { + mtl_comm = comm->c_mtl_comm; + if ((src == MPI_ANY_SOURCE || mtl_comm->c_index_vec[src].c_index_state > MCA_MTL_OFI_CID_EXCHANGED) + && !ompi_mtl_ofi.has_posted_initial_buffer) { + ompi_mtl_ofi.has_posted_initial_buffer = true; + ret = ompi_mtl_ofi_post_recv_excid_buffer(false, comm, -1); + } + if (src >= 0 && mtl_comm->c_index_vec[src].c_index_state == MCA_MTL_OFI_CID_NOT_EXCHANGED) { + mtl_comm->c_index_vec[src].c_index_state = MCA_MTL_OFI_CID_EXCHANGING; + ret = ompi_mtl_ofi_send_excid(mtl, comm, src, ofi_cq_data, false); + } + } if (ompi_mtl_ofi.total_ctxts_used > 0) { ctxt_id = comm->c_contextid.cid_sub.u64 % ompi_mtl_ofi.total_ctxts_used;