Skip to content

Commit

Permalink
fix diskann async cache generation
Browse files Browse the repository at this point in the history
Signed-off-by: xianliang <[email protected]>
  • Loading branch information
foxspy committed Feb 26, 2024
1 parent 7349e42 commit 39b7104
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 25 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ list( APPEND CMAKE_MODULE_PATH ${CMAKE_BINARY_DIR}/)

add_definitions(-DNOT_COMPILE_FOR_SWIG)

include(cmake/utils/compile_flags.cmake)
include(cmake/utils/platform_check.cmake)
include(cmake/utils/compile_flags.cmake)
include(cmake/libs/libfaiss.cmake)
include(cmake/libs/libhnsw.cmake)

Expand Down
4 changes: 4 additions & 0 deletions cmake/utils/compile_flags.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ endif()

set(CMAKE_CXX_FLAGS "-Wall -fPIC ${CMAKE_CXX_FLAGS}")

if(__X86_64)
set(CMAKE_CXX_FLAGS "-msse4.2 ${CMAKE_CXX_FLAGS}")
endif()

set(CMAKE_CXX_FLAGS_DEBUG "-O0 -g")
set(CMAKE_CXX_FLAGS_RELEASE "-O3 -DNDEBUG")

Expand Down
51 changes: 37 additions & 14 deletions include/knowhere/comp/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,16 @@
#include <utility>

#include "folly/executors/CPUThreadPoolExecutor.h"
#include "folly/executors/task_queue/UnboundedBlockingQueue.h"
#include "folly/futures/Future.h"
#include "knowhere/expected.h"
#include "knowhere/log.h"

