Skip to content

Commit

Permalink
Merge pull request #597 from lukemartinlogan/hermes-1.1
Browse files Browse the repository at this point in the history
Improve asynchronous behavior. Temporarily disable freeing tasks.
  • Loading branch information
lukemartinlogan authored Sep 25, 2023
2 parents 9bb593c + 2eb22d1 commit b93a60a
Show file tree
Hide file tree
Showing 71 changed files with 1,644 additions and 1,108 deletions.
13 changes: 11 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
cmake_minimum_required(VERSION 3.10)
project(labstor)
project(hermes)

#-----------------------------------------------------------------------------
# Define Options
Expand Down Expand Up @@ -93,6 +93,14 @@ if(thallium_FOUND)
message(STATUS "found thallium at ${thallium_DIR}")
endif()

# Boost
# regex/system/filesystem are used by the runtime; fiber is newly required
# for the asynchronous task execution added in this change (the hermes_shm
# Spack package likewise now depends on boost +context +fiber).
# Fixed: the keyword REQUIRED was passed twice; after COMPONENTS the second
# occurrence risks being parsed as a component named "REQUIRED".
find_package(Boost REQUIRED COMPONENTS regex system filesystem fiber)
if (Boost_FOUND)
  message(STATUS "found boost at ${Boost_INCLUDE_DIRS}")
endif()
include_directories(${Boost_INCLUDE_DIRS})
message("Boost: ${Boost_LIBRARIES}")

#------------------------------------------------------------------------------
# Setup CMake Environment
#------------------------------------------------------------------------------
Expand Down Expand Up @@ -158,7 +166,8 @@ set(Labstor_CLIENT_DEPS
labstor_client)
set(Labstor_RUNTIME_LIBRARIES
${Labstor_CLIENT_LIBRARIES}
labstor_runtime)
labstor_runtime
${Boost_LIBRARIES})
set(Labstor_RUNTIME_DEPS
labstor_client labstor_runtime)

