Skip to content
This repository has been archived by the owner on Aug 10, 2019. It is now read-only.

Added Heavy Hitter Sketch #91

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
237 changes: 237 additions & 0 deletions kvs/include/adaptive_heavy_hitters.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
// Copyright 2018 U.C. Berkeley RISE Lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <stdbool.h>
#include <stdio.h>
#include <chrono>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What are these imports for? Some of them seem unrelated to what you're doing here.

#include <algorithm>
#include <climits>
#include <iomanip>
#include <thread>
#include <utility>
#include "heavy_hitters.hpp"

// Weight used by the exponential-average formula in update_gamma(), which is
// currently commented out at the bottom of the class; nothing live reads it.
#define alpha 0.2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this defined outside of the class and gamma and epsilon inside?


// Keys reported to the tracker are plain strings. heavy_hitters.hpp (included
// above) declares the same typedef.
typedef std::string Key;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should not be re-typedefed. It's in includes/types.hpp, I think.


class AdaptiveThresholdHeavyHitters {
protected:
// Counting sketch that report_key() updates and get_key_count() queries.
// Allocated with `new` in set_values().
HeavyHittersSketch* hh_sketch;
// Fraction of total_set.size() that hot_map / cold_map may grow to before
// report_key() calls update_hot() / update_cold() to prune them.
float threshold_percent = 0.01;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments should explain the role of these variables.

// Controls the sketch depth: set_values() asks for ceil(log(gamma)) hash rows.
float gamma = 4127;
// Controls the sketch width: set_values() asks for ceil(e / epsilon) buckets
// per row.
float epsilon = 0.001;

// Every distinct key passed to report_key() since the last set_values().
std::unordered_set<Key> total_set;

// key -> count returned by the sketch the last time the key was reported with
// a count above hot_threshold.
std::unordered_map<Key, int> hot_map;
// key -> count for keys whose negated count was >= cold_threshold when
// reported (or that were already present in this map).
std::unordered_map<Key, int> cold_map;

// A reported count must be strictly greater than this to enter hot_map.
int hot_threshold;
// Compared against the NEGATED count; starts at INT_MIN so every key
// initially qualifies as cold.
int cold_threshold;

// Time of the last pruning pass; report_key() runs another one once more than
// 10 ms have elapsed since this point.
std::chrono::system_clock::time_point last_update_time;

// (Re)initialises all tracker state: empties the key collections, builds a
// fresh sketch sized from epsilon/gamma, and resets both thresholds and the
// pruning timer. Called by the constructor and by reset().
//
// NOTE(review): the previous hh_sketch (if any) and the arrays returned by
// get_hash_functions() are never freed in the code visible here, so every
// reset() appears to leak them.
void set_values() {
// Buckets per sketch row: ceil(e / epsilon).
int B_arg = (int)(ceil(exp(1) / epsilon));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't these be the same every time?

// Number of sketch rows (one hash function each): ceil(ln(gamma)).
int l_arg = (int)(ceil(log(gamma)));
int** hash_functions_arg = get_hash_functions(l_arg);
// Empty containers used only as the source for the assignments below.
std::unordered_set<Key> reset_total_set;
std::unordered_map<Key, int> reset_hot_map;
std::unordered_map<Key, int> reset_cold_map;

// Assigning an empty container discards whatever was tracked so far.
total_set = reset_total_set;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this meant to clear the collection? There are methods for doing that.

hot_map = reset_hot_map;
cold_map = reset_cold_map;

hh_sketch = new HeavyHittersSketch(hash_functions_arg, l_arg, B_arg);

// Any positive count is hot, and every count is cold, until the first
// update_hot() / update_cold() tightens these.
hot_threshold = 0;
cold_threshold = INT_MIN;

last_update_time = std::chrono::system_clock::now();
};

// Lomuto-style partition step used by select().
//
// Rearranges list[left..right] (both bounds inclusive) around the value that
// is stored at list[pivotIndex] on entry, so that afterwards:
//   - every element before the returned index is strictly less than the pivot,
//   - the pivot sits at the returned index,
//   - every element after it is greater than or equal to the pivot.
//
// Parameters:
//   list       - array to rearrange in place.
//   left/right - inclusive bounds of the sub-range to partition.
//   pivotIndex - index (within [left, right]) of the element to pivot on.
// Returns the final index of the pivot element.
int partition(int list[], int left, int right, int pivotIndex) {
  const int pivotValue = list[pivotIndex];

  // Park the pivot at the right edge so the scan below never moves it.
  std::swap(list[pivotIndex], list[right]);

  // Everything in [left, storeIndex) is known to be smaller than the pivot.
  int storeIndex = left;
  // The scan must visit every element except the parked pivot, i.e. up to
  // and including right - 1.
  for (int i = left; i < right; i++) {
    if (list[i] < pivotValue) {
      std::swap(list[storeIndex], list[i]);
      storeIndex++;
    }
  }

  // Drop the pivot between the "smaller" and "not smaller" groups.
  std::swap(list[storeIndex], list[right]);
  return storeIndex;
}

// Recursive selection: returns the value that ends up at index k once
// list[left..right] (inclusive) has been partitioned around it. Each step
// partitions the current range around its right-most element and then
// recurses only into the side that contains index k, reordering `list` in
// place as it goes.
//
// k is an absolute index into `list` (it is compared directly with the pivot
// index), not an offset from `left`.
// NOTE(review): assumes left <= k <= right; nothing here checks it.
int select(int list[], int left, int right, int k) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment about clarity.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like you're doing binary search.

// A one-element range is its own answer.
if (left == right) {
return list[left];
}

// Always pivot on the right-most element of the current range.
int pivotIndex = right;

pivotIndex = partition(list, left, right, pivotIndex);

// The pivot is now in its final position; either it is the answer or it
// tells us which side index k lies on.
if (k == pivotIndex) {
return list[k];
} else if (k < pivotIndex) {
return select(list, left, pivotIndex - 1, k);
} else {
return select(list, pivotIndex + 1, right, k);
}
};

