Skip to content

Commit

Permalink
Merge pull request #630 from HDFGroup/dev
Browse files Browse the repository at this point in the history
Fix deadlock with data stager and data op
  • Loading branch information
lukemartinlogan authored Oct 21, 2023
2 parents 6450085 + 6d4f5aa commit 84aeac3
Show file tree
Hide file tree
Showing 38 changed files with 566 additions and 189 deletions.
5 changes: 5 additions & 0 deletions CMake/HermesConfig.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ if( Hermes_INCLUDE_DIR )
hermes
${Boost_LIBRARIES} ${Hermes_LIBRARY})
set(Hermes_CLIENT_LIBRARIES ${Hermes_LIBRARIES})
set(Hermes_RUNTIME_LIBRARIES
${Hermes_CLIENT_LIBRARIES}
hrun_runtime
${Boost_LIBRARIES})
set(Hermes_RUNTIME_DEPS "")
endif(Hermes_LIBRARY)

else(Hermes_INCLUDE_DIR)
Expand Down
3 changes: 2 additions & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,8 @@ if(thallium_FOUND)
endif()

# Boost
find_package(Boost REQUIRED COMPONENTS regex system filesystem fiber REQUIRED)
# find_package(Boost REQUIRED COMPONENTS regex system filesystem fiber REQUIRED)
find_package(Boost REQUIRED COMPONENTS fiber REQUIRED)
if (Boost_FOUND)
message(STATUS "found boost at ${Boost_INCLUDE_DIRS}")
endif()
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Hermes is a heterogeneous-aware, multi-tiered, dynamic, and distributed I/O buff

