Merge pull request #595 from lukemartinlogan/hermes-1.1
Put blob + get blob updates
lukemartinlogan authored Sep 16, 2023
2 parents e6b2e8a + 82490f4 commit 9bb593c
Showing 7 changed files with 70 additions and 34 deletions.
2 changes: 0 additions & 2 deletions tasks/hermes_adapters/filesystem/filesystem_mdm.h
@@ -108,7 +108,5 @@ class MetadataManager {
hshm::Singleton<hermes::adapter::fs::MetadataManager>::GetInstance()
#define HERMES_FS_METADATA_MANAGER_T hermes::adapter::fs::MetadataManager*

#define HERMES_FS_THREAD_POOL \
hshm::EasySingleton<hermes::ThreadPool>::GetInstance()

#endif // HERMES_ADAPTER_METADATA_MANAGER_H
5 changes: 2 additions & 3 deletions tasks/hermes_adapters/mpiio/mpiio_api.cc
@@ -12,8 +12,8 @@

bool mpiio_intercepted = true;

#include <hermes.h>
#include <bucket.h>
#include <hermes/hermes.h>
#include <hermes/bucket.h>

#include "mpiio_api.h"
#include "mpiio_fs_api.h"
@@ -24,7 +24,6 @@ bool mpiio_intercepted = true;
/**
* Namespace declarations
*/
using hermes::ThreadPool;
using hermes::adapter::fs::MetadataManager;
using hermes::adapter::fs::File;
using hermes::adapter::fs::AdapterStat;
10 changes: 6 additions & 4 deletions tasks/hermes_adapters/mpiio/mpiio_fs_api.h
@@ -149,7 +149,8 @@ class MpiioFs : public Filesystem {
MPI_Datatype datatype, MPI_Request *request,
FsIoOptions opts) {
HILOG(kDebug, "Starting an asynchronous write")
auto mdm = HERMES_FS_METADATA_MANAGER;
// TODO(llogan): FIX
/*auto mdm = HERMES_FS_METADATA_MANAGER;
auto pool = HERMES_FS_THREAD_POOL;
Task* hreq = new HermesRequest();
auto lambda = [](MpiioFs *fs, File &f, AdapterStat &stat, const void *ptr,
@@ -161,12 +162,13 @@
auto func = std::bind(lambda, this, f, stat, ptr, count, datatype,
&hreq->io_status.mpi_status_, opts);
hreq->return_future = pool->run(func);
mdm->request_map.emplace(reinterpret_cast<size_t>(request), hreq);
mdm->request_map.emplace(reinterpret_cast<size_t>(request), hreq);*/
return MPI_SUCCESS;
}

int Wait(MPI_Request *req, MPI_Status *status) {
auto mdm = HERMES_FS_METADATA_MANAGER;
// TODO(llogan): FIX
/*auto mdm = HERMES_FS_METADATA_MANAGER;
auto real_api = HERMES_MPIIO_API;
auto iter = mdm->request_map.find(reinterpret_cast<size_t>(req));
if (iter != mdm->request_map.end()) {
@@ -176,7 +178,7 @@
mdm->request_map.erase(iter);
delete (hreq);
return MPI_SUCCESS;
}
}*/
return real_api->MPI_Wait(req, status);
}

53 changes: 38 additions & 15 deletions tasks/hermes_blob_mdm/src/hermes_blob_mdm.cc
@@ -186,42 +186,52 @@ class Server : public TaskLib {
// Stage in blob data from FS
task->data_ptr_.ptr_ = LABSTOR_CLIENT->GetPrivatePointer<char>(task->data_);
task->data_ptr_.shm_ = task->data_;
task->data_off_ = 0;
if (task->filename_->size() > 0 && blob_info.blob_size_ == 0) {
adapter::BlobPlacement plcmnt;
plcmnt.DecodeBlobName(*task->blob_name_);
HILOG(kDebug, "Attempting to stage {} bytes from the backend file {} at offset {}",
task->page_size_, task->filename_->str(), plcmnt.bucket_off_);
LPointer<char> new_data_ptr = LABSTOR_CLIENT->AllocateBuffer(task->page_size_);
int fd = HERMES_POSIX_API->open(task->filename_->c_str(), O_RDONLY);
int ret = HERMES_POSIX_API->pread(fd, new_data_ptr.ptr_, task->page_size_, plcmnt.bucket_off_);
if (fd < 0) {
HELOG(kError, "Failed to open file {}", task->filename_->str());
}
int ret = HERMES_POSIX_API->pread(fd, new_data_ptr.ptr_, task->page_size_, (off_t)plcmnt.bucket_off_);
if (ret < 0) {
// TODO(llogan): ret != page_size_ will require knowing file size before-hand
HELOG(kError, "Failed to stage in {} bytes from {}", task->page_size_, task->filename_->str());
}
HERMES_POSIX_API->close(fd);
memcpy(new_data_ptr.ptr_ + plcmnt.blob_off_, task->data_ptr_.ptr_, task->page_size_);
memcpy(new_data_ptr.ptr_ + plcmnt.blob_off_, task->data_ptr_.ptr_, task->data_size_);
task->data_ptr_ = new_data_ptr;
task->blob_off_ = 0;
task->data_size_ = task->page_size_;
task->data_size_ = ret;
task->data_off_ = plcmnt.bucket_off_ + task->blob_off_ + task->data_size_;
task->flags_.SetBits(HERMES_DID_STAGE_IN);
HILOG(kDebug, "Staged {} bytes from the backend file {}",
task->data_size_, task->filename_->str());
}
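// Editor's sketch of the staged-in bookkeeping above (the byte counts are
// hypothetical, not from the commit): with page_size_ = 4096, a backend
// offset plcmnt.bucket_off_ = 8192, and an incoming put of data_size_ = 512
// bytes at plcmnt.blob_off_ = 100, pread() fills the 4096-byte page buffer,
// the put's 512 bytes are overlaid at page offset 100, and the task then
// holds blob_off_ = 0, data_size_ = ret = 4096, and
// data_off_ = 8192 + 0 + 4096 = 12288, the absolute end offset of this page
// in the bucket.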

// Determine amount of additional buffering space needed
Context ctx;
size_t needed_space = task->blob_off_ + task->data_size_;
size_t size_diff = 0;
if (needed_space > blob_info.max_blob_size_) {
size_diff = needed_space - blob_info.max_blob_size_;
}
if (!task->flags_.Any(HERMES_DID_STAGE_IN)) {
task->data_off_ = size_diff;
}
blob_info.blob_size_ += size_diff;
HILOG(kDebug, "The size diff is {} bytes", size_diff)
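// Editor's note with hypothetical numbers: if max_blob_size_ = 64 KiB and a
// put of data_size_ = 12 KiB arrives at blob_off_ = 60 KiB, then
// needed_space = 72 KiB and size_diff = 8 KiB of new buffer space must be
// placed by the DPE below; for a non-staged put, data_off_ = size_diff is
// the net growth later added to the bucket size under kAdd.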


// Initialize archives
HSHM_MAKE_AR0(task->schema_, nullptr);
HSHM_MAKE_AR0(task->bdev_writes_, nullptr);

// Use DPE
if (size_diff > 0) {
Context ctx;
auto *dpe = DpeFactory::Get(ctx.dpe_);
dpe->Placement({size_diff}, targets_, ctx, *task->schema_);
task->phase_ = PutBlobPhase::kAllocate;
@@ -289,8 +299,10 @@
size_t blob_off = 0, buf_off = 0;
HILOG(kDebug, "Number of buffers {}", blob_info.buffers_.size());
for (BufferInfo &buf : blob_info.buffers_) {
if (task->blob_off_ <= blob_off) {
size_t rel_off = blob_off - task->blob_off_;
size_t blob_left = blob_off;
size_t blob_right = blob_off + buf.t_size_;
if (blob_left <= task->blob_off_ && task->blob_off_ < blob_right) {
size_t rel_off = task->blob_off_ - blob_off;
size_t tgt_off = buf.t_off_ + rel_off;
size_t buf_size = buf.t_size_ - rel_off;
if (blob_off + buf_size > task->blob_off_ + task->data_size_) {
@@ -306,7 +318,7 @@
}
blob_off += buf.t_size_;
}
if (blob_off == 0) {
if (blob_off < task->data_size_) {
HELOG(kFatal, "Something annoying happened");
}
blob_info.max_blob_size_ = blob_off;
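// Editor's illustration of the corrected overlap test (hypothetical sizes):
// with two buffers of t_size_ = 4096 each and task->blob_off_ = 5000, the
// first buffer fails the [blob_left, blob_right) check, the second matches
// with rel_off = 5000 - 4096 = 904, so the write targets
// tgt_off = buf.t_off_ + 904 for up to buf_size = 4096 - 904 = 3192 bytes;
// blob_off finishes the loop at 8192, the new max_blob_size_.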
@@ -332,11 +344,14 @@
LABSTOR_CLIENT->FreeBuffer(task->data_ptr_);
}
// Update the bucket statistics
if (task->data_off_) {
bkt_mdm_.AsyncUpdateSize(task->task_node_ + 1,
task->tag_id_,
task->data_off_);
int update_mode = bucket_mdm::UpdateSizeMode::kAdd;
if (task->flags_.Any(HERMES_DID_STAGE_IN)) {
update_mode = bucket_mdm::UpdateSizeMode::kCap;
}
bkt_mdm_.AsyncUpdateSize(task->task_node_ + 1,
task->tag_id_,
task->data_off_,
update_mode);
if (task->flags_.Any(HERMES_BLOB_DID_CREATE)) {
bkt_mdm_.AsyncTagAddBlob(task->task_node_ + 1,
task->tag_id_,
@@ -358,20 +373,23 @@
}

void GetBlobGetPhase(GetBlobTask *task) {
HILOG(kDebug, "GetBlobTask start");
BlobInfo &blob_info = blob_map_[task->blob_id_];
HSHM_MAKE_AR0(task->bdev_reads_, nullptr);
std::vector<bdev::ReadTask*> &read_tasks = *task->bdev_reads_;
read_tasks.reserve(blob_info.buffers_.size());
if (task->data_size_ < 0) {
task->data_size_ = (ssize_t)(blob_info.blob_size_ - task->blob_off_);
}
HILOG(kDebug, "Getting blob {} of size {} starting at offset {} (total_blob_size={}, buffers={})",
task->blob_id_, task->data_size_, task->blob_off_, blob_info.blob_size_, blob_info.buffers_.size());
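// Editor's note: a negative data_size_ is the "read to the end" convention,
// e.g. (hypothetically) blob_size_ = 8192 with blob_off_ = 4096 and
// data_size_ = -1 resolves to data_size_ = 8192 - 4096 = 4096 bytes.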
size_t blob_off = 0, buf_off = 0;
hipc::mptr<char> blob_data_mptr(task->data_);
char *blob_buf = blob_data_mptr.get();
for (BufferInfo &buf : blob_info.buffers_) {
if (task->blob_off_ <= blob_off) {
size_t rel_off = blob_off - task->blob_off_;
size_t blob_left = blob_off;
size_t blob_right = blob_off + buf.t_size_;
if (blob_left <= task->blob_off_ && task->blob_off_ < blob_right) {
size_t rel_off = task->blob_off_ - blob_off;
size_t tgt_off = buf.t_off_ + rel_off;
size_t buf_size = buf.t_size_ - rel_off;
if (blob_off + buf_size > task->blob_off_ + task->data_size_) {
@@ -593,6 +611,11 @@
LABSTOR_CLIENT->DelTask(free_task);
free_tasks.pop_back();
}
BlobInfo &blob_info = blob_map_[task->blob_id_];
bkt_mdm_.AsyncUpdateSize(task->task_node_ + 1,
task->tag_id_,
-(ssize_t)blob_info.blob_size_,
bucket_mdm::UpdateSizeMode::kAdd);
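// Editor's note: destroying a blob posts its size as a negative kAdd delta,
// e.g. a bucket with internal_size_ = 4096 holding one 4096-byte blob
// returns to size 0.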
HSHM_DESTROY_AR(task->free_tasks_);
blob_map_.erase(task->blob_id_);
task->SetModuleComplete();
@@ -66,10 +66,11 @@ class Client : public TaskLibClient {
void AsyncUpdateSizeConstruct(UpdateSizeTask *task,
const TaskNode &task_node,
TagId tag_id,
size_t update, bitfield32_t flags = bitfield32_t(0)) {
size_t update,
int mode) {
LABSTOR_CLIENT->ConstructTask<UpdateSizeTask>(
task, task_node, DomainId::GetNode(tag_id.node_id_), id_,
tag_id, update, flags);
tag_id, update, mode);
}
LABSTOR_TASK_NODE_PUSH_ROOT(UpdateSize);

@@ -126,11 +126,17 @@ struct SetBlobMdmTask : public Task, TaskFlags<TF_SRL_SYM | TF_REPLICA> {
void ReplicateEnd() {}
};

class UpdateSizeMode {
public:
TASK_METHOD_T kAdd = 0;
TASK_METHOD_T kCap = 1;
};

/** Update bucket size */
struct UpdateSizeTask : public Task, TaskFlags<TF_SRL_SYM> {
IN TagId tag_id_;
IN size_t update_;
IN bitfield32_t flags_;
IN ssize_t update_;
IN int mode_;

/** SHM default constructor */
HSHM_ALWAYS_INLINE explicit
@@ -143,8 +149,8 @@ struct UpdateSizeTask : public Task, TaskFlags<TF_SRL_SYM> {
const DomainId &domain_id,
const TaskStateId &state_id,
const TagId &tag_id,
size_t update,
bitfield32_t flags) : Task(alloc) {
ssize_t update,
int mode) : Task(alloc) {
// Initialize task
task_node_ = task_node;
lane_hash_ = tag_id.unique_;
@@ -157,7 +163,7 @@
// Custom params
tag_id_ = tag_id;
update_ = update;
flags_ = flags;
mode_ = mode;
}

/** Destructor */
@@ -167,7 +173,7 @@
template<typename Ar>
void SerializeStart(Ar &ar) {
task_serialize<Ar>(ar);
ar(tag_id_, update_, flags_);
ar(tag_id_, update_, mode_);
}

/** (De)serialize message return */
11 changes: 9 additions & 2 deletions tasks/hermes_bucket_mdm/src/hermes_bucket_mdm.cc
@@ -49,8 +49,13 @@ class Server : public TaskLib {
/** Update the size of the bucket */
void UpdateSize(UpdateSizeTask *task) {
TagInfo &tag_info = tag_map_[task->tag_id_];
tag_info.internal_size_ = std::max(task->update_,
tag_info.internal_size_);
ssize_t internal_size = (ssize_t) tag_info.internal_size_;
if (task->mode_ == UpdateSizeMode::kAdd) {
internal_size += task->update_;
} else {
internal_size = std::max(task->update_, internal_size);
}
tag_info.internal_size_ = (size_t) internal_size;
task->SetModuleComplete();
}
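
A minimal standalone sketch of the two update modes dispatched above (an editor's illustration; the enum mirrors UpdateSizeMode, and the byte counts are hypothetical, not taken from the commit):

#include <algorithm>
#include <cstdio>

// kAdd applies a signed delta; kCap only raises a high-water mark.
enum UpdateSizeMode { kAdd = 0, kCap = 1 };

long UpdateSize(long internal_size, long update, int mode) {
  if (mode == kAdd) {
    return internal_size + update;           // put adds, destroy passes a negative delta
  }
  return std::max(update, internal_size);    // stage-in caps to an absolute end offset
}

int main() {
  long size = 0;
  size = UpdateSize(size, 4096, kAdd);       // put of one 4096-byte page -> 4096
  size = UpdateSize(size, 12288, kCap);      // staged-in page ends at offset 12288 -> 12288
  size = UpdateSize(size, 8192, kCap);       // lower watermark leaves size unchanged -> 12288
  size = UpdateSize(size, -4096, kAdd);      // destroying a 4096-byte blob -> 8192
  printf("bucket size = %ld\n", size);
}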

@@ -69,6 +74,8 @@
size_t update_size = task->page_size_ - cur_page_off;
size_t max_pages = task->data_size_ / task->page_size_ + 1;
size_t cur_size = 0;
HILOG(kDebug, "(node {}) Bucket size {}, page_size {}, cur_page {} (task_node={})",
LABSTOR_CLIENT->node_id_, bucket_size, task->page_size_, cur_page, task->task_node_)
HSHM_MAKE_AR0(task->append_info_, nullptr);
std::vector<AppendInfo> &append_info = *task->append_info_;
append_info.reserve(max_pages);
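// Editor's worked example (hypothetical values; cur_page and cur_page_off
// are computed above this hunk, presumably as the current page index and the
// offset within it): with page_size_ = 4096 and a bucket size of 5000,
// cur_page_off = 904, so the first append fills
// update_size = 4096 - 904 = 3192 bytes of the partial page; a data_size_
// of 10000 then spans at most max_pages = 10000 / 4096 + 1 = 3 pages.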
