diff --git a/ompi/mca/fcoll/vulcan/Makefile.am b/ompi/mca/fcoll/vulcan/Makefile.am index e805880a661..c4680544abb 100644 --- a/ompi/mca/fcoll/vulcan/Makefile.am +++ b/ompi/mca/fcoll/vulcan/Makefile.am @@ -13,6 +13,7 @@ # Copyright (c) 2012 Cisco Systems, Inc. All rights reserved. # Copyright (c) 2018 Research Organization for Information Science # and Technology (RIST). All rights reserved. +# Copyright (c) 2024 Advanced Micro Devices, Inc. All rights reserved. # $COPYRIGHT$ # # Additional copyrights may follow @@ -22,6 +23,7 @@ sources = \ fcoll_vulcan.h \ + fcoll_vulcan_internal.h \ fcoll_vulcan_module.c \ fcoll_vulcan_component.c \ fcoll_vulcan_file_read_all.c \ diff --git a/ompi/mca/fcoll/vulcan/fcoll_vulcan.h b/ompi/mca/fcoll/vulcan/fcoll_vulcan.h index a2fd6ca82bc..3165a0b0797 100644 --- a/ompi/mca/fcoll/vulcan/fcoll_vulcan.h +++ b/ompi/mca/fcoll/vulcan/fcoll_vulcan.h @@ -14,6 +14,7 @@ * and Technology (RIST). All rights reserved. * Copyright (c) 2024 Triad National Security, LLC. All rights * reserved. + * Copyright (c) 2024 Advanced Micro Devices, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -40,8 +41,6 @@ BEGIN_C_DECLS /* Globally exported variables */ extern int mca_fcoll_vulcan_priority; -extern int mca_fcoll_vulcan_num_groups; -extern int mca_fcoll_vulcan_write_chunksize; extern int mca_fcoll_vulcan_async_io; extern int mca_fcoll_vulcan_use_accelerator_buffers; diff --git a/ompi/mca/fcoll/vulcan/fcoll_vulcan_component.c b/ompi/mca/fcoll/vulcan/fcoll_vulcan_component.c index 80a5bfb872a..5fc8254f164 100644 --- a/ompi/mca/fcoll/vulcan/fcoll_vulcan_component.c +++ b/ompi/mca/fcoll/vulcan/fcoll_vulcan_component.c @@ -16,6 +16,7 @@ * reserved. * Copyright (c) 2024 Triad National Security, LLC. All rights * reserved. + * Copyright (c) 2024 Advanced Micro Devices, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -43,8 +44,6 @@ const char *mca_fcoll_vulcan_component_version_string = * Global variables */ int mca_fcoll_vulcan_priority = 10; -int mca_fcoll_vulcan_num_groups = 1; -int mca_fcoll_vulcan_write_chunksize = -1; int mca_fcoll_vulcan_async_io = 0; /* @@ -91,20 +90,6 @@ vulcan_register(void) OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY, &mca_fcoll_vulcan_priority); - mca_fcoll_vulcan_num_groups = 1; - (void) mca_base_component_var_register(&mca_fcoll_vulcan_component.fcollm_version, - "num_groups", "Number of subgroups created by the vulcan component", - MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, - OPAL_INFO_LVL_9, - MCA_BASE_VAR_SCOPE_READONLY, &mca_fcoll_vulcan_num_groups); - - mca_fcoll_vulcan_write_chunksize = -1; - (void) mca_base_component_var_register(&mca_fcoll_vulcan_component.fcollm_version, - "write_chunksize", "Chunk size written at once. Default: stripe_size of the file system", - MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, - OPAL_INFO_LVL_9, - MCA_BASE_VAR_SCOPE_READONLY, &mca_fcoll_vulcan_write_chunksize); - mca_fcoll_vulcan_async_io = 0; (void) mca_base_component_var_register(&mca_fcoll_vulcan_component.fcollm_version, "async_io", "Asynchronous I/O support options. 
0: Automatic choice (default) " diff --git a/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_read_all.c b/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_read_all.c index c372e9f14b4..f6a492e621c 100644 --- a/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_read_all.c +++ b/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_read_all.c @@ -1,3 +1,4 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * University Research and Technology @@ -14,6 +15,7 @@ * and Technology (RIST). All rights reserved. * Copyright (c) 2024 Triad National Security, LLC. All rights * reserved. + * Copyright (c) 2024 Advanced Micro Devices, Inc. All rights reserved. * $COPYRIGHT$ * * Additional copyrights may follow @@ -23,10 +25,31 @@ #include "ompi_config.h" #include "fcoll_vulcan.h" +#include "fcoll_vulcan_internal.h" #include "mpi.h" +#include "ompi/constants.h" +#include "ompi/mca/fcoll/fcoll.h" +#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h" #include "ompi/mca/common/ompio/common_ompio.h" +#include "ompi/mca/common/ompio/common_ompio_buffer.h" +#include "ompi/mca/io/io.h" +#include "ompi/mca/common/ompio/common_ompio_request.h" +#include "math.h" +#include "ompi/mca/pml/pml.h" +#include "opal/mca/accelerator/accelerator.h" +#include +#define DEBUG_ON 0 +#define NOT_AGGR_INDEX -1 + +static int shuffle_init (int index, int cycles, int aggregator, int rank, + mca_io_ompio_aggregator_data *data, ompi_request_t **reqs); + +static int read_init (ompio_file_t *fh, int index, int cycles, int aggregator, int rank, + mca_io_ompio_aggregator_data *aggr_data, + int read_syncType, ompi_request_t **request, + bool is_accelerator_buffer); int mca_fcoll_vulcan_file_read_all (struct ompio_file_t *fh, void *buf, @@ -34,7 +57,888 @@ int mca_fcoll_vulcan_file_read_all (struct ompio_file_t *fh, struct ompi_datatype_t *datatype, ompi_status_public_t *status) { - return mca_common_ompio_base_file_read_all (fh, buf, count, datatype, status); + int index = 0; + int cycles = 0; + int ret =0, l, i, j, bytes_per_cycle; + uint32_t iov_count = 0; + struct iovec *decoded_iov = NULL; + struct iovec *local_iov_array=NULL; + uint32_t total_fview_count = 0; + int local_count = 0; + ompi_request_t **reqs = NULL; + ompi_request_t *req_iread = MPI_REQUEST_NULL; + ompi_request_t *req_tmp = MPI_REQUEST_NULL; + mca_io_ompio_aggregator_data **aggr_data=NULL; + + ptrdiff_t *displs = NULL; + int vulcan_num_io_procs; + size_t max_data = 0; + + struct iovec **broken_iov_arrays=NULL; + struct iovec **broken_decoded_iovs=NULL; + int *broken_counts=NULL; + int *broken_iov_counts=NULL; + MPI_Aint *broken_total_lengths=NULL; + + int aggr_index = NOT_AGGR_INDEX; + int read_sync_type = 2; + int *result_counts=NULL; + + ompi_count_array_t fview_count_desc; + ompi_disp_array_t displs_desc; + int is_gpu, is_managed; + bool use_accelerator_buffer = false; + +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + double read_time = 0.0, start_read_time = 0.0, end_read_time = 0.0; + double comm_time = 0.0, start_comm_time = 0.0, end_comm_time = 0.0; + double exch_read = 0.0, start_exch = 0.0, end_exch = 0.0; + mca_common_ompio_print_entry nentry; +#endif + + vulcan_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators")); + if (OMPI_ERR_MAX == vulcan_num_io_procs) { + ret = OMPI_ERROR; + goto exit; + } + bytes_per_cycle = fh->f_bytes_per_agg; + + if ((1 == mca_fcoll_vulcan_async_io) && (NULL == fh->f_fbtl->fbtl_ipreadv)) { + opal_output (1, "vulcan_read_all: fbtl Does NOT support 
ipreadv() (asynchronous read) \n"); + ret = MPI_ERR_UNSUPPORTED_OPERATION; + goto exit; + } + + mca_common_ompio_check_gpu_buf (fh, buf, &is_gpu, &is_managed); + if (is_gpu && !is_managed && + fh->f_get_mca_parameter_value ("use_accelerator_buffers", strlen("use_accelerator_buffers"))) { + use_accelerator_buffer = true; + } + /* since we want to overlap 2 iterations, define the bytes_per_cycle to be half of what + the user requested */ + bytes_per_cycle = bytes_per_cycle/2; + + /************************************************************************** + ** 1. Decode user buffer into an iovec + **************************************************************************/ + ret = mca_common_ompio_decode_datatype ((struct ompio_file_t *) fh, + datatype, count, buf, &max_data, + fh->f_mem_convertor, &decoded_iov, + &iov_count); + if (OMPI_SUCCESS != ret){ + goto exit; + } + + if (MPI_STATUS_IGNORE != status) { + status->_ucount = max_data; + } + + ret = mca_fcoll_vulcan_get_configuration (fh, vulcan_num_io_procs, max_data); + if (OMPI_SUCCESS != ret){ + goto exit; + } + opal_output_verbose(10, ompi_fcoll_base_framework.framework_output, + "Using %d aggregators for the read_all operation \n", fh->f_num_aggrs); + + aggr_data = (mca_io_ompio_aggregator_data **) malloc (fh->f_num_aggrs * + sizeof(mca_io_ompio_aggregator_data*)); + if (NULL == aggr_data) { + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + for (i = 0; i < fh->f_num_aggrs; i++) { + // At this point we know the number of aggregators. If there is a correlation between + // number of aggregators and number of IO nodes, we know how many aggr_data arrays we need + // to allocate. + aggr_data[i] = (mca_io_ompio_aggregator_data *) calloc (1, sizeof(mca_io_ompio_aggregator_data)); + aggr_data[i]->procs_per_group = fh->f_procs_per_group; + aggr_data[i]->procs_in_group = fh->f_procs_in_group; + aggr_data[i]->comm = fh->f_comm; + // Identify if the process is an aggregator. + // If so, aggr_index would be its index in "aggr_data" and "aggregators" arrays. + if (fh->f_aggr_list[i] == fh->f_rank) { + aggr_index = i; + } + } + + /********************************************************************* + *** 2. Generate the local offsets/lengths array corresponding to + *** this read operation + ********************************************************************/ + ret = fh->f_generate_current_file_view ((struct ompio_file_t *) fh, + max_data, &local_iov_array, + &local_count); + if (ret != OMPI_SUCCESS) { + goto exit; + } + + /************************************************************************* + ** 2b. Separate the local_iov_array entries based on the number of aggregators + *************************************************************************/ + // Modifications for the even distribution: + long domain_size; + ret = mca_fcoll_vulcan_minmax (fh, local_iov_array, local_count, fh->f_num_aggrs, &domain_size); + + // broken_iov_arrays[0] contains broken_counts[0] entries to aggregator 0, + // broken_iov_arrays[1] contains broken_counts[1] entries to aggregator 1, etc. + ret = mca_fcoll_vulcan_break_file_view (decoded_iov, iov_count, + local_iov_array, local_count, + &broken_decoded_iovs, &broken_iov_counts, + &broken_iov_arrays, &broken_counts, + &broken_total_lengths, + fh->f_num_aggrs, domain_size); + + /************************************************************************** + ** 3. Determine the total amount of data to be read and no. 
of cycles + **************************************************************************/ +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + start_comm_time = MPI_Wtime(); +#endif + ret = fh->f_comm->c_coll->coll_allreduce (MPI_IN_PLACE, broken_total_lengths, + fh->f_num_aggrs, MPI_LONG, MPI_SUM, + fh->f_comm, + fh->f_comm->c_coll->coll_allreduce_module); + if (OMPI_SUCCESS != ret) { + goto exit; + } + +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + end_comm_time = MPI_Wtime(); + comm_time += (end_comm_time - start_comm_time); +#endif + + cycles=0; + for (i = 0; i < fh->f_num_aggrs; i++) { +#if DEBUG_ON + printf("%d: Overall broken_total_lengths[%d] = %ld\n", fh->f_rank, i, broken_total_lengths[i]); +#endif + if (ceil((double)broken_total_lengths[i]/bytes_per_cycle) > cycles) { + cycles = ceil((double)broken_total_lengths[i]/bytes_per_cycle); + } + } + + result_counts = (int *) malloc (fh->f_num_aggrs * fh->f_procs_per_group * sizeof(int)); + if (NULL == result_counts) { + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + start_comm_time = MPI_Wtime(); +#endif + ret = fh->f_comm->c_coll->coll_allgather (broken_counts, fh->f_num_aggrs, MPI_INT, + result_counts, fh->f_num_aggrs, MPI_INT, + fh->f_comm, + fh->f_comm->c_coll->coll_allgather_module); + if (OMPI_SUCCESS != ret) { + goto exit; + } +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + end_comm_time = MPI_Wtime(); + comm_time += (end_comm_time - start_comm_time); +#endif + + /************************************************************* + *** 4. Allgather the offset/lengths array from all processes + *************************************************************/ + for (i = 0; i < fh->f_num_aggrs; i++) { + aggr_data[i]->total_bytes = broken_total_lengths[i]; + aggr_data[i]->decoded_iov = broken_decoded_iovs[i]; + aggr_data[i]->fview_count = (size_t *)malloc (fh->f_procs_per_group * sizeof (size_t)); + if (NULL == aggr_data[i]->fview_count) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + for (j = 0; j < fh->f_procs_per_group; j++) { + aggr_data[i]->fview_count[j] = result_counts[fh->f_num_aggrs*j+i]; + } + + displs = (ptrdiff_t *)malloc (fh->f_procs_per_group * sizeof (ptrdiff_t)); + if (NULL == displs) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + displs[0] = 0; + total_fview_count = (uint32_t) aggr_data[i]->fview_count[0]; + for (j = 1 ; j < fh->f_procs_per_group ; j++) { + total_fview_count += aggr_data[i]->fview_count[j]; + displs[j] = displs[j-1] + aggr_data[i]->fview_count[j-1]; + } + +#if DEBUG_ON + printf("total_fview_count : %d\n", total_fview_count); + if (fh->f_aggr_list[i] == fh->f_rank) { + for (j=0 ; jf_procs_per_group ; i++) { + printf ("%d: PROCESS: %d ELEMENTS: %ld DISPLS: %ld\n", + fh->f_rank, j, + aggr_data[i]->fview_count[j], + displs[j]); + } + } +#endif + + /* allocate the global iovec */ + if (0 != total_fview_count) { + aggr_data[i]->global_iov_array = (struct iovec*) malloc (total_fview_count * + sizeof(struct iovec)); + if (NULL == aggr_data[i]->global_iov_array) { + opal_output(1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + } + +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + start_comm_time = MPI_Wtime(); +#endif + OMPI_COUNT_ARRAY_INIT(&fview_count_desc, aggr_data[i]->fview_count); + OMPI_DISP_ARRAY_INIT(&displs_desc, displs); + ret = fh->f_comm->c_coll->coll_allgatherv (broken_iov_arrays[i], + broken_counts[i], + fh->f_iov_type, + aggr_data[i]->global_iov_array, + fview_count_desc, + 
displs_desc, + fh->f_iov_type, + fh->f_comm, + fh->f_comm->c_coll->coll_allgatherv_module ); + if (OMPI_SUCCESS != ret) { + goto exit; + } + +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + end_comm_time = MPI_Wtime(); + comm_time += (end_comm_time - start_comm_time); +#endif + + /**************************************************************************************** + *** 5. Sort the global offset/lengths list based on the offsets. + *** The result of the sort operation is the 'sorted', an integer array, + *** which contains the indexes of the global_iov_array based on the offset. + *** For example, if global_iov_array[x].offset is followed by global_iov_array[y].offset + *** in the file, and that one is followed by global_iov_array[z].offset, than + *** sorted[0] = x, sorted[1]=y and sorted[2]=z; + ******************************************************************************************/ + if (0 != total_fview_count) { + aggr_data[i]->sorted = (int *)malloc (total_fview_count * sizeof(int)); + if (NULL == aggr_data[i]->sorted) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + ompi_fcoll_base_sort_iovec (aggr_data[i]->global_iov_array, total_fview_count, + aggr_data[i]->sorted); + } + + if (NULL != local_iov_array) { + free(local_iov_array); + local_iov_array = NULL; + } + + if (NULL != displs) { + free(displs); + displs=NULL; + } + +#if DEBUG_ON + if (fh->f_aggr_list[i] == fh->f_rank) { + uint32_t tv=0; + for (tv = 0 ; tv < total_fview_count ; tv++) { + printf("%d: OFFSET: %lu LENGTH: %ld\n", + fh->f_rank, + (uint64_t)aggr_data[i]->global_iov_array[aggr_data[i]->sorted[tv]].iov_base, + aggr_data[i]->global_iov_array[aggr_data[i]->sorted[tv]].iov_len); + } + } +#endif + /************************************************************* + *** 6. 
Determine the number of cycles required to execute this + *** operation + *************************************************************/ + aggr_data[i]->bytes_per_cycle = bytes_per_cycle; + + if (fh->f_aggr_list[i] == fh->f_rank) { + aggr_data[i]->disp_index = (int *)malloc (fh->f_procs_per_group * sizeof (int)); + if (NULL == aggr_data[i]->disp_index) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + aggr_data[i]->max_disp_index = (int *)calloc (fh->f_procs_per_group, sizeof (int)); + if (NULL == aggr_data[i]->max_disp_index) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + aggr_data[i]->blocklen_per_process = (int **)calloc (fh->f_procs_per_group, sizeof (int*)); + if (NULL == aggr_data[i]->blocklen_per_process) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + aggr_data[i]->displs_per_process = (MPI_Aint **)calloc (fh->f_procs_per_group, sizeof (MPI_Aint*)); + if (NULL == aggr_data[i]->displs_per_process) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + if (use_accelerator_buffer) { + opal_output_verbose(10, ompi_fcoll_base_framework.framework_output, + "Allocating GPU device buffer for aggregation\n"); + ret = opal_accelerator.mem_alloc(MCA_ACCELERATOR_NO_DEVICE_ID, (void**)&aggr_data[i]->global_buf, + bytes_per_cycle); + if (OPAL_SUCCESS != ret) { + opal_output(1, "Could not allocate accelerator memory"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + ret = opal_accelerator.mem_alloc(MCA_ACCELERATOR_NO_DEVICE_ID, (void**)&aggr_data[i]->prev_global_buf, + bytes_per_cycle); + if (OPAL_SUCCESS != ret) { + opal_output(1, "Could not allocate accelerator memory"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + } else { + aggr_data[i]->global_buf = (char *) malloc (bytes_per_cycle); + aggr_data[i]->prev_global_buf = (char *) malloc (bytes_per_cycle); + if (NULL == aggr_data[i]->global_buf || NULL == aggr_data[i]->prev_global_buf){ + opal_output(1, "OUT OF MEMORY"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + } + + aggr_data[i]->recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group * + sizeof(ompi_datatype_t *)); + aggr_data[i]->prev_recvtype = (ompi_datatype_t **) malloc (fh->f_procs_per_group * + sizeof(ompi_datatype_t *)); + if (NULL == aggr_data[i]->recvtype || NULL == aggr_data[i]->prev_recvtype) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + for(l=0;lf_procs_per_group;l++){ + aggr_data[i]->recvtype[l] = MPI_DATATYPE_NULL; + aggr_data[i]->prev_recvtype[l] = MPI_DATATYPE_NULL; + } + } + +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + start_exch = MPI_Wtime(); +#endif + } + + reqs = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*fh->f_num_aggrs *sizeof(ompi_request_t *)); + if (NULL == reqs) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + for (l = 0, i = 0; i < fh->f_num_aggrs; i++) { + for (j=0; j< (fh->f_procs_per_group+1); j++) { + reqs[l] = MPI_REQUEST_NULL; + l++; + } + } + + if( (1 == mca_fcoll_vulcan_async_io) || + ( (0 == mca_fcoll_vulcan_async_io) && (NULL != fh->f_fbtl->fbtl_ipreadv) && (2 < cycles))) { + read_sync_type = 1; + } + + if (cycles > 0) { + if (NOT_AGGR_INDEX != aggr_index) { + // Register progress function that should be used by ompi_request_wait + mca_common_ompio_register_progress (); + } +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + start_read_time = MPI_Wtime(); +#endif + 
for (i = 0; i < fh->f_num_aggrs; i++) { + ret = read_init (fh, 0, cycles, fh->f_aggr_list[i], fh->f_rank, + aggr_data[i], read_sync_type, &req_tmp, + use_accelerator_buffer); + if (OMPI_SUCCESS != ret) { + goto exit; + } + if (fh->f_aggr_list[i] == fh->f_rank) { + req_iread = req_tmp; + } + } + + if (NOT_AGGR_INDEX != aggr_index) { + ret = ompi_request_wait(&req_iread, MPI_STATUS_IGNORE); + if (OMPI_SUCCESS != ret){ + goto exit; + } + } +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + end_read_time = MPI_Wtime(); + read_time += end_read_time - start_read_time; +#endif + } + + for (index = 1; index < cycles; index++) { + for (i = 0; i < fh->f_num_aggrs; i++) { + ret = shuffle_init (index-1, cycles, fh->f_aggr_list[i], fh->f_rank, aggr_data[i], + &reqs[i*(fh->f_procs_per_group + 1)] ); + if (OMPI_SUCCESS != ret) { + goto exit; + } + } + + SWAP_AGGR_POINTERS(aggr_data, fh->f_num_aggrs); +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + start_read_time = MPI_Wtime(); +#endif + for (i = 0; i < fh->f_num_aggrs; i++) { + ret = read_init (fh, index, cycles, fh->f_aggr_list[i], fh->f_rank, + aggr_data[i], read_sync_type, + &req_tmp, use_accelerator_buffer); + if (OMPI_SUCCESS != ret){ + goto exit; + } + if (fh->f_aggr_list[i] == fh->f_rank) { + req_iread = req_tmp; + } + } +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + end_read_time = MPI_Wtime(); + read_time += end_read_time - start_read_time; +#endif + ret = ompi_request_wait_all ((fh->f_procs_per_group + 1 )*fh->f_num_aggrs, + reqs, MPI_STATUS_IGNORE); + if (OMPI_SUCCESS != ret){ + goto exit; + } + + if (NOT_AGGR_INDEX != aggr_index) { + ret = ompi_request_wait (&req_iread, MPI_STATUS_IGNORE); + if (OMPI_SUCCESS != ret){ + goto exit; + } + } + } /* end for (index = 1; index < cycles; index++) */ + + if (cycles > 0) { + for (i = 0; i < fh->f_num_aggrs; i++) { + ret = shuffle_init (index-1, cycles, fh->f_aggr_list[i], fh->f_rank, aggr_data[i], + &reqs[i*(fh->f_procs_per_group + 1)] ); + if (OMPI_SUCCESS != ret) { + goto exit; + } + } + ret = ompi_request_wait_all ((fh->f_procs_per_group + 1 )*fh->f_num_aggrs, + reqs, MPI_STATUS_IGNORE); + if (OMPI_SUCCESS != ret){ + goto exit; + } + } + +#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN + end_exch = MPI_Wtime(); + exch_read += end_exch - start_exch; + nentry.time[0] = read_time; + nentry.time[1] = comm_time; + nentry.time[2] = exch_read; + nentry.aggregator = 0; + for ( i=0; if_num_aggrs; i++ ) { + if (fh->f_aggr_list[i] == fh->f_rank) + nentry.aggregator = 1; + } + nentry.nprocs_for_coll = fh->f_num_aggrs; + if (!mca_common_ompio_full_print_queue(fh->f_coll_read_time)){ + mca_common_ompio_register_print_entry(fh->f_coll_read_time, + nentry); + } +#endif + +exit : + if (NULL != aggr_data) { + + for (i = 0; i < fh->f_num_aggrs; i++) { + if (fh->f_aggr_list[i] == fh->f_rank) { + if (NULL != aggr_data[i]->recvtype){ + for (j = 0; j < aggr_data[i]->procs_per_group; j++) { + if (MPI_DATATYPE_NULL != aggr_data[i]->recvtype[j]) { + ompi_datatype_destroy(&aggr_data[i]->recvtype[j]); + } + if (MPI_DATATYPE_NULL != aggr_data[i]->prev_recvtype[j]) { + ompi_datatype_destroy(&aggr_data[i]->prev_recvtype[j]); + } + } + free(aggr_data[i]->recvtype); + free(aggr_data[i]->prev_recvtype); + } + + free (aggr_data[i]->disp_index); + free (aggr_data[i]->max_disp_index); + if (use_accelerator_buffer) { + opal_accelerator.mem_release(MCA_ACCELERATOR_NO_DEVICE_ID, aggr_data[i]->global_buf); + opal_accelerator.mem_release(MCA_ACCELERATOR_NO_DEVICE_ID, aggr_data[i]->prev_global_buf); + } else { + free (aggr_data[i]->global_buf); + free 
(aggr_data[i]->prev_global_buf); + } + for (l = 0;l < aggr_data[i]->procs_per_group; l++) { + free (aggr_data[i]->blocklen_per_process[l]); + free (aggr_data[i]->displs_per_process[l]); + } + + free (aggr_data[i]->blocklen_per_process); + free (aggr_data[i]->displs_per_process); + } + free (aggr_data[i]->sorted); + free (aggr_data[i]->global_iov_array); + free (aggr_data[i]->fview_count); + free (aggr_data[i]->decoded_iov); + + free (aggr_data[i]); + } + free (aggr_data); + } + free(displs); + free(decoded_iov); + free(broken_counts); + free(broken_total_lengths); + free(broken_iov_counts); + free(broken_decoded_iovs); // decoded_iov arrays[i] were freed as aggr_data[i]->decoded_iov; + if (NULL != broken_iov_arrays) { + for (i = 0; i < fh->f_num_aggrs; i++) { + free(broken_iov_arrays[i]); + } + } + free(broken_iov_arrays); + free(fh->f_procs_in_group); + free(fh->f_aggr_list); + fh->f_procs_in_group=NULL; + fh->f_procs_per_group=0; + fh->f_aggr_list=NULL; + free(result_counts); + free(reqs); + + return ret; +} + +static int read_init (ompio_file_t *fh, int index, int cycles, int aggregator, int rank, + mca_io_ompio_aggregator_data *data, + int read_syncType, ompi_request_t **request, + bool is_accelerator_buffer) +{ + int ret = OMPI_SUCCESS; + ssize_t ret_temp = 0; + mca_ompio_request_t *ompio_req = NULL; + int i, j, l; + int entries_per_aggregator=0; + mca_io_ompio_local_io_array *file_offsets_for_agg=NULL; + MPI_Aint *memory_displacements=NULL; + int* blocklength_proc=NULL; + ptrdiff_t* displs_proc=NULL; + int *sorted_file_offsets=NULL; + + /********************************************************************** + *** 7a. Getting ready for next cycle: initializing and freeing buffers + **********************************************************************/ + data->bytes_sent = 0; + + if (aggregator == rank) { + if (NULL != data->recvtype){ + for (i = 0; i < data->procs_per_group; i++) { + if (MPI_DATATYPE_NULL != data->recvtype[i]) { + ompi_datatype_destroy(&data->recvtype[i]); + data->recvtype[i] = MPI_DATATYPE_NULL; + } + } + } + + for (l = 0; l < data->procs_per_group; l++) { + data->disp_index[l] = 0; + + if (data->max_disp_index[l] == 0) { + data->blocklen_per_process[l] = (int *) calloc (INIT_LEN, sizeof(int)); + data->displs_per_process[l] = (MPI_Aint *) calloc (INIT_LEN, sizeof(MPI_Aint)); + if (NULL == data->displs_per_process[l] || NULL == data->blocklen_per_process[l]){ + opal_output (1, "OUT OF MEMORY for displs\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + data->max_disp_index[l] = INIT_LEN; + } else { + memset (data->blocklen_per_process[l], 0, data->max_disp_index[l]*sizeof(int)); + memset (data->displs_per_process[l], 0, data->max_disp_index[l]*sizeof(MPI_Aint)); + } + } + } /* rank == aggregator */ + + /************************************************************************** + *** 7b. 
Determine the number of bytes to be actually read in this cycle + **************************************************************************/ + int local_cycles= ceil((double)data->total_bytes / data->bytes_per_cycle); + if (index < (local_cycles -1)) { + data->bytes_to_write_in_cycle = data->bytes_per_cycle; + } else if ( index == (local_cycles -1)) { + data->bytes_to_write_in_cycle = data->total_bytes - data->bytes_per_cycle*index; + } else { + data->bytes_to_write_in_cycle = 0; + } + data->bytes_to_write = data->bytes_to_write_in_cycle; + +#if DEBUG_ON + if (aggregator == rank) { + printf ("****%d: CYCLE %d Bytes %d**********\n", + rank, index, data->bytes_to_write_in_cycle); + } +#endif + + /***************************************************************** + *** 7c. Calculate how much data will be sent to each process in + *** this cycle + *****************************************************************/ + mca_fcoll_vulcan_calc_blocklen_disps(data, aggregator, rank, &data->bytes_sent); + + /************************************************************************* + *** 7d. Calculate the displacement + *************************************************************************/ + if (rank == aggregator) { + for (i = 0; i < data->procs_per_group; i++){ + for (j = 0; j < data->disp_index[i]; j++){ + if (data->blocklen_per_process[i][j] > 0) + entries_per_aggregator++ ; + } + } + } +#if DEBUG_ON + if (aggregator == rank) { + printf("%d : Entries per aggregator : %d\n", rank, entries_per_aggregator); + } +#endif + + if (entries_per_aggregator > 0) { + file_offsets_for_agg = (mca_io_ompio_local_io_array *) malloc (entries_per_aggregator + * sizeof(mca_io_ompio_local_io_array)); + memory_displacements = (MPI_Aint *) malloc (entries_per_aggregator * sizeof(MPI_Aint)); + sorted_file_offsets = (int *) malloc (entries_per_aggregator*sizeof(int)); + if (NULL == file_offsets_for_agg || NULL == memory_displacements || + NULL == sorted_file_offsets) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + ret = mca_fcoll_vulcan_calc_file_offsets(data, file_offsets_for_agg, sorted_file_offsets, + memory_displacements, entries_per_aggregator, + rank, index); + if (OMPI_SUCCESS != ret) { + goto exit; + } + + /********************************************************** + *** 7f. 
Create the io array + *********************************************************/ + fh->f_io_array = (mca_common_ompio_io_array_t *) malloc (entries_per_aggregator + * sizeof (mca_common_ompio_io_array_t)); + if (NULL == fh->f_io_array) { + opal_output(1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + mca_fcoll_vulcan_calc_io_array(fh->f_io_array, &fh->f_num_of_io_entries, entries_per_aggregator, + (char*)data->global_buf, file_offsets_for_agg, sorted_file_offsets, + memory_displacements, rank); + } + + if (rank == aggregator && fh->f_num_of_io_entries) { + mca_common_ompio_request_alloc (&ompio_req, MCA_OMPIO_REQUEST_READ); + + if (1 == read_syncType) { + if (is_accelerator_buffer) { + ret = mca_common_ompio_file_iread_pregen(fh, (ompi_request_t *) ompio_req); + if (0 > ret) { + opal_output (1, "vulcan_read_all: mca_common_ompio_iread_pregen failed\n"); + ompio_req->req_ompi.req_status.MPI_ERROR = ret; + ompio_req->req_ompi.req_status._ucount = 0; + } + } else { + ret = fh->f_fbtl->fbtl_ipreadv(fh, (ompi_request_t *) ompio_req); + if (0 > ret) { + opal_output (1, "vulcan_read_all: fbtl_ipreadv failed\n"); + ompio_req->req_ompi.req_status.MPI_ERROR = ret; + ompio_req->req_ompi.req_status._ucount = 0; + } + } + } + else { + ret_temp = fh->f_fbtl->fbtl_preadv(fh); + if (0 > ret_temp) { + opal_output (1, "vulcan_read_all: fbtl_preadv failed\n"); + ret = ret_temp; + ret_temp = 0; + } + + ompio_req->req_ompi.req_status.MPI_ERROR = ret; + ompio_req->req_ompi.req_status._ucount = ret_temp; + ompi_request_complete (&ompio_req->req_ompi, false); + } + + free(fh->f_io_array); + } + +#if DEBUG_ON + printf("************Cycle: %d, Aggregator: %d ***************\n", + index, rank); + for (i = 0; i < data->procs_per_group; i++) { + for (j = 0; j < data->disp_index[i]; j++) { + if (data->blocklen_per_process[i][j] > 0) { + printf("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n", + data->procs_in_group[i],j, + data->blocklen_per_process[i][j],j, + data->displs_per_process[i][j], rank); + } + } + } +#endif + +exit: + free(sorted_file_offsets); + free(file_offsets_for_agg); + free(memory_displacements); + free(blocklength_proc); + free(displs_proc); + + fh->f_io_array = NULL; + fh->f_num_of_io_entries = 0; + + *request = (ompi_request_t *) ompio_req; + return ret; } +static int shuffle_init (int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data, + ompi_request_t **reqs) +{ + int i, ret = OMPI_SUCCESS; + int* blocklength_proc=NULL; + ptrdiff_t* displs_proc=NULL; + + /************************************************************************* + *** 7e. 
Perform the actual communication + *************************************************************************/ + if (aggregator == rank ) { + for (i = 0; i < data->procs_per_group; i++) { + size_t datatype_size; + reqs[i] = MPI_REQUEST_NULL; + if (0 < data->disp_index[i]) { + ompi_datatype_create_hindexed (data->disp_index[i], + data->blocklen_per_process[i], + data->displs_per_process[i], + MPI_BYTE, + &data->recvtype[i]); + ompi_datatype_commit (&data->recvtype[i]); + opal_datatype_type_size (&data->recvtype[i]->super, &datatype_size); + if (datatype_size){ + ret = MCA_PML_CALL(isend(data->global_buf, + 1, data->recvtype[i], + data->procs_in_group[i], + FCOLL_VULCAN_SHUFFLE_TAG+index, + MCA_PML_BASE_SEND_STANDARD, + data->comm, &reqs[i])); + if (OMPI_SUCCESS != ret){ + goto exit; + } + } + } + } + // } /* end if (entries_per_aggr > 0 ) */ + }/* end if (aggregator == rank ) */ + + reqs[data->procs_per_group] = MPI_REQUEST_NULL; + if (data->bytes_sent) { + size_t remaining = data->bytes_sent; + int block_index = -1; + int blocklength_size = INIT_LEN; + + ptrdiff_t recv_mem_address = 0; + ompi_datatype_t *newType = MPI_DATATYPE_NULL; + blocklength_proc = (int *) calloc (blocklength_size, sizeof (int)); + displs_proc = (ptrdiff_t *) calloc (blocklength_size, sizeof (ptrdiff_t)); + + if (NULL == blocklength_proc || NULL == displs_proc ) { + opal_output (1, "OUT OF MEMORY\n"); + ret = OMPI_ERR_OUT_OF_RESOURCE; + goto exit; + } + + while (remaining) { + block_index++; + + if(0 == block_index) { + recv_mem_address = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) + + data->current_position; + } + else { + // Reallocate more memory if blocklength_size is not enough + if(0 == block_index % INIT_LEN) { + blocklength_size += INIT_LEN; + blocklength_proc = (int *) realloc(blocklength_proc, blocklength_size * sizeof(int)); + displs_proc = (ptrdiff_t *) realloc(displs_proc, blocklength_size * sizeof(ptrdiff_t)); + } + displs_proc[block_index] = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) + + data->current_position - recv_mem_address; + } + + if (remaining >= + (data->decoded_iov[data->iov_index].iov_len - data->current_position)) { + + blocklength_proc[block_index] = data->decoded_iov[data->iov_index].iov_len - + data->current_position; + remaining = remaining - (data->decoded_iov[data->iov_index].iov_len - + data->current_position); + data->iov_index = data->iov_index + 1; + data->current_position = 0; + } else { + blocklength_proc[block_index] = remaining; + data->current_position += remaining; + remaining = 0; + } + } + + data->total_bytes_written += data->bytes_sent; + + if (0 <= block_index) { + ompi_datatype_create_hindexed (block_index+1, + blocklength_proc, + displs_proc, + MPI_BYTE, + &newType); + ompi_datatype_commit (&newType); + + ret = MCA_PML_CALL(irecv((char *)recv_mem_address, + 1, + newType, + aggregator, + FCOLL_VULCAN_SHUFFLE_TAG+index, + data->comm, + &reqs[data->procs_per_group])); + if (MPI_DATATYPE_NULL != newType) { + ompi_datatype_destroy(&newType); + } + if (OMPI_SUCCESS != ret){ + goto exit; + } + } + } +exit: + return ret; +} diff --git a/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c b/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c index 5f89fba8d01..b6e9be6d2ca 100644 --- a/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c +++ b/ompi/mca/fcoll/vulcan/fcoll_vulcan_file_write_all.c @@ -1,3 +1,4 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ /* * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana * 
University Research and Technology @@ -25,6 +26,7 @@ #include "ompi_config.h" #include "fcoll_vulcan.h" +#include "fcoll_vulcan_internal.h" #include "mpi.h" #include "ompi/constants.h" @@ -42,85 +44,16 @@ #define DEBUG_ON 0 #define NOT_AGGR_INDEX -1 -/*Used for loading file-offsets per aggregator*/ -typedef struct mca_io_ompio_local_io_array{ - OMPI_MPI_OFFSET_TYPE offset; - MPI_Aint length; - int process_id; -}mca_io_ompio_local_io_array; - -typedef struct mca_io_ompio_aggregator_data { - int *disp_index, *sorted, n; - size_t *fview_count; - int *max_disp_index; - int **blocklen_per_process; - MPI_Aint **displs_per_process, total_bytes, bytes_per_cycle, total_bytes_written; - MPI_Comm comm; - char *buf, *global_buf, *prev_global_buf; - ompi_datatype_t **recvtype, **prev_recvtype; - struct iovec *global_iov_array; - int current_index, current_position; - int bytes_to_write_in_cycle, bytes_remaining, procs_per_group; - int *procs_in_group, iov_index; - int bytes_sent, prev_bytes_sent; - struct iovec *decoded_iov; - int bytes_to_write, prev_bytes_to_write; - mca_common_ompio_io_array_t *io_array, *prev_io_array; - int num_io_entries, prev_num_io_entries; -} mca_io_ompio_aggregator_data; - - -#define SWAP_REQUESTS(_r1,_r2) { \ - ompi_request_t **_t=_r1; \ - _r1=_r2; \ - _r2=_t;} - -#define SWAP_AGGR_POINTERS(_aggr,_num) { \ - int _i; \ - char *_t; \ - for (_i=0; _i<_num; _i++ ) { \ - _aggr[_i]->prev_io_array=_aggr[_i]->io_array; \ - _aggr[_i]->prev_num_io_entries=_aggr[_i]->num_io_entries; \ - _aggr[_i]->prev_bytes_sent=_aggr[_i]->bytes_sent; \ - _aggr[_i]->prev_bytes_to_write=_aggr[_i]->bytes_to_write; \ - _t=_aggr[_i]->prev_global_buf; \ - _aggr[_i]->prev_global_buf=_aggr[_i]->global_buf; \ - _aggr[_i]->global_buf=_t; \ - _t=(char *)_aggr[_i]->recvtype; \ - _aggr[_i]->recvtype=_aggr[_i]->prev_recvtype; \ - _aggr[_i]->prev_recvtype=(ompi_datatype_t **)_t; } \ -} -static int shuffle_init ( int index, int cycles, int aggregator, int rank, - mca_io_ompio_aggregator_data *data, - ompi_request_t **reqs ); +static int shuffle_init (int index, int num_cycles, int aggregator, int rank, + mca_io_ompio_aggregator_data *data, ompi_request_t **reqs); + static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, - int write_chunksize, int write_synchType, ompi_request_t **request, + int write_syncType, ompi_request_t **request, bool is_accelerator_buffer); -int mca_fcoll_vulcan_break_file_view ( struct iovec *decoded_iov, int iov_count, - struct iovec *local_iov_array, int local_count, - struct iovec ***broken_decoded_iovs, int **broken_iov_counts, - struct iovec ***broken_iov_arrays, int **broken_counts, - MPI_Aint **broken_total_lengths, - int stripe_count, size_t stripe_size); - - -int mca_fcoll_vulcan_get_configuration (ompio_file_t *fh, int num_io_procs, - int num_groups, size_t max_data); - static int local_heap_sort (mca_io_ompio_local_io_array *io_array, - int num_entries, - int *sorted); - -int mca_fcoll_vulcan_split_iov_array ( ompio_file_t *fh, mca_common_ompio_io_array_t *work_array, - int num_entries, int *last_array_pos, int *last_pos_in_field, - int chunk_size ); - - -static int mca_fcoll_vulcan_minmax ( ompio_file_t *fh, struct iovec *iov, int iov_count, int num_aggregators, - long *new_stripe_size); - + int num_entries, int *sorted); int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, const void *buf, @@ -143,7 +76,6 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, ptrdiff_t *displs = NULL; int vulcan_num_io_procs; size_t 
max_data = 0; - MPI_Aint *total_bytes_per_process = NULL; struct iovec **broken_iov_arrays=NULL; struct iovec **broken_decoded_iovs=NULL; @@ -153,7 +85,7 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, int aggr_index = NOT_AGGR_INDEX; int write_synch_type = 2; - int write_chunksize, *result_counts=NULL; + int *result_counts=NULL; ompi_count_array_t fview_count_desc; ompi_disp_array_t displs_desc; @@ -186,13 +118,12 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, mca_common_ompio_check_gpu_buf (fh, buf, &is_gpu, &is_managed); if (is_gpu && !is_managed && - fh->f_get_mca_parameter_value ("use_accelerator_buffers", strlen("use_accelerator_buffers"))) { - use_accelerator_buffer = true; + fh->f_get_mca_parameter_value ("use_accelerator_buffers", strlen("use_accelerator_buffers"))) { + use_accelerator_buffer = true; } /* since we want to overlap 2 iterations, define the bytes_per_cycle to be half of what the user requested */ bytes_per_cycle =bytes_per_cycle/2; - write_chunksize = bytes_per_cycle; ret = mca_common_ompio_decode_datatype ((struct ompio_file_t *) fh, datatype, @@ -207,14 +138,15 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, } if ( MPI_STATUS_IGNORE != status ) { - status->_ucount = max_data; + status->_ucount = max_data; } - - ret = mca_fcoll_vulcan_get_configuration (fh, vulcan_num_io_procs, mca_fcoll_vulcan_num_groups, max_data); + ret = mca_fcoll_vulcan_get_configuration (fh, vulcan_num_io_procs, max_data); if (OMPI_SUCCESS != ret){ - goto exit; + goto exit; } + opal_output_verbose(10, ompi_fcoll_base_framework.framework_output, + "Using %d aggregators for the write_all operation \n", fh->f_num_aggrs); aggr_data = (mca_io_ompio_aggregator_data **) malloc ( fh->f_num_aggrs * sizeof(mca_io_ompio_aggregator_data*)); @@ -227,7 +159,6 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, aggr_data[i]->procs_per_group = fh->f_procs_per_group; aggr_data[i]->procs_in_group = fh->f_procs_in_group; aggr_data[i]->comm = fh->f_comm; - aggr_data[i]->buf = (char *)buf; // should not be used in the new version. // Identify if the process is an aggregator. // If so, aggr_index would be its index in "aggr_data" and "aggregators" arrays. 
if(fh->f_aggr_list[i] == fh->f_rank) { @@ -240,11 +171,11 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, *** this write operation ********************************************************************/ ret = fh->f_generate_current_file_view( (struct ompio_file_t *) fh, - max_data, - &local_iov_array, - &local_count); + max_data, + &local_iov_array, + &local_count); if (ret != OMPI_SUCCESS){ - goto exit; + goto exit; } /************************************************************************* @@ -270,52 +201,15 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN start_comm_time = MPI_Wtime(); #endif - if ( 1 == mca_fcoll_vulcan_num_groups ) { - ret = fh->f_comm->c_coll->coll_allreduce (MPI_IN_PLACE, - broken_total_lengths, - fh->f_num_aggrs, - MPI_LONG, - MPI_SUM, - fh->f_comm, - fh->f_comm->c_coll->coll_allreduce_module); - if( OMPI_SUCCESS != ret){ - goto exit; - } - - } - else { - total_bytes_per_process = (MPI_Aint*)malloc - (fh->f_num_aggrs * fh->f_procs_per_group*sizeof(MPI_Aint)); - if (NULL == total_bytes_per_process) { - opal_output (1, "OUT OF MEMORY\n"); - ret = OMPI_ERR_OUT_OF_RESOURCE; - goto exit; - } - - ret = ompi_fcoll_base_coll_allgather_array (broken_total_lengths, - fh->f_num_aggrs, - MPI_LONG, - total_bytes_per_process, - fh->f_num_aggrs, - MPI_LONG, - 0, - fh->f_procs_in_group, - fh->f_procs_per_group, - fh->f_comm); - if( OMPI_SUCCESS != ret){ - goto exit; - } - - for ( i=0; if_num_aggrs; i++ ) { - broken_total_lengths[i] = 0; - for (j=0 ; jf_procs_per_group ; j++) { - broken_total_lengths[i] += total_bytes_per_process[j*fh->f_num_aggrs + i]; - } - } - if (NULL != total_bytes_per_process) { - free (total_bytes_per_process); - total_bytes_per_process = NULL; - } + ret = fh->f_comm->c_coll->coll_allreduce (MPI_IN_PLACE, + broken_total_lengths, + fh->f_num_aggrs, + MPI_LONG, + MPI_SUM, + fh->f_comm, + fh->f_comm->c_coll->coll_allreduce_module); + if( OMPI_SUCCESS != ret){ + goto exit; } #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN @@ -342,28 +236,14 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN start_comm_time = MPI_Wtime(); #endif - if ( 1 == mca_fcoll_vulcan_num_groups ) { - ret = fh->f_comm->c_coll->coll_allgather(broken_counts, - fh->f_num_aggrs, - MPI_INT, - result_counts, - fh->f_num_aggrs, - MPI_INT, - fh->f_comm, - fh->f_comm->c_coll->coll_allgather_module); - } - else { - ret = ompi_fcoll_base_coll_allgather_array (broken_counts, - fh->f_num_aggrs, - MPI_INT, - result_counts, - fh->f_num_aggrs, - MPI_INT, - 0, - fh->f_procs_in_group, - fh->f_procs_per_group, - fh->f_comm); - } + ret = fh->f_comm->c_coll->coll_allgather(broken_counts, + fh->f_num_aggrs, + MPI_INT, + result_counts, + fh->f_num_aggrs, + MPI_INT, + fh->f_comm, + fh->f_comm->c_coll->coll_allgather_module); if( OMPI_SUCCESS != ret){ goto exit; } @@ -428,32 +308,17 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN start_comm_time = MPI_Wtime(); #endif - if ( 1 == mca_fcoll_vulcan_num_groups ) { - OMPI_COUNT_ARRAY_INIT(&fview_count_desc, aggr_data[i]->fview_count); - OMPI_DISP_ARRAY_INIT(&displs_desc, displs); - ret = fh->f_comm->c_coll->coll_allgatherv (broken_iov_arrays[i], - broken_counts[i], - fh->f_iov_type, - aggr_data[i]->global_iov_array, - fview_count_desc, - displs_desc, - fh->f_iov_type, - fh->f_comm, - fh->f_comm->c_coll->coll_allgatherv_module ); - } - else { - ret = ompi_fcoll_base_coll_allgatherv_array (broken_iov_arrays[i], - 
broken_counts[i], - fh->f_iov_type, - aggr_data[i]->global_iov_array, - aggr_data[i]->fview_count, - displs, - fh->f_iov_type, - fh->f_aggr_list[i], - fh->f_procs_in_group, - fh->f_procs_per_group, - fh->f_comm); - } + OMPI_COUNT_ARRAY_INIT(&fview_count_desc, aggr_data[i]->fview_count); + OMPI_DISP_ARRAY_INIT(&displs_desc, displs); + ret = fh->f_comm->c_coll->coll_allgatherv (broken_iov_arrays[i], + broken_counts[i], + fh->f_iov_type, + aggr_data[i]->global_iov_array, + fview_count_desc, + displs_desc, + fh->f_iov_type, + fh->f_comm, + fh->f_comm->c_coll->coll_allgatherv_module ); if (OMPI_SUCCESS != ret){ goto exit; } @@ -539,8 +404,8 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, } if (use_accelerator_buffer) { - opal_output_verbose(10, ompi_fcoll_base_framework.framework_output, - "Allocating GPU device buffer for aggregation\n"); + opal_output_verbose(10, ompi_fcoll_base_framework.framework_output, + "Allocating GPU device buffer for aggregation\n"); ret = opal_accelerator.mem_alloc(MCA_ACCELERATOR_NO_DEVICE_ID, (void**)&aggr_data[i]->global_buf, bytes_per_cycle); if (OPAL_SUCCESS != ret) { @@ -583,10 +448,9 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN start_exch = MPI_Wtime(); #endif - } + } reqs = (ompi_request_t **)malloc ((fh->f_procs_per_group + 1 )*fh->f_num_aggrs *sizeof(ompi_request_t *)); - if ( NULL == reqs ) { opal_output (1, "OUT OF MEMORY\n"); ret = OMPI_ERR_OUT_OF_RESOURCE; @@ -632,7 +496,7 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, start_write_time = MPI_Wtime(); #endif ret = write_init (fh, fh->f_aggr_list[aggr_index], aggr_data[aggr_index], - write_chunksize, write_synch_type, &req_iwrite, use_accelerator_buffer); + write_synch_type, &req_iwrite, use_accelerator_buffer); if (OMPI_SUCCESS != ret){ goto exit; } @@ -672,7 +536,7 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, start_write_time = MPI_Wtime(); #endif ret = write_init (fh, fh->f_aggr_list[aggr_index], aggr_data[aggr_index], - write_chunksize, write_synch_type, &req_iwrite, use_accelerator_buffer); + write_synch_type, &req_iwrite, use_accelerator_buffer); if (OMPI_SUCCESS != ret){ goto exit; } @@ -699,7 +563,7 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, nentry.aggregator = 0; for ( i=0; if_num_aggrs; i++ ) { if (fh->f_aggr_list[i] == fh->f_rank) - nentry.aggregator = 1; + nentry.aggregator = 1; } nentry.nprocs_for_coll = fh->f_num_aggrs; if (!mca_common_ompio_full_print_queue(fh->f_coll_write_time)){ @@ -707,15 +571,13 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh, nentry); } #endif - - + exit : - + if ( NULL != aggr_data ) { - - for ( i=0; i< fh->f_num_aggrs; i++ ) { + for ( i=0; i< fh->f_num_aggrs; i++ ) { if (fh->f_aggr_list[i] == fh->f_rank) { - if (NULL != aggr_data[i]->recvtype){ + if (NULL != aggr_data[i]->recvtype) { for (j =0; j< aggr_data[i]->procs_per_group; j++) { if ( MPI_DATATYPE_NULL != aggr_data[i]->recvtype[j] ) { ompi_datatype_destroy(&aggr_data[i]->recvtype[j]); @@ -723,26 +585,25 @@ exit : if ( MPI_DATATYPE_NULL != aggr_data[i]->prev_recvtype[j] ) { ompi_datatype_destroy(&aggr_data[i]->prev_recvtype[j]); } - } free(aggr_data[i]->recvtype); free(aggr_data[i]->prev_recvtype); } - + free (aggr_data[i]->disp_index); free (aggr_data[i]->max_disp_index); - if (use_accelerator_buffer) { - opal_accelerator.mem_release(MCA_ACCELERATOR_NO_DEVICE_ID, aggr_data[i]->global_buf); - opal_accelerator.mem_release(MCA_ACCELERATOR_NO_DEVICE_ID, 
aggr_data[i]->prev_global_buf); - } else { - free (aggr_data[i]->global_buf); - free (aggr_data[i]->prev_global_buf); - } + if (use_accelerator_buffer) { + opal_accelerator.mem_release(MCA_ACCELERATOR_NO_DEVICE_ID, aggr_data[i]->global_buf); + opal_accelerator.mem_release(MCA_ACCELERATOR_NO_DEVICE_ID, aggr_data[i]->prev_global_buf); + } else { + free (aggr_data[i]->global_buf); + free (aggr_data[i]->prev_global_buf); + } for(l=0;lprocs_per_group;l++){ free (aggr_data[i]->blocklen_per_process[l]); free (aggr_data[i]->displs_per_process[l]); } - + free (aggr_data[i]->blocklen_per_process); free (aggr_data[i]->displs_per_process); } @@ -750,7 +611,6 @@ exit : free (aggr_data[i]->global_iov_array); free (aggr_data[i]->fview_count); free (aggr_data[i]->decoded_iov); - free (aggr_data[i]); } free (aggr_data); @@ -774,37 +634,40 @@ exit : fh->f_aggr_list=NULL; free(result_counts); free(reqs); - + return OMPI_SUCCESS; } static int write_init (ompio_file_t *fh, int aggregator, mca_io_ompio_aggregator_data *aggr_data, - int write_chunksize, - int write_synchType, + int write_syncType, ompi_request_t **request, bool is_accelerator_buffer) { int ret = OMPI_SUCCESS; ssize_t ret_temp = 0; - int last_array_pos = 0; - int last_pos = 0; + int i; mca_ompio_request_t *ompio_req = NULL; mca_common_ompio_request_alloc ( &ompio_req, MCA_OMPIO_REQUEST_WRITE ); if (aggr_data->prev_num_io_entries) { - /* In this case, aggr_data->prev_num_io_entries is always == 1. - Therefore we can write the data of size aggr_data->prev_bytes_to_write in one iteration. - In fact, aggr_data->prev_bytes_to_write <= write_chunksize. - */ - mca_fcoll_vulcan_split_iov_array (fh, aggr_data->prev_io_array, - aggr_data->prev_num_io_entries, - &last_array_pos, &last_pos, - write_chunksize); + fh->f_num_of_io_entries = aggr_data->prev_num_io_entries; + fh->f_io_array = (mca_common_ompio_io_array_t *) malloc (fh->f_num_of_io_entries * + sizeof(mca_common_ompio_io_array_t)); + if ( NULL == fh->f_io_array ){ + opal_output (1,"Could not allocate memory\n"); + return -1; + } + + for (i = 0; i < fh->f_num_of_io_entries; i++) { + fh->f_io_array[i].memory_address = aggr_data->prev_io_array[i].memory_address; + fh->f_io_array[i].offset = aggr_data->prev_io_array[i].offset; + fh->f_io_array[i].length = aggr_data->prev_io_array[i].length; + } - if (1 == write_synchType) { + if (1 == write_syncType) { if (is_accelerator_buffer) { ret = mca_common_ompio_file_iwrite_pregen(fh, (ompi_request_t *) ompio_req); if(0 > ret) { @@ -853,21 +716,16 @@ static int write_init (ompio_file_t *fh, return ret; } -static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_io_ompio_aggregator_data *data, - ompi_request_t **reqs ) +static int shuffle_init (int index, int num_cycles, int aggregator, int rank, + mca_io_ompio_aggregator_data *data, ompi_request_t **reqs) { - int bytes_sent = 0; - int blocks=0, temp_pindex; - int i, j, l, ret; - int entries_per_aggregator=0; + size_t bytes_sent = 0; + int i, j, l; + int ret = OMPI_SUCCESS; + int entries_per_aggregator = 0; mca_io_ompio_local_io_array *file_offsets_for_agg=NULL; int *sorted_file_offsets=NULL; - int temp_index=0; MPI_Aint *memory_displacements=NULL; - int *temp_disp_index=NULL; -#if DEBUG_ON - MPI_Aint global_count = 0; -#endif int* blocklength_proc=NULL; ptrdiff_t* displs_proc=NULL; @@ -879,21 +737,19 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i *** 7a. 
Getting ready for next cycle: initializing and freeing buffers **********************************************************************/ if (aggregator == rank) { - if (NULL != data->recvtype){ for (i =0; i< data->procs_per_group; i++) { - if ( MPI_DATATYPE_NULL != data->recvtype[i] ) { + if (MPI_DATATYPE_NULL != data->recvtype[i]) { ompi_datatype_destroy(&data->recvtype[i]); data->recvtype[i] = MPI_DATATYPE_NULL; } } } - - for(l=0;lprocs_per_group;l++){ + for(l = 0; l < data->procs_per_group; l++){ data->disp_index[l] = 0; - - if ( data->max_disp_index[l] == 0 ) { + + if (data->max_disp_index[l] == 0) { data->blocklen_per_process[l] = (int *) calloc (INIT_LEN, sizeof(int)); data->displs_per_process[l] = (MPI_Aint *) calloc (INIT_LEN, sizeof(MPI_Aint)); if (NULL == data->displs_per_process[l] || NULL == data->blocklen_per_process[l]){ @@ -902,25 +758,22 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i goto exit; } data->max_disp_index[l] = INIT_LEN; - } - else { - memset ( data->blocklen_per_process[l], 0, data->max_disp_index[l]*sizeof(int) ); - memset ( data->displs_per_process[l], 0, data->max_disp_index[l]*sizeof(MPI_Aint) ); + } else { + memset (data->blocklen_per_process[l], 0, data->max_disp_index[l]*sizeof(int)); + memset (data->displs_per_process[l], 0, data->max_disp_index[l]*sizeof(MPI_Aint)); } } } /* (aggregator == rank */ - + /************************************************************************** *** 7b. Determine the number of bytes to be actually written in this cycle **************************************************************************/ int local_cycles= ceil((double)data->total_bytes / data->bytes_per_cycle); - if ( index < (local_cycles -1) ) { + if (index < (local_cycles -1)) { data->bytes_to_write_in_cycle = data->bytes_per_cycle; - } - else if ( index == (local_cycles -1)) { + } else if (index == (local_cycles -1)) { data->bytes_to_write_in_cycle = data->total_bytes - data->bytes_per_cycle*index ; - } - else { + } else { data->bytes_to_write_in_cycle = 0; } data->bytes_to_write = data->bytes_to_write_in_cycle; @@ -928,309 +781,57 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i #if DEBUG_ON if (aggregator == rank) { printf ("****%d: CYCLE %d Bytes %lld**********\n", - rank, - index, - data->bytes_to_write_in_cycle); + rank, index, data->bytes_to_write_in_cycle); } #endif - /********************************************************** - **Gather the Data from all the processes at the writers ** - *********************************************************/ - -#if DEBUG_ON - printf("bytes_to_write_in_cycle: %ld, cycle : %d\n", data->bytes_to_write_in_cycle, - index); -#endif - + /***************************************************************** *** 7c. 
Calculate how much data will be contributed in this cycle *** by each process *****************************************************************/ - - /* The blocklen and displs calculation only done at aggregators!*/ - while (data->bytes_to_write_in_cycle) { + mca_fcoll_vulcan_calc_blocklen_disps (data, aggregator, rank, &bytes_sent); - /* This next block identifies which process is the holder - ** of the sorted[current_index] element; - */ - blocks = data->fview_count[0]; - for (j=0 ; jprocs_per_group ; j++) { - if (data->sorted[data->current_index] < blocks) { - data->n = j; - break; - } - else { - blocks += data->fview_count[j+1]; - } - } - - if (data->bytes_remaining) { - /* Finish up a partially used buffer from the previous cycle */ - - if (data->bytes_remaining <= data->bytes_to_write_in_cycle) { - /* The data fits completely into the block */ - if (aggregator == rank) { - data->blocklen_per_process[data->n][data->disp_index[data->n]] = data->bytes_remaining; - data->displs_per_process[data->n][data->disp_index[data->n]] = - (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base + - (data->global_iov_array[data->sorted[data->current_index]].iov_len - - data->bytes_remaining); - - data->disp_index[data->n] += 1; - - /* In this cases the length is consumed so allocating for - next displacement and blocklength*/ - if ( data->disp_index[data->n] == data->max_disp_index[data->n] ) { - data->max_disp_index[data->n] *= 2; - - data->blocklen_per_process[data->n] = (int *) realloc - ((void *)data->blocklen_per_process[data->n], - (data->max_disp_index[data->n])*sizeof(int)); - data->displs_per_process[data->n] = (MPI_Aint *) realloc - ((void *)data->displs_per_process[data->n], - (data->max_disp_index[data->n])*sizeof(MPI_Aint)); - } - data->blocklen_per_process[data->n][data->disp_index[data->n]] = 0; - data->displs_per_process[data->n][data->disp_index[data->n]] = 0; - - } - if (data->procs_in_group[data->n] == rank) { - bytes_sent += data->bytes_remaining; - } - data->current_index ++; - data->bytes_to_write_in_cycle -= data->bytes_remaining; - data->bytes_remaining = 0; - } - else { - /* the remaining data from the previous cycle is larger than the - data->bytes_to_write_in_cycle, so we have to segment again */ - if (aggregator == rank) { - data->blocklen_per_process[data->n][data->disp_index[data->n]] = data->bytes_to_write_in_cycle; - data->displs_per_process[data->n][data->disp_index[data->n]] = - (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base + - (data->global_iov_array[data->sorted[data->current_index]].iov_len - - data->bytes_remaining); - data->disp_index[data->n] += 1; - } - - if (data->procs_in_group[data->n] == rank) { - bytes_sent += data->bytes_to_write_in_cycle; - } - data->bytes_remaining -= data->bytes_to_write_in_cycle; - data->bytes_to_write_in_cycle = 0; - break; - } - } - else { - /* No partially used entry available, have to start a new one */ - if (data->bytes_to_write_in_cycle < - (MPI_Aint) data->global_iov_array[data->sorted[data->current_index]].iov_len) { - /* This entry has more data than we can sendin one cycle */ - if (aggregator == rank) { - data->blocklen_per_process[data->n][data->disp_index[data->n]] = data->bytes_to_write_in_cycle; - data->displs_per_process[data->n][data->disp_index[data->n]] = - (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base ; - data->disp_index[data->n] += 1; - } - if (data->procs_in_group[data->n] == rank) { - bytes_sent += 
data->bytes_to_write_in_cycle; - - } - data->bytes_remaining = data->global_iov_array[data->sorted[data->current_index]].iov_len - - data->bytes_to_write_in_cycle; - data->bytes_to_write_in_cycle = 0; - break; - } - else { - /* Next data entry is less than data->bytes_to_write_in_cycle */ - if (aggregator == rank) { - data->blocklen_per_process[data->n][data->disp_index[data->n]] = - data->global_iov_array[data->sorted[data->current_index]].iov_len; - data->displs_per_process[data->n][data->disp_index[data->n]] = (ptrdiff_t) - data->global_iov_array[data->sorted[data->current_index]].iov_base; - - data->disp_index[data->n] += 1; - - /*realloc for next blocklength - and assign this displacement and check for next displs as - the total length of this entry has been consumed!*/ - if ( data->disp_index[data->n] == data->max_disp_index[data->n] ) { - data->max_disp_index[data->n] *=2 ; - data->blocklen_per_process[data->n] = (int *) realloc ( - (void *)data->blocklen_per_process[data->n], - (data->max_disp_index[data->n]*sizeof(int))); - data->displs_per_process[data->n] = (MPI_Aint *)realloc ( - (void *)data->displs_per_process[data->n], - (data->max_disp_index[data->n]*sizeof(MPI_Aint))); - } - data->blocklen_per_process[data->n][data->disp_index[data->n]] = 0; - data->displs_per_process[data->n][data->disp_index[data->n]] = 0; - } - if (data->procs_in_group[data->n] == rank) { - bytes_sent += data->global_iov_array[data->sorted[data->current_index]].iov_len; - } - data->bytes_to_write_in_cycle -= - data->global_iov_array[data->sorted[data->current_index]].iov_len; - data->current_index ++; - } - } - } - - /************************************************************************* - *** 7d. Calculate the displacement on where to put the data and allocate - *** the receive buffer (global_buf) + *** 7d. 
than disp_index[%d]: %d\n",
-                               temp_pindex, temp_disp_index[temp_pindex],
-                               temp_pindex, data->disp_index[temp_pindex]);
-                    }
-#if DEBUG_ON
-                    global_count +=
-                        file_offsets_for_agg[sorted_file_offsets[i]].length;
-#endif
-                }
-
-                if (NULL != temp_disp_index){
-                    free(temp_disp_index);
-                    temp_disp_index = NULL;
-                }
-
-#if DEBUG_ON
-
-                printf("************Cycle: %d, Aggregator: %d ***************\n",
-                       index+1,rank);
-                for (i=0;i<data->procs_per_group; i++){
-                    for(j=0;j<data->disp_index[i];j++){
-                        if (data->blocklen_per_process[i][j] > 0){
-                            printf("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n",
-                                   data->procs_in_group[i],j,
-                                   data->blocklen_per_process[i][j],j,
-                                   data->displs_per_process[i][j],
-                                   rank);
-
-                        }
-                    }
-                }
-                printf("************Cycle: %d, Aggregator: %d ***************\n",
-                       index+1,rank);
-                for (i=0; i<entries_per_aggregator;i++){
-                    printf("%d: OFFSET: %lld   LENGTH: %ld, Mem-offset: %ld\n",
-                           file_offsets_for_agg[sorted_file_offsets[i]].process_id,
-                           file_offsets_for_agg[sorted_file_offsets[i]].offset,
-                           file_offsets_for_agg[sorted_file_offsets[i]].length,
-                           memory_displacements[sorted_file_offsets[i]]);
-                }
-#endif
             }
         }
 
-        for (i=0;i<data->procs_per_group; i++) {
+        for (i = 0; i < data->procs_per_group; i++) {
             size_t datatype_size;
             reqs[i] = MPI_REQUEST_NULL;
-            if ( 0 < data->disp_index[i] ) {
+            if (0 < data->disp_index[i]) {
                 ompi_datatype_create_hindexed(data->disp_index[i],
                                               data->blocklen_per_process[i],
                                               data->displs_per_process[i],
@@ -1239,7 +840,7 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
                 ompi_datatype_commit(&data->recvtype[i]);
                 opal_datatype_type_size(&data->recvtype[i]->super, &datatype_size);
 
-                if (datatype_size){
+                if (datatype_size) {
                     ret = MCA_PML_CALL(irecv(data->global_buf,
                                              1,
                                              data->recvtype[i],
@@ -1278,8 +879,7 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
             if(0 == block_index) {
                 send_mem_address = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) +
                                    data->current_position;
-            }
-            else {
+            } else {
                 // Reallocate more memory if blocklength_size is not enough
                 if(0 == block_index % INIT_LEN) {
                     blocklength_size += INIT_LEN;
@@ -1290,17 +890,14 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
                     data->current_position - send_mem_address;
             }
 
-            if (remaining >=
-                (data->decoded_iov[data->iov_index].iov_len - data->current_position)) {
-
+            if (remaining >= (data->decoded_iov[data->iov_index].iov_len - data->current_position)) {
                 blocklength_proc[block_index] = data->decoded_iov[data->iov_index].iov_len -
                                                 data->current_position;
                 remaining = remaining -
                     (data->decoded_iov[data->iov_index].iov_len - data->current_position);
                 data->iov_index = data->iov_index + 1;
                 data->current_position = 0;
-            }
-            else {
+            } else {
                 blocklength_proc[block_index] = remaining;
                 data->current_position += remaining;
                 remaining = 0;
@@ -1335,77 +932,23 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
             }
         }
 
-#if DEBUG_ON
-        if (aggregator == rank){
-            printf("************Cycle: %d, Aggregator: %d ***************\n",
-                   index+1,rank);
-            for (i=0 ; i<global_count/4 ; i++)
-                printf (" RECV %d \n",((int *)data->global_buf)[i]);
-        }
-#endif
-
-//#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
-//        end_comm_time = MPI_Wtime();
-//        comm_time += (end_comm_time - start_comm_time);
-//#endif
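Reviewer note: in the hunks above, each aggregator describes one group member's scattered target regions in global_buf with a single hindexed datatype, so a single irecv per member delivers all of that member's blocks directly into place. A sketch of the same idea using the public MPI equivalents of the internal ompi_datatype_*/PML calls; post_scattered_recv and the tag value are illustrative, not the component's API:

#include <mpi.h>

/* Receive 'n' scattered blocks from rank 'src' directly into their
 * final positions in 'global_buf', described by one hindexed datatype. */
static void post_scattered_recv(char *global_buf, int n,
                                const int *blocklens, const MPI_Aint *displs,
                                int src, MPI_Comm comm, MPI_Request *req)
{
    MPI_Datatype recvtype;
    MPI_Type_create_hindexed(n, blocklens, displs, MPI_BYTE, &recvtype);
    MPI_Type_commit(&recvtype);
    MPI_Irecv(global_buf, 1, recvtype, src, 123 /* arbitrary tag */, comm, req);
    /* Legal per the MPI standard: freeing is deferred while the
     * pending request still references the datatype. */
    MPI_Type_free(&recvtype);
}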
than disp_index[%d]: %d\n", - temp_pindex, temp_disp_index[temp_pindex], - temp_pindex, data->disp_index[temp_pindex]); - } -#if DEBUG_ON - global_count += - file_offsets_for_agg[sorted_file_offsets[i]].length; -#endif - } - - if (NULL != temp_disp_index){ - free(temp_disp_index); - temp_disp_index = NULL; - } - -#if DEBUG_ON - - printf("************Cycle: %d, Aggregator: %d ***************\n", - index+1,rank); - for (i=0;iprocs_per_group; i++){ - for(j=0;jdisp_index[i];j++){ - if (data->blocklen_per_process[i][j] > 0){ - printf("%d sends blocklen[%d]: %d, disp[%d]: %ld to %d\n", - data->procs_in_group[i],j, - data->blocklen_per_process[i][j],j, - data->displs_per_process[i][j], - rank); - - } - } - } - printf("************Cycle: %d, Aggregator: %d ***************\n", - index+1,rank); - for (i=0; iprocs_per_group; i++) { + for (i = 0; i < data->procs_per_group; i++) { size_t datatype_size; reqs[i] = MPI_REQUEST_NULL; - if ( 0 < data->disp_index[i] ) { + if (0 < data->disp_index[i]) { ompi_datatype_create_hindexed(data->disp_index[i], data->blocklen_per_process[i], data->displs_per_process[i], @@ -1239,7 +840,7 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i ompi_datatype_commit(&data->recvtype[i]); opal_datatype_type_size(&data->recvtype[i]->super, &datatype_size); - if (datatype_size){ + if (datatype_size) { ret = MCA_PML_CALL(irecv(data->global_buf, 1, data->recvtype[i], @@ -1278,8 +879,7 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i if(0 == block_index) { send_mem_address = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) + data->current_position; - } - else { + } else { // Reallocate more memory if blocklength_size is not enough if(0 == block_index % INIT_LEN) { blocklength_size += INIT_LEN; @@ -1290,17 +890,14 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i data->current_position - send_mem_address; } - if (remaining >= - (data->decoded_iov[data->iov_index].iov_len - data->current_position)) { - + if (remaining >= (data->decoded_iov[data->iov_index].iov_len - data->current_position)) { blocklength_proc[block_index] = data->decoded_iov[data->iov_index].iov_len - data->current_position; remaining = remaining - (data->decoded_iov[data->iov_index].iov_len - data->current_position); data->iov_index = data->iov_index + 1; data->current_position = 0; - } - else { + } else { blocklength_proc[block_index] = remaining; data->current_position += remaining; remaining = 0; @@ -1335,77 +932,23 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i } } -#if DEBUG_ON - if (aggregator == rank){ - printf("************Cycle: %d, Aggregator: %d ***************\n", - index+1,rank); - for (i=0 ; iglobal_buf)[i]); - } -#endif - -//#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN -// end_comm_time = MPI_Wtime(); -// comm_time += (end_comm_time - start_comm_time); -//#endif /********************************************************** *** 7f. 
 
-static int mca_fcoll_vulcan_minmax ( ompio_file_t *fh, struct iovec *iov, int iov_count, int num_aggregators, long *new_stripe_size)
+int mca_fcoll_vulcan_minmax (ompio_file_t *fh, struct iovec *iov, int iov_count, int num_aggregators, long *new_stripe_size)
 {
     long min, max, globalmin, globalmax;
     long stripe_size;
@@ -1430,12 +973,10 @@ static int mca_fcoll_vulcan_minmax ( ompio_file_t *fh, struct iovec *iov, int io
         max = 0;
     }
     fh->f_comm->c_coll->coll_allreduce ( &min, &globalmin, 1, MPI_LONG, MPI_MIN,
-                                         fh->f_comm, fh->f_comm->c_coll->coll_allreduce_module);
+                                          fh->f_comm, fh->f_comm->c_coll->coll_allreduce_module);
     fh->f_comm->c_coll->coll_allreduce ( &max, &globalmax, 1, MPI_LONG, MPI_MAX,
-                                         fh->f_comm, fh->f_comm->c_coll->coll_allreduce_module);
-
-    // if ( fh->f_rank < 10 ) printf("[%d]: min=%ld max=%ld globalmin=%ld, globalmax=%ld num_aggregators=%d\n", fh->f_rank, min, max, globalmin, globalmax, num_aggregators);
+                                          fh->f_comm, fh->f_comm->c_coll->coll_allreduce_module);
 
     stripe_size = (globalmax - globalmin)/num_aggregators;
     if ( (globalmax - globalmin) % num_aggregators ) {
@@ -1443,13 +984,9 @@ static int mca_fcoll_vulcan_minmax ( ompio_file_t *fh, struct iovec *iov, int io
     }
 
     *new_stripe_size = stripe_size;
-    // if ( fh->f_rank == 0 )
-    //    printf(" partition size is %ld\n", stripe_size);
 
     return OMPI_SUCCESS;
 }
-
-
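Reviewer note: mca_fcoll_vulcan_minmax (above) derives the per-aggregator partition from the global extent of all processes' file views: two allreduces establish the global minimum and maximum offsets, and the range is divided evenly with a round-up. A standalone sketch of the same computation using the public MPI_Allreduce instead of the coll framework entry points; even_stripe_size is an illustrative name:

#include <mpi.h>

/* Partition the aggregate file range evenly across 'num_aggregators':
 * global min/max offsets via allreduce, then a rounded-up stripe size. */
static long even_stripe_size(long local_min, long local_max,
                             int num_aggregators, MPI_Comm comm)
{
    long gmin, gmax, stripe;
    MPI_Allreduce(&local_min, &gmin, 1, MPI_LONG, MPI_MIN, comm);
    MPI_Allreduce(&local_max, &gmax, 1, MPI_LONG, MPI_MAX, comm);
    stripe = (gmax - gmin) / num_aggregators;
    if ((gmax - gmin) % num_aggregators) {
        stripe++;   /* round up so num_aggregators stripes cover the range */
    }
    return stripe;
}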
 
 int mca_fcoll_vulcan_break_file_view ( struct iovec *mem_iov, int mem_count,
                                        struct iovec *file_iov, int file_count,
@@ -1678,21 +1215,11 @@ int mca_fcoll_vulcan_break_file_view ( struct iovec *mem_iov, int mem_count,
     return ret;
 }
 
-
-int mca_fcoll_vulcan_get_configuration (ompio_file_t *fh, int num_io_procs, int num_groups,
-                                        size_t max_data)
+int mca_fcoll_vulcan_get_configuration (ompio_file_t *fh, int num_io_procs, size_t max_data)
 {
     int i, ret;
 
-    ret = mca_common_ompio_set_aggregator_props (fh, num_io_procs, max_data);
-    /* Note: as of this version of the vulcan component, we are not using yet
-       the num_groups parameter to split the aggregators (and processes) into
-       distinct subgroups. This will however hopefullty be done in a second step
-       as well, allowing to keep communication just to individual subgroups of processes,
-       each subgroup using however the classic two-phase collective I/O algorithm
-       with multiple aggregators and even partitioning internally.
-
-       For now, logically all processes are in a single group. */
+    ret = mca_common_ompio_set_aggregator_props (fh, num_io_procs, max_data);
 
     fh->f_procs_per_group = fh->f_size;
     if ( NULL != fh->f_procs_in_group ) {
@@ -1708,63 +1235,269 @@ int mca_fcoll_vulcan_get_configuration (ompio_file_t *fh, int num_io_procs, int
 
     return ret;
 }
 
-int mca_fcoll_vulcan_split_iov_array ( ompio_file_t *fh, mca_common_ompio_io_array_t *io_array, int num_entries,
-                                       int *ret_array_pos, int *ret_pos, int chunk_size )
+void mca_fcoll_vulcan_calc_blocklen_disps (mca_io_ompio_aggregator_data *data, int aggregator,
+                                           int rank, size_t *bytes_comm)
 {
+    size_t bytes_tmp = *bytes_comm;
+    int blocks = 0;
+    int j;
 
-    int array_pos = *ret_array_pos;
-    int pos = *ret_pos;
-    size_t bytes_written = 0;
-    size_t bytes_to_write = chunk_size;
+    /* The blocklen and displs calculation is only done at aggregators */
+    while (data->bytes_to_write_in_cycle) {
 
-    if ( 0 == array_pos && 0 == pos ) {
-        fh->f_io_array = (mca_common_ompio_io_array_t *) malloc ( num_entries * sizeof(mca_common_ompio_io_array_t));
-        if ( NULL == fh->f_io_array ){
-            opal_output (1,"Could not allocate memory\n");
-            return -1;
+        /* This next block identifies which process is the holder
+        ** of the sorted[current_index] element;
+        */
+        blocks = data->fview_count[0];
+        for (j = 0 ; j < data->procs_per_group ; j++) {
+            if (data->sorted[data->current_index] < blocks) {
+                data->n = j;
+                break;
+            } else {
+                blocks += data->fview_count[j+1];
+            }
+        }
+
+        if (data->bytes_remaining) {
+            /* Finish up a partially used buffer from the previous cycle */
+
+            if (data->bytes_remaining <= data->bytes_to_write_in_cycle) {
+                /* The data fits completely into the block */
+                if (aggregator == rank) {
+                    data->blocklen_per_process[data->n][data->disp_index[data->n]] = data->bytes_remaining;
+                    data->displs_per_process[data->n][data->disp_index[data->n]] =
+                        (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base +
+                        (data->global_iov_array[data->sorted[data->current_index]].iov_len
+                         - data->bytes_remaining);
+
+                    data->disp_index[data->n] += 1;
+
+                    /* In this case the length is consumed, so allocate the
+                       next displacement and blocklength */
+                    if (data->disp_index[data->n] == data->max_disp_index[data->n]) {
+                        data->max_disp_index[data->n] *= 2;
+
+                        data->blocklen_per_process[data->n] = (int *) realloc
+                            ((void *)data->blocklen_per_process[data->n],
+                             (data->max_disp_index[data->n])*sizeof(int));
+                        data->displs_per_process[data->n] = (MPI_Aint *) realloc
+                            ((void *)data->displs_per_process[data->n],
+                             (data->max_disp_index[data->n])*sizeof(MPI_Aint));
+                    }
+                    data->blocklen_per_process[data->n][data->disp_index[data->n]] = 0;
+                    data->displs_per_process[data->n][data->disp_index[data->n]] = 0;
+                }
+                if (data->procs_in_group[data->n] == rank) {
+                    bytes_tmp += data->bytes_remaining;
+                }
+                data->current_index ++;
+                data->bytes_to_write_in_cycle -= data->bytes_remaining;
+                data->bytes_remaining = 0;
+            } else {
+                /* the remaining data from the previous cycle is larger than
+                   data->bytes_to_write_in_cycle, so we have to segment again */
+                if (aggregator == rank) {
+                    data->blocklen_per_process[data->n][data->disp_index[data->n]] = data->bytes_to_write_in_cycle;
+                    data->displs_per_process[data->n][data->disp_index[data->n]] =
+                        (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base +
+                        (data->global_iov_array[data->sorted[data->current_index]].iov_len
+                         - data->bytes_remaining);
+                    data->disp_index[data->n] += 1;
+                }
+
+                if (data->procs_in_group[data->n] == rank) {
+                    bytes_tmp += data->bytes_to_write_in_cycle;
+                }
+                data->bytes_remaining -= data->bytes_to_write_in_cycle;
+                data->bytes_to_write_in_cycle = 0;
+                break;
+            }
+        } else {
+            /* No partially used entry available, have to start a new one */
+            if (data->bytes_to_write_in_cycle <
+                (MPI_Aint) data->global_iov_array[data->sorted[data->current_index]].iov_len) {
+                /* This entry has more data than we can send in one cycle */
+                if (aggregator == rank) {
+                    data->blocklen_per_process[data->n][data->disp_index[data->n]] = data->bytes_to_write_in_cycle;
+                    data->displs_per_process[data->n][data->disp_index[data->n]] =
+                        (ptrdiff_t)data->global_iov_array[data->sorted[data->current_index]].iov_base ;
+                    data->disp_index[data->n] += 1;
+                }
+                if (data->procs_in_group[data->n] == rank) {
+                    bytes_tmp += data->bytes_to_write_in_cycle;
+                }
+                data->bytes_remaining = data->global_iov_array[data->sorted[data->current_index]].iov_len -
+                                        data->bytes_to_write_in_cycle;
+                data->bytes_to_write_in_cycle = 0;
+                break;
+            } else {
+                /* Next data entry is less than data->bytes_to_write_in_cycle */
+                if (aggregator == rank) {
+                    data->blocklen_per_process[data->n][data->disp_index[data->n]] =
+                        data->global_iov_array[data->sorted[data->current_index]].iov_len;
+                    data->displs_per_process[data->n][data->disp_index[data->n]] = (ptrdiff_t)
+                        data->global_iov_array[data->sorted[data->current_index]].iov_base;
+
+                    data->disp_index[data->n] += 1;
+
+                    /* realloc for next blocklength and assign this displacement
+                    ** and check for next displs as the total length of this entry
+                    ** has been consumed */
+                    if (data->disp_index[data->n] == data->max_disp_index[data->n]) {
+                        data->max_disp_index[data->n] *= 2;
+                        data->blocklen_per_process[data->n] = (int *) realloc (
+                            (void *)data->blocklen_per_process[data->n],
+                            (data->max_disp_index[data->n]*sizeof(int)));
+                        data->displs_per_process[data->n] = (MPI_Aint *)realloc (
+                            (void *)data->displs_per_process[data->n],
+                            (data->max_disp_index[data->n]*sizeof(MPI_Aint)));
+                    }
+                    data->blocklen_per_process[data->n][data->disp_index[data->n]] = 0;
+                    data->displs_per_process[data->n][data->disp_index[data->n]] = 0;
+                }
+                if (data->procs_in_group[data->n] == rank) {
+                    bytes_tmp += data->global_iov_array[data->sorted[data->current_index]].iov_len;
+                }
+                data->bytes_to_write_in_cycle -=
+                    data->global_iov_array[data->sorted[data->current_index]].iov_len;
+                data->current_index ++;
+            }
+        }
     }
-
-    int i=0;
-    while (bytes_to_write > 0 ) {
-        fh->f_io_array[i].memory_address = &(((char *)io_array[array_pos].memory_address)[pos]);
-        fh->f_io_array[i].offset = &(((char *)io_array[array_pos].offset)[pos]);
-        if ( (io_array[array_pos].length - pos ) >= bytes_to_write ) {
-            fh->f_io_array[i].length = bytes_to_write;
+    *bytes_comm = bytes_tmp;
+}
 
+int mca_fcoll_vulcan_calc_file_offsets(mca_io_ompio_aggregator_data *data, mca_io_ompio_local_io_array *file_offsets_for_agg,
+                                       int *sorted_file_offsets, MPI_Aint *memory_displacements, int entries_per_aggregator,
+                                       int rank, int index)
+{
+    int *temp_disp_index;
+    int temp_index = 0;
+    int temp_pindex;
+    int i, j;
+
+    /* Moving file offsets to an IO array */
+    for (i = 0; i < data->procs_per_group; i++){
+        for(j = 0; j < data->disp_index[i];j++){
+            if (data->blocklen_per_process[i][j] > 0){
+                file_offsets_for_agg[temp_index].length =
+                    data->blocklen_per_process[i][j];
+                file_offsets_for_agg[temp_index].process_id = i;
+                file_offsets_for_agg[temp_index].offset =
+                    data->displs_per_process[i][j];
+                temp_index++;
+            }
+        }
+    }
+
+    /* Sort the displacements for each aggregator */
+    local_heap_sort (file_offsets_for_agg, entries_per_aggregator,
+                     sorted_file_offsets);
+
+    /* create contiguous memory displacements based on blocklens
+    ** on the same displs array and map it to this aggregator's actual
+    ** file-displacements */
+    memory_displacements[sorted_file_offsets[0]] = 0;
+    for (i = 1; i < entries_per_aggregator; i++){
+        memory_displacements[sorted_file_offsets[i]] =
+            memory_displacements[sorted_file_offsets[i-1]] +
+            file_offsets_for_agg[sorted_file_offsets[i-1]].length;
+    }
+
+    temp_disp_index = (int *)calloc (1, data->procs_per_group * sizeof (int));
+    if (NULL == temp_disp_index) {
+        opal_output (1, "OUT OF MEMORY\n");
+        return OMPI_ERR_OUT_OF_RESOURCE;
+    }
+
+    /* Now update the displacements array with memory offsets */
+    for (i = 0; i < entries_per_aggregator;i++) {
+        temp_pindex = file_offsets_for_agg[sorted_file_offsets[i]].process_id;
+        data->displs_per_process[temp_pindex][temp_disp_index[temp_pindex]] =
+            memory_displacements[sorted_file_offsets[i]];
+        if (temp_disp_index[temp_pindex] < data->disp_index[temp_pindex]) {
+            temp_disp_index[temp_pindex] += 1;
+        } else {
-            fh->f_io_array[i].length = io_array[array_pos].length - pos;
+            printf("temp_disp_index[%d]: %d is greater than disp_index[%d]: %d\n",
+                   temp_pindex, temp_disp_index[temp_pindex],
+                   temp_pindex, data->disp_index[temp_pindex]);
+        }
+    }
 
-        pos += fh->f_io_array[i].length;
-        bytes_written += fh->f_io_array[i].length;
-        bytes_to_write-= fh->f_io_array[i].length;
-        i++;
+    free(temp_disp_index);
 
-        if ( pos == (int)io_array[array_pos].length ) {
-            pos = 0;
-            if ((array_pos + 1) < num_entries) {
-                array_pos++;
-            }
-            else {
-                break;
+#if DEBUG_ON
+    printf("************Cycle: %d, Aggregator: %d ***************\n",
+           index+1, rank);
+    for (i = 0; i < data->procs_per_group; i++){
+        for(j = 0; j < data->disp_index[i]; j++){
+            if (data->blocklen_per_process[i][j] > 0){
+                printf("%d communicate blocklen[%d]: %d, disp[%d]: %ld to %d\n",
+                       data->procs_in_group[i],j,
+                       data->blocklen_per_process[i][j],j,
+                       data->displs_per_process[i][j],
+                       rank);
+            }
+        }
+    }
-        }
-
-        fh->f_num_of_io_entries = i;
-        *ret_array_pos = array_pos;
-        *ret_pos = pos;
-        return bytes_written;
+    printf("************Cycle: %d, Aggregator: %d ***************\n",
+           index+1, rank);
+    for (i = 0; i < entries_per_aggregator;i++){
+        printf("%d: OFFSET: %lld   LENGTH: %ld, Mem-offset: %ld\n",
+               file_offsets_for_agg[sorted_file_offsets[i]].process_id,
+               file_offsets_for_agg[sorted_file_offsets[i]].offset,
+               file_offsets_for_agg[sorted_file_offsets[i]].length,
+               memory_displacements[sorted_file_offsets[i]]);
+    }
+#endif
+
+    return OMPI_SUCCESS;
+}
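Reviewer note: mca_fcoll_vulcan_calc_file_offsets (above) sorts the collected (offset, length) entries by file offset and then assigns each entry its slot in the aggregation buffer via a prefix sum over the lengths taken in sorted order. The mapping step in isolation, with illustrative names and toy types:

#include <stddef.h>

/* Given entry lengths and a sort permutation (ascending file-offset
 * order), compute each entry's displacement in the aggregation buffer:
 * a prefix sum over the lengths visited in sorted order. Requires n >= 1. */
static void map_memory_displacements(const size_t *lens, const int *sorted,
                                     size_t *mem_disp, int n)
{
    int i;
    mem_disp[sorted[0]] = 0;
    for (i = 1; i < n; i++) {
        mem_disp[sorted[i]] = mem_disp[sorted[i-1]] + lens[sorted[i-1]];
    }
}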
 
+void mca_fcoll_vulcan_calc_io_array(mca_common_ompio_io_array_t *io_array, int *num_io_entries, int max_io_entries,
+                                    char *global_buf, mca_io_ompio_local_io_array *file_offsets_for_agg,
+                                    int *sorted_offsets, MPI_Aint *memory_displacements, int rank)
+{
+    int i;
+    int num_entries;
+
+    /* First entry for every aggregator */
+    io_array[0].offset = (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_offsets[0]].offset;
+    io_array[0].length = file_offsets_for_agg[sorted_offsets[0]].length;
+    io_array[0].memory_address = global_buf + memory_displacements[sorted_offsets[0]];
+    num_entries = 1;
+
+    /* If the entries are contiguous merge them, else add a new entry */
+    for (i = 1; i < max_io_entries; i++) {
+        if (file_offsets_for_agg[sorted_offsets[i-1]].offset +
+            file_offsets_for_agg[sorted_offsets[i-1]].length ==
+            file_offsets_for_agg[sorted_offsets[i]].offset) {
+            io_array[num_entries - 1].length += file_offsets_for_agg[sorted_offsets[i]].length;
+        } else {
+            io_array[num_entries].offset = (IOVBASE_TYPE *)(intptr_t)file_offsets_for_agg[sorted_offsets[i]].offset;
+            io_array[num_entries].length = file_offsets_for_agg[sorted_offsets[i]].length;
+            io_array[num_entries].memory_address = global_buf + memory_displacements[sorted_offsets[i]];
+            num_entries++;
+        }
+    }
+
+    *num_io_entries = num_entries;
+#if DEBUG_ON
+    printf("*************************** %d\n", num_entries);
+    for (i = 0; i < num_entries; i++) {
+        printf(" AGGREGATOR %d ADDRESS: %p  OFFSET: %ld   LENGTH: %ld\n",
+               rank, io_array[i].memory_address,
+               (ptrdiff_t)io_array[i].offset,
+               io_array[i].length);
+    }
+#endif
+}
 
 static int local_heap_sort (mca_io_ompio_local_io_array *io_array,
-                            int num_entries,
-                            int *sorted)
+                            int num_entries, int *sorted)
 {
     int i = 0;
     int j = 0;
@@ -1864,5 +1597,3 @@ static int local_heap_sort (mca_io_ompio_local_io_array *io_array,
     }
     return OMPI_SUCCESS;
 }
-
-
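Reviewer note: mca_fcoll_vulcan_calc_io_array (above) relies on the entries arriving in ascending file-offset order, so a single pass can coalesce runs that abut exactly and every merged entry becomes one fbtl I/O operation. The merge rule in isolation, with toy types; struct run and merge_contiguous are illustrative, not the component's io_array:

#include <stddef.h>

struct run { size_t off, len; };

/* Coalesce adjacent (offset, length) runs sorted by offset: extend the
 * previous output entry when the next input starts exactly where it
 * ended, else open a new entry. Requires n >= 1; returns the merged count. */
static int merge_contiguous(const struct run *in, int n, struct run *out)
{
    int i, m = 1;
    out[0] = in[0];
    for (i = 1; i < n; i++) {
        if (out[m-1].off + out[m-1].len == in[i].off) {
            out[m-1].len += in[i].len;   /* contiguous: merge */
        } else {
            out[m++] = in[i];            /* gap: new entry */
        }
    }
    return m;
}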
diff --git a/ompi/mca/fcoll/vulcan/fcoll_vulcan_internal.h b/ompi/mca/fcoll/vulcan/fcoll_vulcan_internal.h
new file mode 100644
index 00000000000..76402297044
--- /dev/null
+++ b/ompi/mca/fcoll/vulcan/fcoll_vulcan_internal.h
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2024      Advanced Micro Devices, Inc. All rights reserved.
+ * $COPYRIGHT$
+ *
+ * Additional copyrights may follow
+ *
+ * $HEADER$
+ */
+
+#ifndef MCA_FCOLL_VULCAN_INTERNAL_H
+#define MCA_FCOLL_VULCAN_INTERNAL_H
+
+#include "ompi_config.h"
+
+
+BEGIN_C_DECLS
+
+/* Used for loading file-offsets per aggregator*/
+typedef struct mca_io_ompio_local_io_array{
+    OMPI_MPI_OFFSET_TYPE offset;
+    MPI_Aint             length;
+    int                  process_id;
+} mca_io_ompio_local_io_array;
+
+typedef struct mca_io_ompio_aggregator_data {
+    int *disp_index, *sorted, n;
+    size_t *fview_count;
+    int *max_disp_index;
+    int **blocklen_per_process;
+    MPI_Aint **displs_per_process, total_bytes, bytes_per_cycle, total_bytes_written;
+    MPI_Comm comm;
+    char *global_buf, *prev_global_buf;
+    ompi_datatype_t **recvtype, **prev_recvtype;
+    struct iovec *global_iov_array;
+    int current_index, current_position;
+    int bytes_to_write_in_cycle, bytes_remaining, procs_per_group;
+    int *procs_in_group, iov_index;
+    size_t bytes_sent, prev_bytes_sent;
+    struct iovec *decoded_iov;
+    int bytes_to_write, prev_bytes_to_write;
+    mca_common_ompio_io_array_t *io_array, *prev_io_array;
+    int num_io_entries, prev_num_io_entries;
+} mca_io_ompio_aggregator_data;
+
+
+#define SWAP_REQUESTS(_r1,_r2) { \
+    ompi_request_t **_t=_r1;     \
+    _r1=_r2;                     \
+    _r2=_t;}
+
+#define SWAP_AGGR_POINTERS(_aggr,_num) {                          \
+    int _i;                                                       \
+    char *_t;                                                     \
+    for (_i=0; _i<_num; _i++ ) {                                  \
+        _aggr[_i]->prev_io_array=_aggr[_i]->io_array;             \
+        _aggr[_i]->prev_num_io_entries=_aggr[_i]->num_io_entries; \
+        _aggr[_i]->prev_bytes_sent=_aggr[_i]->bytes_sent;         \
+        _aggr[_i]->prev_bytes_to_write=_aggr[_i]->bytes_to_write; \
+        _t=_aggr[_i]->prev_global_buf;                            \
+        _aggr[_i]->prev_global_buf=_aggr[_i]->global_buf;         \
+        _aggr[_i]->global_buf=_t;                                 \
+        _t=(char *)_aggr[_i]->recvtype;                           \
+        _aggr[_i]->recvtype=_aggr[_i]->prev_recvtype;             \
+        _aggr[_i]->prev_recvtype=(ompi_datatype_t **)_t; }        \
+}
+
+int mca_fcoll_vulcan_break_file_view (struct iovec *decoded_iov, int iov_count,
+                                      struct iovec *local_iov_array, int local_count,
+                                      struct iovec ***broken_decoded_iovs, int **broken_iov_counts,
+                                      struct iovec ***broken_iov_arrays, int **broken_counts,
+                                      MPI_Aint **broken_total_lengths,
+                                      int stripe_count, size_t stripe_size);
+
+int mca_fcoll_vulcan_get_configuration (ompio_file_t *fh, int num_io_procs,
+                                        size_t max_data);
+
+int mca_fcoll_vulcan_minmax (ompio_file_t *fh, struct iovec *iov, int iov_count,
+                             int num_aggregators, long *new_stripe_size);
+
+void mca_fcoll_vulcan_calc_blocklen_disps (mca_io_ompio_aggregator_data *data, int aggregator,
+                                           int rank, size_t *bytes_comm);
+
+int mca_fcoll_vulcan_calc_file_offsets(mca_io_ompio_aggregator_data *data,
+                                       mca_io_ompio_local_io_array *file_offsets_for_agg,
+                                       int *sorted_file_offsets, MPI_Aint *memory_displacements,
+                                       int entries_per_aggregator, int rank, int index);
+
+void mca_fcoll_vulcan_calc_io_array(mca_common_ompio_io_array_t *io_array, int *num_io_entries, int max_io_arrays,
+                                    char *global_buf, mca_io_ompio_local_io_array *file_offsets_for_agg,
+                                    int *sorted_offsets, MPI_Aint *memory_displacements, int rank);
+
+END_C_DECLS
+
+#endif /* MCA_FCOLL_VULCAN_INTERNAL_H */
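Reviewer note: the prev_* fields and SWAP_AGGR_POINTERS above are what enable the component's pipelining: each cycle's filled buffers and io_array are retired to the prev_* set, so the shuffle of cycle i+1 can proceed while the asynchronous fbtl operation on cycle i (issued against prev_*) is still in flight. A minimal sketch of that double-buffered cycle loop with stand-in types and functions; the real loop pairs shuffle_init with the async fbtl calls in the component's write/read paths:

/* Double-buffering skeleton: overlap the I/O of cycle i (on 'prev')
 * with the shuffle of cycle i+1 (filling 'cur'). */
struct cycle_buf { char *cur, *prev; };

static void shuffle(struct cycle_buf *b, int cycle)     { (void)b; (void)cycle; }
static void start_io(struct cycle_buf *b, int cycle)    { (void)b; (void)cycle; }
static void wait_io(void)                               { }

static void two_phase_pipeline(struct cycle_buf *b, int cycles)
{
    int i;
    for (i = 0; i < cycles; i++) {
        shuffle(b, i);          /* gather cycle i into b->cur */
        if (i > 0) {
            wait_io();          /* cycle i-1's I/O (on b->prev) completes */
        }
        /* retire the filled buffer, as SWAP_AGGR_POINTERS does */
        char *t = b->prev;
        b->prev = b->cur;
        b->cur  = t;
        start_io(b, i);         /* asynchronous I/O of cycle i from b->prev */
    }
    wait_io();                  /* drain the final outstanding operation */
}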