diff --git a/kvs/include/adaptive_heavy_hitters.hpp b/kvs/include/adaptive_heavy_hitters.hpp
new file mode 100644
index 00000000..089cd95f
--- /dev/null
+++ b/kvs/include/adaptive_heavy_hitters.hpp
@@ -0,0 +1,237 @@
+// Copyright 2018 U.C. Berkeley RISE Lab
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <math.h>
+#include <stdlib.h>
+#include <time.h>
+#include <chrono>
+#include <climits>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+#include "heavy_hitters.hpp"
+
+#define alpha 0.2
+
+typedef std::string Key;
+
+class AdaptiveThresholdHeavyHitters {
+ protected:
+  HeavyHittersSketch* hh_sketch;
+  float threshold_percent = 0.01;
+  float gamma = 4127;
+  float epsilon = 0.001;
+
+  std::unordered_set<Key> total_set;
+
+  std::unordered_map<Key, int> hot_map;
+  std::unordered_map<Key, int> cold_map;
+
+  int hot_threshold;
+  int cold_threshold;
+
+  std::chrono::system_clock::time_point last_update_time;
+
+  void set_values() {
+    int B_arg = (int)(ceil(exp(1) / epsilon));
+    int l_arg = (int)(ceil(log(gamma)));
+    int** hash_functions_arg = get_hash_functions(l_arg);
+    std::unordered_set<Key> reset_total_set;
+    std::unordered_map<Key, int> reset_hot_map;
+    std::unordered_map<Key, int> reset_cold_map;
+
+    total_set = reset_total_set;
+    hot_map = reset_hot_map;
+    cold_map = reset_cold_map;
+
+    hh_sketch = new HeavyHittersSketch(hash_functions_arg, l_arg, B_arg);
+
+    hot_threshold = 0;
+    cold_threshold = INT_MIN;
+
+    last_update_time = std::chrono::system_clock::now();
+  };
+
+  int partition(int list[], int left, int right, int pivotIndex) {
+    int pivotValue = list[pivotIndex];
+
+    int tmp = list[pivotIndex];
+    list[pivotIndex] = list[right];
+    list[right] = tmp;
+
+    int storeIndex = left;
+    for (int i = left; i < right; i++) {
+      if (list[i] < pivotValue) {
+        tmp = list[storeIndex];
+        list[storeIndex] = list[i];
+        list[i] = tmp;
+
+        storeIndex++;
+      }
+    }
+
+    tmp = list[right];
+    list[right] = list[storeIndex];
+    list[storeIndex] = tmp;
+    return storeIndex;
+  };
+
+  int select(int list[], int left, int right, int k) {
+    if (left == right) {
+      return list[left];
+    }
+
+    int pivotIndex = right;
+
+    pivotIndex = partition(list, left, right, pivotIndex);
+
+    if (k == pivotIndex) {
+      return list[k];
+    } else if (k < pivotIndex) {
+      return select(list, left, pivotIndex - 1, k);
+    } else {
+      return select(list, pivotIndex + 1, right, k);
+    }
+  };
+
+  void update_hot(void) {
+    int* vals;
+    std::unordered_map<Key, int> new_hot_map;
+    std::vector<int> val_vec;
+    int val_size = 0;
+
+    for (auto kv : hot_map) {
+      val_size = val_size + 1;
+      val_vec.push_back(kv.second);
+    }
+
+    vals = (int*)(&val_vec[0]);
+
+    int median = select(vals, 0, val_size - 1, 1);
+
+    for (auto kv : hot_map) {
+      if (kv.second > median) {
+        new_hot_map[kv.first] = kv.second;
+      }
+    }
+
+    hot_map = new_hot_map;
+    hot_threshold = median;
+  };
+
+  void update_cold(void) {
+    int* vals;
+    std::unordered_map<Key, int> new_cold_map;
+    std::vector<int> val_vec;
+    int val_size = 0;
+
+    for (auto kv : cold_map) {
+      val_size = val_size + 1;
+      val_vec.push_back(kv.second);
+    }
+
+    vals = &val_vec[0];
+
+    int median = (-1 * select(vals, 0, val_size - 1, 1));
+
+    for (auto kv : cold_map) {
+      if ((-1 * kv.second) >= median) {
+        new_cold_map[kv.first] = kv.second;
+      }
+    }
+
+    cold_map = new_cold_map;
+    cold_threshold = median;
+  };
+
+  int** get_hash_functions(int l) {
+    int** hash_functions;
+    srand(time(NULL));
+    hash_functions = new int*[l];
+    for (int i = 0; i < l; i++) {
+      hash_functions[i] = new int[2];
+      hash_functions[i][0] =
+          int(float(rand()) * float(large_prime) / float(RAND_MAX) + 1);
+      hash_functions[i][1] =
+          int(float(rand()) * float(large_prime) / float(RAND_MAX) + 1);
+    }
+    return hash_functions;
+  };
+
+ public:
+  AdaptiveThresholdHeavyHitters() { set_values(); };
+
+  void report_key(Key key) {
+    total_set.insert(key);
+
+    int new_count = (*hh_sketch).update(key);
+
+    if (new_count > hot_threshold) {
+      hot_map[key] = new_count;
+    }
+
+    if ((-1 * new_count) >= cold_threshold) {
+      cold_map[key] = new_count;
+    } else {
+      if (cold_map.find(key) != cold_map.end()) {
+        cold_map[key] = new_count;
+      }
+    }
+
+    std::chrono::system_clock::time_point now =
+        std::chrono::system_clock::now();
+    std::chrono::duration<double> passed = now - last_update_time;
+    double passed_num = passed.count();
+
+    if (passed_num > 10) {
+      int total_size = total_set.size();
+      int hot_size = hot_map.size();
+      if (hot_size > (threshold_percent * total_size)) {
+        update_hot();
+      }
+
+      int cold_size = cold_map.size();
+      if (cold_size > (threshold_percent * total_size)) {
+        update_cold();
+      }
+      last_update_time = now;
+    }
+  };
+
+  int get_key_count(Key key) { return (*hh_sketch).estimate(key); };
+
+  std::unordered_map<Key, int> get_hot_map(void) { return hot_map; };
+
+  std::unordered_map<Key, int> get_cold_map(void) { return cold_map; };
+
+  int get_hot_threshold() { return hot_threshold; };
+
+  int get_cold_threshold() { return cold_threshold; };
+
+  int get_total_size() { return total_set.size(); };
+
+  void reset() { set_values(); };
+
+  // static void reset_threshold_percent(float new_threshold) {
+  //   threshold_percent = new_threshold;
+  // };
+
+  // static void reset_error(float new_epsilon) {
+  //   epsilon = new_epsilon;
+  // };
+
+  // static void update_gamma(int total_hits, int num_hh) {
+  //   float avg_hit = (1.0 * total_hits) / num_hh;
+  //   gamma = (alpha * avg_hit) + ((1 - alpha) * gamma);
+  // };
+};
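The class above is driven entirely through report_key() plus the get_hot_map()/get_cold_map() accessors, which is how the server threads use it later in this patch. The standalone driver below is an illustrative sketch only, not part of the patch: the key names and access pattern are invented, and it assumes adaptive_heavy_hitters.hpp (and the heavy_hitters.hpp it pulls in) are on the include path. The hot/cold thresholds only adapt once the 10-second window inside report_key() has elapsed, so a quick run like this mostly exercises the count-min estimates.

#include <iostream>
#include <string>

#include "adaptive_heavy_hitters.hpp"

int main() {
  AdaptiveThresholdHeavyHitters* sketch = new AdaptiveThresholdHeavyHitters();

  // Skewed workload: one key is reported far more often than the rest.
  for (int i = 0; i < 1000; i++) {
    sketch->report_key("hot_key");
    if (i % 100 == 0) {
      sketch->report_key("cold_key_" + std::to_string(i));
    }
  }

  // Count-min estimate for the frequent key (never an underestimate).
  std::cout << "estimate(hot_key) = " << sketch->get_key_count("hot_key")
            << std::endl;

  // Initially hot_threshold = 0 and cold_threshold = INT_MIN, so every
  // reported key lands in both maps until update_hot()/update_cold() first run.
  std::cout << "hot threshold = " << sketch->get_hot_threshold()
            << ", cold threshold = " << sketch->get_cold_threshold()
            << std::endl;

  for (const auto& kv : sketch->get_hot_map()) {
    std::cout << "hot: " << kv.first << " -> " << kv.second << std::endl;
  }
  for (const auto& kv : sketch->get_cold_map()) {
    std::cout << "cold: " << kv.first << " -> " << kv.second << std::endl;
  }

  delete sketch;
  return 0;
}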
diff --git a/kvs/include/heavy_hitters.hpp b/kvs/include/heavy_hitters.hpp
new file mode 100644
index 00000000..2f71bee1
--- /dev/null
+++ b/kvs/include/heavy_hitters.hpp
@@ -0,0 +1,105 @@
+// Copyright 2018 U.C. Berkeley RISE Lab
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include <math.h>
+#include <stdlib.h>
+#include <time.h>
+#include <algorithm>
+#include <chrono>
+#include <iostream>
+#include <string>
+#include <unordered_map>
+
+typedef std::string Key;
+
+const long large_prime = 4294967311l;
+
+class HeavyHittersSketch {
+ protected:
+  unsigned l, B;
+
+  int **hash_functions;
+  int **sketch_array;
+
+  unsigned long hash_key(Key key) {
+    unsigned long hash = 5381;
+    int char_int = 0;
+    for (unsigned i = 0; i < key.length(); i++) {
+      char_int = (int)(key.at(i));
+      hash = ((hash << 5) + hash) + char_int;
+    }
+
+    return hash;
+  };
+
+  unsigned location(int i, unsigned long hash) {
+    return (
+        unsigned)(((long)hash_functions[i][0] * hash + hash_functions[i][1]) %
+                  large_prime % B);
+  };
+
+  void set_values(int **hash_functions_arg, int l_arg, int B_arg) {
+    hash_functions = hash_functions_arg;
+    l = l_arg;
+    B = B_arg;
+
+    sketch_array = new int *[l];
+    for (unsigned i = 0; i < l; i++) {
+      sketch_array[i] = new int[B];
+      for (unsigned j = 0; j < B; j++) {
+        sketch_array[i][j] = 0;
+      }
+    }
+  };
+
+ public:
+  HeavyHittersSketch(int **hash_functions_arg, int l_arg, int B_arg) {
+    set_values(hash_functions_arg, l_arg, B_arg);
+  };
+
+  int update(Key key) {
+    unsigned long hash = hash_key(key);
+
+    int mincount = 0;
+    unsigned hashed_location = 0;
+    for (unsigned i = 0; i < l; i++) {
+      hashed_location = location(i, hash);
+      sketch_array[i][hashed_location] = sketch_array[i][hashed_location] + 1;
+      if ((sketch_array[i][hashed_location] < mincount) or (i == 0)) {
+        mincount = sketch_array[i][hashed_location];
+      }
+    }
+
+    return mincount;
+  };
+
+  int estimate(Key key) {
+    unsigned long hash = hash_key(key);
+
+    int mincount = 0;
+    unsigned hashed_location = 0;
+    for (unsigned i = 0; i < l; i++) {
+      hashed_location = location(i, hash);
+      if ((sketch_array[i][hashed_location] < mincount) or (i == 0)) {
+        mincount = sketch_array[i][hashed_location];
+      }
+    }
+
+    return mincount;
+  };
+
+  void reset(int **hash_functions_arg, int l_arg, int B_arg) {
+    set_values(hash_functions_arg, l_arg, B_arg);
+  };
+};
\ No newline at end of file
diff --git a/kvs/include/kvs/kvs_handlers.hpp b/kvs/include/kvs/kvs_handlers.hpp
index c64b20c6..b1cee1c2 100644
--- a/kvs/include/kvs/kvs_handlers.hpp
+++ b/kvs/include/kvs/kvs_handlers.hpp
@@ -54,7 +54,8 @@ void user_request_handler(
     map<Key, std::multiset<TimePoint>>& key_access_tracker,
     map<Key, KeyProperty>& stored_key_map,
     map<Key, KeyReplication>& key_replication_map, set<Key>& local_changeset,
-    ServerThread& wt, SerializerMap& serializers, SocketCache& pushers);
+    ServerThread& wt, SerializerMap& serializers, SocketCache& pushers,
+    AdaptiveThresholdHeavyHitters* sketch);
 
 void gossip_handler(unsigned& seed, string& serialized,
                     map<TierId, GlobalHashRing>& global_hash_rings,
@@ -74,7 +75,8 @@ void replication_response_handler(
     map<Key, std::multiset<TimePoint>>& key_access_tracker,
     map<Key, KeyProperty>& stored_key_map,
     map<Key, KeyReplication>& key_replication_map, set<Key>& local_changeset,
-    ServerThread& wt, SerializerMap& serializers, SocketCache& pushers);
+    ServerThread& wt, SerializerMap& serializers, SocketCache& pushers,
+    AdaptiveThresholdHeavyHitters* sketch);
 
 void replication_change_handler(Address public_ip, Address private_ip,
                                 unsigned thread_id, unsigned& seed, logger log,
diff --git a/kvs/include/kvs_common.hpp b/kvs/include/kvs_common.hpp
index 765692ff..4cb188bc 100644
--- a/kvs/include/kvs_common.hpp
+++ b/kvs/include/kvs_common.hpp
@@ -15,6 +15,7 @@
 #ifndef KVS_INCLUDE_KVS_COMMON_HPP_
 #define KVS_INCLUDE_KVS_COMMON_HPP_
 
+#include "adaptive_heavy_hitters.hpp"
 #include "kvs_types.hpp"
 
 const unsigned kMetadataReplicationFactor = 1;
diff --git a/kvs/include/metadata.hpp b/kvs/include/metadata.hpp
index 16841bc3..7681179d 100644
--- a/kvs/include/metadata.hpp
+++ b/kvs/include/metadata.hpp @@ -97,7 +97,14 @@ inline bool is_metadata(Key key) { // NOTE: This needs to be here because it needs the definition of TierMetadata extern map kTierMetadata; -enum MetadataType { replication, server_stats, key_access, key_size }; +enum MetadataType { + replication, + server_stats, + key_access, + key_size, + key_access_hot, + key_access_cold +}; inline Key get_metadata_key(const ServerThread& st, unsigned tier_id, unsigned thread_num, MetadataType type) { @@ -106,6 +113,8 @@ inline Key get_metadata_key(const ServerThread& st, unsigned tier_id, switch (type) { case MetadataType::server_stats: metadata_type = "stats"; break; case MetadataType::key_access: metadata_type = "access"; break; + case MetadataType::key_access_hot: metadata_type = "hot_access"; break; + case MetadataType::key_access_cold: metadata_type = "cold_access"; break; case MetadataType::key_size: metadata_type = "size"; break; default: return ""; // this should never happen; see note below about diff --git a/kvs/include/monitor/monitoring_handlers.hpp b/kvs/include/monitor/monitoring_handlers.hpp index d8f4c195..dc586b1c 100644 --- a/kvs/include/monitor/monitoring_handlers.hpp +++ b/kvs/include/monitor/monitoring_handlers.hpp @@ -18,14 +18,16 @@ #include "hash_ring.hpp" #include "metadata.pb.h" -void membership_handler(logger log, string& serialized, - map& global_hash_rings, - unsigned& new_memory_count, unsigned& new_ebs_count, - TimePoint& grace_start, vector
& routing_ips, - StorageStats& memory_storage, StorageStats& ebs_storage, - OccupancyStats& memory_occupancy, - OccupancyStats& ebs_occupancy, - map>& key_access_frequency); +void membership_handler( + logger log, string& serialized, + map& global_hash_rings, unsigned& new_memory_count, + unsigned& new_ebs_count, TimePoint& grace_start, + vector
& routing_ips, StorageStats& memory_storage, + StorageStats& ebs_storage, OccupancyStats& memory_occupancy, + OccupancyStats& ebs_occupancy, + map>& key_access_frequency, + map>& hot_key_access_frequency, + map>& cold_key_access_frequency); void depart_done_handler(logger log, string& serialized, map& departing_node_map, diff --git a/kvs/include/monitor/monitoring_utils.hpp b/kvs/include/monitor/monitoring_utils.hpp index 12033cc1..7d846bc7 100644 --- a/kvs/include/monitor/monitoring_utils.hpp +++ b/kvs/include/monitor/monitoring_utils.hpp @@ -50,6 +50,10 @@ struct SummaryStats { void clear() { key_access_mean = 0; key_access_std = 0; + hot_key_access_mean = 0; + hot_key_access_std = 0; + cold_key_access_mean = 0; + cold_key_access_std = 0; total_memory_access = 0; total_ebs_access = 0; total_memory_consumption = 0; @@ -74,6 +78,10 @@ struct SummaryStats { SummaryStats() { clear(); } double key_access_mean; double key_access_std; + double hot_key_access_mean; + double hot_key_access_std; + double cold_key_access_mean; + double cold_key_access_std; unsigned total_memory_access; unsigned total_ebs_access; unsigned long long total_memory_consumption; @@ -101,6 +109,8 @@ void collect_internal_stats( map& local_hash_rings, SocketCache& pushers, MonitoringThread& mt, zmq::socket_t& response_puller, logger log, unsigned& rid, map>& key_access_frequency, + map>& hot_key_access_frequency, + map>& cold_key_access_frequency, map& key_size, StorageStats& memory_storage, StorageStats& ebs_storage, OccupancyStats& memory_occupancy, OccupancyStats& ebs_occupancy, AccessStats& memory_access, @@ -108,10 +118,14 @@ void collect_internal_stats( void compute_summary_stats( map>& key_access_frequency, + map>& hot_key_access_frequency, + map>& cold_key_access_frequency, StorageStats& memory_storage, StorageStats& ebs_storage, OccupancyStats& memory_occupancy, OccupancyStats& ebs_occupancy, AccessStats& memory_access, AccessStats& ebs_access, - map& key_access_summary, SummaryStats& ss, logger log, + map& key_access_summary, + map& hot_key_access_summary, + map& cold_key_access_summary, SummaryStats& ss, logger log, unsigned& server_monitoring_epoch); void collect_external_stats(map& user_latency, diff --git a/kvs/include/monitor/policies.hpp b/kvs/include/monitor/policies.hpp index 83e4fda6..2e53f521 100644 --- a/kvs/include/monitor/policies.hpp +++ b/kvs/include/monitor/policies.hpp @@ -34,6 +34,8 @@ void movement_policy(logger log, map& global_hash_rings, Address management_ip, map& key_replication_map, map& key_access_summary, + map& hot_key_access_summary, + map& cold_key_access_summary, map& key_size, MonitoringThread& mt, SocketCache& pushers, zmq::socket_t& response_puller, vector
& routing_ips, unsigned& rid); @@ -44,7 +46,10 @@ void slo_policy(logger log, map& global_hash_rings, unsigned& memory_node_number, unsigned& adding_memory_node, bool& removing_memory_node, Address management_ip, map& key_replication_map, - map& key_access_summary, MonitoringThread& mt, + map& key_access_summary, + map& hot_key_access_summary, + map& cold_key_access_summary, + MonitoringThread& mt, map& departing_node_map, SocketCache& pushers, zmq::socket_t& response_puller, vector
& routing_ips, unsigned& rid, diff --git a/kvs/src/kvs/replication_response_handler.cpp b/kvs/src/kvs/replication_response_handler.cpp index 5b592dbf..2848e562 100644 --- a/kvs/src/kvs/replication_response_handler.cpp +++ b/kvs/src/kvs/replication_response_handler.cpp @@ -23,7 +23,8 @@ void replication_response_handler( map>& key_access_tracker, map& stored_key_map, map& key_replication_map, set& local_changeset, - ServerThread& wt, SerializerMap& serializers, SocketCache& pushers) { + ServerThread& wt, SerializerMap& serializers, SocketCache& pushers, + AdaptiveThresholdHeavyHitters* sketch) { KeyResponse response; response.ParseFromString(serialized); @@ -119,6 +120,7 @@ void replication_response_handler( process_put(key, request.lattice_type_, request.payload_, serializers[request.lattice_type_], stored_key_map); key_access_tracker[key].insert(now); + sketch->report_key(key); access_count += 1; local_changeset.insert(key); @@ -168,6 +170,7 @@ void replication_response_handler( } } key_access_tracker[key].insert(now); + sketch->report_key(key); access_count += 1; string serialized_response; diff --git a/kvs/src/kvs/server.cpp b/kvs/src/kvs/server.cpp index 4bfd2f3f..ccdb620b 100644 --- a/kvs/src/kvs/server.cpp +++ b/kvs/src/kvs/server.cpp @@ -101,10 +101,10 @@ void run(unsigned thread_id, Address public_ip, Address private_ip, map key_replication_map; // ZMQ socket for asking kops server for IP addrs of functional nodes. - zmq::socket_t func_nodes_requester(context, ZMQ_REQ); - func_nodes_requester.setsockopt(ZMQ_SNDTIMEO, 1000); // 1s - func_nodes_requester.setsockopt(ZMQ_RCVTIMEO, 1000); // 1s - func_nodes_requester.connect(get_func_nodes_req_address(management_ip)); + // zmq::socket_t func_nodes_requester(context, ZMQ_REQ); + // func_nodes_requester.setsockopt(ZMQ_SNDTIMEO, 1000); // 1s + // func_nodes_requester.setsockopt(ZMQ_RCVTIMEO, 1000); // 1s + // func_nodes_requester.connect(get_func_nodes_req_address(management_ip)); // request server addresses from the seed node zmq::socket_t addr_requester(context, ZMQ_REQ); @@ -282,6 +282,9 @@ void run(unsigned thread_id, Address public_ip, Address private_ip, unsigned long long working_time_map[9] = {0, 0, 0, 0, 0, 0, 0, 0, 0}; unsigned epoch = 0; + // Initialize a Heavy Hitters Sketch + AdaptiveThresholdHeavyHitters* sketch = new AdaptiveThresholdHeavyHitters(); + // enter event loop while (true) { kZmqUtil->poll(0, &pollitems); @@ -335,7 +338,7 @@ void run(unsigned thread_id, Address public_ip, Address private_ip, global_hash_rings, local_hash_rings, pending_requests, key_access_tracker, stored_key_map, key_replication_map, local_changeset, wt, - serializers, pushers); + serializers, pushers, sketch); auto time_elapsed = std::chrono::duration_cast( std::chrono::system_clock::now() - work_start) @@ -369,7 +372,7 @@ void run(unsigned thread_id, Address public_ip, Address private_ip, seed, access_count, log, serialized, global_hash_rings, local_hash_rings, pending_requests, pending_gossip, key_access_tracker, stored_key_map, key_replication_map, - local_changeset, wt, serializers, pushers); + local_changeset, wt, serializers, pushers, sketch); auto time_elapsed = std::chrono::duration_cast( std::chrono::system_clock::now() - work_start) @@ -523,33 +526,22 @@ void run(unsigned thread_id, Address public_ip, Address private_ip, kZmqUtil->send_string(serialized, &pushers[target_address]); } - // compute key access stats + // Get map of hot keys + std::unordered_map hot_key_map = sketch->get_hot_map(); KeyAccessData access; - auto current_time 
= std::chrono::system_clock::now(); - - for (const auto& key_access_pair : key_access_tracker) { + for (const auto& key_access_pair : hot_key_map) { Key key = key_access_pair.first; - auto access_times = key_access_pair.second; - - // garbage collect - for (const auto& time : access_times) { - if (std::chrono::duration_cast(current_time - - time) - .count() >= kKeyMonitoringThreshold) { - access_times.erase(time); - break; - } - } + unsigned count = key_access_pair.second; // update key_access_frequency KeyAccessData_KeyCount* tp = access.add_keys(); tp->set_key(key); - tp->set_access_count(access_times.size()); + tp->set_access_count(count); } - // report key access stats - key = - get_metadata_key(wt, kSelfTierId, wt.tid(), MetadataType::key_access); + // report hot key access stats + key = get_metadata_key(wt, kSelfTierId, wt.tid(), + MetadataType::key_access_hot); string serialized_access; access.SerializeToString(&serialized_access); @@ -571,6 +563,46 @@ void run(unsigned thread_id, Address public_ip, Address private_ip, kZmqUtil->send_string(serialized, &pushers[target_address]); } + // Get map of cold keys + std::unordered_map cold_key_map = sketch->get_cold_map(); + KeyAccessData cold_access; + for (const auto& key_access_pair : cold_key_map) { + Key key = key_access_pair.first; + unsigned count = key_access_pair.second; + + // update key_access_frequency + KeyAccessData_KeyCount* tp = cold_access.add_keys(); + tp->set_key(key); + tp->set_access_count(count); + } + + // report hot key access stats + key = get_metadata_key(wt, kSelfTierId, wt.tid(), + MetadataType::key_access_cold); + string cold_serialized_access; + cold_access.SerializeToString(&cold_serialized_access); + + req.Clear(); + req.set_type(RequestType::PUT); + prepare_put_tuple(req, key, LatticeType::LWW, + serialize(ts, cold_serialized_access)); + + threads = kHashRingUtil->get_responsible_threads_metadata( + key, global_hash_rings[kMemoryTierId], + local_hash_rings[kMemoryTierId]); + + if (threads.size() != 0) { + Address target_address = + std::next(begin(threads), rand_r(&seed) % threads.size()) + ->key_request_connect_address(); + string serialized; + req.SerializeToString(&serialized); + kZmqUtil->send_string(serialized, &pushers[target_address]); + } + + // Reset sketch for next epoch + sketch->reset(); + KeySizeData primary_key_size; for (const auto& key_pair : stored_key_map) { if (is_primary_replica(key_pair.first, key_replication_map, @@ -609,10 +641,10 @@ void run(unsigned thread_id, Address public_ip, Address private_ip, // Get the most recent list of cache IPs. // (Actually gets the list of all current function executor nodes.) // (The message content doesn't matter here; it's an argless RPC call.) - kZmqUtil->send_string("", &func_nodes_requester); + // kZmqUtil->send_string("", &func_nodes_requester); // Get the response. KeySet func_nodes; - func_nodes.ParseFromString(kZmqUtil->recv_string(&func_nodes_requester)); + // func_nodes.ParseFromString(kZmqUtil->recv_string(&func_nodes_requester)); // Update extant_caches with the response. set
deleted_caches = std::move(extant_caches); diff --git a/kvs/src/kvs/user_request_handler.cpp b/kvs/src/kvs/user_request_handler.cpp index c5ab0a69..cf624e78 100644 --- a/kvs/src/kvs/user_request_handler.cpp +++ b/kvs/src/kvs/user_request_handler.cpp @@ -22,7 +22,8 @@ void user_request_handler( map>& key_access_tracker, map& stored_key_map, map& key_replication_map, set& local_changeset, - ServerThread& wt, SerializerMap& serializers, SocketCache& pushers) { + ServerThread& wt, SerializerMap& serializers, SocketCache& pushers, + AdaptiveThresholdHeavyHitters* sketch) { KeyRequest request; request.ParseFromString(serialized); @@ -114,6 +115,7 @@ void user_request_handler( } key_access_tracker[key].insert(std::chrono::system_clock::now()); + sketch->report_key(key); access_count += 1; } } else { diff --git a/kvs/src/monitor/membership_handler.cpp b/kvs/src/monitor/membership_handler.cpp index ce1d5648..2370e199 100644 --- a/kvs/src/monitor/membership_handler.cpp +++ b/kvs/src/monitor/membership_handler.cpp @@ -21,7 +21,9 @@ void membership_handler( vector
& routing_ips, StorageStats& memory_storage, StorageStats& ebs_storage, OccupancyStats& memory_occupancy, OccupancyStats& ebs_occupancy, - map>& key_access_frequency) { + map>& key_access_frequency, + map>& hot_key_access_frequency, + map>& cold_key_access_frequency) { vector v; split(serialized, ':', v); @@ -76,7 +78,13 @@ void membership_handler( memory_occupancy.erase(new_server_private_ip); // NOTE: No const here because we are calling erase - for (auto& key_access_pair : key_access_frequency) { + for (auto& key_access_pair : hot_key_access_frequency) { + for (unsigned i = 0; i < kMemoryThreadCount; i++) { + key_access_pair.second.erase(new_server_private_ip + ":" + + std::to_string(i)); + } + } + for (auto& key_access_pair : cold_key_access_frequency) { for (unsigned i = 0; i < kMemoryThreadCount; i++) { key_access_pair.second.erase(new_server_private_ip + ":" + std::to_string(i)); @@ -87,7 +95,13 @@ void membership_handler( ebs_occupancy.erase(new_server_private_ip); // NOTE: No const here because we are calling erase - for (auto& key_access_pair : key_access_frequency) { + for (auto& key_access_pair : hot_key_access_frequency) { + for (unsigned i = 0; i < kEbsThreadCount; i++) { + key_access_pair.second.erase(new_server_private_ip + ":" + + std::to_string(i)); + } + } + for (auto& key_access_pair : cold_key_access_frequency) { for (unsigned i = 0; i < kEbsThreadCount; i++) { key_access_pair.second.erase(new_server_private_ip + ":" + std::to_string(i)); diff --git a/kvs/src/monitor/monitoring.cpp b/kvs/src/monitor/monitoring.cpp index 75b979d7..9241b947 100644 --- a/kvs/src/monitor/monitoring.cpp +++ b/kvs/src/monitor/monitoring.cpp @@ -92,8 +92,16 @@ int main(int argc, char *argv[]) { unsigned ebs_node_number; // keep track of the keys' access by worker address map> key_access_frequency; + // keep track of the hot keys' access by worker address + map> hot_key_access_frequency; + // keep track of the cold keys' access by worker address + map> cold_key_access_frequency; // keep track of the keys' access summary map key_access_summary; + // keep track of the hot keys' access summary + map hot_key_access_summary; + // keep track of the cold keys' access summary + map cold_key_access_summary; // keep track of the size of each key-value pair map key_size; // keep track of memory tier storage consumption @@ -176,7 +184,8 @@ int main(int argc, char *argv[]) { membership_handler(log, serialized, global_hash_rings, adding_memory_node, adding_ebs_node, grace_start, routing_ips, memory_storage, ebs_storage, memory_occupancy, - ebs_occupancy, key_access_frequency); + ebs_occupancy, key_access_frequency, + hot_key_access_frequency, cold_key_access_frequency); } // handle a depart done notification @@ -207,6 +216,10 @@ int main(int argc, char *argv[]) { // clear stats key_access_frequency.clear(); key_access_summary.clear(); + hot_key_access_frequency.clear(); + hot_key_access_summary.clear(); + cold_key_access_frequency.clear(); + cold_key_access_summary.clear(); memory_storage.clear(); ebs_storage.clear(); @@ -223,20 +236,29 @@ int main(int argc, char *argv[]) { // collect internal statistics collect_internal_stats( global_hash_rings, local_hash_rings, pushers, mt, response_puller, - log, rid, key_access_frequency, key_size, memory_storage, ebs_storage, + log, rid, key_access_frequency, hot_key_access_frequency, + cold_key_access_frequency, key_size, memory_storage, ebs_storage, memory_occupancy, ebs_occupancy, memory_accesses, ebs_accesses); // compute summary statistics - 
compute_summary_stats(key_access_frequency, memory_storage, ebs_storage, - memory_occupancy, ebs_occupancy, memory_accesses, - ebs_accesses, key_access_summary, ss, log, - server_monitoring_epoch); - + compute_summary_stats(key_access_frequency, hot_key_access_frequency, + cold_key_access_frequency, memory_storage, + ebs_storage, memory_occupancy, ebs_occupancy, + memory_accesses, ebs_accesses, key_access_summary, + hot_key_access_summary, cold_key_access_summary, ss, + log, server_monitoring_epoch); // collect external statistics collect_external_stats(user_latency, user_throughput, ss, log); // initialize replication factor for new keys - for (const auto &key_access_pair : key_access_summary) { + for (const auto &key_access_pair : hot_key_access_summary) { + Key key = key_access_pair.first; + if (!is_metadata(key) && + key_replication_map.find(key) == key_replication_map.end()) { + init_replication(key_replication_map, key); + } + } + for (const auto &key_access_pair : cold_key_access_summary) { Key key = key_access_pair.first; if (!is_metadata(key) && key_replication_map.find(key) == key_replication_map.end()) { @@ -253,12 +275,14 @@ int main(int argc, char *argv[]) { movement_policy(log, global_hash_rings, local_hash_rings, grace_start, ss, memory_node_number, ebs_node_number, adding_memory_node, adding_ebs_node, management_ip, key_replication_map, - key_access_summary, key_size, mt, pushers, + key_access_summary, hot_key_access_summary, + cold_key_access_summary, key_size, mt, pushers, response_puller, routing_ips, rid); slo_policy(log, global_hash_rings, local_hash_rings, grace_start, ss, memory_node_number, adding_memory_node, removing_memory_node, - management_ip, key_replication_map, key_access_summary, mt, + management_ip, key_replication_map, key_access_summary, + hot_key_access_summary, cold_key_access_summary, mt, departing_node_map, pushers, response_puller, routing_ips, rid, latency_miss_ratio_map); diff --git a/kvs/src/monitor/movement_policy.cpp b/kvs/src/monitor/movement_policy.cpp index e64fcd6f..74020143 100644 --- a/kvs/src/monitor/movement_policy.cpp +++ b/kvs/src/monitor/movement_policy.cpp @@ -23,6 +23,8 @@ void movement_policy(logger log, map& global_hash_rings, Address management_ip, map& key_replication_map, map& key_access_summary, + map& hot_key_access_summary, + map& cold_key_access_summary, map& key_size, MonitoringThread& mt, SocketCache& pushers, zmq::socket_t& response_puller, vector
& routing_ips, unsigned& rid) { @@ -36,7 +38,7 @@ void movement_policy(logger log, map& global_hash_rings, ss.total_memory_consumption); bool overflow = false; - for (const auto& key_access_pair : key_access_summary) { + for (const auto& key_access_pair : hot_key_access_summary) { Key key = key_access_pair.first; unsigned access_count = key_access_pair.second; @@ -89,7 +91,7 @@ void movement_policy(logger log, map& global_hash_rings, ss.total_ebs_consumption); overflow = false; - for (const auto& key_access_pair : key_access_summary) { + for (const auto& key_access_pair : cold_key_access_summary) { Key key = key_access_pair.first; unsigned access_count = key_access_pair.second; @@ -129,14 +131,12 @@ void movement_policy(logger log, map& global_hash_rings, // reduce the replication factor of some keys that are not so hot anymore KeyReplication minimum_rep = create_new_replication_vector(1, kMinimumReplicaNumber - 1, 1, 1); - for (const auto& key_access_pair : key_access_summary) { + for (const auto& key_access_pair : cold_key_access_summary) { Key key = key_access_pair.first; unsigned access_count = key_access_pair.second; - - if (!is_metadata(key) && access_count <= ss.key_access_mean && - !(key_replication_map[key] == minimum_rep)) { + if (!is_metadata(key) && !(key_replication_map[key] == minimum_rep)) { log->info("Key {} accessed {} times (threshold is {}).", key, - access_count, ss.key_access_mean); + access_count, ss.cold_key_access_mean + ss.cold_key_access_std); requests[key] = create_new_replication_vector(1, kMinimumReplicaNumber - 1, 1, 1); log->info("Dereplication for key {}. M: {}->{}. E: {}->{}", key, diff --git a/kvs/src/monitor/slo_policy.cpp b/kvs/src/monitor/slo_policy.cpp index 631b6b73..4932d4d1 100644 --- a/kvs/src/monitor/slo_policy.cpp +++ b/kvs/src/monitor/slo_policy.cpp @@ -21,7 +21,10 @@ void slo_policy(logger log, map& global_hash_rings, unsigned& memory_node_number, unsigned& adding_memory_node, bool& removing_memory_node, Address management_ip, map& key_replication_map, - map& key_access_summary, MonitoringThread& mt, + map& key_access_summary, + map& hot_key_access_summary, + map& cold_key_access_summary, + MonitoringThread& mt, map& departing_node_map, SocketCache& pushers, zmq::socket_t& response_puller, vector
& routing_ips, unsigned& rid, @@ -48,15 +51,14 @@ void slo_policy(logger log, map& global_hash_rings, } else { // hot key replication // find hot keys log->info("Classifying hot keys..."); - for (const auto& key_access_pair : key_access_summary) { + for (const auto& key_access_pair : hot_key_access_summary) { Key key = key_access_pair.first; unsigned access_count = key_access_pair.second; - if (!is_metadata(key) && - access_count > ss.key_access_mean + ss.key_access_std && + if (!is_metadata(key) && access_count > ss.hot_key_access_mean && latency_miss_ratio_map.find(key) != latency_miss_ratio_map.end()) { log->info("Key {} accessed {} times (threshold is {}).", key, - access_count, ss.key_access_mean + ss.key_access_std); + access_count, ss.hot_key_access_mean); unsigned target_rep_factor = key_replication_map[key].global_replication_[kMemoryTierId] * latency_miss_ratio_map[key].first; @@ -116,7 +118,29 @@ void slo_policy(logger log, map& global_hash_rings, if (time_elapsed > kGracePeriod) { // before sending remove command, first adjust relevant key's replication // factor - for (const auto& key_access_pair : key_access_summary) { + for (const auto& key_access_pair : hot_key_access_summary) { + Key key = key_access_pair.first; + + if (!is_metadata(key) && + key_replication_map[key].global_replication_[kMemoryTierId] == + (global_hash_rings[kMemoryTierId].size() / kVirtualThreadNum)) { + unsigned new_mem_rep = + key_replication_map[key].global_replication_[kMemoryTierId] - 1; + unsigned new_ebs_rep = + std::max(kMinimumReplicaNumber - new_mem_rep, (unsigned)0); + requests[key] = create_new_replication_vector( + new_mem_rep, new_ebs_rep, + key_replication_map[key].local_replication_[kMemoryTierId], + key_replication_map[key].local_replication_[kEbsTierId]); + log->info("Dereplication for key {}. M: {}->{}. 
E: {}->{}", key, + key_replication_map[key].global_replication_[kMemoryTierId], + requests[key].global_replication_[kMemoryTierId], + key_replication_map[key].global_replication_[kEbsTierId], + requests[key].global_replication_[kEbsTierId]); + } + } + + for (const auto& key_access_pair : cold_key_access_summary) { Key key = key_access_pair.first; if (!is_metadata(key) && diff --git a/kvs/src/monitor/stats_helpers.cpp b/kvs/src/monitor/stats_helpers.cpp index 8bb276ea..c159adf1 100644 --- a/kvs/src/monitor/stats_helpers.cpp +++ b/kvs/src/monitor/stats_helpers.cpp @@ -20,6 +20,8 @@ void collect_internal_stats( map& local_hash_rings, SocketCache& pushers, MonitoringThread& mt, zmq::socket_t& response_puller, logger log, unsigned& rid, map>& key_access_frequency, + map>& hot_key_access_frequency, + map>& cold_key_access_frequency, map& key_size, StorageStats& memory_storage, StorageStats& ebs_storage, OccupancyStats& memory_occupancy, OccupancyStats& ebs_occupancy, AccessStats& memory_accesses, @@ -37,7 +39,13 @@ void collect_internal_stats( addr_request_map, mt.response_connect_address(), rid); - key = get_metadata_key(st, tier_id, i, MetadataType::key_access); + key = get_metadata_key(st, tier_id, i, MetadataType::key_access_hot); + prepare_metadata_get_request(key, global_hash_rings[kMemoryTierId], + local_hash_rings[kMemoryTierId], + addr_request_map, + mt.response_connect_address(), rid); + + key = get_metadata_key(st, tier_id, i, MetadataType::key_access_cold); prepare_metadata_get_request(key, global_hash_rings[kMemoryTierId], local_hash_rings[kMemoryTierId], addr_request_map, @@ -97,6 +105,28 @@ void collect_internal_stats( key_access_frequency[key][ip_pair + ":" + std::to_string(tid)] = key_count.access_count(); } + } else if (metadata_type == "hot_access") { + // deserialized the value + KeyAccessData access; + access.ParseFromString(lww_value.value()); + + for (const auto& key_count : access.keys()) { + Key key = key_count.key(); + hot_key_access_frequency[key] + [ip_pair + ":" + std::to_string(tid)] = + key_count.access_count(); + } + } else if (metadata_type == "cold_access") { + // deserialized the value + KeyAccessData access; + access.ParseFromString(lww_value.value()); + + for (const auto& key_count : access.keys()) { + Key key = key_count.key(); + cold_key_access_frequency[key] + [ip_pair + ":" + std::to_string(tid)] = + key_count.access_count(); + } } else if (metadata_type == "size") { // deserialized the size KeySizeData key_size_msg; @@ -122,17 +152,52 @@ void collect_internal_stats( void compute_summary_stats( map>& key_access_frequency, + map>& hot_key_access_frequency, + map>& cold_key_access_frequency, StorageStats& memory_storage, StorageStats& ebs_storage, OccupancyStats& memory_occupancy, OccupancyStats& ebs_occupancy, AccessStats& memory_accesses, AccessStats& ebs_accesses, - map& key_access_summary, SummaryStats& ss, logger log, + map& key_access_summary, + map& hot_key_access_summary, + map& cold_key_access_summary, SummaryStats& ss, logger log, unsigned& server_monitoring_epoch) { // compute key access summary unsigned cnt = 0; double mean = 0; double ms = 0; - for (const auto& key_access_pair : key_access_frequency) { + for (const auto& key_access_pair : hot_key_access_frequency) { + Key key = key_access_pair.first; + unsigned access_count = 0; + + for (const auto& per_machine_pair : key_access_pair.second) { + access_count += per_machine_pair.second; + } + + hot_key_access_summary[key] = access_count; + + if (access_count > 0) { + cnt += 1; + + double delta = 
access_count - mean; + mean += (double)delta / cnt; + + double delta2 = access_count - mean; + ms += delta * delta2; + } + } + + ss.hot_key_access_mean = mean; + ss.hot_key_access_std = sqrt((double)ms / cnt); + + log->info("Hot Access: mean={}, std={}", ss.hot_key_access_mean, + ss.hot_key_access_std); + + cnt = 0; + mean = 0; + ms = 0; + + for (const auto& key_access_pair : cold_key_access_frequency) { Key key = key_access_pair.first; unsigned access_count = 0; @@ -140,7 +205,7 @@ void compute_summary_stats( access_count += per_machine_pair.second; } - key_access_summary[key] = access_count; + cold_key_access_summary[key] = access_count; if (access_count > 0) { cnt += 1; @@ -153,10 +218,11 @@ void compute_summary_stats( } } - ss.key_access_mean = mean; - ss.key_access_std = sqrt((double)ms / cnt); + ss.cold_key_access_mean = mean; + ss.cold_key_access_std = sqrt((double)ms / cnt); - log->info("Access: mean={}, std={}", ss.key_access_mean, ss.key_access_std); + log->info("Cold Access: mean={}, std={}", ss.cold_key_access_mean, + ss.cold_key_access_std); // compute tier access summary for (const auto& accesses : memory_accesses) { diff --git a/kvs/tests/kvs/test_user_request_handler.hpp b/kvs/tests/kvs/test_user_request_handler.hpp index bfaf6014..fa583540 100644 --- a/kvs/tests/kvs/test_user_request_handler.hpp +++ b/kvs/tests/kvs/test_user_request_handler.hpp @@ -14,6 +14,8 @@ #include "kvs/kvs_handlers.hpp" +AdaptiveThresholdHeavyHitters* sketch = new AdaptiveThresholdHeavyHitters(); + TEST_F(ServerHandlerTest, UserGetLWWTest) { Key key = "key"; string value = "value"; @@ -30,7 +32,7 @@ TEST_F(ServerHandlerTest, UserGetLWWTest) { user_request_handler(access_count, seed, get_request, log_, global_hash_rings, local_hash_rings, pending_requests, key_access_tracker, stored_key_map, key_replication_map, local_changeset, wt, - serializers, pushers); + serializers, pushers, sketch); vector messages = get_zmq_messages(); EXPECT_EQ(messages.size(), 1); @@ -71,7 +73,7 @@ TEST_F(ServerHandlerTest, UserGetSetTest) { user_request_handler(access_count, seed, get_request, log_, global_hash_rings, local_hash_rings, pending_requests, key_access_tracker, stored_key_map, key_replication_map, local_changeset, wt, - serializers, pushers); + serializers, pushers, sketch); vector messages = get_zmq_messages(); EXPECT_EQ(messages.size(), 1); @@ -113,7 +115,7 @@ TEST_F(ServerHandlerTest, UserGetOrderedSetTest) { user_request_handler(access_count, seed, get_request, log_, global_hash_rings, local_hash_rings, pending_requests, key_access_tracker, stored_key_map, key_replication_map, local_changeset, wt, - serializers, pushers); + serializers, pushers, sketch); vector messages = get_zmq_messages(); EXPECT_EQ(messages.size(), 1); @@ -158,7 +160,7 @@ TEST_F(ServerHandlerTest, UserGetCausalTest) { user_request_handler(access_count, seed, get_request, log_, global_hash_rings, local_hash_rings, pending_requests, key_access_tracker, stored_key_map, key_replication_map, local_changeset, wt, - serializers, pushers); + serializers, pushers, sketch); vector messages = get_zmq_messages(); EXPECT_EQ(messages.size(), 1); @@ -224,7 +226,7 @@ TEST_F(ServerHandlerTest, UserPutAndGetLWWTest) { user_request_handler(access_count, seed, put_request, log_, global_hash_rings, local_hash_rings, pending_requests, key_access_tracker, stored_key_map, key_replication_map, local_changeset, wt, - serializers, pushers); + serializers, pushers, sketch); vector messages = get_zmq_messages(); EXPECT_EQ(messages.size(), 1); @@ -249,7 +251,7 @@ 
TEST_F(ServerHandlerTest, UserPutAndGetLWWTest) { user_request_handler(access_count, seed, get_request, log_, global_hash_rings, local_hash_rings, pending_requests, key_access_tracker, stored_key_map, key_replication_map, local_changeset, wt, - serializers, pushers); + serializers, pushers, sketch); messages = get_zmq_messages(); EXPECT_EQ(messages.size(), 2); @@ -287,7 +289,7 @@ TEST_F(ServerHandlerTest, UserPutAndGetSetTest) { user_request_handler(access_count, seed, put_request, log_, global_hash_rings, local_hash_rings, pending_requests, key_access_tracker, stored_key_map, key_replication_map, local_changeset, wt, - serializers, pushers); + serializers, pushers, sketch); vector messages = get_zmq_messages(); EXPECT_EQ(messages.size(), 1); @@ -312,7 +314,7 @@ TEST_F(ServerHandlerTest, UserPutAndGetSetTest) { user_request_handler(access_count, seed, get_request, log_, global_hash_rings, local_hash_rings, pending_requests, key_access_tracker, stored_key_map, key_replication_map, local_changeset, wt, - serializers, pushers); + serializers, pushers, sketch); messages = get_zmq_messages(); EXPECT_EQ(messages.size(), 2); @@ -350,7 +352,7 @@ TEST_F(ServerHandlerTest, UserPutAndGetOrderedSetTest) { user_request_handler(access_count, seed, put_request, log_, global_hash_rings, local_hash_rings, pending_requests, key_access_tracker, stored_key_map, key_replication_map, local_changeset, wt, - serializers, pushers); + serializers, pushers, sketch); vector messages = get_zmq_messages(); EXPECT_EQ(messages.size(), 1); @@ -375,7 +377,7 @@ TEST_F(ServerHandlerTest, UserPutAndGetOrderedSetTest) { user_request_handler(access_count, seed, get_request, log_, global_hash_rings, local_hash_rings, pending_requests, key_access_tracker, stored_key_map, key_replication_map, local_changeset, wt, - serializers, pushers); + serializers, pushers, sketch); messages = get_zmq_messages(); EXPECT_EQ(messages.size(), 2); @@ -416,7 +418,7 @@ TEST_F(ServerHandlerTest, UserPutAndGetCausalTest) { user_request_handler(access_count, seed, put_request, log_, global_hash_rings, local_hash_rings, pending_requests, key_access_tracker, stored_key_map, key_replication_map, local_changeset, wt, - serializers, pushers); + serializers, pushers, sketch); vector messages = get_zmq_messages(); EXPECT_EQ(messages.size(), 1); @@ -441,7 +443,7 @@ TEST_F(ServerHandlerTest, UserPutAndGetCausalTest) { user_request_handler(access_count, seed, get_request, log_, global_hash_rings, local_hash_rings, pending_requests, key_access_tracker, stored_key_map, key_replication_map, local_changeset, wt, - serializers, pushers); + serializers, pushers, sketch); messages = get_zmq_messages(); EXPECT_EQ(messages.size(), 2); diff --git a/scripts/start-kvs-local.sh b/scripts/start-kvs-local.sh index 5170d1a4..a2b288da 100755 --- a/scripts/start-kvs-local.sh +++ b/scripts/start-kvs-local.sh @@ -40,5 +40,5 @@ echo $RPID >> pids echo $SPID >> pids if [ "$2" = "y" ] || [ "$2" = "yes" ]; then - ./build/kvs/src/cli/flkvs-cli + ./build/kvs/client/cpp/flkvs-cli conf/kvs-config.yml fi
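For reference, the sketch dimensions chosen in AdaptiveThresholdHeavyHitters::set_values() follow the usual count-min parameterization: width B = ceil(e / epsilon) buckets per row and depth l = ceil(ln(gamma)) hash rows, i.e. a failure probability of roughly 1/gamma. The short standalone check below is illustrative only, not part of the patch; it reproduces that arithmetic for the defaults epsilon = 0.001 and gamma = 4127, which yield one 9 x 2719 counter array per server-thread sketch.

#include <cmath>
#include <cstdio>

int main() {
  // Defaults from AdaptiveThresholdHeavyHitters.
  float gamma = 4127;
  float epsilon = 0.001;

  // Same arithmetic as set_values().
  int B = (int)(std::ceil(std::exp(1) / epsilon));  // buckets per row
  int l = (int)(std::ceil(std::log(gamma)));        // number of hash rows

  std::printf("B = %d, l = %d, total counters = %d\n", B, l, B * l);
  // Prints: B = 2719, l = 9, total counters = 24471
  return 0;
}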