Skip to content

Commit

Permalink
[Fix] data loss on a big batch
Browse files Browse the repository at this point in the history
  • Loading branch information
rhdong committed Oct 19, 2022
1 parent f43cea8 commit e452a8d
Show file tree
Hide file tree
Showing 2 changed files with 155 additions and 12 deletions.
15 changes: 10 additions & 5 deletions include/merlin_hashtable.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ class HashTable {
return;
}

if (!reach_max_capacity_ && fast_load_factor() > options_.max_load_factor) {
while (!reach_max_capacity_ &&
fast_load_factor(n) > options_.max_load_factor) {
reserve(capacity() * 2);
}

Expand Down Expand Up @@ -410,7 +411,8 @@ class HashTable {
return;
}

if (!reach_max_capacity_ && fast_load_factor() > options_.max_load_factor) {
while (!reach_max_capacity_ &&
fast_load_factor(n) > options_.max_load_factor) {
reserve(capacity() * 2);
}

Expand Down Expand Up @@ -1125,11 +1127,13 @@ class HashTable {
* inaccurate but within an error in 1% empirically which is enough for
* capacity control. But it's not suitable for end-users.
*
* @param delta A hypothetical upcoming change on table size.
* @param stream The CUDA stream used to execute the operation.
*
* @return The evaluated load factor
*/
inline float fast_load_factor(cudaStream_t stream = 0) const {
inline float fast_load_factor(size_type delta = 0,
cudaStream_t stream = 0) const {
size_t h_size = 0;

std::shared_lock<std::shared_timed_mutex> lock(mutex_, std::defer_lock);
Expand All @@ -1149,8 +1153,9 @@ class HashTable {
thrust::plus<int>());

CudaCheckError();
return static_cast<float>((h_size * 1.0) /
(options_.max_bucket_size * N * 1.0));
return static_cast<float>((delta * 1.0) / (capacity() * 1.0) +
(h_size * 1.0) /
(options_.max_bucket_size * N * 1.0));
}

inline void check_evict_strategy(const meta_type* metas) {
Expand Down
152 changes: 145 additions & 7 deletions tests/merlin_hashtable_test.cc.cu
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void create_random_keys(K* h_keys, M* h_metas, V* h_vectors, int KEY_NUM) {
for (const K num : numbers) {
h_keys[i] = num;
if (h_metas != nullptr) {
h_metas[i] = getTimestamp();
h_metas[i] = num;
}
if (h_vectors != nullptr) {
for (size_t j = 0; j < DIM; j++) {
Expand Down Expand Up @@ -407,7 +407,7 @@ void test_erase_if_pred() {
void test_rehash() {
constexpr uint64_t BUCKET_MAX_SIZE = 128ul;
constexpr uint64_t INIT_CAPACITY = BUCKET_MAX_SIZE;
constexpr uint64_t MAX_CAPACITY = 2 * INIT_CAPACITY;
constexpr uint64_t MAX_CAPACITY = 4 * INIT_CAPACITY;
constexpr uint64_t KEY_NUM = BUCKET_MAX_SIZE * 2;
constexpr uint64_t TEST_TIMES = 100;
K* h_keys;
Expand Down Expand Up @@ -467,21 +467,21 @@ void test_rehash() {

total_size = table->size(stream);
CUDA_CHECK(cudaDeviceSynchronize());
ASSERT_TRUE(total_size == BUCKET_MAX_SIZE);
ASSERT_EQ(total_size, KEY_NUM);

dump_counter = table->export_batch(table->capacity(), 0, d_keys,
reinterpret_cast<float*>(d_vectors),
d_metas, stream);
CUDA_CHECK(cudaStreamSynchronize(stream));
ASSERT_TRUE(dump_counter == BUCKET_MAX_SIZE);
ASSERT_EQ(dump_counter, KEY_NUM);

table->reserve(MAX_CAPACITY, stream);
CUDA_CHECK(cudaStreamSynchronize(stream));
ASSERT_TRUE(table->capacity() == MAX_CAPACITY);
ASSERT_EQ(table->capacity(), MAX_CAPACITY);

total_size = table->size(stream);
CUDA_CHECK(cudaStreamSynchronize(stream));
ASSERT_TRUE(total_size == BUCKET_MAX_SIZE);
ASSERT_EQ(total_size, KEY_NUM);

table->find(BUCKET_MAX_SIZE, d_keys, reinterpret_cast<float*>(d_vectors),
d_found, d_metas, stream);
Expand Down Expand Up @@ -534,6 +534,140 @@ void test_rehash() {
CudaCheckError();
}

// Verifies that a single insert batch larger than the current capacity
// triggers as many rehashes as needed (capacity may need to double more
// than once for one batch), and that no keys are lost in the process.
//
// Sequence: insert INIT_KEY_NUM keys (forces one doubling), then insert
// the full KEY_NUM batch (forces further doubling), then check size(),
// export_batch() and find() all agree with the number of inserted keys.
void test_rehash_on_big_batch() {
  constexpr uint64_t INIT_CAPACITY = 1024;
  constexpr uint64_t MAX_CAPACITY = 16 * 1024;
  constexpr uint64_t INIT_KEY_NUM = 1024;
  constexpr uint64_t KEY_NUM = 2048;
  K* h_keys;
  M* h_metas;
  Vector* h_vectors;
  bool* h_found;

  TableOptions options;

  options.init_capacity = INIT_CAPACITY;
  options.max_capacity = MAX_CAPACITY;
  options.max_bucket_size = 128;
  options.max_load_factor = 0.6;
  options.max_hbm_for_vectors = nv::merlin::GB(16);
  options.evict_strategy = nv::merlin::EvictStrategy::kCustomized;

  // Pinned host buffers for async-friendly transfers.
  CUDA_CHECK(cudaMallocHost(&h_keys, KEY_NUM * sizeof(K)));
  CUDA_CHECK(cudaMallocHost(&h_metas, KEY_NUM * sizeof(M)));
  CUDA_CHECK(cudaMallocHost(&h_vectors, KEY_NUM * sizeof(Vector)));
  CUDA_CHECK(cudaMallocHost(&h_found, KEY_NUM * sizeof(bool)));

  K* d_keys;
  M* d_metas = nullptr;
  Vector* d_vectors;
  bool* d_found;
  size_t dump_counter = 0;

  CUDA_CHECK(cudaMalloc(&d_keys, KEY_NUM * sizeof(K)));
  CUDA_CHECK(cudaMalloc(&d_metas, KEY_NUM * sizeof(M)));
  CUDA_CHECK(cudaMalloc(&d_vectors, KEY_NUM * sizeof(Vector)));
  CUDA_CHECK(cudaMalloc(&d_found, KEY_NUM * sizeof(bool)));

  cudaStream_t stream;
  CUDA_CHECK(cudaStreamCreate(&stream));

  uint64_t total_size = 0;
  uint64_t expected_size = 0;
  std::unique_ptr<Table> table = std::make_unique<Table>();
  table->init(options);

  // create_random_keys sets h_metas[i] == h_keys[i] and
  // h_vectors[i].value[j] == keys[i] * 0.00001f, which the checks below rely
  // on.
  create_random_keys<K, M, float, DIM>(
      h_keys, h_metas, reinterpret_cast<float*>(h_vectors), KEY_NUM);

  CUDA_CHECK(
      cudaMemcpy(d_keys, h_keys, KEY_NUM * sizeof(K), cudaMemcpyHostToDevice));
  CUDA_CHECK(cudaMemcpy(d_metas, h_metas, KEY_NUM * sizeof(M),
                        cudaMemcpyHostToDevice));
  CUDA_CHECK(cudaMemcpy(d_vectors, h_vectors, KEY_NUM * sizeof(Vector),
                        cudaMemcpyHostToDevice));
  CUDA_CHECK(cudaMemset(d_found, 0, KEY_NUM * sizeof(bool)));

  total_size = table->size(stream);
  CUDA_CHECK(cudaStreamSynchronize(stream));
  ASSERT_EQ(total_size, 0);

  // First batch fills the table past max_load_factor: expect one doubling.
  table->insert_or_assign(INIT_KEY_NUM, d_keys,
                          reinterpret_cast<float*>(d_vectors), d_metas, stream);
  CUDA_CHECK(cudaStreamSynchronize(stream));
  expected_size = INIT_KEY_NUM;

  total_size = table->size(stream);
  CUDA_CHECK(cudaDeviceSynchronize());
  ASSERT_EQ(total_size, expected_size);
  ASSERT_EQ(table->capacity(), (INIT_CAPACITY * 2));

  // Second batch is bigger than the whole current capacity: the table must
  // rehash repeatedly (not just once) so that no key is dropped.
  table->insert_or_assign(KEY_NUM, d_keys, reinterpret_cast<float*>(d_vectors),
                          d_metas, stream);
  CUDA_CHECK(cudaStreamSynchronize(stream));
  expected_size = KEY_NUM;

  total_size = table->size(stream);
  CUDA_CHECK(cudaDeviceSynchronize());
  ASSERT_EQ(total_size, expected_size);
  ASSERT_EQ(table->capacity(), KEY_NUM * 4);

  dump_counter =
      table->export_batch(table->capacity(), 0, d_keys,
                          reinterpret_cast<float*>(d_vectors), d_metas, stream);
  CUDA_CHECK(cudaStreamSynchronize(stream));
  ASSERT_EQ(dump_counter, expected_size);

  table->find(KEY_NUM, d_keys, reinterpret_cast<float*>(d_vectors), d_found,
              d_metas, stream);
  CUDA_CHECK(cudaStreamSynchronize(stream));
  uint64_t found_num = 0;

  // Clear the host buffers before reading results back, so stale data can't
  // mask a failed copy. (These buffers are pinned, so cudaMemset is valid on
  // them under unified addressing.)
  CUDA_CHECK(cudaMemset(h_found, 0, KEY_NUM * sizeof(bool)));
  CUDA_CHECK(cudaMemset(h_metas, 0, KEY_NUM * sizeof(M)));
  CUDA_CHECK(cudaMemset(h_vectors, 0, KEY_NUM * sizeof(Vector)));
  CUDA_CHECK(
      cudaMemcpy(h_keys, d_keys, KEY_NUM * sizeof(K), cudaMemcpyDeviceToHost));
  CUDA_CHECK(cudaMemcpy(h_found, d_found, KEY_NUM * sizeof(bool),
                        cudaMemcpyDeviceToHost));
  CUDA_CHECK(cudaMemcpy(h_metas, d_metas, KEY_NUM * sizeof(M),
                        cudaMemcpyDeviceToHost));
  CUDA_CHECK(cudaMemcpy(h_vectors, d_vectors, KEY_NUM * sizeof(Vector),
                        cudaMemcpyDeviceToHost));
  for (uint64_t i = 0; i < KEY_NUM; i++) {
    if (h_found[i]) {
      found_num++;
      ASSERT_EQ(h_metas[i], h_keys[i]);
      for (int j = 0; j < DIM; j++) {
        ASSERT_EQ(h_vectors[i].value[j],
                  static_cast<float>(h_keys[i] * 0.00001));
      }
    }
  }
  // Every inserted key must survive the chain of rehashes.
  ASSERT_EQ(found_num, KEY_NUM);

  table->clear(stream);
  total_size = table->size(stream);
  CUDA_CHECK(cudaStreamSynchronize(stream));
  ASSERT_EQ(total_size, 0);
  CUDA_CHECK(cudaStreamDestroy(stream));

  CUDA_CHECK(cudaMemcpy(h_vectors, d_vectors, KEY_NUM * sizeof(Vector),
                        cudaMemcpyDeviceToHost));

  // Release all pinned host buffers (h_vectors was previously leaked) and
  // all device buffers.
  CUDA_CHECK(cudaFreeHost(h_keys));
  CUDA_CHECK(cudaFreeHost(h_metas));
  CUDA_CHECK(cudaFreeHost(h_vectors));
  CUDA_CHECK(cudaFreeHost(h_found));

  CUDA_CHECK(cudaFree(d_keys));
  CUDA_CHECK(cudaFree(d_metas));
  CUDA_CHECK(cudaFree(d_vectors));
  CUDA_CHECK(cudaFree(d_found));
  CUDA_CHECK(cudaDeviceSynchronize());

  CudaCheckError();
}

void test_dynamic_rehash_on_multi_threads() {
constexpr uint64_t BUCKET_MAX_SIZE = 128ul;
constexpr uint64_t INIT_CAPACITY = 1 * 1024;
Expand Down Expand Up @@ -719,14 +853,15 @@ void test_export_batch_if() {
reinterpret_cast<float*>(d_vectors), d_metas,
stream);

CUDA_CHECK(cudaStreamSynchronize(stream));
CUDA_CHECK(cudaMemcpy(&h_dump_counter, d_dump_counter, sizeof(size_t),
cudaMemcpyDeviceToHost));

size_t expected_export_count = 0;
for (int i = 0; i < KEY_NUM; i++) {
if (h_metas[i] > threshold) expected_export_count++;
}
ASSERT_TRUE(expected_export_count == h_dump_counter);
ASSERT_EQ(expected_export_count, h_dump_counter);

CUDA_CHECK(cudaMemset(h_metas, 0, KEY_NUM * sizeof(M)));
CUDA_CHECK(cudaMemset(h_vectors, 0, KEY_NUM * sizeof(Vector)));
Expand Down Expand Up @@ -907,6 +1042,9 @@ void test_basic_for_cpu_io() {
TEST(MerlinHashTableTest, test_basic) { test_basic(); }
TEST(MerlinHashTableTest, test_erase_if_pred) { test_erase_if_pred(); }
TEST(MerlinHashTableTest, test_rehash) { test_rehash(); }
TEST(MerlinHashTableTest, test_rehash_on_big_batch) {
test_rehash_on_big_batch();
}
TEST(MerlinHashTableTest, test_dynamic_rehash_on_multi_threads) {
test_dynamic_rehash_on_multi_threads();
}
Expand Down

0 comments on commit e452a8d

Please sign in to comment.