Skip to content

Commit

Permalink
support barrier
Browse files Browse the repository at this point in the history
  • Loading branch information
PhilipDeegan committed Aug 3, 2024
1 parent 8933c64 commit a8f3505
Show file tree
Hide file tree
Showing 9 changed files with 154 additions and 79 deletions.
20 changes: 4 additions & 16 deletions inc/mkn/gpu/cli.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,33 +32,21 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifndef _MKN_GPU_CLI_HPP_
#define _MKN_GPU_CLI_HPP_

#include <optional>
#include <type_traits>

#include "mkn/kul/env.hpp"
#include "mkn/kul/string.hpp"

namespace mkn::gpu {

// Environment-driven configuration for a GPU device.
// Currently supports overriding the threads-per-block count via the
// MKN_GPU_BX_THREADS environment variable.
// NOTE: the scraped diff had interleaved the pre-commit implementation
// (local ENV string + stringstream `as<>` helper) with the post-commit one,
// yielding duplicate lookups and a dead helper; this keeps the post-commit
// version only.
template <typename Device>
struct Cli {
  // Name of the environment variable consulted by bx_threads().
  constexpr static inline char const* MKN_GPU_BX_THREADS = "MKN_GPU_BX_THREADS";

  // Threads per block: value of MKN_GPU_BX_THREADS when set, otherwise the
  // device's maxThreadsPerBlock.
  auto bx_threads() const {
    if (kul::env::EXISTS(MKN_GPU_BX_THREADS))
      return kul::String::INT32(kul::env::GET(MKN_GPU_BX_THREADS));
    return dev.maxThreadsPerBlock;
  }