void update_hot(void) {
int* vals;
std::unordered_map<Key, int> new_hot_map;
std::vector<int> val_vec;
int val_size = 0;

for (auto kv : hot_map) {
val_size = val_size + 1;
val_vec.push_back(kv.second);
}

vals = (int*)(&val_vec[0]);

int median = select(vals, 0, val_size - 1, 1);

for (auto kv : hot_map) {
if (kv.second > median) {
new_hot_map[kv.first] = kv.second;
}
}

hot_map = new_hot_map;
hot_threshold = median;
};

void update_cold(void) {
int* vals;
std::unordered_map<Key, int> new_cold_map;
std::vector<int> val_vec;
int val_size = 0;

for (auto kv : cold_map) {
val_size = val_size + 1;
val_vec.push_back(kv.second);
}

vals = &val_vec[0];

int median = (-1 * select(vals, 0, val_size - 1, 1));

for (auto kv : cold_map) {
if ((-1 * kv.second) >= median) {
new_cold_map[kv.first] = kv.second;
}
}

cold_map = new_cold_map;
cold_threshold = median;
};

// Generates `l` random hash functions for the sketch.
//
// Each function is a pair {multiplier, offset} stored as hash_functions[i][0]
// and hash_functions[i][1]; both are drawn from [1, min(large_prime - 1,
// INT_MAX)].
//
// Returns an array of `l` two-element arrays, all allocated with new[]. This
// function does not keep a pointer to them, so releasing them is up to the
// caller.
int** get_hash_functions(int l) {
  // Seeds the C library generator from the wall clock on every call, as
  // before; two calls within the same second therefore produce the same
  // coefficients.
  srand(time(NULL));

  // The coefficients are stored as int, but large_prime is larger than
  // INT_MAX. Scaling rand() all the way up to large_prime and converting to
  // int is undefined for every draw above INT_MAX, so cap the range at what
  // an int can hold.
  const long cap = (large_prime - 1 < static_cast<long>(INT_MAX))
                       ? large_prime - 1
                       : static_cast<long>(INT_MAX);
  auto draw_coefficient = [cap]() {
    // unit is in [0, 1]; double keeps the scaling exact enough for 31 bits.
    const double unit =
        static_cast<double>(rand()) / static_cast<double>(RAND_MAX);
    return static_cast<int>(unit * static_cast<double>(cap - 1)) + 1;
  };

  int** hash_functions = new int*[l];
  for (int i = 0; i < l; i++) {
    hash_functions[i] = new int[2];
    hash_functions[i][0] = draw_coefficient();
    hash_functions[i][1] = draw_coefficient();
  }
  return hash_functions;
}

public:
// Creates an empty tracker; all state is initialised by set_values().
AdaptiveThresholdHeavyHitters() {
  set_values();
}

// Records one access to `key`.
//
// Adds the key to total_set, bumps its count in the sketch, and files the
// returned count under hot_map and/or cold_map according to the current
// thresholds. If more than 10 ms have passed since the last pruning pass,
// hot_map and cold_map are each pruned when they hold more than
// threshold_percent of all distinct keys.
void report_key(Key key) {
  total_set.insert(key);

  const int count = hh_sketch->update(key);

  if (count > hot_threshold) {
    hot_map[key] = count;
  }

  // The stored count is refreshed either because the key still qualifies as
  // cold, or because it is already being tracked in cold_map.
  const bool qualifies_as_cold = (-1 * count) >= cold_threshold;
  if (qualifies_as_cold || cold_map.find(key) != cold_map.end()) {
    cold_map[key] = count;
  }

  const std::chrono::system_clock::time_point now =
      std::chrono::system_clock::now();
  const std::chrono::duration<double, std::milli> elapsed =
      now - last_update_time;
  if (elapsed.count() <= 10) {
    return;
  }

  const int total_size = total_set.size();

  const int hot_size = hot_map.size();
  if (hot_size > (threshold_percent * total_size)) {
    update_hot();
  }

  const int cold_size = cold_map.size();
  if (cold_size > (threshold_percent * total_size)) {
    update_cold();
  }

  last_update_time = now;
}