Expand Down
8 changes: 4 additions & 4 deletions benchmark/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ target_link_libraries(test_performance_exec
# ${Labstor_CLIENT_LIBRARIES} hermes Catch2::Catch2
# MPI::MPI_CXX ${ZMQ_LIBRARIES})

add_executable(test_hermes_api
add_executable(hermes_api_bench
hermes_api_bench.cc)
add_dependencies(test_hermes_api
add_dependencies(hermes_api_bench
${Labstor_CLIENT_DEPS} hermes)
target_link_libraries(test_hermes_api
target_link_libraries(hermes_api_bench
${Labstor_CLIENT_LIBRARIES} hermes
Catch2::Catch2 MPI::MPI_CXX)

Expand All @@ -50,7 +50,7 @@ add_test(NAME test_performance COMMAND
#------------------------------------------------------------------------------
install(TARGETS
test_performance_exec
test_hermes_api
hermes_api_bench
EXPORT
${LABSTOR_EXPORTED_TARGETS}
LIBRARY DESTINATION ${LABSTOR_INSTALL_LIB_DIR}
Expand Down
131 changes: 99 additions & 32 deletions benchmark/hermes_api_bench.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ void PutTest(int nprocs, int rank,
for (size_t i = 0; i < blobs_per_rank; ++i) {
size_t blob_name_int = rank * blobs_per_rank + i;
std::string name = std::to_string(blob_name_int);
bkt.Put(name, blob, ctx);
bkt.AsyncPut(name, blob, ctx);
}
}
t.Pause();
HILOG(kInfo, "Finished PUT")
GatherTimes("Put", nprocs * blobs_per_rank * blob_size * repeat, t);
}

Expand All @@ -67,11 +68,10 @@ void GetTest(int nprocs, int rank,
t.Resume();
for (int j = 0; j < repeat; ++j) {
for (size_t i = 0; i < blobs_per_rank; ++i) {
hermes::Blob ret(blob_size);
size_t blob_name_int = rank * blobs_per_rank + i;
std::string name = std::to_string(blob_name_int);
hermes::Blob ret;
hermes::BlobId blob_id = bkt.GetBlobId(name);
bkt.Get(blob_id, ret, ctx);
bkt.Get(name, ret, ctx);
}
}
t.Pause();
Expand All @@ -86,6 +86,62 @@ void PutGetTest(int nprocs, int rank, int repeat,
GetTest(nprocs, rank, repeat, blobs_per_rank, blob_size);
}

/** Each process PUTS into the same bucket, but with different blob names */
// Partial-PUT benchmark: every rank writes its own disjoint set of blob
// names into the shared bucket "hello", issuing PartialPut calls at
// part_size-sized offset steps until blob_size is covered.
void PartialPutTest(int nprocs, int rank,
int repeat, size_t blobs_per_rank,
size_t blob_size, size_t part_size) {
Timer t;
hermes::Context ctx;
hermes::Bucket bkt("hello", ctx);
hermes::Blob blob(blob_size);
t.Resume();
for (int j = 0; j < repeat; ++j) {
for (size_t i = 0; i < blobs_per_rank; ++i) {
// Blob names are globally unique: rank-partitioned integer names.
size_t blob_name_int = rank * blobs_per_rank + i;
std::string name = std::to_string(blob_name_int);
for (size_t cur_size = 0; cur_size < blob_size; cur_size += part_size) {
// NOTE(review): the FULL blob (blob_size bytes) is passed at each
// offset rather than a part_size-sized slice, so each step appears
// to write blob_size bytes at offset cur_size. Presumably a
// part_size chunk was intended — confirm PartialPut's
// (name, data, offset) semantics before trusting the numbers.
bkt.PartialPut(name, blob, cur_size, ctx);
}
}
}
t.Pause();
// Reported byte count assumes blob_size per blob per repeat (see NOTE).
GatherTimes("PartialPut", nprocs * blobs_per_rank * blob_size * repeat, t);
}

/**
* Each process GETS from the same bucket, but with different blob names
* MUST run PutTest first.
* */
// Partial-GET benchmark: mirrors PartialPutTest — each rank reads its own
// blob names from bucket "hello" in part_size-sized offset steps.
void PartialGetTest(int nprocs, int rank,
int repeat, size_t blobs_per_rank,
size_t blob_size, size_t part_size) {
Timer t;
hermes::Context ctx;
hermes::Bucket bkt("hello", ctx);
t.Resume();
for (int j = 0; j < repeat; ++j) {
for (size_t i = 0; i < blobs_per_rank; ++i) {
// Same rank-partitioned naming scheme as the PUT benchmarks.
size_t blob_name_int = rank * blobs_per_rank + i;
std::string name = std::to_string(blob_name_int);
hermes::Blob ret(blob_size);
for (size_t cur_size = 0; cur_size < blob_size; cur_size += part_size) {
// NOTE(review): `ret` is blob_size bytes, so each step seems to
// fetch the whole blob at offset cur_size rather than a part_size
// slice — verify PartialGet's (name, buffer, offset) semantics.
bkt.PartialGet(name, ret, cur_size, ctx);
}
}
}
t.Pause();
GatherTimes("PartialGet", nprocs * blobs_per_rank * blob_size * repeat, t);
}

/** Each process PUTs then GETs */
// Combined benchmark driver: all ranks run the partial-PUT phase, then a
// barrier, then the partial-GET phase. The barrier keeps any rank from
// reading before every rank has issued its writes (GET requires the data
// from the PUT phase to exist).
void PartialPutGetTest(int nprocs, int rank, int repeat,
size_t blobs_per_rank, size_t blob_size,
size_t part_size) {
PartialPutTest(nprocs, rank, repeat, blobs_per_rank, blob_size, part_size);
MPI_Barrier(MPI_COMM_WORLD);
PartialGetTest(nprocs, rank, repeat, blobs_per_rank, blob_size, part_size);
}

/** Each process creates a set of buckets */
void CreateBucketTest(int nprocs, int rank,
size_t bkts_per_rank) {
Expand Down Expand Up @@ -120,7 +176,7 @@ void GetBucketTest(int nprocs, int rank,
hapi::Bucket bkt(std::to_string(bkt_name), ctx);
}
t.Pause();
GatherTimes("CreateBucket", bkts_per_rank * nprocs, t);
GatherTimes("GetBucket", bkts_per_rank * nprocs, t);
}

/** Each process deletes a number of buckets */
Expand Down Expand Up @@ -172,6 +228,7 @@ void help() {
printf("USAGE: ./api_bench [mode] ...\n");
printf("USAGE: ./api_bench put [blob_size (K/M/G)] [blobs_per_rank]\n");
printf("USAGE: ./api_bench putget [blob_size (K/M/G)] [blobs_per_rank]\n");
printf("USAGE: ./api_bench pputget [blob_size (K/M/G)] [part_size (K/M/G)] [blobs_per_rank]\n");
printf("USAGE: ./api_bench create_bkt [bkts_per_rank]\n");
printf("USAGE: ./api_bench get_bkt [bkts_per_rank]\n");
printf("USAGE: ./api_bench create_blob_1bkt [blobs_per_rank]\n");
Expand Down Expand Up @@ -202,33 +259,43 @@ int main(int argc, char **argv) {
HIPRINT("Beginning {}\n", mode)

// Run tests
if (mode == "put") {
REQUIRE_ARGC(4)
size_t blob_size = hshm::ConfigParse::ParseSize(argv[2]);
size_t blobs_per_rank = atoi(argv[3]);
PutTest(nprocs, rank, 1, blobs_per_rank, blob_size);
} else if (mode == "putget") {
REQUIRE_ARGC(4)
size_t blob_size = hshm::ConfigParse::ParseSize(argv[2]);
size_t blobs_per_rank = atoi(argv[3]);
PutGetTest(nprocs, rank, 1, blobs_per_rank, blob_size);
} else if (mode == "create_bkt") {
REQUIRE_ARGC(3)
size_t bkts_per_rank = atoi(argv[2]);
CreateBucketTest(nprocs, rank, bkts_per_rank);
} else if (mode == "get_bkt") {
REQUIRE_ARGC(3)
size_t bkts_per_rank = atoi(argv[2]);
GetBucketTest(nprocs, rank, bkts_per_rank);
} else if (mode == "del_bkt") {
REQUIRE_ARGC(4)
size_t bkt_per_rank = atoi(argv[2]);
size_t blobs_per_bkt = atoi(argv[3]);
DeleteBucketTest(nprocs, rank, bkt_per_rank, blobs_per_bkt);
} else if (mode == "del_blobs") {
REQUIRE_ARGC(4)
size_t blobs_per_rank = atoi(argv[2]);
DeleteBlobOneBucket(nprocs, rank, blobs_per_rank);
try {
if (mode == "put") {
REQUIRE_ARGC(4)
size_t blob_size = hshm::ConfigParse::ParseSize(argv[2]);
size_t blobs_per_rank = atoi(argv[3]);
PutTest(nprocs, rank, 1, blobs_per_rank, blob_size);
} else if (mode == "putget") {
REQUIRE_ARGC(4)
size_t blob_size = hshm::ConfigParse::ParseSize(argv[2]);
size_t blobs_per_rank = atoi(argv[3]);
PutGetTest(nprocs, rank, 1, blobs_per_rank, blob_size);
} else if (mode == "pputget") {
REQUIRE_ARGC(5)
size_t blob_size = hshm::ConfigParse::ParseSize(argv[2]);
size_t part_size = hshm::ConfigParse::ParseSize(argv[3]);
size_t blobs_per_rank = atoi(argv[4]);
PartialPutGetTest(nprocs, rank, 1, blobs_per_rank, blob_size, part_size);
} else if (mode == "create_bkt") {
REQUIRE_ARGC(3)
size_t bkts_per_rank = atoi(argv[2]);
CreateBucketTest(nprocs, rank, bkts_per_rank);
} else if (mode == "get_bkt") {
REQUIRE_ARGC(3)
size_t bkts_per_rank = atoi(argv[2]);
GetBucketTest(nprocs, rank, bkts_per_rank);
} else if (mode == "del_bkt") {
REQUIRE_ARGC(4)
size_t bkt_per_rank = atoi(argv[2]);
size_t blobs_per_bkt = atoi(argv[3]);
DeleteBucketTest(nprocs, rank, bkt_per_rank, blobs_per_bkt);
} else if (mode == "del_blobs") {
REQUIRE_ARGC(4)
size_t blobs_per_rank = atoi(argv[2]);
DeleteBlobOneBucket(nprocs, rank, blobs_per_rank);
}
} catch (hshm::Error &err) {
HELOG(kFatal, "Error: {}", err.what());
}
MPI_Finalize();
}
Empty file removed ci/hermes/packages/__init__.py
Empty file.
6 changes: 3 additions & 3 deletions ci/hermes/packages/hermes/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ class Hermes(CMakePackage):
variant('vfd', default=False, description='Enable HDF5 VFD')
variant('ares', default=False, description='Enable full libfabric install')
variant('debug', default=False, description='Enable debug mode')
# Fixed: this was declared as a second variant named 'debug' (a duplicate,
# which Spack rejects) while its description refers to shared libraries;
# the intended variant name is 'shared'.
variant('shared', default=False, description='Build shared libraries')
variant('zmq', default=False, description='Build ZeroMQ tests')

depends_on('[email protected]')
depends_on('cereal')
depends_on('[email protected]')
depends_on('[email protected]:')
# depends_on('[email protected]')
depends_on('catch2')
depends_on('[email protected]')
depends_on('yaml-cpp')
depends_on('[email protected]:')
depends_on('hermes_shm')
Expand Down
6 changes: 4 additions & 2 deletions ci/hermes/packages/hermes_shm/package.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ class HermesShm(CMakePackage):
version('master', branch='master')
depends_on('[email protected]')
depends_on('[email protected]')
depends_on('mpi')
depends_on('[email protected]:')
# depends_on('mpi')
depends_on('[email protected]')
depends_on('[email protected]: +context +fiber')
depends_on('cereal')
depends_on('yaml-cpp')
depends_on('[email protected]')

variant('debug', default=False, description='Build shared libraries')
Expand Down
Loading

0 comments on commit b93a60a

Please sign in to comment.