  Device const& dev;  // non-owning reference to the device being queried
};

Expand Down
7 changes: 4 additions & 3 deletions inc/mkn/gpu/cpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,13 @@ struct StreamEvent {
~StreamEvent() {}

auto& operator()() { return event; };
// Host-side (CPU backend) stand-in for a GPU stream event.
// record() is expected to be called twice per use (start/stop, mirroring
// the CUDA backend's two-stage event); finished() reports completion once
// both calls have happened; reset() re-arms the event for reuse.
// NOTE: the scraped diff had left the pre-commit no-op definitions of these
// three methods interleaved with the staged versions (a redefinition
// error); only the staged versions are kept.
void record() { ++stage; }
bool finished() const { return stage == 2; }
void reset() { stage = 0; }

Stream stream;
std::size_t event = 0;
std::uint16_t stage = 0;
};

template <typename T>
Expand Down
41 changes: 29 additions & 12 deletions inc/mkn/gpu/cuda.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include <cooperative_groups.h>
#include "mkn/gpu/def.hpp"

// #define MKN_GPU_ASSERT(x) (KASSERT((x) == cudaSuccess))
//

#define MKN_GPU_ASSERT(ans) \
{ gpuAssert((ans), __FILE__, __LINE__); }
Expand Down Expand Up @@ -110,14 +110,7 @@ struct Stream {

struct StreamEvent {
StreamEvent(Stream& stream_) : stream{stream_} { reset(); }
// Destroy any live CUDA events via clear(); clear() itself checks the
// handles for null, so this is safe after a move.
// NOTE: the scraped diff had both the pre-commit destructor (manual
// per-event cudaEventDestroy) and this post-commit one; duplicate
// destructor definitions removed, keeping the clear()-based version.
~StreamEvent() { clear(); }

StreamEvent(StreamEvent&& that) : stream{that.stream}, start{that.start}, stop{that.stop} {
that.start = nullptr;
Expand All @@ -128,20 +121,33 @@ struct StreamEvent {
StreamEvent& operator=(StreamEvent const&) = delete;

auto& operator()() { return stop; };
void record() {
auto& record() {
if (stage == 0) {
MKN_GPU_ASSERT(result = cudaEventRecord(start, stream()));
++stage;
} else {
MKN_GPU_ASSERT(result = cudaEventRecord(stop, stream()));
++stage;
}
return *this;
}
// Make the stream wait on the most recently relevant event: `start` before
// the first record(), `stop` afterwards (cudaStreamWaitEvent makes future
// work submitted to the stream wait for the event). Returns *this for
// chaining with record().
auto& wait() {
if (stage == 0) {
MKN_GPU_ASSERT(result = cudaStreamWaitEvent(stream(), start));
} else {
MKN_GPU_ASSERT(result = cudaStreamWaitEvent(stream(), stop));
}
return *this;
}

// Destroy whichever CUDA events currently exist. The handles are not
// nulled here, so only call on the way to reset() or destruction.
void clear() {
if (start) MKN_GPU_ASSERT(result = cudaEventDestroy(start));
if (stop) MKN_GPU_ASSERT(result = cudaEventDestroy(stop));
}
// Finished once both record() calls happened AND the stop event has
// actually completed on the device (cudaEventQuery returns cudaSuccess).
bool finished() const { return stage == 2 and cudaEventQuery(stop) == cudaSuccess; }
// Re-arm the event pair: destroy any existing events via clear(), create
// fresh start/stop events, and rewind the record/wait stage counter.
// NOTE: the scraped diff had interleaved the pre-commit per-event
// cudaEventDestroy lines with the post-commit clear() call, which would
// destroy each event twice; the duplicated destroys are removed.
void reset() {
  clear();
  MKN_GPU_ASSERT(result = cudaEventCreate(&start));
  MKN_GPU_ASSERT(result = cudaEventCreate(&stop));
  stage = 0;
}
Expand Down Expand Up @@ -356,6 +362,17 @@ void inline prinfo(size_t dev = 0) {
KOUT(NON) << " threadsPBlock " << devProp.maxThreadsPerBlock;
}

// Print current device memory usage (free/total/used, in MiB) to stdout.
// `inline` added: this is defined in a header (the sibling prinfo() above
// is `void inline`), so a non-inline definition would cause multiple
// definition errors when included from several translation units.
// Computation switched from float to double to avoid needless precision
// loss on large memory sizes (printf %f takes a double either way).
void inline print_gpu_mem_used() {
  constexpr double bytes_per_mb = 1048576.0;
  std::size_t free_t = 0, total_t = 0;
  cudaMemGetInfo(&free_t, &total_t);
  double const free_m = free_t / bytes_per_mb;
  double const total_m = total_t / bytes_per_mb;
  double const used_m = total_m - free_m;
  printf(" mem free %zu .... %f MB mem total %zu....%f MB mem used %f MB\n", free_t, free_m,
         total_t, total_m, used_m);
}

__device__ void grid_sync() {
namespace cg = cooperative_groups;
cg::grid_group grid = cg::this_grid();
Expand Down
75 changes: 52 additions & 23 deletions inc/mkn/gpu/multi_launch.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,21 @@ OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#ifndef _MKN_GPU_MULTI_LAUNCH_HPP_
#define _MKN_GPU_MULTI_LAUNCH_HPP_

#include <cassert>
#include <mutex>
#include <chrono>
#include <iostream>
#include <cstdint>
#include <vector>
#include <thread>
#include <mutex>
#include <vector>
#include <barrier>
#include <cassert>
#include <cstdint>
#include <iostream>
#include <algorithm>

#include "mkn/gpu.hpp"

namespace mkn::gpu {

// How a stream function's completion is awaited: polled on the host,
// tracked via a device event, or synchronized at a barrier shared by all
// streams. (The scraped diff had both the pre-commit enum without BARRIER
// and the post-commit one — a redefinition; the post-commit enum is kept.)
enum class StreamFunctionMode { HOST_WAIT = 0, DEVICE_WAIT, BARRIER };
// Whether a stream's current step is occupying the host or the device.
enum class StreamFunctionStatus { HOST_BUSY = 0, DEVICE_BUSY };

template <typename Strat>
Expand Down Expand Up @@ -129,11 +130,14 @@ struct StreamLauncher {
}

// Run stream i's current pipeline step. For DEVICE_WAIT functions, record
// the stream's event and make the stream wait on it so completion can
// later be polled via is_fn_finished().
// NOTE: the scraped diff had interleaved the pre-commit body (reference
// `step`, duplicate run(i), bare record()) with the post-commit one; the
// post-commit flow is kept.
void operator()(std::uint32_t const i) {
  // taken by value — the diff changed this from `auto const&`, presumably
  // because data_step[i] can change while the step runs; TODO confirm
  auto const step = data_step[i];

  assert(step < fns.size());
  assert(i < events.size());

  // if (fns[step]->mode == StreamFunctionMode::HOST_WAIT) events[i].stream.sync();
  fns[step]->run(i);
  if (fns[step]->mode == StreamFunctionMode::DEVICE_WAIT) events[i].record().wait();
}

bool is_finished() const {
Expand All @@ -145,7 +149,7 @@ struct StreamLauncher {

bool is_finished(std::uint32_t idx) const { return data_step[idx] == fns.size(); }

bool is_fn_finished(std::uint32_t i) {
bool is_fn_finished(std::uint32_t const& i) {
auto const b = [&]() {
auto const& step = data_step[i];
if (fns[step]->mode == StreamFunctionMode::HOST_WAIT) return true;
Expand Down Expand Up @@ -181,6 +185,24 @@ struct AsyncStreamHostFunction : StreamFunction<Strat> {
Fn fn;
};

// A pipeline step that synchronizes all streams: each stream arriving at
// this step calls sync_point.arrive(), and once every participant (one per
// entry in strat.datas) has arrived, on_completion runs and releases them
// together.
template <typename Strat>
struct StreamBarrierFunction : StreamFunction<Strat> {
using Super = StreamFunction<Strat>;
using Super::strat;

// Barrier participant count is the number of datas, i.e. one per stream.
StreamBarrierFunction(Strat& strat)
: Super{strat, StreamFunctionMode::BARRIER},
sync_point{std::ssize(strat.datas), on_completion} {}

// Non-blocking arrival; the arrival token is deliberately discarded.
void run(std::uint32_t const /*i*/) override { [[maybe_unused]] auto ret = sync_point.arrive(); }

// Completion callback: runs when the last participant arrives; marks every
// stream's status as WAIT (ready to advance to its next step).
std::function<void()> on_completion = [&]() {
for (auto& stat : strat.status) stat = SFS::WAIT;
};

// Declared after on_completion so member-init order (declaration order)
// constructs on_completion before the barrier that references it.
std::barrier<decltype(on_completion)> sync_point;
};

template <typename Datas>
struct ThreadedStreamLauncher : public StreamLauncher<Datas, ThreadedStreamLauncher<Datas>> {
using This = ThreadedStreamLauncher<Datas>;
Expand All @@ -192,8 +214,9 @@ struct ThreadedStreamLauncher : public StreamLauncher<Datas, ThreadedStreamLaunc
constexpr static std::size_t wait_ms = 1;
constexpr static std::size_t wait_max_ms = 100;

// datas:      one entry per stream/work pipeline.
// _n_threads: number of host worker threads to launch.
// device:     GPU ordinal each worker binds to in thread_fn(); defaulted so
//             the pre-commit two-argument signature remains source-compatible.
// NOTE: the scraped diff had both the pre-commit two-arg constructor header
// and the post-commit three-arg one interleaved (a parse error); only the
// post-commit constructor is kept.
ThreadedStreamLauncher(Datas& datas, std::size_t const _n_threads = 1,
                       std::size_t const device = 0)
    : Super{datas}, n_threads{_n_threads}, device_id{device} {
  thread_status.resize(n_threads, SFP::NEXT);
  status.resize(datas.size(), SFS::FIRST);
}
Expand All @@ -207,11 +230,16 @@ struct ThreadedStreamLauncher : public StreamLauncher<Datas, ThreadedStreamLaunc
return *this;
}

// Append a barrier step to the pipeline: every stream must reach this step
// before any stream proceeds past it (see StreamBarrierFunction).
// Returns *this for fluent chaining with host()/dev().
This& barrier() {
fns.emplace_back(std::make_shared<StreamBarrierFunction<This>>(*this));
return *this;
}

// Launch-and-block entry point: delegates to join().
void operator()() { join(); }
// View this launcher as its StreamLauncher base.
Super& super() { return *this; }
// Run the current step for stream `idx` through the base call operator.
void super(std::size_t const& idx) { return super()(idx); }

bool is_fn_finished(std::uint32_t i) {
bool is_fn_finished(std::uint32_t const& i) {
auto const b = [&]() {
if (fns[step[i]]->mode == StreamFunctionMode::HOST_WAIT) return status[i] == SFS::WAIT;
return events[i].finished();
Expand All @@ -222,21 +250,21 @@ struct ThreadedStreamLauncher : public StreamLauncher<Datas, ThreadedStreamLaunc
}
return b;
}

// Worker-thread loop: bind the thread to the configured GPU, then pull
// work items until the launcher is done. While no work is available, back
// off with a growing sleep (wait_ms, +10ms per idle round, capped at
// wait_max_ms); the backoff resets whenever work is found.
// NOTE: the scraped diff had interleaved the pre-commit else-branch
// (sleep/backoff/check_finished) with the post-commit fall-through version,
// duplicating the sleep; reconstructed the post-commit flow — completion
// detection now lives in get_work().
void thread_fn(std::size_t const& /*tid*/) {
  mkn::gpu::setDevice(device_id);
  std::size_t waitms = wait_ms;
  while (!done) {
    auto const& [ts, idx] = get_work();

    if (ts == SFP::WORK) {
      waitms = wait_ms;  // found work: reset the idle backoff
      super(idx);
      continue;  // immediately look for more work
    }

    std::this_thread::sleep_for(std::chrono::milliseconds(waitms));
    waitms = waitms >= wait_max_ms ? wait_max_ms : waitms + 10;
  }
}

Expand All @@ -247,12 +275,12 @@ struct ThreadedStreamLauncher : public StreamLauncher<Datas, ThreadedStreamLaunc
}

std::pair<SFP, std::size_t> get_work(std::size_t const& start = 0) {
std::unique_lock<std::mutex> lk(work_);
std::scoped_lock<std::mutex> lk(work_);
for (std::size_t i = start; i < datas.size(); ++i) {
if (status[i] == SFS::BUSY) {
if (is_fn_finished(i)) status[i] = SFS::WAIT;

} else if (status[i] == SFS::WAIT) {
}
if (status[i] == SFS::WAIT) {
++step[i];

if (Super::is_finished(i)) {
Expand All @@ -268,7 +296,7 @@ struct ThreadedStreamLauncher : public StreamLauncher<Datas, ThreadedStreamLaunc
return std::make_pair(SFP::WORK, i);
}
}

if (check_finished()) done = 1;
return std::make_pair(SFP::SKIP, 0);
}

Expand All @@ -291,6 +319,7 @@ struct ThreadedStreamLauncher : public StreamLauncher<Datas, ThreadedStreamLaunc
}

std::size_t const n_threads = 1;
std::size_t const device_id = 0;
std::vector<std::thread> threads;

std::mutex work_;
Expand Down
Loading

0 comments on commit a8f3505

Please sign in to comment.