Skip to content

Commit

Permalink
Fix Save One file bugs.
Browse files Browse the repository at this point in the history
  • Loading branch information
REDMOND\ninchen committed Jan 17, 2024
1 parent df84a6d commit 4467272
Show file tree
Hide file tree
Showing 7 changed files with 90 additions and 76 deletions.
3 changes: 3 additions & 0 deletions include/abstract_data_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
#include "types.h"
#include "windows_customizations.h"
#include "distance.h"
#include "aligned_file_reader.h"


namespace diskann
{
Expand All @@ -22,6 +24,7 @@ template <typename data_t> class AbstractDataStore

// Returns the number of points loaded.
virtual location_t load(const std::string &filename, size_t offset) = 0;
virtual location_t load(AlignedFileReader &reader, size_t offset) = 0;

// Why does store take num_pts? Since store only has capacity, but we allow
// resizing we can end up in a situation where the store has spare capacity.
Expand Down
1 change: 1 addition & 0 deletions include/in_mem_data_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ template <typename data_t> class InMemDataStore : public AbstractDataStore<data_
virtual ~InMemDataStore();

virtual location_t load(const std::string &filename, size_t offset = 0) override;
virtual location_t load(AlignedFileReader &reader, size_t offset = 0) override;
virtual size_t save(const std::string &filename, const location_t num_pts) override;
virtual size_t save(std::ofstream &writer, const location_t num_pts, size_t offset) override;

Expand Down
2 changes: 1 addition & 1 deletion include/index_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ class IndexConfigBuilder

// Sets the single-file format version to use when loading an index.
//
// Only _load_from_one_file_version is written here. The save-side setting
// (_save_as_one_file_version) is left untouched, so configuring the load
// version can no longer overwrite a save version that was configured
// separately on the same builder.
//
// Returns *this so that builder calls can be chained.
IndexConfigBuilder &with_load_from_single_file_version(uint64_t load_from_one_file_version)
{
    this->_load_from_one_file_version = load_from_one_file_version;
    return *this;
}

Expand Down
11 changes: 8 additions & 3 deletions src/in_mem_data_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,16 @@ template <typename data_t> location_t InMemDataStore<data_t>::load(const std::st
return load_impl(filename, offset);
}

// Reader-based overload of load(): forwards the reader and the starting
// offset to load_impl and hands its result back to the caller unchanged.
template <typename data_t>
location_t InMemDataStore<data_t>::load(AlignedFileReader &reader, size_t offset)
{
    const location_t points_loaded = load_impl(reader, offset);
    return points_loaded;
}

#ifdef EXEC_ENV_OLS
template <typename data_t> location_t InMemDataStore<data_t>::load_impl(AlignedFileReader &reader, size_t offset)
{
size_t file_dim, file_num_points;

diskann::get_bin_metadata(reader, file_num_points, file_dim);
diskann::get_bin_metadata(reader, file_num_points, file_dim, offset);

if (file_dim != this->_dim)
{
Expand All @@ -63,7 +67,8 @@ template <typename data_t> location_t InMemDataStore<data_t>::load_impl(AlignedF
{
this->resize((location_t)file_num_points);
}
copy_aligned_data_from_file<data_t>(reader, _data, file_num_points, file_dim, _aligned_dim);

copy_aligned_data_from_file<data_t>(reader, _data, file_num_points, file_dim, _aligned_dim, offset);

return (location_t)file_num_points;
}
Expand Down
5 changes: 3 additions & 2 deletions src/in_mem_graph_store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ void InMemGraphStore::clear_graph()
}

#ifdef EXEC_ENV_OLS
std::tuple<uint32_t, uint32_t, size_t> InMemGraphStore::load_impl(AlignedFileReader &reader, size_t expected_num_points,
std::tuple<uint32_t, uint32_t, size_t> InMemGraphStore::load_impl(AlignedFileReader &reader,
size_t expected_num_points,
size_t offset)
{
size_t expected_file_size;
Expand All @@ -114,7 +115,7 @@ std::tuple<uint32_t, uint32_t, size_t> InMemGraphStore::load_impl(AlignedFileRea
<< ", _max_observed_degree: " << _max_observed_degree << ", _start: " << start
<< ", file_frozen_pts: " << file_frozen_pts << std::endl;

diskann::cout << "Loading vamana graph from reader..." << std::flush;
diskann::cout << "Loading vamana graph from reader..." << std::endl << std::flush;

// If user provides more points than max_points
// resize the _graph to the larger size.
Expand Down
95 changes: 48 additions & 47 deletions src/index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -471,21 +471,24 @@ void Index<T, TagT, LabelT>::save(const char *filename, bool compact_before_save
{
metadata.graph_offset = static_cast<uint64_t>(curr_pos);
_graph_store->store(writer, _nd + _num_frozen_pts, _num_frozen_pts, _start, curr_pos);
}

// Save metadata.
// Save metadata.
{
writer.seekp(meta_data_start, writer.beg);
writer.write((char *)&metadata, sizeof(SaveLoadMetaDataV1));
writer.close();
}

std::cout << "Metadata Saved. data_offset: " << std::to_string(metadata.data_offset)
<< " delete_list_offset: " << std::to_string(metadata.delete_list_offset)
<< " tag_offset: " << std::to_string(metadata.tags_offset)
<< " graph_offset: " << std::to_string(metadata.graph_offset) << std::endl;
writer.close();

diskann::cout << "Metadata Saved. data_offset: " << std::to_string(metadata.data_offset)
<< " delete_list_offset: " << std::to_string(metadata.delete_list_offset)
<< " tag_offset: " << std::to_string(metadata.tags_offset)
<< " graph_offset: " << std::to_string(metadata.graph_offset) << std::endl;
}
else
{
std::cout << "Save index in a single file currently only support _save_as_one_file_version = 1. "
diskann::cout << "Save index in a single file currently only support _save_as_one_file_version = 1. "
"Not saving the index."
<< std::endl;
}
Expand All @@ -496,7 +499,7 @@ void Index<T, TagT, LabelT>::save(const char *filename, bool compact_before_save
// _max_points.
reposition_frozen_point_to_end();

std::cout << "Time taken for save: " << timer.elapsed() / 1000000.0 << "s." << std::endl;
diskann::cout << "Time taken for save: " << timer.elapsed() / 1000000.0 << "s." << std::endl;
}

#ifdef EXEC_ENV_OLS
Expand Down Expand Up @@ -587,7 +590,6 @@ size_t Index<T, TagT, LabelT>::load_data(std::string filename, size_t offset)
diskann::cerr << stream.str() << std::endl;
throw diskann::ANNException(stream.str(), -1, __FUNCSIG__, __FILE__, __LINE__);
}

if (file_num_points > _max_points + _num_frozen_pts)
{
// update and tag lock acquired in load() before calling load_data
Expand All @@ -597,7 +599,7 @@ size_t Index<T, TagT, LabelT>::load_data(std::string filename, size_t offset)
#ifdef EXEC_ENV_OLS
// REFACTOR TODO: Must figure out how to support aligned reader in a clean
// manner.
copy_aligned_data_from_file<T>(reader, _data, file_num_points, file_dim, _data_store->get_aligned_dim(), offset);
_data_store->load(reader, offset); // offset == 0.
#else
_data_store->load(filename, offset); // offset == 0.
#endif
Expand Down Expand Up @@ -656,7 +658,6 @@ void Index<T, TagT, LabelT>::load(const char *filename, uint32_t num_threads, ui
#endif
if (!_load_from_one_file)
{
std::cout << "DLVS should not load multiple files." << std::endl;
// For DLVS Store, we will not support saving the index in multiple
// files.
#ifndef EXEC_ENV_OLS
Expand All @@ -680,41 +681,33 @@ void Index<T, TagT, LabelT>::load(const char *filename, uint32_t num_threads, ui
{
if (_filtered_index)
{
std::cout << "Single index file saving/loading support for filtered index is not yet "
diskann::cout << "Single index file saving/loading support for filtered index is not yet "
"enabled. Not loading the index."
<< std::endl;
}
else
{
std::cout << "Start loading index from one file." << std::endl;
uint64_t version = 0;

#ifdef EXEC_ENV_OLS
std::cout << "Start Version Check." << std::endl;

std::vector<AlignedRead> readReqs;
AlignedRead readReq;
uint64_t buf[1];
uint8_t buf[sizeof(uint64_t)] = {};

readReq.buf = buf;
readReq.buf = (void *) buf;
readReq.offset = 0;
readReq.len = sizeof(uint64_t);
readReqs.push_back(readReq);
std::cout << "Load Version request is ready." << std::endl;

reader.read(readReqs, ctx); // synchronous
std::cout << "Load Version processed." << std::endl;

if ((*(ctx.m_pRequestsStatus.get()))[0] == IOContext::READ_SUCCESS)
{
version = buf[0];
std::cout << "Load Version is " << std::to_string(version) << "." << std::endl;
memcpy((void *)&version, (void *)buf, sizeof(uint64_t));
}
else
{
std::stringstream str;
str << "Could not read binary metadata from index file at offset: 0." << std::endl;
std::cout << str.str() << std::endl;
throw diskann::ANNException(str.str(), -1, __FUNCSIG__, __FILE__, __LINE__);
}

Expand All @@ -725,70 +718,75 @@ void Index<T, TagT, LabelT>::load(const char *filename, uint32_t num_threads, ui

if (version == _load_from_one_file_version)
{
std::cout << "Version Check passed, start loading meta data." << std::endl;
SaveLoadMetaDataV1 metadata;

#ifdef EXEC_ENV_OLS
std::vector<AlignedRead> metadata_readReqs;
AlignedRead metadata_readReq;
uint64_t metadata_buf[sizeof(SaveLoadMetaDataV1)];
uint8_t metadata_buf[sizeof(SaveLoadMetaDataV1)] = {};

metadata_readReq.buf = metadata_buf;
metadata_readReq.offset = sizeof(uint64_t);
metadata_readReq.buf = (void*) metadata_buf;
metadata_readReq.offset = sizeof(version);
metadata_readReq.len = sizeof(SaveLoadMetaDataV1);
metadata_readReqs.push_back(metadata_readReq);
reader.read(metadata_readReqs, ctx); // synchronous
if ((*(ctx.m_pRequestsStatus))[0] == IOContext::READ_SUCCESS)
{
memcpy((void *)&metadata, (void *)buf, sizeof(SaveLoadMetaDataV1));
memcpy((void *)&metadata, (void *)metadata_buf, sizeof(SaveLoadMetaDataV1));
}

std::cout << "Metadata loaded. data_offset: " << std::to_string(metadata.data_offset)
<< " delete_list_offset: " << std::to_string(metadata.delete_list_offset)
<< " tag_offset: " << std::to_string(metadata.tags_offset)
<< " graph_offset: " << std::to_string(metadata.graph_offset)
<< std::endl;
diskann::cout << "Metadata loaded. data_offset: " << std::to_string(metadata.data_offset)
<< " delete_list_offset: " << std::to_string(metadata.delete_list_offset)
<< " tag_offset: " << std::to_string(metadata.tags_offset)
<< " graph_offset: " << std::to_string(metadata.graph_offset)
<< std::endl;

#else
reader.read((char *)&metadata, sizeof(SaveLoadMetaDataV1));
#endif
// Load data
#ifdef EXEC_ENV_OLS
load_data(reader, metadata.data_offset);
data_file_num_pts = load_data(reader, metadata.data_offset);

#else
load_data(filename, metadata.data_offset);
data_file_num_pts = load_data(filename, metadata.data_offset);
#endif

// Load the delete list when present.
if (metadata.data_offset != metadata.delete_list_offset)
// Load the delete list when present.
if (metadata.delete_list_offset != metadata.tags_offset)
{
#ifdef EXEC_ENV_OLS
load_delete_set(reader, metadata.delete_list_offset);
#else
load_delete_set(filename, metadata.delete_list_offset);
#endif
}

// Load tags when present.
if (metadata.delete_list_offset != metadata.tags_offset)
if (metadata.tags_offset != metadata.graph_offset)
{
#ifdef EXEC_ENV_OLS
load_tags(reader, metadata.tags_offset);
tags_file_num_pts = load_tags(reader, metadata.tags_offset);
#else
load_tags(filename, metadata.tags_offset);
tags_file_num_pts = load_tags(filename, metadata.tags_offset);
#endif
}
// Load graph
#ifdef EXEC_ENV_OLS
load_graph(reader, metadata.graph_offset);

graph_num_pts = load_graph(reader, data_file_num_pts, metadata.graph_offset);
#else
load_graph(filename, metadata.graph_offset);
graph_num_pts = load_graph(filename, data_file_num_pts, metadata.graph_offset);
#endif
}
else
{
std::cout << "load index from a single file currently only support _save_as_one_file_version = 1. "
"Not loading the index."
<< std::endl;
std::stringstream stream;
stream << "load index from a single file currently only support _save_as_one_file_version = 1 and _save_as_one_file_version = 1. "
<< "Not loading the index."
<< std::endl;
diskann::cerr << stream.str() << std::endl;

throw diskann::ANNException(stream.str(), -1, __FUNCSIG__, __FILE__, __LINE__);
}
}
}
Expand All @@ -802,6 +800,8 @@ void Index<T, TagT, LabelT>::load(const char *filename, uint32_t num_threads, ui
diskann::cerr << stream.str() << std::endl;
throw diskann::ANNException(stream.str(), -1, __FUNCSIG__, __FILE__, __LINE__);
}


#ifndef EXEC_ENV_OLS
if (file_exists(labels_file))
{
Expand Down Expand Up @@ -849,6 +849,7 @@ void Index<T, TagT, LabelT>::load(const char *filename, uint32_t num_threads, ui
}
}
#endif

_nd = data_file_num_pts - _num_frozen_pts;
_empty_slots.clear();
_empty_slots.reserve(_max_points);
Expand Down Expand Up @@ -2013,7 +2014,7 @@ void Index<T, TagT, LabelT>::build(const std::string &data_file, const size_t nu
this->build_filtered_index(data_file.c_str(), labels_file_to_use, points_to_load);
}
std::chrono::duration<double> diff = std::chrono::high_resolution_clock::now() - s;
std::cout << "Indexing time: " << diff.count() << "\n";
diskann::cout << "Indexing time: " << diff.count() << "\n";
}

template <typename T, typename TagT, typename LabelT>
Expand Down
49 changes: 26 additions & 23 deletions src/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ template <typename T> void load_bin(AlignedFileReader &reader, T *&data, size_t
{
// Code assumes that the reader is already setup correctly.
get_bin_metadata(reader, npts, ndim, offset);

data = new T[npts * ndim];

size_t data_size = npts * ndim * sizeof(T);
Expand Down Expand Up @@ -333,7 +334,7 @@ void copy_aligned_data_from_file(AlignedFileReader &reader, T *&data, size_t &np
{
if (data == nullptr)
{
diskann::cerr << "Memory was not allocated for " << data << " before calling the load function. Exiting..."
diskann::cout << "Memory was not allocated for " << data << " before calling the load function. Exiting..."
<< std::endl;
throw diskann::ANNException("Null pointer passed to copy_aligned_data_from_file()", -1, __FUNCSIG__, __FILE__,
__LINE__);
Expand Down Expand Up @@ -391,29 +392,31 @@ template <typename T> void read_array(AlignedFileReader &reader, T *data, size_t
if (data == nullptr)
{
throw diskann::ANNException("read_array requires an allocated buffer.", -1);
if (size * sizeof(T) > MAX_REQUEST_SIZE)
{
std::stringstream ss;
ss << "Cannot read more than " << MAX_REQUEST_SIZE
<< " bytes. Current request size: " << std::to_string(size) << " sizeof(T): " << sizeof(T) << std::endl;
throw diskann::ANNException(ss.str(), -1, __FUNCSIG__, __FILE__, __LINE__);
}
std::vector<AlignedRead> read_requests;
AlignedRead read_req;
read_req.buf = data;
read_req.len = size * sizeof(T);
read_req.offset = offset;
read_requests.push_back(read_req);
IOContext &ctx = reader.get_ctx();
reader.read(read_requests, ctx);
}

if ((*(ctx.m_pRequestsStatus))[0] != IOContext::READ_SUCCESS)
{
std::stringstream ss;
ss << "Failed to read_array() of size: " << size * sizeof(T) << " at offset: " << offset << " from reader. "
<< std::endl;
throw diskann::ANNException(ss.str(), -1, __FUNCSIG__, __FILE__, __LINE__);
}
if (size * sizeof(T) > MAX_REQUEST_SIZE)
{
std::stringstream ss;
ss << "Cannot read more than " << MAX_REQUEST_SIZE
<< " bytes. Current request size: " << std::to_string(size) << " sizeof(T): " << sizeof(T) << std::endl;
throw diskann::ANNException(ss.str(), -1, __FUNCSIG__, __FILE__, __LINE__);
}

std::vector<AlignedRead> read_requests;
AlignedRead read_req;
read_req.buf = data;
read_req.len = size * sizeof(T);
read_req.offset = offset;
read_requests.push_back(read_req);
IOContext &ctx = reader.get_ctx();
reader.read(read_requests, ctx);

if ((*(ctx.m_pRequestsStatus))[0] != IOContext::READ_SUCCESS)
{
std::stringstream ss;
ss << "Failed to read_array() of size: " << size * sizeof(T) << " at offset: " << offset << " from reader. "
<< std::endl;
throw diskann::ANNException(ss.str(), -1, __FUNCSIG__, __FILE__, __LINE__);
}
}

Expand Down

0 comments on commit 4467272

Please sign in to comment.