From 1413d69b1b1f12386ceb08714a607d2a10dc3196 Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Mon, 30 Sep 2024 22:15:43 +0000
Subject: [PATCH 1/9] fixed functionality of cpu locked tensor

---
 csrc/aio/common/deepspeed_aio_utils.cpp    |  2 +-
 csrc/aio/common/deepspeed_aio_utils.h      |  2 +-
 csrc/aio/py_lib/deepspeed_aio_op_desc.cpp  |  2 ++
 csrc/aio/py_lib/deepspeed_aio_op_desc.h    |  2 ++
 csrc/aio/py_lib/deepspeed_cpu_op.cpp       | 13 ++++++++-----
 csrc/aio/py_lib/deepspeed_cpu_op.h         |  1 +
 csrc/aio/py_lib/deepspeed_pin_tensor.cpp   | 16 +++++++++++++---
 csrc/aio/py_lib/deepspeed_pin_tensor.h     |  6 ++++--
 csrc/aio/py_lib/deepspeed_py_io_handle.cpp |  7 ++++---
 csrc/aio/py_lib/deepspeed_py_io_handle.h   |  2 +-
 10 files changed, 37 insertions(+), 16 deletions(-)

diff --git a/csrc/aio/common/deepspeed_aio_utils.cpp b/csrc/aio/common/deepspeed_aio_utils.cpp
index 763b2c253a34..8fccb1bf96cd 100644
--- a/csrc/aio/common/deepspeed_aio_utils.cpp
+++ b/csrc/aio/common/deepspeed_aio_utils.cpp
@@ -103,7 +103,7 @@ int get_file_size(const char* filename, long long int& size)
     return 0;
 }
 
-void* ds_page_aligned_alloc(const size_t size, const bool lock)
+void* ds_page_aligned_alloc(const long long int size, const bool lock)
 {
     void* ptr;
     int retval;
diff --git a/csrc/aio/common/deepspeed_aio_utils.h b/csrc/aio/common/deepspeed_aio_utils.h
index 9c58c2286610..ea56cd1de236 100644
--- a/csrc/aio/common/deepspeed_aio_utils.h
+++ b/csrc/aio/common/deepspeed_aio_utils.h
@@ -74,6 +74,6 @@ struct io_prep_generator {
     int prep_iocbs(const int n_iocbs, std::vector<struct iocb*>* iocbs);
 };
 
-void* ds_page_aligned_alloc(const size_t size, const bool lock = false);
+void* ds_page_aligned_alloc(const long long int size, const bool lock = false);
 
 int get_file_size(const char* filename, long long int& size);
diff --git a/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp b/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp
index dc820be528d0..63d2ba022997 100644
--- a/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp
+++ b/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp
@@ -9,6 +9,7 @@ using namespace std;
 
 io_op_desc_t::io_op_desc_t(const bool read_op,
                            const torch::Tensor& buffer,
+                           const bool is_managed,
                            const int fd,
                            const char* filename,
                            const long long int file_num_bytes,
@@ -16,6 +17,7 @@ io_op_desc_t::io_op_desc_t(const bool read_op,
                            const bool validate)
     : _read_op(read_op),
       _buffer(buffer),
+      _is_managed(is_managed),
       _fd(fd),
       _filename(filename),
       _file_num_bytes(file_num_bytes),
diff --git a/csrc/aio/py_lib/deepspeed_aio_op_desc.h b/csrc/aio/py_lib/deepspeed_aio_op_desc.h
index 350d28d29d58..42e55cd2e2a0 100644
--- a/csrc/aio/py_lib/deepspeed_aio_op_desc.h
+++ b/csrc/aio/py_lib/deepspeed_aio_op_desc.h
@@ -12,6 +12,7 @@
 struct io_op_desc_t {
     const bool _read_op;
     torch::Tensor _buffer;
+    const bool _is_managed;
     int _fd;
     const std::string _filename;
     const long long int _file_num_bytes;
@@ -22,6 +23,7 @@ struct io_op_desc_t {
 
     io_op_desc_t(const bool read_op,
                  const torch::Tensor& buffer,
+                 const bool is_managed,
                  const int fd,
                  const char* filename,
                  const long long int file_num_bytes,
diff --git a/csrc/aio/py_lib/deepspeed_cpu_op.cpp b/csrc/aio/py_lib/deepspeed_cpu_op.cpp
index 41790b99bb88..22b751eef625 100644
--- a/csrc/aio/py_lib/deepspeed_cpu_op.cpp
+++ b/csrc/aio/py_lib/deepspeed_cpu_op.cpp
@@ -9,23 +9,25 @@ using namespace std;
 
 cpu_op_desc_t::cpu_op_desc_t(const bool read_op,
                              const torch::Tensor& buffer,
+                             const bool is_managed,
                              const int fd,
                              const char* filename,
                              const long long int file_num_bytes,
                              const int num_threads,
                              const bool validate)
-    : io_op_desc_t(read_op, buffer, fd, filename, file_num_bytes, num_threads, validate),
+    : io_op_desc_t(read_op, buffer, is_managed, fd, filename, file_num_bytes, num_threads, validate),
       _cpu_buffer(buffer)
 {
     // Need to use CPU bounce buffer if buffer is not a page-locked DRAM memory.
-    _use_bounce_buffer = !(_buffer.is_cpu() && _buffer.is_pinned());
+    _use_bounce_buffer = !(_buffer.is_cpu() && (_buffer.is_pinned() || _is_managed));
     if (_use_bounce_buffer) {
         if (_read_op) {
             auto options = torch::TensorOptions()
                                .dtype(_buffer.dtype())
                                .layout(_buffer.layout())
-                               .device(torch::kCPU);
-            _cpu_buffer = torch::empty(_buffer.nbytes(), options).pin_memory();
+                               .device(torch::kCPU)
+                               .requires_grad(false);
+            _cpu_buffer = torch::empty(_buffer.numel(), options).pin_memory();
         } else {
             _cpu_buffer = _buffer.to(torch::kCPU).pin_memory();
         }
@@ -37,9 +39,10 @@ char* cpu_op_desc_t::data_ptr() const { return (char*)_contiguous_buffer.data_pt
 
 void cpu_op_desc_t::finish()
 {
-    if (_read_op) {
+    if (_read_op && _use_bounce_buffer) {
         if (_buffer.is_cuda()) { _buffer.copy_(_cpu_buffer.to(torch::kCUDA)); }
         if (_buffer.is_xpu()) { _buffer.copy_(_cpu_buffer.to(torch::kXPU)); }
+        if (_buffer.is_cpu()) { _buffer.copy_(_cpu_buffer); }
 #if defined(__ENABLE_CANN__)
         if (torch_npu::utils::is_npu(_buffer)) {
             auto device = at::Device("npu:0");
diff --git a/csrc/aio/py_lib/deepspeed_cpu_op.h b/csrc/aio/py_lib/deepspeed_cpu_op.h
index da96dd2b1d50..efc60f97f93c 100644
--- a/csrc/aio/py_lib/deepspeed_cpu_op.h
+++ b/csrc/aio/py_lib/deepspeed_cpu_op.h
@@ -13,6 +13,7 @@ struct cpu_op_desc_t : io_op_desc_t {
 
     cpu_op_desc_t(const bool read_op,
                   const torch::Tensor& buffer,
+                  const bool is_managed,
                   const int fd,
                   const char* filename,
                   const long long int file_num_bytes,
diff --git a/csrc/aio/py_lib/deepspeed_pin_tensor.cpp b/csrc/aio/py_lib/deepspeed_pin_tensor.cpp
index 752823dc7dd2..f57b4394fc99 100644
--- a/csrc/aio/py_lib/deepspeed_pin_tensor.cpp
+++ b/csrc/aio/py_lib/deepspeed_pin_tensor.cpp
@@ -19,7 +19,7 @@ deepspeed_pin_tensor_t::~deepspeed_pin_tensor_t()
     _locked_tensors.clear();
 }
 
-torch::Tensor deepspeed_pin_tensor_t::alloc(const size_t num_elem, const at::ScalarType& elem_type)
+torch::Tensor deepspeed_pin_tensor_t::alloc(const long long int num_elem, const at::ScalarType& elem_type)
 {
     const auto num_bytes = num_elem * elementSize(elem_type);
     auto pinned_buffer = ds_page_aligned_alloc(num_bytes, true);
@@ -27,9 +27,9 @@ torch::Tensor deepspeed_pin_tensor_t::alloc(const size_t num_elem, const at::Sca
 
     _locked_tensors[pinned_buffer] = num_bytes;
 
-    auto options = torch::TensorOptions().dtype(elem_type).device(torch::kCPU);
+    auto options = torch::TensorOptions().dtype(elem_type).device(torch::kCPU).requires_grad(false);
 
-    return at::from_blob(pinned_buffer, static_cast(num_bytes), options);
+    return at::from_blob(pinned_buffer, static_cast(num_elem), options);
 }
 
 bool deepspeed_pin_tensor_t::free(torch::Tensor& locked_tensor)
@@ -43,3 +43,13 @@ bool deepspeed_pin_tensor_t::free(torch::Tensor& locked_tensor)
 
     return false;
 }
+
+bool deepspeed_pin_tensor_t::is_managed(const torch::Tensor& buffer)
+{
+    auto addr = buffer.data_ptr();
+    if (!buffer.is_cpu()){ return false;}
+    if (_locked_tensors.find(addr) != _locked_tensors.end()) {
+        return true;
+    }
+    return false;
+};
diff --git a/csrc/aio/py_lib/deepspeed_pin_tensor.h b/csrc/aio/py_lib/deepspeed_pin_tensor.h
index 4350a4ac7df6..195696a05833 100644
--- a/csrc/aio/py_lib/deepspeed_pin_tensor.h
+++ b/csrc/aio/py_lib/deepspeed_pin_tensor.h
@@ -15,13 +15,15 @@ Functionality for managing CPU tensors occupying page-locked memory.
#include "deepspeed_py_aio.h" struct deepspeed_pin_tensor_t { - std::map _locked_tensors; + std::map _locked_tensors; deepspeed_pin_tensor_t() = default; ~deepspeed_pin_tensor_t(); - torch::Tensor alloc(const size_t num_elem, const at::ScalarType& elem_type); + torch::Tensor alloc(const long long num_elem, const at::ScalarType& elem_type); bool free(torch::Tensor& locked_tensor); + + bool is_managed(const torch::Tensor& buffer); }; diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp index bdf2a858d797..e5e89419269d 100644 --- a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp +++ b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp @@ -209,8 +209,9 @@ std::shared_ptr deepspeed_io_handle_t::_create_io_op_desc( const long long int file_num_bytes, const bool validate) { + bool is_managed = _pinned_tensor_mgr->is_managed(buffer); return std::make_shared( - read_op, buffer, fd, filename, file_num_bytes, _num_threads, validate); + read_op, buffer, is_managed, fd, filename, file_num_bytes, _num_threads, validate); } int deepspeed_io_handle_t::pread(const torch::Tensor& buffer, @@ -229,7 +230,7 @@ int deepspeed_io_handle_t::pread(const torch::Tensor& buffer, std::cout << filename << ": buffer nbytes != file bytes " << buffer_bytes << " != " << num_file_bytes << std::endl; } - assert(static_cast(buffer.nbytes()) == num_file_bytes); + assert(buffer_bytes == num_file_bytes); assert((num_file_bytes % _num_threads) == 0); if (!_is_valid_parallel_aio_op(true, num_file_bytes)) { return -1; } @@ -288,7 +289,7 @@ int deepspeed_io_handle_t::async_pwrite(const torch::Tensor& buffer, const char* return pwrite(buffer, filename, false, true); } -at::Tensor deepspeed_io_handle_t::new_cpu_locked_tensor(const size_t num_elem, +at::Tensor deepspeed_io_handle_t::new_cpu_locked_tensor(const long long int num_elem, const torch::Tensor& example_tensor) { return _pinned_tensor_mgr->alloc(num_elem, example_tensor.scalar_type()); diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.h b/csrc/aio/py_lib/deepspeed_py_io_handle.h index 2974ebe87bfc..e21fd22fc3f7 100644 --- a/csrc/aio/py_lib/deepspeed_py_io_handle.h +++ b/csrc/aio/py_lib/deepspeed_py_io_handle.h @@ -61,7 +61,7 @@ struct deepspeed_io_handle_t { int async_pwrite(const torch::Tensor& buffer, const char* filename); // TODO: Make API's args to be shape and dtype. 
- torch::Tensor new_cpu_locked_tensor(const size_t num_elem, const torch::Tensor& example_tensor); + torch::Tensor new_cpu_locked_tensor(const long long int num_elem, const torch::Tensor& example_tensor); bool free_cpu_locked_tensor(torch::Tensor&); From c13bb10855bef4e0ede03da7a19b73439d51836f Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Tue, 1 Oct 2024 18:40:51 +0000 Subject: [PATCH 2/9] enabling cpu locked in unittests, and fixing compilation errors --- csrc/gds/py_lib/deepspeed_gds_op.cpp | 3 ++- csrc/gds/py_lib/deepspeed_gds_op.h | 1 + csrc/gds/py_lib/deepspeed_py_gds_handle.cpp | 2 +- tests/unit/ops/aio/test_aio.py | 6 +++--- tests/unit/ops/aio/test_gds.py | 2 +- 5 files changed, 8 insertions(+), 6 deletions(-) diff --git a/csrc/gds/py_lib/deepspeed_gds_op.cpp b/csrc/gds/py_lib/deepspeed_gds_op.cpp index c370a448e5a2..c0b3a335e268 100644 --- a/csrc/gds/py_lib/deepspeed_gds_op.cpp +++ b/csrc/gds/py_lib/deepspeed_gds_op.cpp @@ -92,12 +92,13 @@ void gds_op_desc_t::remove_buffer_from_registry(const torch::Tensor& buffer) gds_op_desc_t::gds_op_desc_t(const bool read_op, const torch::Tensor& buffer, + const bool is_managed, const int fd, const char* filename, const long long int file_num_bytes, const int num_threads, const bool validate) - : io_op_desc_t(read_op, buffer, fd, filename, file_num_bytes, num_threads, validate) + : io_op_desc_t(read_op, buffer,is_managed, fd, filename, file_num_bytes, num_threads, validate) { _contiguous_buffer = _buffer.contiguous(); const int64_t device = _buffer.get_device(); diff --git a/csrc/gds/py_lib/deepspeed_gds_op.h b/csrc/gds/py_lib/deepspeed_gds_op.h index b7fab64d4054..70c8f7ced4f5 100644 --- a/csrc/gds/py_lib/deepspeed_gds_op.h +++ b/csrc/gds/py_lib/deepspeed_gds_op.h @@ -20,6 +20,7 @@ struct gds_op_desc_t : io_op_desc_t { gds_op_desc_t(const bool read_op, const torch::Tensor& buffer, + const bool is_managed, const int fd, const char* filename, const long long int file_num_bytes, diff --git a/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp b/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp index 15fd516acaae..e65d7cc40cfd 100644 --- a/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp +++ b/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp @@ -107,7 +107,7 @@ std::shared_ptr deepspeed_gds_handle_t::_create_io_op_desc( { if (buffer.is_cuda()) { return std::make_shared( - read_op, buffer, fd, filename, file_num_bytes, _num_threads, validate); + read_op, buffer, false, fd, filename, file_num_bytes, _num_threads, validate); } return deepspeed_io_handle_t::_create_io_op_desc( read_op, buffer, fd, filename, file_num_bytes, validate); diff --git a/tests/unit/ops/aio/test_aio.py b/tests/unit/ops/aio/test_aio.py index e6927efc3824..9d4b12a6daf7 100644 --- a/tests/unit/ops/aio/test_aio.py +++ b/tests/unit/ops/aio/test_aio.py @@ -78,7 +78,7 @@ def _validate_handle_state(handle, single_submit, overlap_events): assert handle.get_queue_depth() == QUEUE_DEPTH -@pytest.mark.parametrize("use_cuda_pinned_tensor", [True]) # TODO: aio_handle pinned tensor API is broken +@pytest.mark.parametrize("use_cuda_pinned_tensor", [True, False]) @pytest.mark.parametrize("single_submit", [True, False]) @pytest.mark.parametrize("overlap_events", [True, False]) class TestRead(DistributedTest): @@ -144,7 +144,7 @@ def test_async_read(self, tmpdir, use_cuda_pinned_tensor, single_submit, overlap h.free_cpu_locked_tensor(aio_buffer) -@pytest.mark.parametrize("use_cuda_pinned_tensor", [True]) # TODO: aio_handle pinned tensor API is broken +@pytest.mark.parametrize("use_cuda_pinned_tensor", [True, False]) 
@pytest.mark.parametrize("single_submit", [True, False]) @pytest.mark.parametrize("overlap_events", [True, False]) class TestWrite(DistributedTest): @@ -213,7 +213,7 @@ def test_async_write(self, tmpdir, use_cuda_pinned_tensor, single_submit, overla @pytest.mark.sequential -@pytest.mark.parametrize("use_cuda_pinned_tensor", [True]) # TODO: aio_handle pinned tensor API is broken +@pytest.mark.parametrize("use_cuda_pinned_tensor", [True, False]) @pytest.mark.parametrize("cuda_device", [True, False]) class TestAsyncQueue(DistributedTest): world_size = 1 diff --git a/tests/unit/ops/aio/test_gds.py b/tests/unit/ops/aio/test_gds.py index 53655994b560..9612cc339876 100644 --- a/tests/unit/ops/aio/test_gds.py +++ b/tests/unit/ops/aio/test_gds.py @@ -54,7 +54,7 @@ def _get_test_write_file_and_device_buffer(tmpdir, ref_buffer, gds_handle, index def _validate_handle_state(handle, single_submit, overlap_events): assert handle.get_single_submit() == single_submit assert handle.get_overlap_events() == overlap_events - assert handle.get_thread_count() == IO_PARALLEL + assert handle.get_thread_count() == 1 assert handle.get_block_size() == BLOCK_SIZE assert handle.get_queue_depth() == QUEUE_DEPTH From b909702a02d8873a80f5958002c55bafb21dadaa Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 2 Oct 2024 18:08:04 +0000 Subject: [PATCH 3/9] passing gds tests --- csrc/gds/py_lib/deepspeed_py_gds_handle.cpp | 13 +++++++------ csrc/gds/py_lib/deepspeed_py_gds_handle.h | 6 +++++- tests/unit/ops/aio/test_gds.py | 2 +- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp b/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp index e65d7cc40cfd..79d606b14b3f 100644 --- a/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp +++ b/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp @@ -20,20 +20,21 @@ deepspeed_gds_handle_t::deepspeed_gds_handle_t(const int block_size, const bool single_submit, const bool overlap_events, const int num_threads) - : deepspeed_io_handle_t(block_size, queue_depth, single_submit, overlap_events, 1) + : deepspeed_io_handle_t(block_size, queue_depth, single_submit, overlap_events, 1), + _num_gpu_threads(num_threads) { - _init_cuFile(block_size, queue_depth, num_threads); + _init_cuFile(block_size,queue_depth); } deepspeed_gds_handle_t::~deepspeed_gds_handle_t() { _close_cuFile(); } -void deepspeed_gds_handle_t::_init_cuFile(const int block_size, - const int queue_depth, - const int num_threads) +const int deepspeed_gds_handle_t::get_thread_count() const { return _num_gpu_threads; } + +void deepspeed_gds_handle_t::_init_cuFile(const int block_size, const int queue_depth) { if (deepspeed_gds_handle_t::s_cuFile_init == 0) { std::string depthStr = std::to_string(queue_depth); - std::string threadsStr = std::to_string(num_threads); + std::string threadsStr = std::to_string(_num_gpu_threads); std::string json1 = R"({"execution": {"max_io_queue_depth": )" + depthStr + ", "; std::string json2 = R"("max_request_parallelism": )" + threadsStr + ", "; std::string json3 = R"("max_io_threads": )" + threadsStr + ", "; diff --git a/csrc/gds/py_lib/deepspeed_py_gds_handle.h b/csrc/gds/py_lib/deepspeed_py_gds_handle.h index f324e6b65e80..3cf49a4db453 100644 --- a/csrc/gds/py_lib/deepspeed_py_gds_handle.h +++ b/csrc/gds/py_lib/deepspeed_py_gds_handle.h @@ -12,6 +12,8 @@ Functionality for swapping optimizer tensors to/from (NVMe) storage devices. 
#include "deepspeed_py_io_handle.h" struct deepspeed_gds_handle_t : deepspeed_io_handle_t { + const int _num_gpu_threads; + deepspeed_gds_handle_t(const int block_size, const int queue_depth, const bool single_submit, @@ -29,10 +31,12 @@ struct deepspeed_gds_handle_t : deepspeed_io_handle_t { bool unpin_device_tensor(const torch::Tensor& buffer); - void _init_cuFile(const int block_size, const int queue_length, const int num_threads); + void _init_cuFile(const int block_size, const int queue_depth); void _close_cuFile(); + const int get_thread_count() const; + std::shared_ptr _create_io_op_desc(const bool read_op, const torch::Tensor& buffer, const int fd, diff --git a/tests/unit/ops/aio/test_gds.py b/tests/unit/ops/aio/test_gds.py index 9612cc339876..53655994b560 100644 --- a/tests/unit/ops/aio/test_gds.py +++ b/tests/unit/ops/aio/test_gds.py @@ -54,7 +54,7 @@ def _get_test_write_file_and_device_buffer(tmpdir, ref_buffer, gds_handle, index def _validate_handle_state(handle, single_submit, overlap_events): assert handle.get_single_submit() == single_submit assert handle.get_overlap_events() == overlap_events - assert handle.get_thread_count() == 1 + assert handle.get_thread_count() == IO_PARALLEL assert handle.get_block_size() == BLOCK_SIZE assert handle.get_queue_depth() == QUEUE_DEPTH From b1ee7118434eedb304a002744980f427e0707322 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 2 Oct 2024 18:42:21 +0000 Subject: [PATCH 4/9] renaming all instances of num_threads --- csrc/aio/py_lib/deepspeed_aio_op_desc.cpp | 6 +++--- csrc/aio/py_lib/deepspeed_aio_op_desc.h | 4 ++-- csrc/aio/py_lib/deepspeed_cpu_op.cpp | 6 +++--- csrc/aio/py_lib/deepspeed_cpu_op.h | 2 +- csrc/aio/py_lib/deepspeed_py_aio_handle.cpp | 4 ++-- csrc/aio/py_lib/deepspeed_py_aio_handle.h | 2 +- csrc/aio/py_lib/deepspeed_py_io_handle.cpp | 14 +++++++------- csrc/aio/py_lib/deepspeed_py_io_handle.h | 4 ++-- csrc/aio/py_lib/py_ds_aio.cpp | 2 +- csrc/gds/py_lib/deepspeed_gds_op.cpp | 6 +++--- csrc/gds/py_lib/deepspeed_gds_op.h | 2 +- csrc/gds/py_lib/deepspeed_py_gds_handle.cpp | 10 +++++----- csrc/gds/py_lib/deepspeed_py_gds_handle.h | 4 ++-- csrc/gds/py_lib/py_ds_gds.cpp | 2 +- 14 files changed, 34 insertions(+), 34 deletions(-) diff --git a/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp b/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp index 63d2ba022997..5abe8b41e1f3 100644 --- a/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp +++ b/csrc/aio/py_lib/deepspeed_aio_op_desc.cpp @@ -13,7 +13,7 @@ io_op_desc_t::io_op_desc_t(const bool read_op, const int fd, const char* filename, const long long int file_num_bytes, - const int num_threads, + const int intra_op_parallelism, const bool validate) : _read_op(read_op), _buffer(buffer), @@ -21,8 +21,8 @@ io_op_desc_t::io_op_desc_t(const bool read_op, _fd(fd), _filename(filename), _file_num_bytes(file_num_bytes), - _num_threads(num_threads), - _num_bytes_per_thread(file_num_bytes / num_threads), + _intra_op_parallelism(intra_op_parallelism), + _num_bytes_per_thread(file_num_bytes / intra_op_parallelism), _validate(validate) { } diff --git a/csrc/aio/py_lib/deepspeed_aio_op_desc.h b/csrc/aio/py_lib/deepspeed_aio_op_desc.h index 42e55cd2e2a0..e3359708e08a 100644 --- a/csrc/aio/py_lib/deepspeed_aio_op_desc.h +++ b/csrc/aio/py_lib/deepspeed_aio_op_desc.h @@ -16,7 +16,7 @@ struct io_op_desc_t { int _fd; const std::string _filename; const long long int _file_num_bytes; - const int _num_threads; + const int _intra_op_parallelism; const long long int _num_bytes_per_thread; torch::Tensor _contiguous_buffer; const bool 
_validate; @@ -27,7 +27,7 @@ struct io_op_desc_t { const int fd, const char* filename, const long long int file_num_bytes, - const int num_threads, + const int intra_op_parallelism, const bool validate); virtual void run(const int tid, diff --git a/csrc/aio/py_lib/deepspeed_cpu_op.cpp b/csrc/aio/py_lib/deepspeed_cpu_op.cpp index 22b751eef625..c03b6aae780a 100644 --- a/csrc/aio/py_lib/deepspeed_cpu_op.cpp +++ b/csrc/aio/py_lib/deepspeed_cpu_op.cpp @@ -13,9 +13,9 @@ cpu_op_desc_t::cpu_op_desc_t(const bool read_op, const int fd, const char* filename, const long long int file_num_bytes, - const int num_threads, + const int intra_op_parallelism, const bool validate) - : io_op_desc_t(read_op, buffer, is_managed, fd, filename, file_num_bytes, num_threads, validate), + : io_op_desc_t(read_op, buffer, is_managed, fd, filename, file_num_bytes, intra_op_parallelism, validate), _cpu_buffer(buffer) { // Need to use CPU bounce buffer if buffer is not a page-locked DRAM memory. @@ -61,7 +61,7 @@ void cpu_op_desc_t::run(const int tid, std::unique_ptr& aio_ctxt, deepspeed_aio_config_t* aio_config) { - assert(tid < _num_threads); + assert(tid < _intra_op_parallelism); const auto base_offset = _num_bytes_per_thread * tid; std::unique_ptr xfer_ctxt( diff --git a/csrc/aio/py_lib/deepspeed_cpu_op.h b/csrc/aio/py_lib/deepspeed_cpu_op.h index efc60f97f93c..3dd8f7222b23 100644 --- a/csrc/aio/py_lib/deepspeed_cpu_op.h +++ b/csrc/aio/py_lib/deepspeed_cpu_op.h @@ -17,7 +17,7 @@ struct cpu_op_desc_t : io_op_desc_t { const int fd, const char* filename, const long long int file_num_bytes, - const int num_threads, + const int intra_op_parallelism, const bool validate); void run(const int tid, diff --git a/csrc/aio/py_lib/deepspeed_py_aio_handle.cpp b/csrc/aio/py_lib/deepspeed_py_aio_handle.cpp index c7ca5e82afde..aed87d0c694d 100644 --- a/csrc/aio/py_lib/deepspeed_py_aio_handle.cpp +++ b/csrc/aio/py_lib/deepspeed_py_aio_handle.cpp @@ -16,8 +16,8 @@ deepspeed_aio_handle_t::deepspeed_aio_handle_t(const int block_size, const int queue_depth, const bool single_submit, const bool overlap_events, - const int num_threads) - : deepspeed_io_handle_t(block_size, queue_depth, single_submit, overlap_events, num_threads) + const int intra_op_parallelism) + : deepspeed_io_handle_t(block_size, queue_depth, single_submit, overlap_events, intra_op_parallelism) { } diff --git a/csrc/aio/py_lib/deepspeed_py_aio_handle.h b/csrc/aio/py_lib/deepspeed_py_aio_handle.h index eb6b90ea22f0..1398df9a56c9 100644 --- a/csrc/aio/py_lib/deepspeed_py_aio_handle.h +++ b/csrc/aio/py_lib/deepspeed_py_aio_handle.h @@ -16,7 +16,7 @@ struct deepspeed_aio_handle_t : deepspeed_io_handle_t { const int queue_depth, const bool single_submit, const bool overlap_events, - const int num_threads); + const int intra_op_parallelism); ~deepspeed_aio_handle_t(); }; diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp index e5e89419269d..1e896464d7c2 100644 --- a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp +++ b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp @@ -18,16 +18,16 @@ deepspeed_io_handle_t::deepspeed_io_handle_t(const int block_size, const int queue_depth, const bool single_submit, const bool overlap_events, - const int num_threads) + const int intra_op_parallelism) : _aio_ctxt(new aio_context(block_size, queue_depth)), _single_submit(single_submit), _overlap_events(overlap_events), - _num_threads(num_threads), + _intra_op_parallelism(intra_op_parallelism), _aio_config(block_size, queue_depth, single_submit, 
overlap_events, false), _num_pending_ops(0), _pinned_tensor_mgr(new deepspeed_pin_tensor_t()) { - for (auto i = 0; i < num_threads; ++i) { + for (auto i = 0; i < intra_op_parallelism; ++i) { _thread_contexts.push_back(std::make_shared(i, _aio_config)); } @@ -56,7 +56,7 @@ const bool deepspeed_io_handle_t::get_single_submit() const { return _single_sub const bool deepspeed_io_handle_t::get_overlap_events() const { return _overlap_events; } -const int deepspeed_io_handle_t::get_thread_count() const { return _num_threads; } +const int deepspeed_io_handle_t::get_thread_count() const { return _intra_op_parallelism; } int deepspeed_io_handle_t::read(torch::Tensor& buffer, const char* filename, const bool validate) { @@ -211,7 +211,7 @@ std::shared_ptr deepspeed_io_handle_t::_create_io_op_desc( { bool is_managed = _pinned_tensor_mgr->is_managed(buffer); return std::make_shared( - read_op, buffer, is_managed, fd, filename, file_num_bytes, _num_threads, validate); + read_op, buffer, is_managed, fd, filename, file_num_bytes, _intra_op_parallelism, validate); } int deepspeed_io_handle_t::pread(const torch::Tensor& buffer, @@ -231,7 +231,7 @@ int deepspeed_io_handle_t::pread(const torch::Tensor& buffer, << " != " << num_file_bytes << std::endl; } assert(buffer_bytes == num_file_bytes); - assert((num_file_bytes % _num_threads) == 0); + assert((num_file_bytes % _intra_op_parallelism) == 0); if (!_is_valid_parallel_aio_op(true, num_file_bytes)) { return -1; } @@ -253,7 +253,7 @@ int deepspeed_io_handle_t::pwrite(const torch::Tensor& buffer, const bool async) { const auto num_write_bytes = static_cast(buffer.nbytes()); - assert((num_write_bytes % _num_threads) == 0); + assert((num_write_bytes % _intra_op_parallelism) == 0); if (!_is_valid_parallel_aio_op(false, num_write_bytes)) { return -1; } diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.h b/csrc/aio/py_lib/deepspeed_py_io_handle.h index e21fd22fc3f7..9140d468d4f9 100644 --- a/csrc/aio/py_lib/deepspeed_py_io_handle.h +++ b/csrc/aio/py_lib/deepspeed_py_io_handle.h @@ -16,7 +16,7 @@ struct deepspeed_io_handle_t { std::unique_ptr _aio_ctxt; const bool _single_submit; const bool _overlap_events; - const int _num_threads; + const int _intra_op_parallelism; deepspeed_aio_config_t _aio_config; std::vector> _thread_contexts; @@ -28,7 +28,7 @@ struct deepspeed_io_handle_t { const int queue_depth, const bool single_submit, const bool overlap_events, - const int num_threads); + const int intra_op_parallelism); virtual ~deepspeed_io_handle_t() = 0; diff --git a/csrc/aio/py_lib/py_ds_aio.cpp b/csrc/aio/py_lib/py_ds_aio.cpp index 3171d0c6bf3c..1b0aa17d413f 100644 --- a/csrc/aio/py_lib/py_ds_aio.cpp +++ b/csrc/aio/py_lib/py_ds_aio.cpp @@ -27,7 +27,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "queue_depth"_a = 128, "single_submit"_a = false, "overlap_events"_a = false, - "num_threads"_a = 1) + "intra_op_parallelism"_a = 1) .def("get_block_size", &deepspeed_aio_handle_t::get_block_size) .def("get_queue_depth", &deepspeed_aio_handle_t::get_queue_depth) diff --git a/csrc/gds/py_lib/deepspeed_gds_op.cpp b/csrc/gds/py_lib/deepspeed_gds_op.cpp index c0b3a335e268..ed0d6372a274 100644 --- a/csrc/gds/py_lib/deepspeed_gds_op.cpp +++ b/csrc/gds/py_lib/deepspeed_gds_op.cpp @@ -96,9 +96,9 @@ gds_op_desc_t::gds_op_desc_t(const bool read_op, const int fd, const char* filename, const long long int file_num_bytes, - const int num_threads, + const int intra_op_parallelism, const bool validate) - : io_op_desc_t(read_op, buffer,is_managed, fd, filename, file_num_bytes, num_threads, 
validate) + : io_op_desc_t(read_op, buffer,is_managed, fd, filename, file_num_bytes, intra_op_parallelism, validate) { _contiguous_buffer = _buffer.contiguous(); const int64_t device = _buffer.get_device(); @@ -124,7 +124,7 @@ void gds_op_desc_t::run(const int tid, std::unique_ptr& aio_ctxt, deepspeed_aio_config_t* aio_config) { - assert(tid < _num_threads); + assert(tid < _intra_op_parallelism); check_cudaruntimecall(cudaSetDevice(_buffer.get_device())); int64_t buf_offset = data_ptr() + (_num_bytes_per_thread * tid) - (char*)_base_ptr; const auto file_offset = _num_bytes_per_thread * tid; diff --git a/csrc/gds/py_lib/deepspeed_gds_op.h b/csrc/gds/py_lib/deepspeed_gds_op.h index 70c8f7ced4f5..5fd9540c21c0 100644 --- a/csrc/gds/py_lib/deepspeed_gds_op.h +++ b/csrc/gds/py_lib/deepspeed_gds_op.h @@ -24,7 +24,7 @@ struct gds_op_desc_t : io_op_desc_t { const int fd, const char* filename, const long long int file_num_bytes, - const int num_threads, + const int intra_op_parallelism, const bool validate); void run(const int tid, diff --git a/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp b/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp index 79d606b14b3f..b33014bb4a05 100644 --- a/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp +++ b/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp @@ -19,22 +19,22 @@ deepspeed_gds_handle_t::deepspeed_gds_handle_t(const int block_size, const int queue_depth, const bool single_submit, const bool overlap_events, - const int num_threads) + const int intra_op_parallelism) : deepspeed_io_handle_t(block_size, queue_depth, single_submit, overlap_events, 1), - _num_gpu_threads(num_threads) + _intra_gds_op_parallelism(intra_op_parallelism) { _init_cuFile(block_size,queue_depth); } deepspeed_gds_handle_t::~deepspeed_gds_handle_t() { _close_cuFile(); } -const int deepspeed_gds_handle_t::get_thread_count() const { return _num_gpu_threads; } +const int deepspeed_gds_handle_t::get_thread_count() const { return _intra_gds_op_parallelism; } void deepspeed_gds_handle_t::_init_cuFile(const int block_size, const int queue_depth) { if (deepspeed_gds_handle_t::s_cuFile_init == 0) { std::string depthStr = std::to_string(queue_depth); - std::string threadsStr = std::to_string(_num_gpu_threads); + std::string threadsStr = std::to_string(_intra_gds_op_parallelism); std::string json1 = R"({"execution": {"max_io_queue_depth": )" + depthStr + ", "; std::string json2 = R"("max_request_parallelism": )" + threadsStr + ", "; std::string json3 = R"("max_io_threads": )" + threadsStr + ", "; @@ -108,7 +108,7 @@ std::shared_ptr deepspeed_gds_handle_t::_create_io_op_desc( { if (buffer.is_cuda()) { return std::make_shared( - read_op, buffer, false, fd, filename, file_num_bytes, _num_threads, validate); + read_op, buffer, false, fd, filename, file_num_bytes, _intra_op_parallelism, validate); } return deepspeed_io_handle_t::_create_io_op_desc( read_op, buffer, fd, filename, file_num_bytes, validate); diff --git a/csrc/gds/py_lib/deepspeed_py_gds_handle.h b/csrc/gds/py_lib/deepspeed_py_gds_handle.h index 3cf49a4db453..8e8e8df3d6fe 100644 --- a/csrc/gds/py_lib/deepspeed_py_gds_handle.h +++ b/csrc/gds/py_lib/deepspeed_py_gds_handle.h @@ -12,13 +12,13 @@ Functionality for swapping optimizer tensors to/from (NVMe) storage devices. 
#include "deepspeed_py_io_handle.h" struct deepspeed_gds_handle_t : deepspeed_io_handle_t { - const int _num_gpu_threads; + const int _intra_gds_op_parallelism; deepspeed_gds_handle_t(const int block_size, const int queue_depth, const bool single_submit, const bool overlap_events, - const int num_threads); + const int intra_op_parallelism); ~deepspeed_gds_handle_t(); diff --git a/csrc/gds/py_lib/py_ds_gds.cpp b/csrc/gds/py_lib/py_ds_gds.cpp index 66eb34d4ea8c..14e3eec3fbbb 100644 --- a/csrc/gds/py_lib/py_ds_gds.cpp +++ b/csrc/gds/py_lib/py_ds_gds.cpp @@ -20,7 +20,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) "queue_depth"_a = 128, "single_submit"_a = false, "overlap_events"_a = false, - "num_threads"_a = 1) + "intra_op_parallelism"_a = 1) .def("get_block_size", &deepspeed_gds_handle_t::get_block_size) .def("get_queue_depth", &deepspeed_gds_handle_t::get_queue_depth) From ada1b8303b839b70c4f14ca5155cd1b1ba7427a9 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 2 Oct 2024 20:44:01 +0000 Subject: [PATCH 5/9] updating function names to match --- csrc/aio/py_lib/deepspeed_aio_thread.cpp | 4 ++-- csrc/aio/py_lib/deepspeed_aio_thread.h | 4 ++-- csrc/aio/py_lib/deepspeed_py_aio_handle.h | 2 +- csrc/aio/py_lib/deepspeed_py_io_handle.cpp | 12 ++++++------ csrc/aio/py_lib/deepspeed_py_io_handle.h | 4 ++-- csrc/aio/py_lib/py_ds_aio.cpp | 2 +- csrc/gds/py_lib/deepspeed_py_gds_handle.cpp | 2 +- csrc/gds/py_lib/deepspeed_py_gds_handle.h | 2 +- csrc/gds/py_lib/py_ds_gds.cpp | 2 +- docs/_tutorials/deepnvme.md | 4 ++-- 10 files changed, 19 insertions(+), 19 deletions(-) diff --git a/csrc/aio/py_lib/deepspeed_aio_thread.cpp b/csrc/aio/py_lib/deepspeed_aio_thread.cpp index 30c3b4914397..25e1df809a85 100644 --- a/csrc/aio/py_lib/deepspeed_aio_thread.cpp +++ b/csrc/aio/py_lib/deepspeed_aio_thread.cpp @@ -28,7 +28,7 @@ void deepspeed_aio_thread_t::run() { std::unique_lock lock(_work_sync._mutex); - _work_sync._cond_var.wait(lock, + _work_sync._cond_var2.wait(lock, [this] { return (!_work_queue.empty() || _time_to_exit); }); if (!_work_queue.empty()) { next_io_op = _work_queue.front(); @@ -43,7 +43,7 @@ void deepspeed_aio_thread_t::run() std::lock_guard lock(_complete_sync._mutex); _complete_queue.push(next_io_op); } - _complete_sync._cond_var.notify_one(); + _complete_sync._cond_var2.notify_one(); } if (_time_to_exit) { break; } diff --git a/csrc/aio/py_lib/deepspeed_aio_thread.h b/csrc/aio/py_lib/deepspeed_aio_thread.h index a192804db13d..ef12b8178fa7 100644 --- a/csrc/aio/py_lib/deepspeed_aio_thread.h +++ b/csrc/aio/py_lib/deepspeed_aio_thread.h @@ -7,14 +7,14 @@ Functionality for swapping optimizer tensors to/from (NVMe) storage devices. */ -#include +#include #include #include #include "deepspeed_cpu_op.h" struct thread_sync_t { std::mutex _mutex; - std::condition_variable _cond_var; + std::condition_var2iable _cond_var2; }; struct deepspeed_aio_thread_t { diff --git a/csrc/aio/py_lib/deepspeed_py_aio_handle.h b/csrc/aio/py_lib/deepspeed_py_aio_handle.h index 1398df9a56c9..5af6736afc0f 100644 --- a/csrc/aio/py_lib/deepspeed_py_aio_handle.h +++ b/csrc/aio/py_lib/deepspeed_py_aio_handle.h @@ -7,7 +7,7 @@ Functionality for swapping optimizer tensors to/from (NVMe) storage devices. 
*/ -#include +#include #include #include "deepspeed_py_io_handle.h" diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp index 1e896464d7c2..12243c3d902c 100644 --- a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp +++ b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp @@ -56,7 +56,7 @@ const bool deepspeed_io_handle_t::get_single_submit() const { return _single_sub const bool deepspeed_io_handle_t::get_overlap_events() const { return _overlap_events; } -const int deepspeed_io_handle_t::get_thread_count() const { return _intra_op_parallelism; } +const int deepspeed_io_handle_t::get_intra_op_parallelism() const { return _intra_op_parallelism; } int deepspeed_io_handle_t::read(torch::Tensor& buffer, const char* filename, const bool validate) { @@ -137,7 +137,7 @@ void deepspeed_io_handle_t::_schedule_aio_work(std::shared_ptr lock(ctxt->_work_sync._mutex); ctxt->_work_queue.push(scheduled_op); } - ctxt->_work_sync._cond_var.notify_one(); + ctxt->_work_sync._cond_var2.notify_one(); } _num_pending_ops++; } @@ -147,7 +147,7 @@ std::shared_ptr deepspeed_io_handle_t::_wait_for_aio_work() std::shared_ptr completed_op = nullptr; for (auto& ctxt : _thread_contexts) { std::unique_lock lock(ctxt->_complete_sync._mutex); - ctxt->_complete_sync._cond_var.wait(lock, + ctxt->_complete_sync._cond_var2.wait(lock, [ctxt] { return !ctxt->_complete_queue.empty(); }); completed_op = ctxt->_complete_queue.front(); ctxt->_complete_queue.pop(); @@ -163,7 +163,7 @@ void deepspeed_io_handle_t::_stop_threads() std::lock_guard lock(ctxt->_work_sync._mutex); ctxt->_time_to_exit = true; } - ctxt->_work_sync._cond_var.notify_one(); + ctxt->_work_sync._cond_var2.notify_one(); } } @@ -192,9 +192,9 @@ bool deepspeed_io_handle_t::_is_valid_parallel_aio_op(const bool read_op, const long long int num_bytes) { const auto op_string = read_op ? "Read" : "Write"; - if (num_bytes % get_thread_count()) { + if (num_bytes % get_intra_op_parallelism()) { std::cout << "deepspeed_aio failure: parallel " << op_string << " num_bytes = " << num_bytes - << " not divisible by thread count = " << get_thread_count() << std::endl; + << " not divisible by thread count = " << get_intra_op_parallelism() << std::endl; return false; } diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.h b/csrc/aio/py_lib/deepspeed_py_io_handle.h index 9140d468d4f9..3e3bed73b229 100644 --- a/csrc/aio/py_lib/deepspeed_py_io_handle.h +++ b/csrc/aio/py_lib/deepspeed_py_io_handle.h @@ -7,7 +7,7 @@ Functionality for swapping optimizer tensors to/from (NVMe) storage devices. 
*/ -#include +#include #include #include "deepspeed_aio_thread.h" #include "deepspeed_pin_tensor.h" @@ -36,7 +36,7 @@ struct deepspeed_io_handle_t { const int get_queue_depth() const; const bool get_single_submit() const; const bool get_overlap_events() const; - const int get_thread_count() const; + const int get_intra_op_parallelism() const; int read(torch::Tensor& buffer, const char* filename, const bool validate); diff --git a/csrc/aio/py_lib/py_ds_aio.cpp b/csrc/aio/py_lib/py_ds_aio.cpp index 1b0aa17d413f..b80fa2d6c8e6 100644 --- a/csrc/aio/py_lib/py_ds_aio.cpp +++ b/csrc/aio/py_lib/py_ds_aio.cpp @@ -33,7 +33,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) .def("get_queue_depth", &deepspeed_aio_handle_t::get_queue_depth) .def("get_single_submit", &deepspeed_aio_handle_t::get_single_submit) .def("get_overlap_events", &deepspeed_aio_handle_t::get_overlap_events) - .def("get_thread_count", &deepspeed_aio_handle_t::get_thread_count) + .def("get_intra_op_parallelism", &deepspeed_aio_handle_t::get_intra_op_parallelism) .def("read", &deepspeed_aio_handle_t::read, diff --git a/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp b/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp index b33014bb4a05..02bd5c990661 100644 --- a/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp +++ b/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp @@ -28,7 +28,7 @@ deepspeed_gds_handle_t::deepspeed_gds_handle_t(const int block_size, deepspeed_gds_handle_t::~deepspeed_gds_handle_t() { _close_cuFile(); } -const int deepspeed_gds_handle_t::get_thread_count() const { return _intra_gds_op_parallelism; } +const int deepspeed_gds_handle_t::get_intra_op_parallelism() const { return _intra_gds_op_parallelism; } void deepspeed_gds_handle_t::_init_cuFile(const int block_size, const int queue_depth) { diff --git a/csrc/gds/py_lib/deepspeed_py_gds_handle.h b/csrc/gds/py_lib/deepspeed_py_gds_handle.h index 8e8e8df3d6fe..a3c10a4f6467 100644 --- a/csrc/gds/py_lib/deepspeed_py_gds_handle.h +++ b/csrc/gds/py_lib/deepspeed_py_gds_handle.h @@ -35,7 +35,7 @@ struct deepspeed_gds_handle_t : deepspeed_io_handle_t { void _close_cuFile(); - const int get_thread_count() const; + const int get_intra_op_parallelism() const; std::shared_ptr _create_io_op_desc(const bool read_op, const torch::Tensor& buffer, diff --git a/csrc/gds/py_lib/py_ds_gds.cpp b/csrc/gds/py_lib/py_ds_gds.cpp index 14e3eec3fbbb..57bf8d2207c4 100644 --- a/csrc/gds/py_lib/py_ds_gds.cpp +++ b/csrc/gds/py_lib/py_ds_gds.cpp @@ -26,7 +26,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) .def("get_queue_depth", &deepspeed_gds_handle_t::get_queue_depth) .def("get_single_submit", &deepspeed_gds_handle_t::get_single_submit) .def("get_overlap_events", &deepspeed_gds_handle_t::get_overlap_events) - .def("get_thread_count", &deepspeed_gds_handle_t::get_thread_count) + .def("get_intra_op_parallelism", &deepspeed_gds_handle_t::get_intra_op_parallelism) .def("read", &deepspeed_gds_handle_t::read, diff --git a/docs/_tutorials/deepnvme.md b/docs/_tutorials/deepnvme.md index 70c6ac097963..f31621999a59 100644 --- a/docs/_tutorials/deepnvme.md +++ b/docs/_tutorials/deepnvme.md @@ -50,7 +50,7 @@ Type "help", "copyright", "credits" or "license" for more information. >>> h = AsyncIOBuilder().load().aio_handle() >>> h. 
h.async_pread( h.free_cpu_locked_tensor( h.get_overlap_events( h.get_single_submit( h.new_cpu_locked_tensor( h.pwrite( h.sync_pread( h.wait( -h.async_pwrite( h.get_block_size( h.get_queue_depth( h.get_thread_count( h.pread( h.read( h.sync_pwrite( h.write( +h.async_pwrite( h.get_block_size( h.get_queue_depth( h.get_intra_op_parallelism( h.pread( h.read( h.sync_pwrite( h.write( ``` The APIs of interest for performing I/O operations are those named with `pread` and `pwrite` substrings. For brevity, we will focus on the file write APIs, namely `sync_pwrite`, `async_pwrite`, and `pwrite`. We will discuss only `sync_pwrite` and `async_pwrite` below because they are specializations of `pwrite`. @@ -292,6 +292,6 @@ Function | Description |---|---| get_queue_depth | Return queue depth setting | get_single_submit | Return whether single_submit is enabled | -get_thread_count | Return I/O parallelism degree | +get_intra_op_parallelism | Return I/O parallelism degree | get_block_size | Return I/O block size setting | get_overlap_events | Return whether overlap_event is enabled | From 1cb88ce5e4b47ec7cde22ab982730527857b82d5 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Wed, 2 Oct 2024 20:45:09 +0000 Subject: [PATCH 6/9] fix formatting --- csrc/aio/py_lib/deepspeed_aio_thread.cpp | 2 +- csrc/aio/py_lib/deepspeed_cpu_op.cpp | 9 ++++++++- csrc/aio/py_lib/deepspeed_pin_tensor.cpp | 9 ++++----- csrc/aio/py_lib/deepspeed_pin_tensor.h | 2 +- csrc/aio/py_lib/deepspeed_py_aio_handle.cpp | 6 +++++- csrc/aio/py_lib/deepspeed_py_io_handle.cpp | 2 +- csrc/aio/py_lib/deepspeed_py_io_handle.h | 3 ++- csrc/gds/py_lib/deepspeed_gds_op.cpp | 9 ++++++++- csrc/gds/py_lib/deepspeed_py_gds_handle.cpp | 9 ++++++--- tests/unit/ops/aio/test_aio.py | 2 +- 10 files changed, 37 insertions(+), 16 deletions(-) diff --git a/csrc/aio/py_lib/deepspeed_aio_thread.cpp b/csrc/aio/py_lib/deepspeed_aio_thread.cpp index 25e1df809a85..8c51087a0b5d 100644 --- a/csrc/aio/py_lib/deepspeed_aio_thread.cpp +++ b/csrc/aio/py_lib/deepspeed_aio_thread.cpp @@ -29,7 +29,7 @@ void deepspeed_aio_thread_t::run() { std::unique_lock lock(_work_sync._mutex); _work_sync._cond_var2.wait(lock, - [this] { return (!_work_queue.empty() || _time_to_exit); }); + [this] { return (!_work_queue.empty() || _time_to_exit); }); if (!_work_queue.empty()) { next_io_op = _work_queue.front(); _work_queue.pop(); diff --git a/csrc/aio/py_lib/deepspeed_cpu_op.cpp b/csrc/aio/py_lib/deepspeed_cpu_op.cpp index c03b6aae780a..170d6da75987 100644 --- a/csrc/aio/py_lib/deepspeed_cpu_op.cpp +++ b/csrc/aio/py_lib/deepspeed_cpu_op.cpp @@ -15,7 +15,14 @@ cpu_op_desc_t::cpu_op_desc_t(const bool read_op, const long long int file_num_bytes, const int intra_op_parallelism, const bool validate) - : io_op_desc_t(read_op, buffer, is_managed, fd, filename, file_num_bytes, intra_op_parallelism, validate), + : io_op_desc_t(read_op, + buffer, + is_managed, + fd, + filename, + file_num_bytes, + intra_op_parallelism, + validate), _cpu_buffer(buffer) { // Need to use CPU bounce buffer if buffer is not a page-locked DRAM memory. 
diff --git a/csrc/aio/py_lib/deepspeed_pin_tensor.cpp b/csrc/aio/py_lib/deepspeed_pin_tensor.cpp index f57b4394fc99..8d337c14d34c 100644 --- a/csrc/aio/py_lib/deepspeed_pin_tensor.cpp +++ b/csrc/aio/py_lib/deepspeed_pin_tensor.cpp @@ -19,7 +19,8 @@ deepspeed_pin_tensor_t::~deepspeed_pin_tensor_t() _locked_tensors.clear(); } -torch::Tensor deepspeed_pin_tensor_t::alloc(const long long int num_elem, const at::ScalarType& elem_type) +torch::Tensor deepspeed_pin_tensor_t::alloc(const long long int num_elem, + const at::ScalarType& elem_type) { const auto num_bytes = num_elem * elementSize(elem_type); auto pinned_buffer = ds_page_aligned_alloc(num_bytes, true); @@ -47,9 +48,7 @@ bool deepspeed_pin_tensor_t::free(torch::Tensor& locked_tensor) bool deepspeed_pin_tensor_t::is_managed(const torch::Tensor& buffer) { auto addr = buffer.data_ptr(); - if (!buffer.is_cpu()){ return false;} - if (_locked_tensors.find(addr) != _locked_tensors.end()) { - return true; - } + if (!buffer.is_cpu()) { return false; } + if (_locked_tensors.find(addr) != _locked_tensors.end()) { return true; } return false; }; diff --git a/csrc/aio/py_lib/deepspeed_pin_tensor.h b/csrc/aio/py_lib/deepspeed_pin_tensor.h index 195696a05833..95d2f1f92e10 100644 --- a/csrc/aio/py_lib/deepspeed_pin_tensor.h +++ b/csrc/aio/py_lib/deepspeed_pin_tensor.h @@ -24,6 +24,6 @@ struct deepspeed_pin_tensor_t { torch::Tensor alloc(const long long num_elem, const at::ScalarType& elem_type); bool free(torch::Tensor& locked_tensor); - + bool is_managed(const torch::Tensor& buffer); }; diff --git a/csrc/aio/py_lib/deepspeed_py_aio_handle.cpp b/csrc/aio/py_lib/deepspeed_py_aio_handle.cpp index aed87d0c694d..2b1093e99286 100644 --- a/csrc/aio/py_lib/deepspeed_py_aio_handle.cpp +++ b/csrc/aio/py_lib/deepspeed_py_aio_handle.cpp @@ -17,7 +17,11 @@ deepspeed_aio_handle_t::deepspeed_aio_handle_t(const int block_size, const bool single_submit, const bool overlap_events, const int intra_op_parallelism) - : deepspeed_io_handle_t(block_size, queue_depth, single_submit, overlap_events, intra_op_parallelism) + : deepspeed_io_handle_t(block_size, + queue_depth, + single_submit, + overlap_events, + intra_op_parallelism) { } diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp index 12243c3d902c..31083e36ed8c 100644 --- a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp +++ b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp @@ -148,7 +148,7 @@ std::shared_ptr deepspeed_io_handle_t::_wait_for_aio_work() for (auto& ctxt : _thread_contexts) { std::unique_lock lock(ctxt->_complete_sync._mutex); ctxt->_complete_sync._cond_var2.wait(lock, - [ctxt] { return !ctxt->_complete_queue.empty(); }); + [ctxt] { return !ctxt->_complete_queue.empty(); }); completed_op = ctxt->_complete_queue.front(); ctxt->_complete_queue.pop(); } diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.h b/csrc/aio/py_lib/deepspeed_py_io_handle.h index 3e3bed73b229..3016da59d86d 100644 --- a/csrc/aio/py_lib/deepspeed_py_io_handle.h +++ b/csrc/aio/py_lib/deepspeed_py_io_handle.h @@ -61,7 +61,8 @@ struct deepspeed_io_handle_t { int async_pwrite(const torch::Tensor& buffer, const char* filename); // TODO: Make API's args to be shape and dtype. 
-    torch::Tensor new_cpu_locked_tensor(const long long int num_elem, const torch::Tensor& example_tensor);
+    torch::Tensor new_cpu_locked_tensor(const long long int num_elem,
+                                        const torch::Tensor& example_tensor);
 
     bool free_cpu_locked_tensor(torch::Tensor&);
 
diff --git a/csrc/gds/py_lib/deepspeed_gds_op.cpp b/csrc/gds/py_lib/deepspeed_gds_op.cpp
index ed0d6372a274..c44b4655a9a9 100644
--- a/csrc/gds/py_lib/deepspeed_gds_op.cpp
+++ b/csrc/gds/py_lib/deepspeed_gds_op.cpp
@@ -98,7 +98,14 @@ gds_op_desc_t::gds_op_desc_t(const bool read_op,
                              const long long int file_num_bytes,
                              const int intra_op_parallelism,
                              const bool validate)
-    : io_op_desc_t(read_op, buffer,is_managed, fd, filename, file_num_bytes, intra_op_parallelism, validate)
+    : io_op_desc_t(read_op,
+                   buffer,
+                   is_managed,
+                   fd,
+                   filename,
+                   file_num_bytes,
+                   intra_op_parallelism,
+                   validate)
 {
     _contiguous_buffer = _buffer.contiguous();
     const int64_t device = _buffer.get_device();
diff --git a/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp b/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp
index 02bd5c990661..d093187597d4 100644
--- a/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp
+++ b/csrc/gds/py_lib/deepspeed_py_gds_handle.cpp
@@ -21,14 +21,17 @@ deepspeed_gds_handle_t::deepspeed_gds_handle_t(const int block_size,
                                                const bool overlap_events,
                                                const int intra_op_parallelism)
     : deepspeed_io_handle_t(block_size, queue_depth, single_submit, overlap_events, 1),
-        _intra_gds_op_parallelism(intra_op_parallelism)
+      _intra_gds_op_parallelism(intra_op_parallelism)
 {
-    _init_cuFile(block_size,queue_depth);
+    _init_cuFile(block_size, queue_depth);
 }
 
 deepspeed_gds_handle_t::~deepspeed_gds_handle_t() { _close_cuFile(); }
 
-const int deepspeed_gds_handle_t::get_intra_op_parallelism() const { return _intra_gds_op_parallelism; }
+const int deepspeed_gds_handle_t::get_intra_op_parallelism() const
+{
+    return _intra_gds_op_parallelism;
+}
 
 void deepspeed_gds_handle_t::_init_cuFile(const int block_size, const int queue_depth)
 {
diff --git a/tests/unit/ops/aio/test_aio.py b/tests/unit/ops/aio/test_aio.py
index 9d4b12a6daf7..f6d175ce67bc 100644
--- a/tests/unit/ops/aio/test_aio.py
+++ b/tests/unit/ops/aio/test_aio.py
@@ -144,7 +144,7 @@ def test_async_read(self, tmpdir, use_cuda_pinned_tensor, single_submit, overlap
         h.free_cpu_locked_tensor(aio_buffer)
 
 
-@pytest.mark.parametrize("use_cuda_pinned_tensor", [True, False]) 
+@pytest.mark.parametrize("use_cuda_pinned_tensor", [True, False])
 @pytest.mark.parametrize("single_submit", [True, False])
 @pytest.mark.parametrize("overlap_events", [True, False])
 class TestWrite(DistributedTest):
 
From f5528daf5147b2dcb56e77a38e08d6068c7b8d2f Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Wed, 2 Oct 2024 21:53:47 +0000
Subject: [PATCH 7/9] variable name change to fix compilation

---
 csrc/aio/py_lib/deepspeed_aio_thread.cpp   | 4 ++--
 csrc/aio/py_lib/deepspeed_aio_thread.h     | 4 ++--
 csrc/aio/py_lib/deepspeed_py_aio_handle.h  | 2 +-
 csrc/aio/py_lib/deepspeed_py_io_handle.cpp | 6 +++---
 csrc/aio/py_lib/deepspeed_py_io_handle.h   | 2 +-
 5 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/csrc/aio/py_lib/deepspeed_aio_thread.cpp b/csrc/aio/py_lib/deepspeed_aio_thread.cpp
index 8c51087a0b5d..3b7b048f16bd 100644
--- a/csrc/aio/py_lib/deepspeed_aio_thread.cpp
+++ b/csrc/aio/py_lib/deepspeed_aio_thread.cpp
@@ -28,7 +28,7 @@ void deepspeed_aio_thread_t::run()
         {
             std::unique_lock<std::mutex> lock(_work_sync._mutex);
-            _work_sync._cond_var2.wait(lock,
+            _work_sync._cond_var.wait(lock,
                                        [this] { return (!_work_queue.empty() || _time_to_exit); });
             if (!_work_queue.empty()) {
                 next_io_op = _work_queue.front();
@@ -43,7 +43,7 @@ void deepspeed_aio_thread_t::run()
                 std::lock_guard<std::mutex> lock(_complete_sync._mutex);
                 _complete_queue.push(next_io_op);
             }
-            _complete_sync._cond_var2.notify_one();
+            _complete_sync._cond_var.notify_one();
         }
 
         if (_time_to_exit) { break; }
diff --git a/csrc/aio/py_lib/deepspeed_aio_thread.h b/csrc/aio/py_lib/deepspeed_aio_thread.h
index ef12b8178fa7..a192804db13d 100644
--- a/csrc/aio/py_lib/deepspeed_aio_thread.h
+++ b/csrc/aio/py_lib/deepspeed_aio_thread.h
@@ -7,14 +7,14 @@ Functionality for swapping optimizer tensors to/from (NVMe) storage devices.
 
 */
 
-#include <condition_var2iable>
+#include <condition_variable>
 #include <memory>
 #include <queue>
 #include "deepspeed_cpu_op.h"
 
 struct thread_sync_t {
     std::mutex _mutex;
-    std::condition_var2iable _cond_var2;
+    std::condition_variable _cond_var;
 };
 
 struct deepspeed_aio_thread_t {
diff --git a/csrc/aio/py_lib/deepspeed_py_aio_handle.h b/csrc/aio/py_lib/deepspeed_py_aio_handle.h
index 5af6736afc0f..1398df9a56c9 100644
--- a/csrc/aio/py_lib/deepspeed_py_aio_handle.h
+++ b/csrc/aio/py_lib/deepspeed_py_aio_handle.h
@@ -7,7 +7,7 @@ Functionality for swapping optimizer tensors to/from (NVMe) storage devices.
 
 */
 
-#include <condition_var2iable>
+#include <condition_variable>
 #include "deepspeed_py_io_handle.h"
 
 struct deepspeed_aio_handle_t : deepspeed_io_handle_t {
diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp
index 31083e36ed8c..8bd7966ba086 100644
--- a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp
+++ b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp
@@ -137,7 +137,7 @@ void deepspeed_io_handle_t::_schedule_aio_work(std::shared_ptr
             std::lock_guard<std::mutex> lock(ctxt->_work_sync._mutex);
             ctxt->_work_queue.push(scheduled_op);
         }
-        ctxt->_work_sync._cond_var2.notify_one();
+        ctxt->_work_sync._cond_var.notify_one();
     }
     _num_pending_ops++;
 }
@@ -147,7 +147,7 @@ std::shared_ptr<struct io_op_desc_t> deepspeed_io_handle_t::_wait_for_aio_work()
     std::shared_ptr<struct io_op_desc_t> completed_op = nullptr;
     for (auto& ctxt : _thread_contexts) {
         std::unique_lock<std::mutex> lock(ctxt->_complete_sync._mutex);
-        ctxt->_complete_sync._cond_var2.wait(lock,
+        ctxt->_complete_sync._cond_var.wait(lock,
                                             [ctxt] { return !ctxt->_complete_queue.empty(); });
         completed_op = ctxt->_complete_queue.front();
         ctxt->_complete_queue.pop();
@@ -163,7 +163,7 @@ void deepspeed_io_handle_t::_stop_threads()
             std::lock_guard<std::mutex> lock(ctxt->_work_sync._mutex);
             ctxt->_time_to_exit = true;
         }
-        ctxt->_work_sync._cond_var2.notify_one();
+        ctxt->_work_sync._cond_var.notify_one();
     }
 }
 
diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.h b/csrc/aio/py_lib/deepspeed_py_io_handle.h
index 3016da59d86d..8e649be1e4c2 100644
--- a/csrc/aio/py_lib/deepspeed_py_io_handle.h
+++ b/csrc/aio/py_lib/deepspeed_py_io_handle.h
@@ -7,7 +7,7 @@ Functionality for swapping optimizer tensors to/from (NVMe) storage devices.
 
 */
 
-#include <condition_var2iable>
+#include <condition_variable>
 #include "deepspeed_aio_thread.h"
 #include "deepspeed_pin_tensor.h"
 
From f576d291a7675df1b85e4201b58ea1013b38938c Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Wed, 2 Oct 2024 22:02:10 +0000
Subject: [PATCH 8/9] formatting

---
 csrc/aio/py_lib/deepspeed_aio_thread.cpp   | 2 +-
 csrc/aio/py_lib/deepspeed_py_io_handle.cpp | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/csrc/aio/py_lib/deepspeed_aio_thread.cpp b/csrc/aio/py_lib/deepspeed_aio_thread.cpp
index 3b7b048f16bd..30c3b4914397 100644
--- a/csrc/aio/py_lib/deepspeed_aio_thread.cpp
+++ b/csrc/aio/py_lib/deepspeed_aio_thread.cpp
@@ -29,7 +29,7 @@ void deepspeed_aio_thread_t::run()
         {
             std::unique_lock<std::mutex> lock(_work_sync._mutex);
             _work_sync._cond_var.wait(lock,
-                                       [this] { return (!_work_queue.empty() || _time_to_exit); });
+                                      [this] { return (!_work_queue.empty() || _time_to_exit); });
             if (!_work_queue.empty()) {
                 next_io_op = _work_queue.front();
                 _work_queue.pop();
diff --git a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp
index 8bd7966ba086..596c2427feed 100644
--- a/csrc/aio/py_lib/deepspeed_py_io_handle.cpp
+++ b/csrc/aio/py_lib/deepspeed_py_io_handle.cpp
@@ -148,7 +148,7 @@ std::shared_ptr<struct io_op_desc_t> deepspeed_io_handle_t::_wait_for_aio_work()
     for (auto& ctxt : _thread_contexts) {
         std::unique_lock<std::mutex> lock(ctxt->_complete_sync._mutex);
         ctxt->_complete_sync._cond_var.wait(lock,
-                                            [ctxt] { return !ctxt->_complete_queue.empty(); });
+                                           [ctxt] { return !ctxt->_complete_queue.empty(); });
         completed_op = ctxt->_complete_queue.front();
         ctxt->_complete_queue.pop();
     }
 
From 5a47bf3a5a932ba9fac38401ae374d456c5c479c Mon Sep 17 00:00:00 2001
From: Ubuntu
Date: Thu, 3 Oct 2024 16:03:41 +0000
Subject: [PATCH 9/9] update references in tutorial

---
 docs/_tutorials/deepnvme.md | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/docs/_tutorials/deepnvme.md b/docs/_tutorials/deepnvme.md
index f31621999a59..4ed528412eae 100644
--- a/docs/_tutorials/deepnvme.md
+++ b/docs/_tutorials/deepnvme.md
@@ -107,7 +107,7 @@ Similar safety problems apply to reading the destination tensor of a non-blockin
 
 ### Parallel File Write
 
-An important DeepNVMe optimization is the ability to parallelize individual I/O operations. This optimization is enabled by specifying the desired parallelism degree when constructing a DeepNVMe handle. Subsequent I/O operations with that handle are automatically parallelized over the requested number of host or device threads, as appropriate. I/O parallelism is composable with either the blocking or non-blocking I/O APIs. The example below illustrates 4-way parallelism of a file write using `async_pwrite`. Note the use of `num_threads` argument to specify the desired parallelism degree in handle creation.
+An important DeepNVMe optimization is the ability to parallelize individual I/O operations. This optimization is enabled by specifying the desired parallelism degree when constructing a DeepNVMe handle. Subsequent I/O operations with that handle are automatically parallelized over the requested number of host or device threads, as appropriate. I/O parallelism is composable with either the blocking or non-blocking I/O APIs. The example below illustrates 4-way parallelism of a file write using `async_pwrite`. Note the use of `intra_op_parallelism` argument to specify the desired parallelism degree in handle creation.
```bash >>> import os @@ -116,7 +116,7 @@ False >>> import torch >>> t=torch.empty(1024**3, dtype=torch.uint8).cuda() >>> from deepspeed.ops.op_builder import AsyncIOBuilder ->>> h = AsyncIOBuilder().load().aio_handle(num_threads=4) +>>> h = AsyncIOBuilder().load().aio_handle(intra_op_parallelism=4) >>> h.async_pwrite(t,'/local_nvme/test_1GB.pt') >>> h.wait() 1 @@ -188,7 +188,7 @@ This tutorial has been significantly improved by feedback from [Guanhua Wang](ht ## Appendix ### Advanced Handle Creation -Achieving peak I/O performance with DeepNVMe requires careful configuration of handle creation. In particular, the parameters of `aio_handle` and `gds_handle` constructors are performance-critical because they determine how efficiently DeepNVMe interacts with the underlying storage subsystem (i.e., `libaio`, GDS, PCIe, and SSD). For convenience we make it possible to create handles using default parameter values which will provide decent performance in most scenarios. However, squeezing out every available performance in your environment will likely require tuning the constructor parameters, namely `block_size`, `queue_depth`, `single_submit`, `overlap_events`, and `num_threads`. The `aio_handle` constructor parameters and default values are illustrated below: +Achieving peak I/O performance with DeepNVMe requires careful configuration of handle creation. In particular, the parameters of `aio_handle` and `gds_handle` constructors are performance-critical because they determine how efficiently DeepNVMe interacts with the underlying storage subsystem (i.e., `libaio`, GDS, PCIe, and SSD). For convenience we make it possible to create handles using default parameter values which will provide decent performance in most scenarios. However, squeezing out every available performance in your environment will likely require tuning the constructor parameters, namely `block_size`, `queue_depth`, `single_submit`, `overlap_events`, and `intra_op_parallelism`. The `aio_handle` constructor parameters and default values are illustrated below: ```bash >>> from deepspeed.ops.op_builder import AsyncIOBuilder >>> help(AsyncIOBuilder().load().aio_handle()) @@ -203,7 +203,7 @@ class aio_handle(pybind11_builtins.pybind11_object) | Methods defined here: | | __init__(...) - | __init__(self: async_io.aio_handle, block_size: int = 1048576, queue_depth: int = 128, single_submit: bool = False, overlap_events: bool = False, num_threads: int = 1) -> None + | __init__(self: async_io.aio_handle, block_size: int = 1048576, queue_depth: int = 128, single_submit: bool = False, overlap_events: bool = False, intra_op_parallelism: int = 1) -> None | | AIO handle constructor ``` @@ -219,7 +219,7 @@ Best performance (GB/sec): read = 3.69, write = 3.18 "aio": { "single_submit": "false", "overlap_events": "true", - "num_threads": 8, + "intra_op_parallelism": 8, "queue_depth": 32, "block_size": 1048576 } @@ -233,7 +233,7 @@ The above tuning was executed on a Lambda workstation equipped with two NVIDIA A queue_depth=32, single_submit=False, overlap_events=True, - num_threads=8) + intra_op_parallelism=8) ```
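
Taken together, the series enables the following Python-level flow. This is a minimal sketch, not part of the patches: it assumes the `async_io` op builds on the target machine, that `/local_nvme/test_1GB.pt` is a placeholder path to an existing file on an NVMe mount, and that the file size is divisible by the chosen `intra_op_parallelism`.

```python
# Sketch of the repaired CPU locked tensor path: a tensor obtained from
# new_cpu_locked_tensor() is now recognized by the handle as managed,
# page-locked memory, so a read lands in it directly instead of being
# staged through a pinned bounce buffer.
import os
import torch
from deepspeed.ops.op_builder import AsyncIOBuilder

h = AsyncIOBuilder().load().aio_handle(block_size=1048576,
                                       queue_depth=32,
                                       single_submit=False,
                                       overlap_events=True,
                                       intra_op_parallelism=2)

file_path = '/local_nvme/test_1GB.pt'  # placeholder path
num_bytes = os.path.getsize(file_path)

# Allocate a page-locked CPU tensor through the handle's pin-tensor manager;
# the example tensor supplies the dtype (uint8, so num_elem == num_bytes).
buffer = h.new_cpu_locked_tensor(num_bytes, torch.empty(0, dtype=torch.uint8))

h.async_pread(buffer, file_path)
assert h.wait() == 1  # one completed I/O operation

h.free_cpu_locked_tensor(buffer)
```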