Skip to content

Commit

Permalink
Fix issue with data stager registration
Browse files Browse the repository at this point in the history
  • Loading branch information
lukemartinlogan committed Dec 30, 2023
1 parent 0e887a0 commit 6efc86f
Show file tree
Hide file tree
Showing 9 changed files with 53 additions and 32 deletions.
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ option(BUILD_MPI_TESTS "Build tests which depend on MPI" ON)
option(BUILD_OpenMP_TESTS "Build tests which depend on OpenMP" ON)
option(HERMES_ENABLE_COVERAGE "Check how well tests cover code" OFF)
option(HERMES_ENABLE_DOXYGEN "Check how well the code is documented" OFF)
option(HERMES_REMOTE_DEBUG "Enable remote debug mode on hrun" OFF)

option(HERMES_ENABLE_POSIX_ADAPTER "Build the Hermes POSIX adapter." ON)
option(HERMES_ENABLE_STDIO_ADAPTER "Build the Hermes stdio adapter." OFF)
Expand Down
13 changes: 10 additions & 3 deletions hrun/include/hrun/work_orchestrator/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,8 @@ class Worker {
}
// Get task properties
bool is_remote = task->domain_id_.IsRemote(HRUN_RPC->GetNumHosts(), HRUN_CLIENT->node_id_);
// #define REMOTE_DEBUG
#ifdef REMOTE_DEBUG
// #define HERMES_REMOTE_DEBUG
#ifdef HERMES_REMOTE_DEBUG
if (task->task_state_ != HRUN_QM_CLIENT->admin_task_state_ &&
!task->task_flags_.Any(TASK_REMOTE_DEBUG_MARK) &&
task->method_ != TaskMethod::kConstruct &&
Expand Down Expand Up @@ -487,7 +487,14 @@ class Worker {
task->SetStarted();
}
} else {
exec->Run(task->method_, task, rctx);
try {
exec->Run(task->method_, task, rctx);
} catch (std::exception &e) {
HELOG(kError, "(node {}) Worker {} caught an exception: {}", HRUN_CLIENT->node_id_, id_, e.what());
} catch (...) {
HELOG(kError, "(node {}) Worker {} caught an unknown exception", HRUN_CLIENT->node_id_, id_);

}
task->SetStarted();
}
task->DidRun(work_entry.cur_time_);
Expand Down
4 changes: 3 additions & 1 deletion hrun/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ add_library(hrun_runtime
hrun_runtime.cc)
add_dependencies(hrun_runtime ${Hermes_CLIENT_DEPS})
target_link_libraries(hrun_runtime thallium ${Hermes_CLIENT_LIBRARIES})

