diff --git a/ompi/mca/coll/ucc/coll_ucc_allgather.c b/ompi/mca/coll/ucc/coll_ucc_allgather.c index 410323c9b10..d38a62f55f7 100644 --- a/ompi/mca/coll/ucc/coll_ucc_allgather.c +++ b/ompi/mca/coll/ucc/coll_ucc_allgather.c @@ -11,6 +11,7 @@ static inline ucc_status_t mca_coll_ucc_allgather_init(const void *sbuf, size_t scount, struct ompi_datatype_t *sdtype, void* rbuf, size_t rcount, struct ompi_datatype_t *rdtype, + bool blocking, mca_coll_ucc_module_t *ucc_module, ucc_coll_req_h *req, mca_coll_ucc_req_t *coll_req) @@ -34,6 +35,7 @@ static inline ucc_status_t mca_coll_ucc_allgather_init(const void *sbuf, size_t ucc_coll_args_t coll = { .mask = 0, + .flags = 0, .coll_type = UCC_COLL_TYPE_ALLGATHER, .src.info = { .buffer = (void*)sbuf, @@ -53,6 +55,10 @@ static inline ucc_status_t mca_coll_ucc_allgather_init(const void *sbuf, size_t coll.mask = UCC_COLL_ARGS_FIELD_FLAGS; coll.flags = UCC_COLL_ARGS_FLAG_IN_PLACE; } + if (blocking) { + coll.mask |= UCC_COLL_ARGS_FIELD_FLAGS; + coll.flags |= UCC_COLL_ARGS_HINT_OPTIMIZE_LATENCY; + } COLL_UCC_REQ_INIT(coll_req, req, coll, ucc_module); return UCC_OK; fallback: @@ -70,7 +76,7 @@ int mca_coll_ucc_allgather(const void *sbuf, size_t scount, struct ompi_datatype UCC_VERBOSE(3, "running ucc allgather"); COLL_UCC_CHECK(mca_coll_ucc_allgather_init(sbuf, scount, sdtype, rbuf, rcount, rdtype, - ucc_module, &req, NULL)); + true, ucc_module, &req, NULL)); COLL_UCC_POST_AND_CHECK(req); COLL_UCC_CHECK(coll_ucc_req_wait(req)); return OMPI_SUCCESS; @@ -94,7 +100,7 @@ int mca_coll_ucc_iallgather(const void *sbuf, size_t scount, struct ompi_datatyp COLL_UCC_GET_REQ(coll_req); COLL_UCC_CHECK(mca_coll_ucc_allgather_init(sbuf, scount, sdtype, rbuf, rcount, rdtype, - ucc_module, &req, coll_req)); + false, ucc_module, &req, coll_req)); COLL_UCC_POST_AND_CHECK(req); *request = &coll_req->super; return OMPI_SUCCESS; diff --git a/ompi/mca/coll/ucc/coll_ucc_allgatherv.c b/ompi/mca/coll/ucc/coll_ucc_allgatherv.c index 6cf33a5dd80..4bf87bcb84a 100644 --- a/ompi/mca/coll/ucc/coll_ucc_allgatherv.c +++ b/ompi/mca/coll/ucc/coll_ucc_allgatherv.c @@ -13,6 +13,7 @@ static inline ucc_status_t mca_coll_ucc_allgatherv_init(const void *sbuf, int sc struct ompi_datatype_t *sdtype, void* rbuf, const int *rcounts, const int *rdisps, struct ompi_datatype_t *rdtype, + bool blocking, mca_coll_ucc_module_t *ucc_module, ucc_coll_req_h *req, mca_coll_ucc_req_t *coll_req) @@ -31,6 +32,7 @@ static inline ucc_status_t mca_coll_ucc_allgatherv_init(const void *sbuf, int sc ucc_coll_args_t coll = { .mask = 0, + .flags = 0, .coll_type = UCC_COLL_TYPE_ALLGATHERV, .src.info = { .buffer = (void*)sbuf, @@ -51,6 +53,10 @@ static inline ucc_status_t mca_coll_ucc_allgatherv_init(const void *sbuf, int sc coll.mask = UCC_COLL_ARGS_FIELD_FLAGS; coll.flags = UCC_COLL_ARGS_FLAG_IN_PLACE; } + if (blocking) { + coll.mask |= UCC_COLL_ARGS_FIELD_FLAGS; + coll.flags |= UCC_COLL_ARGS_HINT_OPTIMIZE_LATENCY; + } COLL_UCC_REQ_INIT(coll_req, req, coll, ucc_module); return UCC_OK; fallback: @@ -71,7 +77,7 @@ int mca_coll_ucc_allgatherv(const void *sbuf, int scount, COLL_UCC_CHECK(mca_coll_ucc_allgatherv_init(sbuf, scount, sdtype, rbuf, rcounts, rdisps, rdtype, - ucc_module, &req, NULL)); + true, ucc_module, &req, NULL)); COLL_UCC_POST_AND_CHECK(req); COLL_UCC_CHECK(coll_ucc_req_wait(req)); return OMPI_SUCCESS; @@ -98,7 +104,7 @@ int mca_coll_ucc_iallgatherv(const void *sbuf, int scount, COLL_UCC_GET_REQ(coll_req); COLL_UCC_CHECK(mca_coll_ucc_allgatherv_init(sbuf, scount, sdtype, rbuf, rcounts, rdisps, rdtype, - ucc_module, &req, coll_req)); + false, ucc_module, &req, coll_req)); COLL_UCC_POST_AND_CHECK(req); *request = &coll_req->super; return OMPI_SUCCESS; diff --git a/ompi/mca/coll/ucc/coll_ucc_allreduce.c b/ompi/mca/coll/ucc/coll_ucc_allreduce.c index 5320641cb8b..f99a4659091 100644 --- a/ompi/mca/coll/ucc/coll_ucc_allreduce.c +++ b/ompi/mca/coll/ucc/coll_ucc_allreduce.c @@ -11,7 +11,9 @@ static inline ucc_status_t mca_coll_ucc_allreduce_init(const void *sbuf, void *rbuf, size_t count, struct ompi_datatype_t *dtype, - struct ompi_op_t *op, mca_coll_ucc_module_t *ucc_module, + struct ompi_op_t *op, + bool blocking, + mca_coll_ucc_module_t *ucc_module, ucc_coll_req_h *req, mca_coll_ucc_req_t *coll_req) { @@ -32,6 +34,7 @@ static inline ucc_status_t mca_coll_ucc_allreduce_init(const void *sbuf, void *r } ucc_coll_args_t coll = { .mask = 0, + .flags = 0, .coll_type = UCC_COLL_TYPE_ALLREDUCE, .src.info = { .buffer = (void*)sbuf, @@ -51,6 +54,10 @@ static inline ucc_status_t mca_coll_ucc_allreduce_init(const void *sbuf, void *r coll.mask |= UCC_COLL_ARGS_FIELD_FLAGS; coll.flags = UCC_COLL_ARGS_FLAG_IN_PLACE; } + if (blocking) { + coll.mask |= UCC_COLL_ARGS_FIELD_FLAGS; + coll.flags |= UCC_COLL_ARGS_HINT_OPTIMIZE_LATENCY; + } COLL_UCC_REQ_INIT(coll_req, req, coll, ucc_module); return UCC_OK; fallback: @@ -67,7 +74,7 @@ int mca_coll_ucc_allreduce(const void *sbuf, void *rbuf, size_t count, UCC_VERBOSE(3, "running ucc allreduce"); COLL_UCC_CHECK(mca_coll_ucc_allreduce_init(sbuf, rbuf, count, dtype, op, - ucc_module, &req, NULL)); + true, ucc_module, &req, NULL)); COLL_UCC_POST_AND_CHECK(req); COLL_UCC_CHECK(coll_ucc_req_wait(req)); return OMPI_SUCCESS; @@ -90,7 +97,7 @@ int mca_coll_ucc_iallreduce(const void *sbuf, void *rbuf, size_t count, UCC_VERBOSE(3, "running ucc iallreduce"); COLL_UCC_GET_REQ(coll_req); COLL_UCC_CHECK(mca_coll_ucc_allreduce_init(sbuf, rbuf, count, dtype, op, - ucc_module, &req, coll_req)); + false, ucc_module, &req, coll_req)); COLL_UCC_POST_AND_CHECK(req); *request = &coll_req->super; return OMPI_SUCCESS; diff --git a/ompi/mca/coll/ucc/coll_ucc_alltoall.c b/ompi/mca/coll/ucc/coll_ucc_alltoall.c index b71f183fac8..a90b1eddf59 100644 --- a/ompi/mca/coll/ucc/coll_ucc_alltoall.c +++ b/ompi/mca/coll/ucc/coll_ucc_alltoall.c @@ -11,6 +11,7 @@ static inline ucc_status_t mca_coll_ucc_alltoall_init(const void *sbuf, size_t scount, struct ompi_datatype_t *sdtype, void* rbuf, size_t rcount, struct ompi_datatype_t *rdtype, + bool blocking, mca_coll_ucc_module_t *ucc_module, ucc_coll_req_h *req, mca_coll_ucc_req_t *coll_req) @@ -34,6 +35,7 @@ static inline ucc_status_t mca_coll_ucc_alltoall_init(const void *sbuf, size_t s ucc_coll_args_t coll = { .mask = 0, + .flags = 0, .coll_type = UCC_COLL_TYPE_ALLTOALL, .src.info = { .buffer = (void*)sbuf, @@ -53,6 +55,10 @@ static inline ucc_status_t mca_coll_ucc_alltoall_init(const void *sbuf, size_t s coll.mask = UCC_COLL_ARGS_FIELD_FLAGS; coll.flags = UCC_COLL_ARGS_FLAG_IN_PLACE; } + if (blocking) { + coll.mask |= UCC_COLL_ARGS_FIELD_FLAGS; + coll.flags |= UCC_COLL_ARGS_HINT_OPTIMIZE_LATENCY; + } COLL_UCC_REQ_INIT(coll_req, req, coll, ucc_module); return UCC_OK; fallback: @@ -70,6 +76,7 @@ int mca_coll_ucc_alltoall(const void *sbuf, size_t scount, struct ompi_datatype_ UCC_VERBOSE(3, "running ucc alltoall"); COLL_UCC_CHECK(mca_coll_ucc_alltoall_init(sbuf, scount, sdtype, rbuf, rcount, rdtype, + true, ucc_module, &req, NULL)); COLL_UCC_POST_AND_CHECK(req); COLL_UCC_CHECK(coll_ucc_req_wait(req)); @@ -94,6 +101,7 @@ int mca_coll_ucc_ialltoall(const void *sbuf, size_t scount, struct ompi_datatype COLL_UCC_GET_REQ(coll_req); COLL_UCC_CHECK(mca_coll_ucc_alltoall_init(sbuf, scount, sdtype, rbuf, rcount, rdtype, + false, ucc_module, &req, coll_req)); COLL_UCC_POST_AND_CHECK(req); *request = &coll_req->super; diff --git a/ompi/mca/coll/ucc/coll_ucc_alltoallv.c b/ompi/mca/coll/ucc/coll_ucc_alltoallv.c index 75b0dd6b6b7..5e969f46c04 100644 --- a/ompi/mca/coll/ucc/coll_ucc_alltoallv.c +++ b/ompi/mca/coll/ucc/coll_ucc_alltoallv.c @@ -13,6 +13,7 @@ static inline ucc_status_t mca_coll_ucc_alltoallv_init(const void *sbuf, const i const int *sdisps, struct ompi_datatype_t *sdtype, void* rbuf, const int *rcounts, const int *rdisps, struct ompi_datatype_t *rdtype, + bool blocking, mca_coll_ucc_module_t *ucc_module, ucc_coll_req_h *req, mca_coll_ucc_req_t *coll_req) @@ -31,6 +32,7 @@ static inline ucc_status_t mca_coll_ucc_alltoallv_init(const void *sbuf, const i ucc_coll_args_t coll = { .mask = 0, + .flags = 0, .coll_type = UCC_COLL_TYPE_ALLTOALLV, .src.info_v = { .buffer = (void*)sbuf, @@ -52,6 +54,10 @@ static inline ucc_status_t mca_coll_ucc_alltoallv_init(const void *sbuf, const i coll.mask = UCC_COLL_ARGS_FIELD_FLAGS; coll.flags = UCC_COLL_ARGS_FLAG_IN_PLACE; } + if (blocking) { + coll.mask |= UCC_COLL_ARGS_FIELD_FLAGS; + coll.flags |= UCC_COLL_ARGS_HINT_OPTIMIZE_LATENCY; + } COLL_UCC_REQ_INIT(coll_req, req, coll, ucc_module); return UCC_OK; fallback: @@ -72,7 +78,7 @@ int mca_coll_ucc_alltoallv(const void *sbuf, const int *scounts, COLL_UCC_CHECK(mca_coll_ucc_alltoallv_init(sbuf, scounts, sdisps, sdtype, rbuf, rcounts, rdisps, rdtype, - ucc_module, &req, NULL)); + true, ucc_module, &req, NULL)); COLL_UCC_POST_AND_CHECK(req); COLL_UCC_CHECK(coll_ucc_req_wait(req)); return OMPI_SUCCESS; @@ -99,7 +105,7 @@ int mca_coll_ucc_ialltoallv(const void *sbuf, const int *scounts, COLL_UCC_GET_REQ(coll_req); COLL_UCC_CHECK(mca_coll_ucc_alltoallv_init(sbuf, scounts, sdisps, sdtype, rbuf, rcounts, rdisps, rdtype, - ucc_module, &req, coll_req)); + false, ucc_module, &req, coll_req)); COLL_UCC_POST_AND_CHECK(req); *request = &coll_req->super; return OMPI_SUCCESS; diff --git a/ompi/mca/coll/ucc/coll_ucc_barrier.c b/ompi/mca/coll/ucc/coll_ucc_barrier.c index fdbc11b49aa..6e338a1ef90 100644 --- a/ompi/mca/coll/ucc/coll_ucc_barrier.c +++ b/ompi/mca/coll/ucc/coll_ucc_barrier.c @@ -8,7 +8,8 @@ #include "coll_ucc_common.h" -static inline ucc_status_t mca_coll_ucc_barrier_init(mca_coll_ucc_module_t *ucc_module, +static inline ucc_status_t mca_coll_ucc_barrier_init(bool blocking, + mca_coll_ucc_module_t *ucc_module, ucc_coll_req_h *req, mca_coll_ucc_req_t *coll_req) { @@ -16,6 +17,10 @@ static inline ucc_status_t mca_coll_ucc_barrier_init(mca_coll_ucc_module_t *ucc_ .mask = 0, .coll_type = UCC_COLL_TYPE_BARRIER }; + if (blocking) { + coll.mask = UCC_COLL_ARGS_FIELD_FLAGS; + coll.flags = UCC_COLL_ARGS_HINT_OPTIMIZE_LATENCY; + } COLL_UCC_REQ_INIT(coll_req, req, coll, ucc_module); return UCC_OK; fallback: @@ -29,7 +34,7 @@ int mca_coll_ucc_barrier(struct ompi_communicator_t *comm, ucc_coll_req_h req; UCC_VERBOSE(3, "running ucc barrier"); - COLL_UCC_CHECK(mca_coll_ucc_barrier_init(ucc_module, &req, NULL)); + COLL_UCC_CHECK(mca_coll_ucc_barrier_init(true, ucc_module, &req, NULL)); COLL_UCC_POST_AND_CHECK(req); COLL_UCC_CHECK(coll_ucc_req_wait(req)); return OMPI_SUCCESS; @@ -48,7 +53,7 @@ int mca_coll_ucc_ibarrier(struct ompi_communicator_t *comm, UCC_VERBOSE(3, "running ucc ibarrier"); COLL_UCC_GET_REQ(coll_req); - COLL_UCC_CHECK(mca_coll_ucc_barrier_init(ucc_module, &req, coll_req)); + COLL_UCC_CHECK(mca_coll_ucc_barrier_init(false, ucc_module, &req, coll_req)); COLL_UCC_POST_AND_CHECK(req); *request = &coll_req->super; return OMPI_SUCCESS; diff --git a/ompi/mca/coll/ucc/coll_ucc_bcast.c b/ompi/mca/coll/ucc/coll_ucc_bcast.c index 425e6869c8e..1b108af08ce 100644 --- a/ompi/mca/coll/ucc/coll_ucc_bcast.c +++ b/ompi/mca/coll/ucc/coll_ucc_bcast.c @@ -9,7 +9,9 @@ #include "coll_ucc_common.h" static inline ucc_status_t mca_coll_ucc_bcast_init(void *buf, size_t count, struct ompi_datatype_t *dtype, - int root, mca_coll_ucc_module_t *ucc_module, + int root, + bool blocking, + mca_coll_ucc_module_t *ucc_module, ucc_coll_req_h *req, mca_coll_ucc_req_t *coll_req) { @@ -30,6 +32,10 @@ static inline ucc_status_t mca_coll_ucc_bcast_init(void *buf, size_t count, stru .mem_type = UCC_MEMORY_TYPE_UNKNOWN } }; + if (blocking) { + coll.mask = UCC_COLL_ARGS_FIELD_FLAGS; + coll.flags = UCC_COLL_ARGS_HINT_OPTIMIZE_LATENCY; + } COLL_UCC_REQ_INIT(coll_req, req, coll, ucc_module); return UCC_OK; fallback: @@ -43,7 +49,7 @@ int mca_coll_ucc_bcast(void *buf, size_t count, struct ompi_datatype_t *dtype, mca_coll_ucc_module_t *ucc_module = (mca_coll_ucc_module_t*)module; ucc_coll_req_h req; UCC_VERBOSE(3, "running ucc bcast"); - COLL_UCC_CHECK(mca_coll_ucc_bcast_init(buf, count, dtype, root, + COLL_UCC_CHECK(mca_coll_ucc_bcast_init(buf, count, dtype, root, true, ucc_module, &req, NULL)); COLL_UCC_POST_AND_CHECK(req); COLL_UCC_CHECK(coll_ucc_req_wait(req)); @@ -65,7 +71,7 @@ int mca_coll_ucc_ibcast(void *buf, size_t count, struct ompi_datatype_t *dtype, UCC_VERBOSE(3, "running ucc ibcast"); COLL_UCC_GET_REQ(coll_req); - COLL_UCC_CHECK(mca_coll_ucc_bcast_init(buf, count, dtype, root, + COLL_UCC_CHECK(mca_coll_ucc_bcast_init(buf, count, dtype, root, false, ucc_module, &req, coll_req)); COLL_UCC_POST_AND_CHECK(req); *request = &coll_req->super; diff --git a/ompi/mca/coll/ucc/coll_ucc_gatherv.c b/ompi/mca/coll/ucc/coll_ucc_gatherv.c index 81474d0c38a..389bc3d6e18 100644 --- a/ompi/mca/coll/ucc/coll_ucc_gatherv.c +++ b/ompi/mca/coll/ucc/coll_ucc_gatherv.c @@ -19,7 +19,6 @@ static inline ucc_status_t mca_coll_ucc_gatherv_init(const void *sbuf, int scoun { ucc_datatype_t ucc_sdt, ucc_rdt; int comm_rank = ompi_comm_rank(ucc_module->comm); - int comm_size = ompi_comm_size(ucc_module->comm); ucc_sdt = ompi_dtype_to_ucc_dtype(sdtype); if (comm_rank == root) { diff --git a/ompi/mca/coll/ucc/coll_ucc_module.c b/ompi/mca/coll/ucc/coll_ucc_module.c index 8860bb0ba2c..3916650b949 100644 --- a/ompi/mca/coll/ucc/coll_ucc_module.c +++ b/ompi/mca/coll/ucc/coll_ucc_module.c @@ -93,7 +93,7 @@ static void mca_coll_ucc_module_construct(mca_coll_ucc_module_t *ucc_module) mca_coll_ucc_module_clear(ucc_module); } -int mca_coll_ucc_progress(void) +static int mca_coll_ucc_progress(void) { ucc_context_progress(mca_coll_ucc_component.ucc_context); return OPAL_SUCCESS; diff --git a/ompi/mca/coll/ucc/coll_ucc_reduce.c b/ompi/mca/coll/ucc/coll_ucc_reduce.c index c37be5413ff..df78bb28767 100644 --- a/ompi/mca/coll/ucc/coll_ucc_reduce.c +++ b/ompi/mca/coll/ucc/coll_ucc_reduce.c @@ -11,6 +11,7 @@ static inline ucc_status_t mca_coll_ucc_reduce_init(const void *sbuf, void *rbuf, size_t count, struct ompi_datatype_t *dtype, struct ompi_op_t *op, int root, + bool blocking, mca_coll_ucc_module_t *ucc_module, ucc_coll_req_h *req, mca_coll_ucc_req_t *coll_req) @@ -31,7 +32,8 @@ static inline ucc_status_t mca_coll_ucc_reduce_init(const void *sbuf, void *rbuf goto fallback; } ucc_coll_args_t coll = { - .mask = 0, + .mask = 0, + .flags = 0, .coll_type = UCC_COLL_TYPE_REDUCE, .root = root, .src.info = { @@ -52,6 +54,10 @@ static inline ucc_status_t mca_coll_ucc_reduce_init(const void *sbuf, void *rbuf coll.mask |= UCC_COLL_ARGS_FIELD_FLAGS; coll.flags = UCC_COLL_ARGS_FLAG_IN_PLACE; } + if (blocking) { + coll.mask |= UCC_COLL_ARGS_FIELD_FLAGS; + coll.flags |= UCC_COLL_ARGS_HINT_OPTIMIZE_LATENCY; + } COLL_UCC_REQ_INIT(coll_req, req, coll, ucc_module); return UCC_OK; fallback: @@ -69,7 +75,7 @@ int mca_coll_ucc_reduce(const void *sbuf, void* rbuf, size_t count, UCC_VERBOSE(3, "running ucc reduce"); COLL_UCC_CHECK(mca_coll_ucc_reduce_init(sbuf, rbuf, count, dtype, op, - root, ucc_module, &req, NULL)); + root, true, ucc_module, &req, NULL)); COLL_UCC_POST_AND_CHECK(req); COLL_UCC_CHECK(coll_ucc_req_wait(req)); return OMPI_SUCCESS; @@ -93,7 +99,7 @@ int mca_coll_ucc_ireduce(const void *sbuf, void* rbuf, size_t count, UCC_VERBOSE(3, "running ucc ireduce"); COLL_UCC_GET_REQ(coll_req); COLL_UCC_CHECK(mca_coll_ucc_reduce_init(sbuf, rbuf, count, dtype, op, root, - ucc_module, &req, coll_req)); + false, ucc_module, &req, coll_req)); COLL_UCC_POST_AND_CHECK(req); *request = &coll_req->super; return OMPI_SUCCESS;