// Returns the sketch's current count estimate for `key` without recording an
// access.
int get_key_count(Key key) {
  return hh_sketch->estimate(key);
}

// Returns a copy of the hot map (key -> last count stored for it).
std::unordered_map<Key, int> get_hot_map() {
  return hot_map;
}

// Returns a copy of the cold map (key -> last count stored for it).
std::unordered_map<Key, int> get_cold_map() {
  return cold_map;
}

// Count a key must exceed to be added to the hot map.
int get_hot_threshold() {
  return hot_threshold;
}

// Value the negated count is compared against for the cold map.
int get_cold_threshold() {
  return cold_threshold;
}

// Number of distinct keys reported since construction or the last reset().
int get_total_size() {
  return total_set.size();
}

// Discards all tracked state and starts over with a fresh sketch.
// NOTE(review): set_values() allocates a new sketch without releasing the
// current one, so each call appears to leak it.
void reset() {
  set_values();
}

// static void reset_threshold_percent(float new_threshold) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

commented out code?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

// threshold_percent = new_threshold;
// };

// static void reset_error(float new_epsilon) {
// epsilon = new_epsilon;
// };

// static void update_gamma(int total_hits, int num_hh) {
// float avg_hit = (1.0 * total_hits) / num_hh;
// gamma = (alpha * avg_hit) + ((1 - alpha) * gamma);
// };
};
105 changes: 105 additions & 0 deletions kvs/include/heavy_hitters.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2018 U.C. Berkeley RISE Lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <stdlib.h>
#include <cmath>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <unordered_map>
#include <unordered_set>
#include <vector>

// Keys counted by the sketch are plain strings.
typedef std::string Key;

// Modulus applied by HeavyHittersSketch::location() before reducing to a
// bucket index. The value is 2^32 + 15.
// NOTE(review): it does not fit in a 32-bit `long`, so this assumes a platform
// where `long` is 64 bits wide.
const long large_prime = 4294967311l;