namespace knowhere {

class ThreadPool {
public:
enum class QueueType { LIFO, FIFO };
#ifdef __linux__
private:
class LowPriorityThreadFactory : public folly::NamedThreadFactory {
Expand All @@ -55,23 +58,33 @@ class ThreadPool {
};

public:
explicit ThreadPool(uint32_t num_threads, const std::string& thread_name_prefix)
: pool_(folly::CPUThreadPoolExecutor(
num_threads,
std::make_unique<
folly::LifoSemMPMCQueue<folly::CPUThreadPoolExecutor::CPUTask, folly::QueueBehaviorIfFull::BLOCK>>(
num_threads * kTaskQueueFactor),
std::make_shared<LowPriorityThreadFactory>(thread_name_prefix))) {
explicit ThreadPool(uint32_t num_threads, const std::string& thread_name_prefix, QueueType queueT = QueueType::LIFO)
: pool_(queueT == QueueType::LIFO
? folly::CPUThreadPoolExecutor(
num_threads,
std::make_unique<folly::LifoSemMPMCQueue<folly::CPUThreadPoolExecutor::CPUTask,
folly::QueueBehaviorIfFull::BLOCK>>(
num_threads * kTaskQueueFactor),
std::make_shared<LowPriorityThreadFactory>(thread_name_prefix))
: folly::CPUThreadPoolExecutor(
num_threads,
std::make_unique<folly::UnboundedBlockingQueue<folly::CPUThreadPoolExecutor::CPUTask>>(),
std::make_shared<LowPriorityThreadFactory>(thread_name_prefix))) {
}
#else
public:
explicit ThreadPool(uint32_t num_threads, const std::string& thread_name_prefix)
: pool_(folly::CPUThreadPoolExecutor(
num_threads,
std::make_unique<
folly::LifoSemMPMCQueue<folly::CPUThreadPoolExecutor::CPUTask, folly::QueueBehaviorIfFull::BLOCK>>(
num_threads * kTaskQueueFactor),
std::make_shared<folly::NamedThreadFactory>(thread_name_prefix))) {
explicit ThreadPool(uint32_t num_threads, const std::string& thread_name_prefix, QueueType queueT = QueueType::LIFO)
: pool_(queueT == QueueType::LIFO
? folly::CPUThreadPoolExecutor(
num_threads,
std::make_unique<folly::LifoSemMPMCQueue<folly::CPUThreadPoolExecutor::CPUTask,
folly::QueueBehaviorIfFull::BLOCK>>(
num_threads * kTaskQueueFactor),
std::make_shared<folly::NamedThreadFactory>(thread_name_prefix))
: folly::CPUThreadPoolExecutor(
num_threads,
std::make_unique<folly::UnboundedBlockingQueue<folly::CPUThreadPoolExecutor::CPUTask>>(),
std::make_shared<folly::NamedThreadFactory>(thread_name_prefix))) {
}
#endif

Expand Down Expand Up @@ -110,6 +123,16 @@ class ThreadPool {
}
}

static ThreadPool
CreateFIFO(uint32_t num_threads, const std::string& thread_name_prefix) {
return ThreadPool(num_threads, thread_name_prefix, QueueType::FIFO);
}

static ThreadPool
CreateLIFO(uint32_t num_threads, const std::string& thread_name_prefix) {
return ThreadPool(num_threads, thread_name_prefix, QueueType::LIFO);
}

static void
InitGlobalBuildThreadPool(uint32_t num_threads) {
if (num_threads <= 0) {
Expand Down
35 changes: 25 additions & 10 deletions thirdparty/DiskANN/src/pq_flash_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,7 @@
((((_u64) (id)) % nvecs_per_sector) * data_dim * sizeof(float))

namespace {
static auto async_pool =
knowhere::ThreadPool(1, "DiskANN_Async_Cache_Making");
static auto async_pool = knowhere::ThreadPool::CreateFIFO(1, "DiskANN_Async_Cache_Making");

constexpr _u64 kRefineBeamWidthFactor = 2;
constexpr _u64 kBruteForceTopkRefineExpansionFactor = 2;
Expand Down Expand Up @@ -181,19 +180,21 @@ namespace diskann {
_u64 num_cached_nodes = node_list.size();
LOG_KNOWHERE_DEBUG_ << "Loading the cache list(" << num_cached_nodes
<< " points) into memory...";
assert(this->nhood_cache_buf == nullptr && "nhoodc_cache_buf is not null");
assert(this->coord_cache_buf == nullptr && "coord_cache_buf is not null");

auto ctx = this->reader->get_ctx();

nhood_cache_buf =
std::make_unique<unsigned[]>(num_cached_nodes * (max_degree + 1));
memset(nhood_cache_buf.get(), 0, num_cached_nodes * (max_degree + 1));
if (nhood_cache_buf == nullptr) {
nhood_cache_buf =
std::make_unique<unsigned[]>(num_cached_nodes * (max_degree + 1));
memset(nhood_cache_buf.get(), 0, num_cached_nodes * (max_degree + 1) * sizeof(unsigned));
}

_u64 coord_cache_buf_len = num_cached_nodes * aligned_dim;
diskann::alloc_aligned((void **) &coord_cache_buf,
coord_cache_buf_len * sizeof(T), 8 * sizeof(T));
memset(coord_cache_buf, 0, coord_cache_buf_len * sizeof(T));
if (coord_cache_buf == nullptr) {
diskann::alloc_aligned((void **) &coord_cache_buf,
coord_cache_buf_len * sizeof(T), 8 * sizeof(T));
memset(coord_cache_buf, 0, coord_cache_buf_len * sizeof(T));
}

size_t BLOCK_SIZE = 32;
size_t num_blocks = DIV_ROUND_UP(num_cached_nodes, BLOCK_SIZE);
Expand Down Expand Up @@ -260,6 +261,20 @@ namespace diskann {
}
this->count_visited_nodes.store(true);

// sync allocate memory
if (nhood_cache_buf == nullptr) {
nhood_cache_buf =
std::make_unique<unsigned[]>(num_nodes_to_cache * (max_degree + 1));
memset(nhood_cache_buf.get(), 0, num_nodes_to_cache * (max_degree + 1) * sizeof(unsigned));
}

_u64 coord_cache_buf_len = num_nodes_to_cache * aligned_dim;
if (coord_cache_buf == nullptr) {
diskann::alloc_aligned((void **) &coord_cache_buf,
coord_cache_buf_len * sizeof(T), 8 * sizeof(T));
memset(coord_cache_buf, 0, coord_cache_buf_len * sizeof(T));
}

async_pool.push([&, state_controller = this->state_controller, sample_bin,
l_search, beamwidth, num_nodes_to_cache]() {
{
Expand Down

0 comments on commit 39b7104

Please sign in to comment.