diff --git a/src/components/tl/ucp/Makefile.am b/src/components/tl/ucp/Makefile.am index 02270feaa7..6bdaa3c22e 100644 --- a/src/components/tl/ucp/Makefile.am +++ b/src/components/tl/ucp/Makefile.am @@ -78,11 +78,12 @@ gatherv = \ gatherv/gatherv.c \ gatherv/gatherv_linear.c -reduce = \ - reduce/reduce.h \ - reduce/reduce.c \ - reduce/reduce_knomial.c \ - reduce/reduce_dbt.c +reduce = \ + reduce/reduce.h \ + reduce/reduce.c \ + reduce/reduce_knomial.c \ + reduce/reduce_dbt.c \ + reduce/reduce_srg_knomial.c reduce_scatter = \ reduce_scatter/reduce_scatter.h \ diff --git a/src/components/tl/ucp/reduce/reduce.c b/src/components/tl/ucp/reduce/reduce.c index 8a9fb3b74b..d6be59421e 100644 --- a/src/components/tl/ucp/reduce/reduce.c +++ b/src/components/tl/ucp/reduce/reduce.c @@ -1,5 +1,5 @@ /** - * Copyright (c) 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -18,6 +18,11 @@ ucc_base_coll_alg_info_t .name = "dbt", .desc = "reduce over double binary tree where a leaf in one tree " "will be intermediate in other (optimized for BW)"}, + [UCC_TL_UCP_REDUCE_ALG_SRG] = + {.id = UCC_TL_UCP_REDUCE_ALG_SRG, + .name = "srg", + .desc = "recursive knomial scatter-reduce followed by knomial " + "gather"}, [UCC_TL_UCP_REDUCE_ALG_LAST] = { .id = 0, .name = NULL, .desc = NULL}}; diff --git a/src/components/tl/ucp/reduce/reduce.h b/src/components/tl/ucp/reduce/reduce.h index 98bc183ff3..fbc3dd5453 100644 --- a/src/components/tl/ucp/reduce/reduce.h +++ b/src/components/tl/ucp/reduce/reduce.h @@ -1,5 +1,5 @@ /** - * Copyright (c) 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * Copyright (c) 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. * * See file LICENSE for terms. */ @@ -10,6 +10,7 @@ enum { UCC_TL_UCP_REDUCE_ALG_KNOMIAL, UCC_TL_UCP_REDUCE_ALG_DBT, + UCC_TL_UCP_REDUCE_ALG_SRG, UCC_TL_UCP_REDUCE_ALG_LAST }; @@ -17,7 +18,7 @@ extern ucc_base_coll_alg_info_t ucc_tl_ucp_reduce_algs[UCC_TL_UCP_REDUCE_ALG_LAST + 1]; #define UCC_TL_UCP_REDUCE_DEFAULT_ALG_SELECT_STR \ - "reduce:0-inf:@0" + "reduce:0-32K:@0#reduce:32K-inf:@2" /* A set of convenience macros used to implement sw based progress of the reduce algorithm that uses kn pattern */ @@ -55,8 +56,8 @@ static inline int ucc_tl_ucp_reduce_alg_from_str(const char *str) ucc_status_t ucc_tl_ucp_reduce_init(ucc_tl_ucp_task_t *task); ucc_status_t ucc_tl_ucp_reduce_knomial_init(ucc_base_coll_args_t *coll_args, - ucc_base_team_t *team, - ucc_coll_task_t **task_h); + ucc_base_team_t *team, + ucc_coll_task_t **task_h); ucc_status_t ucc_tl_ucp_reduce_knomial_start(ucc_coll_task_t *task); @@ -65,7 +66,11 @@ void ucc_tl_ucp_reduce_knomial_progress(ucc_coll_task_t *task); ucc_status_t ucc_tl_ucp_reduce_knomial_finalize(ucc_coll_task_t *task); ucc_status_t ucc_tl_ucp_reduce_dbt_init(ucc_base_coll_args_t *coll_args, - ucc_base_team_t *team, - ucc_coll_task_t **task_h); + ucc_base_team_t *team, + ucc_coll_task_t **task_h); + +ucc_status_t ucc_tl_ucp_reduce_srg_knomial_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t *team, + ucc_coll_task_t **task_h); #endif diff --git a/src/components/tl/ucp/reduce/reduce_srg_knomial.c b/src/components/tl/ucp/reduce/reduce_srg_knomial.c new file mode 100644 index 0000000000..7ded90453d --- /dev/null +++ b/src/components/tl/ucp/reduce/reduce_srg_knomial.c @@ -0,0 +1,334 @@ +/** + * Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+ * + * See file LICENSE for terms. + */ + +#include "config.h" +#include "reduce.h" +#include "core/ucc_progress_queue.h" +#include "tl_ucp_sendrecv.h" +#include "coll_patterns/sra_knomial.h" +#include "utils/ucc_math.h" +#include "utils/ucc_coll_utils.h" +#include "components/mc/ucc_mc.h" +#include "../reduce_scatter/reduce_scatter.h" +#include "../gather/gather.h" +#include "../allgather/allgather.h" + +/* SRG - scatter-reduce-gather knomial algorithm + 1. The algorithm performs the collective reduce operation as a sequence of + K-nomial Reduce-Scatter followed by K-nomial (with the same radix K) + gather. + 2. In essence this is an extension of the Bi-nomial SRA algorithm + proposed by Rabenseifner2004 (https://doi.org/10.1007/978-3-540-24685-5_1). + The extension adds support for an arbitrary radix. + 3. The algorithm targets large message sizes (i.e. optimized for max bandwidth). + 4. If the number of ranks in the team cannot form a full radix subtree + (for radix=2 this means the team size is not a power of 2) then there will be + "extra" ranks which don't participate in the main exchange loop. They + will send the data to their "proxy" ranks in the beginning and then wait + for the response with the final data. + 5. The knomial reduce-scatter and gather primitives can be used separately. + However, if they are used together as part of SRG reduce, one has to + provide the same radix for both routines. + 6. After the completion of the reduce-scatter phase the local result (at non-EXTRA + ranks) will be located in the dst buffer at an offset that can be computed by + the routine from coll_patterns/sra_knomial.h: ucc_sra_kn_get_offset. + */ + +static ucc_status_t +ucc_tl_ucp_reduce_srg_knomial_frag_start(ucc_coll_task_t *task) +{ + return ucc_schedule_start(task); +} + +static ucc_status_t +ucc_tl_ucp_reduce_srg_knomial_frag_finalize(ucc_coll_task_t *task) +{ + ucc_schedule_t *schedule = ucc_derived_of(task, ucc_schedule_t); + ucc_status_t status; + + status = ucc_schedule_finalize(task); + ucc_tl_ucp_put_schedule(schedule); + return status; +} + +static ucc_status_t +ucc_tl_ucp_reduce_srg_knomial_frag_setup(ucc_schedule_pipelined_t *schedule_p, + ucc_schedule_t *frag, int frag_num) +{ + int n_frags = schedule_p->super.n_tasks; + ucc_coll_args_t *args = &schedule_p->super.super.bargs.args; + size_t dt_size; + size_t count; + size_t frag_count; + size_t offset; + ucc_coll_args_t *targs; + ucc_rank_t rank; + ucc_tl_ucp_team_t *team; + + team = TASK_TEAM(&schedule_p->super); + rank = UCC_TL_TEAM_RANK(team); + if (UCC_IS_ROOT(*args, rank)) { + count = args->dst.info.count; + dt_size = ucc_dt_size(args->dst.info.datatype); + } else { + count = args->src.info.count; + dt_size = ucc_dt_size(args->src.info.datatype); + } + frag_count = ucc_buffer_block_count(count, n_frags, frag_num); + offset = ucc_buffer_block_offset(count, n_frags, frag_num); + + targs = &frag->tasks[0]->bargs.args; /* REDUCE_SCATTER */ + targs->src.info.buffer = PTR_OFFSET(targs->src.info.buffer, offset * dt_size); + targs->src.info.count = frag_count; + targs->dst.info.buffer = PTR_OFFSET(targs->dst.info.buffer, offset * dt_size); + targs->dst.info.count = frag_count; + + targs = &frag->tasks[1]->bargs.args; /* GATHER */ + targs->src.info.buffer = PTR_OFFSET(targs->src.info.buffer, offset * dt_size); + targs->src.info.count = 0; + targs->dst.info.buffer = PTR_OFFSET(targs->dst.info.buffer, offset * dt_size); + targs->dst.info.count = frag_count; + + return UCC_OK; +} + +static ucc_status_t
+ucc_tl_ucp_reduce_srg_knomial_frag_init(ucc_base_coll_args_t *coll_args, + ucc_schedule_pipelined_t *sp, + ucc_base_team_t *team, + ucc_schedule_t **frag_p) +{ + ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t); + ucc_base_coll_args_t args = *coll_args; + ucc_mrange_uint_t *p = &tl_team->cfg.reduce_srg_kn_radix; + ucc_rank_t trank = UCC_TL_TEAM_RANK(tl_team); + ucc_schedule_t *schedule; + ucc_coll_task_t *task, *rs_task; + ucc_status_t status; + ucc_kn_radix_t radix, cfg_radix; + size_t count; + ucc_datatype_t dt; + void *rs_rbuf, *rs_sbuf; + ucc_tl_ucp_schedule_t *rsg_schedule; + ucc_memory_type_t mt; + + rsg_schedule = ucc_derived_of(sp, ucc_tl_ucp_schedule_t); + status = ucc_tl_ucp_get_schedule(tl_team, coll_args, + (ucc_tl_ucp_schedule_t **)&schedule); + if (ucc_unlikely(UCC_OK != status)) { + return status; + } + + if (UCC_IS_ROOT(coll_args->args, trank)) { + dt = coll_args->args.dst.info.datatype; + mt = coll_args->args.dst.info.mem_type; + if (UCC_IS_INPLACE(coll_args->args)) { + rs_rbuf = rsg_schedule->scratch_mc_header->addr; + rs_sbuf = coll_args->args.dst.info.buffer; + } else { + rs_rbuf = coll_args->args.dst.info.buffer; + rs_sbuf = coll_args->args.src.info.buffer; + } + count = coll_args->args.dst.info.count; + } else { + dt = coll_args->args.src.info.datatype; + mt = coll_args->args.src.info.mem_type; + rs_rbuf = rsg_schedule->scratch_mc_header->addr; + rs_sbuf = coll_args->args.src.info.buffer; + count = coll_args->args.src.info.count; + } + + if (coll_args->mask & UCC_BASE_CARGS_MAX_FRAG_COUNT) { + count = coll_args->max_frag_count; + } + + args.args.flags &= ~UCC_COLL_ARGS_FLAG_IN_PLACE; + args.args.dst.info.buffer = rs_rbuf; + args.args.dst.info.count = count; + args.args.dst.info.datatype = dt; + args.args.dst.info.mem_type = mt; + args.args.src.info.buffer = rs_sbuf; + args.args.src.info.count = count; + args.args.src.info.datatype = dt; + args.args.src.info.mem_type = mt; + + cfg_radix = ucc_tl_ucp_get_radix_from_range(tl_team, + count * ucc_dt_size(dt), + mt, p, 4); + radix = ucc_knomial_pattern_get_min_radix(cfg_radix, + UCC_TL_TEAM_SIZE(tl_team), + count); + + /* 1st step of reduce: knomial reduce_scatter */ + UCC_CHECK_GOTO(ucc_tl_ucp_reduce_scatter_knomial_init_r(&args, team, &task, + radix), + out, status); + UCC_CHECK_GOTO(ucc_schedule_add_task(schedule, task), out, status); + UCC_CHECK_GOTO(ucc_task_subscribe_dep(&schedule->super, task, + UCC_EVENT_SCHEDULE_STARTED), + out, status); + rs_task = task; + + /* 2nd step of reduce: knomial gather */ + args.args.src.info.buffer = rs_rbuf; + if (UCC_IS_ROOT(coll_args->args, trank)) { + if (UCC_IS_INPLACE (coll_args->args)) { + args.args.dst.info.buffer = rs_sbuf; + args.args.src.info.buffer = rs_rbuf; + } else { + args.args.dst.info.buffer = rs_rbuf; + args.args.mask |= UCC_COLL_ARGS_FIELD_FLAGS; + args.args.flags |= UCC_COLL_ARGS_FLAG_IN_PLACE; + + } + } else { + args.args.mask |= UCC_COLL_ARGS_FIELD_FLAGS; + args.args.flags |= UCC_COLL_ARGS_FLAG_IN_PLACE; + } + + UCC_CHECK_GOTO(ucc_tl_ucp_gather_knomial_init_r(&args, team, &task, radix), + out, status); + UCC_CHECK_GOTO(ucc_schedule_add_task(schedule, task), out, status); + UCC_CHECK_GOTO(ucc_task_subscribe_dep(rs_task, task, UCC_EVENT_COMPLETED), + out, status); + schedule->super.finalize = ucc_tl_ucp_reduce_srg_knomial_frag_finalize; + schedule->super.post = ucc_tl_ucp_reduce_srg_knomial_frag_start; + *frag_p = schedule; + return UCC_OK; +out: + return status; +} + +static ucc_status_t +ucc_tl_ucp_reduce_srg_knomial_finalize(ucc_coll_task_t 
*task) +{ + ucc_tl_ucp_schedule_t *schedule = ucc_derived_of(task, + ucc_tl_ucp_schedule_t); + ucc_status_t status; + + UCC_TL_UCP_PROFILE_REQUEST_EVENT(schedule, "ucp_reduce_srg_kn_done", 0); + if (schedule->scratch_mc_header) { + ucc_mc_free(schedule->scratch_mc_header); + } + status = ucc_schedule_pipelined_finalize(task); + ucc_tl_ucp_put_schedule(&schedule->super.super); + return status; +} + +ucc_status_t ucc_tl_ucp_reduce_srg_knomial_start(ucc_coll_task_t *task) +{ + UCC_TL_UCP_PROFILE_REQUEST_EVENT(task, "ucp_reduce_srg_kn_start", 0); + return ucc_schedule_pipelined_post(task); +} + +static void +ucc_tl_ucp_reduce_srg_knomial_get_pipeline_params(ucc_tl_ucp_team_t *team, + ucc_memory_type_t mtype, + ucc_pipeline_params_t *pp) +{ + ucc_tl_ucp_lib_config_t *cfg = &team->cfg; + ucc_mc_attr_t mc_attr; + + if (!ucc_pipeline_params_is_auto(&cfg->reduce_srg_kn_pipeline)) { + *pp = cfg->reduce_srg_kn_pipeline; + return; + } + + if (mtype == UCC_MEMORY_TYPE_CUDA) { + mc_attr.field_mask = UCC_MC_ATTR_FIELD_FAST_ALLOC_SIZE; + ucc_mc_get_attr(&mc_attr, UCC_MEMORY_TYPE_CUDA); + pp->threshold = mc_attr.fast_alloc_size; + pp->n_frags = 2; + pp->frag_size = mc_attr.fast_alloc_size; + pp->order = UCC_PIPELINE_PARALLEL; + pp->pdepth = 2; + } else { + pp->threshold = SIZE_MAX; + pp->n_frags = 0; + pp->frag_size = 0; + pp->pdepth = 1; + pp->order = UCC_PIPELINE_PARALLEL; + } +} + +ucc_status_t ucc_tl_ucp_reduce_srg_knomial_init(ucc_base_coll_args_t *coll_args, + ucc_base_team_t *team, + ucc_coll_task_t **task_h) +{ + ucc_tl_ucp_team_t *tl_team = ucc_derived_of(team, ucc_tl_ucp_team_t); + ucc_coll_args_t *args = &coll_args->args; + ucc_rank_t trank = UCC_TL_TEAM_RANK(tl_team); + int n_frags, pipeline_depth; + ucc_tl_ucp_schedule_t *schedule; + ucc_status_t st; + ucc_base_coll_args_t bargs; + size_t max_frag_count, dt_size, count; + ucc_pipeline_params_t pipeline_params; + ucc_datatype_t dt; + ucc_memory_type_t mt; + + st = ucc_tl_ucp_get_schedule(tl_team, coll_args, &schedule); + if (ucc_unlikely(UCC_OK != st)) { + goto err_out; + } + + schedule->scratch_mc_header = NULL; + if (UCC_IS_ROOT(*args, trank)) { + count = args->dst.info.count; + dt = args->dst.info.datatype; + mt = args->dst.info.mem_type; + dt_size = ucc_dt_size(dt); + } else { + count = args->src.info.count; + dt = args->src.info.datatype; + mt = args->src.info.mem_type; + dt_size = ucc_dt_size(dt); + } + + if (!UCC_IS_ROOT(*args, trank) || UCC_IS_INPLACE(*args)) { + st = ucc_mc_alloc(&schedule->scratch_mc_header, count * dt_size, mt); + if (ucc_unlikely(UCC_OK != st)) { + tl_error(team->context->lib, "failed to alloc scratch memory"); + goto err_free_schedule; + } + } + + bargs = *coll_args; + max_frag_count = (bargs.mask & UCC_BASE_CARGS_MAX_FRAG_COUNT) ? 
+ bargs.max_frag_count: count; + ucc_tl_ucp_reduce_srg_knomial_get_pipeline_params(tl_team, mt, + &pipeline_params); + ucc_pipeline_nfrags_pdepth(&pipeline_params, max_frag_count * dt_size, + &n_frags, &pipeline_depth); + if (n_frags > 1) { + bargs.mask |= UCC_BASE_CARGS_MAX_FRAG_COUNT; + bargs.max_frag_count = ucc_buffer_block_count(max_frag_count, n_frags, 0); + } + + st = ucc_schedule_pipelined_init(&bargs, team, + ucc_tl_ucp_reduce_srg_knomial_frag_init, + ucc_tl_ucp_reduce_srg_knomial_frag_setup, + pipeline_depth, n_frags, + pipeline_params.order, + &schedule->super); + if (ucc_unlikely(UCC_OK != st)) { + tl_error(team->context->lib, "failed to init pipelined schedule"); + goto err_free_scratch; + } + + schedule->super.super.super.finalize = ucc_tl_ucp_reduce_srg_knomial_finalize; + schedule->super.super.super.post = ucc_tl_ucp_reduce_srg_knomial_start; + + *task_h = &schedule->super.super.super; + return UCC_OK; + +err_free_scratch: + ucc_mc_free(schedule->scratch_mc_header); +err_free_schedule: + ucc_tl_ucp_put_schedule(&schedule->super.super); +err_out: + return st; +} diff --git a/src/components/tl/ucp/reduce_scatter/reduce_scatter_knomial.c b/src/components/tl/ucp/reduce_scatter/reduce_scatter_knomial.c index 365579d8f9..e247fd0a05 100644 --- a/src/components/tl/ucp/reduce_scatter/reduce_scatter_knomial.c +++ b/src/components/tl/ucp/reduce_scatter/reduce_scatter_knomial.c @@ -20,6 +20,7 @@ size_t _count = 0; \ switch ((_args)->coll_type) { \ case UCC_COLL_TYPE_ALLREDUCE: \ + case UCC_COLL_TYPE_REDUCE: \ _count = (_args)->dst.info.count; \ break; \ case UCC_COLL_TYPE_REDUCE_SCATTER: \ @@ -48,7 +49,7 @@ typedef struct ucc_tl_ucp_rs_work_buf { void *reduce_loop; } ucc_tl_ucp_rs_work_buf_t; -/* get work buffers for allreduce */ +/* get work buffers for allreduce and reduce */ static inline void get_sbuf_rbuf_ar(ucc_tl_ucp_task_t *task, size_t block_count, ucc_tl_ucp_rs_work_buf_t *wb) @@ -188,6 +189,7 @@ static inline void get_rs_work_buf(ucc_tl_ucp_task_t *task, switch (args->coll_type) { case UCC_COLL_TYPE_ALLREDUCE: + case UCC_COLL_TYPE_REDUCE: return get_sbuf_rbuf_ar(task, block_count, wb); case UCC_COLL_TYPE_REDUCE_SCATTER: return get_sbuf_rbuf_rs(task, wb); @@ -214,6 +216,7 @@ void ucc_tl_ucp_reduce_scatter_knomial_progress(ucc_coll_task_t *coll_task) size_t data_size = count * dt_size; ucc_rank_t rank = task->subset.myrank; ucc_rank_t size = task->subset.map.ep_num; + ucc_rank_t root = 0; size_t local_seg_count = 0; ucc_tl_ucp_rs_work_buf_t wb = (ucc_tl_ucp_rs_work_buf_t){0}; ptrdiff_t peer_seg_offset, local_seg_offset; @@ -224,12 +227,18 @@ void ucc_tl_ucp_reduce_scatter_knomial_progress(ucc_coll_task_t *coll_task) void *local_data; int is_avg; + if (args->coll_type == UCC_COLL_TYPE_REDUCE) { + root = args->root; + rank = VRANK(rank, root, size); + } + UCC_KN_REDUCE_GOTO_PHASE(task->reduce_scatter_kn.phase); block_count = ucc_sra_kn_compute_block_count(count, rank, p); get_rs_work_buf(task, block_count, &wb); if (KN_NODE_EXTRA == node_type) { peer = ucc_ep_map_eval(task->subset.map, ucc_knomial_pattern_get_proxy(p, rank)); + peer = INV_VRANK(peer, root, size); UCPCHECK_GOTO(ucc_tl_ucp_send_nb(wb.src_data, data_size, mem_type, peer, team, task), task, out); @@ -243,6 +252,7 @@ void ucc_tl_ucp_reduce_scatter_knomial_progress(ucc_coll_task_t *coll_task) if (KN_NODE_PROXY == node_type) { peer = ucc_ep_map_eval(task->subset.map, ucc_knomial_pattern_get_extra(p, rank)); + peer = INV_VRANK(peer, root, size); UCPCHECK_GOTO(ucc_tl_ucp_recv_nb(wb.dst_proxy, data_size, mem_type, peer, 
team, task), task, out); @@ -288,7 +298,8 @@ void ucc_tl_ucp_reduce_scatter_knomial_progress(ucc_coll_task_t *coll_task) } ucc_kn_rs_pattern_peer_seg(peer, p, &peer_seg_count, &peer_seg_offset); - peer = ucc_ep_map_eval(task->subset.map, peer); + peer = INV_VRANK(ucc_ep_map_eval(task->subset.map, peer), root, + size); UCPCHECK_GOTO( ucc_tl_ucp_send_nb(PTR_OFFSET(wb.src_loop, peer_seg_offset * dt_size), peer_seg_count * dt_size, mem_type, peer, @@ -371,9 +382,10 @@ ucc_status_t ucc_tl_ucp_reduce_scatter_knomial_start(ucc_coll_task_t *coll_task) ucc_tl_ucp_task_t *task = ucc_derived_of(coll_task, ucc_tl_ucp_task_t); ucc_coll_args_t *args = &TASK_ARGS(task); ucc_tl_ucp_team_t *team = TASK_TEAM(task); - ucc_rank_t rank = task->subset.myrank; ucc_rank_t size = task->subset.map.ep_num; ucc_coll_type_t ct = args->coll_type; + ucc_rank_t root = (ct == UCC_COLL_TYPE_REDUCE) ? args->root : 0; + ucc_rank_t rank = VRANK(task->subset.myrank, root, size); size_t count = GET_COUNT(args); ucc_status_t status; @@ -381,7 +393,8 @@ ucc_status_t ucc_tl_ucp_reduce_scatter_knomial_start(ucc_coll_task_t *coll_task) 0); ucc_tl_ucp_task_reset(task, UCC_INPROGRESS); - if (ct == UCC_COLL_TYPE_ALLREDUCE) { + if ((ct == UCC_COLL_TYPE_ALLREDUCE) || + (ct == UCC_COLL_TYPE_REDUCE)) { ucc_kn_rsx_pattern_init(size, rank, task->reduce_scatter_kn.p.radix, count, &task->reduce_scatter_kn.p); } else { @@ -425,7 +438,8 @@ static size_t compute_scratch_size(ucc_tl_ucp_task_t *task) ucc_kn_radix_t step_radix; size_t max_recv_size; - if (args->coll_type == UCC_COLL_TYPE_ALLREDUCE) { + if ((args->coll_type == UCC_COLL_TYPE_ALLREDUCE) || + (args->coll_type == UCC_COLL_TYPE_REDUCE)) { if (KN_NODE_EXTRA != task->reduce_scatter_kn.p.node_type) { if (coll_args->mask & UCC_BASE_CARGS_MAX_FRAG_COUNT) { count = coll_args->max_frag_count; @@ -499,6 +513,9 @@ ucc_tl_ucp_reduce_scatter_knomial_init_r(ucc_base_coll_args_t *coll_args, ucc_kn_rsx_pattern_init(size, rank, radix, count, &task->reduce_scatter_kn.p); + } else if (ct == UCC_COLL_TYPE_REDUCE) { + ucc_kn_rsx_pattern_init(size, VRANK(rank, coll_args->args.root, size), + radix, count, &task->reduce_scatter_kn.p); } else { ucc_kn_rs_pattern_init(size, rank, radix, count, &task->reduce_scatter_kn.p); diff --git a/src/components/tl/ucp/tl_ucp.c b/src/components/tl/ucp/tl_ucp.c index 7db99bdaf2..8b0ca7ede0 100644 --- a/src/components/tl/ucp/tl_ucp.c +++ b/src/components/tl/ucp/tl_ucp.c @@ -153,6 +153,16 @@ ucc_config_field_t ucc_tl_ucp_lib_config_table[] = { ucc_offsetof(ucc_tl_ucp_lib_config_t, reduce_kn_radix), UCC_CONFIG_TYPE_UINT}, + {"REDUCE_SRG_KN_PIPELINE", "auto", + "Pipelining settings for SRG Knomial reduce algorithm", + ucc_offsetof(ucc_tl_ucp_lib_config_t, reduce_srg_kn_pipeline), + UCC_CONFIG_TYPE_PIPELINE_PARAMS}, + + {"REDUCE_SRG_KN_RADIX", "auto", + "Radix of the scatter-reduce-gather (SRG) knomial reduce algorithm", + ucc_offsetof(ucc_tl_ucp_lib_config_t, reduce_srg_kn_radix), + UCC_CONFIG_TYPE_UINT_RANGED}, + {"GATHER_KN_RADIX", "4", "Radix of the knomial tree reduce algorithm", ucc_offsetof(ucc_tl_ucp_lib_config_t, gather_kn_radix), UCC_CONFIG_TYPE_UINT}, diff --git a/src/components/tl/ucp/tl_ucp.h b/src/components/tl/ucp/tl_ucp.h index 3c439f4ae5..4db5788e15 100644 --- a/src/components/tl/ucp/tl_ucp.h +++ b/src/components/tl/ucp/tl_ucp.h @@ -58,6 +58,8 @@ typedef struct ucc_tl_ucp_lib_config { uint32_t bcast_kn_radix; ucc_mrange_uint_t bcast_sag_kn_radix; uint32_t reduce_kn_radix; + ucc_pipeline_params_t reduce_srg_kn_pipeline; + ucc_mrange_uint_t reduce_srg_kn_radix; 
uint32_t gather_kn_radix; uint32_t gatherv_linear_num_posts; uint32_t scatter_kn_radix; diff --git a/src/components/tl/ucp/tl_ucp_coll.c b/src/components/tl/ucp/tl_ucp_coll.c index 419b602156..8cef58de08 100644 --- a/src/components/tl/ucp/tl_ucp_coll.c +++ b/src/components/tl/ucp/tl_ucp_coll.c @@ -367,6 +367,9 @@ ucc_status_t ucc_tl_ucp_alg_id_to_init(int alg_id, const char *alg_id_str, case UCC_TL_UCP_REDUCE_ALG_DBT: *init = ucc_tl_ucp_reduce_dbt_init; break; + case UCC_TL_UCP_REDUCE_ALG_SRG: + *init = ucc_tl_ucp_reduce_srg_knomial_init; + break; default: status = UCC_ERR_INVALID_PARAM; break;
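
Usage note (not part of the patch): a minimal sketch of how the new "srg" reduce algorithm could be selected at runtime. It assumes the standard UCC_TL_UCP_TUNE tuning-string syntax ("coll:msg_range:@alg", the same format as the UCC_TL_UCP_REDUCE_DEFAULT_ALG_SELECT_STR default above) and that the algorithm can be referenced either by its name ("srg") or by its id (2, as used in the new default select string).

/* Hypothetical example: request SRG reduce for messages above 32K by setting
 * the TL/UCP tuning string before UCC is initialized; equivalent to
 * exporting UCC_TL_UCP_TUNE="reduce:32K-inf:@srg" in the environment. */
#include <stdlib.h>

int main(void)
{
    setenv("UCC_TL_UCP_TUNE", "reduce:32K-inf:@srg", 1);
    /* ... initialize UCC (directly or via MPI) and run the reduce here ... */
    return 0;
}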