From e9574ebbbfff7efe3768496462ae0cacf84b43bd Mon Sep 17 00:00:00 2001 From: Buqian Zheng Date: Fri, 26 Jan 2024 04:02:54 +0800 Subject: [PATCH] wrap IVF index train/build calls in lambdas passed to knowhere thread pool, so OMP threads spawned will have low nice values (#359) Signed-off-by: Buqian Zheng --- src/index/ivf/ivf.cc | 321 +++++++++++++++++++++++-------------------- 1 file changed, 171 insertions(+), 150 deletions(-) diff --git a/src/index/ivf/ivf.cc b/src/index/ivf/ivf.cc index 2c95761d2..64e6e5bb6 100644 --- a/src/index/ivf/ivf.cc +++ b/src/index/ivf/ivf.cc @@ -61,6 +61,7 @@ class IvfIndexNode : public IndexNode { static_assert(std::is_same_v || std::is_same_v, "IvfIndexNode only support float/bianry"); search_pool_ = ThreadPool::GetGlobalSearchThreadPool(); + build_pool_ = ThreadPool::GetGlobalBuildThreadPool(); } Status Train(const DataSet& dataset, const Config& cfg) override; @@ -224,9 +225,17 @@ class IvfIndexNode : public IndexNode { Status SerializeImpl(BinarySet& binset, IVFFlatTag) const; + Status + TrainInternal(const DataSet& dataset, const Config& cfg); + private: std::unique_ptr index_; std::shared_ptr search_pool_; + // Faiss uses OpenMP for training/building the index and we have no control + // over those threads. build_pool_ is used to make sure the OMP threads + // spawned during index training/building can inherit the low nice value of + // threads in build_pool_. + std::shared_ptr build_pool_; }; } // namespace knowhere @@ -279,6 +288,19 @@ to_index_flat(std::unique_ptr&& index) { template Status IvfIndexNode::Train(const DataSet& dataset, const Config& cfg) { + // use build_pool_ to make sure the OMP threads spawned by index_->train etc + // can inherit the low nice value of threads in build_pool_. 
+ auto tryObj = build_pool_->push([&] { return TrainInternal(dataset, cfg); }).getTry(); + if (tryObj.hasValue()) { + return tryObj.value(); + } + LOG_KNOWHERE_WARNING_ << "faiss internal error: " << tryObj.exception().what(); + return Status::faiss_inner_error; +} + +template +Status +IvfIndexNode::TrainInternal(const DataSet& dataset, const Config& cfg) { const BaseConfig& base_cfg = static_cast(cfg); std::unique_ptr setter; if (base_cfg.num_build_thread.has_value()) { @@ -319,143 +341,137 @@ IvfIndexNode::Train(const DataSet& dataset, const Config& c std::unique_ptr index; // if cfg.use_elkan is used, then we'll use a temporary instance of // IndexFlatElkan for the training. - try { - if constexpr (std::is_same::value) { - const IvfFlatConfig& ivf_flat_cfg = static_cast(cfg); - auto nlist = MatchNlist(rows, ivf_flat_cfg.nlist.value()); - - const bool use_elkan = ivf_flat_cfg.use_elkan.value_or(true); - - // create quantizer for the training - std::unique_ptr qzr = - std::make_unique(dim, metric.value(), false, use_elkan); - // create index. Index does not own qzr - index = std::make_unique(qzr.get(), dim, nlist, metric.value(), is_cosine); - // train - index->train(rows, (const float*)data); - // replace quantizer with a regular IndexFlat - qzr = to_index_flat(std::move(qzr)); - index->quantizer = qzr.get(); - // transfer ownership of qzr to index - qzr.release(); - index->own_fields = true; - } - if constexpr (std::is_same::value) { - const IvfFlatCcConfig& ivf_flat_cc_cfg = static_cast(cfg); - auto nlist = MatchNlist(rows, ivf_flat_cc_cfg.nlist.value()); - - const bool use_elkan = ivf_flat_cc_cfg.use_elkan.value_or(true); - - // create quantizer for the training - std::unique_ptr qzr = - std::make_unique(dim, metric.value(), false, use_elkan); - // create index. 
Index does not own qzr - index = std::make_unique(qzr.get(), dim, nlist, ivf_flat_cc_cfg.ssize.value(), - metric.value(), is_cosine); - // train - index->train(rows, (const float*)data); - // replace quantizer with a regular IndexFlat - qzr = to_index_flat(std::move(qzr)); - index->quantizer = qzr.get(); - // transfer ownership of qzr to index - qzr.release(); - index->own_fields = true; - // ivfflat_cc has no serialize stage, make map at build stage - index->make_direct_map(true, faiss::DirectMap::ConcurrentArray); - } - if constexpr (std::is_same::value) { - const IvfPqConfig& ivf_pq_cfg = static_cast(cfg); - auto nlist = MatchNlist(rows, ivf_pq_cfg.nlist.value()); - auto nbits = MatchNbits(rows, ivf_pq_cfg.nbits.value()); - - const bool use_elkan = ivf_pq_cfg.use_elkan.value_or(true); - - // create quantizer for the training - std::unique_ptr qzr = - std::make_unique(dim, metric.value(), false, use_elkan); - // create index. Index does not own qzr - index = - std::make_unique(qzr.get(), dim, nlist, ivf_pq_cfg.m.value(), nbits, metric.value()); - // train - index->train(rows, (const float*)data); - // replace quantizer with a regular IndexFlat - qzr = to_index_flat(std::move(qzr)); - index->quantizer = qzr.get(); - // transfer ownership of qzr to index - qzr.release(); - index->own_fields = true; - } - if constexpr (std::is_same::value) { - const ScannConfig& scann_cfg = static_cast(cfg); - auto nlist = MatchNlist(rows, scann_cfg.nlist.value()); - bool is_cosine = base_cfg.metric_type.value() == metric::COSINE; - - const bool use_elkan = scann_cfg.use_elkan.value_or(true); - - // create quantizer for the training - std::unique_ptr qzr = - std::make_unique(dim, metric.value(), false, use_elkan); - // create base index. 
it does not own qzr - auto base_index = std::make_unique(qzr.get(), dim, nlist, (dim + 1) / 2, 4, - is_cosine, metric.value()); - // create scann index, which does not base_index by default, - // but owns the refine index by default omg - if (scann_cfg.with_raw_data.value()) { - index = std::make_unique(base_index.get(), (const float*)data); - } else { - index = std::make_unique(base_index.get(), nullptr); - } - // train - index->train(rows, (const float*)data); - // at this moment, we still own qzr. - // replace quantizer with a regular IndexFlat - qzr = to_index_flat(std::move(qzr)); - base_index->quantizer = qzr.get(); - // release qzr - qzr.release(); - base_index->own_fields = true; - // transfer ownership of the base index - base_index.release(); - index->own_fields = true; - } - if constexpr (std::is_same::value) { - const IvfSqConfig& ivf_sq_cfg = static_cast(cfg); - auto nlist = MatchNlist(rows, ivf_sq_cfg.nlist.value()); - - const bool use_elkan = ivf_sq_cfg.use_elkan.value_or(true); - - // create quantizer for the training - std::unique_ptr qzr = - std::make_unique(dim, metric.value(), false, use_elkan); - // create index. Index does not own qzr - index = std::make_unique( - qzr.get(), dim, nlist, faiss::ScalarQuantizer::QuantizerType::QT_8bit, metric.value()); - // train - index->train(rows, (const float*)data); - // replace quantizer with a regular IndexFlat - qzr = to_index_flat(std::move(qzr)); - index->quantizer = qzr.get(); - // transfer ownership of qzr to index - qzr.release(); - index->own_fields = true; + if constexpr (std::is_same::value) { + const IvfFlatConfig& ivf_flat_cfg = static_cast(cfg); + auto nlist = MatchNlist(rows, ivf_flat_cfg.nlist.value()); + + const bool use_elkan = ivf_flat_cfg.use_elkan.value_or(true); + + // create quantizer for the training + std::unique_ptr qzr = + std::make_unique(dim, metric.value(), false, use_elkan); + // create index. 
Index does not own qzr + index = std::make_unique(qzr.get(), dim, nlist, metric.value(), is_cosine); + // train + index->train(rows, (const float*)data); + // replace quantizer with a regular IndexFlat + qzr = to_index_flat(std::move(qzr)); + index->quantizer = qzr.get(); + // transfer ownership of qzr to index + qzr.release(); + index->own_fields = true; + } + if constexpr (std::is_same::value) { + const IvfFlatCcConfig& ivf_flat_cc_cfg = static_cast(cfg); + auto nlist = MatchNlist(rows, ivf_flat_cc_cfg.nlist.value()); + + const bool use_elkan = ivf_flat_cc_cfg.use_elkan.value_or(true); + + // create quantizer for the training + std::unique_ptr qzr = + std::make_unique(dim, metric.value(), false, use_elkan); + // create index. Index does not own qzr + index = std::make_unique(qzr.get(), dim, nlist, ivf_flat_cc_cfg.ssize.value(), + metric.value(), is_cosine); + // train + index->train(rows, (const float*)data); + // replace quantizer with a regular IndexFlat + qzr = to_index_flat(std::move(qzr)); + index->quantizer = qzr.get(); + // transfer ownership of qzr to index + qzr.release(); + index->own_fields = true; + // ivfflat_cc has no serialize stage, make map at build stage + index->make_direct_map(true, faiss::DirectMap::ConcurrentArray); + } + if constexpr (std::is_same::value) { + const IvfPqConfig& ivf_pq_cfg = static_cast(cfg); + auto nlist = MatchNlist(rows, ivf_pq_cfg.nlist.value()); + auto nbits = MatchNbits(rows, ivf_pq_cfg.nbits.value()); + + const bool use_elkan = ivf_pq_cfg.use_elkan.value_or(true); + + // create quantizer for the training + std::unique_ptr qzr = + std::make_unique(dim, metric.value(), false, use_elkan); + // create index. 
Index does not own qzr + index = std::make_unique(qzr.get(), dim, nlist, ivf_pq_cfg.m.value(), nbits, metric.value()); + // train + index->train(rows, (const float*)data); + // replace quantizer with a regular IndexFlat + qzr = to_index_flat(std::move(qzr)); + index->quantizer = qzr.get(); + // transfer ownership of qzr to index + qzr.release(); + index->own_fields = true; + } + if constexpr (std::is_same::value) { + const ScannConfig& scann_cfg = static_cast(cfg); + auto nlist = MatchNlist(rows, scann_cfg.nlist.value()); + bool is_cosine = base_cfg.metric_type.value() == metric::COSINE; + + const bool use_elkan = scann_cfg.use_elkan.value_or(true); + + // create quantizer for the training + std::unique_ptr qzr = + std::make_unique(dim, metric.value(), false, use_elkan); + // create base index. it does not own qzr + auto base_index = std::make_unique(qzr.get(), dim, nlist, (dim + 1) / 2, 4, + is_cosine, metric.value()); + // create scann index, which does not own base_index by default, + // but owns the refine index by default omg + if (scann_cfg.with_raw_data.value()) { + index = std::make_unique(base_index.get(), (const float*)data); + } else { + index = std::make_unique(base_index.get(), nullptr); } - if constexpr (std::is_same::value) { - const IvfBinConfig& ivf_bin_cfg = static_cast(cfg); - auto nlist = MatchNlist(rows, ivf_bin_cfg.nlist.value()); - - // create quantizer - auto qzr = std::make_unique(dim, metric.value()); - // create index. Index does not own qzr - index = std::make_unique(qzr.get(), dim, nlist, metric.value()); - // train - index->train(rows, (const uint8_t*)data); - // transfer ownership of qzr to index - qzr.release(); - index->own_fields = true; - } - } catch (std::exception& e) { - LOG_KNOWHERE_WARNING_ << "faiss inner error: " << e.what(); - return Status::faiss_inner_error; + // train + index->train(rows, (const float*)data); + // at this moment, we still own qzr. 
+ // replace quantizer with a regular IndexFlat + qzr = to_index_flat(std::move(qzr)); + base_index->quantizer = qzr.get(); + // release qzr + qzr.release(); + base_index->own_fields = true; + // transfer ownership of the base index + base_index.release(); + index->own_fields = true; + } + if constexpr (std::is_same::value) { + const IvfSqConfig& ivf_sq_cfg = static_cast(cfg); + auto nlist = MatchNlist(rows, ivf_sq_cfg.nlist.value()); + + const bool use_elkan = ivf_sq_cfg.use_elkan.value_or(true); + + // create quantizer for the training + std::unique_ptr qzr = + std::make_unique(dim, metric.value(), false, use_elkan); + // create index. Index does not own qzr + index = std::make_unique( + qzr.get(), dim, nlist, faiss::ScalarQuantizer::QuantizerType::QT_8bit, metric.value()); + // train + index->train(rows, (const float*)data); + // replace quantizer with a regular IndexFlat + qzr = to_index_flat(std::move(qzr)); + index->quantizer = qzr.get(); + // transfer ownership of qzr to index + qzr.release(); + index->own_fields = true; + } + if constexpr (std::is_same::value) { + const IvfBinConfig& ivf_bin_cfg = static_cast(cfg); + auto nlist = MatchNlist(rows, ivf_bin_cfg.nlist.value()); + + // create quantizer + auto qzr = std::make_unique(dim, metric.value()); + // create index. 
Index does not own qzr + index = std::make_unique(qzr.get(), dim, nlist, metric.value()); + // train + index->train(rows, (const uint8_t*)data); + // transfer ownership of qzr to index + qzr.release(); + index->own_fields = true; } index_ = std::move(index); @@ -472,20 +488,25 @@ IvfIndexNode::Add(const DataSet& dataset, const Config& cfg auto data = dataset.GetTensor(); auto rows = dataset.GetRows(); const BaseConfig& base_cfg = static_cast(cfg); - std::unique_ptr setter; - if (base_cfg.num_build_thread.has_value()) { - setter = std::make_unique(base_cfg.num_build_thread.value()); - } else { - setter = std::make_unique(); - } - try { - if constexpr (std::is_same::value) { - index_->add(rows, (const uint8_t*)data); - } else { - index_->add(rows, (const float*)data); - } - } catch (std::exception& e) { - LOG_KNOWHERE_WARNING_ << "faiss inner error: " << e.what(); + // use build_pool_ to make sure the OMP threads spawned by index_->add + // can inherit the low nice value of threads in build_pool_. + auto tryObj = build_pool_ + ->push([&] { + std::unique_ptr setter; + if (base_cfg.num_build_thread.has_value()) { + setter = std::make_unique(base_cfg.num_build_thread.value()); + } else { + setter = std::make_unique(); + } + if constexpr (std::is_same::value) { + index_->add(rows, (const uint8_t*)data); + } else { + index_->add(rows, (const float*)data); + } + }) + .getTry(); + if (tryObj.hasException()) { + LOG_KNOWHERE_WARNING_ << "faiss internal error: " << tryObj.exception().what(); return Status::faiss_inner_error; } return Status::success;