Skip to content

Commit

Permalink
Fix diskann memory leak after failure (#406)
Browse files Browse the repository at this point in the history
Signed-off-by: Patrick Weizhi Xu <[email protected]>
  • Loading branch information
PwzXxm authored Feb 26, 2024
1 parent 643054b commit 7349e42
Show file tree
Hide file tree
Showing 15 changed files with 193 additions and 250 deletions.
7 changes: 2 additions & 5 deletions src/index/diskann/diskann.cc
Original file line number Diff line number Diff line change
Expand Up @@ -417,12 +417,9 @@ DiskANNIndexNode<DataType>::Deserialize(const BinarySet& binset, const Config& c
if (file_exists(cached_nodes_file)) {
LOG_KNOWHERE_INFO_ << "Reading cached nodes from file.";
size_t num_nodes, nodes_id_dim;
uint32_t* cached_nodes_ids = nullptr;
std::unique_ptr<uint32_t[]> cached_nodes_ids = nullptr;
diskann::load_bin<uint32_t>(cached_nodes_file, cached_nodes_ids, num_nodes, nodes_id_dim);
node_list.assign(cached_nodes_ids, cached_nodes_ids + num_nodes);
if (cached_nodes_ids != nullptr) {
delete[] cached_nodes_ids;
}
node_list.assign(cached_nodes_ids.get(), cached_nodes_ids.get() + num_nodes);
} else {
auto num_nodes_to_cache = GetCachedNodeNum(prep_conf.search_cache_budget_gb.value(),
pq_flash_index_->get_data_dim(), pq_flash_index_->get_max_degree());
Expand Down
28 changes: 13 additions & 15 deletions thirdparty/DiskANN/include/diskann/cached_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ class cached_ifstream {
this->open(filename, cache_size);
}
~cached_ifstream() {
delete[] cache_buf;
reader.close();
}

Expand All @@ -37,8 +36,8 @@ class cached_ifstream {
assert(cacheSize > 0);
cacheSize = (std::min)(cacheSize, fsize);
this->cache_size = cacheSize;
cache_buf = new char[cacheSize];
reader.read(cache_buf, cacheSize);
cache_buf = std::make_unique<char[]>(cacheSize);
reader.read(cache_buf.get(), cacheSize);
LOG_KNOWHERE_DEBUG_ << "Opened: " << filename.c_str()
<< ", size: " << fsize
<< ", cache_size: " << cacheSize;
Expand All @@ -58,7 +57,7 @@ class cached_ifstream {

if (n_bytes <= (cache_size - cur_off)) {
// case 1: cache contains all data
memcpy(read_buf, cache_buf + cur_off, n_bytes);
memcpy(read_buf, cache_buf.get() + cur_off, n_bytes);
cur_off += n_bytes;
} else {
// case 2: cache contains some data
Expand All @@ -73,7 +72,7 @@ class cached_ifstream {
throw diskann::ANNException(stream.str(), -1, __FUNCSIG__, __FILE__,
__LINE__);
}
memcpy(read_buf, cache_buf + cur_off, cached_bytes);
memcpy(read_buf, cache_buf.get() + cur_off, cached_bytes);

// go to disk and fetch more data
reader.read(read_buf + cached_bytes, n_bytes - cached_bytes);
Expand All @@ -83,7 +82,7 @@ class cached_ifstream {
uint64_t size_left = fsize - reader.tellg();

if (size_left >= cache_size) {
reader.read(cache_buf, cache_size);
reader.read(cache_buf.get(), cache_size);
cur_off = 0;
}
// note that if size_left < cache_size, then cur_off = cache_size, so
Expand All @@ -97,7 +96,7 @@ class cached_ifstream {
// # bytes to cache in one shot read
uint64_t cache_size = 0;
// underlying buf for cache
char* cache_buf = nullptr;
std::unique_ptr<char[]> cache_buf = nullptr;
// offset into cache_buf for cur_pos
uint64_t cur_off = 0;
// file size
Expand All @@ -114,7 +113,7 @@ class cached_ofstream {
writer.open(filename, std::ios::binary);
assert(writer.is_open());
assert(cache_size > 0);
cache_buf = new char[cache_size];
cache_buf = std::make_unique<char[]>(cache_size);
LOG_KNOWHERE_DEBUG_ << "Opened: " << filename.c_str()
<< ", cache_size: " << cache_size;
} catch (std::system_error& e) {
Expand All @@ -129,7 +128,6 @@ class cached_ofstream {
this->flush_cache();
}

delete[] cache_buf;
writer.close();
LOG_KNOWHERE_DEBUG_ << "Finished writing " << fsize << "B";
}
Expand All @@ -142,27 +140,27 @@ class cached_ofstream {
assert(cache_buf != nullptr);
if (n_bytes <= (cache_size - cur_off)) {
// case 1: cache can take all data
memcpy(cache_buf + cur_off, write_buf, n_bytes);
memcpy(cache_buf.get() + cur_off, write_buf, n_bytes);
cur_off += n_bytes;
} else {
// case 2: cache can't take all data
// go to disk and write existing cache data
writer.write(cache_buf, cur_off);
writer.write(cache_buf.get(), cur_off);
fsize += cur_off;
// write the new data to disk
writer.write(write_buf, n_bytes);
fsize += n_bytes;
// memset all cache data and reset cur_off
memset(cache_buf, 0, cache_size);
memset(cache_buf.get(), 0, cache_size);
cur_off = 0;
}
}

void flush_cache() {
assert(cache_buf != nullptr);
writer.write(cache_buf, cur_off);
writer.write(cache_buf.get(), cur_off);
fsize += cur_off;
memset(cache_buf, 0, cache_size);
memset(cache_buf.get(), 0, cache_size);
cur_off = 0;
}

Expand All @@ -177,7 +175,7 @@ class cached_ofstream {
// # bytes to cache for one shot write
uint64_t cache_size = 0;
// underlying buf for cache
char* cache_buf = nullptr;
std::unique_ptr<char[]> cache_buf = nullptr;
// offset into cache_buf for cur_pos
uint64_t cur_off = 0;

Expand Down
3 changes: 2 additions & 1 deletion thirdparty/DiskANN/include/diskann/logger_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#pragma once

#include <memory>
#include <sstream>
#include <mutex>

Expand Down Expand Up @@ -36,7 +37,7 @@ namespace diskann {

private:
FILE* _fp;
char* _buf;
std::unique_ptr<char[]> _buf;
int _bufIndex;
std::mutex _mutex;
ANNIndex::LogLevel _logLevel;
Expand Down
24 changes: 13 additions & 11 deletions thirdparty/DiskANN/include/diskann/math_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace math_utils {
// compute l2-squared norms of data stored in row major num_points * dim,
// needs
// to be pre-allocated
void compute_vecs_l2sq(float* vecs_l2sq, float* data, const size_t num_points,
void compute_vecs_l2sq(float* vecs_l2sq, const float* data, const size_t num_points,
const size_t dim);

void rotate_data_randomly(float* data, size_t num_points, size_t dim,
Expand Down Expand Up @@ -48,9 +48,9 @@ namespace math_utils {
// those
// values

void compute_closest_centers(float* data, size_t num_points, size_t dim,
float* pivot_data, size_t num_centers, size_t k,
uint32_t* closest_centers_ivf,
void compute_closest_centers(const float* data, size_t num_points, size_t dim,
const float* pivot_data, size_t num_centers,
size_t k, uint32_t* closest_centers_ivf,
std::vector<size_t>* inverted_index = NULL,
float* pts_norms_squared = NULL);

Expand All @@ -74,25 +74,27 @@ namespace kmeans {
// If closest_centers == NULL, will allocate memory and return.
// Similarly, if closest_docs == NULL, will allocate memory and return.

float lloyds_iter(float* data, size_t num_points, size_t dim, float* centers,
size_t num_centers, std::vector<size_t>* closest_docs,
uint32_t*& closest_center);
float lloyds_iter(const float* data, size_t num_points, size_t dim,
float* centers, size_t num_centers,
std::unique_ptr<std::vector<size_t>[]>& closest_docs,
std::unique_ptr<uint32_t[]>& closest_center);

// Run Lloyds until max_reps or stopping criterion
// If you pass NULL for closest_docs and closest_center, it will NOT return
// the results, else it will assume appropriate allocation as closest_docs = new
// vector<size_t> [num_centers], and closest_center = new size_t[num_points]
// Final centers are output in centers as row major num_centers * dim
//
float run_lloyds(float* data, size_t num_points, size_t dim, float* centers,
const size_t num_centers, const size_t max_reps,
std::vector<size_t>* closest_docs, uint32_t* closest_center);
float run_lloyds(const float* data, size_t num_points, size_t dim,
float* centers, const size_t num_centers,
const size_t max_reps, std::vector<size_t>* closest_docs,
uint32_t* closest_center);

// assumes already memory allocated for pivot_data as new
// float[num_centers*dim] and select randomly num_centers points as pivots
void selecting_pivots(float* data, size_t num_points, size_t dim,
float* pivot_data, size_t num_centers);

void kmeanspp_selecting_pivots(float* data, size_t num_points, size_t dim,
void kmeanspp_selecting_pivots(const float* data, size_t num_points, size_t dim,
float* pivot_data, size_t num_centers);
} // namespace kmeans
8 changes: 5 additions & 3 deletions thirdparty/DiskANN/include/diskann/partition_and_pq.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,16 @@ void gen_random_slice(const std::string base_file,

template<typename T>
void gen_random_slice(const std::string data_file, double p_val,
float *&sampled_data, size_t &slice_size, size_t &ndims);
std::unique_ptr<float[]> &sampled_data,
size_t &slice_size, size_t &ndims);

template<typename T>
void gen_random_slice(const T *inputdata, size_t npts, size_t ndims,
double p_val, float *&sampled_data, size_t &slice_size);
double p_val, std::unique_ptr<float[]> &sampled_data,
size_t &slice_size);

int estimate_cluster_sizes(float *test_data_float, size_t num_test,
float *pivots, const size_t num_centers,
const float *pivots, const size_t num_centers,
const size_t dim, const size_t k_base,
std::vector<size_t> &cluster_sizes);

Expand Down
8 changes: 4 additions & 4 deletions thirdparty/DiskANN/include/diskann/pq_flash_index.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ namespace diskann {
float max_base_norm = 0.0f;

// used only for cosine search to re-scale the calculated distance.
float *base_norms = nullptr;
std::unique_ptr<float[]> base_norms = nullptr;

// data info
bool long_node = false;
Expand Down Expand Up @@ -225,7 +225,7 @@ namespace diskann {
// data: _u8 * n_chunks
// chunk_size = chunk size of each dimension chunk
// pq_tables = float* [[2^8 * [chunk_size]] * n_chunks]
_u8 *data = nullptr;
std::unique_ptr<_u8[]> data = nullptr;
_u64 n_chunks;
FixedChunkPQTable pq_table;

Expand Down Expand Up @@ -259,7 +259,7 @@ namespace diskann {

// graph has one entry point by default,
// we can optionally have multiple starting points
uint32_t *medoids = nullptr;
std::unique_ptr<uint32_t[]> medoids = nullptr;
// defaults to 1
size_t num_medoids;
// by default, it is empty. If there are multiple
Expand All @@ -271,7 +271,7 @@ namespace diskann {
std::shared_mutex cache_mtx;

// nhood_cache
unsigned *nhood_cache_buf = nullptr;
std::unique_ptr<unsigned[]> nhood_cache_buf = nullptr;
tsl::robin_map<_u32, std::pair<_u32, _u32 *>>
nhood_cache; // <id, <neighbors_num, neighbors>>

Expand Down
40 changes: 15 additions & 25 deletions thirdparty/DiskANN/include/diskann/pq_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,31 +49,21 @@ namespace diskann {

class FixedChunkPQTable {
// data_dim = n_chunks * chunk_size;
float* tables =
std::unique_ptr<float[]> tables =
nullptr; // pq_tables = float* [[2^8 * [chunk_size]] * n_chunks]
// _u64 n_chunks; // n_chunks = # of chunks ndims is split into
// _u64 chunk_size; // chunk_size = chunk size of each dimension chunk
_u64 ndims = 0; // ndims = chunk_size * n_chunks
_u64 n_chunks = 0;
_u32* chunk_offsets = nullptr;
_u32* rearrangement = nullptr;
float* centroid = nullptr;
float* tables_T = nullptr; // same as pq_tables, but col-major
std::unique_ptr<_u32[]> chunk_offsets = nullptr;
std::unique_ptr<_u32[]> rearrangement = nullptr;
std::unique_ptr<float[]> centroid = nullptr;
std::unique_ptr<float[]> tables_T = nullptr; // same as pq_tables, but col-major
public:
FixedChunkPQTable() {
}

virtual ~FixedChunkPQTable() {
if (tables != nullptr)
delete[] tables;
if (tables_T != nullptr)
delete[] tables_T;
if (rearrangement != nullptr)
delete[] rearrangement;
if (chunk_offsets != nullptr)
delete[] chunk_offsets;
if (centroid != nullptr)
delete[] centroid;
}

void load_pq_centroid_bin(const char* pq_table_file, size_t num_chunks) {
Expand Down Expand Up @@ -117,23 +107,23 @@ namespace diskann {
}
} else {
this->n_chunks = num_chunks;
rearrangement = new uint32_t[ndims];
rearrangement = std::make_unique<uint32_t[]>(ndims);

uint64_t chunk_size = DIV_ROUND_UP(ndims, num_chunks);
for (uint32_t d = 0; d < ndims; d++)
rearrangement[d] = d;
chunk_offsets = new uint32_t[num_chunks + 1];
chunk_offsets = std::make_unique<uint32_t[]>(num_chunks + 1);
for (uint32_t d = 0; d <= num_chunks; d++)
chunk_offsets[d] = (_u32) (std::min)(ndims, d * chunk_size);
centroid = new float[ndims];
std::memset(centroid, 0, ndims * sizeof(float));
centroid = std::make_unique<float[]>(ndims);
std::memset(centroid.get(), 0, ndims * sizeof(float));
}

LOG_KNOWHERE_INFO_ << "PQ Pivots: #ctrs: " << npts_u64
<< ", #dims: " << ndims_u64 << ", #chunks: " << n_chunks;
// assert((_u64) ndims_u32 == n_chunks * chunk_size);
// alloc and compute transpose
tables_T = new float[256 * ndims_u64];
tables_T = std::make_unique<float[]>(256 * ndims_u64);
for (_u64 i = 0; i < 256; i++) {
for (_u64 j = 0; j < ndims_u64; j++) {
tables_T[j * 256 + i] = tables[i * ndims_u64 + j];
Expand All @@ -157,7 +147,7 @@ namespace diskann {
float* chunk_dists = dist_vec + (256 * chunk);
for (_u64 j = chunk_offsets[chunk]; j < chunk_offsets[chunk + 1]; j++) {
_u64 permuted_dim_in_query = rearrangement[j];
const float* centers_dim_vec = tables_T + (256 * j);
const float* centers_dim_vec = tables_T.get() + (256 * j);
for (_u64 idx = 0; idx < 256; idx++) {
double diff =
centers_dim_vec[idx] - (query_vec[permuted_dim_in_query] -
Expand All @@ -173,7 +163,7 @@ namespace diskann {
for (_u64 chunk = 0; chunk < n_chunks; chunk++) {
for (_u64 j = chunk_offsets[chunk]; j < chunk_offsets[chunk + 1]; j++) {
_u64 permuted_dim_in_query = rearrangement[j];
const float* centers_dim_vec = tables_T + (256 * j);
const float* centers_dim_vec = tables_T.get() + (256 * j);
float diff = centers_dim_vec[base_vec[chunk]] -
(query_vec[permuted_dim_in_query] -
centroid[permuted_dim_in_query]);
Expand All @@ -188,7 +178,7 @@ namespace diskann {
for (_u64 chunk = 0; chunk < n_chunks; chunk++) {
for (_u64 j = chunk_offsets[chunk]; j < chunk_offsets[chunk + 1]; j++) {
_u64 permuted_dim_in_query = rearrangement[j];
const float* centers_dim_vec = tables_T + (256 * j);
const float* centers_dim_vec = tables_T.get() + (256 * j);
float diff =
centers_dim_vec[base_vec[chunk]] *
query_vec[permuted_dim_in_query]; // assumes centroid is 0 to
Expand All @@ -204,7 +194,7 @@ namespace diskann {
for (_u64 chunk = 0; chunk < n_chunks; chunk++) {
for (_u64 j = chunk_offsets[chunk]; j < chunk_offsets[chunk + 1]; j++) {
_u64 original_dim = rearrangement[j];
const float* centers_dim_vec = tables_T + (256 * j);
const float* centers_dim_vec = tables_T.get() + (256 * j);
out_vec[original_dim] =
centers_dim_vec[base_vec[chunk]] + centroid[original_dim];
}
Expand All @@ -219,7 +209,7 @@ namespace diskann {
float* chunk_dists = dist_vec + (256 * chunk);
for (_u64 j = chunk_offsets[chunk]; j < chunk_offsets[chunk + 1]; j++) {
_u64 permuted_dim_in_query = rearrangement[j];
const float* centers_dim_vec = tables_T + (256 * j);
const float* centers_dim_vec = tables_T.get() + (256 * j);
for (_u64 idx = 0; idx < 256; idx++) {
double prod =
centers_dim_vec[idx] *
Expand Down
Loading

0 comments on commit 7349e42

Please sign in to comment.