Skip to content

Commit

Permalink
Merge pull request #599 from lukemartinlogan/hermes-1.1
Browse files Browse the repository at this point in the history
Fix various memory issues
  • Loading branch information
lukemartinlogan authored Sep 29, 2023
2 parents b93a60a + 3c16457 commit 0e1afa2
Show file tree
Hide file tree
Showing 32 changed files with 711 additions and 365 deletions.
1 change: 0 additions & 1 deletion benchmark/test_init.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#include "test_init.h"

void MainPretest() {
TRANSPARENT_LABSTOR();
}

void MainPosttest() {
Expand Down
16 changes: 16 additions & 0 deletions benchmark/test_latency.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ void TestWorkerIterationLatency(u32 num_queues, u32 num_lanes) {

/** Time for worker to process a request */
TEST_CASE("TestWorkerLatency") {
TRANSPARENT_LABSTOR();
TestWorkerIterationLatency(1, 16);
TestWorkerIterationLatency(5, 16);
TestWorkerIterationLatency(10, 16);
Expand All @@ -251,6 +252,7 @@ TEST_CASE("TestWorkerLatency") {

/** Time to process a request */
TEST_CASE("TestRoundTripLatency") {
TRANSPARENT_LABSTOR();
HERMES->ClientInit();
labstor::small_message::Client client;
LABSTOR_ADMIN->RegisterTaskLibRoot(labstor::DomainId::GetLocal(), "small_message");
Expand All @@ -276,6 +278,20 @@ TEST_CASE("TestRoundTripLatency") {
HILOG(kInfo, "Latency: {} MOps", ops / t.GetUsec());
}

TEST_CASE("TestTimespecLatency") {
size_t ops = (1 << 20);
hshm::Timer t;

t.Resume();
for (size_t i = 0; i < ops; ++i) {
struct timespec ts;
timespec_get(&ts, TIME_UTC);
}
t.Pause();

HILOG(kInfo, "Latency: {} MOps", ops / t.GetUsec());
}

/** Time to process a request */
//TEST_CASE("TestHermesGetBlobIdLatency") {
// HERMES->ClientInit();
Expand Down
18 changes: 16 additions & 2 deletions codegen/refresh_methods
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,20 @@ def refresh_methods(TASK_ROOT):
lines += [' }']
lines += ['}']

## Create the Del method
lines += ['/** Delete a task */',
'void Del(u32 method, Task *task) override {',
' switch (method) {']
for method_enum_name, method_off in methods:
method_name = method_enum_name.replace('k', '', 1)
task_name = method_name + "Task"
lines += [f' case Method::{method_enum_name}: {{',
f' LABSTOR_CLIENT->DelTask(reinterpret_cast<{task_name} *>(task));',
f' break;',
f' }}']
lines += [' }']
lines += ['}']

## Create the ReplicateStart method
lines += ['/** Ensure there is space to store replicated outputs */',
'void ReplicateStart(u32 method, u32 count, Task *task) override {',
Expand Down Expand Up @@ -119,8 +133,8 @@ def refresh_methods(TASK_ROOT):
method_name = method_enum_name.replace('k', '', 1)
task_name = method_name + "Task"
lines += [f' case Method::{method_enum_name}: {{',
f' task_ptr.task_ = LABSTOR_CLIENT->NewEmptyTask<{task_name}>(task_ptr.p_);',
f' ar >> *reinterpret_cast<{task_name}*>(task_ptr.task_);',
f' task_ptr.ptr_ = LABSTOR_CLIENT->NewEmptyTask<{task_name}>(task_ptr.shm_);',
f' ar >> *reinterpret_cast<{task_name}*>(task_ptr.ptr_);',
f' break;',
f' }}']
lines += [' }']
Expand Down
42 changes: 37 additions & 5 deletions include/labstor/api/labstor_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,14 +92,22 @@ class Client : public ConfigurationManager {
template<typename TaskT, typename ...Args>
HSHM_ALWAYS_INLINE
TaskT* NewEmptyTask(hipc::Pointer &p) {
return main_alloc_->NewObj<TaskT>(p, main_alloc_);
TaskT *task = main_alloc_->NewObj<TaskT>(p, main_alloc_);
if (task == nullptr) {
throw std::runtime_error("Could not allocate buffer");
}
return task;
}

/** Allocate task */
template<typename TaskT, typename ...Args>
HSHM_ALWAYS_INLINE
hipc::LPointer<TaskT> AllocateTask() {
return main_alloc_->AllocateLocalPtr<TaskT>(sizeof(TaskT));
hipc::LPointer<TaskT> task = main_alloc_->AllocateLocalPtr<TaskT>(sizeof(TaskT));
if (task.ptr_ == nullptr) {
throw std::runtime_error("Could not allocate buffer");
}
return task;
}

/** Construct task */
Expand Down Expand Up @@ -140,16 +148,40 @@ class Client : public ConfigurationManager {
HSHM_ALWAYS_INLINE
void DelTask(TaskT *task) {
// TODO(llogan): verify leak
// main_alloc_->DelObj<TaskT>(task);
task->delcnt_++;
if (task->delcnt_ != 1) {
HELOG(kFatal, "Freed task {} times: node={}, state={}. method={}",
task->delcnt_.load(), task->task_node_, task->task_state_, task->method_)
}
main_alloc_->DelObj<TaskT>(task);
}

/** Destroy a task */
template<typename TaskT>
HSHM_ALWAYS_INLINE
void DelTask(LPointer<TaskT> &task) {
task->delcnt_++;
if (task->delcnt_ != 1) {
HELOG(kFatal, "Freed task {} times: node={}, state={}. method={}",
task->delcnt_.load(), task->task_node_, task->task_state_, task->method_)
}
main_alloc_->DelObjLocal<TaskT>(task);
}

/** Destroy a task */
template<typename TaskStateT, typename TaskT>
HSHM_ALWAYS_INLINE
void DelTask(TaskStateT *exec, TaskT *task) {
exec->Del(task->method_, task);
}

/** Destroy a task */
template<typename TaskStateT, typename TaskT>
HSHM_ALWAYS_INLINE
void DelTask(TaskStateT *exec, LPointer<TaskT> &task) {
exec->Del(task->method_, task);
}

/** Get a queue by its ID */
HSHM_ALWAYS_INLINE
MultiQueue* GetQueue(const QueueId &queue_id) {
Expand Down Expand Up @@ -189,14 +221,14 @@ class Client : public ConfigurationManager {
HSHM_ALWAYS_INLINE
void FreeBuffer(hipc::Pointer &p) {
// TODO(llogan): verify leak
// main_alloc_->Free(p);
main_alloc_->Free(p);
}

/** Free a buffer */
HSHM_ALWAYS_INLINE
void FreeBuffer(LPointer<char> &p) {
// TODO(llogan): verify leak
main_alloc_->FreeLocalPtr(p);
main_alloc_->FreeLocalPtr(p);
}
};

Expand Down
2 changes: 2 additions & 0 deletions include/labstor/api/labstor_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class Runtime : public ConfigurationManager {
remote_queue::Client remote_queue_;
RpcContext rpc_;
ThalliumRpc thallium_;
bool remote_created_ = false;

public:
/** Default constructor */
Expand Down Expand Up @@ -121,6 +122,7 @@ class Runtime : public ConfigurationManager {
task_registry_.RegisterTaskLib("remote_queue");
remote_queue_.CreateRoot(DomainId::GetLocal(), "remote_queue",
LABSTOR_CLIENT->MakeTaskStateId());
remote_created_ = true;
}

public:
Expand Down
13 changes: 13 additions & 0 deletions include/labstor/labstor_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,19 @@ struct UniqueId {
}
};

/** A sized data pointer */
template<typename T = char>
struct DataPointer {
hipc::Pointer ptr_;
size_t size_;

/** Serialization */
template<typename Ar>
void serialize(Ar &ar) {
ar(ptr_, size_);
}
};

/** Uniquely identify a task state */
using TaskStateId = UniqueId<1>;
/** Uniquely identify a queue */
Expand Down
11 changes: 10 additions & 1 deletion include/labstor/task_registry/task.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class TaskLib;
#define TASK_COROUTINE BIT_OPT(u32, 16)
/** This task uses argobot wait */
#define TASK_PREEMPTIVE BIT_OPT(u32, 17)
/** This task is apart of remote debugging */
#define TASK_REMOTE_DEBUG_MARK BIT_OPT(u32, 18)

/** Used to define task methods */
#define TASK_METHOD_T static inline const u32
Expand Down Expand Up @@ -213,6 +215,7 @@ constexpr inline void CALL_REPLICA_END(T *task) {
template<u32 FLAGS>
struct TaskFlags : public IsTask {
public:
TASK_FLAG_T IS_LOCAL = FLAGS & TF_LOCAL;
TASK_FLAG_T SUPPORTS_SRL = FLAGS & (TF_SRL_SYM | TF_SRL_ASYM);
TASK_FLAG_T SRL_SYM_START = FLAGS & TF_SRL_SYM_START;
TASK_FLAG_T SRL_SYM_END = FLAGS & TF_SRL_SYM_END;
Expand All @@ -234,7 +237,7 @@ class TaskPrio {
struct RunContext {
u32 lane_id_; /**< The lane id of the task */
bctx::transfer_t jmp_; /**< Current execution state of the task (runtime) */
size_t stack_size_ = KILOBYTES(64); /**< The size of the stack for the task (runtime) */
size_t stack_size_ = KILOBYTES(256); /**< The size of the stack for the task (runtime) */
void *stack_ptr_; /**< The pointer to the stack (runtime) */
TaskLib *exec_;

Expand All @@ -256,6 +259,7 @@ struct Task : public hipc::ShmContainer {
u32 lane_hash_; /**< Determine the lane a task is keyed to */
u32 method_; /**< The method to call in the state */
bitfield32_t task_flags_; /**< Properties of the task */
std::atomic<int> delcnt_ = 0; /**< # of times deltask called */
RunContext ctx_;

/**====================================
Expand Down Expand Up @@ -372,6 +376,11 @@ struct Task : public hipc::ShmContainer {
return task_flags_.Any(TASK_COROUTINE);
}

/** Set this task as blocking */
HSHM_ALWAYS_INLINE void UnsetCoroutine() {
task_flags_.UnsetBits(TASK_COROUTINE);
}

/** Set this task as blocking */
HSHM_ALWAYS_INLINE bool IsPreemptive() {
return task_flags_.Any(TASK_PREEMPTIVE);
Expand Down
43 changes: 4 additions & 39 deletions include/labstor/task_registry/task_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,45 +13,7 @@

namespace labstor {

struct TaskPointer {
Task *task_;
hipc::Pointer p_;

/** Default constructor */
TaskPointer() : task_(nullptr) {}

/** Task-only constructor */
TaskPointer(Task *task) : task_(task) {}

/** Emplace constructor */
TaskPointer(Task *task, hipc::Pointer p) : task_(task), p_(p) {}

/** Copy constructor */
TaskPointer(const TaskPointer &other) : task_(other.task_), p_(other.p_) {}

/** Copy operator */
TaskPointer &operator=(const TaskPointer &other) {
task_ = other.task_;
p_ = other.p_;
return *this;
}

/** Move constructor */
TaskPointer(TaskPointer &&other) noexcept
: task_(other.task_), p_(other.p_) {
other.task_ = nullptr;
other.p_ = hipc::Pointer();
}

/** Move operator */
TaskPointer &operator=(TaskPointer &&other) noexcept {
task_ = other.task_;
p_ = other.p_;
other.task_ = nullptr;
other.p_ = hipc::Pointer();
return *this;
}
};
typedef LPointer<Task> TaskPointer;

/**
* Represents a custom operation to perform.
Expand Down Expand Up @@ -79,6 +41,9 @@ class TaskLib {
/** Run a method of the task */
virtual void Run(u32 method, Task *task, RunContext &ctx) = 0;

/** Delete a task */
virtual void Del(u32 method, Task *task) = 0;

/** Allow task to store replicas of completion */
virtual void ReplicateStart(u32 method, u32 count, Task *task) = 0;

Expand Down
2 changes: 1 addition & 1 deletion include/labstor/work_orchestrator/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ struct ScheduleTask : public Task, TaskFlags<TF_LOCAL> {
prio_ = TaskPrio::kLongRunning;
task_state_ = state_id;
method_ = SchedulerMethod::kSchedule;
task_flags_.SetBits(TASK_LONG_RUNNING);
task_flags_.SetBits(TASK_LONG_RUNNING | TASK_REMOTE_DEBUG_MARK);
domain_id_ = domain_id;

// Custom params
Expand Down
6 changes: 3 additions & 3 deletions include/labstor/work_orchestrator/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -345,10 +345,10 @@ class Worker {
}

HSHM_ALWAYS_INLINE
void EndTask(Lane *lane, Task *task, int &off) {
void EndTask(Lane *lane, TaskState *exec, Task *task, int &off) {
PopTask(lane, off);
if (task->IsFireAndForget()) {
LABSTOR_CLIENT->DelTask(task);
if (exec && task->IsFireAndForget()) {
LABSTOR_CLIENT->DelTask<TaskState>(exec, task);
} else {
task->SetComplete();
}
Expand Down
17 changes: 14 additions & 3 deletions src/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,29 @@ void Worker::PollGrouped(WorkEntry &work_entry) {
HELOG(kFatal, "(node {}) Could not find the task state: {}",
LABSTOR_CLIENT->node_id_, task->task_state_);
entry->complete_ = true;
EndTask(lane, task, off);
EndTask(lane, exec, task, off);
continue;
}
// Attempt to run the task if it's ready and runnable
bool is_remote = task->domain_id_.IsRemote(LABSTOR_RPC->GetNumHosts(), LABSTOR_CLIENT->node_id_);
if (!task->IsRunDisabled() && CheckTaskGroup(task, exec, work_entry.lane_id_, task->task_node_, is_remote)) {
// TODO(llogan): Make a remote debug macro
#ifdef REMOTE_DEBUG
if (task->task_state_ != LABSTOR_QM_CLIENT->admin_task_state_ &&
!task->task_flags_.Any(TASK_REMOTE_DEBUG_MARK) &&
task->method_ != TaskMethod::kConstruct &&
LABSTOR_RUNTIME->remote_created_) {
is_remote = true;
}
task->task_flags_.SetBits(TASK_REMOTE_DEBUG_MARK);
#endif
// Execute or schedule task
if (is_remote) {
auto ids = LABSTOR_RUNTIME->ResolveDomainId(task->domain_id_);
LABSTOR_REMOTE_QUEUE->Disperse(task, exec, ids);
task->DisableRun();
task->SetUnordered();
task->UnsetCoroutine();
} else if (task->IsCoroutine()) {
if (!task->IsStarted()) {
ctx.stack_ptr_ = malloc(ctx.stack_size_);
Expand Down Expand Up @@ -105,12 +116,12 @@ void Worker::PollGrouped(WorkEntry &work_entry) {
entry->complete_ = true;
if (task->IsCoroutine()) {
// TODO(llogan): verify leak
// free(ctx.stack_ptr_);
free(ctx.stack_ptr_);
} else if (task->IsPreemptive()) {
ABT_thread_join(entry->thread_);
}
RemoveTaskGroup(task, exec, work_entry.lane_id_, is_remote);
EndTask(lane, task, off);
EndTask(lane, exec, task, off);
} else {
off += 1;
}
Expand Down
Loading

0 comments on commit 0e1afa2

Please sign in to comment.