Skip to content

Commit

Permalink
parallelize index building
Browse files Browse the repository at this point in the history
  • Loading branch information
mahmudhera committed Nov 15, 2024
1 parent 653a247 commit c58282f
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 13 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,6 @@ src/cpp/main.o
src/cpp/utils.o
src/yacht/run_yacht_train_core
.vscode/settings.json
src/yacht/run_compute_similarity
src/cpp/yacht_train_core.o
src/cpp/compute_similarity.o
2 changes: 1 addition & 1 deletion src/cpp/compute_similarity.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ int main(int argc, char** argv) {
// compute the index from the target sketches
cout << "Building index from target sketches..." << endl;
unordered_map<hash_t, vector<int>> hash_index_target;
compute_index_from_sketches(sketches_target, hash_index_target);
compute_index_from_sketches(sketches_target, hash_index_target, args.number_of_threads);

// compute the similarity matrix
cout << "Computing similarity matrix..." << endl;
Expand Down
63 changes: 53 additions & 10 deletions src/cpp/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,65 @@ std::vector<hash_t> read_min_hashes(const std::string& json_filename) {
}


void compute_index_from_sketches(std::vector<std::vector<hash_t>>& sketches, std::unordered_map<hash_t, std::vector<int>>& hash_index) {
// create the index using all the hashes
for (uint i = 0; i < sketches.size(); i++) {
/*
Can this function be made more efficient?
Suppose we have n threads, and each thread handles a subset of the sketches.
To store the hash value h, a thread will need to:
1. lock mutex_list[ get_mutex_index(h) ],
2. push_back the sketch index onto the vector stored for h,
3. unlock the mutex.
This is expected to be many times faster than the single-threaded loop (not yet verified).
*/



/*
Index the sketches in the half-open range [sketch_index_start, sketch_index_end):
for every hash value of sketch i, append i to hash_index[hash value].

sketch_index_start  first sketch index handled by this call (inclusive)
sketch_index_end    one past the last sketch index handled by this call
sketches            sketches[i] is the list of hash values of sketch i
hash_index          map from hash value to the sketch indices containing it
mutex_list          array of mutexes; one is locked around each update
num_mutexes         number of entries in mutex_list (defaults to 1024)

NOTE(review): as shown here the body mixes two versions of the loop (one using
`hash`, one using `hash_value`) and the braces do not balance; confirm against
the actual file before relying on this text.
NOTE(review): the mutex chosen depends only on the hash value, while inserting a
new key into a std::unordered_map may rehash the whole table. Holding one of
several mutexes presumably does not make concurrent inserts safe - verify.
*/
void compute_index_from_sketches_one_chunk( int sketch_index_start, int sketch_index_end,
std::vector<std::vector<hash_t>>& sketches,
std::unordered_map<hash_t, std::vector<int>>& hash_index,
std::mutex * mutex_list, int num_mutexes = 1024) {

for (int i = sketch_index_start; i < sketch_index_end; i++) {
for (uint j = 0; j < sketches[i].size(); j++) {
hash_t hash = sketches[i][j];
if (hash_index.find(hash) == hash_index.end()) {
hash_index[hash] = std::vector<int>();
hash_t hash_value = sketches[i][j];
// scale the hash value proportionally onto a mutex slot
// NOTE(review): if hash_t is a 64-bit unsigned type (not visible here), a hash
// equal to 0xFFFFFFFFFFFFFFFF yields mutex_index == num_mutexes, one past the
// last valid slot - confirm and clamp if so.
int mutex_index = num_mutexes * (long double)hash_value / 0xFFFFFFFFFFFFFFFF;
mutex_list[mutex_index].lock();
if (hash_index.find(hash_value) == hash_index.end()) {
hash_index[hash_value] = std::vector<int>();
}
hash_index[hash].push_back(i);
hash_index[hash_value].push_back(i);
mutex_list[mutex_index].unlock();
}
}

size_t num_hashes = hash_index.size();

// cannot remove hashes that are in only one sketch, because
// we do not have an index for the query sketches
}



void compute_index_from_sketches(std::vector<std::vector<hash_t>>& sketches, std::unordered_map<hash_t, std::vector<int>>& hash_index, const int num_threads) {

// create mutexes
int num_mutexes = 1024;
std::mutex * mutex_list = new std::mutex[num_mutexes];

// create threads
int num_sketches = sketches.size();
int chunk_size = num_sketches / num_threads;
std::vector<std::thread> threads;
for (int i = 0; i < num_threads; i++) {
int start_index = i * chunk_size;
int end_index = (i == num_threads - 1) ? num_sketches : (i + 1) * chunk_size;
threads.push_back(std::thread(compute_index_from_sketches_one_chunk,
start_index, end_index,
std::ref(sketches), std::ref(hash_index),
mutex_list, num_mutexes));
}

// join threads
for (int i = 0; i < num_threads; i++) {
threads[i].join();
}

}

Expand Down
5 changes: 4 additions & 1 deletion src/cpp/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,11 @@ std::vector<hash_t> read_min_hashes(const std::string& sketch_path);
*
* @param sketches The sketches
* @param hash_index The reference to the hash index (where the index will be stored)
* @param num_threads The number of threads to use
*/
void compute_index_from_sketches(std::vector<std::vector<hash_t>>& sketches, std::unordered_map<hash_t, std::vector<int>>& hash_index);
void compute_index_from_sketches(std::vector<std::vector<hash_t>>& sketches,
std::unordered_map<hash_t, std::vector<int>>& hash_index,
int num_threads);



Expand Down
2 changes: 1 addition & 1 deletion src/cpp/yacht_train_core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ int main(int argc, char *argv[]) {
// ****************************************************************
auto index_build_start = chrono::high_resolution_clock::now();
cout << "Building index from sketches..." << endl;
compute_index_from_sketches(sketches, hash_index);
compute_index_from_sketches(sketches, hash_index, arguments.number_of_threads);
auto index_build_end = chrono::high_resolution_clock::now();
auto index_build_duration = chrono::duration_cast<chrono::milliseconds>(index_build_end - index_build_start);
cout << "Time taken to build index: " << index_build_duration.count() << " milliseconds" << endl;
Expand Down

0 comments on commit c58282f

Please sign in to comment.