Skip to content

Commit

Permalink
wrap IVF index train/build calls in lambdas passed to knowhere thread…
Browse files Browse the repository at this point in the history
… pool, so OMP threads spawned will have low nice values (#359)

Signed-off-by: Buqian Zheng <[email protected]>
  • Loading branch information
zhengbuqian authored Jan 25, 2024
1 parent 0f63f2d commit e9574eb
Showing 1 changed file with 171 additions and 150 deletions.
321 changes: 171 additions & 150 deletions src/index/ivf/ivf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class IvfIndexNode : public IndexNode {
static_assert(std::is_same_v<DataType, fp32> || std::is_same_v<DataType, bin1>,
"IvfIndexNode only support float/bianry");
search_pool_ = ThreadPool::GetGlobalSearchThreadPool();
build_pool_ = ThreadPool::GetGlobalBuildThreadPool();
}
Status
Train(const DataSet& dataset, const Config& cfg) override;
Expand Down Expand Up @@ -224,9 +225,17 @@ class IvfIndexNode : public IndexNode {
Status
SerializeImpl(BinarySet& binset, IVFFlatTag) const;

Status
TrainInternal(const DataSet& dataset, const Config& cfg);

private:
std::unique_ptr<IndexType> index_;
std::shared_ptr<ThreadPool> search_pool_;
// Faiss uses OpenMP for training/building the index and we have no control
// over those threads. build_pool_ is used to make sure the OMP threads
// spawded during index training/building can inherit the low nice value of
// threads in build_pool_.
std::shared_ptr<ThreadPool> build_pool_;
};

} // namespace knowhere
Expand Down Expand Up @@ -279,6 +288,19 @@ to_index_flat(std::unique_ptr<faiss::IndexFlat>&& index) {
template <typename DataType, typename IndexType>
Status
IvfIndexNode<DataType, IndexType>::Train(const DataSet& dataset, const Config& cfg) {
// use build_pool_ to make sure the OMP threads spawded by index_->train etc
// can inherit the low nice value of threads in build_pool_.
auto tryObj = build_pool_->push([&] { return TrainInternal(dataset, cfg); }).getTry();
if (tryObj.hasValue()) {
return tryObj.value();
}
LOG_KNOWHERE_WARNING_ << "faiss internal error: " << tryObj.exception().what();
return Status::faiss_inner_error;
}

template <typename DataType, typename IndexType>
Status
IvfIndexNode<DataType, IndexType>::TrainInternal(const DataSet& dataset, const Config& cfg) {
const BaseConfig& base_cfg = static_cast<const IvfConfig&>(cfg);
std::unique_ptr<ThreadPool::ScopedOmpSetter> setter;
if (base_cfg.num_build_thread.has_value()) {
Expand Down Expand Up @@ -319,143 +341,137 @@ IvfIndexNode<DataType, IndexType>::Train(const DataSet& dataset, const Config& c
std::unique_ptr<IndexType> index;
// if cfg.use_elkan is used, then we'll use a temporary instance of
// IndexFlatElkan for the training.
try {
if constexpr (std::is_same<faiss::IndexIVFFlat, IndexType>::value) {
const IvfFlatConfig& ivf_flat_cfg = static_cast<const IvfFlatConfig&>(cfg);
auto nlist = MatchNlist(rows, ivf_flat_cfg.nlist.value());

const bool use_elkan = ivf_flat_cfg.use_elkan.value_or(true);

// create quantizer for the training
std::unique_ptr<faiss::IndexFlat> qzr =
std::make_unique<faiss::IndexFlatElkan>(dim, metric.value(), false, use_elkan);
// create index. Index does not own qzr
index = std::make_unique<faiss::IndexIVFFlat>(qzr.get(), dim, nlist, metric.value(), is_cosine);
// train
index->train(rows, (const float*)data);
// replace quantizer with a regular IndexFlat
qzr = to_index_flat(std::move(qzr));
index->quantizer = qzr.get();
// transfer ownership of qzr to index
qzr.release();
index->own_fields = true;
}
if constexpr (std::is_same<faiss::IndexIVFFlatCC, IndexType>::value) {
const IvfFlatCcConfig& ivf_flat_cc_cfg = static_cast<const IvfFlatCcConfig&>(cfg);
auto nlist = MatchNlist(rows, ivf_flat_cc_cfg.nlist.value());

const bool use_elkan = ivf_flat_cc_cfg.use_elkan.value_or(true);

// create quantizer for the training
std::unique_ptr<faiss::IndexFlat> qzr =
std::make_unique<faiss::IndexFlatElkan>(dim, metric.value(), false, use_elkan);
// create index. Index does not own qzr
index = std::make_unique<faiss::IndexIVFFlatCC>(qzr.get(), dim, nlist, ivf_flat_cc_cfg.ssize.value(),
metric.value(), is_cosine);
// train
index->train(rows, (const float*)data);
// replace quantizer with a regular IndexFlat
qzr = to_index_flat(std::move(qzr));
index->quantizer = qzr.get();
// transfer ownership of qzr to index
qzr.release();
index->own_fields = true;
// ivfflat_cc has no serialize stage, make map at build stage
index->make_direct_map(true, faiss::DirectMap::ConcurrentArray);
}
if constexpr (std::is_same<faiss::IndexIVFPQ, IndexType>::value) {
const IvfPqConfig& ivf_pq_cfg = static_cast<const IvfPqConfig&>(cfg);
auto nlist = MatchNlist(rows, ivf_pq_cfg.nlist.value());
auto nbits = MatchNbits(rows, ivf_pq_cfg.nbits.value());

const bool use_elkan = ivf_pq_cfg.use_elkan.value_or(true);

// create quantizer for the training
std::unique_ptr<faiss::IndexFlat> qzr =
std::make_unique<faiss::IndexFlatElkan>(dim, metric.value(), false, use_elkan);
// create index. Index does not own qzr
index =
std::make_unique<faiss::IndexIVFPQ>(qzr.get(), dim, nlist, ivf_pq_cfg.m.value(), nbits, metric.value());
// train
index->train(rows, (const float*)data);
// replace quantizer with a regular IndexFlat
qzr = to_index_flat(std::move(qzr));
index->quantizer = qzr.get();
// transfer ownership of qzr to index
qzr.release();
index->own_fields = true;
}
if constexpr (std::is_same<faiss::IndexScaNN, IndexType>::value) {
const ScannConfig& scann_cfg = static_cast<const ScannConfig&>(cfg);
auto nlist = MatchNlist(rows, scann_cfg.nlist.value());
bool is_cosine = base_cfg.metric_type.value() == metric::COSINE;

const bool use_elkan = scann_cfg.use_elkan.value_or(true);

// create quantizer for the training
std::unique_ptr<faiss::IndexFlat> qzr =
std::make_unique<faiss::IndexFlatElkan>(dim, metric.value(), false, use_elkan);
// create base index. it does not own qzr
auto base_index = std::make_unique<faiss::IndexIVFPQFastScan>(qzr.get(), dim, nlist, (dim + 1) / 2, 4,
is_cosine, metric.value());
// create scann index, which does not base_index by default,
// but owns the refine index by default omg
if (scann_cfg.with_raw_data.value()) {
index = std::make_unique<faiss::IndexScaNN>(base_index.get(), (const float*)data);
} else {
index = std::make_unique<faiss::IndexScaNN>(base_index.get(), nullptr);
}
// train
index->train(rows, (const float*)data);
// at this moment, we still own qzr.
// replace quantizer with a regular IndexFlat
qzr = to_index_flat(std::move(qzr));
base_index->quantizer = qzr.get();
// release qzr
qzr.release();
base_index->own_fields = true;
// transfer ownership of the base index
base_index.release();
index->own_fields = true;
}
if constexpr (std::is_same<faiss::IndexIVFScalarQuantizer, IndexType>::value) {
const IvfSqConfig& ivf_sq_cfg = static_cast<const IvfSqConfig&>(cfg);
auto nlist = MatchNlist(rows, ivf_sq_cfg.nlist.value());

const bool use_elkan = ivf_sq_cfg.use_elkan.value_or(true);

// create quantizer for the training
std::unique_ptr<faiss::IndexFlat> qzr =
std::make_unique<faiss::IndexFlatElkan>(dim, metric.value(), false, use_elkan);
// create index. Index does not own qzr
index = std::make_unique<faiss::IndexIVFScalarQuantizer>(
qzr.get(), dim, nlist, faiss::ScalarQuantizer::QuantizerType::QT_8bit, metric.value());
// train
index->train(rows, (const float*)data);
// replace quantizer with a regular IndexFlat
qzr = to_index_flat(std::move(qzr));
index->quantizer = qzr.get();
// transfer ownership of qzr to index
qzr.release();
index->own_fields = true;
if constexpr (std::is_same<faiss::IndexIVFFlat, IndexType>::value) {
const IvfFlatConfig& ivf_flat_cfg = static_cast<const IvfFlatConfig&>(cfg);
auto nlist = MatchNlist(rows, ivf_flat_cfg.nlist.value());

const bool use_elkan = ivf_flat_cfg.use_elkan.value_or(true);

// create quantizer for the training
std::unique_ptr<faiss::IndexFlat> qzr =
std::make_unique<faiss::IndexFlatElkan>(dim, metric.value(), false, use_elkan);
// create index. Index does not own qzr
index = std::make_unique<faiss::IndexIVFFlat>(qzr.get(), dim, nlist, metric.value(), is_cosine);
// train
index->train(rows, (const float*)data);
// replace quantizer with a regular IndexFlat
qzr = to_index_flat(std::move(qzr));
index->quantizer = qzr.get();
// transfer ownership of qzr to index
qzr.release();
index->own_fields = true;
}
if constexpr (std::is_same<faiss::IndexIVFFlatCC, IndexType>::value) {
const IvfFlatCcConfig& ivf_flat_cc_cfg = static_cast<const IvfFlatCcConfig&>(cfg);
auto nlist = MatchNlist(rows, ivf_flat_cc_cfg.nlist.value());

const bool use_elkan = ivf_flat_cc_cfg.use_elkan.value_or(true);

// create quantizer for the training
std::unique_ptr<faiss::IndexFlat> qzr =
std::make_unique<faiss::IndexFlatElkan>(dim, metric.value(), false, use_elkan);
// create index. Index does not own qzr
index = std::make_unique<faiss::IndexIVFFlatCC>(qzr.get(), dim, nlist, ivf_flat_cc_cfg.ssize.value(),
metric.value(), is_cosine);
// train
index->train(rows, (const float*)data);
// replace quantizer with a regular IndexFlat
qzr = to_index_flat(std::move(qzr));
index->quantizer = qzr.get();
// transfer ownership of qzr to index
qzr.release();
index->own_fields = true;
// ivfflat_cc has no serialize stage, make map at build stage
index->make_direct_map(true, faiss::DirectMap::ConcurrentArray);
}
if constexpr (std::is_same<faiss::IndexIVFPQ, IndexType>::value) {
const IvfPqConfig& ivf_pq_cfg = static_cast<const IvfPqConfig&>(cfg);
auto nlist = MatchNlist(rows, ivf_pq_cfg.nlist.value());
auto nbits = MatchNbits(rows, ivf_pq_cfg.nbits.value());

const bool use_elkan = ivf_pq_cfg.use_elkan.value_or(true);

// create quantizer for the training
std::unique_ptr<faiss::IndexFlat> qzr =
std::make_unique<faiss::IndexFlatElkan>(dim, metric.value(), false, use_elkan);
// create index. Index does not own qzr
index = std::make_unique<faiss::IndexIVFPQ>(qzr.get(), dim, nlist, ivf_pq_cfg.m.value(), nbits, metric.value());
// train
index->train(rows, (const float*)data);
// replace quantizer with a regular IndexFlat
qzr = to_index_flat(std::move(qzr));
index->quantizer = qzr.get();
// transfer ownership of qzr to index
qzr.release();
index->own_fields = true;
}
if constexpr (std::is_same<faiss::IndexScaNN, IndexType>::value) {
const ScannConfig& scann_cfg = static_cast<const ScannConfig&>(cfg);
auto nlist = MatchNlist(rows, scann_cfg.nlist.value());
bool is_cosine = base_cfg.metric_type.value() == metric::COSINE;

const bool use_elkan = scann_cfg.use_elkan.value_or(true);

// create quantizer for the training
std::unique_ptr<faiss::IndexFlat> qzr =
std::make_unique<faiss::IndexFlatElkan>(dim, metric.value(), false, use_elkan);
// create base index. it does not own qzr
auto base_index = std::make_unique<faiss::IndexIVFPQFastScan>(qzr.get(), dim, nlist, (dim + 1) / 2, 4,
is_cosine, metric.value());
// create scann index, which does not base_index by default,
// but owns the refine index by default omg
if (scann_cfg.with_raw_data.value()) {
index = std::make_unique<faiss::IndexScaNN>(base_index.get(), (const float*)data);
} else {
index = std::make_unique<faiss::IndexScaNN>(base_index.get(), nullptr);
}
if constexpr (std::is_same<faiss::IndexBinaryIVF, IndexType>::value) {
const IvfBinConfig& ivf_bin_cfg = static_cast<const IvfBinConfig&>(cfg);
auto nlist = MatchNlist(rows, ivf_bin_cfg.nlist.value());

// create quantizer
auto qzr = std::make_unique<faiss::IndexBinaryFlat>(dim, metric.value());
// create index. Index does not own qzr
index = std::make_unique<faiss::IndexBinaryIVF>(qzr.get(), dim, nlist, metric.value());
// train
index->train(rows, (const uint8_t*)data);
// transfer ownership of qzr to index
qzr.release();
index->own_fields = true;
}
} catch (std::exception& e) {
LOG_KNOWHERE_WARNING_ << "faiss inner error: " << e.what();
return Status::faiss_inner_error;
// train
index->train(rows, (const float*)data);
// at this moment, we still own qzr.
// replace quantizer with a regular IndexFlat
qzr = to_index_flat(std::move(qzr));
base_index->quantizer = qzr.get();
// release qzr
qzr.release();
base_index->own_fields = true;
// transfer ownership of the base index
base_index.release();
index->own_fields = true;
}
if constexpr (std::is_same<faiss::IndexIVFScalarQuantizer, IndexType>::value) {
const IvfSqConfig& ivf_sq_cfg = static_cast<const IvfSqConfig&>(cfg);
auto nlist = MatchNlist(rows, ivf_sq_cfg.nlist.value());

const bool use_elkan = ivf_sq_cfg.use_elkan.value_or(true);

// create quantizer for the training
std::unique_ptr<faiss::IndexFlat> qzr =
std::make_unique<faiss::IndexFlatElkan>(dim, metric.value(), false, use_elkan);
// create index. Index does not own qzr
index = std::make_unique<faiss::IndexIVFScalarQuantizer>(
qzr.get(), dim, nlist, faiss::ScalarQuantizer::QuantizerType::QT_8bit, metric.value());
// train
index->train(rows, (const float*)data);
// replace quantizer with a regular IndexFlat
qzr = to_index_flat(std::move(qzr));
index->quantizer = qzr.get();
// transfer ownership of qzr to index
qzr.release();
index->own_fields = true;
}
if constexpr (std::is_same<faiss::IndexBinaryIVF, IndexType>::value) {
const IvfBinConfig& ivf_bin_cfg = static_cast<const IvfBinConfig&>(cfg);
auto nlist = MatchNlist(rows, ivf_bin_cfg.nlist.value());

// create quantizer
auto qzr = std::make_unique<faiss::IndexBinaryFlat>(dim, metric.value());
// create index. Index does not own qzr
index = std::make_unique<faiss::IndexBinaryIVF>(qzr.get(), dim, nlist, metric.value());
// train
index->train(rows, (const uint8_t*)data);
// transfer ownership of qzr to index
qzr.release();
index->own_fields = true;
}
index_ = std::move(index);

Expand All @@ -472,20 +488,25 @@ IvfIndexNode<DataType, IndexType>::Add(const DataSet& dataset, const Config& cfg
auto data = dataset.GetTensor();
auto rows = dataset.GetRows();
const BaseConfig& base_cfg = static_cast<const IvfConfig&>(cfg);
std::unique_ptr<ThreadPool::ScopedOmpSetter> setter;
if (base_cfg.num_build_thread.has_value()) {
setter = std::make_unique<ThreadPool::ScopedOmpSetter>(base_cfg.num_build_thread.value());
} else {
setter = std::make_unique<ThreadPool::ScopedOmpSetter>();
}
try {
if constexpr (std::is_same<faiss::IndexBinaryIVF, IndexType>::value) {
index_->add(rows, (const uint8_t*)data);
} else {
index_->add(rows, (const float*)data);
}
} catch (std::exception& e) {
LOG_KNOWHERE_WARNING_ << "faiss inner error: " << e.what();
// use build_pool_ to make sure the OMP threads spawded by index_->add
// can inherit the low nice value of threads in build_pool_.
auto tryObj = build_pool_
->push([&] {
std::unique_ptr<ThreadPool::ScopedOmpSetter> setter;
if (base_cfg.num_build_thread.has_value()) {
setter = std::make_unique<ThreadPool::ScopedOmpSetter>(base_cfg.num_build_thread.value());
} else {
setter = std::make_unique<ThreadPool::ScopedOmpSetter>();
}
if constexpr (std::is_same<faiss::IndexBinaryIVF, IndexType>::value) {
index_->add(rows, (const uint8_t*)data);
} else {
index_->add(rows, (const float*)data);
}
})
.getTry();
if (tryObj.hasException()) {
LOG_KNOWHERE_WARNING_ << "faiss internal error: " << tryObj.exception().what();
return Status::faiss_inner_error;
}
return Status::success;
Expand Down

0 comments on commit e9574eb

Please sign in to comment.