diff --git a/src/ucp/core/ucp_ep.c b/src/ucp/core/ucp_ep.c
index f378a95a212..d9bd15404a2 100644
--- a/src/ucp/core/ucp_ep.c
+++ b/src/ucp/core/ucp_ep.c
@@ -899,6 +899,7 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config)
 {
     ucp_context_h context = worker->context;
     ucp_ep_rma_config_t *rma_config;
+    ucp_ep_addr_domain_config_t *domain_config;
     uct_iface_attr_t *iface_attr;
     uct_md_attr_t *md_attr;
     ucp_rsc_index_t rsc_index;
@@ -917,6 +918,7 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config)
     config->tag.eager.zcopy_auto_thresh = 0;
     config->am.zcopy_auto_thresh        = 0;
     config->p2p_lanes                   = 0;
+    config->domain_lanes                = 0;
     config->bcopy_thresh                = context->config.ext.bcopy_thresh;
     config->tag.lane                    = UCP_NULL_LANE;
     config->tag.proto                   = &ucp_tag_eager_proto;
@@ -1004,6 +1006,23 @@ void ucp_ep_config_init(ucp_worker_h worker, ucp_ep_config_t *config)
         }
     }
 
+    /* Configuration for memory domains */
+    for (lane = 0; lane < config->key.num_lanes; ++lane) {
+        if (config->key.domain_lanes[lane] == UCP_NULL_LANE) {
+            continue;
+        }
+        config->domain_lanes |= UCS_BIT(lane);
+
+        domain_config = &config->domain[lane];
+        rsc_index     = config->key.lanes[lane].rsc_index;
+        iface_attr    = &worker->ifaces[rsc_index].attr;
+
+        domain_config->tag.eager.max_short = iface_attr->cap.am.max_short;
+        /* TODO: the zcopy threshold should be based on the ep AM lane's
+         * capability with the domain address (i.e. can UCT zcopy from the domain) */
+        memset(domain_config->tag.eager.zcopy_thresh, 0, UCP_MAX_IOV * sizeof(size_t));
+    }
+
     /* Configuration for remote memory access */
     for (lane = 0; lane < config->key.num_lanes; ++lane) {
         if (ucp_ep_config_get_rma_prio(config->key.rma_lanes, lane) == -1) {
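Note: the loop above records each selected domain lane in the config->domain_lanes bitmap, one bit per lane index. A hypothetical helper (illustration only, not part of this patch) shows how a consumer would test that bitmap:

    /* Hypothetical helper, not in this patch: true if "lane" was flagged as
     * a domain lane by ucp_ep_config_init() above. */
    static inline int ucp_ep_config_is_domain_lane(const ucp_ep_config_t *config,
                                                   ucp_lane_index_t lane)
    {
        return !!(config->domain_lanes & UCS_BIT(lane));
    }
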
diff --git a/src/ucp/core/ucp_ep.h b/src/ucp/core/ucp_ep.h
index f6fd896ee1b..4dd92a1b678 100644
--- a/src/ucp/core/ucp_ep.h
+++ b/src/ucp/core/ucp_ep.h
@@ -81,6 +81,9 @@ typedef struct ucp_ep_config_key {
     /* Lanes for atomic operations, sorted by priority, highest first */
     ucp_lane_index_t       amo_lanes[UCP_MAX_LANES];
 
+    /* Lanes for domain operations, sorted by priority, highest first */
+    ucp_lane_index_t       domain_lanes[UCP_MAX_LANES];
+
     /* Bitmap of remote mds which are reachable from this endpoint (with any set
      * of transports which could be selected in the future).
      */
@@ -106,6 +109,15 @@ typedef struct ucp_ep_rma_config {
 } ucp_ep_rma_config_t;
 
 
+typedef struct ucp_ep_addr_domain_config {
+    struct {
+        struct {
+            ssize_t max_short;
+            size_t  zcopy_thresh[UCP_MAX_IOV];
+        } eager;
+    } tag;
+} ucp_ep_addr_domain_config_t;
+
 /*
  * Configuration for AM and tag offload protocols
  */
@@ -136,6 +148,10 @@ typedef struct ucp_ep_config {
      */
     ucp_lane_map_t         p2p_lanes;
 
+    /* Bitmap of which lanes are domain lanes
+     */
+    ucp_lane_map_t         domain_lanes;
+
     /* Configuration for each lane that provides RMA */
     ucp_ep_rma_config_t    rma[UCP_MAX_LANES];
     /* Threshold for switching from put_short to put_bcopy */
@@ -179,8 +195,11 @@ typedef struct ucp_ep_config {
          * (currently it's only AM based). */
         const ucp_proto_t   *proto;
     } stream;
-} ucp_ep_config_t;
+    /* Configuration of all domains */
+    ucp_ep_addr_domain_config_t domain[UCP_MAX_LANES];
+
+} ucp_ep_config_t;
 
 
 /**
  * Remote protocol layer endpoint
@@ -245,4 +264,8 @@ size_t ucp_ep_config_get_zcopy_auto_thresh(size_t iovcnt,
                                            const ucp_context_h context,
                                            double bandwidth);
 
+ucp_lane_index_t ucp_config_find_domain_lane(const ucp_ep_config_t *config,
+                                             const ucp_lane_index_t *lanes,
+                                             ucp_md_map_t dn_md_map);
+ucs_status_t ucp_ep_set_domain_lanes(ucp_ep_h ep, ucp_mem_type_h mem_type_h);
 #endif
diff --git a/src/ucp/core/ucp_mm.h b/src/ucp/core/ucp_mm.h
index 82f035f3efa..c08c281f6fd 100644
--- a/src/ucp/core/ucp_mm.h
+++ b/src/ucp/core/ucp_mm.h
@@ -71,6 +71,7 @@ typedef struct ucp_mem_desc {
 typedef struct ucp_mem_type {
     ucp_md_map_t      md_map;     /* Which MDs own this address domain */
     uct_memory_type_t id;         /* memory type */
+    ucp_lane_index_t  eager_lane; /* lane used for eager sends from this domain */
 } ucp_mem_type_t;
 
 void ucp_rkey_resolve_inner(ucp_rkey_h rkey, ucp_ep_h ep);
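Note: judging by its use in this patch, ucp_addr_domain_detect_mds() fills only md_map and id, so eager_lane on a stack-allocated ucp_mem_type_t is undefined until it is explicitly set. A sketch of the intended call sequence (relying on the eager_lane initialization in ucp_ep_set_domain_lanes() further below):

    ucp_mem_type_t mem_type;

    ucp_addr_domain_detect_mds(context, buffer, &mem_type);
    if (!UCP_IS_DEFAULT_MEMORY_TYPE(mem_type.id)) {
        /* fills mem_type.eager_lane, or leaves it UCP_NULL_LANE */
        ucp_ep_set_domain_lanes(ep, &mem_type);
    }
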
diff --git a/src/ucp/dt/dt.c b/src/ucp/dt/dt.c
index 418b9775446..44d8b88ec72 100644
--- a/src/ucp/dt/dt.c
+++ b/src/ucp/dt/dt.c
@@ -5,6 +5,7 @@
  */
 
 #include "dt.h"
+#include 
 
 size_t ucp_dt_pack(ucp_datatype_t datatype, void *dest, const void *src,
@@ -44,3 +45,123 @@ size_t ucp_dt_pack(ucp_datatype_t datatype, void *dest, const void *src,
     state->offset += result_len;
     return result_len;
 }
+
+static UCS_F_ALWAYS_INLINE ucs_status_t
+ucp_dn_dt_unpack(ucp_request_t *req, void *buffer, size_t buffer_size,
+                 const void *recv_data, size_t recv_length)
+{
+    ucs_status_t status;
+    ucp_worker_h worker     = req->recv.worker;
+    ucp_context_h context   = worker->context;
+    ucp_ep_h ep             = ucp_worker_ep_find(worker, worker->uuid);
+    ucp_ep_config_t *config = ucp_ep_config(ep);
+    ucp_md_map_t dn_md_map  = req->mem_type.md_map;
+    ucp_lane_index_t dn_lane;
+    ucp_rsc_index_t rsc_index;
+    uct_iface_attr_t *iface_attr;
+    unsigned md_index;
+    uct_mem_h memh;
+    uct_iov_t iov;
+
+    if (recv_length == 0) {
+        return UCS_OK;
+    }
+
+    /* Pick a domain lane whose interface supports put_zcopy; drop the MD of
+     * each unsuitable lane from the map and retry. */
+    while (1) {
+        dn_lane = ucp_config_find_domain_lane(config, config->key.domain_lanes,
+                                              dn_md_map);
+        if (dn_lane == UCP_NULL_LANE) {
+            ucs_error("Cannot find an address domain lane.");
+            return UCS_ERR_IO_ERROR;
+        }
+        rsc_index  = ucp_ep_get_rsc_index(ep, dn_lane);
+        iface_attr = &worker->ifaces[rsc_index].attr;
+        md_index   = config->key.lanes[dn_lane].dst_md_index;
+        if (!(iface_attr->cap.flags & UCT_IFACE_FLAG_PUT_ZCOPY)) {
+            dn_md_map &= ~UCS_BIT(md_index);
+            continue;
+        }
+        break;
+    }
+
+    status = uct_md_mem_reg(context->tl_mds[md_index].md, buffer, buffer_size,
+                            UCT_MD_MEM_ACCESS_REMOTE_PUT, &memh);
+    if (status != UCS_OK) {
+        ucs_error("Failed to reg address %p with md %s", buffer,
+                  context->tl_mds[md_index].rsc.md_name);
+        return status;
+    }
+
+    ucs_assert(buffer_size >= recv_length);
+    iov.buffer = (void *)recv_data;
+    iov.length = recv_length;
+    iov.count  = 1;
+    iov.memh   = UCT_MEM_HANDLE_NULL;
+
+    /* Stage the data into the domain memory with a loopback put_zcopy;
+     * the local registration handle doubles as the remote key.
+     * TODO: honor state->offset so multi-fragment unpacks land correctly. */
+    status = uct_ep_put_zcopy(ep->uct_eps[dn_lane], &iov, 1, (uint64_t)buffer,
+                              (uct_rkey_t)memh, NULL);
+    if (status != UCS_OK) {
+        uct_md_mem_dereg(context->tl_mds[md_index].md, memh);
+        ucs_error("Failed to perform uct_ep_put_zcopy to address %p", recv_data);
+        return status;
+    }
+
+    status = uct_md_mem_dereg(context->tl_mds[md_index].md, memh);
+    if (status != UCS_OK) {
+        ucs_error("Failed to dereg address %p with md %s", buffer,
+                  context->tl_mds[md_index].rsc.md_name);
+        return status;
+    }
+
+    return UCS_OK;
+}
+
+ucs_status_t ucp_dt_unpack(ucp_request_t *req, ucp_datatype_t datatype,
+                           void *buffer, size_t buffer_size, ucp_dt_state_t *state,
+                           const void *recv_data, size_t recv_length, int last)
+{
+    ucp_dt_generic_t *dt_gen;
+    size_t offset = state->offset;
+    ucs_status_t status;
+
+    if (ucs_unlikely((recv_length + offset) > buffer_size)) {
+        ucs_trace_req("message truncated: recv_length %zu offset %zu buffer_size %zu",
+                      recv_length, offset, buffer_size);
+        if (UCP_DT_IS_GENERIC(datatype) && last) {
+            ucp_dt_generic(datatype)->ops.finish(state->dt.generic.state);
+        }
+        return UCS_ERR_MESSAGE_TRUNCATED;
+    }
+
+    switch (datatype & UCP_DATATYPE_CLASS_MASK) {
+    case UCP_DATATYPE_CONTIG:
+        if (ucs_likely(UCP_IS_DEFAULT_MEMORY_TYPE(req->mem_type.id))) {
+            UCS_PROFILE_NAMED_CALL("memcpy_recv", memcpy, buffer + offset,
+                                   recv_data, recv_length);
+            return UCS_OK;
+        } else {
+            return ucp_dn_dt_unpack(req, buffer, buffer_size, recv_data,
+                                    recv_length);
+        }
+
+    case UCP_DATATYPE_IOV:
+        UCS_PROFILE_CALL(ucp_dt_iov_scatter, buffer, state->dt.iov.iovcnt,
+                         recv_data, recv_length, &state->dt.iov.iov_offset,
+                         &state->dt.iov.iovcnt_offset);
+        return UCS_OK;
+
+    case UCP_DATATYPE_GENERIC:
+        dt_gen = ucp_dt_generic(datatype);
+        status = UCS_PROFILE_NAMED_CALL("dt_unpack", dt_gen->ops.unpack,
+                                        state->dt.generic.state, offset,
+                                        recv_data, recv_length);
+        if (last) {
+            UCS_PROFILE_NAMED_CALL_VOID("dt_finish", dt_gen->ops.finish,
+                                        state->dt.generic.state);
+        }
+        return status;
+
+    default:
+        ucs_error("unexpected datatype=%lx", datatype);
+        return UCS_ERR_INVALID_PARAM;
+    }
+}
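For reference, the staging pattern ucp_dn_dt_unpack() uses, reduced to its three UCT calls (a sketch under this patch's assumptions: the loopback lane's MD can register the destination buffer, and a put_zcopy issued with a NULL completion finishes before the dereg):

    uct_mem_h memh;
    uct_iov_t iov = { .buffer = (void *)src, .length = len,
                      .count  = 1, .memh = UCT_MEM_HANDLE_NULL };

    uct_md_mem_reg(md, dst, len, UCT_MD_MEM_ACCESS_REMOTE_PUT, &memh);
    uct_ep_put_zcopy(uct_ep, &iov, 1, (uint64_t)dst, (uct_rkey_t)memh, NULL);
    uct_md_mem_dereg(md, memh);
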
diff --git a/src/ucp/dt/dt.h b/src/ucp/dt/dt.h
index f35b03f75b3..82c83a84ce1 100644
--- a/src/ucp/dt/dt.h
+++ b/src/ucp/dt/dt.h
@@ -15,6 +15,7 @@
 #include 
 #include 
 #include 
+#include 
 
 
 /**
@@ -72,51 +73,8 @@ size_t ucp_dt_length(ucp_datatype_t datatype, size_t count,
 size_t ucp_dt_pack(ucp_datatype_t datatype, void *dest, const void *src,
                    ucp_dt_state_t *state, size_t length);
 
-static UCS_F_ALWAYS_INLINE ucs_status_t
-ucp_dt_unpack(ucp_datatype_t datatype, void *buffer, size_t buffer_size,
-              ucp_dt_state_t *state, const void *recv_data,
-              size_t recv_length, int last)
-{
-    ucp_dt_generic_t *dt_gen;
-    size_t offset = state->offset;
-    ucs_status_t status;
-
-    if (ucs_unlikely((recv_length + offset) > buffer_size)) {
-        ucs_trace_req("message truncated: recv_length %zu offset %zu buffer_size %zu",
-                      recv_length, offset, buffer_size);
-        if (UCP_DT_IS_GENERIC(datatype) && last) {
-            ucp_dt_generic(datatype)->ops.finish(state->dt.generic.state);
-        }
-        return UCS_ERR_MESSAGE_TRUNCATED;
-    }
-
-    switch (datatype & UCP_DATATYPE_CLASS_MASK) {
-    case UCP_DATATYPE_CONTIG:
-        UCS_PROFILE_NAMED_CALL("memcpy_recv", memcpy, buffer + offset,
-                               recv_data, recv_length);
-        return UCS_OK;
-
-    case UCP_DATATYPE_IOV:
-        UCS_PROFILE_CALL(ucp_dt_iov_scatter, buffer, state->dt.iov.iovcnt,
-                         recv_data, recv_length, &state->dt.iov.iov_offset,
-                         &state->dt.iov.iovcnt_offset);
-        return UCS_OK;
-
-    case UCP_DATATYPE_GENERIC:
-        dt_gen = ucp_dt_generic(datatype);
-        status = UCS_PROFILE_NAMED_CALL("dt_unpack", dt_gen->ops.unpack,
-                                        state->dt.generic.state, offset,
-                                        recv_data, recv_length);
-        if (last) {
-            UCS_PROFILE_NAMED_CALL_VOID("dt_finish", dt_gen->ops.finish,
-                                        state->dt.generic.state);
-        }
-        return status;
-
-    default:
-        ucs_error("unexpected datatype=%lx", datatype);
-        return UCS_ERR_INVALID_PARAM;
-    }
-}
+ucs_status_t ucp_dt_unpack(ucp_request_t *req, ucp_datatype_t datatype,
+                           void *buffer, size_t buffer_size, ucp_dt_state_t *state,
+                           const void *recv_data, size_t recv_length, int last);
 
 #endif
diff --git a/src/ucp/tag/eager.h b/src/ucp/tag/eager.h
index 94202c2f477..2e1580cc326 100644
--- a/src/ucp/tag/eager.h
+++ b/src/ucp/tag/eager.h
@@ -101,7 +101,7 @@ static UCS_F_ALWAYS_INLINE ucs_status_t
 ucp_eager_unexp_match(ucp_worker_h worker, ucp_recv_desc_t *rdesc, ucp_tag_t tag,
                       unsigned flags, void *buffer, size_t count,
                       ucp_datatype_t datatype, ucp_dt_state_t *state,
-                      ucp_tag_recv_info_t *info)
+                      ucp_request_t *req, ucp_tag_recv_info_t *info)
 {
     size_t recv_len, hdr_len;
     ucs_status_t status;
@@ -110,7 +110,7 @@ ucp_eager_unexp_match(ucp_worker_h worker, ucp_recv_desc_t *rdesc, ucp_tag_t tag
     UCP_WORKER_STAT_EAGER_CHUNK(worker, UNEXP);
     hdr_len  = rdesc->hdr_len;
     recv_len = rdesc->length - hdr_len;
-    status   = ucp_dt_unpack(datatype, buffer, count, state, data + hdr_len,
+    status   = ucp_dt_unpack(req, datatype, buffer, count, state, data + hdr_len,
                              recv_len, flags & UCP_RECV_DESC_FLAG_LAST);
     state->offset += recv_len;
 
diff --git a/src/ucp/tag/eager_rcv.c b/src/ucp/tag/eager_rcv.c
index 742c43e18a0..1a2138aee4c 100644
--- a/src/ucp/tag/eager_rcv.c
+++ b/src/ucp/tag/eager_rcv.c
@@ -71,7 +71,7 @@ ucp_eager_handler(void *arg, void *data, size_t length, unsigned am_flags,
 
     if (req != NULL) {
         UCS_PROFILE_REQUEST_EVENT(req, "eager_recv", recv_len);
-        status = ucp_dt_unpack(req->recv.datatype, req->recv.buffer,
+        status = ucp_dt_unpack(req, req->recv.datatype, req->recv.buffer,
                                req->recv.length, &req->recv.state,
                                data + hdr_len, recv_len,
                                flags & UCP_RECV_DESC_FLAG_LAST);
diff --git a/src/ucp/tag/offload.c b/src/ucp/tag/offload.c
index 91142df2d6d..550c4f30e1e 100644
--- a/src/ucp/tag/offload.c
+++ b/src/ucp/tag/offload.c
@@ -64,7 +64,7 @@ void ucp_tag_offload_completed(uct_tag_context_t *self, uct_tag_t stag,
     }
 
     if (req->recv.rdesc != NULL) {
-        status = ucp_dt_unpack(req->recv.datatype, req->recv.buffer, req->recv.length,
+        status = ucp_dt_unpack(req, req->recv.datatype, req->recv.buffer, req->recv.length,
                                &req->recv.state, req->recv.rdesc + 1, length, 1);
         ucs_mpool_put_inline(req->recv.rdesc);
     } else {
diff --git a/src/ucp/tag/rndv.c b/src/ucp/tag/rndv.c
index 548d3c4c10a..05cad2ce386 100644
--- a/src/ucp/tag/rndv.c
+++ b/src/ucp/tag/rndv.c
@@ -731,7 +731,7 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_data_handler,
     }
 
     UCS_PROFILE_REQUEST_EVENT(rreq, "rndv_data_recv", recv_len);
-    status = ucp_dt_unpack(rreq->recv.datatype, rreq->recv.buffer,
+    status = ucp_dt_unpack(rreq, rreq->recv.datatype, rreq->recv.buffer,
                            rreq->recv.length, &rreq->recv.state, data + hdr_len,
                            recv_len, 0);
     if ((status == UCS_OK) || (status == UCS_INPROGRESS)) {
@@ -764,9 +764,9 @@ UCS_PROFILE_FUNC(ucs_status_t, ucp_rndv_data_last_handler,
         /* Check that total received length matches RTS->length */
         ucs_assert(rreq->recv.info.length == rreq->recv.state.offset + recv_len);
         UCS_PROFILE_REQUEST_EVENT(rreq, "rndv_data_last_recv", recv_len);
-        status = ucp_dt_unpack(rreq->recv.datatype, rreq->recv.buffer,
-                               rreq->recv.length, &rreq->recv.state,
-                               data + hdr_len, recv_len, 1);
+        status = ucp_dt_unpack(rreq, rreq->recv.datatype, rreq->recv.buffer,
+                               rreq->recv.length, &rreq->recv.state,
+                               data + hdr_len, recv_len, 1);
     } else {
         ucs_trace_data("drop last segment for rreq %p, length %zu, status %s",
                        rreq, recv_len, ucs_status_string(rreq->status));
diff --git a/src/ucp/tag/tag_recv.c b/src/ucp/tag/tag_recv.c
index e9413eefd11..14d772a8cdf 100644
--- a/src/ucp/tag/tag_recv.c
+++ b/src/ucp/tag/tag_recv.c
@@ -85,7 +85,7 @@ ucp_tag_search_unexp(ucp_worker_h worker, void *buffer, size_t buffer_size,
             UCS_PROFILE_REQUEST_EVENT(req, "eager_match", 0);
             status = ucp_eager_unexp_match(worker, rdesc, recv_tag, flags,
                                            buffer, buffer_size, datatype,
-                                           &req->recv.state, info);
+                                           &req->recv.state, req, info);
             ucs_trace_req("release receive descriptor %p", rdesc);
             if (status != UCS_INPROGRESS) {
                 goto out_release_desc;
@@ -128,6 +128,8 @@ ucp_tag_recv_request_init(ucp_request_t *req, ucp_worker_h worker, void* buffer,
     req->recv.state.offset = 0;
     req->recv.worker       = worker;
 
+    ucp_addr_domain_detect_mds(worker->context, buffer, &req->mem_type);
+
     switch (datatype & UCP_DATATYPE_CLASS_MASK) {
     case UCP_DATATYPE_IOV:
         req->recv.state.dt.iov.iov_offset    = 0;
diff --git a/src/ucp/tag/tag_send.c b/src/ucp/tag/tag_send.c
index 9b7326e3d98..86bec096422 100644
--- a/src/ucp/tag/tag_send.c
+++ b/src/ucp/tag/tag_send.c
@@ -202,7 +202,8 @@ ucp_tag_send_req(ucp_request_t *req, size_t count, ssize_t max_short,
 
 static void ucp_tag_send_req_init(ucp_request_t* req, ucp_ep_h ep,
                                   const void* buffer, uintptr_t datatype,
-                                  ucp_tag_t tag, uint16_t flags)
+                                  ucp_tag_t tag, uint16_t flags,
+                                  ucp_mem_type_t mem_type)
 {
     req->flags             = flags;
     req->send.ep           = ep;
@@ -211,6 +212,7 @@ static void ucp_tag_send_req_init(ucp_request_t* req, ucp_ep_h ep,
     req->send.tag          = tag;
     req->send.reg_rsc      = UCP_NULL_RESOURCE;
     req->send.state.offset = 0;
+    req->mem_type          = mem_type;
 
     VALGRIND_MAKE_MEM_UNDEFINED(&req->send.uct_comp.count,
                                 sizeof(req->send.uct_comp.count));
@@ -228,13 +230,20 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_tag_send_nb,
     ucp_request_t *req;
     size_t length;
     ucs_status_ptr_t ret;
+    ucp_mem_type_t mem_type;
 
     UCP_THREAD_CS_ENTER_CONDITIONAL(&ep->worker->mt_lock);
 
     ucs_trace_req("send_nb buffer %p count %zu tag %"PRIx64" to %s cb %p",
                   buffer, count, tag, ucp_ep_peer_name(ep), cb);
 
-    if (ucs_likely(UCP_DT_IS_CONTIG(datatype))) {
+    ucp_addr_domain_detect_mds(ep->worker->context, (void *)buffer, &mem_type);
+    if (ucs_unlikely(!UCP_IS_DEFAULT_MEMORY_TYPE(mem_type.id))) {
+        ucp_ep_set_domain_lanes(ep, &mem_type);
+    }
+
+    if (ucs_likely(UCP_IS_DEFAULT_MEMORY_TYPE(mem_type.id)) &&
+        ucs_likely(UCP_DT_IS_CONTIG(datatype))) {
         length = ucp_contig_dt_length(datatype, count);
         if (ucs_likely((ssize_t)length <= ucp_ep_config(ep)->tag.eager.max_short)) {
             status = UCS_PROFILE_CALL(ucp_tag_send_eager_short, ep, tag, buffer,
@@ -253,11 +262,15 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_tag_send_nb,
         goto out;
     }
 
-    ucp_tag_send_req_init(req, ep, buffer, datatype, tag, 0);
+    ucp_tag_send_req_init(req, ep, buffer, datatype, tag, 0, mem_type);
 
     ret = ucp_tag_send_req(req, count,
-                           ucp_ep_config(ep)->tag.eager.max_short,
-                           ucp_ep_config(ep)->tag.eager.zcopy_thresh,
+                           ucs_likely(UCP_IS_DEFAULT_MEMORY_TYPE(mem_type.id)) ?
+                           ucp_ep_config(ep)->tag.eager.max_short :
+                           ucp_ep_config(ep)->domain[mem_type.eager_lane].tag.eager.max_short,
+                           ucs_likely(UCP_IS_DEFAULT_MEMORY_TYPE(mem_type.id)) ?
+                           ucp_ep_config(ep)->tag.eager.zcopy_thresh :
+                           ucp_ep_config(ep)->domain[mem_type.eager_lane].tag.eager.zcopy_thresh,
                            ucp_ep_config(ep)->tag.rndv.rma_thresh,
                            ucp_ep_config(ep)->tag.rndv.am_thresh,
                            cb, ucp_ep_config(ep)->tag.proto);
@@ -293,7 +306,9 @@ UCS_PROFILE_FUNC(ucs_status_ptr_t, ucp_tag_send_sync_nb,
 
     /* Remote side needs to send reply, so have it connect to us */
     ucp_ep_connect_remote(ep);
-    ucp_tag_send_req_init(req, ep, buffer, datatype, tag, UCP_REQUEST_FLAG_SYNC);
+
+    ucp_tag_send_req_init(req, ep, buffer, datatype, tag, UCP_REQUEST_FLAG_SYNC,
+                          ucp_mem_type_dummy_handle);
 
     ret = ucp_tag_send_req(req, count,
                            -1, /* disable short method */
diff --git a/src/ucp/wireup/address.c b/src/ucp/wireup/address.c
index 43cdaca9de3..5509608ca05 100644
--- a/src/ucp/wireup/address.c
+++ b/src/ucp/wireup/address.c
@@ -325,7 +325,7 @@ static ucs_status_t ucp_address_do_pack(ucp_worker_h worker, ucp_ep_h ep,
             *(uint8_t*)ptr = md_index |
                              ((dev->tl_bitmap == 0)          ? UCP_ADDRESS_FLAG_EMPTY    : 0) |
                              ((md_flags & UCT_MD_FLAG_ALLOC) ? UCP_ADDRESS_FLAG_MD_ALLOC : 0) |
-                             ((md_flags & UCT_MD_FLAG_REG)   ? UCP_ADDRESS_FLAG_MD_REG   : 0);
+                             ((md_flags & UCT_MD_FLAG_REG)   ? UCP_ADDRESS_FLAG_MD_REG   : 0) ;
             ++ptr;
 
             /* Device address length */
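The two memory-type ternaries passed to ucp_tag_send_req() in tag_send.c above could be hoisted into locals; a possible cleanup (sketch only, assuming ucp_tag_send_req() takes the zcopy threshold by pointer, as the existing call suggests):

    const ucp_ep_config_t *cfg = ucp_ep_config(ep);
    int host_mem               = UCP_IS_DEFAULT_MEMORY_TYPE(mem_type.id);
    ssize_t max_short          = host_mem ?
                                 cfg->tag.eager.max_short :
                                 cfg->domain[mem_type.eager_lane].tag.eager.max_short;
    const size_t *zcopy_thresh = host_mem ?
                                 cfg->tag.eager.zcopy_thresh :
                                 cfg->domain[mem_type.eager_lane].tag.eager.zcopy_thresh;

    ret = ucp_tag_send_req(req, count, max_short, zcopy_thresh,
                           cfg->tag.rndv.rma_thresh, cfg->tag.rndv.am_thresh,
                           cb, cfg->tag.proto);
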
diff --git a/src/ucp/wireup/select.c b/src/ucp/wireup/select.c
index 1f6ae4457b1..b86fa92a7f2 100644
--- a/src/ucp/wireup/select.c
+++ b/src/ucp/wireup/select.c
@@ -10,17 +10,19 @@
 #include 
 #include 
 #include 
+#include 
 #include 
 #include 
 
 #define UCP_WIREUP_RNDV_TEST_MSG_SIZE 262144
 
 enum {
-    UCP_WIREUP_LANE_USAGE_AM   = UCS_BIT(0),
-    UCP_WIREUP_LANE_USAGE_RMA  = UCS_BIT(1),
-    UCP_WIREUP_LANE_USAGE_AMO  = UCS_BIT(2),
-    UCP_WIREUP_LANE_USAGE_RNDV = UCS_BIT(3),
-    UCP_WIREUP_LANE_USAGE_TAG  = UCS_BIT(4)
+    UCP_WIREUP_LANE_USAGE_AM     = UCS_BIT(0),
+    UCP_WIREUP_LANE_USAGE_RMA    = UCS_BIT(1),
+    UCP_WIREUP_LANE_USAGE_AMO    = UCS_BIT(2),
+    UCP_WIREUP_LANE_USAGE_RNDV   = UCS_BIT(3),
+    UCP_WIREUP_LANE_USAGE_TAG    = UCS_BIT(4),
+    UCP_WIREUP_LANE_USAGE_DOMAIN = UCS_BIT(5)
 };
 
 
@@ -32,6 +34,7 @@ typedef struct {
     uint32_t         usage;
     double           rma_score;
    double           amo_score;
+    double           domain_score;
 } ucp_wireup_lane_desc_t;
 
 
@@ -361,6 +364,7 @@ ucp_wireup_add_lane_desc(ucp_wireup_lane_desc_t *lane_descs,
     lane_desc->usage        = usage;
     lane_desc->rma_score    = 0.0;
     lane_desc->amo_score    = 0.0;
+    lane_desc->domain_score = 0.0;
 
 out_update_score:
     if (usage & UCP_WIREUP_LANE_USAGE_RMA) {
@@ -369,6 +373,9 @@ ucp_wireup_add_lane_desc(ucp_wireup_lane_desc_t *lane_descs,
     if (usage & UCP_WIREUP_LANE_USAGE_AMO) {
         lane_desc->amo_score = score;
     }
+    if (usage & UCP_WIREUP_LANE_USAGE_DOMAIN) {
+        lane_desc->domain_score = score;
+    }
 }
 
 #define UCP_WIREUP_COMPARE_SCORE(_elem1, _elem2, _arg, _token) \
@@ -396,6 +403,12 @@ static int ucp_wireup_compare_lane_amo_score(const void *elem1, const void *elem2,
     return UCP_WIREUP_COMPARE_SCORE(elem1, elem2, arg, amo);
 }
 
+static int ucp_wireup_compare_lane_domain_score(const void *elem1, const void *elem2,
+                                                void *arg)
+{
+    return UCP_WIREUP_COMPARE_SCORE(elem1, elem2, arg, domain);
+}
+
 static UCS_F_NOINLINE ucs_status_t
 ucp_wireup_add_memaccess_lanes(ucp_ep_h ep, unsigned address_count,
                                const ucp_address_entry_t *address_list,
@@ -548,6 +561,163 @@ static ucs_status_t ucp_wireup_add_rma_lanes(ucp_ep_h ep, const ucp_ep_params_t
                                          -1, UCP_WIREUP_LANE_USAGE_RMA);
 }
 
+ucp_lane_index_t ucp_config_find_domain_lane(const ucp_ep_config_t *config,
+                                             const ucp_lane_index_t *lanes,
+                                             ucp_md_map_t dn_md_map)
+{
+    ucp_md_index_t dst_md_index;
+    ucp_lane_index_t lane;
+    ucp_md_map_t dst_md_mask;
+    int prio;
+
+    for (prio = 0; prio < UCP_MAX_LANES; ++prio) {
+        lane = lanes[prio];
+        if (lane == UCP_NULL_LANE) {
+            return UCP_NULL_LANE; /* No more lanes */
+        }
+
+        dst_md_index = config->key.lanes[lane].dst_md_index;
+        dst_md_mask  = UCS_BIT(dst_md_index);
+        if (dn_md_map & dst_md_mask) {
+            return lane;
+        }
+    }
+    return UCP_NULL_LANE;
+}
+
+ucs_status_t ucp_ep_set_domain_lanes(ucp_ep_h ep, ucp_mem_type_h mem_type_h)
+{
+    ucp_rsc_index_t rsc_index;
+    uct_iface_attr_t *iface_attr;
+    ucp_md_map_t dn_md_map;
+    ucp_lane_index_t dn_lane;
+    ucp_md_index_t md_index;
+
+    dn_md_map              = mem_type_h->md_map;
+    mem_type_h->eager_lane = UCP_NULL_LANE;
+
+    /* Walk the domain lanes by priority: take the first one whose interface
+     * can do put_zcopy, dropping each examined MD from the map. */
+    while (1) {
+        dn_lane = ucp_config_find_domain_lane(ucp_ep_config(ep),
+                                              ucp_ep_config(ep)->key.domain_lanes,
+                                              dn_md_map);
+        if (dn_lane == UCP_NULL_LANE) {
+            ucs_error("Cannot find an address domain lane.");
+            return UCS_ERR_IO_ERROR;
+        }
+        rsc_index  = ucp_ep_get_rsc_index(ep, dn_lane);
+        iface_attr = &ep->worker->ifaces[rsc_index].attr;
+        md_index   = ucp_ep_config(ep)->key.lanes[dn_lane].dst_md_index;
+        if (iface_attr->cap.flags & UCT_IFACE_FLAG_PUT_ZCOPY) {
+            mem_type_h->eager_lane = dn_lane;
+        }
+        /* TODO: revisit cap flags for the rndv lane:
+         * if (iface_attr->cap.flags & UCT_IFACE_FLAG_GET_ZCOPY) {
+         *     mem_type_h->rndv_lane = dn_lane;
+         * } */
+        dn_md_map &= ~UCS_BIT(md_index);
+        if ((mem_type_h->eager_lane != UCP_NULL_LANE) || (dn_md_map == 0)) {
+            break;
+        }
+    }
+
+    return UCS_OK;
+}
+
+double ucp_wireup_addr_domain_score_func(ucp_context_h context,
+                                         const uct_md_attr_t *md_attr,
+                                         const uct_iface_attr_t *iface_attr,
+                                         const ucp_address_iface_attr_t *remote_iface_attr)
+{
+    /* best end-to-end latency */
+    return (1e-3 / (ucp_wireup_tl_iface_latency(context, iface_attr, remote_iface_attr) +
+                    iface_attr->overhead + remote_iface_attr->overhead));
+}
+
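+/*
+ * Worked example for the score above, with illustrative numbers (not from
+ * any real transport): an end-to-end latency of 1e-6 sec plus 0.5e-6 sec
+ * overhead on each side gives 1e-3 / (1e-6 + 0.5e-6 + 0.5e-6) = 500;
+ * halving the total latency would double the score.
+ */
+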
+static UCS_F_NOINLINE ucs_status_t
+ucp_wireup_add_addr_domain_lanes(ucp_ep_h ep, unsigned address_count,
+                                 const ucp_address_entry_t *address_list,
+                                 ucp_wireup_lane_desc_t *lane_descs,
+                                 ucp_lane_index_t *num_lanes_p,
+                                 const ucp_wireup_criteria_t *criteria,
+                                 uint64_t tl_bitmap, uint32_t usage)
+{
+    ucp_address_entry_t *address_list_copy;
+    ucp_rsc_index_t rsc_index, dst_md_index;
+    size_t address_list_size;
+    double score;
+    uint64_t remote_md_map;
+    unsigned addr_index;
+    ucs_status_t status;
+
+    remote_md_map = -1;
+
+    /* Create a copy of the address list */
+    address_list_size = sizeof(*address_list_copy) * address_count;
+    address_list_copy = ucs_malloc(address_list_size, "domain address list");
+    if (address_list_copy == NULL) {
+        status = UCS_ERR_NO_MEMORY;
+        goto out;
+    }
+
+    memcpy(address_list_copy, address_list, address_list_size);
+
+    status = ucp_wireup_select_transport(ep, address_list_copy, address_count,
+                                         criteria, tl_bitmap, remote_md_map,
+                                         0, &rsc_index, &addr_index, &score);
+    if (status != UCS_OK) {
+        goto out_free_address_list;
+    }
+
+    dst_md_index = address_list_copy[addr_index].md_index;
+
+    /* Add to the list of lanes and remove all occurrences of the remote md
+     * from the address list, to avoid selecting the same remote md again. */
+    ucp_wireup_add_lane_desc(lane_descs, num_lanes_p, rsc_index, addr_index,
+                             dst_md_index, score, usage, 0);
+    remote_md_map &= ~UCS_BIT(dst_md_index);
+
+    while (address_count > 0) {
+        status = ucp_wireup_select_transport(ep, address_list_copy, address_count,
+                                             criteria, tl_bitmap, remote_md_map,
+                                             0, &rsc_index, &addr_index, &score);
+        if (status != UCS_OK) {
+            break;
+        }
+
+        /* Add lane description and remove all occurrences of the remote md */
+        dst_md_index = address_list_copy[addr_index].md_index;
+        ucp_wireup_add_lane_desc(lane_descs, num_lanes_p, rsc_index, addr_index,
+                                 dst_md_index, score, usage, 0);
+        remote_md_map &= ~UCS_BIT(dst_md_index);
+    }
+
+    status = UCS_OK;
+
+out_free_address_list:
+    ucs_free(address_list_copy);
+out:
+    return status;
+}
+
+static ucs_status_t ucp_wireup_add_domain_lane(ucp_ep_h ep, const ucp_ep_params_t *params,
+                                               unsigned address_count,
+                                               const ucp_address_entry_t *address_list,
+                                               ucp_wireup_lane_desc_t *lane_descs,
+                                               ucp_lane_index_t *num_lanes_p)
+{
+    ucp_wireup_criteria_t criteria;
+
+    criteria.title              = "address domain";
+    criteria.local_md_flags     = 0;
+    criteria.remote_md_flags    = 0;
+    criteria.remote_iface_flags = UCT_IFACE_FLAG_CONNECT_TO_IFACE;
+    criteria.local_iface_flags  = criteria.remote_iface_flags;
+    criteria.calc_score         = ucp_wireup_addr_domain_score_func;
+    ucp_wireup_fill_ep_params_criteria(&criteria, params);
+
+    return ucp_wireup_add_addr_domain_lanes(ep, address_count, address_list,
+                                            lane_descs, num_lanes_p, &criteria,
+                                            -1, UCP_WIREUP_LANE_USAGE_DOMAIN);
+}
+
 double ucp_wireup_amo_score_func(ucp_context_h context,
                                  const uct_md_attr_t *md_attr,
                                  const uct_iface_attr_t *iface_attr,
@@ -899,6 +1069,12 @@ ucs_status_t ucp_wireup_select_lanes(ucp_ep_h ep, const ucp_ep_params_t *params,
         return status;
     }
 
+    status = ucp_wireup_add_domain_lane(ep, params, address_count, address_list,
+                                        lane_descs, &key->num_lanes);
+    if (status != UCS_OK) {
+        return status;
+    }
+
     /* User should not create endpoints unless requested communication features */
     if (key->num_lanes == 0) {
         ucs_error("No transports selected to %s (features: 0x%lx)",
@@ -936,6 +1112,9 @@ ucs_status_t ucp_wireup_select_lanes(ucp_ep_h ep, const ucp_ep_params_t *params,
             ucs_assert(key->tag_lane == UCP_NULL_LANE);
             key->tag_lane = lane;
         }
+        if (lane_descs[lane].usage & UCP_WIREUP_LANE_USAGE_DOMAIN) {
+            key->domain_lanes[lane] = lane;
+        }
     }
 
     /* Sort RMA and AMO lanes according to score */
@@ -943,6 +1122,9 @@ ucs_status_t ucp_wireup_select_lanes(ucp_ep_h ep, const ucp_ep_params_t *params,
                 ucp_wireup_compare_lane_rma_score, lane_descs);
     ucs_qsort_r(key->amo_lanes, UCP_MAX_LANES, sizeof(ucp_lane_index_t),
                 ucp_wireup_compare_lane_amo_score, lane_descs);
+    ucs_qsort_r(key->domain_lanes, UCP_MAX_LANES, sizeof(ucp_lane_index_t),
+                ucp_wireup_compare_lane_domain_score, lane_descs);
+
 
     /* Get all reachable MDs from full remote address list */
     key->reachable_md_map = ucp_wireup_get_reachable_mds(worker, address_count,
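Taken together, the receive side now behaves as follows for a buffer in a non-default memory domain (condensed sketch of the hunks above; error handling omitted):

    /* tag_recv.c: remember the buffer's domain on the request */
    ucp_addr_domain_detect_mds(worker->context, buffer, &req->mem_type);

    /* dt.c: contiguous unpack is routed by the request's memory type */
    if (ucs_likely(UCP_IS_DEFAULT_MEMORY_TYPE(req->mem_type.id))) {
        memcpy(buffer + offset, recv_data, recv_length);   /* host memory */
    } else {
        ucp_dn_dt_unpack(req, buffer, buffer_size,         /* staged put_zcopy */
                         recv_data, recv_length);
    }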