if (HERMES_REMOTE_DEBUG)
target_compile_definitions(hrun_runtime PUBLIC -DHERMES_REMOTE_DEBUG)
endif()
#------------------------------------------------------------------------------
# Build Hrun Runtime Start Function
#------------------------------------------------------------------------------
Expand Down
48 changes: 25 additions & 23 deletions hrun/tasks_required/remote_queue/src/remote_queue.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ class Server : public TaskLib {
entry->thread_ = HRUN_WORK_ORCHESTRATOR->SpawnAsyncThread(
&Server::RunWaitPreemptive, entry);

entry = new AbtWorkerEntry(2, this);
entry->thread_ = HRUN_WORK_ORCHESTRATOR->SpawnAsyncThread(
&Server::RunAckPreemptive, entry);
// entry = new AbtWorkerEntry(2, this);
// entry->thread_ = HRUN_WORK_ORCHESTRATOR->SpawnAsyncThread(
// &Server::RunAckPreemptive, entry);
}
void Construct(ConstructTask *task, RunContext &rctx) {
HILOG(kInfo, "(node {}) Constructing remote queue (task_node={}, task_state={}, method={})",
Expand All @@ -87,8 +87,8 @@ class Server : public TaskLib {
HRUN_CLIENT->server_config_.queue_manager_.queue_depth_);
wait_ = hipc::make_uptr<hipc::mpsc_queue<WaitTask>>(
HRUN_CLIENT->server_config_.queue_manager_.queue_depth_);
ack_ = hipc::make_uptr<hipc::mpsc_queue<AckTask>>(
HRUN_CLIENT->server_config_.queue_manager_.queue_depth_);
// ack_ = hipc::make_uptr<hipc::mpsc_queue<AckTask>>(
// HRUN_CLIENT->server_config_.queue_manager_.queue_depth_);
CreateThreads();
HRUN_THALLIUM->RegisterRpc("RpcPushSmall", [this](
const tl::request &req,
Expand Down Expand Up @@ -494,28 +494,30 @@ class Server : public TaskLib {
size_t task_addr,
int replica,
std::string &ret) {
AckTask ack_task;
ack_task.task_ = (PushTask *) task_addr;
ack_task.replica_ = replica;
ack_task.ret_ = std::move(ret);
ack_->emplace(ack_task);
// AckTask ack_task;
// ack_task.task_ = (PushTask *) task_addr;
// ack_task.replica_ = replica;
// ack_task.ret_ = std::move(ret);
// ack_->emplace(ack_task);
// req.respond(0);
ClientHandlePushReplicaOutput(replica, ret, (PushTask *) task_addr);
req.respond(0);
}

/** An ABT thread to run a PUSH task */
static void RunAckPreemptive(void *data) {
AbtWorkerEntry *entry = (AbtWorkerEntry *) data;
Server *server = entry->server_;
WorkOrchestrator *orchestrator = HRUN_WORK_ORCHESTRATOR;
while (orchestrator->IsAlive()) {
AckTask ack_task;
while (!server->ack_->pop(ack_task).IsNull()) {
server->ClientHandlePushReplicaOutput(
ack_task.replica_, ack_task.ret_, ack_task.task_);
}
ABT_thread_yield();
}
}
// static void RunAckPreemptive(void *data) {
// AbtWorkerEntry *entry = (AbtWorkerEntry *) data;
// Server *server = entry->server_;
// WorkOrchestrator *orchestrator = HRUN_WORK_ORCHESTRATOR;
// while (orchestrator->IsAlive()) {
// AckTask ack_task;
// while (!server->ack_->pop(ack_task).IsNull()) {
// server->ClientHandlePushReplicaOutput(
// ack_task.replica_, ack_task.ret_, ack_task.task_);
// }
// ABT_thread_yield();
// }
// }

/** Handle output from replica PUSH */
void ClientHandlePushReplicaOutput(int replica,
Expand Down
4 changes: 4 additions & 0 deletions include/hermes/config_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ class ConfigurationManager {
is_initialized_ = true;
}

void ServerInit() {
ClientInit();
}

void LoadClientConfig(std::string config_path) {
// Load hermes config
if (config_path.empty()) {
Expand Down
5 changes: 5 additions & 0 deletions include/hermes/hermes.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ class Hermes {
HERMES_CONF->ClientInit();
}

/** Init hermes server */
void ServerInit() {
HERMES_CONF->ServerInit();
}

/** Check if initialized */
bool IsInitialized() {
return HERMES_CONF->is_initialized_;
Expand Down
4 changes: 2 additions & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
#------------------------------------------------------------------------------
# Build Small Message Task Library
# Build Hermes
#------------------------------------------------------------------------------
add_library(hermes SHARED
hermes_config_manager.cc)
add_dependencies(hermes ${Hermes_RUNTIME_DEPS})
target_link_libraries(hermes ${Hermes_RUNTIME_LIBRARIES})

#------------------------------------------------------------------------------
# Install Small Message Task Library
# Install Hermes Library
#------------------------------------------------------------------------------
install(
TARGETS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,7 @@ struct GetOrCreateTagTask : public Task, TaskFlags<TF_SRL_SYM> {
template<typename Ar>
void SerializeStart(Ar &ar) {
task_serialize<Ar>(ar);
ar(tag_name_, blob_owner_, traits_, backend_size_, flags_);
ar(tag_name_, blob_owner_, traits_, backend_size_, flags_, params_);
}

/** (De)serialize message return */
Expand Down
4 changes: 2 additions & 2 deletions test/unit/pipelines/test_ior.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ pkgs:
ram: 1g
- pkg_type: hermes_api
pkg_name: hermes_api
posix: true
mpi: true
- pkg_type: ior
pkg_name: ior
api: posix
api: mpiio
out: /tmp/test_hermes/ior.bin
xfer: 1m
block: 32g
Expand Down

0 comments on commit 6efc86f

Please sign in to comment.