diff --git a/CMakeLists.txt b/CMakeLists.txt index 8c2f574e7..2fe845a27 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -88,8 +88,8 @@ list( APPEND CMAKE_MODULE_PATH ${CMAKE_BINARY_DIR}/) add_definitions(-DNOT_COMPILE_FOR_SWIG) -include(cmake/utils/compile_flags.cmake) include(cmake/utils/platform_check.cmake) +include(cmake/utils/compile_flags.cmake) include(cmake/libs/libfaiss.cmake) include(cmake/libs/libhnsw.cmake) diff --git a/cmake/utils/compile_flags.cmake b/cmake/utils/compile_flags.cmake index 26c0d40f8..1cf21303b 100644 --- a/cmake/utils/compile_flags.cmake +++ b/cmake/utils/compile_flags.cmake @@ -17,6 +17,10 @@ endif() set(CMAKE_CXX_FLAGS "-Wall -fPIC ${CMAKE_CXX_FLAGS}") +if(__X86_64) + set(CMAKE_CXX_FLAGS "-msse4.2 ${CMAKE_CXX_FLAGS}") +endif() + set(CMAKE_CXX_FLAGS_DEBUG "-O0 -g") set(CMAKE_CXX_FLAGS_RELEASE "-O3 -DNDEBUG") diff --git a/include/knowhere/comp/thread_pool.h b/include/knowhere/comp/thread_pool.h index a98403010..5567ae7a5 100644 --- a/include/knowhere/comp/thread_pool.h +++ b/include/knowhere/comp/thread_pool.h @@ -28,6 +28,7 @@ #include #include "folly/executors/CPUThreadPoolExecutor.h" +#include "folly/executors/task_queue/UnboundedBlockingQueue.h" #include "folly/futures/Future.h" #include "knowhere/expected.h" #include "knowhere/log.h" @@ -35,6 +36,8 @@ namespace knowhere { class ThreadPool { + public: + enum class QueueType { LIFO, FIFO }; #ifdef __linux__ private: class LowPriorityThreadFactory : public folly::NamedThreadFactory { @@ -55,23 +58,33 @@ class ThreadPool { }; public: - explicit ThreadPool(uint32_t num_threads, const std::string& thread_name_prefix) - : pool_(folly::CPUThreadPoolExecutor( - num_threads, - std::make_unique< - folly::LifoSemMPMCQueue>( - num_threads * kTaskQueueFactor), - std::make_shared(thread_name_prefix))) { + explicit ThreadPool(uint32_t num_threads, const std::string& thread_name_prefix, QueueType queueT = QueueType::LIFO) + : pool_(queueT == QueueType::LIFO + ? folly::CPUThreadPoolExecutor( + num_threads, + std::make_unique>( + num_threads * kTaskQueueFactor), + std::make_shared(thread_name_prefix)) + : folly::CPUThreadPoolExecutor( + num_threads, + std::make_unique>(), + std::make_shared(thread_name_prefix))) { } #else public: - explicit ThreadPool(uint32_t num_threads, const std::string& thread_name_prefix) - : pool_(folly::CPUThreadPoolExecutor( - num_threads, - std::make_unique< - folly::LifoSemMPMCQueue>( - num_threads * kTaskQueueFactor), - std::make_shared(thread_name_prefix))) { + explicit ThreadPool(uint32_t num_threads, const std::string& thread_name_prefix, QueueType queueT = QueueType::LIFO) + : pool_(queueT == QueueType::LIFO + ? folly::CPUThreadPoolExecutor( + num_threads, + std::make_unique>( + num_threads * kTaskQueueFactor), + std::make_shared(thread_name_prefix)) + : folly::CPUThreadPoolExecutor( + num_threads, + std::make_unique>(), + std::make_shared(thread_name_prefix))) { } #endif @@ -110,6 +123,16 @@ class ThreadPool { } } + static ThreadPool + CreateFIFO(uint32_t num_threads, const std::string& thread_name_prefix) { + return ThreadPool(num_threads, thread_name_prefix, QueueType::FIFO); + } + + static ThreadPool + CreateLIFO(uint32_t num_threads, const std::string& thread_name_prefix) { + return ThreadPool(num_threads, thread_name_prefix, QueueType::LIFO); + } + static void InitGlobalBuildThreadPool(uint32_t num_threads) { if (num_threads <= 0) { diff --git a/thirdparty/DiskANN/src/pq_flash_index.cpp b/thirdparty/DiskANN/src/pq_flash_index.cpp index b0efa20b7..a6a8d0b35 100644 --- a/thirdparty/DiskANN/src/pq_flash_index.cpp +++ b/thirdparty/DiskANN/src/pq_flash_index.cpp @@ -48,8 +48,7 @@ ((((_u64) (id)) % nvecs_per_sector) * data_dim * sizeof(float)) namespace { - static auto async_pool = - knowhere::ThreadPool(1, "DiskANN_Async_Cache_Making"); + static auto async_pool = knowhere::ThreadPool::CreateFIFO(1, "DiskANN_Async_Cache_Making"); constexpr _u64 kRefineBeamWidthFactor = 2; constexpr _u64 kBruteForceTopkRefineExpansionFactor = 2; @@ -181,19 +180,21 @@ namespace diskann { _u64 num_cached_nodes = node_list.size(); LOG_KNOWHERE_DEBUG_ << "Loading the cache list(" << num_cached_nodes << " points) into memory..."; - assert(this->nhood_cache_buf == nullptr && "nhoodc_cache_buf is not null"); - assert(this->coord_cache_buf == nullptr && "coord_cache_buf is not null"); auto ctx = this->reader->get_ctx(); - nhood_cache_buf = - std::make_unique(num_cached_nodes * (max_degree + 1)); - memset(nhood_cache_buf.get(), 0, num_cached_nodes * (max_degree + 1)); + if (nhood_cache_buf == nullptr) { + nhood_cache_buf = + std::make_unique(num_cached_nodes * (max_degree + 1)); + memset(nhood_cache_buf.get(), 0, num_cached_nodes * (max_degree + 1) * sizeof(unsigned)); + } _u64 coord_cache_buf_len = num_cached_nodes * aligned_dim; - diskann::alloc_aligned((void **) &coord_cache_buf, - coord_cache_buf_len * sizeof(T), 8 * sizeof(T)); - memset(coord_cache_buf, 0, coord_cache_buf_len * sizeof(T)); + if (coord_cache_buf == nullptr) { + diskann::alloc_aligned((void **) &coord_cache_buf, + coord_cache_buf_len * sizeof(T), 8 * sizeof(T)); + memset(coord_cache_buf, 0, coord_cache_buf_len * sizeof(T)); + } size_t BLOCK_SIZE = 32; size_t num_blocks = DIV_ROUND_UP(num_cached_nodes, BLOCK_SIZE); @@ -260,6 +261,19 @@ namespace diskann { } this->count_visited_nodes.store(true); + // sync allocate memory + if (nhood_cache_buf == nullptr) { + nhood_cache_buf = new unsigned[num_nodes_to_cache * (max_degree + 1)]; + memset(nhood_cache_buf, 0, num_nodes_to_cache * (max_degree + 1) * sizeof(unsigned)); + } + + _u64 coord_cache_buf_len = num_nodes_to_cache * aligned_dim; + if (coord_cache_buf == nullptr) { + diskann::alloc_aligned((void **) &coord_cache_buf, + coord_cache_buf_len * sizeof(T), 8 * sizeof(T)); + memset(coord_cache_buf, 0, coord_cache_buf_len * sizeof(T)); + } + async_pool.push([&, state_controller = this->state_controller, sample_bin, l_search, beamwidth, num_nodes_to_cache]() { {