diff --git a/ompi/mca/part/base/part_base_frame.c b/ompi/mca/part/base/part_base_frame.c index f9da3548456..07b5ed10705 100644 --- a/ompi/mca/part/base/part_base_frame.c +++ b/ompi/mca/part/base/part_base_frame.c @@ -138,6 +138,7 @@ static int mca_part_base_open(mca_base_open_flag_t flags) mca_part_base_selected_component.partm_finalize = NULL; /* Currently this uses a default with no selection criteria as there is only 1 module. */ + opal_pointer_array_add(&mca_part_base_part, strdup("direct")); opal_pointer_array_add(&mca_part_base_part, strdup("persist")); return OMPI_SUCCESS; diff --git a/ompi/mca/part/base/part_base_select.c b/ompi/mca/part/base/part_base_select.c index 71525aaec8a..01caf206b6c 100644 --- a/ompi/mca/part/base/part_base_select.c +++ b/ompi/mca/part/base/part_base_select.c @@ -129,6 +129,7 @@ int mca_part_base_select(bool enable_progress_threads, continue; } + opal_output_verbose( 10, ompi_part_base_framework.framework_output, "select: init returned priority %d", priority ); if (priority > best_priority) { @@ -216,7 +217,7 @@ int mca_part_base_select(bool enable_progress_threads, /* This base function closes, unloads, and removes from the available list all unselected components. The available list will contain only the selected component. */ - + mca_base_components_close(ompi_part_base_framework.framework_output, &ompi_part_base_framework.framework_components, (mca_base_component_t *) best_component); @@ -227,7 +228,7 @@ int mca_part_base_select(bool enable_progress_threads, } /* All done */ - + return OMPI_SUCCESS; } diff --git a/ompi/mca/part/direct/Makefile.am b/ompi/mca/part/direct/Makefile.am new file mode 100644 index 00000000000..1db43df3bdf --- /dev/null +++ b/ompi/mca/part/direct/Makefile.am @@ -0,0 +1,53 @@ +# +# Copyright (c) 2004-2006 The Regents of the University of California. +# All rights reserved. +# Copyright (c) 2009 High Performance Computing Center Stuttgart, +# University of Stuttgart. All rights reserved. +# Copyright (c) 2010 Cisco Systems, Inc. All rights reserved. +# Copyright (c) 2017 IBM Corporation. All rights reserved. +# Copyright (c) 2020-2021 Sandia National Laboratories. All rights reserved. +# $COPYRIGHT$ +# +# Additional copyrights may follow +# +# $HEADER$ +# + +# Make the output library in this directory, and name it either +# mca__.la (for DSO builds) or libmca__.la +# (for static builds). + +EXTRA_DIST = post_configure.sh + +if MCA_BUILD_ompi_part_direct_DSO +component_noinst = +component_install = mca_part_direct.la +else +component_noinst = libmca_part_direct.la +component_install = +endif + +local_sources = \ + part_direct.c \ + part_direct.h \ + part_direct_component.c \ + part_direct_component.h \ + part_direct_recvreq.h \ + part_direct_recvreq.c \ + part_direct_request.h \ + part_direct_request.c \ + part_direct_sendreq.h \ + part_direct_sendreq.c + +mcacomponentdir = $(ompilibdir) +mcacomponent_LTLIBRARIES = $(component_install) +mca_part_direct_la_SOURCES = $(local_sources) +mca_part_direct_la_LIBADD = $(top_builddir)/ompi/lib@OMPI_LIBMPI_NAME@.la \ + $(part_direct_LIBS) +mca_part_direct_la_LDFLAGS = -module -avoid-version $(part_direct_LDFLAGS) + +noinst_LTLIBRARIES = $(component_noinst) +libmca_part_direct_la_SOURCES = $(local_sources) +libmca_part_direct_la_LIBADD = $(part_direct_LIBS) +libmca_part_direct_la_LDFLAGS = -module -avoid-version $(part_direct_LDFLAGS) + diff --git a/ompi/mca/part/direct/part_direct.c b/ompi/mca/part/direct/part_direct.c new file mode 100644 index 00000000000..726c0d3839f --- /dev/null +++ b/ompi/mca/part/direct/part_direct.c @@ -0,0 +1,47 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2006-2007 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2007 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2015 Los Alamos National Security, LLC. All rights + * reserved. + * Copyright (c) 2011-2021 Sandia National Laboratories. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/communicator/communicator.h" +#include "ompi/mca/part/base/part_base_prequest.h" +#include "ompi/mca/part/base/base.h" + +#include "ompi/mca/part/direct/part_direct.h" +#include "ompi/mca/part/direct/part_direct_sendreq.h" +#include "ompi/mca/part/direct/part_direct_recvreq.h" + +ompi_part_direct_t ompi_part_direct = { + .super = { + .part_progress = mca_part_direct_progress, + .part_precv_init = mca_part_direct_precv_init, + .part_psend_init = mca_part_direct_psend_init, + .part_start = mca_part_direct_start, + .part_pready = mca_part_direct_pready, + .part_parrived = mca_part_direct_parrived, + } +}; + + +OBJ_CLASS_INSTANCE(mca_part_direct_list_t, + opal_list_item_t, + NULL, + NULL); + diff --git a/ompi/mca/part/direct/part_direct.h b/ompi/mca/part/direct/part_direct.h new file mode 100644 index 00000000000..36150d55634 --- /dev/null +++ b/ompi/mca/part/direct/part_direct.h @@ -0,0 +1,498 @@ +/* + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2004-2007 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2015 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * Copyright (c) 2017 Intel, Inc. All rights reserved + * Copyright (c) 2019-2021 The University of Tennessee at Chattanooga and The University + * of Tennessee Research Foundation. All rights reserved. + * Copyright (c) 2019-2021 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2021 University of Alabama at Birmingham. All rights reserved. + * Copyright (c) 2021 Tennessee Technological University. All rights reserved. + * Copyright (c) 2021 Cisco Systems, Inc. All rights reserved + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef PART_DIRECT_H +#define PART_DIRECT_H + +#ifdef HAVE_ALLOCA_H +#include +#endif + +#include + +#include "ompi_config.h" +#include "ompi/request/request.h" +#include "ompi/mca/part/part.h" +#include "ompi/mca/part/base/base.h" +#include "ompi/datatype/ompi_datatype.h" +#include "ompi/communicator/communicator.h" +#include "ompi/request/request.h" +#include "opal/sys/atomic.h" + +#include "ompi/mca/part/direct/part_direct_request.h" +#include "ompi/mca/part/base/part_base_precvreq.h" +#include "ompi/mca/part/direct/part_direct_recvreq.h" +#include "ompi/mca/part/direct/part_direct_sendreq.h" +#include "ompi/message/message.h" +#include "ompi/mca/pml/pml.h" +BEGIN_C_DECLS + +typedef struct mca_part_direct_list_t { + opal_list_item_t super; + mca_part_direct_request_t *item; +} mca_part_direct_list_t; + +OPAL_DECLSPEC OBJ_CLASS_DECLARATION(mca_part_direct_list_t); + + +struct ompi_part_direct_t { + mca_part_base_module_t super; + int free_list_num; + int free_list_max; + int free_list_inc; + opal_list_t *progress_list; + + opal_atomic_int32_t block_entry; + opal_mutex_t lock; +}; +typedef struct ompi_part_direct_t ompi_part_direct_t; +extern ompi_part_direct_t ompi_part_direct; + + +/** + * This is a helper function that frees a request. This requires ompi_part_direct.lock be held before calling. + */ +__opal_attribute_always_inline__ static inline int +mca_part_direct_free_req(struct mca_part_direct_request_t* req) +{ + int err = OMPI_SUCCESS; + size_t i; + opal_list_remove_item(ompi_part_direct.progress_list, (opal_list_item_t*)req->progress_elem); + OBJ_RELEASE(req->progress_elem); + + MPI_Win_free(&req->window); + MPI_Win_free(&req->window_flags); + MPI_Comm_free(&req->comm); + free(req->flags); + + if( MCA_PART_DIRECT_REQUEST_PRECV == req->req_type ) { + MCA_PART_DIRECT_PRECV_REQUEST_RETURN(req); + } else { + MCA_PART_DIRECT_PSEND_REQUEST_RETURN(req); + } + return err; +} + + +__opal_attribute_always_inline__ static inline void mca_part_direct_init_lists(void) +{ + opal_free_list_init (&mca_part_base_precv_requests, + sizeof(mca_part_direct_precv_request_t), + opal_cache_line_size, + OBJ_CLASS(mca_part_direct_precv_request_t), + 0,opal_cache_line_size, + ompi_part_direct.free_list_num, + ompi_part_direct.free_list_max, + ompi_part_direct.free_list_inc, + NULL, 0, NULL, NULL, NULL); + opal_free_list_init (&mca_part_base_psend_requests, + sizeof(mca_part_direct_psend_request_t), + opal_cache_line_size, + OBJ_CLASS(mca_part_direct_psend_request_t), + 0,opal_cache_line_size, + ompi_part_direct.free_list_num, + ompi_part_direct.free_list_max, + ompi_part_direct.free_list_inc, + NULL, 0, NULL, NULL, NULL); + ompi_part_direct.progress_list = OBJ_NEW(opal_list_t); +} + +__opal_attribute_always_inline__ static inline void +mca_part_direct_complete(struct mca_part_direct_request_t* request) +{ + if(MCA_PART_DIRECT_REQUEST_PRECV == request->req_type) { + request->req_ompi.req_status.MPI_SOURCE = request->req_peer; + } else { + request->req_ompi.req_status.MPI_SOURCE = request->req_comm->c_my_rank; + } + request->req_ompi.req_complete_cb = NULL; + request->req_ompi.req_status.MPI_TAG = request->req_tag; + request->req_ompi.req_status._ucount = request->req_bytes; + request->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; + request->req_part_complete = true; + ompi_request_complete(&(request->req_ompi), true ); +} + +/** + * mca_part_direct_progress is the progress function that will be registered. It handles + * both send and recv request testing and completion. It also handles freeing requests, + * after MPI_Free is called and the requests have become inactive. + */ +__opal_attribute_always_inline__ static inline int +mca_part_direct_progress(void) +{ + mca_part_direct_list_t *current; + int err; + size_t i; + + /* prevent re-entry, */ + int block_entry = opal_atomic_add_fetch_32(&(ompi_part_direct.block_entry), 1); + if(1 < block_entry) + { + block_entry = opal_atomic_add_fetch_32(&(ompi_part_direct.block_entry), -1); + return OMPI_SUCCESS; + } + + OPAL_THREAD_LOCK(&ompi_part_direct.lock); + + mca_part_direct_request_t* to_delete = NULL; + + OPAL_LIST_FOREACH(current, ompi_part_direct.progress_list, mca_part_direct_list_t) { + mca_part_direct_request_t *req = (mca_part_direct_request_t *) current->item; + if(MCA_PART_DIRECT_REQUEST_PSEND == req->req_type) + { + if(false == req->req_part_complete && REQUEST_COMPLETED != req->req_ompi.req_complete && OMPI_REQUEST_ACTIVE == req->req_ompi.req_state && req->round == req->tround) { + for(i = 0; i < req->parts; i++) { + /* Check to see if partition is queued for being started. Only applicable to sends. */ + if(-2 == req->flags[i]) { + err = MPI_Put(req->buf + i*req->part_bytes, req->part_bytes, MPI_CHAR, 1, + i*req->part_bytes, req->part_bytes, MPI_CHAR, req->window); + assert(MPI_SUCCESS == err); + + req->flags[i] = 0; + req->done_count++; + } + } + + /* Check for completion and complete the requests */ + if(req->done_count == req->parts) + { + /* Incriment round on reciever */ + MPI_Win_flush(1,req->window); + MPI_Put(&req->round, 1, MPI_INT, 1, 0, 1, MPI_INT, req->window_flags); + MPI_Win_flush(1,req->window_flags); + + mca_part_direct_complete(req); + } + } + } else { + if(false == req->req_part_complete && REQUEST_COMPLETED != req->req_ompi.req_complete && OMPI_REQUEST_ACTIVE == req->req_ompi.req_state) { + if(req->round == req->tround) { + mca_part_direct_complete(req); + } + } + } + + if(true == req->req_free_called && true == req->req_part_complete && REQUEST_COMPLETED == req->req_ompi.req_complete && OMPI_REQUEST_INACTIVE == req->req_ompi.req_state) { + to_delete = req; + } + } + OPAL_THREAD_UNLOCK(&ompi_part_direct.lock); + block_entry = opal_atomic_add_fetch_32(&(ompi_part_direct.block_entry), -1); + if(to_delete) { + err = mca_part_direct_free_req(to_delete); + if (OMPI_SUCCESS != err) { + return OMPI_ERROR; + } + } + + return OMPI_SUCCESS; +} + +__opal_attribute_always_inline__ static inline void +mca_part_direct_create_partition_communicator(MPI_Comm comm, + int rank_count, + const int ranks[], + MPI_Comm* new_comm) +{ + int err = MPI_SUCCESS; + MPI_Group group_super, group_sub; + + err = MPI_Comm_group(comm, &group_super); + assert(MPI_SUCCESS == err); + + err = MPI_Group_incl(group_super, rank_count, ranks, &group_sub); + assert(MPI_SUCCESS == err); + + err = MPI_Comm_create_group(comm, group_sub, 0, new_comm); + assert(MPI_SUCCESS == err); +} + + +__opal_attribute_always_inline__ static inline int +mca_part_direct_precv_init(void *buf, + size_t parts, + size_t count, + ompi_datatype_t * datatype, + int src, + int tag, + struct ompi_communicator_t *comm, + struct ompi_info_t * info, + struct ompi_request_t **request) +{ + int err = OMPI_SUCCESS; + size_t dt_size_; + int dt_size; + mca_part_direct_list_t* new_progress_elem = NULL; + + mca_part_direct_precv_request_t *recvreq; + + + /* Allocate a new request */ + MCA_PART_DIRECT_PRECV_REQUEST_ALLOC(recvreq); + if (OPAL_UNLIKELY(NULL == recvreq)) return OMPI_ERR_OUT_OF_RESOURCE; + + MCA_PART_DIRECT_PRECV_REQUEST_INIT(recvreq, ompi_proc, comm, tag, src, + datatype, buf, parts, count, flags); + + mca_part_direct_request_t *req = (mca_part_direct_request_t *) recvreq; + + /* Set lazy initializion flags */ + req->flags = NULL; + /* Non-blocking recive on setup info */ + + /* Compute total number of bytes */ + err = opal_datatype_type_size(&(req->req_datatype->super), &dt_size_); + if(OMPI_SUCCESS != err) return OMPI_ERROR; + dt_size = (dt_size_ > (size_t) INT_MAX) ? MPI_UNDEFINED : (int) dt_size_; + req->req_bytes = parts * count * dt_size; + + + req->round = 0; + req->tround = 0; + + int rank_super; + err = MPI_Comm_rank(comm, &rank_super); + int rank_count = 2; + int ranks[rank_count]; + ranks[0] = src; + ranks[1] = rank_super; + mca_part_direct_create_partition_communicator(comm, rank_count, ranks, &req->comm); + + err = MPI_Win_create(buf, + parts * count * dt_size, + 1, + MPI_INFO_NULL, + req->comm, + &req->window); + assert(MPI_SUCCESS == err); + + err = MPI_Win_lock_all(1, req->window); fflush(stdout); + assert(MPI_SUCCESS == err); + + err = MPI_Win_create(&req->tround, + 1, + sizeof(int32_t), + MPI_INFO_NULL, + req->comm, + &req->window_flags); + assert(MPI_SUCCESS == err); + + err = MPI_Win_lock_all(1, req->window_flags); + assert(MPI_SUCCESS == err); + + + /* Set ompi request initial values */ + req->req_ompi.req_persistent = true; + req->req_part_complete = true; + req->req_ompi.req_type = OMPI_REQUEST_PART; + req->req_ompi.req_complete = REQUEST_COMPLETED; + req->req_ompi.req_state = OMPI_REQUEST_INACTIVE; + + /* Add element to progress engine */ + new_progress_elem = OBJ_NEW(mca_part_direct_list_t); + new_progress_elem->item = req; + req->progress_elem = new_progress_elem; + OPAL_THREAD_LOCK(&ompi_part_direct.lock); + opal_list_append(ompi_part_direct.progress_list, (opal_list_item_t*)new_progress_elem); + OPAL_THREAD_UNLOCK(&ompi_part_direct.lock); + + /* set return values */ + *request = (ompi_request_t*) recvreq; + return err; +} + + +__opal_attribute_always_inline__ static inline int +mca_part_direct_psend_init(const void* buf, + size_t parts, + size_t count, + ompi_datatype_t* datatype, + int dst, + int tag, + ompi_communicator_t* comm, + struct ompi_info_t * info, + ompi_request_t** request) +{ + int err = OMPI_SUCCESS; + size_t dt_size_; + int dt_size; + mca_part_direct_list_t* new_progress_elem = NULL; + mca_part_direct_psend_request_t *sendreq; + + /* Create new request object */ + MCA_PART_DIRECT_PSEND_REQUEST_ALLOC(sendreq, comm, dst, ompi_proc); + if (OPAL_UNLIKELY(NULL == sendreq)) return OMPI_ERR_OUT_OF_RESOURCE; + MCA_PART_DIRECT_PSEND_REQUEST_INIT(sendreq, ompi_proc, comm, tag, dst, + datatype, buf, parts, count, flags); + mca_part_direct_request_t *req = (mca_part_direct_request_t *) sendreq; + + + /* Determine total bytes to send. */ + err = opal_datatype_type_size(&(req->req_datatype->super), &dt_size_); + if(OMPI_SUCCESS != err) return OMPI_ERROR; + dt_size = (dt_size_ > (size_t) INT_MAX) ? MPI_UNDEFINED : (int) dt_size_; + req->req_bytes = parts * count * dt_size; + req->part_bytes = count * dt_size; + req->datatype = datatype; + + req->parts = parts; + req->count = count; + req->buf = (uint8_t*)buf; + + req->flags = (int*) calloc(req->parts, sizeof(int)); + + req->round = 0; + req->tround = 0; + + + int rank_super; + err = MPI_Comm_rank(comm, &rank_super); + int rank_count = 2; + int ranks[rank_count]; + ranks[0] = rank_super; + ranks[1] = dst; + mca_part_direct_create_partition_communicator(comm, rank_count, ranks, &req->comm); + + err = MPI_Win_create((void*)buf, + 0, + 1, + MPI_INFO_NULL, + req->comm, + &req->window); + assert(MPI_SUCCESS == err); + + err = MPI_Win_lock_all(1, req->window); fflush(stdout); + assert(MPI_SUCCESS == err); + + err = MPI_Win_create(&req->tround, + 1, + sizeof(int32_t), + MPI_INFO_NULL, + req->comm, + &req->window_flags); + assert(MPI_SUCCESS == err); + + err = MPI_Win_lock_all(1, req->window_flags); + assert(MPI_SUCCESS == err); + + /* Initilaize completion variables */ + sendreq->req_base.req_ompi.req_persistent = true; + req->req_part_complete = true; + req->req_ompi.req_type = OMPI_REQUEST_PART; + req->req_ompi.req_complete = REQUEST_COMPLETED; + req->req_ompi.req_state = OMPI_REQUEST_INACTIVE; + + /* add element to progress queue */ + new_progress_elem = OBJ_NEW(mca_part_direct_list_t); + new_progress_elem->item = req; + req->progress_elem = new_progress_elem; + OPAL_THREAD_LOCK(&ompi_part_direct.lock); + opal_list_append(ompi_part_direct.progress_list, (opal_list_item_t*)new_progress_elem); + OPAL_THREAD_UNLOCK(&ompi_part_direct.lock); + + /* Set return values */ + *request = (ompi_request_t*) sendreq; + return err; +} + +__opal_attribute_always_inline__ static inline int +mca_part_direct_start(size_t count, ompi_request_t** requests) +{ + int err = OMPI_SUCCESS; + size_t _count = count; + size_t i; + + for(i = 0; i < _count && OMPI_SUCCESS == err; i++) { + mca_part_direct_request_t *req = (mca_part_direct_request_t *)(requests[i]); + req->round++; + if(MCA_PART_DIRECT_REQUEST_PSEND == req->req_type) { + req->done_count = 0; + for(i = 0; i < req->parts && OMPI_SUCCESS == err; i++) { + req->flags[i] = -1; + } + } else { + req->done_count = 0; + /* Increment round on sender */ + MPI_Put(&req->round, 1, MPI_INT, 0, 0, 1, MPI_INT, req->window_flags); + MPI_Win_flush(0,req->window_flags); + } + req->req_ompi.req_state = OMPI_REQUEST_ACTIVE; + req->req_ompi.req_status.MPI_TAG = MPI_ANY_TAG; + req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS; + req->req_ompi.req_status._cancelled = 0; + req->req_part_complete = false; + req->req_ompi.req_complete = false; + OPAL_ATOMIC_SWAP_PTR(&req->req_ompi.req_complete, REQUEST_PENDING); + } + + return err; +} + +__opal_attribute_always_inline__ static inline int +mca_part_direct_pready(size_t min_part, + size_t max_part, + ompi_request_t* request) +{ + int err = OMPI_SUCCESS; + size_t i; + + mca_part_direct_request_t *req = (mca_part_direct_request_t *)(request); + for(i = min_part; i <= max_part && OMPI_SUCCESS == err; i++) { + req->flags[i] = -2; /* Mark partition as queued */ + } + return err; +} + +__opal_attribute_always_inline__ static inline int +mca_part_direct_parrived(size_t min_part, + size_t max_part, + int* flag, + ompi_request_t* request) +{ + int err = OMPI_SUCCESS; + mca_part_direct_request_t *req = (mca_part_direct_request_t *)request; + + *flag = (req->round == req->tround); /* Rationale: RMA is performant implementation for n->1 partitions, and this is an opt-in performance version, we implement all partitioned communications as n->1 for this module. */ + return err; +} + + +/** + * mca_part_direct_free marks an entry as free called and sets the request to + * MPI_REQUEST_NULL. Note: requests get freed in the progress engine. + */ +__opal_attribute_always_inline__ static inline int +mca_part_direct_free(ompi_request_t** request) +{ + mca_part_direct_request_t* req = *(mca_part_direct_request_t**)request; + + if(true == req->req_free_called) return OMPI_ERROR; + req->req_free_called = true; + + *request = MPI_REQUEST_NULL; + return OMPI_SUCCESS; +} + +END_C_DECLS + +#endif /* PART_DIRECT_H_HAS_BEEN_INCLUDED */ diff --git a/ompi/mca/part/direct/part_direct_component.c b/ompi/mca/part/direct/part_direct_component.c new file mode 100644 index 00000000000..f0aa0076142 --- /dev/null +++ b/ompi/mca/part/direct/part_direct_component.c @@ -0,0 +1,134 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2006-2007 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2007 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2010-2012 Cisco Systems, Inc. All rights reserved. + * Copyright (c) 2013-2021 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2015 Los Alamos National Security, LLC. All rights + * reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/mca/part/direct/part_direct.h" + +#include "ompi/mca/part/direct/part_direct_sendreq.h" +#include "ompi/mca/part/direct/part_direct_recvreq.h" +#include "ompi/mca/part/direct/part_direct_component.h" + +static int mca_part_direct_component_register(void); +static int mca_part_direct_component_open(void); +static int mca_part_direct_component_close(void); +static mca_part_base_module_t* mca_part_direct_component_init( int* priority, + bool enable_progress_threads, bool enable_mpi_threads); +static int mca_part_direct_component_fini(void); + +mca_part_base_component_4_0_0_t mca_part_direct_component = { + /* First, the mca_base_component_t struct containing meta + * information about the component itself */ + + .partm_version = { + MCA_PART_BASE_VERSION_2_0_0, + + .mca_component_name = "direct", + MCA_BASE_MAKE_VERSION(component, OMPI_MAJOR_VERSION, OMPI_MINOR_VERSION, + OMPI_RELEASE_VERSION), + .mca_open_component = mca_part_direct_component_open, + .mca_close_component = mca_part_direct_component_close, + .mca_register_component_params = mca_part_direct_component_register, + }, + .partm_data = { + /* This component is not checkpoint ready */ + MCA_BASE_METADATA_PARAM_NONE + }, + + .partm_init = mca_part_direct_component_init, + .partm_finalize = mca_part_direct_component_fini, +}; + +static int +mca_part_direct_component_register(void) +{ + ompi_part_direct.free_list_num = 4; + (void) mca_base_component_var_register(&mca_part_direct_component.partm_version, "free_list_num", + "Initial size of request free lists", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &ompi_part_direct.free_list_num); + + ompi_part_direct.free_list_max = -1; + (void) mca_base_component_var_register(&mca_part_direct_component.partm_version, "free_list_max", + "Maximum size of request free lists", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &ompi_part_direct.free_list_max); + + ompi_part_direct.free_list_inc = 64; + (void) mca_base_component_var_register(&mca_part_direct_component.partm_version, "free_list_inc", + "Number of elements to add when growing request free lists", + MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, + OPAL_INFO_LVL_9, + MCA_BASE_VAR_SCOPE_READONLY, + &ompi_part_direct.free_list_inc); + + + return OPAL_SUCCESS; +} + +static int +mca_part_direct_component_open(void) +{ + OBJ_CONSTRUCT(&ompi_part_direct.lock, opal_mutex_t); + + //fprintf(stderr, "Open the direct component?\n"); + + mca_part_direct_init_lists(); + + ompi_part_direct.block_entry = 0; + return OMPI_SUCCESS; +} + + +static int +mca_part_direct_component_close(void) +{ + OBJ_DESTRUCT(&ompi_part_direct.lock); + return OMPI_SUCCESS; +} + + +static mca_part_base_module_t* +mca_part_direct_component_init(int* priority, + bool enable_progress_threads, + bool enable_mpi_threads) +{ + *priority = 1; + + //fprintf(stderr, "Init the direct component?\n"); + + opal_output_verbose( 10, 0, + "in direct part priority is %d\n", *priority); + + return &ompi_part_direct.super; +} + + +static int +mca_part_direct_component_fini(void) +{ + return OMPI_SUCCESS; +} + diff --git a/ompi/mca/part/direct/part_direct_component.h b/ompi/mca/part/direct/part_direct_component.h new file mode 100644 index 00000000000..b6598fcd3dc --- /dev/null +++ b/ompi/mca/part/direct/part_direct_component.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2006 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2020-2021 Sandia National Laboratories. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ +/** + * @file + */ + +#ifndef MCA_PART_DIRECT_COMPONENT_H +#define MCA_PART_DIRECT_COMPONENT_H + +BEGIN_C_DECLS + +/* + * PART module functions. + */ +OMPI_DECLSPEC extern mca_part_base_component_4_0_0_t mca_part_direct_component; + +END_C_DECLS + +#endif diff --git a/ompi/mca/part/direct/part_direct_recvreq.c b/ompi/mca/part/direct/part_direct_recvreq.c new file mode 100644 index 00000000000..a3e97ad5778 --- /dev/null +++ b/ompi/mca/part/direct/part_direct_recvreq.c @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2007 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2020-2021 Sandia National Laboratories. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/mca/part/direct/part_direct.h" +#include "ompi/mca/part/direct/part_direct_recvreq.h" + + +static void +mca_part_direct_precv_request_construct(mca_part_direct_precv_request_t* recvreq) +{ + recvreq->req_base.req_ompi.req_start = mca_part_direct_start; + recvreq->req_base.req_ompi.req_free = mca_part_direct_free; + recvreq->req_base.req_ompi.req_cancel = NULL; + recvreq->req_base.req_ompi.req_persistent = true; + OBJ_CONSTRUCT( &(recvreq->req_base.req_convertor), opal_convertor_t ); +} + + +OBJ_CLASS_INSTANCE(mca_part_direct_precv_request_t, + mca_part_direct_request_t, + mca_part_direct_precv_request_construct, + NULL); + diff --git a/ompi/mca/part/direct/part_direct_recvreq.h b/ompi/mca/part/direct/part_direct_recvreq.h new file mode 100644 index 00000000000..9736ccf422e --- /dev/null +++ b/ompi/mca/part/direct/part_direct_recvreq.h @@ -0,0 +1,103 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2013 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2012-2021 Sandia National Laboratories. All rights reserved. + * Copyright (c) 2015 Los Alamos National Security, LLC. All rights + * reserved. + * Copyright (c) 2017 Intel, Inc. All rights reserved + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef PART_DIRECT_RECVREQ_H +#define PART_DIRECT_RECVREQ_H + +#include "ompi/mca/part/direct/part_direct_request.h" +#include "ompi/mca/part/base/part_base_precvreq.h" + +struct mca_part_direct_precv_request_t { + mca_part_direct_request_t req_base; +}; +typedef struct mca_part_direct_precv_request_t mca_part_direct_precv_request_t; +OBJ_CLASS_DECLARATION(mca_part_direct_precv_request_t); + +/** + * Allocate a recv request from the modules free list. + * + * @param rc (OUT) OMPI_SUCCESS or error status on failure. + * @return Receive request. + */ +#define MCA_PART_DIRECT_PRECV_REQUEST_ALLOC(precvreq) \ +do { \ + precvreq = (mca_part_direct_precv_request_t*) \ + opal_free_list_get (&mca_part_base_precv_requests); \ + precvreq->req_base.req_type = MCA_PART_DIRECT_REQUEST_PRECV; \ + } while (0) + +/** + * Initialize a receive request with call parameters. + * + * @param request (IN) Receive request. + * @param addr (IN) User buffer. + * @param count (IN) Number of elements of indicated datatype. + * @param datatype (IN) User defined datatype. + * @param src (IN) Source rank w/in the communicator. + * @param comm (IN) Communicator. + * @param directent (IN) Is this a ersistent request. + */ +#define MCA_PART_DIRECT_PRECV_REQUEST_INIT( request, \ + ompi_proc, \ + comm, \ + tag, \ + src, \ + datatype, \ + addr, \ + parts, \ + count, \ + flags ) \ +do { \ + OBJ_RETAIN(comm); \ + OMPI_DATATYPE_RETAIN(datatype); \ + (request)->req_base.req_comm = comm; \ + (request)->req_base.req_datatype = datatype; \ + (request)->req_base.req_ompi.req_mpi_object.comm = comm; \ + (request)->req_base.req_ompi.req_status.MPI_SOURCE = src; \ + (request)->req_base.req_ompi.req_status.MPI_TAG = tag; \ + (request)->req_base.req_part_complete = true; \ + (request)->req_base.req_ompi.req_status._ucount = count; \ + (request)->req_base.req_free_called = false; \ + (request)->req_base.req_addr = addr; /**< pointer to application buffer */\ + (request)->req_base.req_parts = parts; /**< number of partitions */\ + (request)->req_base.req_count = count; /**< count of user datatype elements */\ + (request)->req_base.req_peer = src; /**< peer process - rank w/in this communicator */\ + (request)->req_base.req_tag = tag; \ +} while(0) + +/** + * Free the PART receive request + */ +#define MCA_PART_DIRECT_PRECV_REQUEST_RETURN(recvreq) \ +{ \ + OBJ_RELEASE((recvreq)->req_comm); \ + OMPI_DATATYPE_RELEASE((recvreq)->req_datatype); \ + OMPI_REQUEST_FINI(&(recvreq)->req_ompi); \ + opal_convertor_cleanup( &((recvreq)->req_convertor) ); \ + opal_free_list_return ( &mca_part_base_precv_requests, \ + (opal_free_list_item_t*)(recvreq)); \ +} + +#endif + +; diff --git a/ompi/mca/part/direct/part_direct_request.c b/ompi/mca/part/direct/part_direct_request.c new file mode 100644 index 00000000000..63ab623d38c --- /dev/null +++ b/ompi/mca/part/direct/part_direct_request.c @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2006 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2020-2021 Sandia National Laboratories. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/mca/part/direct/part_direct.h" +#include "ompi/mca/part/direct/part_direct_request.h" + + +static void mca_part_direct_request_construct( mca_part_direct_request_t* req) { + OBJ_CONSTRUCT(&req->req_convertor, opal_convertor_t); + req->req_ompi.req_type = OMPI_REQUEST_PART; +} + +static void mca_part_direct_request_destruct( mca_part_direct_request_t* req) { + OBJ_DESTRUCT(&req->req_convertor); +} + +OBJ_CLASS_INSTANCE(mca_part_direct_request_t, + ompi_request_t, + mca_part_direct_request_construct, + mca_part_direct_request_destruct); diff --git a/ompi/mca/part/direct/part_direct_request.h b/ompi/mca/part/direct/part_direct_request.h new file mode 100644 index 00000000000..e06d12613f9 --- /dev/null +++ b/ompi/mca/part/direct/part_direct_request.h @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2016 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2017 Intel, Inc. All rights reserved + * Copyright (c) 2020-2021 Sandia National Laboratories. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef PART_DIRECT_REQUEST_H +#define PART_DIRECT_REQUEST_H + +#include "ompi/mca/part/base/part_base_psendreq.h" +#include "ompi/mca/part/part.h" +#include "opal/sys/atomic.h" +/** + * Type of request. + */ +typedef enum { + MCA_PART_DIRECT_REQUEST_PSEND, + MCA_PART_DIRECT_REQUEST_PRECV, + MCA_PART_DIRECT_REQUEST_NULL +} mca_part_persist_request_type_t; + +struct mca_part_direct_list_t; + +struct ompi_mca_direct_setup_t { + int world_rank; + int start_tag; + int setup_tag; + size_t num_parts; + size_t count; +}; + + +/** + * Base type for PART DIRECT requests + */ +struct mca_part_direct_request_t { + +/* START: These fields have to match the definition of the mca_part_direct_request_t */ + ompi_request_t req_ompi; /**< base request */ + volatile int32_t req_part_complete; /**< flag indicating if the pt-2-pt layer is done with this request */ + volatile int32_t req_free_called; /**< flag indicating if the user has freed this request */ + mca_part_persist_request_type_t req_type; /**< MPI request type - used for test */ + struct ompi_communicator_t *req_comm; /**< communicator pointer */ + struct ompi_datatype_t *req_datatype; /**< pointer to data type */ + opal_convertor_t req_convertor; /**< always need the convertor */ + + const void *req_addr; /**< pointer to application buffer */ + size_t req_parts; /**< number of partitions */ + size_t req_count; /**< count of user datatype elements */ + int32_t req_peer; /**< peer process - rank w/in this communicator */ + int32_t req_tag; /**< user defined tag */ + struct ompi_proc_t* req_proc; /**< peer process */ + +/* END: These fields have to match the definition of the mca_part_direct_request_t */ + + size_t req_bytes; /**< bytes for completion status */ + + size_t part_bytes; /**< used for offset in put */ + + uint8_t * buf; + + size_t count; + size_t parts; + size_t part_size; + ompi_datatype_t* datatype; + + int32_t round; /**< This is a simple counter pair to match for the flag window */ + int32_t tround; + + MPI_Comm comm; /**< To limit window create to two processes, we need a per request communicator. */ + MPI_Win window; /**< RMA Window for Data Transfer */ + MPI_Win window_flags; /**< RMA Window for completion flags. And RTS Flag. */ + + int32_t req_partitions_send; /**< Send side number of partitions */ + int32_t req_partitions_recv; /**< Recv side number of partitions */ + + int32_t world_peer; /**< peer's rank in MPI_COMM_WORLD */ + + size_t done_count; /**< counter for the number of partitions marked ready */ + + int32_t *flags; /**< array of flags to determine whether a partition has arrived */ + + struct mca_part_direct_list_t* progress_elem; /**< pointer to progress list element for removal durring free. */ + +}; +typedef struct mca_part_direct_request_t mca_part_direct_request_t; +OBJ_CLASS_DECLARATION(mca_part_direct_request_t); + +#endif diff --git a/ompi/mca/part/direct/part_direct_sendreq.c b/ompi/mca/part/direct/part_direct_sendreq.c new file mode 100644 index 00000000000..5a883591368 --- /dev/null +++ b/ompi/mca/part/direct/part_direct_sendreq.c @@ -0,0 +1,39 @@ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2007 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2020-2021 Sandia National Laboratories. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#include "ompi_config.h" + +#include "ompi/mca/part/direct/part_direct.h" +#include "ompi/mca/part/direct/part_direct_sendreq.h" + + +static void mca_part_direct_psend_request_construct(mca_part_direct_psend_request_t* sendreq) +{ + /* no need to reinit for every send -- never changes */ + sendreq->req_base.req_ompi.req_start = mca_part_direct_start; + sendreq->req_base.req_ompi.req_free = mca_part_direct_free; + sendreq->req_base.req_ompi.req_persistent = true; + sendreq->req_base.req_ompi.req_cancel = NULL; +} + +OBJ_CLASS_INSTANCE(mca_part_direct_psend_request_t, + mca_part_direct_request_t, + mca_part_direct_psend_request_construct, + NULL); + diff --git a/ompi/mca/part/direct/part_direct_sendreq.h b/ompi/mca/part/direct/part_direct_sendreq.h new file mode 100644 index 00000000000..85e8b7ae543 --- /dev/null +++ b/ompi/mca/part/direct/part_direct_sendreq.h @@ -0,0 +1,94 @@ +/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */ +/* + * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana + * University Research and Technology + * Corporation. All rights reserved. + * Copyright (c) 2004-2013 The University of Tennessee and The University + * of Tennessee Research Foundation. All rights + * reserved. + * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart, + * University of Stuttgart. All rights reserved. + * Copyright (c) 2004-2006 The Regents of the University of California. + * All rights reserved. + * Copyright (c) 2015-2017 Los Alamos National Security, LLC. All rights + * reserved. + * Copyright (c) 2015 Research Organization for Information Science + * and Technology (RIST). All rights reserved. + * Copyright (c) 2017 Intel, Inc. All rights reserved + * Copyright (c) 2020-2021 Sandia National Laboratories. All rights reserved. + * $COPYRIGHT$ + * + * Additional copyrights may follow + * + * $HEADER$ + */ + +#ifndef PART_DIRECT_PSENDREQ_H +#define PART_DIRECT_PSENDREQ_H + +#include "ompi/mca/part/direct/part_direct_request.h" +#include "ompi/mca/part/base/part_base_psendreq.h" +#include "ompi/mca/part/part.h" +#include "opal/prefetch.h" + +struct mca_part_direct_psend_request_t { + mca_part_direct_request_t req_base; +}; +typedef struct mca_part_direct_psend_request_t mca_part_direct_psend_request_t; +OBJ_CLASS_DECLARATION(mca_part_direct_psend_request_t); + + +#define MCA_PART_DIRECT_PSEND_REQUEST_ALLOC(sendreq, comm, dst, \ + ompi_proc) \ +do { \ + sendreq = (mca_part_direct_psend_request_t*) \ + opal_free_list_wait (&mca_part_base_psend_requests); \ + sendreq->req_base.req_type = MCA_PART_DIRECT_REQUEST_PSEND; \ +} while(0) + +#define MCA_PART_DIRECT_PSEND_REQUEST_INIT( req_send, \ + ompi_proc, \ + comm, \ + tag, \ + dst, \ + datatype, \ + buf, \ + parts, \ + count, \ + flags ) \ + do { \ + OMPI_REQUEST_INIT(&(sendreq->req_base.req_ompi), \ + false); \ + OBJ_RETAIN(comm); \ + OMPI_DATATYPE_RETAIN(datatype); \ + (req_send)->req_base.req_comm = comm; \ + (req_send)->req_base.req_datatype = datatype; \ + (req_send)->req_base.req_ompi.req_mpi_object.comm = comm; \ + (req_send)->req_base.req_ompi.req_status.MPI_SOURCE = \ + comm->c_my_rank; \ + (req_send)->req_base.req_ompi.req_status.MPI_TAG = tag; \ + (req_send)->req_base.req_part_complete = true; \ + (req_send)->req_base.req_ompi.req_status._ucount = count; \ + (req_send)->req_base.req_free_called = false; \ + (req_send)->req_base.req_addr = buf; /**< pointer to application buffer */\ + (req_send)->req_base.req_parts = parts; /**< number of partitions */\ + (req_send)->req_base.req_count = count; /**< count of user datatype elements */\ + (req_send)->req_base.req_peer = dst; /**< peer process - rank w/in this communicator */\ + (req_send)->req_base.req_tag = tag; /**< user defined tag */\ + } while(0) + +/* + * Release resources associated with a request + */ +#define MCA_PART_DIRECT_PSEND_REQUEST_RETURN(sendreq) \ + { \ + /* Let the base handle the reference counts */ \ + OMPI_DATATYPE_RETAIN(sendreq->req_datatype); \ + OBJ_RELEASE(sendreq->req_comm); \ + OMPI_REQUEST_FINI(&sendreq->req_ompi); \ + opal_convertor_cleanup( &(sendreq->req_convertor) ); \ + opal_free_list_return ( &mca_part_base_psend_requests, \ + (opal_free_list_item_t*)sendreq); \ + } + +#endif diff --git a/ompi/mca/part/persist/part_persist_component.c b/ompi/mca/part/persist/part_persist_component.c index 919284476b9..824e3c9d873 100644 --- a/ompi/mca/part/persist/part_persist_component.c +++ b/ompi/mca/part/persist/part_persist_component.c @@ -122,7 +122,7 @@ mca_part_persist_component_init(int* priority, bool enable_progress_threads, bool enable_mpi_threads) { - *priority = 1; + *priority = 2; opal_output_verbose( 10, 0, "in persist part priority is %d\n", *priority); diff --git a/ompi/mpi/c/start.c b/ompi/mpi/c/start.c index 5bf202385f8..38f36b7c433 100644 --- a/ompi/mpi/c/start.c +++ b/ompi/mpi/c/start.c @@ -28,6 +28,7 @@ #include "ompi/runtime/params.h" #include "ompi/communicator/communicator.h" #include "ompi/mca/pml/pml.h" +#include "ompi/mca/part/part.h" #include "ompi/request/request.h" #include "ompi/errhandler/errhandler.h" #include "ompi/memchecker.h" @@ -77,7 +78,6 @@ int MPI_Start(MPI_Request *request) switch((*request)->req_type) { case OMPI_REQUEST_PML: case OMPI_REQUEST_COLL: - case OMPI_REQUEST_PART: if ( MPI_PARAM_CHECK && !((*request)->req_persistent && OMPI_REQUEST_INACTIVE == (*request)->req_state)) { return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_REQUEST, FUNC_NAME); @@ -87,6 +87,18 @@ int MPI_Start(MPI_Request *request) OMPI_ERRHANDLER_NOHANDLE_RETURN(ret, ret, FUNC_NAME); + + case OMPI_REQUEST_PART: + if ( MPI_PARAM_CHECK && !((*request)->req_persistent && + OMPI_REQUEST_INACTIVE == (*request)->req_state)) { + return OMPI_ERRHANDLER_NOHANDLE_INVOKE(MPI_ERR_REQUEST, FUNC_NAME); + } + + ret = mca_part.part_start(1, request); + + OMPI_ERRHANDLER_NOHANDLE_RETURN(ret, ret, FUNC_NAME); + + case OMPI_REQUEST_NOOP: /** * We deal with a MPI_PROC_NULL request. If the request is