class HeavyHittersSketch {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this your code? or copied from some other HH library?

protected:
// l: number of rows in the sketch (one hash function per row).
// B: number of counters in each row.
unsigned l, B;

// hash_functions[i] is a two-element array: [0] is the multiplier and [1] the
// offset that location() applies for row i. Supplied by the caller.
int **hash_functions;
// l x B table of counters, allocated and zeroed in set_values().
int **sketch_array;

// Reduces a string key to a single integer that location() then maps to a
// bucket in each row. Starts from 5381 and, for every character, computes
// hash * 33 + character (the shift-and-add below is the multiplication by 33).
unsigned long hash_key(Key key) {
unsigned long hash = 5381;
int char_int = 0;
for (unsigned i = 0; i < key.length(); i++) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you use the for (key: key type syntax rather than int index

// NOTE(review): where `char` is signed, bytes >= 0x80 become negative here
// before being added to the unsigned hash.
char_int = (int)(key.at(i));
// (hash << 5) + hash == hash * 33.
hash = ((hash << 5) + hash) + char_int;
}

return hash;
};

// Maps a key hash to a bucket index in row `i`:
//   ((multiplier_i * hash + offset_i) mod large_prime) mod B
// The arithmetic is carried out in unsigned long, so the multiply-add wraps
// on overflow instead of being undefined.
unsigned location(int i, unsigned long hash) {
  const unsigned long multiplier =
      static_cast<unsigned long>(static_cast<long>(hash_functions[i][0]));
  const unsigned long offset =
      static_cast<unsigned long>(hash_functions[i][1]);
  const unsigned long mixed = multiplier * hash + offset;
  const unsigned long reduced =
      mixed % static_cast<unsigned long>(large_prime);
  return static_cast<unsigned>(reduced % B);
}

// Stores the hash functions and dimensions, then allocates an l_arg x B_arg
// table of counters with every counter set to zero.
//
//   hash_functions_arg - one {multiplier, offset} pair per row; the pointer is
//                        stored as-is, not copied.
//   l_arg              - number of rows.
//   B_arg              - number of counters per row.
void set_values(int **hash_functions_arg, int l_arg, int B_arg) {
  hash_functions = hash_functions_arg;
  l = l_arg;
  B = B_arg;

  sketch_array = new int *[l];
  for (unsigned row = 0; row < l; ++row) {
    // The trailing () value-initialises the row, i.e. all counters start at 0.
    sketch_array[row] = new int[B]();
  }
}

public:
// Builds a zeroed sketch with l_arg rows of B_arg counters that uses the
// supplied per-row hash functions. All the work happens in set_values().
HeavyHittersSketch(int **hash_functions_arg, int l_arg, int B_arg) {
  set_values(hash_functions_arg, l_arg, B_arg);
}

// Records one occurrence of `key`: increments the key's counter in every row
// and returns the smallest of the incremented counters (0 if the sketch has
// no rows).
int update(Key key) {
  const unsigned long digest = hash_key(key);

  int smallest = 0;
  for (unsigned row = 0; row < l; ++row) {
    int &counter = sketch_array[row][location(row, digest)];
    counter = counter + 1;
    // The first row seeds the minimum; later rows can only lower it.
    if (row == 0 || counter < smallest) {
      smallest = counter;
    }
  }
  return smallest;
}

// Returns the current count estimate for `key` without modifying the sketch:
// the smallest of the key's counters across all rows (0 if the sketch has no
// rows).
int estimate(Key key) {
  const unsigned long digest = hash_key(key);

  int smallest = 0;
  for (unsigned row = 0; row < l; ++row) {
    const int counter = sketch_array[row][location(row, digest)];
    // The first row seeds the minimum; later rows can only lower it.
    if (row == 0 || counter < smallest) {
      smallest = counter;
    }
  }
  return smallest;
}

void reset(int **hash_functions_arg, int l_arg, int B_arg) {
set_values(hash_functions_arg, l_arg, B_arg);
};
};
6 changes: 4 additions & 2 deletions kvs/include/kvs/kvs_handlers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ void user_request_handler(
map<Key, std::multiset<TimePoint>>& key_access_tracker,
map<Key, KeyProperty>& stored_key_map,
map<Key, KeyReplication>& key_replication_map, set<Key>& local_changeset,
ServerThread& wt, SerializerMap& serializers, SocketCache& pushers);
ServerThread& wt, SerializerMap& serializers, SocketCache& pushers,
AdaptiveThresholdHeavyHitters* sketch);

void gossip_handler(unsigned& seed, string& serialized,
map<TierId, GlobalHashRing>& global_hash_rings,
Expand All @@ -74,7 +75,8 @@ void replication_response_handler(
map<Key, std::multiset<TimePoint>>& key_access_tracker,
map<Key, KeyProperty>& stored_key_map,
map<Key, KeyReplication>& key_replication_map, set<Key>& local_changeset,
ServerThread& wt, SerializerMap& serializers, SocketCache& pushers);
ServerThread& wt, SerializerMap& serializers, SocketCache& pushers,
AdaptiveThresholdHeavyHitters* sketch);

void replication_change_handler(Address public_ip, Address private_ip,
unsigned thread_id, unsigned& seed, logger log,
Expand Down
1 change: 1 addition & 0 deletions kvs/include/kvs_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef KVS_INCLUDE_KVS_COMMON_HPP_
#define KVS_INCLUDE_KVS_COMMON_HPP_

#include "adaptive_heavy_hitters.hpp"
#include "kvs_types.hpp"

const unsigned kMetadataReplicationFactor = 1;
Expand Down
11 changes: 10 additions & 1 deletion kvs/include/metadata.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,14 @@ inline bool is_metadata(Key key) {
// NOTE: This needs to be here because it needs the definition of TierMetadata
extern map<TierId, TierMetadata> kTierMetadata;

enum MetadataType { replication, server_stats, key_access, key_size };
// Kinds of metadata a key can hold. get_metadata_key() turns most of these
// into the string tag noted beside them.
enum MetadataType {
replication,
// tag "stats"
server_stats,
// tag "access"
key_access,
// tag "size"
key_size,
// tag "hot_access"
key_access_hot,
// tag "cold_access"
key_access_cold
};

inline Key get_metadata_key(const ServerThread& st, unsigned tier_id,
unsigned thread_num, MetadataType type) {
Expand All @@ -106,6 +113,8 @@ inline Key get_metadata_key(const ServerThread& st, unsigned tier_id,
switch (type) {
case MetadataType::server_stats: metadata_type = "stats"; break;
case MetadataType::key_access: metadata_type = "access"; break;
case MetadataType::key_access_hot: metadata_type = "hot_access"; break;
case MetadataType::key_access_cold: metadata_type = "cold_access"; break;
case MetadataType::key_size: metadata_type = "size"; break;
default:
return ""; // this should never happen; see note below about
Expand Down
Loading