```bash
# set location of hermes_file_staging
git clone https://github.com/HDFGroup/hermes
git clone https://github.com/HDFGroup/hermes --recurse-submodules
spack repo add ${HERMES_REPO}/ci/hermes
# Master should include all stable updates
spack install hermes@master
Expand Down
2 changes: 1 addition & 1 deletion ci/hermes/packages/hermes/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class Hermes(CMakePackage):
depends_on('cereal')
depends_on('yaml-cpp')
depends_on('[email protected]')
depends_on('[email protected]: +context +fiber')
depends_on('[email protected]: +context +fiber +filesystem +system +atomic +chrono +serialization +signals +pic')
depends_on('libfabric fabrics=sockets,tcp,udp,rxm,rxd,verbs',
when='+ares')
depends_on('libfabric fabrics=verbs',
Expand Down
2 changes: 1 addition & 1 deletion ci/hermes/packages/hermes_shm/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class HermesShm(CMakePackage):
depends_on('cereal')
depends_on('yaml-cpp')
depends_on('[email protected]')
depends_on('[email protected]: +context +fiber')
depends_on('[email protected]: +context +fiber +filesystem +system +atomic +chrono +serialization +signals +pic +regex')
depends_on('libfabric fabrics=sockets,tcp,udp,rxm,rxd,verbs',
when='+ares')
depends_on('libfabric fabrics=verbs',
Expand Down
18 changes: 18 additions & 0 deletions hrun/include/hrun/api/template/hrun_task_cc.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
extern "C" {
/**
 * Allocate a task state without constructing it.
 * Instantiates TRAIT_CLASS and calls Init() with task->id_ and state_name.
 * Unlike create_state, the kConstruct method is not run.
 */
void* alloc_state(hrun::Admin::CreateTaskStateTask *task, const char *state_name) {
hrun::TaskState *exec = reinterpret_cast<hrun::TaskState*>(
new TYPE_UNWRAP(TRAIT_CLASS)());
exec->Init(task->id_, state_name);
return exec;
}
/**
 * Allocate and construct a task state.
 * Performs the same instantiate + Init() sequence as alloc_state, then runs
 * the kConstruct method on the new state with the given task.
 * NOTE(review): the body intentionally does not call the exported alloc_state;
 * presumably every task library exports that same symbol name, so a call
 * could bind to another library's definition -- confirm before factoring.
 */
void* create_state(hrun::Admin::CreateTaskStateTask *task, const char *state_name) {
hrun::TaskState *exec = reinterpret_cast<hrun::TaskState*>(
new TYPE_UNWRAP(TRAIT_CLASS)());
exec->Init(task->id_, state_name);
// Run context built with argument 0 for the construction call
RunContext rctx(0);
exec->Run(hrun::TaskMethod::kConstruct, task, rctx);
return exec;
}
/** Get the name of this task library (TASK_NAME) */
const char* get_task_lib_name(void) { return TASK_NAME; }
/** Exported marker flag; presumably identifies the library as an hrun task -- TODO confirm */
bool is_hrun_task_ = true;
}
8 changes: 8 additions & 0 deletions hrun/include/hrun/network/serialize.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,14 @@ class BinaryInputArchive {
ss_.str(std::string((char*)param_xfer.data_, param_xfer.data_size_));
}

/**
 * String constructor.
 * Hands params to the internal stream via str() and records one transfer
 * entry describing the caller's buffer (pointer + size).
 * NOTE(review): xfer_[0].data_ points at params' own storage, so params
 * presumably has to outlive any later use of xfer_ -- confirm with callers.
 */
BinaryInputArchive(const std::string &params) : ar_(ss_) {
  ss_.str(params);
  xfer_.resize(1);
  auto &entry = xfer_[0];
  entry.data_size_ = params.size();
  entry.data_ = static_cast<void*>(const_cast<char*>(params.data()));
}

/** Deserialize using call */
template<typename T, typename ...Args>
BinaryInputArchive& operator()(T &var, Args &&...args) {
Expand Down
37 changes: 23 additions & 14 deletions hrun/include/hrun/task_registry/task_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,26 +98,35 @@ class TaskLibClient {
};

extern "C" {
/** Function-pointer types for the entry points exported by every task library */
/** Allocate a state (no construction) */
typedef TaskState* (*alloc_state_t)(Task *task, const char *state_name);
/** Allocate + construct a state */
typedef TaskState* (*create_state_t)(Task *task, const char *state_name);
/** Get the name of a task */
typedef const char* (*get_task_lib_name_t)(void);
} // extern c

/**
 * Used internally by task source file.
 * Expands to extern "C" definitions of create_state (instantiate TRAIT_CLASS,
 * Init() it, then run kConstruct) and get_task_lib_name (returns TASK_NAME),
 * plus the is_hrun_task_ flag.
 */
#define HRUN_TASK_CC(TRAIT_CLASS, TASK_NAME) \
extern "C" { \
void* create_state(hrun::Admin::CreateTaskStateTask *task, const char *state_name) { \
hrun::TaskState *exec = reinterpret_cast<hrun::TaskState*>( \
new TYPE_UNWRAP(TRAIT_CLASS)()); \
exec->Init(task->id_, state_name); \
RunContext rctx(0); \
exec->Run(hrun::TaskMethod::kConstruct, task, rctx); \
return exec; \
} \
const char* get_task_lib_name(void) { return TASK_NAME; } \
bool is_hrun_task_ = true; \
}
/**
 * Expands to the extern "C" entry points of a task library:
 *  - alloc_state: instantiate TRAIT_CLASS and Init() it (kConstruct not run)
 *  - create_state: same as alloc_state, then run kConstruct on the new state
 *  - get_task_lib_name: returns TASK_NAME
 *  - is_hrun_task_: exported flag set to true
 * NOTE(review): both functions are declared as returning void* and taking
 * hrun::Admin::CreateTaskStateTask*, while alloc_state_t / create_state_t
 * are declared with TaskState* and Task* -- confirm this mismatch is intended.
 */
#define HRUN_TASK_CC(TRAIT_CLASS, TASK_NAME)\
extern "C" {\
void* alloc_state(hrun::Admin::CreateTaskStateTask *task, const char *state_name) {\
hrun::TaskState *exec = reinterpret_cast<hrun::TaskState*>(\
new TYPE_UNWRAP(TRAIT_CLASS)());\
exec->Init(task->id_, state_name);\
return exec;\
}\
void* create_state(hrun::Admin::CreateTaskStateTask *task, const char *state_name) {\
hrun::TaskState *exec = reinterpret_cast<hrun::TaskState*>(\
new TYPE_UNWRAP(TRAIT_CLASS)());\
exec->Init(task->id_, state_name);\
RunContext rctx(0);\
exec->Run(hrun::TaskMethod::kConstruct, task, rctx);\
return exec;\
}\
const char* get_task_lib_name(void) { return TASK_NAME; }\
bool is_hrun_task_ = true;\
}

} // namespace hrun

#endif // HRUN_INCLUDE_HRUN_TASK_TASK_H_
46 changes: 34 additions & 12 deletions hrun/include/hrun/task_registry/task_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace hrun {
/** All information needed to create a trait */
struct TaskLibInfo {
void *lib_; /**< The dlfcn library */
alloc_state_t alloc_state_; /**< The allocate state function (no construction) */
create_state_t create_state_; /**< The create task function (allocate + construct) */
get_task_lib_name_t get_task_lib_name; /**< The get task name function */

Expand All @@ -44,22 +45,27 @@ struct TaskLibInfo {

/** Emplace constructor: stores the library handle and its entry-point function pointers */
explicit TaskLibInfo(void *lib,
create_state_t create_task,
alloc_state_t alloc_state,
create_state_t create_state,
get_task_lib_name_t get_task_name)
: lib_(lib), create_state_(create_task), get_task_lib_name(get_task_name) {}
: lib_(lib), alloc_state_(alloc_state),
create_state_(create_state), get_task_lib_name(get_task_name) {}

/** Copy constructor: copies the library handle and entry-point pointers as-is */
TaskLibInfo(const TaskLibInfo &other)
    : lib_{other.lib_},
      alloc_state_{other.alloc_state_},
      create_state_{other.create_state_},
      get_task_lib_name{other.get_task_lib_name} {}

/** Move constructor */
TaskLibInfo(TaskLibInfo &&other) noexcept
: lib_(other.lib_),
alloc_state_(other.alloc_state_),
create_state_(other.create_state_),
get_task_lib_name(other.get_task_lib_name) {
other.lib_ = nullptr;
other.alloc_state_ = nullptr;
other.create_state_ = nullptr;
other.get_task_lib_name = nullptr;
}
Expand All @@ -82,6 +88,7 @@ class TaskRegistry {
std::unordered_map<TaskStateId, TaskState*> task_states_;
/** A unique identifier counter */
std::atomic<u64> *unique_;
/**
 * Reader-writer lock taken around accesses to task_state_ids_ and
 * task_states_: read-locked by the lookup methods, write-locked when
 * states are registered or destroyed.
 */
RwLock lock_;

public:
/** Default constructor */
Expand Down Expand Up @@ -156,6 +163,13 @@ class TaskRegistry {
lib_path);
return false;
}
info.alloc_state_ = (alloc_state_t)dlsym(
info.lib_, "alloc_state");
if (!info.alloc_state_) {
HELOG(kError, "The lib {} does not have alloc_state symbol",
lib_path);
return false;
}
info.get_task_lib_name = (get_task_lib_name_t)dlsym(
info.lib_, "get_task_lib_name");
if (!info.get_task_lib_name) {
Expand Down Expand Up @@ -191,6 +205,7 @@ class TaskRegistry {
/**
 * Check if task state exists by ID.
 * Holds a read lock on lock_ for the duration of the lookup.
 */
HSHM_ALWAYS_INLINE
bool TaskStateExists(const TaskStateId &state_id) {
  ScopedRwReadLock lock(lock_, 0);
  const bool found = task_states_.find(state_id) != task_states_.end();
  return found;
}
Expand All @@ -199,15 +214,15 @@ class TaskRegistry {
* Create a task state
* state_id must not be NULL.
* */
bool CreateTaskState(const char *lib_name,
const char *state_name,
const TaskStateId &state_id,
Admin::CreateTaskStateTask *task) {
TaskState* CreateTaskState(const char *lib_name,
const char *state_name,
const TaskStateId &state_id,
Admin::CreateTaskStateTask *task) {
// Ensure state_id is not NULL
if (state_id.IsNull()) {
HILOG(kError, "The task state ID cannot be null");
task->SetModuleComplete();
return false;
return nullptr;
}
// HILOG(kInfo, "(node {}) Creating an instance of {} with name {}",
// HRUN_CLIENT->node_id_, lib_name, state_name)
Expand All @@ -217,38 +232,41 @@ class TaskRegistry {
if (it == libs_.end()) {
HELOG(kError, "Could not find the task lib: {}", lib_name);
task->SetModuleComplete();
return false;
return nullptr;
}

// Ensure the task state does not already exist
if (TaskStateExists(state_id)) {
HELOG(kError, "The task state already exists: {}", state_name);
task->SetModuleComplete();
return true;
return nullptr;
}

// Create the state instance
task->id_ = state_id;
TaskLibInfo &info = it->second;
TaskState *task_state = info.create_state_(task, state_name);
TaskState *task_state;
task_state = info.create_state_(task, state_name);
if (!task_state) {
HELOG(kError, "Could not create the task state: {}", state_name);
task->SetModuleComplete();
return false;
return nullptr;
}

// Add the state to the registry
task_state->id_ = state_id;
task_state->name_ = state_name;
ScopedRwWriteLock lock(lock_, 0);
task_state_ids_.emplace(state_name, state_id);
task_states_.emplace(state_id, task_state);
HILOG(kInfo, "(node {}) Created an instance of {} with name {} and ID {}",
HRUN_CLIENT->node_id_, lib_name, state_name, state_id)
return true;
return task_state;
}

/** Get or create a task state's ID */
TaskStateId GetOrCreateTaskStateId(const std::string &state_name) {
ScopedRwReadLock lock(lock_, 0);
auto it = task_state_ids_.find(state_name);
if (it == task_state_ids_.end()) {
TaskStateId state_id = CreateTaskStateId();
Expand All @@ -260,6 +278,7 @@ class TaskRegistry {

/** Get a task state's ID */
TaskStateId GetTaskStateId(const std::string &state_name) {
ScopedRwReadLock lock(lock_, 0);
auto it = task_state_ids_.find(state_name);
if (it == task_state_ids_.end()) {
return TaskStateId::GetNull();
Expand All @@ -269,6 +288,7 @@ class TaskRegistry {

/** Get a task state instance */
TaskState* GetTaskState(const TaskStateId &task_state_id) {
ScopedRwReadLock lock(lock_, 0);
auto it = task_states_.find(task_state_id);
if (it == task_states_.end()) {
return nullptr;
Expand All @@ -278,6 +298,7 @@ class TaskRegistry {

/** Get task state instance by name OR by ID */
TaskState* GetTaskState(const std::string &task_name, const TaskStateId &task_state_id) {
ScopedRwReadLock lock(lock_, 0);
TaskStateId id = GetTaskStateId(task_name);
if (id.IsNull()) {
id = task_state_id;
Expand All @@ -287,6 +308,7 @@ class TaskRegistry {

/** Destroy a task state */
void DestroyTaskState(const TaskStateId &task_state_id) {
ScopedRwWriteLock lock(lock_, 0);
auto it = task_states_.find(task_state_id);
if (it == task_states_.end()) {
HELOG(kWarning, "Could not find the task state");
Expand Down
13 changes: 11 additions & 2 deletions hrun/src/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,18 @@ void Worker::PollGrouped(WorkEntry &work_entry) {
rctx.lane_id_ = work_entry.lane_id_;
rctx.flush_ = &flush_;
// Get the task state
TaskState *&exec = rctx.exec_;
exec = HRUN_TASK_REGISTRY->GetTaskState(task->task_state_);
TaskState *exec = HRUN_TASK_REGISTRY->GetTaskState(task->task_state_);
rctx.exec_ = exec;
if (!exec) {
for (std::pair<std::string, TaskStateId> entries : HRUN_TASK_REGISTRY->task_state_ids_) {
HILOG(kInfo, "Task state: {} id: {} ptr: {} equal: {}",
entries.first, entries.second,
(size_t)HRUN_TASK_REGISTRY->task_states_[entries.second],
entries.second == task->task_state_);
}
bool was_end = HRUN_TASK_REGISTRY->task_states_.find(task->task_state_) ==
HRUN_TASK_REGISTRY->task_states_.end();
HILOG(kInfo, "Was end: {}", was_end);
HELOG(kFatal, "(node {}) Could not find the task state: {}",
HRUN_CLIENT->node_id_, task->task_state_);
entry->complete_ = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ struct CreateTaskStateTask : public Task, TaskFlags<TF_SRL_SYM | TF_REPLICA> {
IN hipc::ShmArchive<hipc::string> state_name_;
IN hipc::ShmArchive<hipc::vector<PriorityInfo>> queue_info_;
INOUT TaskStateId id_;
IN hipc::ShmArchive<hipc::string> custom_;

/** SHM default constructor */
HSHM_ALWAYS_INLINE explicit
Expand Down Expand Up @@ -170,6 +171,7 @@ struct CreateTaskStateTask : public Task, TaskFlags<TF_SRL_SYM | TF_REPLICA> {
HSHM_MAKE_AR(state_name_, alloc, state_name);
HSHM_MAKE_AR(lib_name_, alloc, lib_name);
HSHM_MAKE_AR(queue_info_, alloc, queue_info);
HSHM_MAKE_AR(custom_, alloc, "");
id_ = id;
}

Expand All @@ -178,6 +180,7 @@ struct CreateTaskStateTask : public Task, TaskFlags<TF_SRL_SYM | TF_REPLICA> {
HSHM_DESTROY_AR(state_name_);
HSHM_DESTROY_AR(lib_name_);
HSHM_DESTROY_AR(queue_info_);
HSHM_DESTROY_AR(custom_);
}

/** Duplicate message */
Expand All @@ -203,7 +206,7 @@ struct CreateTaskStateTask : public Task, TaskFlags<TF_SRL_SYM | TF_REPLICA> {
template<typename Ar>
void SerializeStart(Ar &ar) {
task_serialize<Ar>(ar);
ar(lib_name_, state_name_, id_, queue_info_);
ar(lib_name_, state_name_, id_, queue_info_, custom_);
}

/** (De)serialize message return */
Expand Down
6 changes: 2 additions & 4 deletions hrun/tasks_required/hrun_admin/src/hrun_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,15 @@ class Server : public TaskLib {
QueueId qid(task->id_);
MultiQueue *queue = HRUN_QM_RUNTIME->CreateQueue(
qid, task->queue_info_->vec());
// Run the task state's submethod
// Allocate the task state
task->method_ = Method::kConstruct;
bool ret = HRUN_TASK_REGISTRY->CreateTaskState(
HRUN_TASK_REGISTRY->CreateTaskState(
lib_name.c_str(),
state_name.c_str(),
task->id_,
task);
queue->flags_.SetBits(QUEUE_READY);
task->SetModuleComplete();
HILOG(kInfo, "(node {}) Allocated task state {} with id {}",
HRUN_CLIENT->node_id_, state_name, task->task_state_);
}

void GetTaskStateId(GetTaskStateIdTask *task, RunContext &rctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class Client : public TaskLibClient {
orig_task->task_node_ + 1, id_,
orig_task, exec, orig_task->method_, dups);
MultiQueue *queue = HRUN_CLIENT->GetQueue(queue_id_);
queue->Emplace(orig_task->prio_, orig_task->lane_hash_, dup_task.shm_);
queue->Emplace(TaskPrio::kLowLatency, orig_task->lane_hash_, dup_task.shm_);
}

/** Spawn task to accept new connections */
Expand Down
Loading

0 comments on commit 84aeac3

Please sign in to comment.